azkaban-aplcache

Details

diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/FlowWatcherTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/FlowWatcherTestUtil.java
new file mode 100644
index 0000000..782084f
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/FlowWatcherTestUtil.java
@@ -0,0 +1,105 @@
+package azkaban.execapp.event;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import org.junit.Assert;
+
+public class FlowWatcherTestUtil {
+
+  public static void assertPipelineLevel1(final FlowRunner runner1,
+      final FlowRunner runner2) throws Exception {
+
+    run(runner1, runner2);
+
+    final ExecutableFlow first = runner1.getExecutableFlow();
+    final ExecutableFlow second = runner2.getExecutableFlow();
+
+    for (final ExecutableNode node : second.getExecutableNodes()) {
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+
+      // check it's start time is after the first's children.
+      final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
+      if (watchedNode == null) {
+        continue;
+      }
+      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+
+      System.out.println("Node " + node.getId() + " start: "
+          + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
+          + watchedNode.getEndTime() + " diff: "
+          + (node.getStartTime() - watchedNode.getEndTime()));
+
+      Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
+
+      long minParentDiff = 0;
+      if (node.getInNodes().size() > 0) {
+        minParentDiff = Long.MAX_VALUE;
+        for (final String dependency : node.getInNodes()) {
+          final ExecutableNode parent = second.getExecutableNode(dependency);
+          final long diff = node.getStartTime() - parent.getEndTime();
+          minParentDiff = Math.min(minParentDiff, diff);
+        }
+      }
+      final long diff = node.getStartTime() - watchedNode.getEndTime();
+      Assert.assertTrue(minParentDiff < 500 || diff < 500);
+    }
+  }
+
+  public static void assertPipelineLevel2(final FlowRunner runner1, final FlowRunner runner2,
+      final boolean job4Skipped) throws Exception {
+    run(runner1, runner2);
+    assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), job4Skipped);
+  }
+
+  public static void assertPipelineLevel2(final ExecutableFlow first,
+      final ExecutableFlow second, final boolean job4Skipped) {
+    for (final ExecutableNode node : second.getExecutableNodes()) {
+      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+
+      // check it's start time is after the first's children.
+      final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
+      if (watchedNode == null) {
+        continue;
+      }
+      Assert.assertEquals(watchedNode.getStatus(),
+          job4Skipped && watchedNode.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
+
+      long minDiff = Long.MAX_VALUE;
+      for (final String watchedChild : watchedNode.getOutNodes()) {
+        final ExecutableNode child = first.getExecutableNode(watchedChild);
+        if (child == null) {
+          continue;
+        }
+        Assert.assertEquals(child.getStatus(),
+            job4Skipped && child.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
+        final long diff = node.getStartTime() - child.getEndTime();
+        minDiff = Math.min(minDiff, diff);
+        Assert.assertTrue(
+            "Node " + node.getId() + " start: " + node.getStartTime() + " dependent on "
+                + watchedChild + " " + child.getEndTime() + " diff: " + diff,
+            node.getStartTime() >= child.getEndTime());
+      }
+
+      long minParentDiff = Long.MAX_VALUE;
+      for (final String dependency : node.getInNodes()) {
+        final ExecutableNode parent = second.getExecutableNode(dependency);
+        final long diff = node.getStartTime() - parent.getEndTime();
+        minParentDiff = Math.min(minParentDiff, diff);
+      }
+      Assert.assertTrue("minPipelineTimeDiff:" + minDiff
+              + " minDependencyTimeDiff:" + minParentDiff,
+          minParentDiff < 5000 || minDiff < 5000);
+    }
+  }
+
+  private static void run(final FlowRunner runner1, final FlowRunner runner2)
+      throws InterruptedException {
+    final Thread runner1Thread = new Thread(runner1);
+    final Thread runner2Thread = new Thread(runner2);
+    runner1Thread.start();
+    runner2Thread.start();
+    runner2Thread.join();
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index 2244ade..cf891a0 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -16,249 +16,55 @@
 
 package azkaban.execapp.event;
 
-import static org.mockito.Mockito.mock;
-
-import azkaban.execapp.EventCollectorListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.FlowRunnerTestUtil;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorLoader;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
-import azkaban.executor.Status;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.utils.Props;
-import java.io.File;
+import azkaban.executor.InteractiveTestJob;
 import java.io.IOException;
-import org.apache.commons.io.FileUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class LocalFlowWatcherTest {
 
-  private final AzkabanEventReporter azkabanEventReporter = null;
-  private JobTypeManager jobtypeManager;
-  private int dirVal = 0;
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private FlowRunnerTestUtil testUtil;
 
   @Before
   public void setUp() throws Exception {
-    this.jobtypeManager =
-        new JobTypeManager(null, null, this.getClass().getClassLoader());
-    this.jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
+    this.testUtil = new FlowRunnerTestUtil("exectest1", this.temporaryFolder);
+    InteractiveTestJob.setQuickSuccess(true);
   }
 
   @After
   public void tearDown() throws IOException {
+    InteractiveTestJob.resetQuickSuccess();
   }
 
-  public File setupDirectory() throws IOException {
-    System.out.println("Create temp dir");
-    final File workingDir = new File("_AzkabanTestDir_" + this.dirVal);
-    if (workingDir.exists()) {
-      FileUtils.deleteDirectory(workingDir);
-    }
-    workingDir.mkdirs();
-    this.dirVal++;
-
-    return workingDir;
-  }
-
-  @Ignore
   @Test
   public void testBasicLocalFlowWatcher() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
-
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
-    final File workingDir1 = setupDirectory();
-    final FlowRunner runner1 =
-        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
-            null);
-    final Thread runner1Thread = new Thread(runner1);
-
-    final File workingDir2 = setupDirectory();
-    final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
-    final FlowRunner runner2 =
-        createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
-            watcher, 2);
-    final Thread runner2Thread = new Thread(runner2);
-
-    runner1Thread.start();
-    runner2Thread.start();
-    runner2Thread.join();
-
-    FileUtils.deleteDirectory(workingDir1);
-    FileUtils.deleteDirectory(workingDir2);
-
-    testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+    final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+    final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 2);
+    FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
   }
 
-  @Ignore
   @Test
   public void testLevel1LocalFlowWatcher() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
-
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
-    final File workingDir1 = setupDirectory();
-    final FlowRunner runner1 =
-        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
-            null);
-    final Thread runner1Thread = new Thread(runner1);
-
-    final File workingDir2 = setupDirectory();
-    final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
-    final FlowRunner runner2 =
-        createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
-            watcher, 1);
-    final Thread runner2Thread = new Thread(runner2);
-
-    runner1Thread.start();
-    runner2Thread.start();
-    runner2Thread.join();
-
-    FileUtils.deleteDirectory(workingDir1);
-    FileUtils.deleteDirectory(workingDir2);
-
-    testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+    final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+    final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 1);
+    FlowWatcherTestUtil.assertPipelineLevel1(runner1, runner2);
   }
 
-  @Ignore
   @Test
   public void testLevel2DiffLocalFlowWatcher() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
-
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
-    final File workingDir1 = setupDirectory();
-    final FlowRunner runner1 =
-        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
-            null);
-    final Thread runner1Thread = new Thread(runner1);
-
-    final File workingDir2 = setupDirectory();
-    final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
-    final FlowRunner runner2 =
-        createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2,
-            watcher, 1);
-    final Thread runner2Thread = new Thread(runner2);
-
-    runner1Thread.start();
-    runner2Thread.start();
-    runner2Thread.join();
-
-    FileUtils.deleteDirectory(workingDir1);
-    FileUtils.deleteDirectory(workingDir2);
-
-    testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+    final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+    final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1-mod", watcher(runner1), 2);
+    FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
   }
 
-  private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
-    for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
-      // check it's start time is after the first's children.
-      final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
-      if (watchedNode == null) {
-        continue;
-      }
-      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
-
-      System.out.println("Node " + node.getId() + " start: "
-          + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
-          + watchedNode.getEndTime() + " diff: "
-          + (node.getStartTime() - watchedNode.getEndTime()));
-
-      Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
-
-      long minParentDiff = 0;
-      if (node.getInNodes().size() > 0) {
-        minParentDiff = Long.MAX_VALUE;
-        for (final String dependency : node.getInNodes()) {
-          final ExecutableNode parent = second.getExecutableNode(dependency);
-          final long diff = node.getStartTime() - parent.getEndTime();
-          minParentDiff = Math.min(minParentDiff, diff);
-        }
-      }
-      final long diff = node.getStartTime() - watchedNode.getEndTime();
-      System.out.println("   minPipelineTimeDiff:" + diff
-          + " minDependencyTimeDiff:" + minParentDiff);
-      Assert.assertTrue(minParentDiff < 100 || diff < 100);
-    }
+  private LocalFlowWatcher watcher(final FlowRunner previousRunner) {
+    return new LocalFlowWatcher(previousRunner);
   }
-
-  private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
-    for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
-      // check it's start time is after the first's children.
-      final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
-      if (watchedNode == null) {
-        continue;
-      }
-      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
-
-      long minDiff = Long.MAX_VALUE;
-      for (final String watchedChild : watchedNode.getOutNodes()) {
-        final ExecutableNode child = first.getExecutableNode(watchedChild);
-        if (child == null) {
-          continue;
-        }
-        Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
-        final long diff = node.getStartTime() - child.getEndTime();
-        minDiff = Math.min(minDiff, diff);
-        System.out.println("Node " + node.getId() + " start: "
-            + node.getStartTime() + " dependent on " + watchedChild + " "
-            + child.getEndTime() + " diff: " + diff);
-
-        Assert.assertTrue(node.getStartTime() >= child.getEndTime());
-      }
-
-      long minParentDiff = Long.MAX_VALUE;
-      for (final String dependency : node.getInNodes()) {
-        final ExecutableNode parent = second.getExecutableNode(dependency);
-        final long diff = node.getStartTime() - parent.getEndTime();
-        minParentDiff = Math.min(minParentDiff, diff);
-      }
-      System.out.println("   minPipelineTimeDiff:" + minDiff
-          + " minDependencyTimeDiff:" + minParentDiff);
-      Assert.assertTrue(minParentDiff < 100 || minDiff < 100);
-    }
-  }
-
-  private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
-      final EventCollectorListener eventCollector, final String flowName, final int execId,
-      final FlowWatcher watcher, final Integer pipeline) throws Exception {
-    return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline,
-        new Props());
-  }
-
-  private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
-      final EventCollectorListener eventCollector, final String flowName, final int execId,
-      final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
-      throws Exception {
-    final File testDir = new File("unit/executions/exectest1");
-    final ExecutableFlow exFlow =
-        FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
-    final ExecutionOptions option = exFlow.getExecutionOptions();
-    if (watcher != null) {
-      option.setPipelineLevel(pipeline);
-      option.setPipelineExecutionId(watcher.getExecId());
-    }
-    loader.uploadExecutableFlow(exFlow);
-    final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
-    runner.setFlowWatcher(watcher);
-    runner.addListener(eventCollector);
-
-    return runner;
-  }
-
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 49f443d..fbe6a6c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -16,30 +16,13 @@
 
 package azkaban.execapp.event;
 
-import static org.mockito.Mockito.mock;
-
-import azkaban.execapp.EventCollectorListener;
 import azkaban.execapp.FlowRunner;
 import azkaban.execapp.FlowRunnerTestUtil;
-import azkaban.execapp.jmx.JmxJobMBeanManager;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlowBase;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.Props;
-import java.io.File;
 import java.io.IOException;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,18 +30,14 @@ import org.junit.rules.TemporaryFolder;
 
 public class RemoteFlowWatcherTest {
 
-  private final AzkabanEventReporter azkabanEventReporter = null;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private JobTypeManager jobtypeManager;
+  private FlowRunnerTestUtil testUtil;
 
   @Before
   public void setUp() throws Exception {
-    this.jobtypeManager =
-        new JobTypeManager(null, null, this.getClass().getClassLoader());
-    this.jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
-    Utils.initServiceProvider();
-    JmxJobMBeanManager.getInstance().initialize(new Props());
+    this.testUtil = new FlowRunnerTestUtil("exectest1", this.temporaryFolder);
+    this.testUtil.setExecutorLoader(new MockExecutorLoader());
     InteractiveTestJob.setQuickSuccess(true);
   }
 
@@ -69,230 +48,50 @@ public class RemoteFlowWatcherTest {
 
   @Test
   public void testBasicRemoteFlowWatcher() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
-
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
-    final File workingDir1 = this.temporaryFolder.newFolder();
-    final FlowRunner runner1 =
-        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
-            null);
-    final Thread runner1Thread = new Thread(runner1);
-
-    final File workingDir2 = this.temporaryFolder.newFolder();
-    final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
-    final FlowRunner runner2 =
-        createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
-            watcher, 2);
-    final Thread runner2Thread = new Thread(runner2);
-
-    printCurrentState("runner1 ", runner1.getExecutableFlow());
-    runner1Thread.start();
-    runner2Thread.start();
-
-    runner2Thread.join();
-
-    assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), false);
+    final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+    final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 2);
+    FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
   }
 
   @Test
   public void testRemoteFlowWatcherBlockingJobsLeftInReadyState() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
+    final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+    final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 2);
 
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
-    final File workingDir1 = this.temporaryFolder.newFolder();
-    final FlowRunner runner1 =
-        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
-            null);
     final Thread runner1Thread = new Thread(runner1);
-
-    final File workingDir2 = this.temporaryFolder.newFolder();
-    final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
-    final FlowRunner runner2 =
-        createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
-            watcher, 2);
-    final Thread runner2Thread = new Thread(runner2);
-
-    printCurrentState("runner1 ", runner1.getExecutableFlow());
     runner1Thread.start();
     runner1Thread.join();
+
     // simulate a real life scenario - this can happen for disabled jobs inside subflows:
     // flow has finished otherwise but a "blocking" job has READY status
     // the test gets stuck here without the fix in FlowWatcher.peekStatus
     runner1.getExecutableFlow().getExecutableNodePath("job4").setStatus(Status.READY);
-    loader.updateExecutableFlow(runner1.getExecutableFlow());
+    this.testUtil.getExecutorLoader().updateExecutableFlow(runner1.getExecutableFlow());
 
+    final Thread runner2Thread = new Thread(runner2);
     runner2Thread.start();
     runner2Thread.join();
 
-    assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), true);
+    FlowWatcherTestUtil
+        .assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), true);
   }
 
   @Test
   public void testLevel1RemoteFlowWatcher() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
-
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
-    final File workingDir1 = this.temporaryFolder.newFolder();
-    final FlowRunner runner1 =
-        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
-            null);
-    final Thread runner1Thread = new Thread(runner1);
-
-    final File workingDir2 = this.temporaryFolder.newFolder();
-    final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
-    final FlowRunner runner2 =
-        createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
-            watcher, 1);
-    final Thread runner2Thread = new Thread(runner2);
-
-    runner1Thread.start();
-    runner2Thread.start();
-    runner2Thread.join();
-
-    testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+    final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+    final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 1);
+    FlowWatcherTestUtil.assertPipelineLevel1(runner1, runner2);
   }
 
   @Test
   public void testLevel2DiffRemoteFlowWatcher() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
-
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
-    final File workingDir1 = this.temporaryFolder.newFolder();
-    final FlowRunner runner1 =
-        createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
-            null);
-    final Thread runner1Thread = new Thread(runner1);
-
-    final File workingDir2 = this.temporaryFolder.newFolder();
-
-    final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
-    final FlowRunner runner2 =
-        createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2,
-            watcher, 1);
-    final Thread runner2Thread = new Thread(runner2);
-
-    runner1Thread.start();
-    runner2Thread.start();
-    runner2Thread.join();
-
-    testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+    final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+    final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1-mod", watcher(runner1), 2);
+    FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
   }
 
-  private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
-    for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
-      // check it's start time is after the first's children.
-      final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
-      if (watchedNode == null) {
-        continue;
-      }
-      Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
-
-      System.out.println("Node " + node.getId() + " start: "
-          + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
-          + watchedNode.getEndTime() + " diff: "
-          + (node.getStartTime() - watchedNode.getEndTime()));
-
-      Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
-
-      long minParentDiff = 0;
-      if (node.getInNodes().size() > 0) {
-        minParentDiff = Long.MAX_VALUE;
-        for (final String dependency : node.getInNodes()) {
-          final ExecutableNode parent = second.getExecutableNode(dependency);
-          final long diff = node.getStartTime() - parent.getEndTime();
-          minParentDiff = Math.min(minParentDiff, diff);
-        }
-      }
-      final long diff = node.getStartTime() - watchedNode.getEndTime();
-      Assert.assertTrue(minParentDiff < 500 || diff < 500);
-    }
+  private RemoteFlowWatcher watcher(final FlowRunner previousRunner) {
+    return new RemoteFlowWatcher(previousRunner.getExecutionId(),
+        this.testUtil.getExecutorLoader(), 10);
   }
-
-  private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
-      final boolean job4Skipped) {
-    for (final ExecutableNode node : second.getExecutableNodes()) {
-      Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
-      // check it's start time is after the first's children.
-      final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
-      if (watchedNode == null) {
-        continue;
-      }
-      Assert.assertEquals(watchedNode.getStatus(),
-          job4Skipped && watchedNode.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
-
-      long minDiff = Long.MAX_VALUE;
-      for (final String watchedChild : watchedNode.getOutNodes()) {
-        final ExecutableNode child = first.getExecutableNode(watchedChild);
-        if (child == null) {
-          continue;
-        }
-        Assert.assertEquals(child.getStatus(),
-            job4Skipped && child.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
-        final long diff = node.getStartTime() - child.getEndTime();
-        minDiff = Math.min(minDiff, diff);
-        Assert.assertTrue(
-            "Node " + node.getId() + " start: " + node.getStartTime() + " dependent on "
-                + watchedChild + " " + child.getEndTime() + " diff: " + diff,
-            node.getStartTime() >= child.getEndTime());
-      }
-
-      long minParentDiff = Long.MAX_VALUE;
-      for (final String dependency : node.getInNodes()) {
-        final ExecutableNode parent = second.getExecutableNode(dependency);
-        final long diff = node.getStartTime() - parent.getEndTime();
-        minParentDiff = Math.min(minParentDiff, diff);
-      }
-      Assert.assertTrue("minPipelineTimeDiff:" + minDiff
-              + " minDependencyTimeDiff:" + minParentDiff,
-          minParentDiff < 5000 || minDiff < 5000);
-    }
-  }
-
-  private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
-      final EventCollectorListener eventCollector, final String flowName, final int execId,
-      final FlowWatcher watcher, final Integer pipeline) throws Exception {
-    return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline,
-        new Props());
-  }
-
-  private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
-      final EventCollectorListener eventCollector, final String flowName, final int execId,
-      final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
-      throws Exception {
-    final File testDir = ExecutionsTestUtil.getFlowDir("exectest1");
-    final ExecutableFlow exFlow =
-        FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
-    final ExecutionOptions options = exFlow.getExecutionOptions();
-    if (watcher != null) {
-      options.setPipelineLevel(pipeline);
-      options.setPipelineExecutionId(watcher.getExecId());
-    }
-
-    loader.uploadExecutableFlow(exFlow);
-    final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
-        this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
-    runner.setFlowWatcher(watcher);
-    runner.addListener(eventCollector);
-
-    return runner;
-  }
-
-  private void printCurrentState(final String prefix, final ExecutableFlowBase flow) {
-    for (final ExecutableNode node : flow.getExecutableNodes()) {
-
-      System.err.println(prefix + node.getNestedId() + "->"
-          + node.getStatus().name());
-      if (node instanceof ExecutableFlowBase) {
-        printCurrentState(prefix, (ExecutableFlowBase) node);
-      }
-    }
-  }
-
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 2de4f0e..7a36289 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -46,17 +46,14 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   @Test
   public void testBasicPipelineLevel1RunDisabledJobs() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner previousRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
+    final FlowRunner previousRunner = this.testUtil.createFromFlowMap("jobf", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
         .getExecutionId());
     options.setPipelineLevel(1);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
-    final FlowRunner pipelineRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
+    final FlowRunner pipelineRunner = this.testUtil.createFromFlowMap("jobf", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
@@ -165,17 +162,14 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   @Test
   public void testBasicPipelineLevel1Run() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner previousRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
+    final FlowRunner previousRunner = this.testUtil.createFromFlowMap("jobf", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
         .getExecutionId());
     options.setPipelineLevel(1);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
-    final FlowRunner pipelineRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
+    final FlowRunner pipelineRunner = this.testUtil.createFromFlowMap("jobf", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
@@ -286,17 +280,15 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   @Test
   public void testBasicPipelineLevel2Run() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner previousRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "prev");
+    final FlowRunner previousRunner = this.testUtil.createFromFlowMap("pipelineFlow", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
         .getExecutionId());
     options.setPipelineLevel(2);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
-    final FlowRunner pipelineRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "pipe", options);
+    final FlowRunner pipelineRunner = this.testUtil
+        .createFromFlowMap("pipelineFlow", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
@@ -418,17 +410,15 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
   @Test
   public void testBasicPipelineLevel2Run2() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner previousRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "prev");
+    final FlowRunner previousRunner = this.testUtil.createFromFlowMap("pipeline1_2", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
         .getExecutionId());
     options.setPipelineLevel(2);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
-    final FlowRunner pipelineRunner =
-        this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "pipe", options);
+    final FlowRunner pipelineRunner = this.testUtil
+        .createFromFlowMap("pipeline1_2", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 54db1e6..b3da436 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -91,8 +91,6 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testBasicRun() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-
     final Map<String, String> flowParams = new HashMap<>();
     flowParams.put("param4", "override.4");
     flowParams.put("param10", "override.10");
@@ -101,8 +99,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     final ExecutionOptions options = new ExecutionOptions();
     options.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
 
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", options, flowParams, new Props());
+    this.runner = this.testUtil.createFromFlowMap("jobf", options, flowParams, new Props());
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -243,9 +240,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testDisabledNormal() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
     final ExecutableFlow flow = this.runner.getExecutableFlow();
     flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
     ((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
@@ -302,9 +297,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testNormalFailure1() throws Exception {
     // Test propagation of KILLED status to embedded flows.
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -341,9 +334,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testNormalFailure2() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -390,9 +381,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testNormalFailure3() throws Exception {
     // Test propagation of CANCELLED status to embedded flows different branch
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -448,9 +437,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testFailedFinishingFailure3() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
-        FailureAction.FINISH_ALL_POSSIBLE);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_ALL_POSSIBLE);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -510,9 +497,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testCancelOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
-        FailureAction.CANCEL_ALL);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -559,9 +544,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testRetryOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
     final ExecutableFlow flow = this.runner.getExecutableFlow();
     flow.getExecutableNode("joba").setStatus(Status.DISABLED);
     ((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
@@ -648,9 +631,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testCancel() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
-        FailureAction.CANCEL_ALL);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -698,9 +679,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testManualCancelOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -751,9 +730,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testPause() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -836,9 +813,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testPauseKill() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil
-        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -886,9 +861,9 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testPauseFail() throws Exception {
-    this.eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil.createFromFlowMap(this.eventCollector, "jobf",
-        FailureAction.FINISH_CURRENTLY_RUNNING);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+    final EventCollectorListener eventCollector = new EventCollectorListener();
+    this.runner.addListener(eventCollector);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -918,7 +893,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     // Now, ensure that jobd:innerJobA has completely finished as failed before resuming.
     // If we would resume before the job failure has been completely processed, FlowRunner would be
     // able to start some new jobs instead of cancelling everything.
-    waitEventFired("jobd:innerJobA", Status.FAILED);
+    FlowRunnerTestUtil.waitEventFired(eventCollector, "jobd:innerJobA", Status.FAILED);
     waitForAndAssertFlowStatus(Status.PAUSED);
 
     this.runner.resume("me");
@@ -946,9 +921,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testPauseFailFinishAll() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
-        FailureAction.FINISH_ALL_POSSIBLE);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_ALL_POSSIBLE);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
@@ -1005,9 +978,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testFlowKilledByJobLevelSLA() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
-        FailureAction.CANCEL_ALL);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
 
     FlowRunnerTestUtil.startThread(this.runner);
     assertStatus("joba", Status.RUNNING);
@@ -1030,8 +1001,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
    */
   @Test
   public void testPauseFailKill() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+    this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
 
     // 1. START FLOW
     FlowRunnerTestUtil.startThread(this.runner);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index c1aa7a2..c15c2ce 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -18,9 +18,7 @@ package azkaban.execapp;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
 
-import azkaban.event.Event;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
@@ -37,7 +35,6 @@ public class FlowRunnerTestBase {
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   protected FlowRunner runner;
-  protected EventCollectorListener eventCollector;
 
   public static boolean isStarted(final Status status) {
     if (status == Status.QUEUED) {
@@ -104,22 +101,6 @@ public class FlowRunnerTestBase {
     waitJobStatuses(FlowRunnerTest::isStarted, jobs);
   }
 
-  protected void waitEventFired(final String nestedId, final Status status)
-      throws InterruptedException {
-    for (int i = 0; i < 1000; i++) {
-      for (final Event event : this.eventCollector.getEventList()) {
-        if (event.getData().getStatus() == status && event.getData().getNestedId()
-            .equals(nestedId)) {
-          return;
-        }
-      }
-      synchronized (EventCollectorListener.handleEvent) {
-        EventCollectorListener.handleEvent.wait(10L);
-      }
-    }
-    fail("Event wasn't fired with [" + nestedId + "], " + status);
-  }
-
   public boolean checkJobStatuses(final Function<Status, Boolean> statusCheck,
       final String[] jobs) {
     final ExecutableFlow exFlow = this.runner.getExecutableFlow();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 27158ab..c170f6e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -16,17 +16,22 @@
 
 package azkaban.execapp;
 
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import azkaban.event.Event;
+import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
+import azkaban.executor.MockExecutorLoader;
+import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
@@ -51,9 +56,9 @@ public class FlowRunnerTestUtil {
   private final Map<String, Flow> flowMap;
   private final Project project;
   private final File workingDir;
-  private final ExecutorLoader fakeExecutorLoader;
   private final JobTypeManager jobtypeManager;
   private final File projectDir;
+  private ExecutorLoader executorLoader;
 
   public FlowRunnerTestUtil(final String flowName, final TemporaryFolder temporaryFolder)
       throws Exception {
@@ -65,8 +70,8 @@ public class FlowRunnerTestUtil {
     this.flowMap = FlowRunnerTestUtil
         .prepareProject(this.project, ExecutionsTestUtil.getFlowDir(flowName), this.workingDir);
 
-    this.fakeExecutorLoader = mock(ExecutorLoader.class);
-    when(this.fakeExecutorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
+    this.executorLoader = mock(ExecutorLoader.class);
+    when(this.executorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
 
     Utils.initServiceProvider();
     JmxJobMBeanManager.getInstance().initialize(new Props());
@@ -109,6 +114,23 @@ public class FlowRunnerTestUtil {
     return flowMap;
   }
 
+  public static void waitEventFired(final EventCollectorListener eventCollector,
+      final String nestedId, final Status status)
+      throws InterruptedException {
+    for (int i = 0; i < 1000; i++) {
+      for (final Event event : eventCollector.getEventList()) {
+        if (event.getData().getStatus() == status && event.getData().getNestedId()
+            .equals(nestedId)) {
+          return;
+        }
+      }
+      synchronized (EventCollectorListener.handleEvent) {
+        EventCollectorListener.handleEvent.wait(10L);
+      }
+    }
+    fail("Event wasn't fired with [" + nestedId + "], " + status);
+  }
+
   public static ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
       final String flowName, final int execId) throws IOException {
     FileUtils.copyDirectory(execDir, workingDir);
@@ -126,30 +148,53 @@ public class FlowRunnerTestUtil {
     new Thread(runner).start();
   }
 
+  public FlowRunner createFromFlowFile(final String flowName) throws Exception {
+    return createFromFlowFile(new EventCollectorListener(), flowName);
+  }
+
   public FlowRunner createFromFlowFile(final EventCollectorListener eventCollector,
       final String flowName) throws Exception {
     return createFromFlowFile(flowName, eventCollector, new ExecutionOptions());
   }
 
   public FlowRunner createFromFlowFile(final String flowName,
+      final EventCollectorListener eventCollector, final ExecutionOptions options)
+      throws Exception {
+    return createFromFlowFile(flowName, eventCollector, options, null, null);
+  }
+
+  public FlowRunner createFromFlowFile(final String flowName, final FlowWatcher watcher,
+      final Integer pipeline) throws Exception {
+    return createFromFlowFile(flowName, new EventCollectorListener(), new ExecutionOptions(),
+        watcher, pipeline);
+  }
+
+  public FlowRunner createFromFlowFile(final String flowName,
       final EventCollectorListener eventCollector,
-      final ExecutionOptions options) throws Exception {
+      final ExecutionOptions options, final FlowWatcher watcher, final Integer pipeline)
+      throws Exception {
     final ExecutableFlow exFlow = FlowRunnerTestUtil
         .prepareExecDir(this.workingDir, this.projectDir, flowName, 1);
-    return createFromExecutableFlow(eventCollector, exFlow, options, new HashMap<>(), new Props());
+    if (watcher != null) {
+      options.setPipelineLevel(pipeline);
+      options.setPipelineExecutionId(watcher.getExecId());
+    }
+    final FlowRunner runner = createFromExecutableFlow(eventCollector, exFlow, options,
+        new HashMap<>(),
+        new Props());
+    runner.setFlowWatcher(watcher);
+    return runner;
   }
 
-  public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
-      final String flowName, final String jobIdPrefix) throws Exception {
-    return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
-        new ExecutionOptions(), new Props());
+  public FlowRunner createFromFlowMap(final String flowName, final String jobIdPrefix)
+      throws Exception {
+    return createFromFlowMap(flowName, jobIdPrefix, new ExecutionOptions());
   }
 
-  public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
-      final String flowName, final String jobIdPrefix, final ExecutionOptions options)
+  public FlowRunner createFromFlowMap(final String flowName, final String jobIdPrefix,
+      final ExecutionOptions options)
       throws Exception {
-    return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
-        options, new Props());
+    return createFromFlowMap(flowName, jobIdPrefix, options, new Props());
   }
 
   public FlowRunner createFromFlowMap(final String flowName,
@@ -157,30 +202,35 @@ public class FlowRunnerTestUtil {
     return createFromFlowMap(null, flowName, null, flowParams, new Props());
   }
 
+  public FlowRunner createFromFlowMap(final String flowName, final ExecutionOptions options,
+      final Map<String, String> flowParams, final Props azkabanProps) throws Exception {
+    return createFromFlowMap(null, flowName, options, flowParams,
+        azkabanProps);
+  }
+
   public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
       final String flowName, final ExecutionOptions options,
       final Map<String, String> flowParams, final Props azkabanProps)
       throws Exception {
     final Flow flow = this.flowMap.get(flowName);
     final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
-    return createFromExecutableFlow(eventCollector, exFlow, options, flowParams, azkabanProps);
+    return createFromExecutableFlow(eventCollector, exFlow, options, flowParams,
+        azkabanProps);
   }
 
-  public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
-      final String flowName, final FailureAction action)
+  public FlowRunner createFromFlowMap(final String flowName, final FailureAction action)
       throws Exception {
     final ExecutionOptions options = new ExecutionOptions();
     options.setFailureAction(action);
-    return createFromFlowMap(eventCollector, flowName, options, new HashMap<>(), new Props());
+    return createFromFlowMap(flowName, options, new HashMap<>(), new Props());
   }
 
-  private FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
-      final String flowName, final String jobIdPrefix, final ExecutionOptions options,
-      final Props azkabanProps)
-      throws Exception {
+  private FlowRunner createFromFlowMap(final String flowName, final String jobIdPrefix,
+      final ExecutionOptions options, final Props azkabanProps) throws Exception {
     final Map<String, String> flowParams = new HashMap<>();
     flowParams.put(InteractiveTestJob.JOB_ID_PREFIX, jobIdPrefix);
-    return createFromFlowMap(eventCollector, flowName, options, flowParams, azkabanProps);
+    return createFromFlowMap(new EventCollectorListener(), flowName, options, flowParams,
+        azkabanProps);
   }
 
   private FlowRunner createFromExecutableFlow(final EventCollectorListener eventCollector,
@@ -194,12 +244,22 @@ public class FlowRunnerTestUtil {
       exFlow.setExecutionOptions(options);
     }
     exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
+    this.executorLoader.uploadExecutableFlow(exFlow);
     final FlowRunner runner =
-        new FlowRunner(exFlow, this.fakeExecutorLoader, mock(ProjectLoader.class),
+        new FlowRunner(exFlow, this.executorLoader, mock(ProjectLoader.class),
             this.jobtypeManager, azkabanProps, null);
     if (eventCollector != null) {
       runner.addListener(eventCollector);
     }
     return runner;
   }
+
+  public ExecutorLoader getExecutorLoader() {
+    return this.executorLoader;
+  }
+
+  public void setExecutorLoader(final MockExecutorLoader executorLoader) {
+    this.executorLoader = executorLoader;
+  }
+
 }