diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index b7cd06f..69c4835 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -94,7 +94,41 @@ public class RemoteFlowWatcherTest {
runner2Thread.join();
- assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), false);
+ }
+
+ @Test
+ public void testRemoteFlowWatcherBlockingJobsLeftInReadyState() throws Exception {
+ final MockExecutorLoader loader = new MockExecutorLoader();
+
+ final EventCollectorListener eventCollector = new EventCollectorListener();
+
+ final File workingDir1 = this.temporaryFolder.newFolder();
+ final FlowRunner runner1 =
+ createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
+ null);
+ final Thread runner1Thread = new Thread(runner1);
+
+ final File workingDir2 = this.temporaryFolder.newFolder();
+ final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+ final FlowRunner runner2 =
+ createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
+ watcher, 2);
+ final Thread runner2Thread = new Thread(runner2);
+
+ printCurrentState("runner1 ", runner1.getExecutableFlow());
+ runner1Thread.start();
+ runner1Thread.join();
+ // simulate a real life scenario - this can happen for disabled jobs inside subflows:
+ // flow has finished otherwise but a "blocking" job has READY status
+ // the test gets stuck here without the fix in FlowWatcher.peekStatus
+ runner1.getExecutableFlow().getExecutableNodePath("job4").setStatus(Status.READY);
+ loader.updateExecutableFlow(runner1.getExecutableFlow());
+
+ runner2Thread.start();
+ runner2Thread.join();
+
+ assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), true);
}
@Test
@@ -182,7 +216,8 @@ public class RemoteFlowWatcherTest {
}
}
- private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
+ private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
+ final boolean job4Skipped) {
for (final ExecutableNode node : second.getExecutableNodes()) {
Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
@@ -191,7 +226,8 @@ public class RemoteFlowWatcherTest {
if (watchedNode == null) {
continue;
}
- Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+ Assert.assertEquals(watchedNode.getStatus(),
+ job4Skipped && watchedNode.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
long minDiff = Long.MAX_VALUE;
for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -199,7 +235,8 @@ public class RemoteFlowWatcherTest {
if (child == null) {
continue;
}
- Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
+ Assert.assertEquals(child.getStatus(),
+ job4Skipped && child.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
final long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
Assert.assertTrue(
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 39b6c78..89f8b4b 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -60,10 +60,10 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
private static int id = 101;
private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
private final AzkabanEventReporter azkabanEventReporter =
EventReporterUtil.getTestAzkabanEventReporter();
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File workingDir;
private JobTypeManager jobtypeManager;
private ExecutorLoader fakeExecutorLoader;
@@ -95,6 +95,125 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
}
@Test
+ public void testBasicPipelineLevel1RunDisabledJobs() throws Exception {
+ final EventCollectorListener eventCollector = new EventCollectorListener();
+ final FlowRunner previousRunner =
+ createFlowRunner(eventCollector, "jobf", "prev");
+
+ final ExecutionOptions options = new ExecutionOptions();
+ options.setPipelineExecutionId(previousRunner.getExecutableFlow()
+ .getExecutionId());
+ options.setPipelineLevel(1);
+ final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
+ final FlowRunner pipelineRunner =
+ createFlowRunner(eventCollector, "jobf", "pipe", options);
+ pipelineRunner.setFlowWatcher(watcher);
+
+ // 1. START FLOW
+ final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
+ final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
+ // disable the innerFlow (entire sub-flow)
+ previousFlow.getExecutableNodePath("jobb").setStatus(Status.DISABLED);
+
+ runFlowRunnerInThread(previousRunner);
+ assertStatus(previousFlow, "joba", Status.RUNNING);
+ assertStatus(previousFlow, "joba", Status.RUNNING);
+ assertStatus(previousFlow, "joba1", Status.RUNNING);
+
+ runFlowRunnerInThread(pipelineRunner);
+ assertStatus(pipelineFlow, "joba", Status.QUEUED);
+ assertStatus(pipelineFlow, "joba1", Status.QUEUED);
+
+ InteractiveTestJob.getTestJob("prev:joba").succeedJob();
+ assertStatus(previousFlow, "joba", Status.SUCCEEDED);
+ assertStatus(previousFlow, "jobb", Status.SKIPPED);
+ assertStatus(previousFlow, "jobb:innerJobA", Status.READY);
+ assertStatus(previousFlow, "jobd", Status.RUNNING);
+ assertStatus(previousFlow, "jobc", Status.RUNNING);
+ assertStatus(previousFlow, "jobd:innerJobA", Status.RUNNING);
+ assertStatus(pipelineFlow, "joba", Status.RUNNING);
+
+ assertStatus(previousFlow, "jobb:innerJobA", Status.READY);
+ assertStatus(previousFlow, "jobb:innerJobB", Status.READY);
+ assertStatus(previousFlow, "jobb:innerJobC", Status.READY);
+
+ InteractiveTestJob.getTestJob("pipe:joba").succeedJob();
+ assertStatus(pipelineFlow, "joba", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobb", Status.RUNNING);
+ assertStatus(pipelineFlow, "jobd", Status.RUNNING);
+ assertStatus(pipelineFlow, "jobc", Status.QUEUED);
+ assertStatus(pipelineFlow, "jobd:innerJobA", Status.QUEUED);
+ assertStatus(pipelineFlow, "jobb:innerJobA", Status.RUNNING);
+
+ InteractiveTestJob.getTestJob("prev:jobd:innerJobA").succeedJob();
+ assertStatus(previousFlow, "jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus(previousFlow, "jobd:innerFlow2", Status.RUNNING);
+ assertStatus(pipelineFlow, "jobd:innerJobA", Status.RUNNING);
+
+ // Finish the previous d side
+ InteractiveTestJob.getTestJob("prev:jobd:innerFlow2").succeedJob();
+ assertStatus(previousFlow, "jobd:innerFlow2", Status.SUCCEEDED);
+ assertStatus(previousFlow, "jobd", Status.SUCCEEDED);
+
+ InteractiveTestJob.getTestJob("pipe:jobb:innerJobA").succeedJob();
+ InteractiveTestJob.getTestJob("prev:jobc").succeedJob();
+ assertStatus(previousFlow, "jobb:innerJobB", Status.READY);
+ assertStatus(previousFlow, "jobb:innerJobC", Status.READY);
+ assertStatus(previousFlow, "jobb:innerFlow", Status.READY);
+ assertStatus(previousFlow, "jobc", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobc", Status.RUNNING);
+ assertStatus(pipelineFlow, "jobb:innerJobB", Status.RUNNING);
+ assertStatus(pipelineFlow, "jobb:innerJobC", Status.RUNNING);
+
+ InteractiveTestJob.getTestJob("pipe:jobc").succeedJob();
+ assertStatus(previousFlow, "jobb:innerFlow", Status.READY);
+ assertStatus(previousFlow, "jobb", Status.SKIPPED);
+ assertStatus(previousFlow, "jobe", Status.RUNNING);
+ assertStatus(pipelineFlow, "jobc", Status.SUCCEEDED);
+
+ InteractiveTestJob.getTestJob("pipe:jobb:innerJobB").succeedJob();
+ InteractiveTestJob.getTestJob("pipe:jobb:innerJobC").succeedJob();
+ InteractiveTestJob.getTestJob("prev:jobe").succeedJob();
+ assertStatus(previousFlow, "jobe", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobb:innerJobB", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobb:innerJobC", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobb:innerFlow", Status.RUNNING);
+
+ InteractiveTestJob.getTestJob("pipe:jobd:innerJobA").succeedJob();
+ InteractiveTestJob.getTestJob("pipe:jobb:innerFlow").succeedJob();
+ assertStatus(pipelineFlow, "jobb", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobb:innerFlow", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobd:innerFlow2", Status.RUNNING);
+
+ InteractiveTestJob.getTestJob("pipe:jobd:innerFlow2").succeedJob();
+ InteractiveTestJob.getTestJob("prev:joba1").succeedJob();
+ assertStatus(pipelineFlow, "jobd:innerFlow2", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobd", Status.SUCCEEDED);
+ assertStatus(previousFlow, "jobf", Status.RUNNING);
+ assertStatus(previousFlow, "joba1", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "joba1", Status.RUNNING);
+ assertStatus(pipelineFlow, "jobe", Status.RUNNING);
+
+ InteractiveTestJob.getTestJob("pipe:jobe").succeedJob();
+ InteractiveTestJob.getTestJob("prev:jobf").succeedJob();
+ assertStatus(pipelineFlow, "jobe", Status.SUCCEEDED);
+ assertStatus(previousFlow, "jobf", Status.SUCCEEDED);
+ assertFlowStatus(previousFlow, Status.SUCCEEDED);
+
+ InteractiveTestJob.getTestJob("pipe:joba1").succeedJob();
+ assertStatus(pipelineFlow, "joba1", Status.SUCCEEDED);
+ assertStatus(pipelineFlow, "jobf", Status.RUNNING);
+
+ InteractiveTestJob.getTestJob("pipe:jobf").succeedJob();
+
+ assertThreadShutDown(previousRunner);
+ assertThreadShutDown(pipelineRunner);
+ assertFlowStatus(pipelineFlow, Status.SUCCEEDED);
+ }
+
+ @Test
public void testBasicPipelineLevel1Run() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
final FlowRunner previousRunner =