azkaban-aplcache

Fix bug stuck in pipeline mode (#1245) * Enable RemoteFlowWatcherTest Fixed

9/16/2017 11:38:06 AM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/event/FlowWatcher.java b/azkaban-exec-server/src/main/java/azkaban/execapp/event/FlowWatcher.java
index 3066ead..984eb8e 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/event/FlowWatcher.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/event/FlowWatcher.java
@@ -17,6 +17,7 @@
 package azkaban.execapp.event;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.Status;
 import java.util.Map;
@@ -82,8 +83,19 @@ public abstract class FlowWatcher {
   }
 
   public Status peekStatus(final String jobId) {
+    if (Status.isStatusFinished(this.flow.getStatus())) {
+      return null;
+    }
     final ExecutableNode node = this.flow.getExecutableNodePath(jobId);
     if (node != null) {
+      ExecutableFlowBase parentFlow = node.getParentFlow();
+      while (parentFlow != null) {
+        Status parentStatus = parentFlow.getStatus();
+        if (parentStatus == Status.SKIPPED || parentStatus == Status.DISABLED) {
+          return Status.SKIPPED;
+        }
+        parentFlow = parentFlow.getParentFlow();
+      }
       return node.getStatus();
     }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c1b5c87..761e6db 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -338,6 +338,8 @@ public class FlowRunnerManager implements EventListener,
       if (runner != null) {
         watcher = new LocalFlowWatcher(runner);
       } else {
+        // also ends up here if execute is called with pipelineExecId that's not running any more
+        // (it could have just finished, for example)
         watcher = new RemoteFlowWatcher(pipelineExecId, this.executorLoader);
       }
     }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index b7cd06f..69c4835 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -94,7 +94,41 @@ public class RemoteFlowWatcherTest {
 
     runner2Thread.join();
 
-    assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+    assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), false);
+  }
+
+  @Test
+  public void testRemoteFlowWatcherBlockingJobsLeftInReadyState() throws Exception {
+    final MockExecutorLoader loader = new MockExecutorLoader();
+
+    final EventCollectorListener eventCollector = new EventCollectorListener();
+
+    final File workingDir1 = this.temporaryFolder.newFolder();
+    final FlowRunner runner1 =
+        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
+            null);
+    final Thread runner1Thread = new Thread(runner1);
+
+    final File workingDir2 = this.temporaryFolder.newFolder();
+    final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
+    final FlowRunner runner2 =
+        createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
+            watcher, 2);
+    final Thread runner2Thread = new Thread(runner2);
+
+    printCurrentState("runner1 ", runner1.getExecutableFlow());
+    runner1Thread.start();
+    runner1Thread.join();
+    // simulate a real life scenario - this can happen for disabled jobs inside subflows:
+    // flow has finished otherwise but a "blocking" job has READY status
+    // the test gets stuck here without the fix in FlowWatcher.peekStatus
+    runner1.getExecutableFlow().getExecutableNodePath("job4").setStatus(Status.READY);
+    loader.updateExecutableFlow(runner1.getExecutableFlow());
+
+    runner2Thread.start();
+    runner2Thread.join();
+
+    assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), true);
   }
 
   @Test
@@ -182,7 +216,8 @@ public class RemoteFlowWatcherTest {
     }
   }
 
-  private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
+  private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
+      final boolean job4Skipped) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
       Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
 
@@ -191,7 +226,8 @@ public class RemoteFlowWatcherTest {
       if (watchedNode == null) {
         continue;
       }
-      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+      Assert.assertEquals(watchedNode.getStatus(),
+          job4Skipped && watchedNode.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
 
       long minDiff = Long.MAX_VALUE;
       for (final String watchedChild : watchedNode.getOutNodes()) {
@@ -199,7 +235,8 @@ public class RemoteFlowWatcherTest {
         if (child == null) {
           continue;
         }
-        Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
+        Assert.assertEquals(child.getStatus(),
+            job4Skipped && child.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
         final long diff = node.getStartTime() - child.getEndTime();
         minDiff = Math.min(minDiff, diff);
         Assert.assertTrue(
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 39b6c78..89f8b4b 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -60,10 +60,10 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   private static int id = 101;
   private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private final AzkabanEventReporter azkabanEventReporter =
       EventReporterUtil.getTestAzkabanEventReporter();
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File workingDir;
   private JobTypeManager jobtypeManager;
   private ExecutorLoader fakeExecutorLoader;
@@ -95,6 +95,125 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
   }
 
   @Test
+  public void testBasicPipelineLevel1RunDisabledJobs() throws Exception {
+    final EventCollectorListener eventCollector = new EventCollectorListener();
+    final FlowRunner previousRunner =
+        createFlowRunner(eventCollector, "jobf", "prev");
+
+    final ExecutionOptions options = new ExecutionOptions();
+    options.setPipelineExecutionId(previousRunner.getExecutableFlow()
+        .getExecutionId());
+    options.setPipelineLevel(1);
+    final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
+    final FlowRunner pipelineRunner =
+        createFlowRunner(eventCollector, "jobf", "pipe", options);
+    pipelineRunner.setFlowWatcher(watcher);
+
+    // 1. START FLOW
+    final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
+    final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
+    // disable the innerFlow (entire sub-flow)
+    previousFlow.getExecutableNodePath("jobb").setStatus(Status.DISABLED);
+
+    runFlowRunnerInThread(previousRunner);
+    assertStatus(previousFlow, "joba", Status.RUNNING);
+    assertStatus(previousFlow, "joba", Status.RUNNING);
+    assertStatus(previousFlow, "joba1", Status.RUNNING);
+
+    runFlowRunnerInThread(pipelineRunner);
+    assertStatus(pipelineFlow, "joba", Status.QUEUED);
+    assertStatus(pipelineFlow, "joba1", Status.QUEUED);
+
+    InteractiveTestJob.getTestJob("prev:joba").succeedJob();
+    assertStatus(previousFlow, "joba", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobb", Status.SKIPPED);
+    assertStatus(previousFlow, "jobb:innerJobA", Status.READY);
+    assertStatus(previousFlow, "jobd", Status.RUNNING);
+    assertStatus(previousFlow, "jobc", Status.RUNNING);
+    assertStatus(previousFlow, "jobd:innerJobA", Status.RUNNING);
+    assertStatus(pipelineFlow, "joba", Status.RUNNING);
+
+    assertStatus(previousFlow, "jobb:innerJobA", Status.READY);
+    assertStatus(previousFlow, "jobb:innerJobB", Status.READY);
+    assertStatus(previousFlow, "jobb:innerJobC", Status.READY);
+
+    InteractiveTestJob.getTestJob("pipe:joba").succeedJob();
+    assertStatus(pipelineFlow, "joba", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobd", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobc", Status.QUEUED);
+    assertStatus(pipelineFlow, "jobd:innerJobA", Status.QUEUED);
+    assertStatus(pipelineFlow, "jobb:innerJobA", Status.RUNNING);
+
+    InteractiveTestJob.getTestJob("prev:jobd:innerJobA").succeedJob();
+    assertStatus(previousFlow, "jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobd:innerFlow2", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobd:innerJobA", Status.RUNNING);
+
+    // Finish the previous d side
+    InteractiveTestJob.getTestJob("prev:jobd:innerFlow2").succeedJob();
+    assertStatus(previousFlow, "jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobd", Status.SUCCEEDED);
+
+    InteractiveTestJob.getTestJob("pipe:jobb:innerJobA").succeedJob();
+    InteractiveTestJob.getTestJob("prev:jobc").succeedJob();
+    assertStatus(previousFlow, "jobb:innerJobB", Status.READY);
+    assertStatus(previousFlow, "jobb:innerJobC", Status.READY);
+    assertStatus(previousFlow, "jobb:innerFlow", Status.READY);
+    assertStatus(previousFlow, "jobc", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobc", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobb:innerJobB", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobb:innerJobC", Status.RUNNING);
+
+    InteractiveTestJob.getTestJob("pipe:jobc").succeedJob();
+    assertStatus(previousFlow, "jobb:innerFlow", Status.READY);
+    assertStatus(previousFlow, "jobb", Status.SKIPPED);
+    assertStatus(previousFlow, "jobe", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobc", Status.SUCCEEDED);
+
+    InteractiveTestJob.getTestJob("pipe:jobb:innerJobB").succeedJob();
+    InteractiveTestJob.getTestJob("pipe:jobb:innerJobC").succeedJob();
+    InteractiveTestJob.getTestJob("prev:jobe").succeedJob();
+    assertStatus(previousFlow, "jobe", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerJobB", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerFlow", Status.RUNNING);
+
+    InteractiveTestJob.getTestJob("pipe:jobd:innerJobA").succeedJob();
+    InteractiveTestJob.getTestJob("pipe:jobb:innerFlow").succeedJob();
+    assertStatus(pipelineFlow, "jobb", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerFlow", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobd:innerFlow2", Status.RUNNING);
+
+    InteractiveTestJob.getTestJob("pipe:jobd:innerFlow2").succeedJob();
+    InteractiveTestJob.getTestJob("prev:joba1").succeedJob();
+    assertStatus(pipelineFlow, "jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobd", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobf", Status.RUNNING);
+    assertStatus(previousFlow, "joba1", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "joba1", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobe", Status.RUNNING);
+
+    InteractiveTestJob.getTestJob("pipe:jobe").succeedJob();
+    InteractiveTestJob.getTestJob("prev:jobf").succeedJob();
+    assertStatus(pipelineFlow, "jobe", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobf", Status.SUCCEEDED);
+    assertFlowStatus(previousFlow, Status.SUCCEEDED);
+
+    InteractiveTestJob.getTestJob("pipe:joba1").succeedJob();
+    assertStatus(pipelineFlow, "joba1", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobf", Status.RUNNING);
+
+    InteractiveTestJob.getTestJob("pipe:jobf").succeedJob();
+
+    assertThreadShutDown(previousRunner);
+    assertThreadShutDown(pipelineRunner);
+    assertFlowStatus(pipelineFlow, Status.SUCCEEDED);
+  }
+
+  @Test
   public void testBasicPipelineLevel1Run() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final FlowRunner previousRunner =