azkaban-uncached
Changes
src/java/azkaban/utils/PropsUtils.java 27(+16 -11)
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 23379a9..9682f16 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -35,6 +35,7 @@ import azkaban.flow.FlowProps;
import azkaban.jobtype.JobTypeManager;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -101,6 +102,9 @@ public class FlowRunner extends EventHandler implements Runnable {
int version = flow.getVersion();
String flowId = flow.getFlowId();
+ // Add a bunch of common azkaban properties
+ PropsUtils.produceParentProperties(flow);
+
// Create execution dir
createLogger(flowId);
logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 0b8d3c6..0ab138b 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -31,6 +31,7 @@ import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
@@ -197,6 +198,7 @@ public class JobRunner extends EventHandler implements Runnable {
else {
logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
}
+ props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
node.setStatus(Status.RUNNING);
// Ability to specify working directory
@@ -247,7 +249,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
}
-
+
public Status getStatus() {
return node.getStatus();
}
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
new file mode 100644
index 0000000..319ce96
--- /dev/null
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -0,0 +1,97 @@
+package azkaban.flow;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class CommonJobProperties {
+ /*
+ * The following are Common properties that can be set in a job file
+ */
+
+ /**
+ * The type of job that will be executed.
+ * Examples: command, java, etc.
+ */
+ public static final String JOB_TYPE = "type";
+
+ /**
+ * Comma delimited list of job names which are dependencies
+ */
+ public static final String DEPENDENCIES = "dependencies";
+
+ /**
+ * The number of retries when this job has failed.
+ */
+ public static final String RETRIES = "retries";
+
+ /**
+ * The time in millisec to back off after every retry
+ */
+ public static final String RETRY_BACKOFF = "retry.backoff";
+
+ /**
+ * Comma delimited list of email addresses for both failure and success messages
+ */
+ public static final String NOTIFY_EMAILS = "notify.emails";
+
+ /**
+ * Comma delimited list of email addresses for success messages
+ */
+ public static final String SUCCESS_EMAILS = "success.emails";
+
+ /**
+ * Comma delimited list of email addresses for failure messages
+ */
+ public static final String FAILURE_EMAILS = "failure.emails";
+
+ /*
+ * The following are the common props that will be added to the job by azkaban
+ */
+
+ /**
+ * The attempt number of the executing job.
+ */
+ public static final String JOB_ATTEMPT = "azkaban.job.attempt";
+
+ /**
+ * The executing flow id
+ */
+ public static final String FLOW_ID = "azkaban.flow.flowid";
+
+ /**
+ * The execution id. This should be unique per flow, but may not be due to
+ * restarts.
+ */
+ public static final String EXEC_ID = "azkaban.flow.execid";
+
+ /**
+ * The numerical project id identifier.
+ */
+ public static final String PROJECT_ID = "azkaban.flow.projectid";
+
+ /**
+ * The version of the project the flow is running. This may change if a
+ * forced hotspot occurs.
+ */
+ public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+
+ /**
+ * A uuid assigned to every execution
+ */
+ public static final String FLOW_UUID = "azkaban.flow.uuid";
+
+ /**
+ * Properties for passing the flow start time to the jobs.
+ */
+ public static final String FLOW_START_TIMESTAMP = "azkaban.flow.start.timestamp";
+ public static final String FLOW_START_YEAR = "azkaban.flow.start.year";
+ public static final String FLOW_START_MONTH = "azkaban.flow.start.month";
+ public static final String FLOW_START_DAY = "azkaban.flow.start.day";
+ public static final String FLOW_START_HOUR = "azkaban.flow.start.hour";
+ public static final String FLOW_START_MINUTE = "azkaban.flow.start.minute";
+ public static final String FLOW_START_SECOND = "azkaban.flow.start.second";
+ public static final String FLOW_START_MILLISSECOND = "azkaban.flow.start.milliseconds";
+ public static final String FLOW_START_TIMEZONE = "azkaban.flow.start.timezone";
+
+}
diff --git a/src/java/azkaban/utils/DirectoryFlowLoader.java b/src/java/azkaban/utils/DirectoryFlowLoader.java
index 9e77a2e..d0f14fc 100644
--- a/src/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/src/java/azkaban/utils/DirectoryFlowLoader.java
@@ -14,6 +14,7 @@ import java.util.Set;
import org.apache.log4j.Logger;
+import azkaban.flow.CommonJobProperties;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
@@ -22,7 +23,6 @@ import azkaban.flow.Node;
public class DirectoryFlowLoader {
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
- private static final String DEPENDENCIES = "dependencies";
private static final String JOB_SUFFIX = ".job";
private final Logger logger;
@@ -153,7 +153,7 @@ public class DirectoryFlowLoader {
continue;
}
- List<String> dependencyList = props.getStringList(DEPENDENCIES, (List<String>)null);
+ List<String> dependencyList = props.getStringList(CommonJobProperties.DEPENDENCIES, (List<String>)null);
if (dependencyList != null) {
Map<String, Edge> dependencies = nodeDependencies.get(node.getId());
@@ -217,21 +217,21 @@ public class DirectoryFlowLoader {
// Dedup with sets
@SuppressWarnings("unchecked")
- List<String> successEmailList = jobProp.getStringList("success.emails", Collections.EMPTY_LIST);
+ List<String> successEmailList = jobProp.getStringList(CommonJobProperties.SUCCESS_EMAILS, Collections.EMPTY_LIST);
Set<String> successEmail = new HashSet<String>();
for (String email: successEmailList) {
successEmail.add(email.toLowerCase());
}
@SuppressWarnings("unchecked")
- List<String> failureEmailList = jobProp.getStringList("failure.emails", Collections.EMPTY_LIST);
+ List<String> failureEmailList = jobProp.getStringList(CommonJobProperties.FAILURE_EMAILS, Collections.EMPTY_LIST);
Set<String> failureEmail = new HashSet<String>();
for (String email: failureEmailList) {
failureEmail.add(email.toLowerCase());
}
@SuppressWarnings("unchecked")
- List<String> notifyEmailList = jobProp.getStringList("notify.emails", Collections.EMPTY_LIST);
+ List<String> notifyEmailList = jobProp.getStringList(CommonJobProperties.NOTIFY_EMAILS, Collections.EMPTY_LIST);
for (String email: notifyEmailList) {
email = email.toLowerCase();
successEmail.add(email);
src/java/azkaban/utils/PropsUtils.java 27(+16 -11)
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index e97c221..86109bf 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -28,6 +28,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import azkaban.executor.ExecutableFlow;
+import azkaban.flow.CommonJobProperties;
+
import org.joda.time.DateTime;
public class PropsUtils {
@@ -188,20 +190,23 @@ public class PropsUtils {
public static Props produceParentProperties(final ExecutableFlow flow) {
Props parentProps = new Props();
- parentProps.put("azkaban.flow.id", flow.getFlowId());
- parentProps.put("azkaban.flow.uuid", UUID.randomUUID().toString());
+ parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
+ parentProps.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
+ parentProps.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
+ parentProps.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
+ parentProps.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
DateTime loadTime = new DateTime();
- parentProps.put("azkaban.flow.start.timestamp", loadTime.toString());
- parentProps.put("azkaban.flow.start.year", loadTime.toString("yyyy"));
- parentProps.put("azkaban.flow.start.month", loadTime.toString("MM"));
- parentProps.put("azkaban.flow.start.day", loadTime.toString("dd"));
- parentProps.put("azkaban.flow.start.hour", loadTime.toString("HH"));
- parentProps.put("azkaban.flow.start.minute", loadTime.toString("mm"));
- parentProps.put("azkaban.flow.start.seconds", loadTime.toString("ss"));
- parentProps.put("azkaban.flow.start.milliseconds", loadTime.toString("SSS"));
- parentProps.put("azkaban.flow.start.timezone", loadTime.toString("ZZZZ"));
+ parentProps.put(CommonJobProperties.FLOW_START_TIMESTAMP, loadTime.toString());
+ parentProps.put(CommonJobProperties.FLOW_START_YEAR, loadTime.toString("yyyy"));
+ parentProps.put(CommonJobProperties.FLOW_START_MONTH, loadTime.toString("MM"));
+ parentProps.put(CommonJobProperties.FLOW_START_DAY, loadTime.toString("dd"));
+ parentProps.put(CommonJobProperties.FLOW_START_HOUR, loadTime.toString("HH"));
+ parentProps.put(CommonJobProperties.FLOW_START_MINUTE, loadTime.toString("mm"));
+ parentProps.put(CommonJobProperties.FLOW_START_SECOND, loadTime.toString("ss"));
+ parentProps.put(CommonJobProperties.FLOW_START_MILLISSECOND, loadTime.toString("SSS"));
+ parentProps.put(CommonJobProperties.FLOW_START_TIMEZONE, loadTime.toString("ZZZZ"));
return parentProps;
}