azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 157(+72 -85)
Details
src/java/azkaban/execapp/FlowRunner.java 157(+72 -85)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 01898bb..dcef893 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -333,12 +333,7 @@ public class FlowRunner extends EventHandler implements Runnable {
continue;
}
else {
- if (!progressGraph(flow)) {
- if (flow.isFlowFinished() ) {
- flowFinished = true;
- break;
- }
-
+ if (!progressGraph()) {
try {
mainSyncObj.wait(CHECK_WAIT_MS);
} catch (InterruptedException e) {
@@ -356,41 +351,73 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Finished Flow");
}
- private boolean progressGraph(ExecutableFlowBase flow) throws IOException {
+ private boolean progressGraph() throws IOException {
List<ExecutableNode> jobsReadyToRun = flow.findNextJobsToRun();
- if (!jobsReadyToRun.isEmpty()) {
- long currentTime = System.currentTimeMillis();
- for (ExecutableNode node: jobsReadyToRun) {
- Status nextStatus = getImpliedStatus(node);
-
- // If the flow has seen previous failures and the flow has been cancelled, than
- if (nextStatus == Status.KILLED) {
- logger.info("Killing " + node.getId() + " due to prior errors.");
- node.killNode(currentTime);
- finalizeFlowIfFinished(node.getParentFlow());
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
- }
- else if (nextStatus == Status.DISABLED) {
- logger.info("Skipping disabled job " + node.getId() + ".");
- node.skipNode(currentTime);
- finalizeFlowIfFinished(node.getParentFlow());
- fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
- }
- else {
- runExecutableNode(node);
- }
+ // If its the current flow
+ if (jobsReadyToRun.size() == 1 && jobsReadyToRun.get(0) == flow) {
+ flowFinished = true;
+ return true;
+ }
+
+ long currentTime = System.currentTimeMillis();
+ for (ExecutableNode node: jobsReadyToRun) {
+ Status nextStatus = getImpliedStatus(node);
+
+ if (node instanceof ExecutableFlowBase && ((ExecutableFlowBase)node).isFlowFinished()) {
+ finalizeFlow((ExecutableFlowBase)node);
+ }
+ else if (nextStatus == Status.KILLED || isCancelled()) {
+ logger.info("Killing " + node.getId() + " due to prior errors.");
+ node.killNode(currentTime);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
}
+ else if (nextStatus == Status.DISABLED) {
+ logger.info("Skipping disabled job " + node.getId() + ".");
+ node.skipNode(currentTime);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+ }
+ else {
+ runExecutableNode(node);
+ }
+ }
+ if (!jobsReadyToRun.isEmpty()) {
updateFlow();
return true;
}
-
- return false;
+ else {
+ return false;
+ }
}
private void finalizeFlow(ExecutableFlowBase flow) {
String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
+
+ // If it's not the starting flow, we'll create set of output props
+ // for the finished flow.
+ boolean succeeded = true;
+ Props previousOutput = null;
+
+ for(String end: flow.getEndNodes()) {
+ ExecutableNode node = flow.getExecutableNode(end);
+
+ if (node.getStatus() == Status.KILLED) {
+ succeeded = false;
+ }
+
+ Props output = node.getOutputProps();
+ if (output != null) {
+ output = Props.clone(output);
+ output.setParent(previousOutput);
+ previousOutput = output;
+ }
+ }
+
+ flow.setOutputProps(previousOutput);
+ if (!succeeded && (flow.getStatus() == Status.RUNNING)) {
+ flow.setStatus(Status.KILLED);
+ }
flow.setEndTime(System.currentTimeMillis());
switch(flow.getStatus()) {
@@ -653,11 +680,6 @@ public class FlowRunner extends EventHandler implements Runnable {
for (JobRunner runner : activeJobRunners) {
runner.cancel();
}
-
- if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
- logger.info("Setting flow status to " + Status.KILLED.toString());
- flow.setStatus(Status.KILLED);
- }
}
}
@@ -725,7 +747,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
- // TODO: HANDLE subflow execution
+
@Override
public synchronized void handleEvent(Event event) {
JobRunner runner = (JobRunner)event.getRunner();
@@ -738,33 +760,33 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = runner.getNode();
activeJobRunners.remove(node.getId());
- String id = node.getPrintableId(":");
+ String id = node.getPrintableId();
logger.info("Job Finished " + id + " with status " + node.getStatus());
if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
logger.info("Job " + id + " had output props.");
}
-
+
if (node.getStatus() == Status.FAILED) {
- // Retry failure if conditions are met.
- if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
+ if (runner.getRetries() > node.getAttempt()) {
logger.info("Job " + id + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
node.setDelayedExecution(runner.getRetryBackoff());
node.resetForRetry();
}
else {
- if (!runner.isCancelled() && runner.getRetries() > 0) {
+ if (runner.getRetries() > 0) {
logger.info("Job " + id + " has run out of retry attempts");
// Setting delayed execution to 0 in case this is manually re-tried.
node.setDelayedExecution(0);
}
-
+
flowFailed = true;
// The KILLED status occurs when cancel is invoked. We want to keep this
// status even in failure conditions.
- if (flow.getStatus() != Status.KILLED && flow.getStatus() != Status.FAILED) {
+ if (flow.getStatus() != Status.KILLED) {
+ // During a failure, we propagate the failure to parent flows
propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
-
+
if (failureAction == FailureAction.CANCEL_ALL && !flowCancelled) {
logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
cancel();
@@ -772,59 +794,24 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
}
- finalizeFlowIfFinished(node.getParentFlow());
+
updateFlow();
interrupt();
-
fireEventListeners(event);
}
}
}
private void propagateStatus(ExecutableFlowBase base, Status status) {
- base.setStatus(status);
- if (base.getParentFlow() != null) {
- propagateStatus(base.getParentFlow(), status);
- }
- }
- }
-
- private void finalizeFlowIfFinished(ExecutableFlowBase base) {
- // We let main thread finalize the main flow.
- if (base == flow) {
- return;
- }
-
- if (base.isFlowFinished()) {
- boolean succeeded = true;
- Props previousOutput = null;
- for(String end: base.getEndNodes()) {
- ExecutableNode node = base.getExecutableNode(end);
-
- if (node.getStatus() == Status.KILLED) {
- succeeded = false;
+ if (!Status.isStatusFinished(base.getStatus())) {
+ base.setStatus(status);
+ if (base.getParentFlow() != null) {
+ propagateStatus(base.getParentFlow(), status);
}
-
- Props output = node.getOutputProps();
- if (output != null) {
- output = Props.clone(output);
- output.setParent(previousOutput);
- previousOutput = output;
- }
- }
-
- if (!succeeded && (base.getStatus() == Status.RUNNING)) {
- base.setStatus(Status.KILLED);
- }
- base.setOutputProps(previousOutput);
- finalizeFlow(base);
-
- if (base.getParentFlow() != null) {
- finalizeFlowIfFinished(base.getParentFlow());
}
}
}
-
+
public boolean isCancelled() {
return flowCancelled;
}
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index cda8279..e99bbe2 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -317,34 +317,42 @@ public class ExecutableFlowBase extends ExecutableNode {
* Finds all jobs which are ready to run. This occurs when all of its
* dependency nodes are finished running.
*
+ * It will also return any subflow that has been completed such that the
+ * FlowRunner can properly handle them.
+ *
* @param flow
* @return
*/
public List<ExecutableNode> findNextJobsToRun() {
ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
- nodeloop:
- for (ExecutableNode node: executableNodes.values()) {
- if(Status.isStatusFinished(node.getStatus())) {
- continue;
- }
-
- if ((node instanceof ExecutableFlowBase) && Status.isStatusRunning(node.getStatus())) {
- // If the flow is still running, we traverse into the flow
- jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
- }
- else if (Status.isStatusRunning(node.getStatus())) {
- continue;
- }
- else {
- for (String dependency: node.getInNodes()) {
- // We find that the outer-loop is unfinished.
- if (!Status.isStatusFinished(getExecutableNode(dependency).getStatus())) {
- continue nodeloop;
+ if (isFlowFinished() && !Status.isStatusFinished(getStatus())) {
+ jobsToRun.add(this);
+ }
+ else {
+ nodeloop:
+ for (ExecutableNode node: executableNodes.values()) {
+ if(Status.isStatusFinished(node.getStatus())) {
+ continue;
+ }
+
+ if ((node instanceof ExecutableFlowBase) && Status.isStatusRunning(node.getStatus())) {
+ // If the flow is still running, we traverse into the flow
+ jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
+ }
+ else if (Status.isStatusRunning(node.getStatus())) {
+ continue;
+ }
+ else {
+ for (String dependency: node.getInNodes()) {
+ // We find that the outer-loop is unfinished.
+ if (!Status.isStatusFinished(getExecutableNode(dependency).getStatus())) {
+ continue nodeloop;
+ }
}
+
+ jobsToRun.add(node);
}
-
- jobsToRun.add(node);
}
}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index d7fe287..a8c620e 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -285,7 +285,7 @@ public class FlowRunnerTest {
synchronized(this) {
try {
- wait(4500);
+ wait(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 312dea1..2cb47b8 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -223,6 +223,7 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobf", Status.SUCCEEDED);
compareStates(expectedStateMap, nodeMap);
Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+
Assert.assertFalse(thread.isAlive());
}
@@ -577,7 +578,7 @@ public class FlowRunnerTest2 {
compareStates(expectedStateMap, nodeMap);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
- pause(1000);
+ pause(250);
expectedStateMap.put("jobb", Status.FAILED);
expectedStateMap.put("jobb:innerJobB", Status.FAILED);
expectedStateMap.put("jobb:innerJobC", Status.FAILED);
@@ -588,10 +589,11 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
expectedStateMap.put("jobe", Status.KILLED);
expectedStateMap.put("jobf", Status.KILLED);
-
- Assert.assertEquals(Status.FAILED, flow.getStatus());
compareStates(expectedStateMap, nodeMap);
+
Assert.assertFalse(thread.isAlive());
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+
}
@Test
@@ -700,7 +702,7 @@ public class FlowRunnerTest2 {
// 1. START FLOW
createExpectedStateMap(flow, expectedStateMap, nodeMap);
Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ pause(1000);
// After it starts up, only joba should be running
expectedStateMap.put("joba", Status.RUNNING);
@@ -728,7 +730,7 @@ public class FlowRunnerTest2 {
compareStates(expectedStateMap, nodeMap);
runner.cancel("me");
- pause(1000);
+ pause(250);
expectedStateMap.put("jobb", Status.KILLED);
expectedStateMap.put("jobb:innerJobB", Status.FAILED);
@@ -805,14 +807,11 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobe", Status.KILLED);
expectedStateMap.put("jobf", Status.KILLED);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
+ Assert.assertEquals(Status.KILLED, flow.getStatus());
compareStates(expectedStateMap, nodeMap);
Assert.assertFalse(thread.isAlive());
}
- // @TODO write test for:
- // Pipeline
-
private Thread runFlowRunnerInThread(FlowRunner runner) {
Thread thread = new Thread(runner);
thread.start();