azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 32176e9..a81b95d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -108,7 +108,8 @@ public class Executor {
     this.isActive = isActive;
   }
 
+  @Override
   public String toString() {
-    return String.format("%s:%d", host, port);
+    return String.format("[%d]%s:%d", id, host, port);
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index ed64364..ab9859e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -69,6 +69,10 @@ public class ExecutorManager extends EventHandler implements
     "azkaban.use.multiple.executors";
   private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
     "azkaban.webserver.queue.size";
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+    "azkaban.activeexecutor.refresh.milisecinterval";
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+    "azkaban.activeexecutor.refresh.flowinterval";
 
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
@@ -80,16 +84,9 @@ public class ExecutorManager extends EventHandler implements
   private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
       new ConcurrentHashMap<Integer, ExecutableFlow>();
 
-  /* map to easily access queued flows */
-  final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
-    new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-  /* web server side queue */
-  final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList =
-    new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
-      new ExecutableFlowPriorityComparator());
+  QueuedExecutions queuedFlows;
 
   final private Set<Executor> activeExecutors = new HashSet<Executor>();
-  final private long webserverQueueCapacity;
   private QueueProcessorThread queueProcessor;
 
   private ExecutingManagerUpdaterThread executingManager;
@@ -126,15 +123,19 @@ public class ExecutorManager extends EventHandler implements
     if(isMultiExecutorMode()) {
       queueProcessor =
         new QueueProcessorThread(azkProps.getBoolean(
-          AZKABAN_QUEUEPROCESSING_ENABLED, true));
+          AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+          AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000),
+          azkProps
+            .getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
       queueProcessor.start();
     }
 
     long executionLogsRetentionMs =
-        props.getLong("execution.logs.retention.ms",
-            DEFAULT_EXECUTION_LOGS_RETENTION_MS);
-    webserverQueueCapacity =
-        props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
+      props.getLong("execution.logs.retention.ms",
+        DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+
+    queuedFlows =
+      new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
 
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
@@ -335,7 +336,7 @@ public class ExecutorManager extends EventHandler implements
   private void loadQueuedFlows() throws ExecutorManagerException {
     for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
       .fetchQueuedFlows()) {
-      enqueueFlow(pair.getSecond(), pair.getFirst());
+      queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
     }
   }
 
@@ -350,7 +351,7 @@ public class ExecutorManager extends EventHandler implements
   public List<Integer> getRunningFlows(int projectId, String flowId) {
     List<Integer> executionIds = new ArrayList<Integer>();
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
-      queuedFlowMap.values()));
+      queuedFlows.getAllEntries()));
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
       runningFlows.values()));
     return executionIds;
@@ -380,7 +381,7 @@ public class ExecutorManager extends EventHandler implements
     throws IOException {
     List<Pair<ExecutableFlow, Executor>> flows =
       new ArrayList<Pair<ExecutableFlow, Executor>>();
-    getActiveFlowsWithExecutorHelper(flows, queuedFlowMap.values());
+    getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
     getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
     return flows;
   }
@@ -407,11 +408,11 @@ public class ExecutorManager extends EventHandler implements
     boolean isRunning = false;
     isRunning =
       isRunning
-        || isFlowRunningHelper(projectId, flowId, queuedFlowMap.values());
+        || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
     isRunning =
       isRunning
         || isFlowRunningHelper(projectId, flowId, runningFlows.values());
-    return false;
+    return isRunning;
   }
 
   /* Search a running flow in a collection */
@@ -437,8 +438,8 @@ public class ExecutorManager extends EventHandler implements
     throws ExecutorManagerException {
     if (runningFlows.containsKey(execId)) {
       return runningFlows.get(execId).getSecond();
-    } else if (queuedFlowMap.containsKey(execId)) {
-      return queuedFlowMap.get(execId).getSecond();
+    } else if (queuedFlows.hasExecution(execId)) {
+      return queuedFlows.getFlow(execId);
     } else {
       return executorLoader.fetchExecutableFlow(execId);
     }
@@ -454,7 +455,7 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public List<ExecutableFlow> getRunningFlows() {
     ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
-    getActiveFlowHelper(flows, queuedFlowMap.values());
+    getActiveFlowHelper(flows, queuedFlows.getAllEntries());
     getActiveFlowHelper(flows, runningFlows.values());
     return flows;
   }
@@ -479,7 +480,7 @@ public class ExecutorManager extends EventHandler implements
    */
   public String getRunningFlowIds() {
     List<Integer> allIds = new ArrayList<Integer>();
-    getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
+    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
     getRunningFlowsIdsHelper(allIds, runningFlows.values());
     Collections.sort(allIds);
     return allIds.toString();
@@ -494,7 +495,7 @@ public class ExecutorManager extends EventHandler implements
    */
   public String getQueuedFlowIds() {
     List<Integer> allIds = new ArrayList<Integer>();
-    getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
+    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
     Collections.sort(allIds);
     return allIds.toString();
   }
@@ -688,13 +689,9 @@ public class ExecutorManager extends EventHandler implements
           runningFlows.get(exFlow.getExecutionId());
         callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
           userId);
-      } else if (queuedFlowMap.containsKey(exFlow.getExecutionId())) {
-        Pair<ExecutionReference, ExecutableFlow> pair =
-          queuedFlowMap.get(exFlow.getExecutionId());
-        synchronized (pair) {
-          dequeueFlow(exFlow.getExecutionId());
-          finalizeFlows(exFlow);
-        }
+      } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
+        queuedFlows.dequeue(exFlow.getExecutionId());
+        finalizeFlows(exFlow);
       } else {
         throw new ExecutorManagerException("Execution "
           + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -863,7 +860,7 @@ public class ExecutorManager extends EventHandler implements
       logger.info("Submitting execution flow " + flowId + " by " + userId);
 
       String message = "";
-      if (queuedFlowList.size() >= webserverQueueCapacity) {
+      if (queuedFlows.isFull()) {
         message =
           String
             .format(
@@ -928,7 +925,7 @@ public class ExecutorManager extends EventHandler implements
         if (isMultiExecutorMode()) {
           //Take MultiExecutor route
           executorLoader.addActiveExecutableReference(reference);
-          enqueueFlow(exflow, reference);
+          queuedFlows.enqueue(exflow, reference);
         } else {
           // assign only local executor we have
           reference.setExecutor(activeExecutors.iterator().next());
@@ -951,47 +948,6 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-
-  /**
-   * Wraps BoundedQueue Take method to have a corresponding update in
-   * queuedFlowMap lookup table
-   *
-   * @return
-   * @throws InterruptedException
-   */
-  private Pair<ExecutionReference, ExecutableFlow> waitAndFetchQueueHead()
-    throws InterruptedException {
-    Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
-    if (pair != null && pair.getFirst() != null) {
-      queuedFlowMap.remove(pair.getFirst().getExecId());
-    }
-    return pair;
-  }
-
-  /* Helper method to have a single point of deletion in the queued flows */
-  private void dequeueFlow(int executionId) {
-    if (queuedFlowMap.containsKey(executionId)) {
-      queuedFlowList.remove(queuedFlowMap.get(executionId));
-      queuedFlowMap.remove(executionId);
-    }
-  }
-
-  /* Helper method to have a single point of insertion in the queued flows */
-  private void enqueueFlow(ExecutableFlow exflow, ExecutionReference ref)
-    throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> pair =
-      new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
-    try {
-      queuedFlowMap.put(exflow.getExecutionId(), pair);
-      queuedFlowList.put(pair);
-    } catch (InterruptedException e) {
-      String errMsg = "Failed to queue flow " + exflow.getExecutionId();
-      logger.error(errMsg, e);
-      finalizeFlows(exflow);
-      throw new ExecutorManagerException(errMsg);
-    }
-  }
-
   private void cleanOldExecutionLogs(long millis) {
     try {
       int count = executorLoader.removeExecutionLogsByTime(millis);
@@ -1710,21 +1666,26 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-
   /*
    * This thread is responsible for processing queued flows.
    */
   private class QueueProcessorThread extends Thread {
     private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
-    private static final long ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS = 1000;
     private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
-    private static final int MAX_CONTINUOUS_FLOW_PROCESSED = 10;
+    private final long activeExecutorRefreshWindowInMilisec;
+    private final int activeExecutorRefreshWindowInFlows;
 
     private boolean shutdown = false;
     private boolean isActive = true;
 
-    public QueueProcessorThread(boolean isActive) {
+    public QueueProcessorThread(boolean isActive,
+      long activeExecutorRefreshWindowInTime,
+      int activeExecutorRefreshWindowInFlows) {
       setActive(isActive);
+      this.activeExecutorRefreshWindowInFlows =
+        activeExecutorRefreshWindowInFlows;
+      this.activeExecutorRefreshWindowInMilisec =
+        activeExecutorRefreshWindowInTime;
       this.setName("AzkabanWebServer-QueueProcessor-Thread");
     }
 
@@ -1748,8 +1709,8 @@ public class ExecutorManager extends EventHandler implements
           try {
             // start processing queue if active, other wait for sometime
             if (isActive) {
-              processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS,
-                MAX_CONTINUOUS_FLOW_PROCESSED);
+              processQueuedFlows(activeExecutorRefreshWindowInMilisec,
+                activeExecutorRefreshWindowInFlows);
             }
             wait(QUEUE_PROCESSOR_WAIT_IN_MS);
           } catch (Exception e) {
@@ -1764,19 +1725,19 @@ public class ExecutorManager extends EventHandler implements
     private void processQueuedFlows(long activeExecutorsRefreshWindow,
       int maxContinuousFlowProcessed) throws InterruptedException,
       ExecutorManagerException {
-      long lastProcessingTime = System.currentTimeMillis();
+      long lastExecutorRefreshTime = System.currentTimeMillis();
       Pair<ExecutionReference, ExecutableFlow> runningCandidate;
       int currentContinuousFlowProcessed = 0;
 
-      while (isActive() && (runningCandidate = waitAndFetchQueueHead()) != null) {
+      while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
         ExecutionReference reference = runningCandidate.getFirst();
         ExecutableFlow exflow = runningCandidate.getSecond();
 
         long currentTime = System.currentTimeMillis();
-        if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow
-              || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+        if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
+          || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
           refreshExecutors(); // Refresh executor stats to be used by selector
-          lastProcessingTime = currentTime;
+          lastExecutorRefreshTime = currentTime;
           currentContinuousFlowProcessed = 0;
         }
 
@@ -1841,7 +1802,7 @@ public class ExecutorManager extends EventHandler implements
         finalizeFlows(exflow);
       } else {
         // again queue this flow
-        enqueueFlow(exflow, reference);
+        queuedFlows.enqueue(exflow, reference);
       }
     }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
new file mode 100644
index 0000000..641ffae
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -0,0 +1,201 @@
+package azkaban.executor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ * Composite data structure to represent non-dispatched flows in webserver.
+ * This data structure wraps a blocking queue and a concurrent hashmap.
+ * </pre>
+ */
+public class QueuedExecutions {
+  private static Logger logger = Logger.getLogger(QueuedExecutions.class);
+  final long capacity;
+
+  /* map to easily access queued flows */
+  final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap;
+  /* actual queue */
+  final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList;
+
+  public QueuedExecutions(long capacity) {
+    this.capacity = capacity;
+    queuedFlowMap =
+      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+    queuedFlowList =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+  }
+
+  /**
+   * Wraps BoundedQueue Take method to have a corresponding update in
+   * queuedFlowMap lookup table
+   *
+   * @return
+   * @throws InterruptedException
+   */
+  public Pair<ExecutionReference, ExecutableFlow> fetchHead()
+    throws InterruptedException {
+    Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+    if (pair != null && pair.getFirst() != null) {
+      queuedFlowMap.remove(pair.getFirst().getExecId());
+    }
+    return pair;
+  }
+
+  /**
+   * Helper method to have a single point of deletion in the queued flows
+   *
+   * @param executionId
+   */
+  public void dequeue(int executionId) {
+    if (queuedFlowMap.containsKey(executionId)) {
+      queuedFlowList.remove(queuedFlowMap.get(executionId));
+      queuedFlowMap.remove(executionId);
+    }
+  }
+
+  /**
+   * <pre>
+   * Helper method to have a single point of insertion in the queued flows
+   *
+   * @param exflow
+   *          flow to be enqueued
+   * @param ref
+   *          reference to be enqueued
+   * @throws ExecutorManagerException
+   *           case 1: if blocking queue put method fails due to
+   *           InterruptedException
+   *           case 2: if there already an element with
+   *           same execution Id
+   * </pre>
+   */
+  public void enqueue(ExecutableFlow exflow, ExecutionReference ref)
+    throws ExecutorManagerException {
+    if (hasExecution(exflow.getExecutionId())) {
+      String errMsg = "Flow already in queue " + exflow.getExecutionId();
+      throw new ExecutorManagerException(errMsg);
+    }
+
+    Pair<ExecutionReference, ExecutableFlow> pair =
+      new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+    try {
+      queuedFlowMap.put(exflow.getExecutionId(), pair);
+      queuedFlowList.put(pair);
+    } catch (InterruptedException e) {
+      String errMsg = "Failed to insert flow " + exflow.getExecutionId();
+      logger.error(errMsg, e);
+      throw new ExecutorManagerException(errMsg);
+    }
+  }
+
+  /**
+   * <pre>
+   * Enqueues all the elements of a collection
+   *
+   * @param collection
+   *
+   * @throws ExecutorManagerException
+   *           case 1: if blocking queue put method fails due to
+   *           InterruptedException
+   *           case 2: if there already an element with
+   *           same execution Id
+   * </pre>
+   */
+  public void enqueueAll(
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection)
+    throws ExecutorManagerException {
+    for (Pair<ExecutionReference, ExecutableFlow> pair : collection) {
+      enqueue(pair.getSecond(), pair.getFirst());
+    }
+  }
+
+  /**
+   * Returns a read only collection of all the queued (flows, reference) pairs
+   *
+   * @return
+   */
+  public Collection<Pair<ExecutionReference, ExecutableFlow>> getAllEntries() {
+    return Collections.unmodifiableCollection(queuedFlowMap.values());
+  }
+
+  /**
+   * Checks if an execution is queued or not
+   *
+   * @param executionId
+   * @return
+   */
+  public boolean hasExecution(int executionId) {
+    return queuedFlowMap.containsKey(executionId);
+  }
+
+  /**
+   * Fetch flow for an execution. Returns null, if execution not in queue
+   *
+   * @param executionId
+   * @return
+   */
+  public ExecutableFlow getFlow(int executionId) {
+    if (hasExecution(executionId)) {
+      return queuedFlowMap.get(executionId).getSecond();
+    }
+    return null;
+  }
+
+  /**
+   * Fetch Activereference for an execution. Returns null, if execution not in
+   * queue
+   *
+   * @param executionId
+   * @return
+   */
+  public ExecutionReference getReference(int executionId) {
+    if (hasExecution(executionId)) {
+      return queuedFlowMap.get(executionId).getFirst();
+    }
+    return null;
+  }
+
+  /**
+   * Size of the queue
+   *
+   * @return
+   */
+  public long size() {
+    return queuedFlowList.size();
+  }
+
+  /**
+   * Verify, if queue is full as per initialized capacity
+   *
+   * @return
+   */
+  public boolean isFull() {
+    return size() >= capacity;
+  }
+
+  /**
+   * Verify, if queue is empty or not
+   *
+   * @return
+   */
+  public boolean isEmpty() {
+    return queuedFlowList.isEmpty() && queuedFlowMap.isEmpty();
+  }
+
+  /**
+   * Empties queue by dequeuing all the elements
+   */
+  public void clear() {
+    for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowMap.values()) {
+      dequeue(pair.getFirst().getExecId());
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index 287e971..bbd642a 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -69,7 +69,7 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
   }
 
   @Override
-  public String getUndispatchedFlows() {
+  public String getQueuedFlows() {
     return manager.getQueuedFlowIds();
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index babb2a0..94012e0 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -43,8 +43,8 @@ public interface JmxExecutorManagerMBean {
   @DisplayName("OPERATION: isQueueProcessorActive")
   public boolean isQueueProcessorActive();
 
-  @DisplayName("OPERATION: getUndispatchedFlows")
-  public String getUndispatchedFlows();
+  @DisplayName("OPERATION: getQueuedFlows")
+  public String getQueuedFlows();
 
   @DisplayName("OPERATION: getQueueProcessorThreadState")
   public String getQueueProcessorThreadState();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
index 319f314..f9850d6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -35,34 +35,19 @@ import azkaban.user.User;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 
 /**
  * Test class for ExecutableFlowPriorityComparator
  * */
 
 public class ExecutableFlowPriorityComparatorTest {
-  /* Directory with serialized description of test flows */
-  private static final String UNIT_BASE_DIR =
-    "../azkaban-test/src/test/resources/executions/exectest1/";
-
-  private File getFlowDir(String flow) {
-    return new File(UNIT_BASE_DIR + flow + ".flow");
-  }
 
   /* Helper method to create an ExecutableFlow from serialized description */
   private ExecutableFlow createExecutableFlow(String flowName, int priority,
     long updateTime) throws IOException {
-    File jsonFlowFile = getFlowDir(flowName);
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    ExecutableFlow execFlow =
+      TestUtils.createExecutableFlow("exectest1", flowName);
 
     execFlow.setUpdateTime(updateTime);
     if (priority > 0) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index d77b734..62d187b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -34,22 +34,12 @@ import azkaban.user.User;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 
 /**
  * Test class for executor manager
  */
 public class ExecutorManagerTest {
-  /* Directory with serialized description of test flows */
-  private static final String UNIT_BASE_DIR =
-    "../azkaban-test/src/test/resources/executions/exectest1/";
-
-  private File getFlowDir(String flow) {
-    return new File(UNIT_BASE_DIR + flow + ".flow");
-  }
-
-  private User getTestUser() {
-    return new User("testUser");
-  }
 
   /* Helper method to create a ExecutorManager Instance */
   private ExecutorManager createMultiExecutorManagerInstance()
@@ -72,41 +62,18 @@ public class ExecutorManagerTest {
     return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
   }
 
-  /* Helper method to create an ExecutableFlow from serialized description */
-  private ExecutableFlow createExecutableFlow(String flowName)
-    throws IOException {
-    File jsonFlowFile = getFlowDir(flowName);
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
-    return execFlow;
-  }
-
   /*
    * Test create an executor manager instance without any executor local or
    * remote
    */
-  @Test
-  public void testNoExecutorScenario() {
-    try {
-      Props props = new Props();
-      props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-      ExecutorLoader loader = new MockExecutorLoader();
-      @SuppressWarnings("unused")
-      ExecutorManager manager =
-        new ExecutorManager(props, loader, new HashMap<String, Alerter>());
-      Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
-      System.out.println("Test true");
-    }
+  @Test(expected = ExecutorManagerException.class)
+  public void testNoExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    @SuppressWarnings("unused")
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
   }
 
   /*
@@ -180,29 +147,24 @@ public class ExecutorManagerTest {
    * Test executor manager active executor reload and resulting in no active
    * executors
    */
-  @Test
-  public void testSetupExecutorsException() {
-    try {
-      Props props = new Props();
-      props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
-      ExecutorLoader loader = new MockExecutorLoader();
-      Executor executor1 = loader.addExecutor("localhost", 12345);
-
-      ExecutorManager manager =
-        new ExecutorManager(props, loader, new HashMap<String, Alerter>());
-      Set<Executor> activeExecutors =
-        new HashSet(manager.getAllActiveExecutors());
-      Assert.assertArrayEquals(activeExecutors.toArray(),
-        new Executor[] { executor1 });
-
-      // mark older executor as inactive
-      executor1.setActive(false);
-      loader.updateExecutor(executor1);
-      manager.setupExecutors();
-      Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
-      System.out.println("Test true");
-    }
+  @Test(expected = ExecutorManagerException.class)
+  public void testSetupExecutorsException() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
+    Assert.assertArrayEquals(activeExecutors.toArray(),
+      new Executor[] { executor1 });
+
+    // mark older executor as inactive
+    executor1.setActive(false);
+    loader.updateExecutor(executor1);
+    manager.setupExecutors();
   }
 
   /* Test disabling queue process thread to pause dispatching */
@@ -229,12 +191,12 @@ public class ExecutorManagerTest {
   public void testQueuedFlows() throws ExecutorManagerException, IOException {
     ExecutorLoader loader = new MockExecutorLoader();
     ExecutorManager manager = createMultiExecutorManagerInstance(loader);
-    ExecutableFlow flow1 = createExecutableFlow("exec1");
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     flow1.setExecutionId(1);
-    ExecutableFlow flow2 = createExecutableFlow("exec2");
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     flow2.setExecutionId(2);
 
-    User testUser = getTestUser();
+    User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
     manager.submitExecutableFlow(flow2, testUser.getUserId());
 
@@ -261,24 +223,17 @@ public class ExecutorManagerTest {
   }
 
   /* Test submit duplicate flow when previous instance is not dispatched */
-  @Test
+  @Test(expected = ExecutorManagerException.class)
   public void testDuplicateQueuedFlows() throws ExecutorManagerException,
     IOException {
-    try {
-      ExecutorManager manager = createMultiExecutorManagerInstance();
-      ExecutableFlow flow1 = createExecutableFlow("exec1");
-      flow1.getExecutionOptions().setConcurrentOption(
-        ExecutionOptions.CONCURRENT_OPTION_SKIP);
-
-      User testUser = getTestUser();
-      manager.submitExecutableFlow(flow1, testUser.getUserId());
-      manager.submitExecutableFlow(flow1, testUser.getUserId());
-      manager.enableQueueProcessorThread();
-      manager.submitExecutableFlow(flow1, testUser.getUserId());
-      Assert.fail("Expecting exception, but didn't get one");
-    } catch (ExecutorManagerException ex) {
-      System.out.println("Test true");
-    }
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow1.getExecutionOptions().setConcurrentOption(
+      ExecutionOptions.CONCURRENT_OPTION_SKIP);
+
+    User testUser = TestUtils.getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
   }
 
   /*
@@ -289,8 +244,8 @@ public class ExecutorManagerTest {
   public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
     ExecutorLoader loader = new MockExecutorLoader();
     ExecutorManager manager = createMultiExecutorManagerInstance(loader);
-    ExecutableFlow flow1 = createExecutableFlow("exec1");
-    User testUser = getTestUser();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    User testUser = TestUtils.getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
 
     manager.cancelFlow(flow1, testUser.getUserId());
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index e725a8d..a7e2b5f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -49,9 +49,13 @@ import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 
 public class JdbcExecutorLoaderTest {
   private static boolean testDBExists;
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions";
   // @TODO remove this and turn into local host.
   private static final String host = "localhost";
   private static final int port = 3306;
@@ -59,10 +63,6 @@ public class JdbcExecutorLoaderTest {
   private static final String user = "azkaban";
   private static final String password = "azkaban";
   private static final int numConnections = 10;
-  private static final String UNIT_BASE_DIR =
-    "../azkaban-test/src/test/resources/executions/";
-
-  private File flowDir = new File(UNIT_BASE_DIR + "/exectest1");
 
   @BeforeClass
   public static void setupDB() {
@@ -230,7 +230,7 @@ public class JdbcExecutorLoaderTest {
       return;
     }
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
@@ -259,7 +259,7 @@ public class JdbcExecutorLoaderTest {
     }
 
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
@@ -298,7 +298,7 @@ public class JdbcExecutorLoaderTest {
     ExecutableFlow flow = createExecutableFlow(10, "exec1");
     flow.setExecutionId(10);
 
-    File jobFile = new File(flowDir, "job10.job");
+    File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
     Props props = new Props(null, jobFile);
     props.put("test", "test2");
     ExecutableNode oldNode = flow.getExecutableNode("job10");
@@ -343,7 +343,7 @@ public class JdbcExecutorLoaderTest {
       return;
     }
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     try {
       loader.assignExecutor(flow.getExecutionId(), 1);
@@ -391,7 +391,7 @@ public class JdbcExecutorLoaderTest {
       return;
     }
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
       null);
@@ -408,7 +408,7 @@ public class JdbcExecutorLoaderTest {
     String host = "localhost";
     int port = 12345;
     Executor executor = loader.addExecutor(host, port);
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
     Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
@@ -434,13 +434,13 @@ public class JdbcExecutorLoaderTest {
     int port = 12345;
     Executor executor = loader.addExecutor(host, port);
 
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
     // only completed flows
     Assert.assertTrue(queuedFlows.isEmpty());
 
-    ExecutableFlow flow2 = createExecutableFlow("exec2");
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
     ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
@@ -461,9 +461,9 @@ public class JdbcExecutorLoaderTest {
     List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
       new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
 
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow);
-    ExecutableFlow flow2 = createExecutableFlow("exec2");
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
     loader.uploadExecutableFlow(flow);
 
     ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
@@ -732,20 +732,20 @@ public class JdbcExecutorLoaderTest {
     }
 
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = createExecutableFlow("exec1");
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
     Executor executor = new Executor(2, "test", 1, true);
     ExecutionReference ref1 =
         new ExecutionReference(flow1.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref1);
 
-    ExecutableFlow flow2 = createExecutableFlow("exec1");
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow2);
     ExecutionReference ref2 =
         new ExecutionReference(flow2.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref2);
 
-    ExecutableFlow flow3 = createExecutableFlow("exec1");
+    ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow3);
 
     Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -884,37 +884,10 @@ public class JdbcExecutorLoaderTest {
   }
 
   private ExecutableFlow createExecutableFlow(int executionId, String flowName)
-      throws IOException {
-    File jsonFlowFile = new File(flowDir, flowName + ".flow");
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    throws IOException {
+    ExecutableFlow execFlow =
+      TestUtils.createExecutableFlow("exectest1", flowName);
     execFlow.setExecutionId(executionId);
-
-    return execFlow;
-  }
-
-  private ExecutableFlow createExecutableFlow(String flowName)
-      throws IOException {
-    File jsonFlowFile = new File(flowDir, flowName + ".flow");
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
     return execFlow;
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
new file mode 100644
index 0000000..94bbd7e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -0,0 +1,208 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+
+public class QueuedExecutionsTest {
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions/exectest1/";
+
+  private File getFlowDir(String flow) {
+    return new File(UNIT_BASE_DIR + flow + ".flow");
+  }
+
+  /*
+   * Helper method to create an (ExecutionReference, ExecutableFlow) from
+   * serialized description
+   */
+  private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
+    String flowName, int execId) throws IOException {
+    File jsonFlowFile = getFlowDir(flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    execFlow.setExecutionId(execId);
+    ExecutionReference ref = new ExecutionReference(execId);
+    return new Pair<ExecutionReference, ExecutableFlow>(ref, execFlow);
+  }
+
+  public List<Pair<ExecutionReference, ExecutableFlow>> getDummyData()
+    throws IOException {
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList =
+      new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+    dataList.add(createExecutablePair("exec1", 1));
+    dataList.add(createExecutablePair("exec2", 2));
+    return dataList;
+  }
+
+  /* Test enqueue method happy case */
+  @Test
+  public void testEnqueueHappyCase() throws IOException,
+    ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      queue.enqueue(pair.getSecond(), pair.getFirst());
+    }
+
+    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+  }
+
+  /* Test enqueue duplicate execution ids */
+  @Test(expected = ExecutorManagerException.class)
+  public void testEnqueueDuplicateExecution() throws IOException,
+    ExecutorManagerException {
+    Pair<ExecutionReference, ExecutableFlow> pair1 =
+      createExecutablePair("exec1", 1);
+    QueuedExecutions queue = new QueuedExecutions(5);
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+  }
+
+  /* Test enqueue more than capacity */
+  @Test(expected = ExecutorManagerException.class)
+  public void testEnqueueOverflow() throws IOException,
+    ExecutorManagerException {
+    Pair<ExecutionReference, ExecutableFlow> pair1 =
+      createExecutablePair("exec1", 1);
+    QueuedExecutions queue = new QueuedExecutions(1);
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+  }
+
+  /* Test EnqueueAll method */
+  @Test
+  public void testEnqueueAll() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+  }
+
+  /* Test size method */
+  @Test
+  public void testSize() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+  }
+
+  /* Test dequeue method */
+  @Test
+  public void testDequeue() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    queue.dequeue(dataList.get(0).getFirst().getExecId());
+    Assert.assertEquals(queue.size(), 1);
+    Assert.assertTrue(queue.getAllEntries().contains(dataList.get(1)));
+  }
+
+  /* Test clear method */
+  @Test
+  public void testClear() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+    queue.clear();
+    Assert.assertEquals(queue.size(), 0);
+  }
+
+  /* Test isEmpty method */
+  @Test
+  public void testIsEmpty() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    Assert.assertTrue(queue.isEmpty());
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+    queue.clear();
+    Assert.assertTrue(queue.isEmpty());
+  }
+
+  /* Test fetchHead method */
+  @Test
+  public void testFetchHead() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    Assert.assertTrue(queue.isEmpty());
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.fetchHead(), dataList.get(0));
+    Assert.assertEquals(queue.fetchHead(), dataList.get(1));
+  }
+
+  /* Test isFull method */
+  @Test
+  public void testIsFull() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertTrue(queue.isFull());
+  }
+
+  /* Test hasExecution method */
+  @Test
+  public void testHasExecution() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertTrue(queue.hasExecution(pair.getFirst().getExecId()));
+    }
+    Assert.assertFalse(queue.hasExecution(5));
+    Assert.assertFalse(queue.hasExecution(7));
+    Assert.assertFalse(queue.hasExecution(15));
+  }
+
+  /* Test getFlow method */
+  @Test
+  public void testGetFlow() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertEquals(pair.getSecond(),
+        queue.getFlow(pair.getFirst().getExecId()));
+    }
+  }
+
+  /* Test getReferences method */
+  @Test
+  public void testGetReferences() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertEquals(pair.getFirst(),
+        queue.getReference(pair.getFirst().getExecId()));
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
new file mode 100644
index 0000000..e51b575
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+
+/**
+ * Commonly used utils method for unit/integration tests
+ */
+public class TestUtils {
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions";
+
+  public static File getFlowDir(String projectName, String flow) {
+    return new File(String.format("%s/%s/%s.flow", UNIT_BASE_DIR, projectName,
+      flow));
+  }
+
+  public static User getTestUser() {
+    return new User("testUser");
+  }
+
+  /* Helper method to create an ExecutableFlow from serialized description */
+  public static ExecutableFlow createExecutableFlow(String projectName, String flowName)
+    throws IOException {
+    File jsonFlowFile = getFlowDir(projectName, flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+    return execFlow;
+  }
+}