azkaban-aplcache

Changes

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 449d432..8a8e1d1 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -46,9 +46,9 @@ public class PropsUtils {
       .compile("\\$\\{([a-zA-Z_.0-9]+)\\}");
 
   /**
-   * Load job schedules from the given directories ] * @param dir The directory
-   * to look in
+   * Load job schedules from the given directories
    *
+   * @param dir The directory to look in
    * @param suffixes File suffixes to load
    * @return The loaded set of schedules
    */
@@ -160,7 +160,11 @@ public class PropsUtils {
 
     final LinkedHashSet<String> visitedVariables = new LinkedHashSet<>();
     for (final String key : props.getKeySet()) {
-      final String value = props.get(key);
+      String value = props.get(key);
+      if (value == null) {
+        logger.warn("Null value in props for key '" + key + "'. Replacing with empty string.");
+        value = "";
+      }
 
       visitedVariables.add(key);
       final String replacedValue =
@@ -229,8 +233,8 @@ public class PropsUtils {
   }
 
   /**
-   * Function that looks for expressions to parse. It parses backwards to
-   * capture embedded expressions
+   * Function that looks for expressions to parse. It parses backwards to capture embedded
+   * expressions
    */
   private static String resolveVariableExpression(final String value, final int last,
       final JexlEngine jexl) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 0bd233d..ba7894a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -16,6 +16,8 @@
 
 package azkaban.executor;
 
+import static azkaban.flow.CommonJobProperties.JOB_ATTEMPT;
+
 import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.utils.Props;
@@ -24,7 +26,7 @@ import org.apache.log4j.Logger;
 
 public class InteractiveTestJob extends AbstractProcessJob {
 
-  private static final ConcurrentHashMap<String, InteractiveTestJob> testJobs =
+  public static final ConcurrentHashMap<String, InteractiveTestJob> testJobs =
       new ConcurrentHashMap<>();
   private Props generatedProperties = new Props();
   private boolean isWaiting = true;
@@ -36,7 +38,18 @@ public class InteractiveTestJob extends AbstractProcessJob {
   }
 
   public static InteractiveTestJob getTestJob(final String name) {
-    return testJobs.get(name);
+    for (int i = 0; i < 100; i++) {
+      if (testJobs.containsKey(name)) {
+        return testJobs.get(name);
+      }
+      synchronized (testJobs) {
+        try {
+          InteractiveTestJob.testJobs.wait(10L);
+        } catch (final InterruptedException e) {
+        }
+      }
+    }
+    throw new IllegalStateException(name + " wasn't added in testJobs map");
   }
 
   public static void clearTestJobs() {
@@ -53,12 +66,33 @@ public class InteractiveTestJob extends AbstractProcessJob {
       id = groupName + ":" + id;
     }
     testJobs.put(id, this);
+    synchronized (testJobs) {
+      testJobs.notifyAll();
+    }
+
+    if (this.jobProps.getBoolean("fail", false)) {
+      final int passRetry = this.jobProps.getInt("passRetry", -1);
+      if (passRetry > 0 && passRetry < this.jobProps.getInt(JOB_ATTEMPT)) {
+        succeedJob();
+      } else {
+        failJob();
+      }
+    }
+    if (!this.succeed) {
+      throw new RuntimeException("Forced failure of " + getId());
+    }
 
     while (this.isWaiting) {
       synchronized (this) {
-        try {
-          wait(30000);
-        } catch (final InterruptedException e) {
+        final int waitMillis = this.jobProps.getInt("seconds", 5) * 1000;
+        if (waitMillis > 0) {
+          try {
+            wait(waitMillis);
+          } catch (final InterruptedException e) {
+          }
+        }
+        if (this.jobProps.containsKey("fail")) {
+          succeedJob();
         }
 
         if (!this.isWaiting) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 728cb16..148a0a1 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -16,130 +16,125 @@
 
 package azkaban.executor;
 
+import azkaban.executor.ExecutorLogEvent.EventType;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.Pair;
-import azkaban.utils.Props;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class MockExecutorLoader implements ExecutorLoader {
 
-  HashMap<Integer, Integer> executionExecutorMapping =
-      new HashMap<Integer, Integer>();
-  HashMap<Integer, ExecutableFlow> flows =
-      new HashMap<Integer, ExecutableFlow>();
-  HashMap<String, ExecutableNode> nodes = new HashMap<String, ExecutableNode>();
-  HashMap<Integer, ExecutionReference> refs =
-      new HashMap<Integer, ExecutionReference>();
+  Map<Integer, Integer> executionExecutorMapping = new ConcurrentHashMap<>();
+  Map<Integer, ExecutableFlow> flows = new ConcurrentHashMap<>();
+  Map<String, ExecutableNode> nodes = new ConcurrentHashMap<>();
+  Map<Integer, ExecutionReference> refs = new ConcurrentHashMap<>();
   int flowUpdateCount = 0;
-  HashMap<String, Integer> jobUpdateCount = new HashMap<String, Integer>();
-  Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
-      new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-  List<Executor> executors = new ArrayList<Executor>();
+  Map<String, Integer> jobUpdateCount = new ConcurrentHashMap<>();
+  Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new ConcurrentHashMap<>();
+  List<Executor> executors = new ArrayList<>();
   int executorIdCounter = 0;
-  Map<Integer, ArrayList<ExecutorLogEvent>> executorEvents =
-    new HashMap<Integer, ArrayList<ExecutorLogEvent>>();
+  Map<Integer, ArrayList<ExecutorLogEvent>> executorEvents = new ConcurrentHashMap<>();
 
   @Override
-  public void uploadExecutableFlow(ExecutableFlow flow)
+  public void uploadExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    flows.put(flow.getExecutionId(), flow);
-    flowUpdateCount++;
+    this.flows.put(flow.getExecutionId(), flow);
+    this.flowUpdateCount++;
   }
 
   @Override
-  public ExecutableFlow fetchExecutableFlow(int execId)
+  public ExecutableFlow fetchExecutableFlow(final int execId)
       throws ExecutorManagerException {
-    ExecutableFlow flow = flows.get(execId);
+    final ExecutableFlow flow = this.flows.get(execId);
     return ExecutableFlow.createExecutableFlowFromObject(flow.toObject());
   }
 
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
-    return activeFlows;
+    return this.activeFlows;
   }
 
   @Override
-  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(int execId)
+  public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
       throws ExecutorManagerException {
-    return activeFlows.get(execId);
+    return this.activeFlows.get(execId);
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
-      int skip, int num) throws ExecutorManagerException {
+  public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+      final int skip, final int num) throws ExecutorManagerException {
     return null;
   }
 
   @Override
-  public void addActiveExecutableReference(ExecutionReference ref)
+  public void addActiveExecutableReference(final ExecutionReference ref)
       throws ExecutorManagerException {
-    refs.put(ref.getExecId(), ref);
+    this.refs.put(ref.getExecId(), ref);
   }
 
   @Override
-  public void removeActiveExecutableReference(int execId)
+  public void removeActiveExecutableReference(final int execId)
       throws ExecutorManagerException {
-    refs.remove(execId);
+    this.refs.remove(execId);
   }
 
-  public boolean hasActiveExecutableReference(int execId) {
-    return refs.containsKey(execId);
+  public boolean hasActiveExecutableReference(final int execId) {
+    return this.refs.containsKey(execId);
   }
 
   @Override
-  public void uploadLogFile(int execId, String name, int attempt, File... files)
+  public void uploadLogFile(final int execId, final String name, final int attempt,
+      final File... files)
       throws ExecutorManagerException {
 
   }
 
   @Override
-  public void updateExecutableFlow(ExecutableFlow flow)
+  public void updateExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    ExecutableFlow toUpdate = flows.get(flow.getExecutionId());
+    final ExecutableFlow toUpdate = this.flows.get(flow.getExecutionId());
 
     toUpdate.applyUpdateObject((Map<String, Object>) flow.toUpdateObject(0));
-    flowUpdateCount++;
+    this.flowUpdateCount++;
   }
 
   @Override
-  public void uploadExecutableNode(ExecutableNode node, Props inputParams)
+  public void uploadExecutableNode(final ExecutableNode node, final Props inputParams)
       throws ExecutorManagerException {
-    ExecutableNode exNode = new ExecutableNode();
+    final ExecutableNode exNode = new ExecutableNode();
     exNode.fillExecutableFromMapObject(node.toObject());
 
-    nodes.put(node.getId(), exNode);
-    jobUpdateCount.put(node.getId(), 1);
+    this.nodes.put(node.getId(), exNode);
+    this.jobUpdateCount.put(node.getId(), 1);
   }
 
   @Override
-  public void updateExecutableNode(ExecutableNode node)
+  public void updateExecutableNode(final ExecutableNode node)
       throws ExecutorManagerException {
-    ExecutableNode foundNode = nodes.get(node.getId());
+    final ExecutableNode foundNode = this.nodes.get(node.getId());
     foundNode.setEndTime(node.getEndTime());
     foundNode.setStartTime(node.getStartTime());
     foundNode.setStatus(node.getStatus());
     foundNode.setUpdateTime(node.getUpdateTime());
 
-    Integer value = jobUpdateCount.get(node.getId());
+    Integer value = this.jobUpdateCount.get(node.getId());
     if (value == null) {
       throw new ExecutorManagerException("The node has not been uploaded");
     } else {
-      jobUpdateCount.put(node.getId(), ++value);
+      this.jobUpdateCount.put(node.getId(), ++value);
     }
 
-    flowUpdateCount++;
+    this.flowUpdateCount++;
   }
 
   @Override
-  public int fetchNumExecutableFlows(int projectId, String flowId)
+  public int fetchNumExecutableFlows(final int projectId, final String flowId)
       throws ExecutorManagerException {
     return 0;
   }
@@ -151,114 +146,116 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   public int getFlowUpdateCount() {
-    return flowUpdateCount;
+    return this.flowUpdateCount;
   }
 
-  public Integer getNodeUpdateCount(String jobId) {
-    return jobUpdateCount.get(jobId);
+  public Integer getNodeUpdateCount(final String jobId) {
+    return this.jobUpdateCount.get(jobId);
   }
 
   @Override
-  public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
+  public ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempt)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public boolean updateExecutableReference(int execId, long updateTime)
+  public boolean updateExecutableReference(final int execId, final long updateTime)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return true;
   }
 
   @Override
-  public LogData fetchLogs(int execId, String name, int attempt, int startByte,
-      int endByte) throws ExecutorManagerException {
+  public LogData fetchLogs(final int execId, final String name, final int attempt,
+      final int startByte,
+      final int endByte) throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(int skip, int num)
+  public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(String projectContains,
-      String flowContains, String userNameContains, int status, long startData,
-      long endData, int skip, int num) throws ExecutorManagerException {
+  public List<ExecutableFlow> fetchFlowHistory(final String projectContains,
+      final String flowContains, final String userNameContains, final int status,
+      final long startData,
+      final long endData, final int skip, final int num) throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId,
-      int skip, int size) throws ExecutorManagerException {
+  public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
+      final int skip, final int size) throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public int fetchNumExecutableNodes(int projectId, String jobId)
+  public int fetchNumExecutableNodes(final int projectId, final String jobId)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
-  public Props fetchExecutionJobInputProps(int execId, String jobId)
+  public Props fetchExecutionJobInputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public Props fetchExecutionJobOutputProps(int execId, String jobId)
+  public Props fetchExecutionJobOutputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
+  public Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
+  public List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public int removeExecutionLogsByTime(long millis)
+  public int removeExecutionLogsByTime(final long millis)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return 0;
   }
 
   @Override
-  public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId,
-      int skip, int num, Status status) throws ExecutorManagerException {
+  public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+      final int skip, final int num, final Status status) throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public List<Object> fetchAttachments(int execId, String name, int attempt)
+  public List<Object> fetchAttachments(final int execId, final String name, final int attempt)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public void uploadAttachmentFile(ExecutableNode node, File file)
+  public void uploadAttachmentFile(final ExecutableNode node, final File file)
       throws ExecutorManagerException {
     // TODO Auto-generated method stub
 
@@ -266,8 +263,8 @@ public class MockExecutorLoader implements ExecutorLoader {
 
   @Override
   public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
-    List<Executor> activeExecutors = new ArrayList<Executor>();
-    for (Executor executor : executors) {
+    final List<Executor> activeExecutors = new ArrayList<>();
+    for (final Executor executor : this.executors) {
       if (executor.isActive()) {
         activeExecutors.add(executor);
       }
@@ -276,9 +273,9 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public Executor fetchExecutor(String host, int port)
-    throws ExecutorManagerException {
-    for (Executor executor : executors) {
+  public Executor fetchExecutor(final String host, final int port)
+      throws ExecutorManagerException {
+    for (final Executor executor : this.executors) {
       if (executor.getHost().equals(host) && executor.getPort() == port) {
         return executor;
       }
@@ -287,8 +284,8 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
-    for (Executor executor : executors) {
+  public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
+    for (final Executor executor : this.executors) {
       if (executor.getId() == executorId) {
         return executor;
       }
@@ -297,95 +294,95 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public Executor addExecutor(String host, int port)
-    throws ExecutorManagerException {
+  public Executor addExecutor(final String host, final int port)
+      throws ExecutorManagerException {
     Executor executor = null;
     if (fetchExecutor(host, port) == null) {
-      executorIdCounter++;
-      executor = new Executor(executorIdCounter, host, port, true);
-      executors.add(executor);
+      this.executorIdCounter++;
+      executor = new Executor(this.executorIdCounter, host, port, true);
+      this.executors.add(executor);
     }
     return executor;
   }
 
   @Override
-  public void removeExecutor(String host, int port) throws ExecutorManagerException {
-    Executor executor = fetchExecutor(host, port);
+  public void removeExecutor(final String host, final int port) throws ExecutorManagerException {
+    final Executor executor = fetchExecutor(host, port);
     if (executor != null) {
-        executorIdCounter--;
-        executors.remove(executor);
+      this.executorIdCounter--;
+      this.executors.remove(executor);
     }
   }
 
   @Override
-  public void postExecutorEvent(Executor executor, EventType type, String user,
-    String message) throws ExecutorManagerException {
-    ExecutorLogEvent event =
-      new ExecutorLogEvent(executor.getId(), user, new Date(), type, message);
+  public void postExecutorEvent(final Executor executor, final EventType type, final String user,
+      final String message) throws ExecutorManagerException {
+    final ExecutorLogEvent event =
+        new ExecutorLogEvent(executor.getId(), user, new Date(), type, message);
 
-    if (!executorEvents.containsKey(executor.getId())) {
-      executorEvents.put(executor.getId(), new ArrayList<ExecutorLogEvent>());
+    if (!this.executorEvents.containsKey(executor.getId())) {
+      this.executorEvents.put(executor.getId(), new ArrayList<>());
     }
 
-    executorEvents.get(executor.getId()).add(event);
+    this.executorEvents.get(executor.getId()).add(event);
   }
 
   @Override
-  public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
-    int skip) throws ExecutorManagerException {
-    if (!executorEvents.containsKey(executor.getId())) {
-      List<ExecutorLogEvent> events = executorEvents.get(executor.getId());
+  public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
+      final int skip) throws ExecutorManagerException {
+    if (!this.executorEvents.containsKey(executor.getId())) {
+      final List<ExecutorLogEvent> events = this.executorEvents.get(executor.getId());
       return events.subList(skip, Math.min(num + skip - 1, events.size() - 1));
     }
     return null;
   }
 
   @Override
-  public void updateExecutor(Executor executor) throws ExecutorManagerException {
-    Executor oldExecutor = fetchExecutor(executor.getId());
-    executors.remove(oldExecutor);
-    executors.add(executor);
+  public void updateExecutor(final Executor executor) throws ExecutorManagerException {
+    final Executor oldExecutor = fetchExecutor(executor.getId());
+    this.executors.remove(oldExecutor);
+    this.executors.add(executor);
   }
 
   @Override
   public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
-    return executors;
+    return this.executors;
   }
 
   @Override
-  public void assignExecutor(int executorId, int execId)
-    throws ExecutorManagerException {
-    ExecutionReference ref = refs.get(execId);
+  public void assignExecutor(final int executorId, final int execId)
+      throws ExecutorManagerException {
+    final ExecutionReference ref = this.refs.get(execId);
     ref.setExecutor(fetchExecutor(executorId));
-    executionExecutorMapping.put(execId, executorId);
+    this.executionExecutorMapping.put(execId, executorId);
   }
 
   @Override
-  public Executor fetchExecutorByExecutionId(int execId) throws ExecutorManagerException {
-    if (executionExecutorMapping.containsKey(execId)) {
-      return fetchExecutor(executionExecutorMapping.get(execId));
+  public Executor fetchExecutorByExecutionId(final int execId) throws ExecutorManagerException {
+    if (this.executionExecutorMapping.containsKey(execId)) {
+      return fetchExecutor(this.executionExecutorMapping.get(execId));
     } else {
       throw new ExecutorManagerException(
-        "Failed to find executor with execution : " + execId);
+          "Failed to find executor with execution : " + execId);
     }
   }
 
   @Override
   public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
-    throws ExecutorManagerException {
-    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
-      new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
-    for (int execId : refs.keySet()) {
-      if (!executionExecutorMapping.containsKey(execId)) {
-        queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
-          .get(execId), flows.get(execId)));
+      throws ExecutorManagerException {
+    final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+        new ArrayList<>();
+    for (final int execId : this.refs.keySet()) {
+      if (!this.executionExecutorMapping.containsKey(execId)) {
+        queuedFlows.add(new Pair<>(this.refs
+            .get(execId), this.flows.get(execId)));
       }
     }
     return queuedFlows;
   }
 
   @Override
-  public void unassignExecutor(int executionId) throws ExecutorManagerException {
-    executionExecutorMapping.remove(executionId);
+  public void unassignExecutor(final int executionId) throws ExecutorManagerException {
+    this.executionExecutorMapping.remove(executionId);
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/AllJobExecutorTests.java b/azkaban-common/src/test/java/azkaban/jobExecutor/AllJobExecutorTests.java
index f3b5ea6..64ea2cc 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/AllJobExecutorTests.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/AllJobExecutorTests.java
@@ -19,9 +19,9 @@ package azkaban.jobExecutor;
 import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 
-class AllJobExecutorTests {
+public class AllJobExecutorTests {
 
-  static Props setUpCommonProps() {
+  public static Props setUpCommonProps() {
 
     final Props props = new Props();
     props.put("fullPath", ".");
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 0eae405..603af51 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -131,8 +131,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   }
 
   /**
-   * Constructor. If executorService is null, then it will create it's own for
-   * thread pools.
+   * Constructor. If executorService is null, then it will create it's own for thread pools.
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
@@ -257,7 +256,11 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.watcher.setLogger(this.logger);
     }
 
-    this.logger.info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
+    // Avoid NPE in unit tests when the static app instance is not set
+    if (AzkabanExecutorServer.getApp() != null) {
+      this.logger
+          .info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
+    }
     this.logger.info("Running execid:" + this.execId + " flow:" + flowId + " project:"
         + projectId + " version:" + version);
     if (this.pipelineExecId != null) {
@@ -730,8 +733,8 @@ public class FlowRunner extends EventHandler implements Runnable {
   }
 
   /**
-   * Determines what the state of the next node should be. Returns null if the
-   * node should not be run.
+   * Determines what the state of the next node should be. Returns null if the node should not be
+   * run.
    */
   public Status getImpliedStatus(final ExecutableNode node) {
     // If it's running or finished with 'SUCCEEDED', than don't even
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index f19b91f..13af90a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -60,6 +60,8 @@ import org.json.simple.JSONObject;
 public class JobRunner extends EventHandler implements Runnable {
 
   public static final String AZKABAN_WEBSERVER_URL = "azkaban.webserver.url";
+
+  private static final Logger serverLogger = Logger.getLogger(JobRunner.class);
   private static final Object logCreatorLock = new Object();
   private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout(
       "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
@@ -226,8 +228,8 @@ public class JobRunner extends EventHandler implements Runnable {
   }
 
   /**
-   * Returns a list of jobs that this JobRunner will wait upon to finish before
-   * starting. It is only relevant if pipeline is turned on.
+   * Returns a list of jobs that this JobRunner will wait upon to finish before starting. It is only
+   * relevant if pipeline is turned on.
    */
   public Set<String> getPipelineWatchedJobs() {
     return this.pipelineJobs;
@@ -398,8 +400,8 @@ public class JobRunner extends EventHandler implements Runnable {
   }
 
   /**
-   * Used to handle non-ready and special status's (i.e. KILLED). Returns true
-   * if they handled anything.
+   * Used to handle non-ready and special status's (i.e. KILLED). Returns true if they handled
+   * anything.
    */
   private boolean handleNonReadyStatus() {
     Status nodeStatus = this.node.getStatus();
@@ -552,6 +554,15 @@ public class JobRunner extends EventHandler implements Runnable {
    */
   @Override
   public void run() {
+    try {
+      doRun();
+    } catch (final Exception e) {
+      serverLogger.error("Unexpected exception", e);
+      throw e;
+    }
+  }
+
+  private void doRun() {
     Thread.currentThread().setName(
         "JobRunner-" + this.jobId + "-" + this.executionId);
 
@@ -678,8 +689,8 @@ public class JobRunner extends EventHandler implements Runnable {
   }
 
   /**
-   * Add useful JVM arguments so it is easier to map a running Java process to a
-   * flow, execution id and job
+   * Add useful JVM arguments so it is easier to map a running Java process to a flow, execution id
+   * and job
    */
   private void insertJVMAargs() {
     final String flowName = this.node.getParentFlow().getFlowId();
@@ -698,8 +709,8 @@ public class JobRunner extends EventHandler implements Runnable {
   }
 
   /**
-   * Add relevant links to the job properties so that downstream consumers may
-   * know what executions initiated their execution.
+   * Add relevant links to the job properties so that downstream consumers may know what executions
+   * initiated their execution.
    */
   private void insertJobMetadata() {
     final String baseURL = this.azkabanProps.get(AZKABAN_WEBSERVER_URL);
@@ -744,8 +755,13 @@ public class JobRunner extends EventHandler implements Runnable {
         logError("Job run failed, but will treat it like success.");
         logError(e.getMessage() + " cause: " + e.getCause(), e);
       } else {
-        finalStatus = changeStatus(Status.FAILED);
-        logError("Job run failed!", e);
+        if (isKilled() || this.node.getStatus() == Status.KILLED) {
+          finalStatus = Status.KILLED;
+          logError("Job run killed!", e);
+        } else {
+          finalStatus = changeStatus(Status.FAILED);
+          logError("Job run failed!", e);
+        }
         logError(e.getMessage() + " cause: " + e.getCause());
       }
     }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
index 34bfc2b..e4253c5 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/EventCollectorListener.java
@@ -19,27 +19,33 @@ package azkaban.execapp;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventListener;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class EventCollectorListener implements EventListener {
 
-  private final ArrayList<Event> eventList = new ArrayList<>();
+  public static final Object handleEvent = new Object();
+  // CopyOnWriteArrayList allows concurrent iteration and modification
+  private final List<Event> eventList = new CopyOnWriteArrayList<>();
   private final HashSet<Event.Type> filterOutTypes = new HashSet<>();
 
   public void setEventFilterOut(final Event.Type... types) {
     this.filterOutTypes.addAll(Arrays.asList(types));
   }
-
+  
   @Override
   public void handleEvent(final Event event) {
+    synchronized (handleEvent) {
+      handleEvent.notifyAll();
+    }
     if (!this.filterOutTypes.contains(event.getType())) {
       this.eventList.add(event);
     }
   }
 
-  public ArrayList<Event> getEventList() {
+  public List<Event> getEventList() {
     return this.eventList;
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 98b06cf..11709c6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -16,22 +16,27 @@
 
 package azkaban.execapp;
 
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;
+
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
+import azkaban.jobExecutor.AllJobExecutorTests;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.MockProjectLoader;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.test.Utils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import java.io.File;
@@ -41,35 +46,38 @@ import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
-public class FlowRunnerTest {
+public class FlowRunnerTest extends FlowRunnerTestBase {
 
+  private static final File TEST_DIR = new File(
+      "../azkaban-test/src/test/resources/azkaban/test/executions/exectest1");
   private File workingDir;
   private JobTypeManager jobtypeManager;
   private ProjectLoader fakeProjectLoader;
-
-  public FlowRunnerTest() {
-
-  }
+  @Mock
+  private ExecutorLoader loader;
 
   @Before
   public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
     System.out.println("Create temp dir");
-    synchronized (this) {
-      this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
-      if (this.workingDir.exists()) {
-        FileUtils.deleteDirectory(this.workingDir);
-      }
-      this.workingDir.mkdirs();
+    this.workingDir = new File("build/tmp/_AzkabanTestDir_" + System.currentTimeMillis());
+    if (this.workingDir.exists()) {
+      FileUtils.deleteDirectory(this.workingDir);
     }
+    this.workingDir.mkdirs();
     this.jobtypeManager =
         new JobTypeManager(null, null, this.getClass().getClassLoader());
     final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
-    pluginSet.addPluginClass("java", JavaJob.class);
+    pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
     pluginSet.addPluginClass("test", InteractiveTestJob.class);
     this.fakeProjectLoader = new MockProjectLoader(this.workingDir);
+    Utils.initServiceProvider();
+    JmxJobMBeanManager.getInstance().initialize(new Props());
 
     InteractiveTestJob.clearTestJobs();
   }
@@ -85,32 +93,29 @@ public class FlowRunnerTest {
     }
   }
 
-  @Ignore
   @Test
   public void exec1Normal() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
-    // just making compile. may not work at all.
-
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
+    this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
-    Assert.assertTrue(!runner.isKilled());
-    runner.run();
-    final ExecutableFlow exFlow = runner.getExecutableFlow();
-    Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
-    compareFinishedRuntime(runner);
-
-    testStatus(exFlow, "job1", Status.SUCCEEDED);
-    testStatus(exFlow, "job2", Status.SUCCEEDED);
-    testStatus(exFlow, "job3", Status.SUCCEEDED);
-    testStatus(exFlow, "job4", Status.SUCCEEDED);
-    testStatus(exFlow, "job5", Status.SUCCEEDED);
-    testStatus(exFlow, "job6", Status.SUCCEEDED);
-    testStatus(exFlow, "job7", Status.SUCCEEDED);
-    testStatus(exFlow, "job8", Status.SUCCEEDED);
-    testStatus(exFlow, "job10", Status.SUCCEEDED);
+    startThread(this.runner);
+    succeedJobs("job3", "job4", "job6");
+
+    assertFlowStatus(Status.SUCCEEDED);
+    assertThreadShutDown();
+    compareFinishedRuntime(this.runner);
+
+    assertStatus("job1", Status.SUCCEEDED);
+    assertStatus("job2", Status.SUCCEEDED);
+    assertStatus("job3", Status.SUCCEEDED);
+    assertStatus("job4", Status.SUCCEEDED);
+    assertStatus("job5", Status.SUCCEEDED);
+    assertStatus("job6", Status.SUCCEEDED);
+    assertStatus("job7", Status.SUCCEEDED);
+    assertStatus("job8", Status.SUCCEEDED);
+    assertStatus("job10", Status.SUCCEEDED);
 
     try {
       eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -122,15 +127,12 @@ public class FlowRunnerTest {
     }
   }
 
-  @Ignore
   @Test
   public void exec1Disabled() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final File testDir = new File("unit/executions/exectest1");
-    ExecutableFlow exFlow = prepareExecDir(testDir, "exec1", 1);
+    final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, "exec1", 1);
 
     // Disable couple in the middle and at the end.
     exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
@@ -138,26 +140,28 @@ public class FlowRunnerTest {
     exFlow.getExecutableNode("job5").setStatus(Status.DISABLED);
     exFlow.getExecutableNode("job10").setStatus(Status.DISABLED);
 
-    final FlowRunner runner = createFlowRunner(exFlow, loader, eventCollector);
+    this.runner = createFlowRunner(exFlow, this.loader, eventCollector);
 
-    Assert.assertTrue(!runner.isKilled());
-    Assert.assertTrue(exFlow.getStatus() == Status.READY);
-    runner.run();
+    Assert.assertTrue(!this.runner.isKilled());
+    assertFlowStatus(Status.READY);
 
-    exFlow = runner.getExecutableFlow();
-    compareFinishedRuntime(runner);
+    startThread(this.runner);
+    succeedJobs("job3", "job4");
 
-    Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
+    assertThreadShutDown();
+    compareFinishedRuntime(this.runner);
 
-    testStatus(exFlow, "job1", Status.SKIPPED);
-    testStatus(exFlow, "job2", Status.SUCCEEDED);
-    testStatus(exFlow, "job3", Status.SUCCEEDED);
-    testStatus(exFlow, "job4", Status.SUCCEEDED);
-    testStatus(exFlow, "job5", Status.SKIPPED);
-    testStatus(exFlow, "job6", Status.SKIPPED);
-    testStatus(exFlow, "job7", Status.SUCCEEDED);
-    testStatus(exFlow, "job8", Status.SUCCEEDED);
-    testStatus(exFlow, "job10", Status.SKIPPED);
+    assertFlowStatus(Status.SUCCEEDED);
+
+    assertStatus("job1", Status.SKIPPED);
+    assertStatus("job2", Status.SUCCEEDED);
+    assertStatus("job3", Status.SUCCEEDED);
+    assertStatus("job4", Status.SUCCEEDED);
+    assertStatus("job5", Status.SKIPPED);
+    assertStatus("job6", Status.SKIPPED);
+    assertStatus("job7", Status.SUCCEEDED);
+    assertStatus("job8", Status.SUCCEEDED);
+    assertStatus("job10", Status.SKIPPED);
 
     try {
       eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -169,34 +173,32 @@ public class FlowRunnerTest {
     }
   }
 
-  @Ignore
   @Test
   public void exec1Failed() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final File testDir = new File("unit/executions/exectest1");
-    final ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
+    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
 
-    final FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
+    this.runner = createFlowRunner(flow, this.loader, eventCollector);
 
-    runner.run();
-    final ExecutableFlow exFlow = runner.getExecutableFlow();
-    Assert.assertTrue(!runner.isKilled());
-    Assert.assertTrue("Flow status " + exFlow.getStatus(),
-        exFlow.getStatus() == Status.FAILED);
-
-    testStatus(exFlow, "job1", Status.SUCCEEDED);
-    testStatus(exFlow, "job2d", Status.FAILED);
-    testStatus(exFlow, "job3", Status.CANCELLED);
-    testStatus(exFlow, "job4", Status.CANCELLED);
-    testStatus(exFlow, "job5", Status.CANCELLED);
-    testStatus(exFlow, "job6", Status.SUCCEEDED);
-    testStatus(exFlow, "job7", Status.CANCELLED);
-    testStatus(exFlow, "job8", Status.CANCELLED);
-    testStatus(exFlow, "job9", Status.CANCELLED);
-    testStatus(exFlow, "job10", Status.CANCELLED);
+    startThread(this.runner);
+    succeedJobs("job6");
+
+    Assert.assertTrue(!this.runner.isKilled());
+    assertFlowStatus(Status.FAILED);
+
+    assertStatus("job1", Status.SUCCEEDED);
+    assertStatus("job2d", Status.FAILED);
+    assertStatus("job3", Status.CANCELLED);
+    assertStatus("job4", Status.CANCELLED);
+    assertStatus("job5", Status.CANCELLED);
+    assertStatus("job6", Status.SUCCEEDED);
+    assertStatus("job7", Status.CANCELLED);
+    assertStatus("job8", Status.CANCELLED);
+    assertStatus("job9", Status.CANCELLED);
+    assertStatus("job10", Status.CANCELLED);
+    assertThreadShutDown();
 
     try {
       eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -208,43 +210,33 @@ public class FlowRunnerTest {
     }
   }
 
-  @Ignore
   @Test
   public void exec1FailedKillAll() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final File testDir = new File("unit/executions/exectest1");
-    final ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
+    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
     flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
 
-    final FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
+    this.runner = createFlowRunner(flow, this.loader, eventCollector);
 
-    runner.run();
-    final ExecutableFlow exFlow = runner.getExecutableFlow();
+    startThread(this.runner);
+    assertThreadShutDown();
 
-    Assert.assertTrue(runner.isKilled());
+    Assert.assertTrue(this.runner.isKilled());
 
-    Assert.assertTrue(
-        "Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(),
-        exFlow.getStatus() == Status.FAILED);
-
-    try {
-      Thread.sleep(500);
-    } catch (final InterruptedException e) {
-    }
+    assertFlowStatus(Status.KILLED);
 
-    testStatus(exFlow, "job1", Status.SUCCEEDED);
-    testStatus(exFlow, "job2d", Status.FAILED);
-    testStatus(exFlow, "job3", Status.CANCELLED);
-    testStatus(exFlow, "job4", Status.CANCELLED);
-    testStatus(exFlow, "job5", Status.CANCELLED);
-    testStatus(exFlow, "job6", Status.KILLED);
-    testStatus(exFlow, "job7", Status.CANCELLED);
-    testStatus(exFlow, "job8", Status.CANCELLED);
-    testStatus(exFlow, "job9", Status.CANCELLED);
-    testStatus(exFlow, "job10", Status.CANCELLED);
+    assertStatus("job1", Status.SUCCEEDED);
+    assertStatus("job2d", Status.FAILED);
+    assertStatus("job3", Status.CANCELLED);
+    assertStatus("job4", Status.CANCELLED);
+    assertStatus("job5", Status.CANCELLED);
+    assertStatus("job6", Status.KILLED);
+    assertStatus("job7", Status.CANCELLED);
+    assertStatus("job8", Status.CANCELLED);
+    assertStatus("job9", Status.CANCELLED);
+    assertStatus("job10", Status.CANCELLED);
 
     try {
       eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -256,40 +248,32 @@ public class FlowRunnerTest {
     }
   }
 
-  @Ignore
   @Test
   public void exec1FailedFinishRest() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final File testDir = new File("unit/executions/exectest1");
-    final ExecutableFlow flow = prepareExecDir(testDir, "exec3", 1);
+    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec3", 1);
     flow.getExecutionOptions().setFailureAction(
         FailureAction.FINISH_ALL_POSSIBLE);
-    final FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
+    this.runner = createFlowRunner(flow, this.loader, eventCollector);
 
-    runner.run();
-    final ExecutableFlow exFlow = runner.getExecutableFlow();
-    Assert.assertTrue(
-        "Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(),
-        exFlow.getStatus() == Status.FAILED);
+    startThread(this.runner);
+    succeedJobs("job3");
 
-    try {
-      Thread.sleep(500);
-    } catch (final InterruptedException e) {
-    }
+    assertFlowStatus(Status.FAILED);
 
-    testStatus(exFlow, "job1", Status.SUCCEEDED);
-    testStatus(exFlow, "job2d", Status.FAILED);
-    testStatus(exFlow, "job3", Status.SUCCEEDED);
-    testStatus(exFlow, "job4", Status.CANCELLED);
-    testStatus(exFlow, "job5", Status.CANCELLED);
-    testStatus(exFlow, "job6", Status.CANCELLED);
-    testStatus(exFlow, "job7", Status.SUCCEEDED);
-    testStatus(exFlow, "job8", Status.SUCCEEDED);
-    testStatus(exFlow, "job9", Status.SUCCEEDED);
-    testStatus(exFlow, "job10", Status.CANCELLED);
+    assertStatus("job1", Status.SUCCEEDED);
+    assertStatus("job2d", Status.FAILED);
+    assertStatus("job3", Status.SUCCEEDED);
+    assertStatus("job4", Status.CANCELLED);
+    assertStatus("job5", Status.CANCELLED);
+    assertStatus("job6", Status.CANCELLED);
+    assertStatus("job7", Status.SUCCEEDED);
+    assertStatus("job8", Status.SUCCEEDED);
+    assertStatus("job9", Status.SUCCEEDED);
+    assertStatus("job10", Status.CANCELLED);
+    assertThreadShutDown();
 
     try {
       eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -301,50 +285,32 @@ public class FlowRunnerTest {
     }
   }
 
-  @Ignore
   @Test
   public void execAndCancel() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
+    this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
 
-    Assert.assertTrue(!runner.isKilled());
-    final Thread thread = new Thread(runner);
-    thread.start();
+    startThread(this.runner);
 
-    try {
-      Thread.sleep(5000);
-    } catch (final InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    assertStatus("job1", Status.SUCCEEDED);
+    assertStatus("job2", Status.SUCCEEDED);
+    waitJobsStarted(this.runner, "job3", "job4", "job6");
 
-    runner.kill("me");
-    Assert.assertTrue(runner.isKilled());
+    this.runner.kill("me");
+    Assert.assertTrue(this.runner.isKilled());
 
-    try {
-      Thread.sleep(2000);
-    } catch (final InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    assertStatus("job5", Status.CANCELLED);
+    assertStatus("job7", Status.CANCELLED);
+    assertStatus("job8", Status.CANCELLED);
+    assertStatus("job10", Status.CANCELLED);
+    assertStatus("job3", Status.KILLED);
+    assertStatus("job4", Status.KILLED);
+    assertStatus("job6", Status.KILLED);
+    assertThreadShutDown();
 
-    final ExecutableFlow exFlow = runner.getExecutableFlow();
-    testStatus(exFlow, "job1", Status.SUCCEEDED);
-    testStatus(exFlow, "job2", Status.SUCCEEDED);
-    testStatus(exFlow, "job5", Status.CANCELLED);
-    testStatus(exFlow, "job7", Status.CANCELLED);
-    testStatus(exFlow, "job8", Status.CANCELLED);
-    testStatus(exFlow, "job10", Status.CANCELLED);
-    testStatus(exFlow, "job3", Status.KILLED);
-    testStatus(exFlow, "job4", Status.KILLED);
-    testStatus(exFlow, "job6", Status.KILLED);
-
-    Assert.assertTrue(
-        "Expected FAILED status instead got " + exFlow.getStatus(),
-        exFlow.getStatus() == Status.KILLED);
+    assertFlowStatus(Status.KILLED);
 
     try {
       eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -356,42 +322,34 @@ public class FlowRunnerTest {
     }
   }
 
-  @Ignore
   @Test
   public void execRetries() throws Exception {
-    final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final FlowRunner runner = createFlowRunner(loader, eventCollector, "exec4-retry");
+    this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
 
-    runner.run();
+    startThread(this.runner);
+    assertThreadShutDown();
 
-    final ExecutableFlow exFlow = runner.getExecutableFlow();
-    testStatus(exFlow, "job-retry", Status.SUCCEEDED);
-    testStatus(exFlow, "job-pass", Status.SUCCEEDED);
-    testStatus(exFlow, "job-retry-fail", Status.FAILED);
-    testAttempts(exFlow, "job-retry", 3);
-    testAttempts(exFlow, "job-pass", 0);
-    testAttempts(exFlow, "job-retry-fail", 2);
+    assertStatus("job-retry", Status.SUCCEEDED);
+    assertStatus("job-pass", Status.SUCCEEDED);
+    assertStatus("job-retry-fail", Status.FAILED);
+    assertAttempts("job-retry", 3);
+    assertAttempts("job-pass", 0);
+    assertAttempts("job-retry-fail", 2);
 
-    Assert.assertTrue(
-        "Expected FAILED status instead got " + exFlow.getStatus(),
-        exFlow.getStatus() == Status.FAILED);
+    assertFlowStatus(Status.FAILED);
   }
 
-  private void testStatus(final ExecutableFlow flow, final String name, final Status status) {
-    final ExecutableNode node = flow.getExecutableNode(name);
-
-    if (node.getStatus() != status) {
-      Assert.fail("Status of job " + node.getId() + " is " + node.getStatus()
-          + " not " + status + " as expected.");
-    }
+  private void startThread(final FlowRunner runner) {
+    Assert.assertTrue(!runner.isKilled());
+    final Thread thread = new Thread(runner);
+    thread.start();
   }
 
-  private void testAttempts(final ExecutableFlow flow, final String name, final int attempt) {
-    final ExecutableNode node = flow.getExecutableNode(name);
-
+  private void assertAttempts(final String name, final int attempt) {
+    final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
     if (node.getAttempt() != attempt) {
       Assert.fail("Expected " + attempt + " got " + node.getAttempt()
           + " attempts " + name);
@@ -400,9 +358,7 @@ public class FlowRunnerTest {
 
   private ExecutableFlow prepareExecDir(final File execDir, final String flowName,
       final int execId) throws IOException {
-    synchronized (this) {
-      FileUtils.copyDirectory(execDir, this.workingDir);
-    }
+    FileUtils.copyDirectory(execDir, this.workingDir);
 
     final File jsonFlowFile = new File(this.workingDir, flowName + ".flow");
     final HashMap<String, Object> flowObj =
@@ -437,8 +393,6 @@ public class FlowRunnerTest {
       return;
     }
 
-    // System.out.println("Node " + node.getJobId() + " start:" + startTime +
-    // " end:" + endTime + " previous:" + previousEndTime);
     Assert.assertTrue("Checking start and end times", startTime > 0
         && endTime >= startTime);
     Assert.assertTrue("Start time for " + node.getId() + " is " + startTime
@@ -459,9 +413,6 @@ public class FlowRunnerTest {
       final ExecutorLoader loader, final EventCollectorListener eventCollector,
       final Props azkabanProps)
       throws Exception {
-    // File testDir = new File("unit/executions/exectest1");
-    // MockProjectLoader projectLoader = new MockProjectLoader(new
-    // File(flow.getExecutionPath()));
 
     loader.uploadExecutableFlow(flow);
     final FlowRunner runner =
@@ -480,10 +431,7 @@ public class FlowRunnerTest {
   private FlowRunner createFlowRunner(final ExecutorLoader loader,
       final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
       throws Exception {
-    final File testDir = new File("unit/executions/exectest1");
-    final ExecutableFlow exFlow = prepareExecDir(testDir, flowName, 1);
-    // MockProjectLoader projectLoader = new MockProjectLoader(new
-    // File(exFlow.getExecutionPath()));
+    final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, flowName, 1);
 
     loader.uploadExecutableFlow(exFlow);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index decc17a..540e9bb 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -16,6 +16,9 @@
 
 package azkaban.execapp;
 
+import static org.junit.Assert.assertEquals;
+
+import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
@@ -26,6 +29,7 @@ import azkaban.executor.JavaJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
+import azkaban.jobExecutor.AllJobExecutorTests;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.DirectoryFlowLoader;
@@ -33,6 +37,7 @@ import azkaban.project.MockProjectLoader;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
+import azkaban.test.Utils;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -41,56 +46,54 @@ import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  * Test the flow run, especially with embedded flows.
  *
- * This test uses executions/embedded2. It also mainly uses the flow named
- * jobf. The test is designed to control success/failures explicitly so we
- * don't have to time the flow exactly.
+ * This test uses executions/embedded2. It also mainly uses the flow named jobf. The test is
+ * designed to control success/failures explicitly so we don't have to time the flow exactly.
  *
  * Flow jobf looks like the following:
  *
  *
- * joba       joba1
- * /  |  \      |
- * /   |   \     |
- * jobb  jobd jobc  |
- * \   |   /    /
- * \  |  /    /
- * jobe    /
- * |     /
- * |    /
- * jobf
+ *       joba       joba1
+ *      /  |  \      |
+ *     /   |   \     |
+ *  jobb  jobd jobc  |
+ *     \   |   /    /
+ *      \  |  /    /
+ *        jobe    /
+ *         |     /
+ *         |    /
+ *        jobf
  *
- * The job 'jobb' is an embedded flow:
+ *  The job 'jobb' is an embedded flow:
  *
- * jobb:innerFlow
+ *  jobb:innerFlow
  *
- * innerJobA
- * /       \
- * innerJobB   innerJobC
- * \       /
- * innerFlow
+ *        innerJobA
+ *        /       \
+ *   innerJobB   innerJobC
+ *        \       /
+ *        innerFlow
  *
  *
- * The job 'jobd' is a simple embedded flow:
+ *  The job 'jobd' is a simple embedded flow:
  *
- * jobd:innerFlow2
+ *  jobd:innerFlow2
  *
- * innerJobA
- * |
- * innerFlow2
+ *       innerJobA
+ *           |
+ *       innerFlow2
  *
- * The following tests checks each stage of the flow run by forcing jobs to
- * succeed or fail.
+ * The following tests checks each stage of the flow run by forcing jobs to succeed or fail.
  */
-public class FlowRunnerTest2 {
+public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
+  private static final File TEST_DIR = new File(
+      "../azkaban-test/src/test/resources/azkaban/test/executions/embedded2");
   private static int id = 101;
   private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
   private File workingDir;
@@ -100,13 +103,10 @@ public class FlowRunnerTest2 {
   private Project project;
   private Map<String, Flow> flowMap;
 
-  public FlowRunnerTest2() {
-  }
-
   @Before
   public void setUp() throws Exception {
     System.out.println("Create temp dir");
-    this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+    this.workingDir = new File("build/tmp/_AzkabanTestDir_" + System.currentTimeMillis());
     if (this.workingDir.exists()) {
       FileUtils.deleteDirectory(this.workingDir);
     }
@@ -115,14 +115,16 @@ public class FlowRunnerTest2 {
         this.getClass().getClassLoader());
     final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
 
+    pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
     pluginSet.addPluginClass("java", JavaJob.class);
     pluginSet.addPluginClass("test", InteractiveTestJob.class);
     this.fakeProjectLoader = new MockProjectLoader(this.workingDir);
     this.fakeExecutorLoader = new MockExecutorLoader();
     this.project = new Project(1, "testProject");
+    Utils.initServiceProvider();
+    JmxJobMBeanManager.getInstance().initialize(new Props());
 
-    final File dir = new File("unit/executions/embedded2");
-    prepareProject(this.project, dir);
+    prepareProject(this.project, TEST_DIR);
 
     InteractiveTestJob.clearTestJobs();
   }
@@ -137,1231 +139,956 @@ public class FlowRunnerTest2 {
   }
 
   /**
-   * Tests the basic successful flow run, and also tests all output variables
-   * from each job.
+   * Tests the basic successful flow run, and also tests all output variables from each job.
    */
-  @Ignore
   @Test
   public void testBasicRun() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
 
     // 1. START FLOW
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-
-    compareStates(expectedStateMap, nodeMap);
-    final Props joba = nodeMap.get("joba").getInputProps();
-    Assert.assertEquals("joba.1", joba.get("param1"));
-    Assert.assertEquals("test1.2", joba.get("param2"));
-    Assert.assertEquals("test1.3", joba.get("param3"));
-    Assert.assertEquals("override.4", joba.get("param4"));
-    Assert.assertEquals("test2.5", joba.get("param5"));
-    Assert.assertEquals("test2.6", joba.get("param6"));
-    Assert.assertEquals("test2.7", joba.get("param7"));
-    Assert.assertEquals("test2.8", joba.get("param8"));
-
-    final Props joba1 = nodeMap.get("joba1").getInputProps();
-    Assert.assertEquals("test1.1", joba1.get("param1"));
-    Assert.assertEquals("test1.2", joba1.get("param2"));
-    Assert.assertEquals("test1.3", joba1.get("param3"));
-    Assert.assertEquals("override.4", joba1.get("param4"));
-    Assert.assertEquals("test2.5", joba1.get("param5"));
-    Assert.assertEquals("test2.6", joba1.get("param6"));
-    Assert.assertEquals("test2.7", joba1.get("param7"));
-    Assert.assertEquals("test2.8", joba1.get("param8"));
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
+
+    final Props joba = this.runner.getExecutableFlow().getExecutableNodePath("joba")
+        .getInputProps();
+    assertEquals("joba.1", joba.get("param1"));
+    assertEquals("test1.2", joba.get("param2"));
+    assertEquals("test1.3", joba.get("param3"));
+    assertEquals("override.4", joba.get("param4"));
+    assertEquals("test2.5", joba.get("param5"));
+    assertEquals("test2.6", joba.get("param6"));
+    assertEquals("test2.7", joba.get("param7"));
+    assertEquals("test2.8", joba.get("param8"));
+
+    final Props joba1 = this.runner.getExecutableFlow().getExecutableNodePath("joba1")
+        .getInputProps();
+    assertEquals("test1.1", joba1.get("param1"));
+    assertEquals("test1.2", joba1.get("param2"));
+    assertEquals("test1.3", joba1.get("param3"));
+    assertEquals("override.4", joba1.get("param4"));
+    assertEquals("test2.5", joba1.get("param5"));
+    assertEquals("test2.6", joba1.get("param6"));
+    assertEquals("test2.7", joba1.get("param7"));
+    assertEquals("test2.8", joba1.get("param8"));
 
     // 2. JOB A COMPLETES SUCCESSFULLY
     InteractiveTestJob.getTestJob("joba").succeedJob(
         Props.of("output.joba", "joba", "output.override", "joba"));
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-
-    final ExecutableNode node = nodeMap.get("jobb");
-    Assert.assertEquals(Status.RUNNING, node.getStatus());
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+
+    final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNodePath("jobb");
+    assertEquals(Status.RUNNING, node.getStatus());
     final Props jobb = node.getInputProps();
-    Assert.assertEquals("override.4", jobb.get("param4"));
+    assertEquals("override.4", jobb.get("param4"));
     // Test that jobb properties overwrites the output properties
-    Assert.assertEquals("moo", jobb.get("testprops"));
-    Assert.assertEquals("jobb", jobb.get("output.override"));
-    Assert.assertEquals("joba", jobb.get("output.joba"));
-
-    final Props jobbInnerJobA = nodeMap.get("jobb:innerJobA").getInputProps();
-    Assert.assertEquals("test1.1", jobbInnerJobA.get("param1"));
-    Assert.assertEquals("test1.2", jobbInnerJobA.get("param2"));
-    Assert.assertEquals("test1.3", jobbInnerJobA.get("param3"));
-    Assert.assertEquals("override.4", jobbInnerJobA.get("param4"));
-    Assert.assertEquals("test2.5", jobbInnerJobA.get("param5"));
-    Assert.assertEquals("test2.6", jobbInnerJobA.get("param6"));
-    Assert.assertEquals("test2.7", jobbInnerJobA.get("param7"));
-    Assert.assertEquals("test2.8", jobbInnerJobA.get("param8"));
-    Assert.assertEquals("joba", jobbInnerJobA.get("output.joba"));
+    assertEquals("moo", jobb.get("testprops"));
+    assertEquals("jobb", jobb.get("output.override"));
+    assertEquals("joba", jobb.get("output.joba"));
+
+    final Props jobbInnerJobA = this.runner.getExecutableFlow()
+        .getExecutableNodePath("jobb:innerJobA")
+        .getInputProps();
+    assertEquals("test1.1", jobbInnerJobA.get("param1"));
+    assertEquals("test1.2", jobbInnerJobA.get("param2"));
+    assertEquals("test1.3", jobbInnerJobA.get("param3"));
+    assertEquals("override.4", jobbInnerJobA.get("param4"));
+    assertEquals("test2.5", jobbInnerJobA.get("param5"));
+    assertEquals("test2.6", jobbInnerJobA.get("param6"));
+    assertEquals("test2.7", jobbInnerJobA.get("param7"));
+    assertEquals("test2.8", jobbInnerJobA.get("param8"));
+    assertEquals("joba", jobbInnerJobA.get("output.joba"));
 
     // 3. jobb:Inner completes
     /// innerJobA completes
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(
         Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-    final Props jobbInnerJobB = nodeMap.get("jobb:innerJobB").getInputProps();
-    Assert.assertEquals("test1.1", jobbInnerJobB.get("param1"));
-    Assert.assertEquals("override.4", jobbInnerJobB.get("param4"));
-    Assert.assertEquals("jobb.innerJobA",
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
+    final Props jobbInnerJobB = this.runner.getExecutableFlow()
+        .getExecutableNodePath("jobb:innerJobB")
+        .getInputProps();
+    assertEquals("test1.1", jobbInnerJobB.get("param1"));
+    assertEquals("override.4", jobbInnerJobB.get("param4"));
+    assertEquals("jobb.innerJobA",
         jobbInnerJobB.get("output.jobb.innerJobA"));
-    Assert.assertEquals("moo", jobbInnerJobB.get("testprops"));
+    assertEquals("moo", jobbInnerJobB.get("testprops"));
     /// innerJobB, C completes
     InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(
         Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(
         Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
-    pause(250);
-    expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-
-    final Props jobbInnerJobD = nodeMap.get("jobb:innerFlow").getInputProps();
-    Assert.assertEquals("test1.1", jobbInnerJobD.get("param1"));
-    Assert.assertEquals("override.4", jobbInnerJobD.get("param4"));
-    Assert.assertEquals("jobb.innerJobB",
+    assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus("jobb:innerFlow", Status.RUNNING);
+
+    final Props jobbInnerJobD = this.runner.getExecutableFlow()
+        .getExecutableNodePath("jobb:innerFlow")
+        .getInputProps();
+    assertEquals("test1.1", jobbInnerJobD.get("param1"));
+    assertEquals("override.4", jobbInnerJobD.get("param4"));
+    assertEquals("jobb.innerJobB",
         jobbInnerJobD.get("output.jobb.innerJobB"));
-    Assert.assertEquals("jobb.innerJobC",
+    assertEquals("jobb.innerJobC",
         jobbInnerJobD.get("output.jobb.innerJobC"));
 
     // 4. Finish up on inner flow for jobb
     InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(
         Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
-    pause(250);
-    expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-    final Props jobbOutput = nodeMap.get("jobb").getOutputProps();
-    Assert.assertEquals("test1", jobbOutput.get("output1.jobb"));
-    Assert.assertEquals("test2", jobbOutput.get("output2.jobb"));
+    assertStatus("jobb:innerFlow", Status.SUCCEEDED);
+    assertStatus("jobb", Status.SUCCEEDED);
+    final Props jobbOutput = this.runner.getExecutableFlow().getExecutableNodePath("jobb")
+        .getOutputProps();
+    assertEquals("test1", jobbOutput.get("output1.jobb"));
+    assertEquals("test2", jobbOutput.get("output2.jobb"));
 
     // 5. Finish jobc, jobd
     InteractiveTestJob.getTestJob("jobc").succeedJob(
         Props.of("output.jobc", "jobc"));
-    pause(250);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobc", Status.SUCCEEDED);
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
-    pause(250);
     InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
-    expectedStateMap.put("jobd", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus("jobd", Status.SUCCEEDED);
+    assertStatus("jobe", Status.RUNNING);
 
-    final Props jobd = nodeMap.get("jobe").getInputProps();
-    Assert.assertEquals("test1", jobd.get("output1.jobb"));
-    Assert.assertEquals("jobc", jobd.get("output.jobc"));
+    final Props jobd = this.runner.getExecutableFlow().getExecutableNodePath("jobe")
+        .getInputProps();
+    assertEquals("test1", jobd.get("output1.jobb"));
+    assertEquals("jobc", jobd.get("output.jobc"));
 
     // 6. Finish off flow
     InteractiveTestJob.getTestJob("joba1").succeedJob();
-    pause(250);
     InteractiveTestJob.getTestJob("jobe").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.SUCCEEDED);
-    expectedStateMap.put("jobf", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobe", Status.SUCCEEDED);
+    assertStatus("jobf", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobf").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobf", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
-
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobf", Status.SUCCEEDED);
+    assertFlowStatus(Status.SUCCEEDED);
   }
 
   /**
-   * Tests a flow with Disabled jobs and flows. They should properly SKIP
-   * executions
+   * Tests a flow with Disabled jobs and flows. They should properly SKIP executions
    */
-  @Ignore
   @Test
   public void testDisabledNormal() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
     flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
     ((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
         "innerJobA").setStatus(Status.DISABLED);
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.SKIPPED);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.SKIPPED);
-    expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.READY);
-    expectedStateMap.put("jobb:innerJobB", Status.READY);
-    expectedStateMap.put("jobb:innerJobC", Status.READY);
-    expectedStateMap.put("jobb:innerFlow", Status.READY);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.SKIPPED);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.SKIPPED);
+    assertStatus("jobd:innerFlow2", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.READY);
+    assertStatus("jobb:innerJobB", Status.READY);
+    assertStatus("jobb:innerJobC", Status.READY);
+    assertStatus("jobb:innerFlow", Status.READY);
 
     // 3. jobb:Inner completes
     /// innerJobA completes
     InteractiveTestJob.getTestJob("jobc").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
-    expectedStateMap.put("jobd", Status.SUCCEEDED);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus("jobd", Status.SUCCEEDED);
+    assertStatus("jobc", Status.SUCCEEDED);
+    assertStatus("jobe", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobe").succeedJob();
     InteractiveTestJob.getTestJob("joba1").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobe", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobf", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobe", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobf", Status.RUNNING);
 
     // 4. Finish up on inner flow for jobb
     InteractiveTestJob.getTestJob("jobf").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobf", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobf", Status.SUCCEEDED);
 
-    Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertFlowStatus(Status.SUCCEEDED);
+    assertThreadShutDown();
   }
 
   /**
-   * Tests a failure with the default FINISH_CURRENTLY_RUNNING.
-   * After the first failure, every job that started should complete, and the
-   * rest of the jobs should be skipped.
+   * Tests a failure with the default FINISH_CURRENTLY_RUNNING. After the first failure, every job
+   * that started should complete, and the rest of the jobs should be skipped.
    */
-  @Ignore
   @Test
   public void testNormalFailure1() throws Exception {
     // Test propagation of KILLED status to embedded flows.
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
     InteractiveTestJob.getTestJob("joba").failJob();
-    pause(250);
-    Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
-    expectedStateMap.put("joba", Status.FAILED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.CANCELLED);
-    expectedStateMap.put("jobc", Status.CANCELLED);
-    expectedStateMap.put("jobd", Status.CANCELLED);
-    expectedStateMap.put("jobd:innerJobA", Status.READY);
-    expectedStateMap.put("jobd:innerFlow2", Status.READY);
-    expectedStateMap.put("jobb:innerJobA", Status.READY);
-    expectedStateMap.put("jobb:innerFlow", Status.READY);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    compareStates(expectedStateMap, nodeMap);
+    assertFlowStatus(Status.FAILED_FINISHING);
+    assertStatus("joba", Status.FAILED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.CANCELLED);
+    assertStatus("jobc", Status.CANCELLED);
+    assertStatus("jobd", Status.CANCELLED);
+    assertStatus("jobd:innerJobA", Status.READY);
+    assertStatus("jobd:innerFlow2", Status.READY);
+    assertStatus("jobb:innerJobA", Status.READY);
+    assertStatus("jobb:innerFlow", Status.READY);
+    assertStatus("jobe", Status.CANCELLED);
 
     // 3. jobb:Inner completes
     /// innerJobA completes
     InteractiveTestJob.getTestJob("joba1").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobf", Status.CANCELLED);
+    assertFlowStatus(Status.FAILED);
+    assertThreadShutDown();
   }
 
   /**
    * Test #2 on the default failure case.
    */
-  @Ignore
   @Test
   public void testNormalFailure2() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("joba1").failJob();
-    pause(250);
-    expectedStateMap.put("joba1", Status.FAILED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba1", Status.FAILED);
 
     // 3. joba completes, everything is killed
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobb", Status.KILLED);
-    expectedStateMap.put("jobd", Status.KILLED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.CANCELLED);
+    assertStatus("jobb:innerJobC", Status.CANCELLED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobb", Status.KILLED);
+    assertStatus("jobd", Status.KILLED);
+    assertFlowStatus(Status.FAILED_FINISHING);
 
     InteractiveTestJob.getTestJob("jobc").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobc", Status.SUCCEEDED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+    assertFlowStatus(Status.FAILED);
+    assertThreadShutDown();
   }
 
-  @Ignore
   @Test
   public void testNormalFailure3() throws Exception {
     // Test propagation of CANCELLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB in subflow FAILS
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("joba1").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
-    pause(250);
-    expectedStateMap.put("jobb", Status.FAILED_FINISHING);
-    expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-    Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb", Status.FAILED_FINISHING);
+    assertStatus("jobb:innerJobB", Status.FAILED);
+    assertFlowStatus(Status.FAILED_FINISHING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobd", Status.KILLED);
-    expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobb", Status.FAILED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobd", Status.KILLED);
+    assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobb", Status.FAILED);
 
     // 3. jobc completes, everything is killed
     InteractiveTestJob.getTestJob("jobc").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobc", Status.SUCCEEDED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+    assertFlowStatus(Status.FAILED);
+    assertThreadShutDown();
   }
 
   /**
-   * Tests failures when the fail behaviour is FINISH_ALL_POSSIBLE.
-   * In this case, all jobs which have had its pre-requisite met can continue
-   * to run. Finishes when the failure is propagated to the last node of the
-   * flow.
+   * Tests failures when the fail behaviour is FINISH_ALL_POSSIBLE. In this case, all jobs which
+   * have had its pre-requisite met can continue to run. Finishes when the failure is propagated to
+   * the last node of the flow.
    */
-  @Ignore
   @Test
   public void testFailedFinishingFailure3() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+    this.runner = createFlowRunner(eventCollector, "jobf",
         FailureAction.FINISH_ALL_POSSIBLE);
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB in subflow FAILS
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("joba1").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
-    pause(250);
-    expectedStateMap.put("jobb", Status.FAILED_FINISHING);
-    expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-    Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb", Status.FAILED_FINISHING);
+    assertStatus("jobb:innerJobB", Status.FAILED);
+    assertFlowStatus(Status.FAILED_FINISHING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb", Status.FAILED);
-    expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb", Status.FAILED);
+    assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobd:innerFlow2", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
 
     InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
-    expectedStateMap.put("jobd", Status.SUCCEEDED);
+    assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus("jobd", Status.SUCCEEDED);
 
     // 3. jobc completes, everything is killed
     InteractiveTestJob.getTestJob("jobc").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobc", Status.SUCCEEDED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+    assertFlowStatus(Status.FAILED);
+    assertThreadShutDown();
   }
 
   /**
-   * Tests the failure condition when a failure invokes a cancel (or killed)
-   * on the flow.
+   * Tests the failure condition when a failure invokes a cancel (or killed) on the flow.
    *
-   * Any jobs that are running will be assigned a KILLED state, and any nodes
-   * which were skipped due to prior errors will be given a CANCELLED state.
+   * Any jobs that are running will be assigned a KILLED state, and any nodes which were skipped due
+   * to prior errors will be given a CANCELLED state.
    */
-  @Ignore
   @Test
   public void testCancelOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+    this.runner = createFlowRunner(eventCollector, "jobf",
         FailureAction.CANCEL_ALL);
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB in subflow FAILS
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("joba1").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
-    pause(250);
-    expectedStateMap.put("jobb", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobC", Status.KILLED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobc", Status.KILLED);
-    expectedStateMap.put("jobd", Status.KILLED);
-    expectedStateMap.put("jobd:innerJobA", Status.KILLED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-    compareStates(expectedStateMap, nodeMap);
-
-    Assert.assertFalse(thread.isAlive());
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-
+    assertStatus("jobb", Status.FAILED);
+    assertStatus("jobb:innerJobB", Status.FAILED);
+    assertStatus("jobb:innerJobC", Status.KILLED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobc", Status.KILLED);
+    assertStatus("jobd", Status.KILLED);
+    assertStatus("jobd:innerJobA", Status.KILLED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+
+    assertThreadShutDown();
+    assertFlowStatus(Status.KILLED);
   }
 
   /**
    * Tests retries after a failure
    */
-  @Ignore
   @Test
   public void testRetryOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
     flow.getExecutableNode("joba").setStatus(Status.DISABLED);
     ((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
         "innerFlow").setStatus(Status.DISABLED);
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
-    // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.SKIPPED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.SKIPPED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
     InteractiveTestJob.getTestJob("jobb:innerJobC").failJob();
-    pause(250);
+    assertStatus("jobb:innerJobB", Status.FAILED);
+    assertStatus("jobb:innerJobC", Status.FAILED);
+    assertStatus("jobb", Status.FAILED);
+    assertStatus("jobb:innerFlow", Status.SKIPPED);
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobC", Status.FAILED);
-    expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobd", Status.KILLED);
-    Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
-    compareStates(expectedStateMap, nodeMap);
-
-    final ExecutableNode node = nodeMap.get("jobd:innerFlow2");
+    assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobd", Status.KILLED);
+    assertFlowStatus(Status.FAILED_FINISHING);
+
+    final ExecutableNode node = this.runner.getExecutableFlow()
+        .getExecutableNodePath("jobd:innerFlow2");
     final ExecutableFlowBase base = node.getParentFlow();
     for (final String nodeId : node.getInNodes()) {
       final ExecutableNode inNode = base.getExecutableNode(nodeId);
       System.out.println(inNode.getId() + " > " + inNode.getStatus());
     }
 
-    runner.retryFailures("me");
-    pause(500);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerFlow", Status.DISABLED);
-    expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
-    Assert.assertEquals(Status.RUNNING, flow.getStatus());
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertTrue(thread.isAlive());
+    this.runner.retryFailures("me");
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerFlow", Status.DISABLED);
+    assertStatus("jobd:innerFlow2", Status.RUNNING);
+    assertFlowStatus(Status.RUNNING);
+    assertThreadRunning();
 
     InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
     InteractiveTestJob.getTestJob("jobc").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
-    expectedStateMap.put("jobb", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    expectedStateMap.put("jobd", Status.SUCCEEDED);
-    expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerFlow", Status.SKIPPED);
+    assertStatus("jobb", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus("jobc", Status.SUCCEEDED);
+    assertStatus("jobd", Status.SUCCEEDED);
+    assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus("jobe", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobe").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobe", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobe", Status.SUCCEEDED);
 
     InteractiveTestJob.getTestJob("joba1").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobf", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobf", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobf").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobf", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobf", Status.SUCCEEDED);
+    assertFlowStatus(Status.SUCCEEDED);
+    assertThreadShutDown();
   }
 
   /**
-   * Tests the manual Killing of a flow. In this case, the flow is just fine
-   * before the cancel
-   * is called.
+   * Tests the manual Killing of a flow. In this case, the flow is just fine before the cancel is
+   * called.
    */
-  @Ignore
   @Test
   public void testCancel() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+    this.runner = createFlowRunner(eventCollector, "jobf",
         FailureAction.CANCEL_ALL);
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(1000);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB in subflow FAILS
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("joba1").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.kill("me");
-    pause(250);
-
-    expectedStateMap.put("jobb", Status.KILLED);
-    expectedStateMap.put("jobb:innerJobB", Status.KILLED);
-    expectedStateMap.put("jobb:innerJobC", Status.KILLED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobc", Status.KILLED);
-    expectedStateMap.put("jobd", Status.KILLED);
-    expectedStateMap.put("jobd:innerJobA", Status.KILLED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-
-    Assert.assertEquals(Status.KILLED, flow.getStatus());
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
+
+    this.runner.kill("me");
+
+    assertStatus("jobb", Status.KILLED);
+    assertStatus("jobb:innerJobB", Status.KILLED);
+    assertStatus("jobb:innerJobC", Status.KILLED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobc", Status.KILLED);
+    assertStatus("jobd", Status.KILLED);
+    assertStatus("jobd:innerJobA", Status.KILLED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+
+    assertFlowStatus(Status.KILLED);
+    assertThreadShutDown();
   }
 
   /**
    * Tests the manual invocation of cancel on a flow that is FAILED_FINISHING
    */
-  @Ignore
   @Test
   public void testManualCancelOnFailure() throws Exception {
     // Test propagation of KILLED status to embedded flows different branch
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
 
     // 1. START FLOW
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB in subflow FAILS
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("joba1").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobB", Status.FAILED);
-    expectedStateMap.put("jobb", Status.FAILED_FINISHING);
-    Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.kill("me");
-    pause(1000);
-
-    expectedStateMap.put("jobb", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobC", Status.KILLED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobc", Status.KILLED);
-    expectedStateMap.put("jobd", Status.KILLED);
-    expectedStateMap.put("jobd:innerJobA", Status.KILLED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-
-    Assert.assertEquals(Status.KILLED, flow.getStatus());
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobb:innerJobB", Status.FAILED);
+    assertStatus("jobb", Status.FAILED_FINISHING);
+    assertFlowStatus(Status.FAILED_FINISHING);
+
+    this.runner.kill("me");
+
+    assertStatus("jobb", Status.FAILED);
+    assertStatus("jobb:innerJobC", Status.KILLED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobc", Status.KILLED);
+    assertStatus("jobd", Status.KILLED);
+    assertStatus("jobd:innerJobA", Status.KILLED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+
+    assertFlowStatus(Status.KILLED);
+    assertThreadShutDown();
   }
 
   /**
    * Tests that pause and resume work
    */
-  @Ignore
   @Test
   public void testPause() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
 
     // 1. START FLOW
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
-    runner.pause("test");
+    this.runner.pause("test");
     InteractiveTestJob.getTestJob("joba").succeedJob();
     // 2.1 JOB A COMPLETES SUCCESSFULLY AFTER PAUSE
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertFlowStatus(Status.PAUSED);
 
     // 2.2 Flow is unpaused
-    runner.resume("test");
-    pause(250);
-    Assert.assertEquals(flow.getStatus(), Status.RUNNING);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    this.runner.resume("test");
+    assertFlowStatus(Status.RUNNING);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
 
     // 3. jobb:Inner completes
-    runner.pause("test");
+    this.runner.pause("test");
 
     /// innerJobA completes, but paused
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(
         Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
 
-    runner.resume("test");
-    pause(250);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    this.runner.resume("test");
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
 
     /// innerJobB, C completes
     InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(
         Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(
         Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
-    pause(250);
-    expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus("jobb:innerFlow", Status.RUNNING);
 
     // 4. Finish up on inner flow for jobb
     InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(
         Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
-    pause(250);
-    expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobb:innerFlow", Status.SUCCEEDED);
+    assertStatus("jobb", Status.SUCCEEDED);
 
     // 5. Finish jobc, jobd
     InteractiveTestJob.getTestJob("jobc").succeedJob(
         Props.of("output.jobc", "jobc"));
-    pause(250);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobc", Status.SUCCEEDED);
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
-    pause(250);
     InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
-    expectedStateMap.put("jobd", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+    assertStatus("jobd", Status.SUCCEEDED);
+    assertStatus("jobe", Status.RUNNING);
 
     // 6. Finish off flow
     InteractiveTestJob.getTestJob("joba1").succeedJob();
-    pause(250);
     InteractiveTestJob.getTestJob("jobe").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.SUCCEEDED);
-    expectedStateMap.put("jobf", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobe", Status.SUCCEEDED);
+    assertStatus("jobf", Status.RUNNING);
 
     InteractiveTestJob.getTestJob("jobf").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobf", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
-
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobf", Status.SUCCEEDED);
+    assertFlowStatus(Status.SUCCEEDED);
+    assertThreadShutDown();
   }
 
   /**
-   * Test the condition for a manual invocation of a KILL (cancel) on a flow
-   * that has been paused. The flow should unpause and be killed immediately.
+   * Test the condition for a manual invocation of a KILL (cancel) on a flow that has been paused.
+   * The flow should unpause and be killed immediately.
    */
-  @Ignore
   @Test
   public void testPauseKill() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+    this.runner = createFlowRunner(eventCollector, "jobf");
 
     // 1. START FLOW
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB A COMPLETES SUCCESSFULLY
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.pause("me");
-    pause(250);
-    Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+
+    this.runner.pause("me");
+    assertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.kill("me");
-    pause(250);
-    expectedStateMap.put("joba1", Status.KILLED);
-    expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobb", Status.KILLED);
-    expectedStateMap.put("jobc", Status.KILLED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobd", Status.KILLED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.KILLED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+
+    this.runner.kill("me");
+    assertStatus("joba1", Status.KILLED);
+    assertStatus("jobb:innerJobB", Status.CANCELLED);
+    assertStatus("jobb:innerJobC", Status.CANCELLED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobb", Status.KILLED);
+    assertStatus("jobc", Status.KILLED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobd", Status.KILLED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+
+    assertFlowStatus(Status.KILLED);
+    assertThreadShutDown();
   }
 
   /**
-   * Tests the case where a failure occurs on a Paused flow. In this case, the
-   * flow should stay paused.
+   * Tests the case where a failure occurs on a Paused flow. In this case, the flow should stay
+   * paused.
    */
-  @Ignore
   @Test
   public void testPauseFail() throws Exception {
-    final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+    this.eventCollector = new EventCollectorListener();
+    this.runner = createFlowRunner(this.eventCollector, "jobf",
         FailureAction.FINISH_CURRENTLY_RUNNING);
 
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
-
     // 1. START FLOW
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB A COMPLETES SUCCESSFULLY
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.pause("me");
-    pause(250);
-    Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+
+    this.runner.pause("me");
+    assertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(flow.getStatus(), Status.PAUSED);
-
-    runner.resume("me");
-    pause(250);
-    expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobb", Status.KILLED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobd", Status.FAILED);
+    assertStatus("jobd:innerJobA", Status.FAILED);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+    // When flow is paused, no new jobs are started. So these two jobs that were already running
+    // are allowed to finish, but their dependencies aren't started.
+    // Now, ensure that jobd:innerJobA has completely finished as failed before resuming.
+    // If we would resume before the job failure has been completely processed, FlowRunner would be
+    // able to start some new jobs instead of cancelling everything.
+    waitEventFired("jobd:innerJobA", Status.FAILED);
+    assertFlowStatus(Status.PAUSED);
+
+    this.runner.resume("me");
+    assertStatus("jobb:innerJobB", Status.CANCELLED);
+    assertStatus("jobb:innerJobC", Status.CANCELLED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobb", Status.KILLED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobd", Status.FAILED);
 
     InteractiveTestJob.getTestJob("jobc").succeedJob();
     InteractiveTestJob.getTestJob("joba1").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobc", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobf", Status.CANCELLED);
+    assertStatus("jobe", Status.CANCELLED);
+
+    assertFlowStatus(Status.FAILED);
+    assertThreadShutDown();
   }
 
   /**
-   * Test the condition when a Finish all possible is called during a pause.
-   * The Failure is not acted upon until the flow is resumed.
+   * Test the condition when a Finish all possible is called during a pause. The Failure is not
+   * acted upon until the flow is resumed.
    */
-  @Ignore
   @Test
   public void testPauseFailFinishAll() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+    this.runner = createFlowRunner(eventCollector, "jobf",
         FailureAction.FINISH_ALL_POSSIBLE);
 
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
-
     // 1. START FLOW
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
 
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB A COMPLETES SUCCESSFULLY
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(250);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.pause("me");
-    pause(250);
-    Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+
+    this.runner.pause("me");
+    assertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.resume("me");
-    pause(250);
-    expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobd", Status.FAILED);
+    assertStatus("jobd:innerJobA", Status.FAILED);
+    assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+
+    this.runner.resume("me");
+    assertStatus("jobb:innerJobB", Status.RUNNING);
+    assertStatus("jobb:innerJobC", Status.RUNNING);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobd", Status.FAILED);
 
     InteractiveTestJob.getTestJob("jobc").succeedJob();
     InteractiveTestJob.getTestJob("joba1").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
-    pause(250);
     InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob();
-    pause(250);
-    expectedStateMap.put("jobc", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
-    expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
-    expectedStateMap.put("jobb", Status.SUCCEEDED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-
-    compareStates(expectedStateMap, nodeMap);
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobc", Status.SUCCEEDED);
+    assertStatus("joba1", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+    assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+    assertStatus("jobb:innerFlow", Status.SUCCEEDED);
+    assertStatus("jobb", Status.SUCCEEDED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+
+    assertFlowStatus(Status.FAILED);
+    assertThreadShutDown();
   }
 
   /**
-   * Tests the case when a flow is paused and a failure causes a kill. The
-   * flow should die immediately regardless of the 'paused' status.
+   * Tests the case when a flow is paused and a failure causes a kill. The flow should die
+   * immediately regardless of the 'paused' status.
    */
-  @Ignore
   @Test
   public void testPauseFailKill() throws Exception {
     final EventCollectorListener eventCollector = new EventCollectorListener();
-    final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+    this.runner = createFlowRunner(eventCollector, "jobf",
         FailureAction.CANCEL_ALL);
 
-    final Map<String, Status> expectedStateMap = new HashMap<>();
-    final Map<String, ExecutableNode> nodeMap = new HashMap<>();
-
     // 1. START FLOW
-    final ExecutableFlow flow = runner.getExecutableFlow();
-    createExpectedStateMap(flow, expectedStateMap, nodeMap);
-    final Thread thread = runFlowRunnerInThread(runner);
-    pause(250);
+    runFlowRunnerInThread(this.runner);
     // After it starts up, only joba should be running
-    expectedStateMap.put("joba", Status.RUNNING);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
 
     // 2. JOB A COMPLETES SUCCESSFULLY
     InteractiveTestJob.getTestJob("joba").succeedJob();
-    pause(500);
-    expectedStateMap.put("joba", Status.SUCCEEDED);
-    expectedStateMap.put("joba1", Status.RUNNING);
-    expectedStateMap.put("jobb", Status.RUNNING);
-    expectedStateMap.put("jobc", Status.RUNNING);
-    expectedStateMap.put("jobd", Status.RUNNING);
-    expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
-    expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-    compareStates(expectedStateMap, nodeMap);
-
-    runner.pause("me");
-    pause(250);
-    Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+    assertStatus("joba", Status.SUCCEEDED);
+    assertStatus("joba1", Status.RUNNING);
+    assertStatus("jobb", Status.RUNNING);
+    assertStatus("jobc", Status.RUNNING);
+    assertStatus("jobd", Status.RUNNING);
+    assertStatus("jobd:innerJobA", Status.RUNNING);
+    assertStatus("jobb:innerJobA", Status.RUNNING);
+
+    this.runner.pause("me");
+    assertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
-    pause(250);
-    expectedStateMap.put("jobd:innerJobA", Status.FAILED);
-    expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
-    expectedStateMap.put("jobd", Status.FAILED);
-    expectedStateMap.put("jobb:innerJobA", Status.KILLED);
-    expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
-    expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
-    expectedStateMap.put("jobb", Status.KILLED);
-    expectedStateMap.put("jobc", Status.KILLED);
-    expectedStateMap.put("jobe", Status.CANCELLED);
-    expectedStateMap.put("jobf", Status.CANCELLED);
-    expectedStateMap.put("joba1", Status.KILLED);
-    compareStates(expectedStateMap, nodeMap);
-
-    Assert.assertEquals(Status.FAILED, flow.getStatus());
-    Assert.assertFalse(thread.isAlive());
+    assertStatus("jobd:innerJobA", Status.FAILED);
+    assertStatus("jobd:innerFlow2", Status.CANCELLED);
+    assertStatus("jobd", Status.FAILED);
+    assertStatus("jobb:innerJobA", Status.KILLED);
+    assertStatus("jobb:innerJobB", Status.CANCELLED);
+    assertStatus("jobb:innerJobC", Status.CANCELLED);
+    assertStatus("jobb:innerFlow", Status.CANCELLED);
+    assertStatus("jobb", Status.KILLED);
+    assertStatus("jobc", Status.KILLED);
+    assertStatus("jobe", Status.CANCELLED);
+    assertStatus("jobf", Status.CANCELLED);
+    assertStatus("joba1", Status.KILLED);
+
+    assertFlowStatus(Status.KILLED);
+    assertThreadShutDown();
   }
 
-
   private Thread runFlowRunnerInThread(final FlowRunner runner) {
     final Thread thread = new Thread(runner);
     thread.start();
     return thread;
   }
 
-  private void pause(final long millisec) {
+  private void sleep(final long millisec) {
     try {
       Thread.sleep(millisec);
     } catch (final InterruptedException e) {
     }
   }
 
-  private void createExpectedStateMap(final ExecutableFlowBase flow,
-      final Map<String, Status> expectedStateMap,
-      final Map<String, ExecutableNode> nodeMap) {
-    for (final ExecutableNode node : flow.getExecutableNodes()) {
-      expectedStateMap.put(node.getNestedId(), node.getStatus());
-      nodeMap.put(node.getNestedId(), node);
-      if (node instanceof ExecutableFlowBase) {
-        createExpectedStateMap((ExecutableFlowBase) node, expectedStateMap,
-            nodeMap);
-      }
-    }
-  }
-
-  private void compareStates(final Map<String, Status> expectedStateMap,
-      final Map<String, ExecutableNode> nodeMap) {
-    for (final String printedId : expectedStateMap.keySet()) {
-      final Status expectedStatus = expectedStateMap.get(printedId);
-      final ExecutableNode node = nodeMap.get(printedId);
-
-      if (expectedStatus != node.getStatus()) {
-        Assert.fail("Expected values do not match for " + printedId
-            + ". Expected " + expectedStatus + ", instead received "
-            + node.getStatus());
-      }
-    }
-  }
-
   private void prepareProject(final Project project, final File directory)
       throws ProjectManagerException, IOException {
     final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), this.logger);
@@ -1370,7 +1097,6 @@ public class FlowRunnerTest2 {
       for (final String error : loader.getErrors()) {
         System.out.println(error);
       }
-
       throw new RuntimeException("Errors found in setup");
     }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
new file mode 100644
index 0000000..09e041a
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -0,0 +1,173 @@
+package azkaban.execapp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import azkaban.event.Event;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.InteractiveTestJob;
+import azkaban.executor.Status;
+import java.util.function.Function;
+import org.junit.Assert;
+
+public class FlowRunnerTestBase {
+
+  protected FlowRunner runner;
+  protected EventCollectorListener eventCollector;
+
+  public static boolean isStarted(final Status status) {
+    if (status == Status.QUEUED) {
+      return false;
+    } else if (!Status.isStatusFinished(status) && !Status.isStatusRunning(status)) {
+      return false;
+    }
+    return true;
+  }
+
+  public void assertThreadShutDown() {
+    waitFlowRunner(
+        runner -> runner.getExecutableFlow().isFlowFinished() && !runner.isRunnerThreadAlive());
+  }
+
+  public void assertThreadRunning() {
+    waitFlowRunner(
+        runner -> !runner.getExecutableFlow().isFlowFinished() && runner.isRunnerThreadAlive());
+  }
+
+  public void waitFlowRunner(final Function<FlowRunner, Boolean> statusCheck) {
+    for (int i = 0; i < 100; i++) {
+      if (statusCheck.apply(this.runner)) {
+        return;
+      }
+      synchronized (EventCollectorListener.handleEvent) {
+        try {
+          EventCollectorListener.handleEvent.wait(10L);
+        } catch (final InterruptedException e) {
+        }
+      }
+    }
+    Assert.fail("Flow didn't reach expected status");
+  }
+
+  public void waitJobStatuses(final Function<Status, Boolean> statusCheck,
+      final String... jobs) {
+    for (int i = 0; i < 100; i++) {
+      if (checkJobStatuses(statusCheck, jobs)) {
+        return;
+      }
+      synchronized (EventCollectorListener.handleEvent) {
+        try {
+          EventCollectorListener.handleEvent.wait(10L);
+        } catch (final InterruptedException e) {
+        }
+      }
+    }
+    Assert.fail("Jobs didn't reach expected statuses");
+  }
+
+  public void waitJobsStarted(final FlowRunner runner, final String... jobs) {
+    waitJobStatuses(FlowRunnerTest::isStarted, jobs);
+  }
+
+  protected void waitEventFired(final String nestedId, final Status status)
+      throws InterruptedException {
+    for (int i = 0; i < 100; i++) {
+      for (final Event event : this.eventCollector.getEventList()) {
+        if (event.getData().getStatus() == status && event.getData().getNestedId()
+            .equals(nestedId)) {
+          return;
+        }
+      }
+      synchronized (EventCollectorListener.handleEvent) {
+        EventCollectorListener.handleEvent.wait(10L);
+      }
+    }
+    fail("Event wasn't fired with [" + nestedId + "], " + status);
+  }
+
+  public boolean checkJobStatuses(final Function<Status, Boolean> statusCheck,
+      final String[] jobs) {
+    final ExecutableFlow exFlow = this.runner.getExecutableFlow();
+    for (final String name : jobs) {
+      final ExecutableNode node = exFlow.getExecutableNodePath(name);
+      assertNotNull(name + " wasn't found", node);
+      if (!statusCheck.apply(node.getStatus())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected void assertFlowStatus(final Status status) {
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    for (int i = 0; i < 100; i++) {
+      if (flow.getStatus() == status) {
+        break;
+      }
+      synchronized (EventCollectorListener.handleEvent) {
+        try {
+          EventCollectorListener.handleEvent.wait(10L);
+        } catch (final InterruptedException e) {
+        }
+      }
+    }
+    printStatuses(status, flow);
+    assertEquals(status, flow.getStatus());
+  }
+
+  protected void assertStatus(final String name, final Status status) {
+    final ExecutableFlow exFlow = this.runner.getExecutableFlow();
+    final ExecutableNode node = exFlow.getExecutableNodePath(name);
+    assertNotNull(name + " wasn't found", node);
+    for (int i = 0; i < 100; i++) {
+      if (node.getStatus() == status) {
+        break;
+      }
+      synchronized (EventCollectorListener.handleEvent) {
+        try {
+          EventCollectorListener.handleEvent.wait(10L);
+        } catch (final InterruptedException e) {
+        }
+      }
+    }
+    printStatuses(status, node);
+    assertEquals("Wrong status for [" + name + "]", status, node.getStatus());
+  }
+
+  protected void printStatuses(final Status status, final ExecutableNode node) {
+    if (status != node.getStatus()) {
+      printTestJobs();
+      printFlowJobs(this.runner.getExecutableFlow());
+    }
+  }
+
+  private void printTestJobs() {
+    for (final String testJob : InteractiveTestJob.testJobs.keySet()) {
+      final ExecutableNode testNode = this.runner.getExecutableFlow()
+          .getExecutableNodePath(testJob);
+      System.err.println("testJob: " + testNode.getNestedId() + " " + testNode.getStatus());
+    }
+  }
+
+  private void printFlowJobs(final ExecutableFlowBase flow) {
+    System.err.println("ExecutableFlow: " + flow.getNestedId() + " " + flow.getStatus());
+    for (final ExecutableNode node : flow.getExecutableNodes()) {
+      if (node instanceof ExecutableFlowBase) {
+        printFlowJobs((ExecutableFlowBase) node);
+      } else {
+        System.err.println("ExecutableNode: " + node.getNestedId() + " " + node.getStatus());
+      }
+    }
+  }
+
+  protected void succeedJobs(final String... jobs) {
+    waitJobsStarted(this.runner, jobs);
+    for (final String name : jobs) {
+      InteractiveTestJob.getTestJob(name).succeedJob();
+    }
+  }
+
+}
diff --git a/azkaban-exec-server/src/test/resources/log4j.properties b/azkaban-exec-server/src/test/resources/log4j.properties
index 36c72cb..14e6647 100644
--- a/azkaban-exec-server/src/test/resources/log4j.properties
+++ b/azkaban-exec-server/src/test/resources/log4j.properties
@@ -1,4 +1,4 @@
 log4j.rootLogger=INFO, Console
 log4j.appender.Console=org.apache.log4j.ConsoleAppender
 log4j.appender.Console.layout=org.apache.log4j.PatternLayout
-log4j.appender.Console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%c{1}] %m%n
+log4j.appender.Console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS Z} %p [%t] [%c{1}] %m%n
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job1.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job1.job
index 0a60dc4..c4cb967 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job1.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job1.job
@@ -1,4 +1,3 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
-seconds=1
+type=test
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job10.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job10.job
index 218f774..33d572a 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job10.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job10.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job8,job9
-seconds=5
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2.job
index 3c918c8..fa4aa37 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job1
-seconds=2
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2d.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2d.job
index b4216ba..91171a1 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2d.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job2d.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job1
-seconds=1
+seconds=0
 fail=true
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job3.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job3.job
index b26a76c..4918ee4 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job3.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job3.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job2
-seconds=3
+seconds=1
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job4.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job4.job
index 1eccb73..4918ee4 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job4.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job4.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job2
-seconds=8
+seconds=1
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job5.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job5.job
index 8dd934d..8d680fd 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job5.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job5.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job3,job4
-seconds=5
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job6.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job6.job
index b5df29c..b210308 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job6.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job6.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job1
-seconds=4
+seconds=1
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job7.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job7.job
index d01cf79..48276e9 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job7.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job7.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job5,job6
-seconds=2
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job8.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job8.job
index 643598c..94865ac 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job8.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job8.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job7
-seconds=3
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job9.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job9.job
index 5d6dda9..94865ac 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job9.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job9.job
@@ -1,5 +1,4 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
+type=test
 dependencies=job7
-seconds=4
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-pass.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-pass.job
index 0a60dc4..c4cb967 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-pass.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-pass.job
@@ -1,4 +1,3 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
-seconds=1
+type=test
+seconds=0
 fail=false
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry.job
index 94cd0fa..6cddff4 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry.job
@@ -1,8 +1,7 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
-seconds=1
+type=test
+seconds=0
 fail=true
 passRetry=2
 retries=3
-retry.backoff=1000
+retry.backoff=0
 
diff --git a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry-fail.job b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry-fail.job
index bd51b47..826c2a7 100644
--- a/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry-fail.job
+++ b/azkaban-test/src/test/resources/azkaban/test/executions/exectest1/job-retry-fail.job
@@ -1,8 +1,7 @@
-type=java
-job.class=azkaban.test.executor.SleepJavaJob
-seconds=1
+type=test
+seconds=0
 fail=true
 passRetry=3
 retries=2
-retry.backoff=2000
+retry.backoff=0