azkaban-developers
Changes
src/java/azkaban/execapp/JobRunner.java 38(+29 -9)
unit/executions/embedded2/jobg.job 2(+2 -0)
Details
src/java/azkaban/execapp/JobRunner.java 38(+29 -9)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index a684307..b1a8c54 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -123,16 +123,36 @@ public class JobRunner extends EventHandler implements Runnable {
else if (this.pipelineLevel == 2) {
pipelineJobs.add(node.getNestedId());
ExecutableFlowBase parentFlow = node.getParentFlow();
- for (String outNode : node.getOutNodes()) {
- ExecutableNode nextNode = parentFlow.getExecutableNode(outNode);
-
- // If the next node is a nested flow, then we add the nested starting nodes
- if (nextNode instanceof ExecutableFlowBase) {
- ExecutableFlowBase nextFlow = (ExecutableFlowBase)nextNode;
- findAllStartingNodes(nextFlow, pipelineJobs);
+
+ if (parentFlow.getEndNodes().contains(node.getId())) {
+ if (!parentFlow.getOutNodes().isEmpty()) {
+ ExecutableFlowBase grandParentFlow = parentFlow.getParentFlow();
+ for (String outNode: parentFlow.getOutNodes()) {
+ ExecutableNode nextNode = grandParentFlow.getExecutableNode(outNode);
+
+ // If the next node is a nested flow, then we add the nested starting nodes
+ if (nextNode instanceof ExecutableFlowBase) {
+ ExecutableFlowBase nextFlow = (ExecutableFlowBase)nextNode;
+ findAllStartingNodes(nextFlow, pipelineJobs);
+ }
+ else {
+ pipelineJobs.add(nextNode.getNestedId());
+ }
+ }
}
- else {
- pipelineJobs.add(nextNode.getNestedId());
+ }
+ else {
+ for (String outNode : node.getOutNodes()) {
+ ExecutableNode nextNode = parentFlow.getExecutableNode(outNode);
+
+ // If the next node is a nested flow, then we add the nested starting nodes
+ if (nextNode instanceof ExecutableFlowBase) {
+ ExecutableFlowBase nextFlow = (ExecutableFlowBase)nextNode;
+ findAllStartingNodes(nextFlow, pipelineJobs);
+ }
+ else {
+ pipelineJobs.add(nextNode.getNestedId());
+ }
}
}
}
unit/executions/embedded2/jobg.job 2(+2 -0)
diff --git a/unit/executions/embedded2/jobg.job b/unit/executions/embedded2/jobg.job
new file mode 100644
index 0000000..b1b00ce
--- /dev/null
+++ b/unit/executions/embedded2/jobg.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=jobe,joba1
diff --git a/unit/executions/embedded2/pipeline1.job b/unit/executions/embedded2/pipeline1.job
new file mode 100644
index 0000000..4afbfdc
--- /dev/null
+++ b/unit/executions/embedded2/pipeline1.job
@@ -0,0 +1 @@
+type=test
\ No newline at end of file
diff --git a/unit/executions/embedded2/pipeline1_1.job b/unit/executions/embedded2/pipeline1_1.job
new file mode 100644
index 0000000..cfe35cc
--- /dev/null
+++ b/unit/executions/embedded2/pipeline1_1.job
@@ -0,0 +1,4 @@
+type=flow
+flow.name=innerFlow2
+testprops=moo
+output.override=jobb
\ No newline at end of file
diff --git a/unit/executions/embedded2/pipeline1_2.job b/unit/executions/embedded2/pipeline1_2.job
new file mode 100644
index 0000000..711d823
--- /dev/null
+++ b/unit/executions/embedded2/pipeline1_2.job
@@ -0,0 +1,5 @@
+type=flow
+flow.name=innerFlow2
+dependencies=pipeline1_1
+testprops=moo
+output.override=jobb
\ No newline at end of file
diff --git a/unit/executions/embedded2/pipeline2.job b/unit/executions/embedded2/pipeline2.job
new file mode 100644
index 0000000..84f6498
--- /dev/null
+++ b/unit/executions/embedded2/pipeline2.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=pipeline1
\ No newline at end of file
diff --git a/unit/executions/embedded2/pipeline4.job b/unit/executions/embedded2/pipeline4.job
new file mode 100644
index 0000000..b24c4ba
--- /dev/null
+++ b/unit/executions/embedded2/pipeline4.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=pipelineEmbeddedFlow3
\ No newline at end of file
diff --git a/unit/executions/embedded2/pipelineEmbeddedFlow3.job b/unit/executions/embedded2/pipelineEmbeddedFlow3.job
new file mode 100644
index 0000000..0a1ae46
--- /dev/null
+++ b/unit/executions/embedded2/pipelineEmbeddedFlow3.job
@@ -0,0 +1,5 @@
+type=flow
+flow.name=innerFlow
+dependencies=pipeline2
+testprops=moo
+output.override=jobb
\ No newline at end of file
diff --git a/unit/executions/embedded2/pipelineFlow.job b/unit/executions/embedded2/pipelineFlow.job
new file mode 100644
index 0000000..e50329c
--- /dev/null
+++ b/unit/executions/embedded2/pipelineFlow.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=pipeline4
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index 1ef8804..9d755df 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -259,6 +259,273 @@ public class FlowRunnerPipelineTest {
Assert.assertFalse(thread2.isAlive());
}
+ @Test
+ public void testBasicPipelineLevel2Run() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner previousRunner = createFlowRunner(eventCollector, "pipelineFlow", "prev");
+
+ ExecutionOptions options = new ExecutionOptions();
+ options.setPipelineExecutionId(previousRunner.getExecutableFlow().getExecutionId());
+ options.setPipelineLevel(2);
+ FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
+ FlowRunner pipelineRunner = createFlowRunner(eventCollector, "pipelineFlow", "pipe", options);
+ pipelineRunner.setFlowWatcher(watcher);
+
+ Map<String, Status> previousExpectedStateMap = new HashMap<String, Status>();
+ Map<String, Status> pipelineExpectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> previousNodeMap = new HashMap<String, ExecutableNode>();
+ Map<String, ExecutableNode> pipelineNodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
+ ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
+ createExpectedStateMap(previousFlow, previousExpectedStateMap, previousNodeMap);
+ createExpectedStateMap(pipelineFlow, pipelineExpectedStateMap, pipelineNodeMap);
+
+ Thread thread1 = runFlowRunnerInThread(previousRunner);
+ pause(250);
+ Thread thread2 = runFlowRunnerInThread(pipelineRunner);
+ pause(250);
+
+ previousExpectedStateMap.put("pipeline1", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipeline1").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipeline1", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipeline2", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipeline2").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipeline2", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3", Status.RUNNING);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline1").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline1", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline2", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerJobA").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB", Status.RUNNING);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline2", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline2").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline2", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerJobB").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB", Status.SUCCEEDED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerJobC").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow", Status.RUNNING);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerJobA").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC", Status.QUEUED);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB", Status.QUEUED);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerFlow").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipelineEmbeddedFlow3", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipeline4", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerJobB").succeedJob();
+ InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerJobC").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipeline4").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipeline4", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipelineFlow", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipelineFlow").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipelineFlow", Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
+ Assert.assertFalse(thread1.isAlive());
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerFlow").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipelineEmbeddedFlow3", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline4", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline4").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline4", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipelineFlow", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipelineFlow").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipelineFlow", Status.SUCCEEDED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+ Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
+ Assert.assertFalse(thread2.isAlive());
+ }
+
+ @Test
+ public void testBasicPipelineLevel2Run2() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner previousRunner = createFlowRunner(eventCollector, "pipeline1_2", "prev");
+
+ ExecutionOptions options = new ExecutionOptions();
+ options.setPipelineExecutionId(previousRunner.getExecutableFlow().getExecutionId());
+ options.setPipelineLevel(2);
+ FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
+ FlowRunner pipelineRunner = createFlowRunner(eventCollector, "pipeline1_2", "pipe", options);
+ pipelineRunner.setFlowWatcher(watcher);
+
+ Map<String, Status> previousExpectedStateMap = new HashMap<String, Status>();
+ Map<String, Status> pipelineExpectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> previousNodeMap = new HashMap<String, ExecutableNode>();
+ Map<String, ExecutableNode> pipelineNodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
+ ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
+ createExpectedStateMap(previousFlow, previousExpectedStateMap, previousNodeMap);
+ createExpectedStateMap(pipelineFlow, pipelineExpectedStateMap, pipelineNodeMap);
+
+ Thread thread1 = runFlowRunnerInThread(previousRunner);
+ pause(250);
+ Thread thread2 = runFlowRunnerInThread(pipelineRunner);
+ pause(250);
+
+ previousExpectedStateMap.put("pipeline1_1", Status.RUNNING);
+ previousExpectedStateMap.put("pipeline1_1:innerJobA", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1_1", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1_1:innerJobA", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipeline1_1:innerJobA").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipeline1_1:innerJobA", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipeline1_1:innerFlow2", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipeline1_1:innerFlow2").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipeline1_1", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipeline1_1:innerFlow2", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipeline1_2", Status.RUNNING);
+ previousExpectedStateMap.put("pipeline1_2:innerJobA", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1_1:innerJobA", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline1_1:innerJobA").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline1_1:innerJobA", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipeline1_2:innerJobA").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipeline1_2:innerJobA", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipeline1_2:innerFlow2", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline1_1:innerFlow2").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline1_1", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline1_2", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline1_1:innerFlow2").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline1_1", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline1_2", Status.RUNNING);
+ pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:pipeline1_2:innerFlow2").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("pipeline1_2:innerFlow2", Status.SUCCEEDED);
+ previousExpectedStateMap.put("pipeline1_2", Status.SUCCEEDED);
+ Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
+ Assert.assertFalse(thread1.isAlive());
+ pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline1_2:innerJobA").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline1_2:innerFlow2", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:pipeline1_2:innerFlow2").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("pipeline1_2", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("pipeline1_2:innerFlow2", Status.SUCCEEDED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+ Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
+ Assert.assertFalse(thread2.isAlive());
+ }
+
private Thread runFlowRunnerInThread(FlowRunner runner) {
Thread thread = new Thread(runner);
thread.start();
@@ -290,7 +557,9 @@ public class FlowRunnerPipelineTest {
for (String printedId: expectedStateMap.keySet()) {
Status expectedStatus = expectedStateMap.get(printedId);
ExecutableNode node = nodeMap.get(printedId);
-
+ if (node == null) {
+ System.out.println("id node: " + printedId + " doesn't exist.");
+ }
if (expectedStatus != node.getStatus()) {
Assert.fail("Expected values do not match for " + printedId + ". Expected " + expectedStatus + ", instead received " + node.getStatus());
}