azkaban-uncached

Moving common job property keys to one java file. Less confusing

2/13/2013 6:50:39 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 23379a9..9682f16 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -35,6 +35,7 @@ import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
 
 public class FlowRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -101,6 +102,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			int version = flow.getVersion();
 			String flowId = flow.getFlowId();
 			
+			// Add a bunch of common azkaban properties
+			PropsUtils.produceParentProperties(flow);
+			
 			// Create execution dir
 			createLogger(flowId);
 			logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 0b8d3c6..0ab138b 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -31,6 +31,7 @@ import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobtype.JobTypeManager;
@@ -197,6 +198,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			else {
 				logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
 			}
+			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
 			node.setStatus(Status.RUNNING);
 
 			// Ability to specify working directory
@@ -247,7 +249,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 		}
 	}
-
+	
 	public Status getStatus() {
 		return node.getStatus();
 	}
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
new file mode 100644
index 0000000..319ce96
--- /dev/null
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -0,0 +1,97 @@
+package azkaban.flow;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class CommonJobProperties {
+	/*
+	 * The following are Common properties that can be set in a job file
+	 */
+	
+	/**
+	 * The type of job that will be executed.
+	 * Examples: command, java, etc.
+	 */
+	public static final String JOB_TYPE = "type";
+	
+	/**
+	 * Comma delimited list of job names which are dependencies
+	 */
+	public static final String DEPENDENCIES = "dependencies";
+	
+	/**
+	 * The number of retries when this job has failed.
+	 */
+	public static final String RETRIES = "retries";
+	
+	/**
+	 * The time in millisec to back off after every retry
+	 */
+	public static final String RETRY_BACKOFF = "retry.backoff";
+	
+	/**
+	 * Comma delimited list of email addresses for both failure and success messages
+	 */
+	public static final String NOTIFY_EMAILS = "notify.emails";
+	
+	/**
+	 * Comma delimited list of email addresses for success messages
+	 */
+	public static final String SUCCESS_EMAILS = "success.emails";
+	
+	/**
+	 * Comma delimited list of email addresses for failure messages
+	 */
+	public static final String FAILURE_EMAILS = "failure.emails";
+
+	/*
+	 * The following are the common props that will be added to the job by azkaban
+	 */
+	
+	/**
+	 * The attempt number of the executing job.
+	 */
+	public static final String JOB_ATTEMPT = "azkaban.job.attempt";
+	
+	/**
+	 * The executing flow id
+	 */
+	public static final String FLOW_ID = "azkaban.flow.flowid";
+	
+	/**
+	 * The execution id. This should be unique per flow, but may not be due to 
+	 * restarts.
+	 */
+	public static final String EXEC_ID = "azkaban.flow.execid";
+	
+	/**
+	 * The numerical project id identifier.
+	 */
+	public static final String PROJECT_ID = "azkaban.flow.projectid";
+	
+	/**
+	 * The version of the project the flow is running. This may change if a
+	 * forced hotspot occurs.
+	 */
+	public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+	
+	/**
+	 * A uuid assigned to every execution
+	 */
+	public static final String FLOW_UUID = "azkaban.flow.uuid";
+	
+	/**
+	 * Properties for passing the flow start time to the jobs.
+	 */
+	public static final String FLOW_START_TIMESTAMP = "azkaban.flow.start.timestamp";
+	public static final String FLOW_START_YEAR = "azkaban.flow.start.year";
+	public static final String FLOW_START_MONTH = "azkaban.flow.start.month";
+	public static final String FLOW_START_DAY = "azkaban.flow.start.day";
+	public static final String FLOW_START_HOUR = "azkaban.flow.start.hour";
+	public static final String FLOW_START_MINUTE = "azkaban.flow.start.minute";
+	public static final String FLOW_START_SECOND = "azkaban.flow.start.second";
+	public static final String FLOW_START_MILLISSECOND = "azkaban.flow.start.milliseconds";
+	public static final String FLOW_START_TIMEZONE = "azkaban.flow.start.timezone";
+
+}
diff --git a/src/java/azkaban/utils/DirectoryFlowLoader.java b/src/java/azkaban/utils/DirectoryFlowLoader.java
index 9e77a2e..d0f14fc 100644
--- a/src/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/src/java/azkaban/utils/DirectoryFlowLoader.java
@@ -14,6 +14,7 @@ import java.util.Set;
 
 import org.apache.log4j.Logger;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
@@ -22,7 +23,6 @@ import azkaban.flow.Node;
 public class DirectoryFlowLoader {
 	private static final DirFilter DIR_FILTER = new DirFilter();
 	private static final String PROPERTY_SUFFIX = ".properties";
-	private static final String DEPENDENCIES = "dependencies";
 	private static final String JOB_SUFFIX = ".job";
 	
 	private final Logger logger;
@@ -153,7 +153,7 @@ public class DirectoryFlowLoader {
 				continue;
 			}
 
-			List<String> dependencyList = props.getStringList(DEPENDENCIES, (List<String>)null);
+			List<String> dependencyList = props.getStringList(CommonJobProperties.DEPENDENCIES, (List<String>)null);
 			
 			if (dependencyList != null) {
 				Map<String, Edge> dependencies = nodeDependencies.get(node.getId());
@@ -217,21 +217,21 @@ public class DirectoryFlowLoader {
 				
 				// Dedup with sets
 				@SuppressWarnings("unchecked")
-				List<String> successEmailList = jobProp.getStringList("success.emails", Collections.EMPTY_LIST);
+				List<String> successEmailList = jobProp.getStringList(CommonJobProperties.SUCCESS_EMAILS, Collections.EMPTY_LIST);
 				Set<String> successEmail = new HashSet<String>();
 				for (String email: successEmailList) {
 					successEmail.add(email.toLowerCase());
 				}
 	
 				@SuppressWarnings("unchecked")
-				List<String> failureEmailList = jobProp.getStringList("failure.emails", Collections.EMPTY_LIST);
+				List<String> failureEmailList = jobProp.getStringList(CommonJobProperties.FAILURE_EMAILS, Collections.EMPTY_LIST);
 				Set<String> failureEmail = new HashSet<String>();
 				for (String email: failureEmailList) {
 					failureEmail.add(email.toLowerCase());
 				}
 				
 				@SuppressWarnings("unchecked")
-				List<String> notifyEmailList = jobProp.getStringList("notify.emails", Collections.EMPTY_LIST);
+				List<String> notifyEmailList = jobProp.getStringList(CommonJobProperties.NOTIFY_EMAILS, Collections.EMPTY_LIST);
 				for (String email: notifyEmailList) {
 					email = email.toLowerCase();
 					successEmail.add(email);
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index e97c221..86109bf 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -28,6 +28,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.flow.CommonJobProperties;
+
 import org.joda.time.DateTime;
 
 public class PropsUtils {
@@ -188,20 +190,23 @@ public class PropsUtils {
 	public static Props produceParentProperties(final ExecutableFlow flow) {
 		Props parentProps = new Props();
 
-		parentProps.put("azkaban.flow.id", flow.getFlowId());
-		parentProps.put("azkaban.flow.uuid", UUID.randomUUID().toString());
+		parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
+		parentProps.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
+		parentProps.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
+		parentProps.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
+		parentProps.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
 
 		DateTime loadTime = new DateTime();
 
-		parentProps.put("azkaban.flow.start.timestamp", loadTime.toString());
-		parentProps.put("azkaban.flow.start.year", loadTime.toString("yyyy"));
-		parentProps.put("azkaban.flow.start.month", loadTime.toString("MM"));
-		parentProps.put("azkaban.flow.start.day", loadTime.toString("dd"));
-		parentProps.put("azkaban.flow.start.hour", loadTime.toString("HH"));
-		parentProps.put("azkaban.flow.start.minute", loadTime.toString("mm"));
-		parentProps.put("azkaban.flow.start.seconds", loadTime.toString("ss"));
-		parentProps.put("azkaban.flow.start.milliseconds", loadTime.toString("SSS"));
-		parentProps.put("azkaban.flow.start.timezone", loadTime.toString("ZZZZ"));
+		parentProps.put(CommonJobProperties.FLOW_START_TIMESTAMP, loadTime.toString());
+		parentProps.put(CommonJobProperties.FLOW_START_YEAR, loadTime.toString("yyyy"));
+		parentProps.put(CommonJobProperties.FLOW_START_MONTH, loadTime.toString("MM"));
+		parentProps.put(CommonJobProperties.FLOW_START_DAY, loadTime.toString("dd"));
+		parentProps.put(CommonJobProperties.FLOW_START_HOUR, loadTime.toString("HH"));
+		parentProps.put(CommonJobProperties.FLOW_START_MINUTE, loadTime.toString("mm"));
+		parentProps.put(CommonJobProperties.FLOW_START_SECOND, loadTime.toString("ss"));
+		parentProps.put(CommonJobProperties.FLOW_START_MILLISSECOND, loadTime.toString("SSS"));
+		parentProps.put(CommonJobProperties.FLOW_START_TIMEZONE, loadTime.toString("ZZZZ"));
 		return parentProps;
 	}