diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
index b4040c0..3492330 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
@@ -48,8 +48,11 @@ import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.Props;
+import azkaban.utils.StringUtils;
public class JobRunner extends EventHandler implements Runnable {
+ private static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
+
private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -488,7 +491,7 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
}
- insertLinks();
+ insertJobMetadata();
insertJVMAargs();
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
@@ -534,9 +537,9 @@ public class JobRunner extends EventHandler implements Runnable {
String.format(
"-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s",
flowName, executionId, jobId);
-
- String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);
- jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;
+
+ String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);
+ jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;
logger.info("job JVM args: " + jobJVMArgs);
props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);
@@ -546,28 +549,37 @@ public class JobRunner extends EventHandler implements Runnable {
* Add relevant links to the job properties so that downstream consumers may
* know what executions initiated their execution.
*/
- private void insertLinks() {
+ private void insertJobMetadata() {
Props azkProps = AzkabanExecutorServer.getApp().getAzkabanProps();
- String baseURL = azkProps.get("azkaban.webserver.url");
- if (baseURL == null) {
- return;
+ String baseURL = azkProps.get(AZKABAN_WEBSERVER_URL);
+ if (baseURL != null) {
+ String flowName = node.getParentFlow().getFlowId();
+ String projectName = node.getParentFlow().getProjectName();
+
+ props.put(CommonJobProperties.EXECUTION_LINK,
+ String.format("%s/executor?execid=%d", baseURL, executionId));
+ props.put(CommonJobProperties.JOBEXEC_LINK, String.format(
+ "%s/executor?execid=%d&job=%s", baseURL, executionId, jobId));
+ props.put(CommonJobProperties.ATTEMPT_LINK, String.format(
+ "%s/executor?execid=%d&job=%s&attempt=%d", baseURL, executionId,
+ jobId, node.getAttempt()));
+ props.put(CommonJobProperties.WORKFLOW_LINK, String.format(
+ "%s/manager?project=%s&flow=%s", baseURL, projectName, flowName));
+ props.put(CommonJobProperties.JOB_LINK, String.format(
+ "%s/manager?project=%s&flow=%s&job=%s", baseURL, projectName,
+ flowName, jobId));
+ } else {
+ if (logger != null) {
+ logger.info(AZKABAN_WEBSERVER_URL + " property was not set");
+ }
}
+ // out nodes
+ props.put(CommonJobProperties.OUT_NODES,
+ StringUtils.join2(node.getOutNodes(), ","));
- String flowName = node.getParentFlow().getFlowId();
- String projectName = node.getParentFlow().getProjectName();
-
- props.put(CommonJobProperties.EXECUTION_LINK,
- String.format("%s/executor?execid=%d", baseURL, executionId));
- props.put(CommonJobProperties.JOBEXEC_LINK, String.format(
- "%s/executor?execid=%d&job=%s", baseURL, executionId, jobId));
- props.put(CommonJobProperties.ATTEMPT_LINK, String.format(
- "%s/executor?execid=%d&job=%s&attempt=%d", baseURL, executionId, jobId,
- node.getAttempt()));
- props.put(CommonJobProperties.WORKFLOW_LINK, String.format(
- "%s/manager?project=%s&flow=%s", baseURL, projectName, flowName));
- props.put(CommonJobProperties.JOB_LINK, String.format(
- "%s/manager?project=%s&flow=%s&job=%s", baseURL, projectName, flowName,
- jobId));
+ // in nodes
+ props.put(CommonJobProperties.IN_NODES,
+ StringUtils.join2(node.getInNodes(), ","));
}
private void runJob() {