azkaban-developers

Merge pull request #295 from hluu/master Issue #285 - injecting

8/1/2014 3:07:52 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index f2d2fc2..9c5e972 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -125,6 +125,8 @@ public class CommonJobProperties {
   public static final String EXECUTION_LINK = "azkaban.link.execution.url";
   public static final String JOBEXEC_LINK = "azkaban.link.jobexec.url";
   public static final String ATTEMPT_LINK = "azkaban.link.attempt.url";
+  public static final String OUT_NODES = "azkaban.job.outnodes";
+  public static final String IN_NODES = "azkaban.job.innodes";
 
   /**
    * Properties for passing the flow start time to the jobs.
diff --git a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
index 1a314b3..17792a0 100644
--- a/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/StringUtils.java
@@ -67,4 +67,25 @@ public class StringUtils {
     return buffer.toString();
   }
 
+  /**
+   * Don't bother to add delimiter for last element
+   * 
+   * @param list
+   * @param delimiter
+   * @return String - elements in the list separated by delimiter
+   */
+  public static String join2(Collection<String> list, String delimiter) {
+    StringBuffer buffer = new StringBuffer();
+    boolean first = true;
+    for (String str : list) {
+      if (!first) {
+        buffer.append(delimiter);
+      }
+      buffer.append(str);
+      first = false;
+
+    }
+
+    return buffer.toString();
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
index b4040c0..3492330 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
@@ -48,8 +48,11 @@ import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
 import azkaban.utils.Props;
+import azkaban.utils.StringUtils;
 
 public class JobRunner extends EventHandler implements Runnable {
+  private static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
+
   private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout(
       "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 
@@ -488,7 +491,7 @@ public class JobRunner extends EventHandler implements Runnable {
         props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
       }
 
-      insertLinks();
+      insertJobMetadata();
       insertJVMAargs();
 
       props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
@@ -534,9 +537,9 @@ public class JobRunner extends EventHandler implements Runnable {
         String.format(
             "-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s",
             flowName, executionId, jobId);
-    
-    String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);    
-    jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs; 
+
+    String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);
+    jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;
 
     logger.info("job JVM args: " + jobJVMArgs);
     props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);
@@ -546,28 +549,37 @@ public class JobRunner extends EventHandler implements Runnable {
    * Add relevant links to the job properties so that downstream consumers may
    * know what executions initiated their execution.
    */
-  private void insertLinks() {
+  private void insertJobMetadata() {
     Props azkProps = AzkabanExecutorServer.getApp().getAzkabanProps();
-    String baseURL = azkProps.get("azkaban.webserver.url");
-    if (baseURL == null) {
-      return;
+    String baseURL = azkProps.get(AZKABAN_WEBSERVER_URL);
+    if (baseURL != null) {
+      String flowName = node.getParentFlow().getFlowId();
+      String projectName = node.getParentFlow().getProjectName();
+
+      props.put(CommonJobProperties.EXECUTION_LINK,
+          String.format("%s/executor?execid=%d", baseURL, executionId));
+      props.put(CommonJobProperties.JOBEXEC_LINK, String.format(
+          "%s/executor?execid=%d&job=%s", baseURL, executionId, jobId));
+      props.put(CommonJobProperties.ATTEMPT_LINK, String.format(
+          "%s/executor?execid=%d&job=%s&attempt=%d", baseURL, executionId,
+          jobId, node.getAttempt()));
+      props.put(CommonJobProperties.WORKFLOW_LINK, String.format(
+          "%s/manager?project=%s&flow=%s", baseURL, projectName, flowName));
+      props.put(CommonJobProperties.JOB_LINK, String.format(
+          "%s/manager?project=%s&flow=%s&job=%s", baseURL, projectName,
+          flowName, jobId));
+    } else {
+      if (logger != null) {
+        logger.info(AZKABAN_WEBSERVER_URL + " property was not set");
+      }
     }
+    // out nodes
+    props.put(CommonJobProperties.OUT_NODES,
+        StringUtils.join2(node.getOutNodes(), ","));
 
-    String flowName = node.getParentFlow().getFlowId();
-    String projectName = node.getParentFlow().getProjectName();
-
-    props.put(CommonJobProperties.EXECUTION_LINK,
-        String.format("%s/executor?execid=%d", baseURL, executionId));
-    props.put(CommonJobProperties.JOBEXEC_LINK, String.format(
-        "%s/executor?execid=%d&job=%s", baseURL, executionId, jobId));
-    props.put(CommonJobProperties.ATTEMPT_LINK, String.format(
-        "%s/executor?execid=%d&job=%s&attempt=%d", baseURL, executionId, jobId,
-        node.getAttempt()));
-    props.put(CommonJobProperties.WORKFLOW_LINK, String.format(
-        "%s/manager?project=%s&flow=%s", baseURL, projectName, flowName));
-    props.put(CommonJobProperties.JOB_LINK, String.format(
-        "%s/manager?project=%s&flow=%s&job=%s", baseURL, projectName, flowName,
-        jobId));
+    // in nodes
+    props.put(CommonJobProperties.IN_NODES,
+        StringUtils.join2(node.getInNodes(), ","));
   }
 
   private void runJob() {