azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 387(+186 -201)
src/java/azkaban/execapp/JobRunner.java 13(+9 -4)
src/java/azkaban/executor/ExecutableFlow.java 44(+28 -16)
src/java/azkaban/executor/Status.java 10(+10 -0)
src/java/azkaban/utils/Props.java 7(+6 -1)
src/java/azkaban/utils/PropsUtils.java 34(+17 -17)
unit/executions/embeddedBad/joba.job 4(+4 -0)
unit/executions/embeddedBad/jobb.job 3(+3 -0)
unit/executions/embeddedBad/jobc.job 3(+3 -0)
unit/executions/embeddedBad/jobd.job 3(+3 -0)
unit/executions/embeddedBad/jobe.job 5(+5 -0)
Details
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index 4ba5889..cbf6333 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -44,9 +44,9 @@ public class LocalFlowWatcher extends FlowWatcher {
if (data instanceof ExecutableNode) {
ExecutableNode node = (ExecutableNode)data;
- if (node.getId()) {
-
- }
+// if (node.getId()) {
+//
+// }
handleJobFinished(node.getId(), node.getStatus());
}
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index cecf1bc..7beb47a 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -1,6 +1,6 @@
package azkaban.execapp.event;
-import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
@@ -11,7 +11,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
private int execId;
private ExecutorLoader loader;
- private ExecutableFlowBase flow;
+ private ExecutableFlow flow;
private RemoteUpdaterThread thread;
private boolean isShutdown = false;
@@ -46,7 +46,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
@Override
public void run() {
do {
- ExecutableFlowBase updateFlow = null;
+ ExecutableFlow updateFlow = null;
try {
updateFlow = loader.fetchExecutableFlow(execId);
} catch (ExecutorManagerException e) {
src/java/azkaban/execapp/FlowRunner.java 387(+186 -201)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 38ece4a..0259777 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -57,6 +57,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private ExecutableFlow flow;
private Thread flowRunnerThread;
private int numJobThreads = 10;
+ private ExecutionOptions.FailureAction failureAction;
// Sync object for queuing
private Object mainSyncObj = new Object();
@@ -66,13 +67,13 @@ public class FlowRunner extends EventHandler implements Runnable {
private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
private Props globalProps;
- private Props commonProps;
+// private Props commonProps;
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
-
+
// Used for pipelining
private Integer pipelineLevel = null;
private Integer pipelineExecId = null;
@@ -92,6 +93,10 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean flowCancelled = false;
public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+ this(flow, executorLoader, projectLoader, jobtypeManager, null);
+ }
+
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
@@ -102,8 +107,9 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutionOptions options = flow.getExecutionOptions();
this.pipelineLevel = options.getPipelineLevel();
this.pipelineExecId = options.getPipelineExecutionId();
-
+ this.failureAction = options.getFailureAction();
this.proxyUsers = flow.getProxyUsers();
+ this.executorService = executorService;
}
public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -176,13 +182,28 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ @SuppressWarnings("unchecked")
private void setupFlowExecution() {
int projectId = flow.getProjectId();
int version = flow.getVersion();
String flowId = flow.getFlowId();
// Add a bunch of common azkaban properties
- commonProps = PropsUtils.addCommonFlowProperties(flow);
+ Props commonFlowProps = PropsUtils.addCommonFlowProperties(this.globalProps, flow);
+
+ if (flow.getJobSource() != null) {
+ String source = flow.getJobSource();
+ Props flowProps = sharedProps.get(source);
+ flowProps.setParent(commonFlowProps);
+ commonFlowProps = flowProps;
+ }
+
+ // If there are flow overrides, we apply them now.
+ Map<String,String> flowParam = flow.getExecutionOptions().getFlowParameters();
+ if (flowParam != null && !flowParam.isEmpty()) {
+ commonFlowProps = new Props(commonFlowProps, flowParam);
+ }
+ flow.setInputProps(commonFlowProps);
// Create execution dir
createLogger(flowId);
@@ -199,7 +220,6 @@ public class FlowRunner extends EventHandler implements Runnable {
// The current thread is used for interrupting blocks
flowRunnerThread = Thread.currentThread();
flowRunnerThread.setName("FlowRunner-exec-" + flow.getExecutionId());
-
}
private void updateFlowReference() throws ExecutorManagerException {
@@ -274,11 +294,6 @@ public class FlowRunner extends EventHandler implements Runnable {
props.setParent(inherits);
}
- else {
- String source = fprops.getSource();
- Props props = sharedProps.get(source);
- props.setParent(globalProps);
- }
}
}
@@ -303,47 +318,8 @@ public class FlowRunner extends EventHandler implements Runnable {
continue;
}
else {
- List<ExecutableNode> jobsReadyToRun = findReadyJobsToRun();
-
- if (!jobsReadyToRun.isEmpty() && !flowCancelled) {
- for (ExecutableNode node : jobsReadyToRun) {
- long currentTime = System.currentTimeMillis();
-
- // Queue a job only if it's ready to run.
- if (node.getStatus() == Status.READY) {
- // Collect output props from the job's dependencies.
- Props outputProps = collectOutputProps(node);
- node.setStatus(Status.QUEUED);
- JobRunner runner = createJobRunner(node, outputProps);
- logger.info("Submitting job " + node.getId() + " to run.");
- try {
- executorService.submit(runner);
- jobRunners.put(node.getId(), runner);
- activeJobRunners.put(node.getId(), runner);
- } catch (RejectedExecutionException e) {
- logger.error(e);
- };
-
- } // If killed, then auto complete and KILL
- else if (node.getStatus() == Status.KILLED) {
- logger.info("Killing " + node.getId() + " due to prior errors.");
- node.setStartTime(currentTime);
- node.setEndTime(currentTime);
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
- } // If disabled, then we auto skip
- else if (node.getStatus() == Status.DISABLED) {
- logger.info("Skipping disabled job " + node.getId() + ".");
- node.setStartTime(currentTime);
- node.setEndTime(currentTime);
- node.setStatus(Status.SKIPPED);
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
- }
- }
-
- updateFlow();
- }
- else {
- if (isFlowFinished() || flowCancelled ) {
+ if (!progressGraph(flow)) {
+ if (flow.isFlowFinished() || flowCancelled ) {
flowFinished = true;
break;
}
@@ -364,18 +340,7 @@ public class FlowRunner extends EventHandler implements Runnable {
activeRunner.cancel();
}
- for (ExecutableNode node: flow.getExecutableNodes()) {
- if (Status.isStatusFinished(node.getStatus())) {
- continue;
- }
- else if (node.getStatus() == Status.DISABLED) {
- node.setStatus(Status.SKIPPED);
- }
- else {
- node.setStatus(Status.KILLED);
- }
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
- }
+ flow.killNode(System.currentTimeMillis());
} catch (Exception e) {
logger.error(e);
}
@@ -403,125 +368,197 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- private List<ExecutableNode> findReadyJobsToRun() {
- ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
- for (ExecutableNode node : flow.getExecutableNodes()) {
- if (Status.isStatusFinished(node.getStatus())) {
- continue;
- }
- else {
- // Check the dependencies to see if execution conditions are met,
- // and what the status should be set to.
- Status impliedStatus = getImpliedStatus(node);
- if (getImpliedStatus(node) != null) {
- node.setStatus(impliedStatus);
- jobsToRun.add(node);
- }
- }
- }
-
- return jobsToRun;
- }
+ private boolean progressGraph(ExecutableFlowBase flow) throws IOException {
+ List<ExecutableNode> jobsReadyToRun = flow.findNextJobsToRun();
- private List<ExecutableNode> findReadyJobsToRun(ExecutableFlowBase flow) {
- ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
- for (ExecutableNode node : flow.getExecutableNodes()) {
- if (Status.isStatusFinished(node.getStatus())) {
- continue;
- }
- else {
- // Check the dependencies to see if execution conditions are met,
- // and what the status should be set to.
- Status impliedStatus = getImpliedStatus(node);
- if (getImpliedStatus(node) != null) {
- node.setStatus(impliedStatus);
- jobsToRun.add(node);
+ if (!jobsReadyToRun.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
+ for (ExecutableNode node: jobsReadyToRun) {
+ Status nextStatus = getImpliedStatus(node);
+
+ // If the flow has seen previous failures and the flow has been cancelled, than
+ if (nextStatus == Status.KILLED) {
+ logger.info("Killing " + node.getId() + " due to prior errors.");
+ node.killNode(currentTime);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+ }
+ else if (nextStatus == Status.DISABLED) {
+ logger.info("Skipping disabled job " + node.getId() + ".");
+ node.skipNode(currentTime);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+ }
+ else {
+ runExecutableNode(node);
}
}
+
+ updateFlow();
+ return true;
}
- return jobsToRun;
+ return false;
}
- private boolean isFlowFinished() {
- if (!activeJobRunners.isEmpty()) {
- return false;
+ private void prepareJobProperties(ExecutableNode node) throws IOException {
+ Props props = null;
+ // The following is the hiearchical ordering of dependency resolution
+ // 1. Parent Flow Properties
+ ExecutableFlowBase parentFlow = node.getParentFlow();
+ if (parentFlow != null) {
+ props = parentFlow.getInputProps();
}
- for (String end: flow.getEndNodes()) {
- ExecutableNode node = flow.getExecutableNode(end);
- if (!Status.isStatusFinished(node.getStatus()) ) {
- return false;
+ // 2. Shared Properties
+ String sharedProps = node.getPropsSource();
+ if (sharedProps != null) {
+ Props shared = this.sharedProps.get(sharedProps);
+ if (shared != null) {
+ // Clone because we may clobber
+ shared = Props.clone(shared);
+ shared.setEarliestAncestor(props);
+ props = shared;
}
}
+
+ // 3. Output Properties
+ Props outputProps = collectOutputProps(node);
+ if (outputProps != null) {
+ outputProps.setEarliestAncestor(props);
+ props = outputProps;
+ }
- return true;
- }
-
- private Props collectOutputProps(ExecutableNode node) {
- Props previousOutput = null;
- // Iterate the in nodes again and create the dependencies
- for (String dependency : node.getInNodes()) {
- Props output = jobOutputProps.get(dependency);
- if (output != null) {
- output = Props.clone(output);
- output.setParent(previousOutput);
- previousOutput = output;
- }
+ // 4. The job source
+ Props jobSource = loadJobProps(node);
+ if (jobSource != null) {
+ jobSource.setParent(props);
+ props = jobSource;
}
- return previousOutput;
+ node.setInputProps(props);
}
- private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) {
+ private Props loadJobProps(ExecutableNode node) throws IOException {
+ Props props = null;
String source = node.getJobSource();
- String propsSource = node.getPropsSource();
-
- // If no properties are set, we just set the global properties.
- Props parentProps = propsSource == null ? globalProps : sharedProps.get(propsSource);
-
- // Set up overrides
- ExecutionOptions options = flow.getExecutionOptions();
- @SuppressWarnings("unchecked")
- Props flowProps = new Props(null, options.getFlowParameters());
- flowProps.putAll(commonProps);
- flowProps.setParent(parentProps);
- parentProps = flowProps;
-
- // We add the previous job output and put into this props.
- if (previousOutput != null) {
- Props earliestParent = previousOutput.getEarliestAncestor();
- earliestParent.setParent(parentProps);
-
- parentProps = previousOutput;
+ if (source == null) {
+ return null;
}
- // Load job file.
- File path = new File(execDir, source);
- Props prop = null;
-
// load the override props if any
try {
- prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
+ props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
}
catch(ProjectManagerException e) {
e.printStackTrace();
logger.error("Error loading job override property for job " + node.getId());
}
- if(prop == null) {
+
+ File path = new File(execDir, source);
+ if(props == null) {
// if no override prop, load the original one on disk
try {
- prop = new Props(null, path);
+ props = new Props(null, path);
} catch (IOException e) {
e.printStackTrace();
logger.error("Error loading job file " + source + " for job " + node.getId());
}
}
// setting this fake source as this will be used to determine the location of log files.
- prop.setSource(path.getPath());
- prop.setParent(parentProps);
+ props.setSource(path.getPath());
+ return props;
+ }
+
+ private void runExecutableNode(ExecutableNode node) throws IOException {
+ // Collect output props from the job's dependencies.
+ prepareJobProperties(node);
+
+ if (node instanceof ExecutableFlowBase) {
+ node.setStatus(Status.RUNNING);
+ node.setStartTime(System.currentTimeMillis());
+
+ logger.info("Starting subflow " + node.getId() + ".");
+ }
+ else {
+ node.setStatus(Status.QUEUED);
+ JobRunner runner = createJobRunner(node);
+ logger.info("Submitting job " + node.getId() + " to run.");
+ try {
+ executorService.submit(runner);
+ jobRunners.put(node.getId(), runner);
+ activeJobRunners.put(node.getId(), runner);
+ } catch (RejectedExecutionException e) {
+ logger.error(e);
+ };
+ }
+ }
+
+ /**
+ * Determines what the state of the next node should be.
+ *
+ * @param node
+ * @return
+ */
+ public Status getImpliedStatus(ExecutableNode node) {
+ if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+ return Status.KILLED;
+ }
+ else if (node.getStatus() == Status.DISABLED) {
+ return Status.DISABLED;
+ }
- JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
+ ExecutableFlowBase flow = node.getParentFlow();
+ boolean shouldKill = false;
+ for (String dependency: node.getInNodes()) {
+ ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
+ Status depStatus = dependencyNode.getStatus();
+
+ switch (depStatus) {
+ case FAILED:
+ case KILLED:
+ shouldKill = true;
+ case SKIPPED:
+ case SUCCEEDED:
+ case FAILED_SUCCEEDED:
+ continue;
+ default:
+ // Should never come here.
+ return null;
+ }
+ }
+
+ if (shouldKill) {
+ return Status.KILLED;
+ }
+
+ // If it's disabled but ready to run, we want to make sure it continues being disabled.
+ if (node.getStatus() == Status.DISABLED) {
+ return Status.DISABLED;
+ }
+
+ // All good to go, ready to run.
+ return Status.READY;
+ }
+
+ private Props collectOutputProps(ExecutableNode node) {
+ Props previousOutput = null;
+ // Iterate the in nodes again and create the dependencies
+ for (String dependency : node.getInNodes()) {
+ Props output = jobOutputProps.get(dependency);
+ if (output != null) {
+ output = Props.clone(output);
+ output.setParent(previousOutput);
+ previousOutput = output;
+ }
+ }
+
+ return previousOutput;
+ }
+
+ private JobRunner createJobRunner(ExecutableNode node) {
+ // Load job file.
+ File path = new File(execDir, node.getJobSource());
+
+ JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
if (watcher != null) {
jobRunner.setPipeline(watcher, pipelineLevel);
}
@@ -689,62 +726,11 @@ public class FlowRunner extends EventHandler implements Runnable {
private void interrupt() {
flowRunnerThread.interrupt();
}
-
- private Status getImpliedStatus(ExecutableNode node) {
- switch(node.getStatus()) {
- case FAILED:
- case KILLED:
- case SKIPPED:
- case SUCCEEDED:
- case FAILED_SUCCEEDED:
- case QUEUED:
- case RUNNING:
- return null;
- default:
- break;
- }
-
- boolean shouldKill = false;
- for (String dependency : node.getInNodes()) {
- ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
-
- Status depStatus = dependencyNode.getStatus();
- switch (depStatus) {
- case FAILED:
- case KILLED:
- shouldKill = true;
- case SKIPPED:
- case SUCCEEDED:
- case FAILED_SUCCEEDED:
- continue;
- case RUNNING:
- case QUEUED:
- case DISABLED:
- return null;
- default:
- // Return null means it's not ready to run.
- return null;
- }
- }
-
- ExecutionOptions options = flow.getExecutionOptions();
- if (shouldKill || flowCancelled || (flowFailed && options.getFailureAction() != FailureAction.FINISH_ALL_POSSIBLE)) {
- return Status.KILLED;
- }
-
- // If it's disabled but ready to run, we want to make sure it continues being disabled.
- if (node.getStatus() == Status.DISABLED) {
- return Status.DISABLED;
- }
-
- // All good to go, ready to run.
- return Status.READY;
- }
-
+
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
-
+ // TODO: HANDLE subflow execution
@Override
public synchronized void handleEvent(Event event) {
JobRunner runner = (JobRunner)event.getRunner();
@@ -774,7 +760,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
else {
if (!runner.isCancelled() && runner.getRetries() > 0) {
-
logger.info("Job " + node.getId() + " has run out of retry attempts");
// Setting delayed execution to 0 in case this is manually re-tried.
node.setDelayedExecution(0);
src/java/azkaban/execapp/JobRunner.java 13(+9 -4)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index c4ea32f..f77f1b2 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -88,8 +88,8 @@ public class JobRunner extends EventHandler implements Runnable {
private boolean cancelled = false;
private BlockingStatus currentBlockStatus = null;
- public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
- this.props = props;
+ public JobRunner(ExecutableNode node, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
+ this.props = node.getInputProps();
this.node = node;
this.workingDir = workingDir;
@@ -149,8 +149,14 @@ public class JobRunner extends EventHandler implements Runnable {
logger = Logger.getLogger(loggerName);
// Create file appender
- String logName = createLogFileName(this.executionId, this.jobId, node.getAttempt());
+ String id = this.jobId;
+ if (node.getExecutableFlow() != node.getParentFlow()) {
+ id = node.getParentFlow().getNestedId("._.");
+ }
+
+ String logName = createLogFileName(this.executionId, id, node.getAttempt());
logFile = new File(workingDir, logName);
+
String absolutePath = logFile.getAbsolutePath();
jobAppender = null;
@@ -306,7 +312,6 @@ public class JobRunner extends EventHandler implements Runnable {
);
Arrays.sort(files, Collections.reverseOrder());
-
loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
} catch (ExecutorManagerException e) {
flowLogger.error("Error writing out logs for job " + this.jobId, e);
src/java/azkaban/executor/ExecutableFlow.java 44(+28 -16)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 6141959..e9f81f7 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -34,7 +34,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
public static final String PROJECTID_PARAM ="projectId";
public static final String SCHEDULEID_PARAM ="scheduleId";
public static final String SUBMITUSER_PARAM = "submitUser";
- public static final String SUBMITTIME_PARAM = "submitUser";
+ public static final String SUBMITTIME_PARAM = "submitTime";
public static final String VERSION_PARAM = "version";
public static final String PROXYUSERS_PARAM = "proxyUsers";
@@ -95,8 +95,8 @@ public class ExecutableFlow extends ExecutableFlowBase {
return executionOptions;
}
- private void setFlow(Project project, Flow flow) {
- super.setFlow(project, flow, null);
+ protected void setFlow(Project project, Flow flow) {
+ super.setFlow(project, flow);
executionOptions = new ExecutionOptions();
if (flow.getSuccessEmails() != null) {
@@ -195,34 +195,46 @@ public class ExecutableFlow extends ExecutableFlowBase {
public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
ExecutableFlow exFlow = new ExecutableFlow();
HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
-
exFlow.fillExecutableFromMapObject(flowObj);
- exFlow.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
- exFlow.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
+
+ return exFlow;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void fillExecutableFromMapObject(Map<String, Object> flowObj) {
+ super.fillExecutableFromMapObject(flowObj);
+
+ this.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
+ this.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
- exFlow.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
+ this.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
if (flowObj.containsKey(SCHEDULEID_PARAM)) {
- exFlow.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
+ this.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
}
- exFlow.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
- exFlow.version = (Integer)flowObj.get(VERSION_PARAM);
- exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
+ if (flowObj.containsKey(SUBMITUSER_PARAM)) {
+ this.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
+ }
+ else {
+ this.submitUser = null;
+ }
+ this.version = (Integer)flowObj.get(VERSION_PARAM);
+
+ this.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
- exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
+ this.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
}
else {
// for backwards compatibility should remove in a few versions.
- exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
+ this.executionOptions = ExecutionOptions.createFromObject(flowObj);
}
if(flowObj.containsKey(PROXYUSERS_PARAM)) {
ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get(PROXYUSERS_PARAM);
- exFlow.addAllProxyUsers(proxyUserList);
+ this.addAllProxyUsers(proxyUserList);
}
-
- return exFlow;
}
public Map<String, Object> toUpdateObject(long lastUpdateTime) {
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 311c33b..10db20f 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -44,7 +44,7 @@ public class ExecutableFlowBase extends ExecutableNode {
public ExecutableFlowBase(Project project, Node node, Flow flow, ExecutableFlowBase parent) {
super(node, parent);
- setFlow(project, flow, parent);
+ setFlow(project, flow);
}
public ExecutableFlowBase() {
@@ -78,15 +78,15 @@ public class ExecutableFlowBase extends ExecutableNode {
return flowId;
}
- public String getNestedId() {
+ public String getNestedId(String delimiter) {
if (this.getParentFlow() != null) {
- return this.getParentFlow().getNestedId() + ":" + getId();
+ return this.getParentFlow().getNestedId(delimiter) + delimiter + getId();
}
return getId();
}
- protected void setFlow(Project project, Flow flow, ExecutableFlowBase parent) {
+ protected void setFlow(Project project, Flow flow) {
this.flowId = flow.getId();
for (Node node: flow.getNodes()) {
@@ -95,11 +95,11 @@ public class ExecutableFlowBase extends ExecutableNode {
String embeddedFlowId = node.getEmbeddedFlowId();
Flow subFlow = project.getFlow(embeddedFlowId);
- ExecutableFlowBase embeddedFlow = new ExecutableFlowBase(project, node, subFlow, parent);
+ ExecutableFlowBase embeddedFlow = new ExecutableFlowBase(project, node, subFlow, this);
executableNodes.put(id, embeddedFlow);
}
else {
- ExecutableNode exNode = new ExecutableNode(node, parent);
+ ExecutableNode exNode = new ExecutableNode(node, this);
executableNodes.put(id, exNode);
}
}
@@ -270,4 +270,78 @@ public class ExecutableFlowBase extends ExecutableNode {
exNode.applyUpdateObject(node);
}
}
+
+ public void reEnableDependents(ExecutableNode ... nodes) {
+ for(ExecutableNode node: nodes) {
+ for(String dependent: node.getOutNodes()) {
+ ExecutableNode dependentNode = getExecutableNode(dependent);
+
+ if (dependentNode.getStatus() == Status.KILLED) {
+ dependentNode.setStatus(Status.READY);
+ dependentNode.setUpdateTime(System.currentTimeMillis());
+ reEnableDependents(dependentNode);
+
+ if (dependentNode instanceof ExecutableFlowBase) {
+
+ ((ExecutableFlowBase)dependentNode).reEnableDependents();
+ }
+ }
+ else if (dependentNode.getStatus() == Status.SKIPPED) {
+ dependentNode.setStatus(Status.DISABLED);
+ dependentNode.setUpdateTime(System.currentTimeMillis());
+ reEnableDependents(dependentNode);
+ }
+ }
+ }
+ }
+
+ /**
+ * Only returns true if the status of all finished nodes is true.
+ * @return
+ */
+ public boolean isFlowFinished() {
+ for (String end: getEndNodes()) {
+ ExecutableNode node = getExecutableNode(end);
+ if (!Status.isStatusFinished(node.getStatus()) ) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Finds all jobs which are ready to run. This occurs when all of its
+ * dependency nodes are finished running.
+ *
+ * @param flow
+ * @return
+ */
+ public List<ExecutableNode> findNextJobsToRun() {
+ ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+
+ nodeloop:
+ for (ExecutableNode node: executableNodes.values()) {
+ if(Status.isStatusFinished(node.getStatus())) {
+ continue;
+ }
+
+ if ((node instanceof ExecutableFlowBase) && Status.isStatusRunning(node.getStatus())) {
+ // If the flow is still running, we traverse into the flow
+ jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
+ }
+ else {
+ for (String dependency: node.getInNodes()) {
+ // We find that the outer-loop is unfinished.
+ if (!Status.isStatusFinished(getExecutableNode(dependency).getStatus())) {
+ continue nodeloop;
+ }
+ }
+
+ jobsToRun.add(node);
+ }
+ }
+
+ return jobsToRun;
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 64a0a35..6f41c56 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -41,6 +41,7 @@ public class ExecutableNode {
private Set<String> inNodes = null;
private Set<String> outNodes = null;
+ private Props inputProps;
private Props outputProps;
public static final String ATTEMPT_PARAM = "attempt";
@@ -60,14 +61,14 @@ public class ExecutableNode {
}
public ExecutableNode(Node node, ExecutableFlowBase parent) {
- this(node.getId(), node.getJobSource(), node.getPropsSource(), parent);
+ this(node.getId(), node.getType(), node.getJobSource(), node.getPropsSource(), parent);
}
- public ExecutableNode(String id, String jobSource, String propsSource, ExecutableFlowBase parent) {
+ public ExecutableNode(String id, String type, String jobSource, String propsSource, ExecutableFlowBase parent) {
this.id = id;
this.jobSource = jobSource;
this.propsSource = propsSource;
-
+ this.type = type;
setParentFlow(parent);
}
@@ -176,10 +177,18 @@ public class ExecutableNode {
return propsSource;
}
+ public void setInputProps(Props input) {
+ this.inputProps = input;
+ }
+
public void setOutputProps(Props output) {
this.outputProps = output;
}
+ public Props getInputProps() {
+ return this.inputProps;
+ }
+
public Props getOutputProps() {
return outputProps;
}
@@ -222,7 +231,6 @@ public class ExecutableNode {
this.setStatus(Status.READY);
}
-
public List<Object> getAttemptObjects() {
ArrayList<Object> array = new ArrayList<Object>();
@@ -247,6 +255,7 @@ public class ExecutableNode {
objMap.put(ENDTIME_PARAM, endTime);
objMap.put(UPDATETIME_PARAM, updateTime);
objMap.put(TYPE_PARAM, type);
+ objMap.put(ATTEMPT_PARAM, attempt);
if (inNodes != null) {
objMap.put(INNODES_PARAM, inNodes);
@@ -283,6 +292,7 @@ public class ExecutableNode {
this.endTime = JSONUtils.getLongFromObject(objMap.get(ENDTIME_PARAM));
this.updateTime = JSONUtils.getLongFromObject(objMap.get(UPDATETIME_PARAM));
this.type = (String)objMap.get(TYPE_PARAM);
+ this.attempt = (Integer)objMap.get(ATTEMPT_PARAM);
if (objMap.containsKey(INNODES_PARAM)) {
this.inNodes = new HashSet<String>();
@@ -362,6 +372,23 @@ public class ExecutableNode {
}
}
+ public void killNode(long killTime) {
+ if (this.status == Status.DISABLED) {
+ skipNode(killTime);
+ }
+ else {
+ this.setStatus(Status.KILLED);
+ this.setStartTime(killTime);
+ this.setEndTime(killTime);
+ }
+ }
+
+ public void skipNode(long skipTime) {
+ this.setStatus(Status.SKIPPED);
+ this.setStartTime(skipTime);
+ this.setEndTime(skipTime);
+ }
+
private void updatePastAttempts(List<Object> pastAttemptsList) {
if (pastAttemptsList == null) {
return;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index a231c41..9ef6722 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -379,7 +379,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
// if the main flow is not the parent, then we'll create a composite key for flowID
if (flow != node.getParentFlow()) {
- flowId = node.getParentFlow().getNestedId();
+ flowId = node.getParentFlow().getNestedId("+");
}
QueryRunner runner = createQueryRunner();
@@ -404,7 +404,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
@Override
public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
- final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=? AND attempt=?";
+ final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
byte[] outputParam = null;
Props outputProps = node.getOutputProps();
src/java/azkaban/executor/Status.java 10(+10 -0)
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
index d8215df..f796378 100644
--- a/src/java/azkaban/executor/Status.java
+++ b/src/java/azkaban/executor/Status.java
@@ -56,4 +56,14 @@ public enum Status {
return false;
}
}
+
+ public static boolean isStatusRunning(Status status) {
+ switch (status) {
+ case RUNNING:
+ case FAILED_FINISHING:
+ return true;
+ default:
+ return false;
+ }
+ }
}
\ No newline at end of file
src/java/azkaban/utils/Props.java 7(+6 -1)
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 12da4a7..4aeefa1 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -156,7 +156,12 @@ public class Props {
putAll(props);
}
}
-
+
+ public void setEarliestAncestor(Props parent) {
+ Props props = getEarliestAncestor();
+ props.setParent(parent);
+ }
+
public Props getEarliestAncestor() {
if (_parent == null) {
return this;
src/java/azkaban/utils/PropsUtils.java 34(+17 -17)
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 14da916..bdaac5d 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -208,27 +208,27 @@ public class PropsUtils {
return buffer.toString();
}
- public static Props addCommonFlowProperties(final ExecutableFlowBase flow) {
- Props parentProps = new Props();
+ public static Props addCommonFlowProperties(Props parentProps, final ExecutableFlowBase flow) {
+ Props props = new Props(parentProps);
- parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
- parentProps.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
- parentProps.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
- parentProps.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
- parentProps.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
+ props.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
+ props.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
+ props.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
+ props.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
+ props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
DateTime loadTime = new DateTime();
- parentProps.put(CommonJobProperties.FLOW_START_TIMESTAMP, loadTime.toString());
- parentProps.put(CommonJobProperties.FLOW_START_YEAR, loadTime.toString("yyyy"));
- parentProps.put(CommonJobProperties.FLOW_START_MONTH, loadTime.toString("MM"));
- parentProps.put(CommonJobProperties.FLOW_START_DAY, loadTime.toString("dd"));
- parentProps.put(CommonJobProperties.FLOW_START_HOUR, loadTime.toString("HH"));
- parentProps.put(CommonJobProperties.FLOW_START_MINUTE, loadTime.toString("mm"));
- parentProps.put(CommonJobProperties.FLOW_START_SECOND, loadTime.toString("ss"));
- parentProps.put(CommonJobProperties.FLOW_START_MILLISSECOND, loadTime.toString("SSS"));
- parentProps.put(CommonJobProperties.FLOW_START_TIMEZONE, loadTime.toString("ZZZZ"));
- return parentProps;
+ props.put(CommonJobProperties.FLOW_START_TIMESTAMP, loadTime.toString());
+ props.put(CommonJobProperties.FLOW_START_YEAR, loadTime.toString("yyyy"));
+ props.put(CommonJobProperties.FLOW_START_MONTH, loadTime.toString("MM"));
+ props.put(CommonJobProperties.FLOW_START_DAY, loadTime.toString("dd"));
+ props.put(CommonJobProperties.FLOW_START_HOUR, loadTime.toString("HH"));
+ props.put(CommonJobProperties.FLOW_START_MINUTE, loadTime.toString("mm"));
+ props.put(CommonJobProperties.FLOW_START_SECOND, loadTime.toString("ss"));
+ props.put(CommonJobProperties.FLOW_START_MILLISSECOND, loadTime.toString("SSS"));
+ props.put(CommonJobProperties.FLOW_START_TIMEZONE, loadTime.toString("ZZZZ"));
+ return props;
}
public static String toJSONString(Props props, boolean localOnly) {
diff --git a/unit/executions/embeddedBad/innerFlow.job b/unit/executions/embeddedBad/innerFlow.job
new file mode 100644
index 0000000..da71d64
--- /dev/null
+++ b/unit/executions/embeddedBad/innerFlow.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embeddedBad/innerJobA.job b/unit/executions/embeddedBad/innerJobA.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embeddedBad/innerJobA.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/embeddedBad/innerJobB.job b/unit/executions/embeddedBad/innerJobB.job
new file mode 100644
index 0000000..dc29b4a
--- /dev/null
+++ b/unit/executions/embeddedBad/innerJobB.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=jobe
+dependencies=innerJobA
diff --git a/unit/executions/embeddedBad/innerJobC.job b/unit/executions/embeddedBad/innerJobC.job
new file mode 100644
index 0000000..178bbef
--- /dev/null
+++ b/unit/executions/embeddedBad/innerJobC.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=innerJobA
unit/executions/embeddedBad/joba.job 4(+4 -0)
diff --git a/unit/executions/embeddedBad/joba.job b/unit/executions/embeddedBad/joba.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embeddedBad/joba.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
unit/executions/embeddedBad/jobb.job 3(+3 -0)
diff --git a/unit/executions/embeddedBad/jobb.job b/unit/executions/embeddedBad/jobb.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embeddedBad/jobb.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embeddedBad/jobc.job 3(+3 -0)
diff --git a/unit/executions/embeddedBad/jobc.job b/unit/executions/embeddedBad/jobc.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embeddedBad/jobc.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embeddedBad/jobd.job 3(+3 -0)
diff --git a/unit/executions/embeddedBad/jobd.job b/unit/executions/embeddedBad/jobd.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embeddedBad/jobd.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
unit/executions/embeddedBad/jobe.job 5(+5 -0)
diff --git a/unit/executions/embeddedBad/jobe.job b/unit/executions/embeddedBad/jobe.job
new file mode 100644
index 0000000..fe986d5
--- /dev/null
+++ b/unit/executions/embeddedBad/jobe.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=jobb,jobc,jobd
diff --git a/unit/executions/embeddedBad/selfreference.job b/unit/executions/embeddedBad/selfreference.job
new file mode 100644
index 0000000..708f351
--- /dev/null
+++ b/unit/executions/embeddedBad/selfreference.job
@@ -0,0 +1,2 @@
+type=flow
+flow.name=selfreference
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
new file mode 100644
index 0000000..7a2f55d
--- /dev/null
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -0,0 +1,355 @@
+package azkaban.test.executor;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionAttempt;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.project.Project;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+public class ExecutableFlowTest {
+ private Project project;
+
+ @Before
+ public void setUp() throws Exception {
+ Logger logger = Logger.getLogger(this.getClass());
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ loader.loadProjectFlow(new File("unit/executions/embedded"));
+ Assert.assertEquals(0, loader.getErrors().size());
+
+ project = new Project(11, "myTestProject");
+ project.setFlows(loader.getFlowMap());
+ project.setVersion(123);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testExecutorFlowCreation() throws Exception {
+ Flow flow = project.getFlow("jobe");
+ Assert.assertNotNull(flow);
+
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ Assert.assertNotNull(exFlow.getExecutableNode("joba"));
+ Assert.assertNotNull(exFlow.getExecutableNode("jobb"));
+ Assert.assertNotNull(exFlow.getExecutableNode("jobc"));
+ Assert.assertNotNull(exFlow.getExecutableNode("jobd"));
+ Assert.assertNotNull(exFlow.getExecutableNode("jobe"));
+
+ Assert.assertFalse(exFlow.getExecutableNode("joba") instanceof ExecutableFlowBase);
+ Assert.assertTrue(exFlow.getExecutableNode("jobb") instanceof ExecutableFlowBase);
+ Assert.assertTrue(exFlow.getExecutableNode("jobc") instanceof ExecutableFlowBase);
+ Assert.assertTrue(exFlow.getExecutableNode("jobd") instanceof ExecutableFlowBase);
+ Assert.assertFalse(exFlow.getExecutableNode("jobe") instanceof ExecutableFlowBase);
+
+ ExecutableFlowBase jobbFlow = (ExecutableFlowBase)exFlow.getExecutableNode("jobb");
+ ExecutableFlowBase jobcFlow = (ExecutableFlowBase)exFlow.getExecutableNode("jobc");
+ ExecutableFlowBase jobdFlow = (ExecutableFlowBase)exFlow.getExecutableNode("jobd");
+
+ Assert.assertEquals("innerFlow", jobbFlow.getFlowId());
+ Assert.assertEquals("jobb", jobbFlow.getId());
+ Assert.assertEquals(4, jobbFlow.getExecutableNodes().size());
+
+ Assert.assertEquals("innerFlow", jobcFlow.getFlowId());
+ Assert.assertEquals("jobc", jobcFlow.getId());
+ Assert.assertEquals(4, jobcFlow.getExecutableNodes().size());
+
+ Assert.assertEquals("innerFlow", jobdFlow.getFlowId());
+ Assert.assertEquals("jobd", jobdFlow.getId());
+ Assert.assertEquals(4, jobdFlow.getExecutableNodes().size());
+ }
+
+ @Test
+ public void testExecutorFlowJson() throws Exception {
+ Flow flow = project.getFlow("jobe");
+ Assert.assertNotNull(flow);
+
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+
+ Object obj = exFlow.toObject();
+ String exFlowJSON = JSONUtils.toJSON(obj);
+ @SuppressWarnings("unchecked")
+ Map<String,Object> flowObjMap = (Map<String,Object>)JSONUtils.parseJSONFromString(exFlowJSON);
+
+ ExecutableFlow parsedExFlow = ExecutableFlow.createExecutableFlowFromObject(flowObjMap);
+ testEquals(exFlow, parsedExFlow);
+ }
+
+ @Test
+ public void testExecutorFlowJson2() throws Exception {
+ Flow flow = project.getFlow("jobe");
+ Assert.assertNotNull(flow);
+
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ exFlow.setExecutionId(101);
+ exFlow.setAttempt(2);
+ exFlow.setDelayedExecution(1000);
+
+ ExecutionOptions options = new ExecutionOptions();
+ options.setConcurrentOption("blah");
+ options.setDisabledJobs(Arrays.asList(new String[] {"bee", null, "boo"}));
+ options.setFailureAction(FailureAction.CANCEL_ALL);
+ options.setFailureEmails(Arrays.asList(new String[] {"doo", null, "daa"}));
+ options.setSuccessEmails(Arrays.asList(new String[] {"dee", null, "dae"}));
+ options.setPipelineLevel(2);
+ options.setPipelineExecutionId(3);
+ options.setNotifyOnFirstFailure(true);
+ options.setNotifyOnLastFailure(true);
+
+ HashMap<String, String> flowProps = new HashMap<String,String>();
+ flowProps.put("la", "fa");
+ options.setFlowParameters(flowProps);
+ exFlow.setExecutionOptions(options);
+
+ Object obj = exFlow.toObject();
+ String exFlowJSON = JSONUtils.toJSON(obj);
+ @SuppressWarnings("unchecked")
+ Map<String,Object> flowObjMap = (Map<String,Object>)JSONUtils.parseJSONFromString(exFlowJSON);
+
+ ExecutableFlow parsedExFlow = ExecutableFlow.createExecutableFlowFromObject(flowObjMap);
+ testEquals(exFlow, parsedExFlow);
+ }
+
+ @Test
+ public void testExecutorFlowUpdates() throws Exception {
+ Flow flow = project.getFlow("jobe");
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ exFlow.setExecutionId(101);
+
+ // Create copy of flow
+ Object obj = exFlow.toObject();
+ String exFlowJSON = JSONUtils.toJSON(obj);
+ @SuppressWarnings("unchecked")
+ Map<String,Object> flowObjMap = (Map<String,Object>)JSONUtils.parseJSONFromString(exFlowJSON);
+ ExecutableFlow copyFlow = ExecutableFlow.createExecutableFlowFromObject(flowObjMap);
+
+ testEquals(exFlow, copyFlow);
+
+ ExecutableNode joba = exFlow.getExecutableNode("joba");
+ ExecutableFlowBase jobb = (ExecutableFlowBase)(exFlow.getExecutableNode("jobb"));
+ ExecutableFlowBase jobc = (ExecutableFlowBase)(exFlow.getExecutableNode("jobc"));
+ ExecutableFlowBase jobd = (ExecutableFlowBase)(exFlow.getExecutableNode("jobd"));
+ ExecutableNode jobe = exFlow.getExecutableNode("jobe");
+ assertNotNull(joba, jobb, jobc, jobd, jobe);
+
+ ExecutableNode jobbInnerFlowA = jobb.getExecutableNode("innerJobA");
+ ExecutableNode jobbInnerFlowB = jobb.getExecutableNode("innerJobB");
+ ExecutableNode jobbInnerFlowC = jobb.getExecutableNode("innerJobC");
+ ExecutableNode jobbInnerFlow = jobb.getExecutableNode("innerFlow");
+ assertNotNull(jobbInnerFlowA, jobbInnerFlowB, jobbInnerFlowC, jobbInnerFlow);
+
+ ExecutableNode jobcInnerFlowA = jobc.getExecutableNode("innerJobA");
+ ExecutableNode jobcInnerFlowB = jobc.getExecutableNode("innerJobB");
+ ExecutableNode jobcInnerFlowC = jobc.getExecutableNode("innerJobC");
+ ExecutableNode jobcInnerFlow = jobc.getExecutableNode("innerFlow");
+ assertNotNull(jobcInnerFlowA, jobcInnerFlowB, jobcInnerFlowC, jobcInnerFlow);
+
+ ExecutableNode jobdInnerFlowA = jobd.getExecutableNode("innerJobA");
+ ExecutableNode jobdInnerFlowB = jobd.getExecutableNode("innerJobB");
+ ExecutableNode jobdInnerFlowC = jobd.getExecutableNode("innerJobC");
+ ExecutableNode jobdInnerFlow = jobd.getExecutableNode("innerFlow");
+ assertNotNull(jobdInnerFlowA, jobdInnerFlowB, jobdInnerFlowC, jobdInnerFlow);
+
+ exFlow.setEndTime(1000);
+ exFlow.setStartTime(500);
+ exFlow.setStatus(Status.RUNNING);
+ exFlow.setUpdateTime(133);
+
+ // Change one job and see if it updates
+ jobe.setEndTime(System.currentTimeMillis());
+ jobe.setUpdateTime(System.currentTimeMillis());
+ jobe.setStatus(Status.DISABLED);
+ jobe.setStartTime(System.currentTimeMillis() - 1000);
+ // Should be one node that was changed
+ Map<String,Object> updateObject = exFlow.toUpdateObject(0);
+ Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
+ // Reapplying should give equal results.
+ copyFlow.applyUpdateObject(updateObject);
+ testEquals(exFlow, copyFlow);
+
+ // This update shouldn't provide any results
+ updateObject = exFlow.toUpdateObject(System.currentTimeMillis());
+ Assert.assertNull(updateObject.get("nodes"));
+
+ // Change inner flow
+ jobbInnerFlowA.setEndTime(System.currentTimeMillis());
+ jobbInnerFlowA.setUpdateTime(System.currentTimeMillis());
+ jobbInnerFlowA.setStatus(Status.DISABLED);
+ jobbInnerFlowA.setStartTime(System.currentTimeMillis() - 1000);
+ // We should get 2 updates if we do a toUpdateObject using 0 as the start time
+ updateObject = exFlow.toUpdateObject(0);
+ Assert.assertEquals(2, ((List)(updateObject.get("nodes"))).size());
+
+ // This should provide 1 update. That we can apply
+ updateObject = exFlow.toUpdateObject(jobe.getUpdateTime());
+ Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
+ copyFlow.applyUpdateObject(updateObject);
+ testEquals(exFlow, copyFlow);
+
+ // This shouldn't give any results anymore
+ updateObject = exFlow.toUpdateObject(jobbInnerFlowA.getUpdateTime());
+ Assert.assertNull(updateObject.get("nodes"));
+ }
+
+ private void assertNotNull(ExecutableNode ... nodes) {
+ for (ExecutableNode node: nodes) {
+ Assert.assertNotNull(node);
+ }
+ }
+
+ public static void testEquals(ExecutableNode a, ExecutableNode b) {
+ if (a instanceof ExecutableFlow) {
+ if (b instanceof ExecutableFlow) {
+ ExecutableFlow exA = (ExecutableFlow)a;
+ ExecutableFlow exB = (ExecutableFlow)b;
+
+ Assert.assertEquals(exA.getScheduleId(), exB.getScheduleId());
+ Assert.assertEquals(exA.getProjectId(), exB.getProjectId());
+ Assert.assertEquals(exA.getVersion(), exB.getVersion());
+ Assert.assertEquals(exA.getSubmitTime(), exB.getSubmitTime());
+ Assert.assertEquals(exA.getSubmitUser(), exB.getSubmitUser());
+ Assert.assertEquals(exA.getExecutionPath(), exB.getExecutionPath());
+
+ testEquals(exA.getExecutionOptions(), exB.getExecutionOptions());
+ }
+ else {
+ Assert.fail("A is ExecutableFlow, but B is not");
+ }
+ }
+
+ if (a instanceof ExecutableFlowBase) {
+ if (b instanceof ExecutableFlowBase) {
+ ExecutableFlowBase exA = (ExecutableFlowBase)a;
+ ExecutableFlowBase exB = (ExecutableFlowBase)b;
+
+ Assert.assertEquals(exA.getFlowId(), exB.getFlowId());
+ Assert.assertEquals(exA.getExecutableNodes().size(), exB.getExecutableNodes().size());
+
+ for(ExecutableNode nodeA : exA.getExecutableNodes()) {
+ ExecutableNode nodeB = exB.getExecutableNode(nodeA.getId());
+ Assert.assertNotNull(nodeB);
+ Assert.assertEquals(a, nodeA.getParentFlow());
+ Assert.assertEquals(b, nodeB.getParentFlow());
+
+ testEquals(nodeA, nodeB);
+ }
+ }
+ else {
+ Assert.fail("A is ExecutableFlowBase, but B is not");
+ }
+ }
+
+ Assert.assertEquals(a.getId(), b.getId());
+ Assert.assertEquals(a.getStatus(), b.getStatus());
+ Assert.assertEquals(a.getStartTime(), b.getStartTime());
+ Assert.assertEquals(a.getEndTime(), b.getEndTime());
+ Assert.assertEquals(a.getUpdateTime(), b.getUpdateTime());
+ Assert.assertEquals(a.getAttempt(), b.getAttempt());
+
+ Assert.assertEquals(a.getJobSource(), b.getJobSource());
+ Assert.assertEquals(a.getPropsSource(), b.getPropsSource());
+ Assert.assertEquals(a.getInNodes(), a.getInNodes());
+ Assert.assertEquals(a.getOutNodes(), a.getOutNodes());
+ }
+
+
+
+ public static void testEquals(ExecutionOptions optionsA, ExecutionOptions optionsB) {
+ Assert.assertEquals(optionsA.getConcurrentOption(), optionsB.getConcurrentOption());
+ Assert.assertEquals(optionsA.getNotifyOnFirstFailure(), optionsB.getNotifyOnFirstFailure());
+ Assert.assertEquals(optionsA.getNotifyOnLastFailure(), optionsB.getNotifyOnLastFailure());
+ Assert.assertEquals(optionsA.getFailureAction(), optionsB.getFailureAction());
+ Assert.assertEquals(optionsA.getPipelineExecutionId(), optionsB.getPipelineExecutionId());
+ Assert.assertEquals(optionsA.getPipelineLevel(), optionsB.getPipelineLevel());
+ Assert.assertEquals(optionsA.isFailureEmailsOverridden(), optionsB.isFailureEmailsOverridden());
+ Assert.assertEquals(optionsA.isSuccessEmailsOverridden(), optionsB.isSuccessEmailsOverridden());
+
+ testEquals(optionsA.getDisabledJobs(), optionsB.getDisabledJobs());
+ testEquals(optionsA.getSuccessEmails(), optionsB.getSuccessEmails());
+ testEquals(optionsA.getFailureEmails(), optionsB.getFailureEmails());
+ testEquals(optionsA.getFlowParameters(), optionsB.getFlowParameters());
+ }
+
+ public static void testEquals(Set<String> a, Set<String> b) {
+ if (a == b) {
+ return;
+ }
+
+ if (a == null || b == null) {
+ Assert.fail();
+ }
+
+ Assert.assertEquals(a.size(), b.size());
+
+ Iterator<String> iterA = a.iterator();
+
+ while(iterA.hasNext()) {
+ String aStr = iterA.next();
+ Assert.assertTrue(b.contains(aStr));
+ }
+ }
+
+ public static void testEquals(List<String> a, List<String> b) {
+ if (a == b) {
+ return;
+ }
+
+ if (a == null || b == null) {
+ Assert.fail();
+ }
+
+ Assert.assertEquals(a.size(), b.size());
+
+ Iterator<String> iterA = a.iterator();
+ Iterator<String> iterB = b.iterator();
+
+ while(iterA.hasNext()) {
+ String aStr = iterA.next();
+ String bStr = iterB.next();
+
+ Assert.assertEquals(aStr, bStr);
+ }
+ }
+
+ public static void testEquals(Map<String, String> a, Map<String, String> b) {
+ if (a == b) {
+ return;
+ }
+
+ if (a == null || b == null) {
+ Assert.fail();
+ }
+
+ Assert.assertEquals(a.size(), b.size());
+
+ for (String key: a.keySet()) {
+ Assert.assertEquals(a.get(key), b.get(key));
+ }
+ }
+}
diff --git a/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java b/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java
index 4316c5c..d671e6f 100644
--- a/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java
+++ b/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java
@@ -2,6 +2,8 @@ package azkaban.test.utils;
import java.io.File;
+import junit.framework.Assert;
+
import org.apache.log4j.Logger;
import org.junit.Test;
@@ -18,4 +20,26 @@ public class DirectoryFlowLoaderTest {
logger.info(loader.getFlowMap().size());
}
+ @Test
+ public void testLoadEmbeddedFlow() {
+ Logger logger = Logger.getLogger(this.getClass());
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+
+ loader.loadProjectFlow(new File("unit/executions/embedded"));
+ Assert.assertEquals(0, loader.getErrors().size());
+ }
+
+ @Test
+ public void testRecursiveLoadEmbeddedFlow() {
+ Logger logger = Logger.getLogger(this.getClass());
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+
+ loader.loadProjectFlow(new File("unit/executions/embeddedBad"));
+ for (String error: loader.getErrors()) {
+ System.out.println(error);
+ }
+
+ // Should be 3 errors: jobe->innerFlow, innerFlow->jobe, innerFlow
+ Assert.assertEquals(3, loader.getErrors().size());
+ }
}