Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
index 978e13d..5a77a3e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
@@ -37,10 +37,12 @@ public class ExecutableFlow extends ExecutableFlowBase {
public static final String SUBMITTIME_PARAM = "submitTime";
public static final String VERSION_PARAM = "version";
public static final String PROXYUSERS_PARAM = "proxyUsers";
+ public static final String PROJECTNAME_PARAM = "projectName";
private int executionId = -1;
private int scheduleId = -1;
private int projectId;
+ private String projectName;
private int version;
private long submitTime = -1;
private String submitUser;
@@ -51,6 +53,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
public ExecutableFlow(Project project, Flow flow) {
this.projectId = project.getId();
+ this.projectName = project.getName();
this.version = project.getVersion();
this.scheduleId = -1;
@@ -86,6 +89,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
return executionOptions;
}
+ @Override
protected void setFlow(Project project, Flow flow) {
super.setFlow(project, flow);
executionOptions = new ExecutionOptions();
@@ -99,6 +103,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
}
}
+ @Override
public int getExecutionId() {
return executionId;
}
@@ -116,6 +121,11 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.projectId = projectId;
}
+ @Override
+ public String getProjectName() {
+ return projectName;
+ }
+
public int getScheduleId() {
return scheduleId;
}
@@ -157,6 +167,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.submitTime = submitTime;
}
+ @Override
public Map<String, Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
fillMapFromExecutable(flowObj);
@@ -164,6 +175,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
flowObj.put(EXECUTIONID_PARAM, executionId);
flowObj.put(EXECUTIONPATH_PARAM, executionPath);
flowObj.put(PROJECTID_PARAM, projectId);
+ flowObj.put(PROJECTNAME_PARAM, projectName);
if (scheduleId >= 0) {
flowObj.put(SCHEDULEID_PARAM, scheduleId);
@@ -201,6 +213,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
this.executionPath = flowObj.getString(EXECUTIONPATH_PARAM);
this.projectId = flowObj.getInt(PROJECTID_PARAM);
+ this.projectName = flowObj.getString(PROJECTNAME_PARAM);
this.scheduleId = flowObj.getInt(SCHEDULEID_PARAM);
this.submitUser = flowObj.getString(SUBMITUSER_PARAM);
this.version = flowObj.getInt(VERSION_PARAM);
@@ -222,12 +235,14 @@ public class ExecutableFlow extends ExecutableFlowBase {
}
}
+ @Override
public Map<String, Object> toUpdateObject(long lastUpdateTime) {
Map<String, Object> updateData = super.toUpdateObject(lastUpdateTime);
updateData.put(EXECUTIONID_PARAM, this.executionId);
return updateData;
}
+ @Override
public void resetForRetry() {
super.resetForRetry();
this.setStatus(Status.RUNNING);
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
index d4b4b59..fb574fb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
@@ -71,6 +71,14 @@ public class ExecutableFlowBase extends ExecutableNode {
return -1;
}
+ public String getProjectName() {
+ if (this.getParentFlow() != null) {
+ return this.getParentFlow().getProjectName();
+ }
+
+ return null;
+ }
+
public int getVersion() {
if (this.getParentFlow() != null) {
return this.getParentFlow().getVersion();
@@ -181,6 +189,7 @@ public class ExecutableFlowBase extends ExecutableNode {
return endNodes;
}
+ @Override
public Map<String, Object> toObject() {
Map<String, Object> mapObj = new HashMap<String, Object>();
fillMapFromExecutable(mapObj);
@@ -188,6 +197,7 @@ public class ExecutableFlowBase extends ExecutableNode {
return mapObj;
}
+ @Override
protected void fillMapFromExecutable(Map<String, Object> flowObjMap) {
super.fillMapFromExecutable(flowObjMap);
@@ -303,7 +313,7 @@ public class ExecutableFlowBase extends ExecutableNode {
List<Map<String, Object>> nodes =
(List<Map<String, Object>>) updateData
- .<Map<String, Object>> getList(NODES_PARAM);
+ .<Map<String, Object>> getList(NODES_PARAM);
if (nodes != null) {
for (Map<String, Object> node : nodes) {
TypedMapWrapper<String, Object> nodeWrapper =
diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 5ead549..f2d2fc2 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -105,6 +105,11 @@ public class CommonJobProperties {
public static final String PROJECT_ID = "azkaban.flow.projectid";
/**
+ * The project name.
+ */
+ public static final String PROJECT_NAME = "azkaban.flow.projectname";
+
+ /**
* The version of the project the flow is running. This may change if a forced
* hotspot occurs.
*/
@@ -115,6 +120,12 @@ public class CommonJobProperties {
*/
public static final String FLOW_UUID = "azkaban.flow.uuid";
+ public static final String JOB_LINK = "azkaban.link.job.url";
+ public static final String WORKFLOW_LINK = "azkaban.link.workflow.url";
+ public static final String EXECUTION_LINK = "azkaban.link.execution.url";
+ public static final String JOBEXEC_LINK = "azkaban.link.jobexec.url";
+ public static final String ATTEMPT_LINK = "azkaban.link.attempt.url";
+
/**
* Properties for passing the flow start time to the jobs.
*/
@@ -130,4 +141,5 @@ public class CommonJobProperties {
"azkaban.flow.start.milliseconds";
public static final String FLOW_START_TIMEZONE =
"azkaban.flow.start.timezone";
+
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 064562f..b397ad4 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -282,6 +282,7 @@ public class PropsUtils {
props.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
props.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
props.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
+ props.put(CommonJobProperties.PROJECT_NAME, flow.getProjectName());
props.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 2878f2f..89351ee 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -160,6 +160,15 @@ public class AzkabanExecutorServer {
}
/**
+ * Returns the currently executing executor server, if one exists.
+ *
+ * @return
+ */
+ public static AzkabanExecutorServer getApp() {
+ return app;
+ }
+
+ /**
* Azkaban using Jetty
*
* @param args
@@ -189,6 +198,7 @@ public class AzkabanExecutorServer {
Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
public void run() {
logger.info("Shutting down http server...");
try {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
index 5d7a996..3d9a0b1 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/JobRunner.java
@@ -20,10 +20,10 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.log4j.Appender;
import org.apache.log4j.EnhancedPatternLayout;
@@ -488,6 +488,8 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
}
+ insertLinks();
+
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
props.put(CommonJobProperties.JOB_METADATA_FILE,
createMetaDataFileName(node));
@@ -519,6 +521,33 @@ public class JobRunner extends EventHandler implements Runnable {
return true;
}
+ /**
+ * Add relevant links to the job properties so that downstream consumers may
+ * know what executions initiated their execution.
+ */
+ private void insertLinks() {
+ Props azkProps = AzkabanExecutorServer.getApp().getAzkabanProps();
+ String baseURL = azkProps.get("azkaban.webserver.url");
+ if (baseURL == null){
+ return;
+ }
+
+ String flowName = node.getParentFlow().getFlowId();
+ String projectName = node.getParentFlow().getProjectName();
+
+ props.put(CommonJobProperties.EXECUTION_LINK,
+ String.format("%s/executor?execid=%d", baseURL, executionId));
+ props.put(CommonJobProperties.JOBEXEC_LINK, String.format(
+ "%s/executor?execid=%d&job=%s", baseURL, executionId, jobId));
+ props.put(CommonJobProperties.ATTEMPT_LINK, String.format(
+ "%s/executor?execid=%d&job=%s&attempt=%d", baseURL, executionId, jobId, node.getAttempt()));
+ props.put(CommonJobProperties.WORKFLOW_LINK, String.format(
+ "%s/manager?project=%s&flow=%s", baseURL, projectName, flowName));
+ props.put(CommonJobProperties.JOB_LINK, String.format(
+ "%s/manager?project=%s&flow=%s&job=%s", baseURL, projectName, flowName,
+ jobId));
+ }
+
private void runJob() {
try {
job.run();