azkaban-developers

Merge pull request #259 from wagnermarkd/links Insert links

6/17/2014 5:45:57 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
index 978e13d..5a77a3e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
@@ -37,10 +37,12 @@ public class ExecutableFlow extends ExecutableFlowBase {
   public static final String SUBMITTIME_PARAM = "submitTime";
   public static final String VERSION_PARAM = "version";
   public static final String PROXYUSERS_PARAM = "proxyUsers";
+  public static final String PROJECTNAME_PARAM = "projectName";
 
   private int executionId = -1;
   private int scheduleId = -1;
   private int projectId;
+  private String projectName;
   private int version;
   private long submitTime = -1;
   private String submitUser;
@@ -51,6 +53,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
 
   public ExecutableFlow(Project project, Flow flow) {
     this.projectId = project.getId();
+    this.projectName = project.getName();
     this.version = project.getVersion();
     this.scheduleId = -1;
 
@@ -86,6 +89,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
     return executionOptions;
   }
 
+  @Override
   protected void setFlow(Project project, Flow flow) {
     super.setFlow(project, flow);
     executionOptions = new ExecutionOptions();
@@ -99,6 +103,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
     }
   }
 
+  @Override
   public int getExecutionId() {
     return executionId;
   }
@@ -116,6 +121,11 @@ public class ExecutableFlow extends ExecutableFlowBase {
     this.projectId = projectId;
   }
 
+  @Override
+  public String getProjectName() {
+    return projectName;
+  }
+
   public int getScheduleId() {
     return scheduleId;
   }
@@ -157,6 +167,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
     this.submitTime = submitTime;
   }
 
+  @Override
   public Map<String, Object> toObject() {
     HashMap<String, Object> flowObj = new HashMap<String, Object>();
     fillMapFromExecutable(flowObj);
@@ -164,6 +175,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
     flowObj.put(EXECUTIONID_PARAM, executionId);
     flowObj.put(EXECUTIONPATH_PARAM, executionPath);
     flowObj.put(PROJECTID_PARAM, projectId);
+    flowObj.put(PROJECTNAME_PARAM, projectName);
 
     if (scheduleId >= 0) {
       flowObj.put(SCHEDULEID_PARAM, scheduleId);
@@ -201,6 +213,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
     this.executionPath = flowObj.getString(EXECUTIONPATH_PARAM);
 
     this.projectId = flowObj.getInt(PROJECTID_PARAM);
+    this.projectName = flowObj.getString(PROJECTNAME_PARAM);
     this.scheduleId = flowObj.getInt(SCHEDULEID_PARAM);
     this.submitUser = flowObj.getString(SUBMITUSER_PARAM);
     this.version = flowObj.getInt(VERSION_PARAM);
@@ -222,12 +235,14 @@ public class ExecutableFlow extends ExecutableFlowBase {
     }
   }
 
+  @Override
   public Map<String, Object> toUpdateObject(long lastUpdateTime) {
     Map<String, Object> updateData = super.toUpdateObject(lastUpdateTime);
     updateData.put(EXECUTIONID_PARAM, this.executionId);
     return updateData;
   }
 
+  @Override
   public void resetForRetry() {
     super.resetForRetry();
     this.setStatus(Status.RUNNING);
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
index d4b4b59..fb574fb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
@@ -71,6 +71,14 @@ public class ExecutableFlowBase extends ExecutableNode {
     return -1;
   }
 
+  public String getProjectName() {
+    if (this.getParentFlow() != null) {
+      return this.getParentFlow().getProjectName();
+    }
+
+    return null;
+  }
+
   public int getVersion() {
     if (this.getParentFlow() != null) {
       return this.getParentFlow().getVersion();
@@ -181,6 +189,7 @@ public class ExecutableFlowBase extends ExecutableNode {
     return endNodes;
   }
 
+  @Override
   public Map<String, Object> toObject() {
     Map<String, Object> mapObj = new HashMap<String, Object>();
     fillMapFromExecutable(mapObj);
@@ -188,6 +197,7 @@ public class ExecutableFlowBase extends ExecutableNode {
     return mapObj;
   }
 
+  @Override
   protected void fillMapFromExecutable(Map<String, Object> flowObjMap) {
     super.fillMapFromExecutable(flowObjMap);
 
@@ -303,7 +313,7 @@ public class ExecutableFlowBase extends ExecutableNode {
 
     List<Map<String, Object>> nodes =
         (List<Map<String, Object>>) updateData
-            .<Map<String, Object>> getList(NODES_PARAM);
+        .<Map<String, Object>> getList(NODES_PARAM);
     if (nodes != null) {
       for (Map<String, Object> node : nodes) {
         TypedMapWrapper<String, Object> nodeWrapper =
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 5ead549..f2d2fc2 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -105,6 +105,11 @@ public class CommonJobProperties {
   public static final String PROJECT_ID = "azkaban.flow.projectid";
 
   /**
+   * The project name.
+   */
+  public static final String PROJECT_NAME = "azkaban.flow.projectname";
+
+  /**
    * The version of the project the flow is running. This may change if a forced
    * hotspot occurs.
    */
@@ -115,6 +120,12 @@ public class CommonJobProperties {
    */
   public static final String FLOW_UUID = "azkaban.flow.uuid";
 
+  public static final String JOB_LINK = "azkaban.link.job.url";
+  public static final String WORKFLOW_LINK = "azkaban.link.workflow.url";
+  public static final String EXECUTION_LINK = "azkaban.link.execution.url";
+  public static final String JOBEXEC_LINK = "azkaban.link.jobexec.url";
+  public static final String ATTEMPT_LINK = "azkaban.link.attempt.url";
+
   /**
    * Properties for passing the flow start time to the jobs.
    */
@@ -130,4 +141,5 @@ public class CommonJobProperties {
       "azkaban.flow.start.milliseconds";
   public static final String FLOW_START_TIMEZONE =
       "azkaban.flow.start.timezone";
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 064562f..b397ad4 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -282,6 +282,7 @@ public class PropsUtils {
     props.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
     props.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
     props.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
+    props.put(CommonJobProperties.PROJECT_NAME, flow.getProjectName());
     props.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
     props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
 
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 2878f2f..89351ee 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -160,6 +160,15 @@ public class AzkabanExecutorServer {
   }
 
   /**
+   * Returns the currently executing executor server, if one exists.
+   * 
+   * @return
+   */
+  public static AzkabanExecutorServer getApp() {
+    return app;
+  }
+
+  /**
    * Azkaban using Jetty
    *
    * @param args
@@ -189,6 +198,7 @@ public class AzkabanExecutorServer {
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
 
+      @Override
       public void run() {
         logger.info("Shutting down http server...");
         try {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
index 5d7a996..3d9a0b1 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
@@ -20,10 +20,10 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.log4j.Appender;
 import org.apache.log4j.EnhancedPatternLayout;
@@ -488,6 +488,8 @@ public class JobRunner extends EventHandler implements Runnable {
         props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
       }
 
+      insertLinks();
+
       props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
       props.put(CommonJobProperties.JOB_METADATA_FILE,
           createMetaDataFileName(node));
@@ -519,6 +521,33 @@ public class JobRunner extends EventHandler implements Runnable {
     return true;
   }
 
+  /**
+   * Add relevant links to the job properties so that downstream consumers may
+   * know what executions initiated their execution.
+   */
+  private void insertLinks() {
+    Props azkProps = AzkabanExecutorServer.getApp().getAzkabanProps();
+    String baseURL = azkProps.get("azkaban.webserver.url");
+    if (baseURL == null){
+      return;
+    }
+
+    String flowName = node.getParentFlow().getFlowId();
+    String projectName = node.getParentFlow().getProjectName();
+
+    props.put(CommonJobProperties.EXECUTION_LINK,
+        String.format("%s/executor?execid=%d", baseURL, executionId));
+    props.put(CommonJobProperties.JOBEXEC_LINK, String.format(
+        "%s/executor?execid=%d&job=%s", baseURL, executionId, jobId));
+    props.put(CommonJobProperties.ATTEMPT_LINK, String.format(
+        "%s/executor?execid=%d&job=%s&attempt=%d", baseURL, executionId, jobId, node.getAttempt()));
+    props.put(CommonJobProperties.WORKFLOW_LINK, String.format(
+        "%s/manager?project=%s&flow=%s", baseURL, projectName, flowName));
+    props.put(CommonJobProperties.JOB_LINK, String.format(
+        "%s/manager?project=%s&flow=%s&job=%s", baseURL, projectName, flowName,
+        jobId));
+  }
+
   private void runJob() {
     try {
       job.run();