azkaban-aplcache

Details

diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 506cffd..d886ffd 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -28,6 +28,7 @@ import org.apache.log4j.Logger;
 
 public class InteractiveTestJob extends AbstractProcessJob {
 
+  public static final String JOB_ID_PREFIX = "InteractiveTestJob.jobIdPrefix";
   private static final ConcurrentHashMap<String, InteractiveTestJob> testJobs =
       new ConcurrentHashMap<>();
   private static volatile boolean quickSuccess = false;
@@ -83,10 +84,10 @@ public class InteractiveTestJob extends AbstractProcessJob {
   public void run() throws Exception {
     final String nestedFlowPath =
         this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
-    final String groupName = this.getJobProps().getString("group", null);
+    final String jobIdPrefix = this.getJobProps().getString(JOB_ID_PREFIX, null);
     String id = nestedFlowPath == null ? this.getId() : nestedFlowPath;
-    if (groupName != null) {
-      id = groupName + ":" + id;
+    if (jobIdPrefix != null) {
+      id = jobIdPrefix + ":" + id;
     }
     testJobs.put(id, this);
     synchronized (testJobs) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 8b37f50..2de4f0e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -16,34 +16,14 @@
 
 package azkaban.execapp;
 
-import static org.mockito.Mockito.mock;
-
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
-import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
-import azkaban.flow.Flow;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.utils.Props;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 /**
  * Flows in this test: joba jobb joba1 jobc->joba jobd->joba jobe->jobb,jobc,jobd jobf->jobe,joba1
@@ -57,45 +37,18 @@ import org.junit.rules.TemporaryFolder;
  */
 public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
 
-  private static int id = 101;
-  private final AzkabanEventReporter azkabanEventReporter = null;
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private File workingDir;
-  private JobTypeManager jobtypeManager;
-  private ExecutorLoader fakeExecutorLoader;
-  private Project project;
-  private Map<String, Flow> flowMap;
-
-  public FlowRunnerPipelineTest() {
-  }
+  private FlowRunnerTestUtil testUtil;
 
   @Before
   public void setUp() throws Exception {
-    this.workingDir = this.temporaryFolder.newFolder();
-    this.jobtypeManager =
-        new JobTypeManager(null, null, this.getClass().getClassLoader());
-    final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
-
-    pluginSet.addPluginClass("java", JavaJob.class);
-    pluginSet.addPluginClass("test", InteractiveTestJob.class);
-    this.fakeExecutorLoader = new MockExecutorLoader();
-    this.project = new Project(1, "testProject");
-    Utils.initServiceProvider();
-    JmxJobMBeanManager.getInstance().initialize(new Props());
-
-    final File dir = ExecutionsTestUtil.getFlowDir("embedded2");
-    this.flowMap = FlowRunnerTestUtil
-        .prepareProject(this.project, dir, this.workingDir);
-
-    InteractiveTestJob.clearTestJobs();
+    this.testUtil = new FlowRunnerTestUtil("embedded2", this.temporaryFolder);
   }
 
   @Test
   public void testBasicPipelineLevel1RunDisabledJobs() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final FlowRunner previousRunner =
-        createFlowRunner(eventCollector, "jobf", "prev");
+        this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -103,7 +56,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
     options.setPipelineLevel(1);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
     final FlowRunner pipelineRunner =
-        createFlowRunner(eventCollector, "jobf", "pipe", options);
+        this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
@@ -112,12 +65,12 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
     // disable the innerFlow (entire sub-flow)
     previousFlow.getExecutableNodePath("jobb").setStatus(Status.DISABLED);
 
-    runFlowRunnerInThread(previousRunner);
+    FlowRunnerTestUtil.startThread(previousRunner);
     assertStatus(previousFlow, "joba", Status.RUNNING);
     assertStatus(previousFlow, "joba", Status.RUNNING);
     assertStatus(previousFlow, "joba1", Status.RUNNING);
 
-    runFlowRunnerInThread(pipelineRunner);
+    FlowRunnerTestUtil.startThread(pipelineRunner);
     assertStatus(pipelineFlow, "joba", Status.QUEUED);
     assertStatus(pipelineFlow, "joba1", Status.QUEUED);
 
@@ -214,7 +167,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
   public void testBasicPipelineLevel1Run() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final FlowRunner previousRunner =
-        createFlowRunner(eventCollector, "jobf", "prev");
+        this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -222,19 +175,19 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
     options.setPipelineLevel(1);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
     final FlowRunner pipelineRunner =
-        createFlowRunner(eventCollector, "jobf", "pipe", options);
+        this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
     final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
     final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
 
-    runFlowRunnerInThread(previousRunner);
+    FlowRunnerTestUtil.startThread(previousRunner);
     assertStatus(previousFlow, "joba", Status.RUNNING);
     assertStatus(previousFlow, "joba", Status.RUNNING);
     assertStatus(previousFlow, "joba1", Status.RUNNING);
 
-    runFlowRunnerInThread(pipelineRunner);
+    FlowRunnerTestUtil.startThread(pipelineRunner);
     assertStatus(pipelineFlow, "joba", Status.QUEUED);
     assertStatus(pipelineFlow, "joba1", Status.QUEUED);
 
@@ -335,7 +288,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
   public void testBasicPipelineLevel2Run() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final FlowRunner previousRunner =
-        createFlowRunner(eventCollector, "pipelineFlow", "prev");
+        this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -343,17 +296,17 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
     options.setPipelineLevel(2);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
     final FlowRunner pipelineRunner =
-        createFlowRunner(eventCollector, "pipelineFlow", "pipe", options);
+        this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
     final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
     final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
 
-    runFlowRunnerInThread(previousRunner);
+    FlowRunnerTestUtil.startThread(previousRunner);
     assertStatus(previousFlow, "pipeline1", Status.RUNNING);
 
-    runFlowRunnerInThread(pipelineRunner);
+    FlowRunnerTestUtil.startThread(pipelineRunner);
     assertStatus(pipelineFlow, "pipeline1", Status.QUEUED);
 
     InteractiveTestJob.getTestJob("prev:pipeline1").succeedJob();
@@ -467,7 +420,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
   public void testBasicPipelineLevel2Run2() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final FlowRunner previousRunner =
-        createFlowRunner(eventCollector, "pipeline1_2", "prev");
+        this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "prev");
 
     final ExecutionOptions options = new ExecutionOptions();
     options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -475,18 +428,18 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
     options.setPipelineLevel(2);
     final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
     final FlowRunner pipelineRunner =
-        createFlowRunner(eventCollector, "pipeline1_2", "pipe", options);
+        this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "pipe", options);
     pipelineRunner.setFlowWatcher(watcher);
 
     // 1. START FLOW
     final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
     final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
 
-    runFlowRunnerInThread(previousRunner);
+    FlowRunnerTestUtil.startThread(previousRunner);
     assertStatus(previousFlow, "pipeline1_1", Status.RUNNING);
     assertStatus(previousFlow, "pipeline1_1:innerJobA", Status.RUNNING);
 
-    runFlowRunnerInThread(pipelineRunner);
+    FlowRunnerTestUtil.startThread(pipelineRunner);
     assertStatus(pipelineFlow, "pipeline1_1", Status.RUNNING);
     assertStatus(pipelineFlow, "pipeline1_1:innerJobA", Status.QUEUED);
 
@@ -540,48 +493,4 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
     assertThreadShutDown(pipelineRunner);
   }
 
-  private void runFlowRunnerInThread(final FlowRunner runner) {
-    final Thread thread = new Thread(runner);
-    thread.start();
-  }
-
-  private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
-      final String flowName, final String groupName) throws Exception {
-    return createFlowRunner(eventCollector, flowName, groupName,
-        new ExecutionOptions(), new Props());
-  }
-
-  private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
-      final String flowName, final String groupName, final ExecutionOptions options)
-      throws Exception {
-    return createFlowRunner(eventCollector, flowName, groupName,
-        options, new Props());
-  }
-
-  private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
-      final String flowName, final String groupName, final ExecutionOptions options,
-      final Props azkabanProps)
-      throws Exception {
-    final Flow flow = this.flowMap.get(flowName);
-
-    final int exId = id++;
-    final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
-    exFlow.setExecutionPath(this.workingDir.getPath());
-    exFlow.setExecutionId(exId);
-
-    final Map<String, String> flowParam = new HashMap<>();
-    flowParam.put("group", groupName);
-    options.addAllFlowParameters(flowParam);
-    exFlow.setExecutionOptions(options);
-    this.fakeExecutorLoader.uploadExecutableFlow(exFlow);
-
-    final FlowRunner runner =
-        new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
-    runner.addListener(eventCollector);
-
-    return runner;
-  }
-
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index ff82221..8c2feb9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -16,30 +16,16 @@
 
 package azkaban.execapp;
 
-import static org.mockito.Mockito.mock;
-
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
-import azkaban.flow.Flow;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
+import azkaban.executor.Status;
 import azkaban.utils.Props;
-import java.io.File;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -57,70 +43,36 @@ import org.junit.Test;
  *
  * job2 and 4 are in nested directories so should have different shared properties than other jobs.
  */
-public class FlowRunnerPropertyResolutionTest {
+public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
 
-  private static int id = 101;
-  private final AzkabanEventReporter azkabanEventReporter = null;
-  private File workingDir;
-  private JobTypeManager jobtypeManager;
-  private ExecutorLoader fakeExecutorLoader;
-  private Project project;
-  private Map<String, Flow> flowMap;
+  private FlowRunnerTestUtil testUtil;
 
   @Before
   public void setUp() throws Exception {
-    System.out.println("Create temp dir");
-    this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
-    if (this.workingDir.exists()) {
-      FileUtils.deleteDirectory(this.workingDir);
-    }
-    this.workingDir.mkdirs();
-    this.jobtypeManager =
-        new JobTypeManager(null, null, this.getClass().getClassLoader());
-    this.jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
-    this.jobtypeManager.getJobTypePluginSet().addPluginClass("test",
-        InteractiveTestJob.class);
-    this.fakeExecutorLoader = new MockExecutorLoader();
-    this.project = new Project(1, "testProject");
-
-    final File dir = new File("unit/executions/execpropstest");
-    this.flowMap = FlowRunnerTestUtil
-        .prepareProject(this.project, dir, this.workingDir);
-
-    InteractiveTestJob.clearTestJobs();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    System.out.println("Teardown temp dir");
-    if (this.workingDir != null) {
-      FileUtils.deleteDirectory(this.workingDir);
-      this.workingDir = null;
-    }
+    this.testUtil = new FlowRunnerTestUtil("execpropstest", this.temporaryFolder);
   }
 
   /**
    * Tests the basic flow resolution. Flow is defined in execpropstest
    */
-  @Ignore
   @Test
   public void testPropertyResolution() throws Exception {
     final HashMap<String, String> flowProps = new HashMap<>();
     flowProps.put("props7", "flow7");
     flowProps.put("props6", "flow6");
     flowProps.put("props5", "flow5");
-    final FlowRunner runner = createFlowRunner("job3", flowProps);
+    final FlowRunner runner = this.testUtil.createFromFlowMap("job3", flowProps);
     final Map<String, ExecutableNode> nodeMap = new HashMap<>();
     createNodeMap(runner.getExecutableFlow(), nodeMap);
+    final ExecutableFlow flow = runner.getExecutableFlow();
 
     // 1. Start flow. Job 2 should start
-    runFlowRunnerInThread(runner);
-    pause(250);
+    FlowRunnerTestUtil.startThread(runner);
+    assertStatus(flow, "job2", Status.RUNNING);
 
     // Job 2 is a normal job.
     // Only the flow overrides and the shared properties matter
-    ExecutableNode node = nodeMap.get("job2");
-    final Props job2Props = node.getInputProps();
+    final Props job2Props = nodeMap.get("job2").getInputProps();
     Assert.assertEquals("shared1", job2Props.get("props1"));
     Assert.assertEquals("job2", job2Props.get("props2"));
     Assert.assertEquals("moo3", job2Props.get("props3"));
@@ -138,9 +90,9 @@ public class FlowRunnerPropertyResolutionTest {
     job2Generated.put("props9", "gjob9");
     job2Generated.put("props10", "gjob10");
     InteractiveTestJob.getTestJob("job2").succeedJob(job2Generated);
-    pause(250);
-    node = nodeMap.get("innerflow:job1");
-    final Props job1Props = node.getInputProps();
+    assertStatus(flow, "innerflow:job1", Status.RUNNING);
+
+    final Props job1Props = nodeMap.get("innerflow:job1").getInputProps();
     Assert.assertEquals("job1", job1Props.get("props1"));
     Assert.assertEquals("job2", job1Props.get("props2"));
     Assert.assertEquals("job8", job1Props.get("props8"));
@@ -161,9 +113,8 @@ public class FlowRunnerPropertyResolutionTest {
     job1GeneratedProps.put("props7", "g2job7");
     InteractiveTestJob.getTestJob("innerflow:job1").succeedJob(
         job1GeneratedProps);
-    pause(250);
-    node = nodeMap.get("innerflow:job4");
-    final Props job4Props = node.getInputProps();
+    assertStatus(flow, "innerflow:job4", Status.RUNNING);
+    final Props job4Props = nodeMap.get("innerflow:job4").getInputProps();
     Assert.assertEquals("job8", job4Props.get("props8"));
     Assert.assertEquals("job9", job4Props.get("props9"));
     Assert.assertEquals("g2job7", job4Props.get("props7"));
@@ -183,9 +134,8 @@ public class FlowRunnerPropertyResolutionTest {
     job4GeneratedProps.put("props6", "g4job6");
     InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(
         job4GeneratedProps);
-    pause(250);
-    node = nodeMap.get("job3");
-    final Props job3Props = node.getInputProps();
+    assertStatus(flow, "job3", Status.RUNNING);
+    final Props job3Props = nodeMap.get("job3").getInputProps();
     Assert.assertEquals("job3", job3Props.get("props3"));
     Assert.assertEquals("g4job6", job3Props.get("props6"));
     Assert.assertEquals("g4job9", job3Props.get("props9"));
@@ -196,30 +146,6 @@ public class FlowRunnerPropertyResolutionTest {
     Assert.assertEquals("moo4", job3Props.get("props4"));
   }
 
-  private FlowRunner createFlowRunner(final String flowName,
-      final HashMap<String, String> flowParams) throws Exception {
-    return createFlowRunner(flowName, flowParams, new Props());
-  }
-
-  private FlowRunner createFlowRunner(final String flowName,
-      final HashMap<String, String> flowParams, final Props azkabanProps) throws Exception {
-    final Flow flow = this.flowMap.get(flowName);
-
-    final int exId = id++;
-    final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
-    exFlow.setExecutionPath(this.workingDir.getPath());
-    exFlow.setExecutionId(exId);
-
-    exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
-    this.fakeExecutorLoader.uploadExecutableFlow(exFlow);
-
-    final FlowRunner runner =
-        new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
-            this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
-    return runner;
-  }
-
   private void createNodeMap(final ExecutableFlowBase flow,
       final Map<String, ExecutableNode> nodeMap) {
     for (final ExecutableNode node : flow.getExecutableNodes()) {
@@ -230,17 +156,4 @@ public class FlowRunnerPropertyResolutionTest {
       }
     }
   }
-
-  private Thread runFlowRunnerInThread(final FlowRunner runner) {
-    final Thread thread = new Thread(runner);
-    thread.start();
-    return thread;
-  }
-
-  private void pause(final long millisec) {
-    try {
-      Thread.sleep(millisec);
-    } catch (final InterruptedException e) {
-    }
-  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 64128df..e534171 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -16,62 +16,24 @@
 
 package azkaban.execapp;
 
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.when;
-
-import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
-import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.Status;
-import azkaban.jobExecutor.AllJobExecutorTests;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
 import azkaban.spi.EventType;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.Props;
-import java.io.File;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 public class FlowRunnerTest extends FlowRunnerTestBase {
 
-  private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
-  private final AzkabanEventReporter azkabanEventReporter = null;
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private File workingDir;
-  private JobTypeManager jobtypeManager;
-  @Mock
-  private ProjectLoader fakeProjectLoader;
-  @Mock
-  private ExecutorLoader loader;
+  private FlowRunnerTestUtil testUtil;
 
   @Before
   public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
-    this.workingDir = this.temporaryFolder.newFolder();
-    this.jobtypeManager =
-        new JobTypeManager(null, null, this.getClass().getClassLoader());
-    final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
-    pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
-    pluginSet.addPluginClass("test", InteractiveTestJob.class);
-    Utils.initServiceProvider();
-    JmxJobMBeanManager.getInstance().initialize(new Props());
-
-    InteractiveTestJob.clearTestJobs();
+    this.testUtil = new FlowRunnerTestUtil("exectest1", this.temporaryFolder);
   }
 
   @Test
@@ -79,9 +41,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
         EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
-    this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
+    this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec1");
 
-    startThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     succeedJobs("job3", "job4", "job6");
 
     waitForAndAssertFlowStatus(Status.SUCCEEDED);
@@ -106,8 +68,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
         EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
-    final ExecutableFlow exFlow = FlowRunnerTestUtil
-        .prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
+
+    this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec1");
+    final ExecutableFlow exFlow = this.runner.getExecutableFlow();
 
     // Disable couple in the middle and at the end.
     exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
@@ -115,12 +78,10 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     exFlow.getExecutableNode("job5").setStatus(Status.DISABLED);
     exFlow.getExecutableNode("job10").setStatus(Status.DISABLED);
 
-    this.runner = createFlowRunner(exFlow, this.loader, eventCollector);
-
     Assert.assertTrue(!this.runner.isKilled());
     waitForAndAssertFlowStatus(Status.READY);
 
-    startThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     succeedJobs("job3", "job4");
 
     assertThreadShutDown();
@@ -146,12 +107,10 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
         EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
-    final ExecutableFlow flow = FlowRunnerTestUtil
-        .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
 
-    this.runner = createFlowRunner(flow, this.loader, eventCollector);
+    this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec2");
 
-    startThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     succeedJobs("job6");
 
     Assert.assertTrue(!this.runner.isKilled());
@@ -177,13 +136,12 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
         EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
-    final ExecutableFlow flow = FlowRunnerTestUtil
-        .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
-    flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
+    final ExecutionOptions options = new ExecutionOptions();
+    options.setFailureAction(FailureAction.CANCEL_ALL);
 
-    this.runner = createFlowRunner(flow, this.loader, eventCollector);
+    this.runner = this.testUtil.createFromFlowFile("exec2", eventCollector, options);
 
-    startThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     assertThreadShutDown();
 
     Assert.assertTrue(this.runner.isKilled());
@@ -209,13 +167,11 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
         EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
-    final ExecutableFlow flow = FlowRunnerTestUtil
-        .prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
-    flow.getExecutionOptions().setFailureAction(
-        FailureAction.FINISH_ALL_POSSIBLE);
-    this.runner = createFlowRunner(flow, this.loader, eventCollector);
+    final ExecutionOptions options = new ExecutionOptions();
+    options.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+    this.runner = this.testUtil.createFromFlowFile("exec3", eventCollector, options);
 
-    startThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     succeedJobs("job3");
 
     waitForAndAssertFlowStatus(Status.FAILED);
@@ -240,9 +196,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
         EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
-    this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
+    this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec1");
 
-    startThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     assertStatus("job1", Status.SUCCEEDED);
     assertStatus("job2", Status.SUCCEEDED);
@@ -275,9 +231,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
         EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
-    this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
+    this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec4-retry");
 
-    startThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     assertThreadShutDown();
 
     assertStatus("job-retry", Status.SUCCEEDED);
@@ -290,12 +246,6 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     waitForAndAssertFlowStatus(Status.FAILED);
   }
 
-  private void startThread(final FlowRunner runner) {
-    Assert.assertTrue(!runner.isKilled());
-    final Thread thread = new Thread(runner);
-    thread.start();
-  }
-
   private void assertAttempts(final String name, final int attempt) {
     final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
     if (node.getAttempt() != attempt) {
@@ -333,46 +283,4 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
       compareStartFinishTimes(flow, childNode, endTime);
     }
   }
-
-  private FlowRunner createFlowRunner(final ExecutableFlow flow,
-      final ExecutorLoader loader, final EventCollectorListener eventCollector) throws Exception {
-    return createFlowRunner(flow, loader, eventCollector, new Props());
-  }
-
-  private FlowRunner createFlowRunner(final ExecutableFlow flow,
-      final ExecutorLoader loader, final EventCollectorListener eventCollector,
-      final Props azkabanProps)
-      throws Exception {
-
-    loader.uploadExecutableFlow(flow);
-    final FlowRunner runner =
-        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
-
-    runner.addListener(eventCollector);
-
-    return runner;
-  }
-
-  private FlowRunner createFlowRunner(final ExecutorLoader loader,
-      final EventCollectorListener eventCollector, final String flowName) throws Exception {
-    return createFlowRunner(loader, eventCollector, flowName, new Props());
-  }
-
-  private FlowRunner createFlowRunner(final ExecutorLoader loader,
-      final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
-      throws Exception {
-    final ExecutableFlow exFlow = FlowRunnerTestUtil
-        .prepareExecDir(this.workingDir, TEST_DIR, flowName, 1);
-
-    loader.uploadExecutableFlow(exFlow);
-
-    final FlowRunner runner =
-        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
-            this.azkabanEventReporter);
-
-    runner.addListener(eventCollector);
-
-    return runner;
-  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 0790935..54db1e6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -17,35 +17,19 @@
 package azkaban.execapp;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 
-import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
-import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
-import azkaban.flow.Flow;
-import azkaban.jobExecutor.AllJobExecutorTests;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
-import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 /**
  * Test the flow run, especially with embedded flows.
@@ -95,36 +79,11 @@ import org.junit.rules.TemporaryFolder;
  */
 public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
-  private static int id = 101;
-  private final AzkabanEventReporter azkabanEventReporter = null;
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private File workingDir;
-  private JobTypeManager jobtypeManager;
-  private ExecutorLoader fakeExecutorLoader;
-  private Project project;
-  private Map<String, Flow> flowMap;
+  private FlowRunnerTestUtil testUtil;
 
   @Before
   public void setUp() throws Exception {
-    this.workingDir = this.temporaryFolder.newFolder();
-    this.jobtypeManager = new JobTypeManager(null, null,
-        this.getClass().getClassLoader());
-    final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
-
-    pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
-    pluginSet.addPluginClass("java", JavaJob.class);
-    pluginSet.addPluginClass("test", InteractiveTestJob.class);
-    this.fakeExecutorLoader = new MockExecutorLoader();
-    this.project = new Project(1, "testProject");
-    Utils.initServiceProvider();
-    JmxJobMBeanManager.getInstance().initialize(new Props());
-
-    this.flowMap = FlowRunnerTestUtil
-        .prepareProject(this.project, ExecutionsTestUtil.getFlowDir("embedded2"),
-            this.workingDir);
-
-    InteractiveTestJob.clearTestJobs();
+    this.testUtil = new FlowRunnerTestUtil("embedded2", this.temporaryFolder);
   }
 
   /**
@@ -133,10 +92,20 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testBasicRun() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+
+    final Map<String, String> flowParams = new HashMap<>();
+    flowParams.put("param4", "override.4");
+    flowParams.put("param10", "override.10");
+    flowParams.put("param11", "override.11");
+
+    final ExecutionOptions options = new ExecutionOptions();
+    options.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", options, flowParams, new Props());
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -275,14 +244,15 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testDisabledNormal() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
     final ExecutableFlow flow = this.runner.getExecutableFlow();
     flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
     ((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
         "innerJobA").setStatus(Status.DISABLED);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -333,10 +303,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testNormalFailure1() throws Exception {
     // Test propagation of KILLED status to embedded flows.
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -371,10 +342,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testNormalFailure2() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -419,10 +391,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testNormalFailure3() throws Exception {
     // Test propagation of CANCELLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -476,11 +449,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testFailedFinishingFailure3() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector,
+    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
         FailureAction.FINISH_ALL_POSSIBLE);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -538,11 +511,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testCancelOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector,
+    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
         FailureAction.CANCEL_ALL);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -587,14 +560,15 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testRetryOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
     final ExecutableFlow flow = this.runner.getExecutableFlow();
     flow.getExecutableNode("joba").setStatus(Status.DISABLED);
     ((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
         "innerFlow").setStatus(Status.DISABLED);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     assertStatus("joba", Status.SKIPPED);
     assertStatus("joba1", Status.RUNNING);
@@ -675,11 +649,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testCancel() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector,
+    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
         FailureAction.CANCEL_ALL);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -725,10 +699,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   public void testManualCancelOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -777,10 +752,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testPause() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -861,10 +837,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testPauseKill() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector);
+    this.runner = this.testUtil
+        .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -910,11 +887,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testPauseFail() throws Exception {
     this.eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(this.eventCollector,
+    this.runner = this.testUtil.createFromFlowMap(this.eventCollector, "jobf",
         FailureAction.FINISH_CURRENTLY_RUNNING);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -970,11 +947,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testPauseFailFinishAll() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector,
+    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
         FailureAction.FINISH_ALL_POSSIBLE);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
 
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
@@ -1029,10 +1006,10 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testFlowKilledByJobLevelSLA() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector,
+    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
         FailureAction.CANCEL_ALL);
 
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     assertStatus("joba", Status.RUNNING);
     assertStatus("joba1", Status.RUNNING);
 
@@ -1054,11 +1031,10 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   @Test
   public void testPauseFailKill() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    this.runner = createFlowRunner(eventCollector,
-        FailureAction.CANCEL_ALL);
+    this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf", FailureAction.CANCEL_ALL);
 
     // 1. START FLOW
-    runFlowRunnerInThread(this.runner);
+    FlowRunnerTestUtil.startThread(this.runner);
     // After it starts up, only joba should be running
     assertStatus("joba", Status.RUNNING);
     assertStatus("joba1", Status.RUNNING);
@@ -1093,46 +1069,4 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertThreadShutDown();
   }
 
-  private void runFlowRunnerInThread(final FlowRunner runner) {
-    final Thread thread = new Thread(runner);
-    thread.start();
-  }
-
-  private FlowRunner createFlowRunner(final EventCollectorListener eventCollector)
-      throws Exception {
-    return createFlowRunner(eventCollector,
-        FailureAction.FINISH_CURRENTLY_RUNNING);
-  }
-
-  private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
-      final FailureAction action) throws Exception {
-    return createFlowRunner(eventCollector, action, new Props());
-  }
-
-  private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
-      final FailureAction action, final Props azkabanProps)
-      throws Exception {
-    final Flow flow = this.flowMap.get("jobf");
-
-    final int exId = id++;
-    final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
-    exFlow.setExecutionPath(this.workingDir.getPath());
-    exFlow.setExecutionId(exId);
-
-    final Map<String, String> flowParam = new HashMap<>();
-    flowParam.put("param4", "override.4");
-    flowParam.put("param10", "override.10");
-    flowParam.put("param11", "override.11");
-    exFlow.getExecutionOptions().addAllFlowParameters(flowParam);
-    exFlow.getExecutionOptions().setFailureAction(action);
-    this.fakeExecutorLoader.uploadExecutableFlow(exFlow);
-
-    final FlowRunner runner = new FlowRunner(
-        this.fakeExecutorLoader.fetchExecutableFlow(exId), this.fakeExecutorLoader,
-        mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
-    runner.addListener(eventCollector);
-
-    return runner;
-  }
-
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 029cbf3..c1aa7a2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -28,9 +28,14 @@ import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.Status;
 import java.util.function.Function;
 import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public class FlowRunnerTestBase {
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   protected FlowRunner runner;
   protected EventCollectorListener eventCollector;
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 8e0d9c9..27158ab 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -16,20 +16,68 @@
 
 package azkaban.execapp;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.InteractiveTestJob;
 import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.junit.rules.TemporaryFolder;
 
 public class FlowRunnerTestUtil {
 
+  private static int id = 101;
+  private final Map<String, Flow> flowMap;
+  private final Project project;
+  private final File workingDir;
+  private final ExecutorLoader fakeExecutorLoader;
+  private final JobTypeManager jobtypeManager;
+  private final File projectDir;
+
+  public FlowRunnerTestUtil(final String flowName, final TemporaryFolder temporaryFolder)
+      throws Exception {
+
+    this.projectDir = ExecutionsTestUtil.getFlowDir(flowName);
+    this.workingDir = temporaryFolder.newFolder();
+    this.project = new Project(1, "testProject");
+
+    this.flowMap = FlowRunnerTestUtil
+        .prepareProject(this.project, ExecutionsTestUtil.getFlowDir(flowName), this.workingDir);
+
+    this.fakeExecutorLoader = mock(ExecutorLoader.class);
+    when(this.fakeExecutorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
+
+    Utils.initServiceProvider();
+    JmxJobMBeanManager.getInstance().initialize(new Props());
+
+    InteractiveTestJob.clearTestJobs();
+
+    this.jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
+    final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
+    pluginSet.addPluginClass("test", InteractiveTestJob.class);
+  }
+
   /**
    * Initialize the project with the flow definitions stored in the given source directory. Also
    * copy the source directory to the working directory.
@@ -58,7 +106,6 @@ public class FlowRunnerTestUtil {
     final Map<String, Flow> flowMap = loader.getFlowMap();
     project.setFlows(flowMap);
     FileUtils.copyDirectory(sourceDir, workingDir);
-
     return flowMap;
   }
 
@@ -75,4 +122,84 @@ public class FlowRunnerTestUtil {
     return execFlow;
   }
 
+  public static void startThread(final FlowRunner runner) {
+    new Thread(runner).start();
+  }
+
+  public FlowRunner createFromFlowFile(final EventCollectorListener eventCollector,
+      final String flowName) throws Exception {
+    return createFromFlowFile(flowName, eventCollector, new ExecutionOptions());
+  }
+
+  public FlowRunner createFromFlowFile(final String flowName,
+      final EventCollectorListener eventCollector,
+      final ExecutionOptions options) throws Exception {
+    final ExecutableFlow exFlow = FlowRunnerTestUtil
+        .prepareExecDir(this.workingDir, this.projectDir, flowName, 1);
+    return createFromExecutableFlow(eventCollector, exFlow, options, new HashMap<>(), new Props());
+  }
+
+  public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+      final String flowName, final String jobIdPrefix) throws Exception {
+    return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
+        new ExecutionOptions(), new Props());
+  }
+
+  public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+      final String flowName, final String jobIdPrefix, final ExecutionOptions options)
+      throws Exception {
+    return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
+        options, new Props());
+  }
+
+  public FlowRunner createFromFlowMap(final String flowName,
+      final HashMap<String, String> flowParams) throws Exception {
+    return createFromFlowMap(null, flowName, null, flowParams, new Props());
+  }
+
+  public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+      final String flowName, final ExecutionOptions options,
+      final Map<String, String> flowParams, final Props azkabanProps)
+      throws Exception {
+    final Flow flow = this.flowMap.get(flowName);
+    final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
+    return createFromExecutableFlow(eventCollector, exFlow, options, flowParams, azkabanProps);
+  }
+
+  public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+      final String flowName, final FailureAction action)
+      throws Exception {
+    final ExecutionOptions options = new ExecutionOptions();
+    options.setFailureAction(action);
+    return createFromFlowMap(eventCollector, flowName, options, new HashMap<>(), new Props());
+  }
+
+  private FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+      final String flowName, final String jobIdPrefix, final ExecutionOptions options,
+      final Props azkabanProps)
+      throws Exception {
+    final Map<String, String> flowParams = new HashMap<>();
+    flowParams.put(InteractiveTestJob.JOB_ID_PREFIX, jobIdPrefix);
+    return createFromFlowMap(eventCollector, flowName, options, flowParams, azkabanProps);
+  }
+
+  private FlowRunner createFromExecutableFlow(final EventCollectorListener eventCollector,
+      final ExecutableFlow exFlow, final ExecutionOptions options,
+      final Map<String, String> flowParams, final Props azkabanProps)
+      throws Exception {
+    final int exId = id++;
+    exFlow.setExecutionPath(this.workingDir.getPath());
+    exFlow.setExecutionId(exId);
+    if (options != null) {
+      exFlow.setExecutionOptions(options);
+    }
+    exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
+    final FlowRunner runner =
+        new FlowRunner(exFlow, this.fakeExecutorLoader, mock(ProjectLoader.class),
+            this.jobtypeManager, azkabanProps, null);
+    if (eventCollector != null) {
+      runner.addListener(eventCollector);
+    }
+    return runner;
+  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index e7f0bcd..f177c65 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -16,6 +16,10 @@
 
 package azkaban.execapp;
 
+import static java.lang.Thread.State.TIMED_WAITING;
+import static java.lang.Thread.State.WAITING;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import azkaban.event.Event;
 import azkaban.event.EventData;
 import azkaban.executor.ExecutableFlow;
@@ -29,6 +33,10 @@ import azkaban.jobtype.JobTypePluginSet;
 import azkaban.spi.EventType;
 import azkaban.test.TestUtils;
 import azkaban.utils.Props;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.junit.After;
@@ -36,15 +44,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
-
-import static java.lang.Thread.State.TIMED_WAITING;
-import static java.lang.Thread.State.WAITING;
-import static org.assertj.core.api.Assertions.assertThat;
-
 public class JobRunnerTest {
 
   private final Logger logger = Logger.getLogger("JobRunnerTest");
@@ -211,8 +210,7 @@ public class JobRunnerTest {
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
         || runner.getStatus() != Status.FAILED);
 
-    final Thread thread = new Thread(runner);
-    thread.start();
+    final Thread thread = startThread(runner);
 
     StatusTestUtils.waitForStatus(node, Status.RUNNING);
     runner.kill();
@@ -250,8 +248,7 @@ public class JobRunnerTest {
     eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
-    final Thread thread = new Thread(runner);
-    thread.start();
+    final Thread thread = startThread(runner);
 
     // wait for job to get into delayExecution() -> wait()
     assertThreadIsWaiting(thread);
@@ -293,8 +290,7 @@ public class JobRunnerTest {
     eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
-    final Thread thread = new Thread(runner);
-    thread.start();
+    final Thread thread = startThread(runner);
 
     StatusTestUtils.waitForStatus(node, Status.READY);
     // wait for job to get into delayExecution() -> wait()
@@ -369,4 +365,9 @@ public class JobRunnerTest {
     }
   }
 
+  private Thread startThread(final JobRunner runner) {
+    final Thread thread = new Thread(runner);
+    thread.start();
+    return thread;
+  }
 }