azkaban-aplcache

Fix and enable FlowRunnerPipelineTest (#1243) * Enable

9/16/2017 1:07:03 AM

Details

diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 21511cc..39b6c78 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -20,9 +20,8 @@ import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlowBase;
-import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
@@ -35,18 +34,17 @@ import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.spi.AzkabanEventReporter;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
 import java.io.File;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Flows in this test: joba jobb joba1 jobc->joba jobd->joba jobe->jobb,jobc,jobd jobf->jobe,joba1
@@ -58,10 +56,12 @@ import org.junit.Test;
  *
  * @author rpark
  */
-public class FlowRunnerPipelineTest {
+public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   private static int id = 101;
   private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private final AzkabanEventReporter azkabanEventReporter =
       EventReporterUtil.getTestAzkabanEventReporter();
   private File workingDir;
@@ -75,12 +75,7 @@ public class FlowRunnerPipelineTest {
 
   @Before
   public void setUp() throws Exception {
-    System.out.println("Create temp dir");
-    this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
-    if (this.workingDir.exists()) {
-      FileUtils.deleteDirectory(this.workingDir);
-    }
-    this.workingDir.mkdirs();
+    this.workingDir = this.temporaryFolder.newFolder();
     this.jobtypeManager =
         new JobTypeManager(null, null, this.getClass().getClassLoader());
     final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
@@ -89,24 +84,16 @@ public class FlowRunnerPipelineTest {
     pluginSet.addPluginClass("test", InteractiveTestJob.class);
     this.fakeExecutorLoader = new MockExecutorLoader();
     this.project = new Project(1, "testProject");
+    Utils.initServiceProvider();
+    JmxJobMBeanManager.getInstance().initialize(new Props());
 
-    final File dir = new File("unit/executions/embedded2");
+    final File dir = ExecutionsTestUtil.getFlowDir("embedded2");
     this.flowMap = FlowRunnerTestUtil
         .prepareProject(this.project, dir, this.logger, this.workingDir);
 
     InteractiveTestJob.clearTestJobs();
   }
 
-  @After
-  public void tearDown() throws IOException {
-    System.out.println("Teardown temp dir");
-    if (this.workingDir != null) {
-      FileUtils.deleteDirectory(this.workingDir);
-      this.workingDir = null;
-    }
-  }
-
-  @Ignore
   @Test
   public void testBasicPipelineLevel1Run() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
@@ -122,165 +109,112 @@ public class FlowRunnerPipelineTest {
         createFlowRunner(eventCollector, "jobf", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
-    final Map<String, Status> previousExpectedStateMap =
-        new HashMap<>();
-    final Map<String, Status> pipelineExpectedStateMap =
-        new HashMap<>();
-    final Map<String, ExecutableNode> previousNodeMap =
-        new HashMap<>();
-    final Map<String, ExecutableNode> pipelineNodeMap =
-        new HashMap<>();
-
     // 1. START FLOW
     final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
     final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
-    createExpectedStateMap(previousFlow, previousExpectedStateMap,
-        previousNodeMap);
-    createExpectedStateMap(pipelineFlow, pipelineExpectedStateMap,
-        pipelineNodeMap);
-
-    final Thread thread1 = runFlowRunnerInThread(previousRunner);
-    pause(250);
-    final Thread thread2 = runFlowRunnerInThread(pipelineRunner);
-    pause(500);
-
-    previousExpectedStateMap.put("joba", Status.RUNNING);
-    previousExpectedStateMap.put("joba1", Status.RUNNING);
-    pipelineExpectedStateMap.put("joba", Status.QUEUED);
-    pipelineExpectedStateMap.put("joba1", Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+    runFlowRunnerInThread(previousRunner);
+    assertStatus(previousFlow, "joba", Status.RUNNING);
+    assertStatus(previousFlow, "joba", Status.RUNNING);
+    assertStatus(previousFlow, "joba1", Status.RUNNING);
+
+    runFlowRunnerInThread(pipelineRunner);
+    assertStatus(pipelineFlow, "joba", Status.QUEUED);
+    assertStatus(pipelineFlow, "joba1", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("prev:joba").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("joba", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobb", Status.RUNNING);
-    previousExpectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    previousExpectedStateMap.put("jobd", Status.RUNNING);
-    previousExpectedStateMap.put("jobc", Status.RUNNING);
-    previousExpectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    pipelineExpectedStateMap.put("joba", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "joba", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobb", Status.RUNNING);
+    assertStatus(previousFlow, "jobb:innerJobA", Status.RUNNING);
+    assertStatus(previousFlow, "jobd", Status.RUNNING);
+    assertStatus(previousFlow, "jobc", Status.RUNNING);
+    assertStatus(previousFlow, "jobd:innerJobA", Status.RUNNING);
+    assertStatus(pipelineFlow, "joba", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("prev:jobb:innerJobA").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    previousExpectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobb:innerJobB", Status.RUNNING);
+    assertStatus(previousFlow, "jobb:innerJobC", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:joba").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("joba", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobb", Status.RUNNING);
-    pipelineExpectedStateMap.put("jobd", Status.RUNNING);
-    pipelineExpectedStateMap.put("jobc", Status.QUEUED);
-    pipelineExpectedStateMap.put("jobd:innerJobA", Status.QUEUED);
-    pipelineExpectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "joba", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobd", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobc", Status.QUEUED);
+    assertStatus(pipelineFlow, "jobd:innerJobA", Status.QUEUED);
+    assertStatus(pipelineFlow, "jobb:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("prev:jobd:innerJobA").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
-    pipelineExpectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobd:innerFlow2", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobd:innerJobA", Status.RUNNING);
 
     // Finish the previous d side
     InteractiveTestJob.getTestJob("prev:jobd:innerFlow2").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobd", Status.SUCCEEDED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
+    assertStatus(previousFlow, "jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobd", Status.SUCCEEDED);
 
     InteractiveTestJob.getTestJob("prev:jobb:innerJobB").succeedJob();
     InteractiveTestJob.getTestJob("prev:jobb:innerJobC").succeedJob();
     InteractiveTestJob.getTestJob("prev:jobc").succeedJob();
-    pause(250);
     InteractiveTestJob.getTestJob("pipe:jobb:innerJobA").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobb:innerFlow", Status.RUNNING);
-    previousExpectedStateMap.put("jobc", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobc", Status.RUNNING);
-    pipelineExpectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    pipelineExpectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "jobb:innerJobB", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobb:innerFlow", Status.RUNNING);
+    assertStatus(previousFlow, "jobc", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobc", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobb:innerJobB", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobb:innerJobC", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("prev:jobb:innerFlow").succeedJob();
     InteractiveTestJob.getTestJob("pipe:jobc").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobb", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobe", Status.RUNNING);
-    pipelineExpectedStateMap.put("jobc", Status.SUCCEEDED);
-
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "jobb:innerFlow", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobb", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobe", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobc", Status.SUCCEEDED);
 
     InteractiveTestJob.getTestJob("pipe:jobb:innerJobB").succeedJob();
     InteractiveTestJob.getTestJob("pipe:jobb:innerJobC").succeedJob();
     InteractiveTestJob.getTestJob("prev:jobe").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("jobe", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobb:innerFlow", Status.RUNNING);
-
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "jobe", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerJobB", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerFlow", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:jobd:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("pipe:jobb:innerFlow").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("jobb", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "jobb", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobb:innerFlow", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobd:innerFlow2", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:jobd:innerFlow2").succeedJob();
     InteractiveTestJob.getTestJob("prev:joba1").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobd", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobf", Status.RUNNING);
-    previousExpectedStateMap.put("joba1", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("joba1", Status.RUNNING);
-    pipelineExpectedStateMap.put("jobe", Status.RUNNING);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
-    compareStates(previousExpectedStateMap, previousNodeMap);
+    assertStatus(pipelineFlow, "jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobd", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobf", Status.RUNNING);
+    assertStatus(previousFlow, "joba1", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "joba1", Status.RUNNING);
+    assertStatus(pipelineFlow, "jobe", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:jobe").succeedJob();
     InteractiveTestJob.getTestJob("prev:jobf").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("jobe", Status.SUCCEEDED);
-    previousExpectedStateMap.put("jobf", Status.SUCCEEDED);
-    Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "jobe", Status.SUCCEEDED);
+    assertStatus(previousFlow, "jobf", Status.SUCCEEDED);
+    assertFlowStatus(previousFlow, Status.SUCCEEDED);
 
     InteractiveTestJob.getTestJob("pipe:joba1").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("joba1", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("jobf", Status.RUNNING);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "joba1", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "jobf", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:jobf").succeedJob();
-    pause(250);
-    Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
-    Assert.assertFalse(thread1.isAlive());
-    Assert.assertFalse(thread2.isAlive());
+
+    assertThreadShutDown(previousRunner);
+    assertThreadShutDown(pipelineRunner);
+    assertFlowStatus(pipelineFlow, Status.SUCCEEDED);
   }
 
-  @Ignore
   @Test
   public void testBasicPipelineLevel2Run() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
@@ -296,185 +230,123 @@ public class FlowRunnerPipelineTest {
         createFlowRunner(eventCollector, "pipelineFlow", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
-    final Map<String, Status> previousExpectedStateMap =
-        new HashMap<>();
-    final Map<String, Status> pipelineExpectedStateMap =
-        new HashMap<>();
-    final Map<String, ExecutableNode> previousNodeMap =
-        new HashMap<>();
-    final Map<String, ExecutableNode> pipelineNodeMap =
-        new HashMap<>();
-
     // 1. START FLOW
     final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
     final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
-    createExpectedStateMap(previousFlow, previousExpectedStateMap,
-        previousNodeMap);
-    createExpectedStateMap(pipelineFlow, pipelineExpectedStateMap,
-        pipelineNodeMap);
 
-    final Thread thread1 = runFlowRunnerInThread(previousRunner);
-    pause(250);
-    final Thread thread2 = runFlowRunnerInThread(pipelineRunner);
-    pause(250);
+    runFlowRunnerInThread(previousRunner);
+    assertStatus(previousFlow, "pipeline1", Status.RUNNING);
 
-    previousExpectedStateMap.put("pipeline1", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1", Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    runFlowRunnerInThread(pipelineRunner);
+    assertStatus(pipelineFlow, "pipeline1", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("prev:pipeline1").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipeline1", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipeline2", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "pipeline1", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipeline2", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("prev:pipeline2").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipeline2", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3", Status.RUNNING);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA",
+    assertStatus(previousFlow, "pipeline2", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3", Status.RUNNING);
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerJobA",
         Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline1", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipeline1").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline1", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline2", Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline1", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline2", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerJobA")
         .succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerJobA",
         Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerJobB",
         Status.RUNNING);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerJobC",
         Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline2", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline2", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipeline2").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline2", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA",
+    assertStatus(pipelineFlow, "pipeline2", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobA",
         Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 
     InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerJobB")
         .succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerJobB",
         Status.SUCCEEDED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 
     InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerJobC")
         .succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerFlow",
         Status.RUNNING);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerJobC",
         Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobA",
         Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 
     InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerJobA")
         .succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobA",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobA",
         Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobC",
         Status.QUEUED);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobB",
         Status.QUEUED);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerFlow",
         Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 
     InteractiveTestJob.getTestJob("prev:pipelineEmbeddedFlow3:innerFlow")
         .succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3:innerFlow",
         Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipelineEmbeddedFlow3", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipeline4", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC",
+    assertStatus(previousFlow, "pipelineEmbeddedFlow3", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipeline4", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobC",
         Status.RUNNING);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobB",
         Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 
     InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerJobB")
         .succeedJob();
     InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerJobC")
         .succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobC",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobC",
         Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerJobB",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerJobB",
         Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerFlow",
         Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 
     InteractiveTestJob.getTestJob("prev:pipeline4").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipeline4", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipelineFlow", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow",
+    assertStatus(previousFlow, "pipeline4", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipelineFlow", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerFlow",
         Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 
     InteractiveTestJob.getTestJob("prev:pipelineFlow").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipelineFlow", Status.SUCCEEDED);
-    Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
-    Assert.assertFalse(thread1.isAlive());
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "pipelineFlow", Status.SUCCEEDED);
+    assertFlowStatus(previousFlow, Status.SUCCEEDED);
+    assertThreadShutDown(previousRunner);
 
     InteractiveTestJob.getTestJob("pipe:pipelineEmbeddedFlow3:innerFlow")
         .succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3:innerFlow",
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3:innerFlow",
         Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipelineEmbeddedFlow3", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline4", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipelineEmbeddedFlow3", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline4", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipeline4").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline4", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipelineFlow", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline4", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipelineFlow", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipelineFlow").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipelineFlow", Status.SUCCEEDED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
-    Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
-    Assert.assertFalse(thread2.isAlive());
+    assertStatus(pipelineFlow, "pipelineFlow", Status.SUCCEEDED);
+    assertFlowStatus(pipelineFlow, Status.SUCCEEDED);
+    assertThreadShutDown(pipelineRunner);
+
   }
 
-  @Ignore
   @Test
   public void testBasicPipelineLevel2Run2() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
@@ -490,164 +362,73 @@ public class FlowRunnerPipelineTest {
         createFlowRunner(eventCollector, "pipeline1_2", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
-    final Map<String, Status> previousExpectedStateMap =
-        new HashMap<>();
-    final Map<String, Status> pipelineExpectedStateMap =
-        new HashMap<>();
-    final Map<String, ExecutableNode> previousNodeMap =
-        new HashMap<>();
-    final Map<String, ExecutableNode> pipelineNodeMap =
-        new HashMap<>();
-
     // 1. START FLOW
     final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
     final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
-    createExpectedStateMap(previousFlow, previousExpectedStateMap,
-        previousNodeMap);
-    createExpectedStateMap(pipelineFlow, pipelineExpectedStateMap,
-        pipelineNodeMap);
-
-    final Thread thread1 = runFlowRunnerInThread(previousRunner);
-    pause(250);
-    final Thread thread2 = runFlowRunnerInThread(pipelineRunner);
-    pause(250);
-
-    previousExpectedStateMap.put("pipeline1_1", Status.RUNNING);
-    previousExpectedStateMap.put("pipeline1_1:innerJobA", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1_1", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1_1:innerJobA", Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+    runFlowRunnerInThread(previousRunner);
+    assertStatus(previousFlow, "pipeline1_1", Status.RUNNING);
+    assertStatus(previousFlow, "pipeline1_1:innerJobA", Status.RUNNING);
+
+    runFlowRunnerInThread(pipelineRunner);
+    assertStatus(pipelineFlow, "pipeline1_1", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipeline1_1:innerJobA", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("prev:pipeline1_1:innerJobA").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipeline1_1:innerJobA", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipeline1_1:innerFlow2", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "pipeline1_1:innerJobA", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipeline1_1:innerFlow2", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("prev:pipeline1_1:innerFlow2").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipeline1_1", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipeline1_1:innerFlow2", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipeline1_2", Status.RUNNING);
-    previousExpectedStateMap.put("pipeline1_2:innerJobA", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1_1:innerJobA", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "pipeline1_1", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipeline1_1:innerFlow2", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipeline1_2", Status.RUNNING);
+    assertStatus(previousFlow, "pipeline1_2:innerJobA", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipeline1_1:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipeline1_1:innerJobA").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline1_1:innerJobA", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline1_1:innerJobA", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline1_1:innerFlow2", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("prev:pipeline1_2:innerJobA").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipeline1_2:innerJobA", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipeline1_2:innerFlow2", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "pipeline1_2:innerJobA", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipeline1_2:innerFlow2", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipeline1_1:innerFlow2", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipeline1_1:innerFlow2").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline1_1", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline1_2", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline1_1", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline1_1:innerFlow2", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline1_2", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipeline1_2:innerJobA", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("pipe:pipeline1_1:innerFlow2").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline1_1", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline1_1:innerFlow2", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline1_2", Status.RUNNING);
-    pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.QUEUED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline1_1", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline1_1:innerFlow2", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline1_2", Status.RUNNING);
+    assertStatus(pipelineFlow, "pipeline1_2:innerJobA", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("prev:pipeline1_2:innerFlow2").succeedJob();
-    pause(250);
-    previousExpectedStateMap.put("pipeline1_2:innerFlow2", Status.SUCCEEDED);
-    previousExpectedStateMap.put("pipeline1_2", Status.SUCCEEDED);
-    Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
-    Assert.assertFalse(thread1.isAlive());
-    pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(previousFlow, "pipeline1_2:innerFlow2", Status.SUCCEEDED);
+    assertStatus(previousFlow, "pipeline1_2", Status.SUCCEEDED);
+    assertFlowStatus(previousFlow, Status.SUCCEEDED);
+    assertThreadShutDown(previousRunner);
+    assertStatus(pipelineFlow, "pipeline1_2:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipeline1_2:innerJobA").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline1_2:innerJobA", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline1_2:innerFlow2", Status.RUNNING);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+    assertStatus(pipelineFlow, "pipeline1_2:innerJobA", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline1_2:innerFlow2", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("pipe:pipeline1_2:innerFlow2").succeedJob();
-    pause(250);
-    pipelineExpectedStateMap.put("pipeline1_2", Status.SUCCEEDED);
-    pipelineExpectedStateMap.put("pipeline1_2:innerFlow2", Status.SUCCEEDED);
-    compareStates(previousExpectedStateMap, previousNodeMap);
-    compareStates(pipelineExpectedStateMap, pipelineNodeMap);
-    Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
-    Assert.assertFalse(thread2.isAlive());
+    assertStatus(pipelineFlow, "pipeline1_2", Status.SUCCEEDED);
+    assertStatus(pipelineFlow, "pipeline1_2:innerFlow2", Status.SUCCEEDED);
+    assertFlowStatus(pipelineFlow, Status.SUCCEEDED);
+    assertThreadShutDown(pipelineRunner);
   }
 
-  private Thread runFlowRunnerInThread(final FlowRunner runner) {
+  private void runFlowRunnerInThread(final FlowRunner runner) {
     final Thread thread = new Thread(runner);
     thread.start();
-    return thread;
-  }
-
-  private void pause(final long millisec) {
-    try {
-      Thread.sleep(millisec);
-    } catch (final InterruptedException e) {
-    }
-  }
-
-  private void createExpectedStateMap(final ExecutableFlowBase flow,
-      final Map<String, Status> expectedStateMap, final Map<String, ExecutableNode> nodeMap) {
-    for (final ExecutableNode node : flow.getExecutableNodes()) {
-      expectedStateMap.put(node.getNestedId(), node.getStatus());
-      nodeMap.put(node.getNestedId(), node);
-
-      if (node instanceof ExecutableFlowBase) {
-        createExpectedStateMap((ExecutableFlowBase) node, expectedStateMap,
-            nodeMap);
-      }
-    }
-  }
-
-  private void compareStates(final Map<String, Status> expectedStateMap,
-      final Map<String, ExecutableNode> nodeMap) {
-    for (final String printedId : expectedStateMap.keySet()) {
-      final Status expectedStatus = expectedStateMap.get(printedId);
-      final ExecutableNode node = nodeMap.get(printedId);
-      if (node == null) {
-        System.out.println("id node: " + printedId + " doesn't exist.");
-      }
-      if (expectedStatus != node.getStatus()) {
-        Assert.fail("Expected values do not match for " + printedId
-            + ". Expected " + expectedStatus + ", instead received "
-            + node.getStatus());
-      }
-    }
   }
 
-  // private void printCurrentState(String prefix, ExecutableFlowBase flow) {
-  //   for  (ExecutableNode node: flow.getExecutableNodes()) {
-  //     System.err.println(prefix + node.getNestedId() + "->" +
-  //         node.getStatus().name());
-  //     if (node instanceof ExecutableFlowBase) {
-  //       printCurrentState(prefix, (ExecutableFlowBase)node);
-  //     }
-  //   }
-  // }
-  //
   private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
       final String flowName, final String groupName) throws Exception {
     return createFlowRunner(eventCollector, flowName, groupName,
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 55f61fa..029cbf3 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -47,6 +47,12 @@ public class FlowRunnerTestBase {
             && !runner.isRunnerThreadAlive());
   }
 
+  public void assertThreadShutDown(final FlowRunner flowRunner) {
+    waitFlowRunner(flowRunner,
+        runner -> Status.isStatusFinished(runner.getExecutableFlow().getStatus())
+            && !runner.isRunnerThreadAlive());
+  }
+
   public void assertThreadRunning() {
     waitFlowRunner(
         runner -> Status.isStatusRunning(runner.getExecutableFlow().getStatus())
@@ -54,8 +60,13 @@ public class FlowRunnerTestBase {
   }
 
   public void waitFlowRunner(final Function<FlowRunner, Boolean> statusCheck) {
+    waitFlowRunner(this.runner, statusCheck);
+  }
+
+  public void waitFlowRunner(final FlowRunner runner,
+      final Function<FlowRunner, Boolean> statusCheck) {
     for (int i = 0; i < 1000; i++) {
-      if (statusCheck.apply(this.runner)) {
+      if (statusCheck.apply(runner)) {
         return;
       }
       synchronized (EventCollectorListener.handleEvent) {
@@ -119,32 +130,42 @@ public class FlowRunnerTestBase {
 
   protected void waitForAndAssertFlowStatus(final Status status) {
     final ExecutableFlow flow = this.runner.getExecutableFlow();
+    assertFlowStatus(flow, status);
+  }
+
+  protected void assertFlowStatus(final ExecutableFlow flow, final Status status) {
     StatusTestUtils.waitForStatus(flow, status);
-    printStatuses(status, flow);
+    printStatuses(status, flow, flow);
     assertEquals(status, flow.getStatus());
   }
 
-  protected void assertStatus(final String name, final Status status) {
-    final ExecutableFlow exFlow = this.runner.getExecutableFlow();
-    final ExecutableNode node = exFlow.getExecutableNodePath(name);
+  protected void assertStatus(final ExecutableFlow flow, final String name, final Status status) {
+    final ExecutableNode node = flow.getExecutableNodePath(name);
     assertNotNull(name + " wasn't found", node);
     StatusTestUtils.waitForStatus(node, status);
-    printStatuses(status, node);
+    printStatuses(status, node, flow);
     assertEquals("Wrong status for [" + name + "]", status, node.getStatus());
   }
 
-  protected void printStatuses(final Status status, final ExecutableNode node) {
+  protected void assertStatus(final String name, final Status status) {
+    final ExecutableFlow exFlow = this.runner.getExecutableFlow();
+    assertStatus(exFlow, name, status);
+  }
+
+  protected void printStatuses(final Status status, final ExecutableNode node,
+      final ExecutableFlow flow) {
     if (status != node.getStatus()) {
-      printTestJobs();
-      printFlowJobs(this.runner.getExecutableFlow());
+      printTestJobs(flow);
+      printFlowJobs(flow);
     }
   }
 
-  private void printTestJobs() {
+  private void printTestJobs(final ExecutableFlow flow) {
     for (final String testJob : InteractiveTestJob.getTestJobNames()) {
-      final ExecutableNode testNode = this.runner.getExecutableFlow()
-          .getExecutableNodePath(testJob);
-      System.err.println("testJob: " + testNode.getNestedId() + " " + testNode.getStatus());
+      final ExecutableNode testNode = flow.getExecutableNodePath(testJob);
+      if (testNode != null) {
+        System.err.println("testJob: " + testNode.getNestedId() + " " + testNode.getStatus());
+      }
     }
   }