azkaban-developers

Changes

azkaban-sql/src/sql/update.execution_logs.2.1.sql 6(+0 -6)

azkaban-sql/src/sql/update.project_properties.2.1.sql 3(+0 -3)

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8bc725f..8abbd61 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -17,6 +17,7 @@
 package azkaban.executor;
 
 public interface ConnectorParams {
+  public static final String EXECUTOR_ID_PARAM = "executorId";
   public static final String ACTION_PARAM = "action";
   public static final String EXECID_PARAM = "execid";
   public static final String SHAREDTOKEN_PARAM = "token";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
new file mode 100644
index 0000000..3050a8d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * Comparator implicitly used in priority queue for QueuedExecutions.
+ */
+public final class ExecutableFlowPriorityComparator implements
+  Comparator<Pair<ExecutionReference, ExecutableFlow>> {
+  private static Logger logger = Logger
+    .getLogger(ExecutableFlowPriorityComparator.class);
+
+  /**
+   * <pre>
+   * Sorting order is determined by:-
+   * 1. descending order of priority
+   * 2. if same priority, ascending order of update time
+   * 3. if same priority and updateTime, ascending order of execution id
+   * </pre>
+   *
+   * {@inheritDoc}
+   *
+   * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+   */
+  @Override
+  public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
+    Pair<ExecutionReference, ExecutableFlow> pair2) {
+    ExecutableFlow exflow1 = null, exflow2 = null;
+    if (pair1 != null && pair1.getSecond() != null) {
+      exflow1 = pair1.getSecond();
+    }
+    if (pair2 != null && pair2.getSecond() != null) {
+      exflow2 = pair2.getSecond();
+    }
+    if (exflow1 == null && exflow2 == null)
+      return 0;
+    else if (exflow1 == null)
+      return -1;
+    else if (exflow2 == null)
+      return 1;
+    else {
+      // descending order of priority
+      int diff = getPriority(exflow2) - getPriority(exflow1);
+      if (diff == 0) {
+        // ascending order of update time, if same priority
+        diff = (int) (exflow1.getUpdateTime() - exflow2.getUpdateTime());
+      }
+      if (diff == 0) {
+        // ascending order of execution id, if same priority and updateTime
+        diff = exflow1.getExecutionId() - exflow2.getExecutionId();
+      }
+      return diff;
+    }
+  }
+
+  /* Helper method to fetch flow priority from flow props */
+  private int getPriority(ExecutableFlow exflow) {
+    ExecutionOptions options = exflow.getExecutionOptions();
+    int priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+    if (options != null
+      && options.getFlowParameters() != null
+      && options.getFlowParameters()
+        .containsKey(ExecutionOptions.FLOW_PRIORITY)) {
+      try {
+        priority =
+          Integer.valueOf(options.getFlowParameters().get(
+            ExecutionOptions.FLOW_PRIORITY));
+      } catch (NumberFormatException ex) {
+        priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+        logger.error(
+          "Failed to parse flow priority for exec_id = "
+            + exflow.getExecutionId(), ex);
+      }
+    }
+    return priority;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d4cb262..312be44 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -33,6 +33,8 @@ public class ExecutionOptions {
   public static final String CONCURRENT_OPTION_SKIP = "skip";
   public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
   public static final String CONCURRENT_OPTION_IGNORE = "ignore";
+  public static final String FLOW_PRIORITY = "flowPriority";
+  public static final int DEFAULT_FLOW_PRIORITY = 5;
 
   private static final String FLOW_PARAMETERS = "flowParameters";
   private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index e314206..9d93476 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -18,16 +18,23 @@ package azkaban.executor;
 
 public class ExecutionReference {
   private final int execId;
-  private final String host;
-  private final int port;
+  private Executor executor;
   private long updateTime;
   private long nextCheckTime = -1;
   private int numErrors = 0;
 
-  public ExecutionReference(int execId, String host, int port) {
+
+  public ExecutionReference(int execId) {
+    this.execId = execId;
+  }
+
+  public ExecutionReference(int execId, Executor executor) {
+    if (executor == null) {
+      throw new IllegalArgumentException(String.format(
+        "Executor cannot be null for exec id: %d ExecutionReference", execId));
+    }
     this.execId = execId;
-    this.host = host;
-    this.port = port;
+    this.executor = executor;
   }
 
   public void setUpdateTime(long updateTime) {
@@ -51,11 +58,11 @@ public class ExecutionReference {
   }
 
   public String getHost() {
-    return host;
+    return executor.getHost();
   }
 
   public int getPort() {
-    return port;
+    return executor.getPort();
   }
 
   public int getNumErrors() {
@@ -65,4 +72,12 @@ public class ExecutionReference {
   public void setNumErrors(int numErrors) {
     this.numErrors = numErrors;
   }
-}
+
+  public void setExecutor(Executor executor) {
+    this.executor = executor;
+  }
+
+  public Executor getExecutor() {
+    return executor;
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 1bfa910..6120002 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -171,6 +171,50 @@ public interface ExecutorLoader {
   public void removeActiveExecutableReference(int execId)
       throws ExecutorManagerException;
 
+  /**
+   * <pre>
+   * Set an executor Id to an execution
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception in case executionId or executorId do not exist
+   * </pre>
+   *
+   * @param executorId
+   * @param execId
+   * @throws ExecutorManagerException
+   */
+  public void assignExecutor(int executorId, int execId)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetches an executor corresponding to a given execution
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executionId
+   * </pre>
+   *
+   * @param executionId
+   * @return fetched Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor fetchExecutorByExecutionId(int executionId)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch queued flows which have not yet dispatched
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return empty list when no queued execution is found
+   * </pre>
+   *
+   * @return List of queued flows and corresponding execution reference
+   * @throws ExecutorManagerException
+   */
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException;
+
   public boolean updateExecutableReference(int execId, long updateTime)
       throws ExecutorManagerException;
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c86648e..c97ef00 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -22,13 +22,17 @@ import java.lang.Thread.State;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.client.HttpClient;
@@ -59,10 +63,19 @@ import azkaban.utils.Props;
  */
 public class ExecutorManager extends EventHandler implements
     ExecutorManagerAdapter {
+  static final String AZKABAN_QUEUEPROCESSING_ENABLED =
+    "azkaban.queueprocessing.enabled";
+  static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+    "azkaban.use.multiple.executors";
+  private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+    "azkaban.webserver.queue.size";
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+    "azkaban.activeexecutor.refresh.milisecinterval";
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+    "azkaban.activeexecutor.refresh.flowinterval";
+
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
-  private String executorHost;
-  private int executorPort;
 
   private CleanerThread cleanerThread;
 
@@ -71,8 +84,13 @@ public class ExecutorManager extends EventHandler implements
   private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
       new ConcurrentHashMap<Integer, ExecutableFlow>();
 
-  private ExecutingManagerUpdaterThread executingManager;
+  QueuedExecutions queuedFlows;
 
+  final private Set<Executor> activeExecutors = new HashSet<Executor>();
+  private QueueProcessorThread queueProcessor;
+
+  private ExecutingManagerUpdaterThread executingManager;
+  // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
       * 24 * 60 * 60 * 1000l;
   private long lastCleanerThreadCheckTime = -1;
@@ -84,12 +102,16 @@ public class ExecutorManager extends EventHandler implements
 
   File cacheDir;
 
+  final Props azkProps;
+
   public ExecutorManager(Props props, ExecutorLoader loader,
       Map<String, Alerter> alters) throws ExecutorManagerException {
+    azkProps = props;
+
     this.executorLoader = loader;
+    this.setupExecutors();
     this.loadRunningFlows();
-    executorHost = props.getString("executor.host", "localhost");
-    executorPort = props.getInt("executor.port");
+    this.loadQueuedFlows();
 
     alerters = alters;
 
@@ -98,14 +120,139 @@ public class ExecutorManager extends EventHandler implements
     executingManager = new ExecutingManagerUpdaterThread();
     executingManager.start();
 
+    if(isMultiExecutorMode()) {
+      queueProcessor =
+        new QueueProcessorThread(azkProps.getBoolean(
+          AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+          AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000),
+          azkProps
+            .getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
+      queueProcessor.start();
+    }
+
     long executionLogsRetentionMs =
-        props.getLong("execution.logs.retention.ms",
-            DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+      props.getLong("execution.logs.retention.ms",
+        DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+
+    queuedFlows =
+      new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
 
   }
 
+  /**
+   * <pre>
+   * Setup activeExecutors using azkaban.properties and database executors
+   * Note:
+   * 1. If azkaban.use.multiple.executors is set true, this method will
+   *    load all active executors
+   * 2. In local mode, If a local executor is specified and it is missing from db,
+   *    this method add local executor as active in DB
+   * 3. In local mode, If a local executor is specified and it is marked inactive in db,
+   *    this method will convert local executor as active in DB
+   * </pre>
+   *
+   * @throws ExecutorManagerException
+   */
+  public void setupExecutors() throws ExecutorManagerException {
+    Set<Executor> newExecutors = new HashSet<Executor>();
+
+    if (isMultiExecutorMode()) {
+      logger.info("Initializing multi executors from database");
+      newExecutors.addAll(executorLoader.fetchActiveExecutors());
+    } else if (azkProps.containsKey("executor.port")) {
+      // Add local executor, if specified as per properties
+      String executorHost = azkProps.getString("executor.host", "localhost");
+      int executorPort = azkProps.getInt("executor.port");
+      logger.info(String.format("Initializing local executor %s:%d",
+        executorHost, executorPort));
+      Executor executor =
+        executorLoader.fetchExecutor(executorHost, executorPort);
+      if (executor == null) {
+        executor = executorLoader.addExecutor(executorHost, executorPort);
+      } else if (!executor.isActive()) {
+        executor.setActive(true);
+        executorLoader.updateExecutor(executor);
+      }
+      newExecutors.add(new Executor(executor.getId(), executorHost,
+        executorPort, true));
+    }
+
+    if (newExecutors.isEmpty()) {
+      throw new ExecutorManagerException("No active executor found");
+    } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+      throw new ExecutorManagerException("Multiple local executors specified");
+    } else {
+      // clear all active executors, only if we have at least one new active
+      // executors
+      activeExecutors.clear();
+      activeExecutors.addAll(newExecutors);
+    }
+  }
+
+  private boolean isMultiExecutorMode() {
+    return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+  }
+
+  /**
+   * Refresh Executor stats for all the actie executors in this executorManager
+   */
+  private void refreshExecutors() {
+    synchronized (activeExecutors) {
+      // TODO: rest api call to refresh executor stats
+    }
+  }
+
+  /**
+   * Disable flow dispatching in QueueProcessor
+   *
+   * @throws ExecutorManagerException
+   */
+  public void disableQueueProcessorThread() throws ExecutorManagerException {
+    if (isMultiExecutorMode()) {
+      queueProcessor.setActive(false);
+    } else {
+      throw new ExecutorManagerException(
+        "Cannot disable QueueProcessor in local mode");
+    }
+  }
+
+  /**
+   * Enable flow dispatching in QueueProcessor
+   *
+   * @throws ExecutorManagerException
+   */
+  public void enableQueueProcessorThread() throws ExecutorManagerException {
+    if (isMultiExecutorMode()) {
+      queueProcessor.setActive(true);
+    } else {
+      throw new ExecutorManagerException(
+        "Cannot enable QueueProcessor in local mode");
+    }
+  }
+
+  public State getQueueProcessorThreadState() {
+    if (isMultiExecutorMode())
+      return queueProcessor.getState();
+    else
+      return State.NEW; // not started in local mode
+  }
+
+  /**
+   * Returns state of QueueProcessor False, no flow is being dispatched True ,
+   * flows are being dispatched as expected
+   *
+   * @return
+   */
+  public boolean isQueueProcessorThreadActive() {
+    if (isMultiExecutorMode())
+      return queueProcessor.isActive();
+    else
+      return false;
+  }
+
   @Override
   public State getExecutorManagerThreadState() {
     return executingManager.getState();
@@ -130,10 +277,33 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
+  public Collection<Executor> getAllActiveExecutors() {
+    return Collections.unmodifiableCollection(activeExecutors);
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
+   */
+  @Override
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+    for (Executor executor : activeExecutors) {
+      if (executor.getId() == executorId) {
+        return executor;
+      }
+    }
+    return executorLoader.fetchExecutor(executorId);
+  }
+
+  @Override
   public Set<String> getPrimaryServerHosts() {
     // Only one for now. More probably later.
     HashSet<String> ports = new HashSet<String>();
-    ports.add(executorHost + ":" + executorPort);
+    for (Executor executor : activeExecutors) {
+      ports.add(executor.getHost() + ":" + executor.getPort());
+    }
     return ports;
   }
 
@@ -141,11 +311,8 @@ public class ExecutorManager extends EventHandler implements
   public Set<String> getAllActiveExecutorServerHosts() {
     // Includes non primary server/hosts
     HashSet<String> ports = new HashSet<String>();
-    ports.add(executorHost + ":" + executorPort);
-    for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
-        .values()) {
-      ExecutionReference ref = running.getFirst();
-      ports.add(ref.getHost() + ":" + ref.getPort());
+    for (Executor executor : activeExecutors) {
+      ports.add(executor.getHost() + ":" + executor.getPort());
     }
 
     return ports;
@@ -153,59 +320,194 @@ public class ExecutorManager extends EventHandler implements
 
   private void loadRunningFlows() throws ExecutorManagerException {
     runningFlows.putAll(executorLoader.fetchActiveFlows());
+    // Finalize all flows which were running on an executor which is now
+    // inactive
+    for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
+      if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
+        finalizeFlows(pair.getSecond());
+      }
+    }
+  }
+
+  /*
+   * load queued flows i.e with active_execution_reference and not assigned to
+   * any executor
+   */
+  private void loadQueuedFlows() throws ExecutorManagerException {
+    for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
+      .fetchQueuedFlows()) {
+      queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+    }
   }
 
+  /**
+   * Gets a list of all the active (running flows and non-dispatched flows)
+   * executions for a given project and flow {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
+   *      java.lang.String)
+   */
   @Override
   public List<Integer> getRunningFlows(int projectId, String flowId) {
-    ArrayList<Integer> executionIds = new ArrayList<Integer>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    List<Integer> executionIds = new ArrayList<Integer>();
+    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+      queuedFlows.getAllEntries()));
+    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+      runningFlows.values()));
+    return executionIds;
+  }
+
+  /* Helper method for getRunningFlows */
+  private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    List<Integer> executionIds = new ArrayList<Integer>();
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getFlowId().equals(flowId)
-          && ref.getSecond().getProjectId() == projectId) {
+        && ref.getSecond().getProjectId() == projectId) {
         executionIds.add(ref.getFirst().getExecId());
       }
     }
     return executionIds;
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
+   */
+  @Override
+  public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+    throws IOException {
+    List<Pair<ExecutableFlow, Executor>> flows =
+      new ArrayList<Pair<ExecutableFlow, Executor>>();
+    getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
+    getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+    return flows;
+  }
+
+  /* Helper method for getActiveFlowsWithExecutor */
+  private void getActiveFlowsWithExecutorHelper(
+    List<Pair<ExecutableFlow, Executor>> flows,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
+        .getFirst().getExecutor()));
+    }
+  }
+
+  /**
+   * Checks whether the given flow has an active (running, non-dispatched)
+   * executions {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
+   *      java.lang.String)
+   */
   @Override
   public boolean isFlowRunning(int projectId, String flowId) {
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    boolean isRunning = false;
+    isRunning =
+      isRunning
+        || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
+    isRunning =
+      isRunning
+        || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+    return isRunning;
+  }
+
+  /* Search a running flow in a collection */
+  private boolean isFlowRunningHelper(int projectId, String flowId,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getProjectId() == projectId
-          && ref.getSecond().getFlowId().equals(flowId)) {
+        && ref.getSecond().getFlowId().equals(flowId)) {
         return true;
       }
     }
     return false;
   }
 
+  /**
+   * Fetch ExecutableFlow from an active (running, non-dispatched) or from
+   * database {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
+   */
   @Override
   public ExecutableFlow getExecutableFlow(int execId)
-      throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
-    if (active == null) {
+    throws ExecutorManagerException {
+    if (runningFlows.containsKey(execId)) {
+      return runningFlows.get(execId).getSecond();
+    } else if (queuedFlows.hasExecution(execId)) {
+      return queuedFlows.getFlow(execId);
+    } else {
       return executorLoader.fetchExecutableFlow(execId);
     }
-    return active.getSecond();
   }
 
+  /**
+   * Get all active (running, non-dispatched) flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
     ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    getActiveFlowHelper(flows, queuedFlows.getAllEntries());
+    getActiveFlowHelper(flows, runningFlows.values());
+    return flows;
+  }
+
+  /*
+   * Helper method to get all running flows from a Pair<ExecutionReference,
+   * ExecutableFlow collection
+   */
+  private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       flows.add(ref.getSecond());
     }
-    return flows;
   }
 
+  /**
+   * Get execution Ids of all active (running, non-dispatched) flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
   public String getRunningFlowIds() {
     List<Integer> allIds = new ArrayList<Integer>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
-      allIds.add(ref.getSecond().getExecutionId());
-    }
+    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+    getRunningFlowsIdsHelper(allIds, runningFlows.values());
+    Collections.sort(allIds);
+    return allIds.toString();
+  }
+
+  /**
+   * Get execution Ids of all non-dispatched flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
+  public String getQueuedFlowIds() {
+    List<Integer> allIds = new ArrayList<Integer>();
+    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
     Collections.sort(allIds);
     return allIds.toString();
   }
 
+  /* Helper method to flow ids of all running flows */
+  private void getRunningFlowsIdsHelper(List<Integer> allIds,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      allIds.add(ref.getSecond().getExecutionId());
+    }
+  }
+
   public List<ExecutableFlow> getRecentlyFinishedFlows() {
     return new ArrayList<ExecutableFlow>(recentlyFinished.values());
   }
@@ -371,18 +673,30 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  /**
+   * if flows was dispatched to an executor, cancel by calling Executor else if
+   * flow is still in queue, remove from queue and finalize {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
+   *      java.lang.String)
+   */
   @Override
   public void cancelFlow(ExecutableFlow exFlow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
+      if (runningFlows.containsKey(exFlow.getExecutionId())) {
+        Pair<ExecutionReference, ExecutableFlow> pair =
           runningFlows.get(exFlow.getExecutionId());
-      if (pair == null) {
+        callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+          userId);
+      } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
+        queuedFlows.dequeue(exFlow.getExecutionId());
+        finalizeFlows(exFlow);
+      } else {
         throw new ExecutorManagerException("Execution "
-            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-            + " isn't running.");
+          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+          + " isn't running.");
       }
-      callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
     }
   }
 
@@ -539,80 +853,97 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exflow) {
-      logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
-          + userId);
-
-      int projectId = exflow.getProjectId();
       String flowId = exflow.getFlowId();
-      exflow.setSubmitUser(userId);
-      exflow.setSubmitTime(System.currentTimeMillis());
-
-      List<Integer> running = getRunningFlows(projectId, flowId);
 
-      ExecutionOptions options = exflow.getExecutionOptions();
-      if (options == null) {
-        options = new ExecutionOptions();
-      }
+      logger.info("Submitting execution flow " + flowId + " by " + userId);
 
       String message = "";
-      if (options.getDisabledJobs() != null) {
-        applyDisabledJobs(options.getDisabledJobs(), exflow);
-      }
+      if (queuedFlows.isFull()) {
+        message =
+          String
+            .format(
+              "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+              flowId, exflow.getProjectName());
+        logger.error(message);
+      } else {
+        int projectId = exflow.getProjectId();
+        exflow.setSubmitUser(userId);
+        exflow.setSubmitTime(System.currentTimeMillis());
+
+        List<Integer> running = getRunningFlows(projectId, flowId);
+
+        ExecutionOptions options = exflow.getExecutionOptions();
+        if (options == null) {
+          options = new ExecutionOptions();
+        }
+
+        if (options.getDisabledJobs() != null) {
+          applyDisabledJobs(options.getDisabledJobs(), exflow);
+        }
 
-      if (!running.isEmpty()) {
-        if (options.getConcurrentOption().equals(
+        if (!running.isEmpty()) {
+          if (options.getConcurrentOption().equals(
             ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
-          Collections.sort(running);
-          Integer runningExecId = running.get(running.size() - 1);
+            Collections.sort(running);
+            Integer runningExecId = running.get(running.size() - 1);
 
-          options.setPipelineExecutionId(runningExecId);
-          message =
+            options.setPipelineExecutionId(runningExecId);
+            message =
               "Flow " + flowId + " is already running with exec id "
-                  + runningExecId + ". Pipelining level "
-                  + options.getPipelineLevel() + ". \n";
-        } else if (options.getConcurrentOption().equals(
+                + runningExecId + ". Pipelining level "
+                + options.getPipelineLevel() + ". \n";
+          } else if (options.getConcurrentOption().equals(
             ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
-          throw new ExecutorManagerException("Flow " + flowId
+            throw new ExecutorManagerException("Flow " + flowId
               + " is already running. Skipping execution.",
               ExecutorManagerException.Reason.SkippedExecution);
-        } else {
-          // The settings is to run anyways.
-          message =
+          } else {
+            // The settings is to run anyways.
+            message =
               "Flow " + flowId + " is already running with exec id "
-                  + StringUtils.join(running, ",")
-                  + ". Will execute concurrently. \n";
+                + StringUtils.join(running, ",")
+                + ". Will execute concurrently. \n";
+          }
         }
-      }
 
-      boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
-              ProjectWhitelist.WhitelistType.MemoryCheck);
-      options.setMemoryCheck(memoryCheck);
-
-      // The exflow id is set by the loader. So it's unavailable until after
-      // this call.
-      executorLoader.uploadExecutableFlow(exflow);
-
-      // We create an active flow reference in the datastore. If the upload
-      // fails, we remove the reference.
-      ExecutionReference reference =
-          new ExecutionReference(exflow.getExecutionId(), executorHost,
-              executorPort);
-      executorLoader.addActiveExecutableReference(reference);
-      try {
-        callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
-        runningFlows.put(exflow.getExecutionId(),
-            new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+        boolean memoryCheck =
+          !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+            ProjectWhitelist.WhitelistType.MemoryCheck);
+        options.setMemoryCheck(memoryCheck);
+
+        // The exflow id is set by the loader. So it's unavailable until after
+        // this call.
+        executorLoader.uploadExecutableFlow(exflow);
 
+        // We create an active flow reference in the datastore. If the upload
+        // fails, we remove the reference.
+        ExecutionReference reference =
+          new ExecutionReference(exflow.getExecutionId());
+
+        if (isMultiExecutorMode()) {
+          //Take MultiExecutor route
+          executorLoader.addActiveExecutableReference(reference);
+          queuedFlows.enqueue(exflow, reference);
+        } else {
+          // assign only local executor we have
+          reference.setExecutor(activeExecutors.iterator().next());
+          executorLoader.addActiveExecutableReference(reference);
+          try {
+            callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+            runningFlows.put(exflow.getExecutionId(),
+              new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+          } catch (ExecutorManagerException e) {
+            executorLoader.removeActiveExecutableReference(reference
+              .getExecId());
+            throw e;
+          }
+        }
         message +=
-            "Execution submitted successfully with exec id "
-                + exflow.getExecutionId();
-      } catch (ExecutorManagerException e) {
-        executorLoader.removeActiveExecutableReference(reference.getExecId());
-        throw e;
+          "Execution submitted successfully with exec id "
+            + exflow.getExecutionId();
       }
-
       return message;
     }
   }
@@ -725,13 +1056,20 @@ public class ExecutorManager extends EventHandler implements
   /**
    * Manage servlet call for stats servlet in Azkaban execution server
    * {@inheritDoc}
-   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+   *
+   * @throws ExecutorManagerException
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+   *      azkaban.utils.Pair[])
    */
   @Override
-  public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
+  public Map<String, Object> callExecutorStats(int executorId, String action,
+    Pair<String, String>... params) throws IOException, ExecutorManagerException {
 
     URIBuilder builder = new URIBuilder();
-    builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+    Executor executor = fetchExecutor(executorId);
+    builder.setScheme("http").setHost(executor.getHost())
+      .setPort(executor.getPort()).setPath("/stats");
 
     builder.setParameter(ConnectorParams.ACTION_PARAM, action);
 
@@ -763,7 +1101,7 @@ public class ExecutorManager extends EventHandler implements
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+      (Map<String, Object>) JSONUtils.parseJSONFromString(response);
 
     return jsonResponse;
   }
@@ -815,6 +1153,9 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void shutdown() {
+    if (isMultiExecutorMode()) {
+      queueProcessor.shutdown();
+    }
     executingManager.shutdown();
   }
 
@@ -846,7 +1187,7 @@ public class ExecutorManager extends EventHandler implements
           lastThreadCheckTime = System.currentTimeMillis();
           updaterStage = "Starting update all flows.";
 
-          Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
+          Map<Executor, List<ExecutableFlow>> exFlowMap =
               getFlowToExecutorMap();
           ArrayList<ExecutableFlow> finishedFlows =
               new ArrayList<ExecutableFlow>();
@@ -854,16 +1195,16 @@ public class ExecutorManager extends EventHandler implements
               new ArrayList<ExecutableFlow>();
 
           if (exFlowMap.size() > 0) {
-            for (Map.Entry<ConnectionInfo, List<ExecutableFlow>> entry : exFlowMap
+            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
                 .entrySet()) {
               List<Long> updateTimesList = new ArrayList<Long>();
               List<Integer> executionIdsList = new ArrayList<Integer>();
 
-              ConnectionInfo connection = entry.getKey();
+              Executor executor = entry.getKey();
 
               updaterStage =
-                  "Starting update flows on " + connection.getHost() + ":"
-                      + connection.getPort();
+                  "Starting update flows on " + executor.getHost() + ":"
+                      + executor.getPort();
 
               // We pack the parameters of the same host together before we
               // query.
@@ -881,8 +1222,8 @@ public class ExecutorManager extends EventHandler implements
               Map<String, Object> results = null;
               try {
                 results =
-                    callExecutorServer(connection.getHost(),
-                        connection.getPort(), ConnectorParams.UPDATE_ACTION,
+                    callExecutorServer(executor.getHost(),
+                      executor.getPort(), ConnectorParams.UPDATE_ACTION,
                         null, null, executionIds, updateTimes);
               } catch (IOException e) {
                 logger.error(e);
@@ -1222,15 +1563,16 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  private Map<ConnectionInfo, List<ExecutableFlow>> getFlowToExecutorMap() {
-    HashMap<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
-        new HashMap<ConnectionInfo, List<ExecutableFlow>>();
+  /* Group Executable flow by Executors to reduce number of REST calls */
+  private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
+    HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+      new HashMap<Executor, List<ExecutableFlow>>();
 
-    ConnectionInfo lastPort = new ConnectionInfo(executorHost, executorPort);
     for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
-        .values()) {
+      .values()) {
       ExecutionReference ref = runningFlow.getFirst();
       ExecutableFlow flow = runningFlow.getSecond();
+      Executor executor = ref.getExecutor();
 
       // We can set the next check time to prevent the checking of certain
       // flows.
@@ -1238,16 +1580,10 @@ public class ExecutorManager extends EventHandler implements
         continue;
       }
 
-      // Just a silly way to reduce object creation construction of objects
-      // since it's most likely that the values will be the same.
-      if (!lastPort.isEqual(ref.getHost(), ref.getPort())) {
-        lastPort = new ConnectionInfo(ref.getHost(), ref.getPort());
-      }
-
-      List<ExecutableFlow> flows = exFlowMap.get(lastPort);
+      List<ExecutableFlow> flows = exFlowMap.get(executor);
       if (flows == null) {
         flows = new ArrayList<ExecutableFlow>();
-        exFlowMap.put(lastPort, flows);
+        exFlowMap.put(executor, flows);
       }
 
       flows.add(flow);
@@ -1256,61 +1592,6 @@ public class ExecutorManager extends EventHandler implements
     return exFlowMap;
   }
 
-  private static class ConnectionInfo {
-    private String host;
-    private int port;
-
-    public ConnectionInfo(String host, int port) {
-      this.host = host;
-      this.port = port;
-    }
-
-    @SuppressWarnings("unused")
-    private ConnectionInfo getOuterType() {
-      return ConnectionInfo.this;
-    }
-
-    public boolean isEqual(String host, int port) {
-      return this.port == port && this.host.equals(host);
-    }
-
-    public String getHost() {
-      return host;
-    }
-
-    public int getPort() {
-      return port;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((host == null) ? 0 : host.hashCode());
-      result = prime * result + port;
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      ConnectionInfo other = (ConnectionInfo) obj;
-      if (host == null) {
-        if (other.host != null)
-          return false;
-      } else if (!host.equals(other.host))
-        return false;
-      if (port != other.port)
-        return false;
-      return true;
-    }
-  }
-
   @Override
   public int getExecutableFlows(int projectId, String flowId, int from,
       int length, List<ExecutableFlow> outputList)
@@ -1384,4 +1665,182 @@ public class ExecutorManager extends EventHandler implements
           - executionLogsRetentionMs);
     }
   }
-}
+
+  /*
+   * This thread is responsible for processing queued flows using dispatcher and
+   * making rest api calls to executor server
+   */
+  private class QueueProcessorThread extends Thread {
+    private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+    private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+    private final long activeExecutorRefreshWindowInMilisec;
+    private final int activeExecutorRefreshWindowInFlows;
+
+    private volatile boolean shutdown = false;
+    private volatile boolean isActive = true;
+
+    public QueueProcessorThread(boolean isActive,
+      long activeExecutorRefreshWindowInTime,
+      int activeExecutorRefreshWindowInFlows) {
+      setActive(isActive);
+      this.activeExecutorRefreshWindowInFlows =
+        activeExecutorRefreshWindowInFlows;
+      this.activeExecutorRefreshWindowInMilisec =
+        activeExecutorRefreshWindowInTime;
+      this.setName("AzkabanWebServer-QueueProcessor-Thread");
+    }
+
+    public void setActive(boolean isActive) {
+      this.isActive = isActive;
+      logger.info("QueueProcessorThread active turned " + this.isActive);
+    }
+
+    public boolean isActive() {
+      return isActive;
+    }
+
+    public void shutdown() {
+      shutdown = true;
+      this.interrupt();
+    }
+
+    public void run() {
+      // Loops till QueueProcessorThread is shutdown
+      while (!shutdown) {
+        synchronized (this) {
+          try {
+            // start processing queue if active, other wait for sometime
+            if (isActive) {
+              processQueuedFlows(activeExecutorRefreshWindowInMilisec,
+                activeExecutorRefreshWindowInFlows);
+            }
+            wait(QUEUE_PROCESSOR_WAIT_IN_MS);
+          } catch (Exception e) {
+            logger.error(
+              "QueueProcessorThread Interrupted. Probably to shut down.", e);
+          }
+        }
+      }
+    }
+
+    /* Method responsible for processing the non-dispatched flows */
+    private void processQueuedFlows(long activeExecutorsRefreshWindow,
+      int maxContinuousFlowProcessed) throws InterruptedException,
+      ExecutorManagerException {
+      long lastExecutorRefreshTime = System.currentTimeMillis();
+      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+      int currentContinuousFlowProcessed = 0;
+
+      while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
+        ExecutionReference reference = runningCandidate.getFirst();
+        ExecutableFlow exflow = runningCandidate.getSecond();
+
+        long currentTime = System.currentTimeMillis();
+
+        // if we have dispatched more than maxContinuousFlowProcessed or
+        // It has been more then activeExecutorsRefreshWindow millisec since we
+        // refreshed
+        if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
+          || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+          // Refresh executorInfo for all activeExecutors
+          refreshExecutors();
+          lastExecutorRefreshTime = currentTime;
+          currentContinuousFlowProcessed = 0;
+        }
+
+        // process flow with current snapshot of activeExecutors
+        processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        currentContinuousFlowProcessed++;
+      }
+    }
+
+    /* process flow with a snapshot of available Executors */
+    private void processFlow(ExecutionReference reference,
+      ExecutableFlow exflow, Set<Executor> availableExecutors)
+      throws ExecutorManagerException {
+      synchronized (exflow) {
+        Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+        if (selectedExecutor != null) {
+          try {
+            dispatch(reference, exflow, selectedExecutor);
+          } catch (ExecutorManagerException e) {
+            logger.warn(String.format(
+              "Executor %s responded with exception for exec: %d",
+              selectedExecutor, exflow.getExecutionId()), e);
+            handleDispatchExceptionCase(reference, exflow, selectedExecutor,
+              availableExecutors);
+          }
+        } else {
+          handleNoExecutorSelectedCase(reference, exflow);
+        }
+      }
+    }
+
+    /* Choose Executor for exflow among the available executors */
+    private Executor selectExecutor(ExecutableFlow exflow,
+      Set<Executor> availableExecutors) {
+      Executor choosenExecutor;
+      // TODO: use dispatcher
+      choosenExecutor = availableExecutors.iterator().next();
+      return choosenExecutor;
+    }
+
+    private void handleDispatchExceptionCase(ExecutionReference reference,
+      ExecutableFlow exflow, Executor lastSelectedExecutor,
+      Set<Executor> remainingExecutors) throws ExecutorManagerException {
+      logger
+        .info(String
+          .format(
+            "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+            exflow.getExecutionId(), reference.getNumErrors()));
+      reference.setNumErrors(reference.getNumErrors() + 1);
+      reference.setExecutor(null);
+      if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+        || remainingExecutors.size() <= 1) {
+        logger.error("Failed to process queued flow");
+        finalizeFlows(exflow);
+      } else {
+        remainingExecutors.remove(lastSelectedExecutor);
+        // try other executors except chosenExecutor
+        processFlow(reference, exflow, remainingExecutors);
+      }
+    }
+
+    private void handleNoExecutorSelectedCase(ExecutionReference reference,
+      ExecutableFlow exflow) throws ExecutorManagerException {
+      logger
+      .info(String
+        .format(
+          "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+          exflow.getExecutionId(), reference.getNumErrors()));
+      reference.setNumErrors(reference.getNumErrors() + 1);
+      // Scenario: when dispatcher didn't assigned any executor
+      if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
+        finalizeFlows(exflow);
+      } else {
+        // again queue this flow
+        queuedFlows.enqueue(exflow, reference);
+      }
+    }
+
+    private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+      Executor choosenExecutor) throws ExecutorManagerException {
+      exflow.setUpdateTime(System.currentTimeMillis());
+
+      // to be moved after db update once we integrate rest api changes
+      reference.setExecutor(choosenExecutor);
+      // TODO: ADD rest call to do an actual dispatch
+      callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+      executorLoader.assignExecutor(exflow.getExecutionId(),
+        choosenExecutor.getId());
+
+      // move from flow to running flows
+      runningFlows.put(exflow.getExecutionId(),
+        new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+      logger.info(String.format(
+        "Successfully dispatched exec %d with error count %d",
+        exflow.getExecutionId(), reference.getNumErrors()));
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 10379ec..2b47293 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -18,6 +18,7 @@ package azkaban.executor;
 
 import java.io.IOException;
 import java.lang.Thread.State;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -88,6 +89,18 @@ public interface ExecutorManagerAdapter {
 
   public List<ExecutableFlow> getRunningFlows() throws IOException;
 
+  /**
+   * <pre>
+   * Returns All running with executors and queued flows
+   * Note, returns empty list if there isn't any running or queued flows
+   * </pre>
+   *
+   * @return
+   * @throws IOException
+   */
+  public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+    throws IOException;
+
   public List<ExecutableFlow> getRecentlyFinishedFlows();
 
   public List<ExecutableFlow> getExecutableFlows(Project project,
@@ -177,9 +190,10 @@ public interface ExecutorManagerAdapter {
    * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
    * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
    * </ul>
+   * @throws ExecutorManagerException
    */
-  public Map<String, Object> callExecutorStats(String action,
-      Pair<String, String>... params) throws IOException;
+  public Map<String, Object> callExecutorStats(int executorId, String action,
+    Pair<String, String>... param) throws IOException, ExecutorManagerException;
 
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
       String mBean) throws IOException;
@@ -196,4 +210,25 @@ public interface ExecutorManagerAdapter {
 
   public Set<? extends String> getPrimaryServerHosts();
 
+  /**
+   * Returns a collection of all the active executors maintained by active
+   * executors
+   *
+   * @return
+   */
+  public Collection<Executor> getAllActiveExecutors();
+
+  /**
+   * <pre>
+   * Fetch executor from executors with a given executorId
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @throws ExecutorManagerException
+   *
+   */
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 2a1cf26..7740163 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -180,6 +180,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#fetchQueuedFlows()
+   */
+  @Override
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
+
+    try {
+      List<Pair<ExecutionReference, ExecutableFlow>> flows =
+        runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
+          flowHandler);
+      return flows;
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows", e);
+    }
+  }
+
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
@@ -383,12 +404,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       throws ExecutorManagerException {
     final String INSERT =
         "INSERT INTO active_executing_flows "
-            + "(exec_id, host, port, update_time) values (?,?,?,?)";
+            + "(exec_id, update_time) values (?,?)";
     QueryRunner runner = createQueryRunner();
 
     try {
-      runner.update(INSERT, reference.getExecId(), reference.getHost(),
-          reference.getPort(), reference.getUpdateTime());
+      runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
     } catch (SQLException e) {
       throw new ExecutorManagerException(
           "Error updating active flow reference " + reference.getExecId(), e);
@@ -976,6 +996,65 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return events;
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
+   */
+  @Override
+  public void assignExecutor(int executorId, int executionId)
+    throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=? where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      Executor executor = fetchExecutor(executorId);
+      if (executor == null) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to assign non-existent executor Id: %d to execution : %d  ",
+          executorId, executionId));
+      }
+
+      int rows = runner.update(UPDATE, executorId, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to assign executor Id: %d to non-existent execution : %d  ",
+          executorId, executionId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating executor id "
+        + executorId, e);
+    }
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
+   */
+  @Override
+  public Executor fetchExecutorByExecutionId(int executionId)
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    Executor executor = null;
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+          executorHandler, executionId);
+      if (executors.size() > 0) {
+        executor = executors.get(0);
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(
+        "Error fetching executor for exec_id : " + executionId, e);
+    }
+    return executor;
+  }
+
   private static class LastInsertID implements ResultSetHandler<Long> {
     private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
 
@@ -1178,13 +1257,80 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   * JDBC ResultSetHandler to fetch queued executions
+   */
+  private static class FetchQueuedExecutableFlows implements
+    ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+    // Select queued unassigned flows
+    private static String FETCH_QUEUED_EXECUTABLE_FLOW =
+      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
+        + " ax.update_time axUpdateTime FROM execution_flows ex"
+        + " INNER JOIN"
+        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+        + " Where ex.executor_id is NULL";
+
+    @Override
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+      throws SQLException {
+      if (!rs.next()) {
+        return Collections
+          .<Pair<ExecutionReference, ExecutableFlow>> emptyList();
+      }
+
+      List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+        new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+      do {
+        int id = rs.getInt(1);
+        int encodingType = rs.getInt(2);
+        byte[] data = rs.getBytes(3);
+        long updateTime = rs.getLong(4);
+
+        if (data == null) {
+          logger.error("Found a flow with empty data blob exec_id: " + id);
+        } else {
+          EncodingType encType = EncodingType.fromInteger(encodingType);
+          Object flowObj;
+          try {
+            // Convoluted way to inflate strings. Should find common package or
+            // helper function.
+            if (encType == EncodingType.GZIP) {
+              // Decompress the sucker.
+              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            ExecutableFlow exFlow =
+              ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            ExecutionReference ref = new ExecutionReference(id);
+            ref.setUpdateTime(updateTime);
+
+            execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref,
+              exFlow));
+          } catch (IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
   private static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+    // Select running and executor assigned flows
     private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
-        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data "
-            + "flow_data, ax.host host, ax.port port, ax.update_time "
-            + "axUpdateTime " + "FROM execution_flows ex "
-            + "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+        + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+        + " FROM execution_flows ex"
+        + " INNER JOIN "
+        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+        + " INNER JOIN "
+        + " executors et ON ex.executor_id = et.id";
 
     @Override
     public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
@@ -1203,6 +1349,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         String host = rs.getString(4);
         int port = rs.getInt(5);
         long updateTime = rs.getLong(6);
+        int executorId = rs.getInt(7);
+        boolean executorStatus = rs.getBoolean(8);
 
         if (data == null) {
           execFlows.put(id, null);
@@ -1223,7 +1371,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
             ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            ExecutionReference ref = new ExecutionReference(id, host, port);
+            Executor executor = new Executor(executorId, host, port, executorStatus);
+            ExecutionReference ref = new ExecutionReference(id, executor);
             ref.setUpdateTime(updateTime);
 
             execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref,
@@ -1309,6 +1458,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
     private static String NUM_JOB_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+    private static String FETCH_EXECUTOR_ID =
+        "SELECT executor_id FROM execution_flows WHERE exec_id=?";
 
     @Override
     public Integer handle(ResultSet rs) throws SQLException {
@@ -1351,6 +1502,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       "SELECT id, host, port, active FROM executors where id=?";
     private static String FETCH_EXECUTOR_BY_HOST_PORT =
       "SELECT id, host, port, active FROM executors where host=? AND port=?";
+    private static String FETCH_EXECUTION_EXECUTOR =
+      "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+        + " executors ex INNER JOIN execution_flows ef "
+        + "on ex.id = ef.executor_id  where exec_id=?";
 
     @Override
     public List<Executor> handle(ResultSet rs) throws SQLException {
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
new file mode 100644
index 0000000..641ffae
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -0,0 +1,201 @@
+package azkaban.executor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ * Composite data structure to represent non-dispatched flows in webserver.
+ * This data structure wraps a blocking queue and a concurrent hashmap.
+ * </pre>
+ */
+public class QueuedExecutions {
+  private static Logger logger = Logger.getLogger(QueuedExecutions.class);
+  final long capacity;
+
+  /* map to easily access queued flows */
+  final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap;
+  /* actual queue */
+  final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList;
+
+  public QueuedExecutions(long capacity) {
+    this.capacity = capacity;
+    queuedFlowMap =
+      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+    queuedFlowList =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+  }
+
+  /**
+   * Wraps BoundedQueue Take method to have a corresponding update in
+   * queuedFlowMap lookup table
+   *
+   * @return
+   * @throws InterruptedException
+   */
+  public Pair<ExecutionReference, ExecutableFlow> fetchHead()
+    throws InterruptedException {
+    Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+    if (pair != null && pair.getFirst() != null) {
+      queuedFlowMap.remove(pair.getFirst().getExecId());
+    }
+    return pair;
+  }
+
+  /**
+   * Helper method to have a single point of deletion in the queued flows
+   *
+   * @param executionId
+   */
+  public void dequeue(int executionId) {
+    if (queuedFlowMap.containsKey(executionId)) {
+      queuedFlowList.remove(queuedFlowMap.get(executionId));
+      queuedFlowMap.remove(executionId);
+    }
+  }
+
+  /**
+   * <pre>
+   * Helper method to have a single point of insertion in the queued flows
+   *
+   * @param exflow
+   *          flow to be enqueued
+   * @param ref
+   *          reference to be enqueued
+   * @throws ExecutorManagerException
+   *           case 1: if blocking queue put method fails due to
+   *           InterruptedException
+   *           case 2: if there already an element with
+   *           same execution Id
+   * </pre>
+   */
+  public void enqueue(ExecutableFlow exflow, ExecutionReference ref)
+    throws ExecutorManagerException {
+    if (hasExecution(exflow.getExecutionId())) {
+      String errMsg = "Flow already in queue " + exflow.getExecutionId();
+      throw new ExecutorManagerException(errMsg);
+    }
+
+    Pair<ExecutionReference, ExecutableFlow> pair =
+      new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+    try {
+      queuedFlowMap.put(exflow.getExecutionId(), pair);
+      queuedFlowList.put(pair);
+    } catch (InterruptedException e) {
+      String errMsg = "Failed to insert flow " + exflow.getExecutionId();
+      logger.error(errMsg, e);
+      throw new ExecutorManagerException(errMsg);
+    }
+  }
+
+  /**
+   * <pre>
+   * Enqueues all the elements of a collection
+   *
+   * @param collection
+   *
+   * @throws ExecutorManagerException
+   *           case 1: if blocking queue put method fails due to
+   *           InterruptedException
+   *           case 2: if there already an element with
+   *           same execution Id
+   * </pre>
+   */
+  public void enqueueAll(
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection)
+    throws ExecutorManagerException {
+    for (Pair<ExecutionReference, ExecutableFlow> pair : collection) {
+      enqueue(pair.getSecond(), pair.getFirst());
+    }
+  }
+
+  /**
+   * Returns a read only collection of all the queued (flows, reference) pairs
+   *
+   * @return
+   */
+  public Collection<Pair<ExecutionReference, ExecutableFlow>> getAllEntries() {
+    return Collections.unmodifiableCollection(queuedFlowMap.values());
+  }
+
+  /**
+   * Checks if an execution is queued or not
+   *
+   * @param executionId
+   * @return
+   */
+  public boolean hasExecution(int executionId) {
+    return queuedFlowMap.containsKey(executionId);
+  }
+
+  /**
+   * Fetch flow for an execution. Returns null, if execution not in queue
+   *
+   * @param executionId
+   * @return
+   */
+  public ExecutableFlow getFlow(int executionId) {
+    if (hasExecution(executionId)) {
+      return queuedFlowMap.get(executionId).getSecond();
+    }
+    return null;
+  }
+
+  /**
+   * Fetch Activereference for an execution. Returns null, if execution not in
+   * queue
+   *
+   * @param executionId
+   * @return
+   */
+  public ExecutionReference getReference(int executionId) {
+    if (hasExecution(executionId)) {
+      return queuedFlowMap.get(executionId).getFirst();
+    }
+    return null;
+  }
+
+  /**
+   * Size of the queue
+   *
+   * @return
+   */
+  public long size() {
+    return queuedFlowList.size();
+  }
+
+  /**
+   * Verify, if queue is full as per initialized capacity
+   *
+   * @return
+   */
+  public boolean isFull() {
+    return size() >= capacity;
+  }
+
+  /**
+   * Verify, if queue is empty or not
+   *
+   * @return
+   */
+  public boolean isEmpty() {
+    return queuedFlowList.isEmpty() && queuedFlowMap.isEmpty();
+  }
+
+  /**
+   * Empties queue by dequeuing all the elements
+   */
+  public void clear() {
+    for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowMap.values()) {
+      dequeue(pair.getFirst().getExecId());
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index 08e5534..bbd642a 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -62,4 +62,20 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
   public String getRunningFlows() {
     return manager.getRunningFlowIds();
   }
+
+  @Override
+  public boolean isQueueProcessorActive() {
+    return manager.isQueueProcessorThreadActive();
+  }
+
+  @Override
+  public String getQueuedFlows() {
+    return manager.getQueuedFlowIds();
+  }
+
+  @Override
+  public String getQueueProcessorThreadState() {
+    return manager.getQueueProcessorThreadState().toString();
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 9bc1175..94012e0 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -39,4 +39,14 @@ public interface JmxExecutorManagerMBean {
 
   @DisplayName("OPERATION: getPrimaryExecutorHostPorts")
   public List<String> getPrimaryExecutorHostPorts();
+
+  @DisplayName("OPERATION: isQueueProcessorActive")
+  public boolean isQueueProcessorActive();
+
+  @DisplayName("OPERATION: getQueuedFlows")
+  public String getQueuedFlows();
+
+  @DisplayName("OPERATION: getQueueProcessorThreadState")
+  public String getQueueProcessorThreadState();
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index bde18ec..12b72dd 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -141,7 +141,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
           runner.query(connection, ProjectResultHandler.SELECT_PROJECT_BY_ID,
               handler, id);
       if (projects.isEmpty()) {
-        throw new ProjectManagerException("No active project with id " + id
+        throw new ProjectManagerException("No project with id " + id
             + " exists in db.");
       }
 
@@ -169,6 +169,67 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     return project;
   }
 
+    /**
+     * Fetch first project with a given name {@inheritDoc}
+     *
+     * @see azkaban.project.ProjectLoader#fetchProjectByName(java.lang.String)
+     */
+    @Override
+    public Project fetchProjectByName(String name)
+        throws ProjectManagerException {
+        Connection connection = getConnection();
+
+        Project project = null;
+        try {
+            project = fetchProjectByName(connection, name);
+        } finally {
+            DbUtils.closeQuietly(connection);
+        }
+
+        return project;
+    }
+
+    private Project fetchProjectByName(Connection connection, String name)
+        throws ProjectManagerException {
+        QueryRunner runner = new QueryRunner();
+        // Fetch the project
+        Project project = null;
+        ProjectResultHandler handler = new ProjectResultHandler();
+        try {
+            List<Project> projects =
+                runner.query(connection,
+                    ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
+            if (projects.isEmpty()) {
+                throw new ProjectManagerException(
+                    "No project with name " + name + " exists in db.");
+            }
+
+            project = projects.get(0);
+        } catch (SQLException e) {
+            logger.error(ProjectResultHandler.SELECT_PROJECT_BY_NAME
+                + " failed.");
+            throw new ProjectManagerException(
+                "Query for existing project failed. Project " + name, e);
+        }
+
+        // Fetch the user permissions
+        List<Triple<String, Boolean, Permission>> permissions =
+            fetchPermissionsForProject(connection, project);
+
+        for (Triple<String, Boolean, Permission> perm : permissions) {
+            if (perm.getThird().toFlags() != 0) {
+                if (perm.getSecond()) {
+                    project
+                        .setGroupPermission(perm.getFirst(), perm.getThird());
+                } else {
+                    project.setUserPermission(perm.getFirst(), perm.getThird());
+                }
+            }
+        }
+
+        return project;
+    }
+
   private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(
       Connection connection, Project project) throws ProjectManagerException {
     ProjectPermissionsResultHandler permHander =
@@ -1136,6 +1197,9 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
   private static class ProjectResultHandler implements
       ResultSetHandler<List<Project>> {
+    private static String SELECT_PROJECT_BY_NAME =
+        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";
+
     private static String SELECT_PROJECT_BY_ID =
         "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";
 
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 371599f..aa886d6 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -48,6 +48,14 @@ public interface ProjectLoader {
   public Project fetchProjectById(int id) throws ProjectManagerException;
 
   /**
+   * Loads whole project, including permissions, by the project name.
+   * @param name
+   * @return
+   * @throws ProjectManagerException
+   */
+  public Project fetchProjectByName(String name) throws ProjectManagerException;
+
+  /**
    * Should create an empty project with the given name and user and adds it to
    * the data store. It will auto assign a unique id for this project if
    * successful.
@@ -269,5 +277,4 @@ public interface ProjectLoader {
       throws ProjectManagerException;
 
   void updateProjectSettings(Project project) throws ProjectManagerException;
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java b/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java
index a19d013..5512291 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLogEvent.java
@@ -32,7 +32,8 @@ public class ProjectLogEvent {
     UPLOADED(6),
     SCHEDULE(7),
     SLA(8),
-    PROXY_USER(9);
+    PROXY_USER(9),
+    PURGE(10);
 
     private int numVal;
 
@@ -64,6 +65,8 @@ public class ProjectLogEvent {
         return SLA;
       case 9:
         return PROXY_USER;
+      case 10:
+        return PURGE;
       case 128:
         return ERROR;
       default:
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 89c6539..0f09b9c 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -196,13 +196,65 @@ public class ProjectManager {
     return allProjects;
   }
 
-  public Project getProject(String name) {
-    return projectsByName.get(name);
-  }
+    /**
+     * Checks if a project is active using project_name
+     *
+     * @param name
+     */
+    public Boolean isActiveProject(String name) {
+        return projectsByName.containsKey(name);
+    }
 
-  public Project getProject(int id) {
-    return projectsById.get(id);
-  }
+    /**
+     * Checks if a project is active using project_id
+     *
+     * @param name
+     */
+    public Boolean isActiveProject(int id) {
+        return projectsById.containsKey(id);
+    }
+
+    /**
+     * fetch active project from cache and inactive projects from db by
+     * project_name
+     *
+     * @param name
+     * @return
+     */
+    public Project getProject(String name) {
+        Project fetchedProject = null;
+        if (isActiveProject(name)) {
+            fetchedProject = projectsByName.get(name);
+        } else {
+            try {
+                fetchedProject = projectLoader.fetchProjectByName(name);
+            } catch (ProjectManagerException e) {
+                logger.error("Could not load project from store.", e);
+            }
+        }
+        return fetchedProject;
+    }
+
+    /**
+     * fetch active project from cache and inactive projects from db by
+     * project_id
+     *
+     * @param id
+     * @return
+     */
+    public Project getProject(int id) {
+        Project fetchedProject = null;
+        if (isActiveProject(id)) {
+            fetchedProject = projectsById.get(id);
+        } else {
+            try {
+                fetchedProject = projectLoader.fetchProjectById(id);
+            } catch (ProjectManagerException e) {
+                logger.error("Could not load project from store.", e);
+            }
+        }
+        return fetchedProject;
+    }
 
   public Project createProject(String projectName, String description,
       User creator) throws ProjectManagerException {
@@ -249,6 +301,25 @@ public class ProjectManager {
     return newProject;
   }
 
+    /**
+     * Permanently delete all project files and properties data for all versions
+     * of a project and log event in project_events table
+     *
+     * @param project
+     * @param deleter
+     * @return
+     * @throws ProjectManagerException
+     */
+    public synchronized Project purgeProject(Project project, User deleter)
+        throws ProjectManagerException {
+        projectLoader.cleanOlderProjectVersion(project.getId(),
+            project.getVersion() + 1);
+        projectLoader
+            .postEvent(project, EventType.PURGE, deleter.getUserId(), String
+                .format("Purged versions before %d", project.getVersion() + 1));
+        return project;
+    }
+
   public synchronized Project removeProject(Project project, User deleter)
       throws ProjectManagerException {
     projectLoader.removeProject(project, deleter.getUserId());
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
new file mode 100644
index 0000000..ed6543f
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for ExecutableFlowPriorityComparator
+ * */
+
+public class ExecutableFlowPriorityComparatorTest {
+
+  /* Helper method to create an ExecutableFlow from serialized description */
+  private ExecutableFlow createExecutableFlow(String flowName, int priority,
+    long updateTime, int executionId) throws IOException {
+    ExecutableFlow execFlow =
+      TestUtils.createExecutableFlow("exectest1", flowName);
+
+    execFlow.setUpdateTime(updateTime);
+    execFlow.setExecutionId(executionId);
+    if (priority > 0) {
+      execFlow.getExecutionOptions().getFlowParameters()
+        .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
+    }
+    return execFlow;
+  }
+
+  /* priority queue order when all priorities are explicitly specified */
+  @Test
+  public void testExplicitlySpecifiedPriorities() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3, 3);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow2, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow3, queue.take().getSecond());
+  }
+
+  /* priority queue order when some priorities are implicitly specified */
+  @Test
+  public void testMixedSpecifiedPriorities() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+
+  /*
+   * priority queue order when some priorities are equal, updatetime is used in
+   * this case
+   */
+  @Test
+  public void testEqualPriorities() throws IOException, InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow4, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+
+  /*
+   * priority queue order when some priorities and updatetime are equal,
+   * execution Id is used in this case
+   */
+  @Test
+  public void testEqualUpdateTimeAndPriority() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 2, 3);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow4, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
new file mode 100644
index 0000000..62d187b
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for executor manager
+ */
+public class ExecutorManagerTest {
+
+  /* Helper method to create a ExecutorManager Instance */
+  private ExecutorManager createMultiExecutorManagerInstance()
+    throws ExecutorManagerException {
+    return createMultiExecutorManagerInstance(new MockExecutorLoader());
+  }
+
+  /*
+   * Helper method to create a ExecutorManager Instance with the given
+   * ExecutorLoader
+   */
+  private ExecutorManager createMultiExecutorManagerInstance(
+    ExecutorLoader loader) throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
+
+    loader.addExecutor("localhost", 12345);
+    loader.addExecutor("localhost", 12346);
+    return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+  }
+
+  /*
+   * Test create an executor manager instance without any executor local or
+   * remote
+   */
+  @Test(expected = ExecutorManagerException.class)
+  public void testNoExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    @SuppressWarnings("unused")
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+  }
+
+  /*
+   * Test backward compatibility with just local executor
+   */
+  @Test
+  public void testLocalExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put("executor.port", 12345);
+
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
+
+    Assert.assertEquals(activeExecutors.size(), 1);
+    Executor executor = activeExecutors.iterator().next();
+    Assert.assertEquals(executor.getHost(), "localhost");
+    Assert.assertEquals(executor.getPort(), 12345);
+    Assert.assertArrayEquals(activeExecutors.toArray(), loader
+      .fetchActiveExecutors().toArray());
+  }
+
+  /*
+   * Test executor manager initialization with multiple executors
+   */
+  @Test
+  public void testMultipleExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+    Executor executor2 = loader.addExecutor("localhost", 12346);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
+    Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+      executor1, executor2 });
+  }
+
+  /*
+   * Test executor manager active executor reload
+   */
+  @Test
+  public void testSetupExecutorsSucess() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+      new Executor[] { executor1 });
+
+    // mark older executor as inactive
+    executor1.setActive(false);
+    loader.updateExecutor(executor1);
+    Executor executor2 = loader.addExecutor("localhost", 12346);
+    Executor executor3 = loader.addExecutor("localhost", 12347);
+    manager.setupExecutors();
+
+    Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+      new Executor[] { executor2, executor3 });
+  }
+
+  /*
+   * Test executor manager active executor reload and resulting in no active
+   * executors
+   */
+  @Test(expected = ExecutorManagerException.class)
+  public void testSetupExecutorsException() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
+    Assert.assertArrayEquals(activeExecutors.toArray(),
+      new Executor[] { executor1 });
+
+    // mark older executor as inactive
+    executor1.setActive(false);
+    loader.updateExecutor(executor1);
+    manager.setupExecutors();
+  }
+
+  /* Test disabling queue process thread to pause dispatching */
+  @Test
+  public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    manager.enableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+    manager.disableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+  }
+
+  /* Test renabling queue process thread to pause restart dispatching */
+  @Test
+  public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+    manager.enableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+  }
+
+  /* Test submit a non-dispatched flow */
+  @Test
+  public void testQueuedFlows() throws ExecutorManagerException, IOException {
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow1.setExecutionId(1);
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    flow2.setExecutionId(2);
+
+    User testUser = TestUtils.getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+    manager.submitExecutableFlow(flow2, testUser.getUserId());
+
+    List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
+    testFlows.add(flow1);
+    testFlows.add(flow2);
+
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+      loader.fetchQueuedFlows();
+    Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
+    // Verify things are correctly setup in db
+    for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+      Assert.assertTrue(testFlows.contains(pair.getSecond()));
+    }
+
+    // Verify running flows using old definition of "running" flows i.e. a
+    // non-dispatched flow is also considered running
+    List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+    Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
+      && testFlows.containsAll(managerActiveFlows));
+
+    // Verify getQueuedFlowIds method
+    Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
+  }
+
+  /* Test submit duplicate flow when previous instance is not dispatched */
+  @Test(expected = ExecutorManagerException.class)
+  public void testDuplicateQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow1.getExecutionOptions().setConcurrentOption(
+      ExecutionOptions.CONCURRENT_OPTION_SKIP);
+
+    User testUser = TestUtils.getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+  }
+
+  /*
+   * Test killing a job in preparation stage at webserver side i.e. a
+   * non-dispatched flow
+   */
+  @Test
+  public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    User testUser = TestUtils.getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+
+    manager.cancelFlow(flow1, testUser.getUserId());
+    ExecutableFlow fetchedFlow =
+      loader.fetchExecutableFlow(flow1.getExecutionId());
+    Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+
+    Assert.assertFalse(manager.getRunningFlows().contains(flow1));
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 71be50f..a7e2b5f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -48,19 +49,21 @@ import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 
 public class JdbcExecutorLoaderTest {
   private static boolean testDBExists;
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions";
   // @TODO remove this and turn into local host.
-  private static final String host = "cyu-ld.linkedin.biz";
+  private static final String host = "localhost";
   private static final int port = 3306;
   private static final String database = "azkaban2";
   private static final String user = "azkaban";
   private static final String password = "azkaban";
   private static final int numConnections = 10;
 
-  private File flowDir = new File("unit/executions/exectest1");
-
   @BeforeClass
   public static void setupDB() {
     DataSource dataSource =
@@ -226,10 +229,8 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-    Assert.assertEquals(1, 0);
-
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
@@ -258,7 +259,7 @@ public class JdbcExecutorLoaderTest {
     }
 
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
@@ -297,7 +298,7 @@ public class JdbcExecutorLoaderTest {
     ExecutableFlow flow = createExecutableFlow(10, "exec1");
     flow.setExecutionId(10);
 
-    File jobFile = new File(flowDir, "job10.job");
+    File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
     Props props = new Props(null, jobFile);
     props.put("test", "test2");
     ExecutableNode oldNode = flow.getExecutableNode("job10");
@@ -334,6 +335,149 @@ public class JdbcExecutorLoaderTest {
 
   }
 
+  /* Test exception when assigning a non-existent executor to a flow */
+  @Test
+  public void testAssignExecutorInvalidExecutor()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    try {
+      loader.assignExecutor(flow.getExecutionId(), 1);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test exception when assigning an executor to a non-existent flow execution */
+  @Test
+  public void testAssignExecutorInvalidExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    try {
+      loader.assignExecutor(2, executor.getId());
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test null return when an invalid execution flows */
+  @Test
+  public void testFetchMissingExecutorByExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(1), null);
+  }
+
+  /* Test null return when for a non-dispatched execution */
+  @Test
+  public void testFetchExecutorByQueuedExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+      null);
+  }
+
+  /* Test happy case when assigning and fetching an executor to a flow execution */
+  @Test
+  public void testAssignAndFetchExecutor() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+      executor);
+  }
+
+  /* Test fetchQueuedFlows when there are no queued flows */
+  @Test
+  public void testFetchNoQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      loader.fetchQueuedFlows();
+
+    // no execution flows at all i.e. no running, completed or queued flows
+    Assert.assertTrue(queuedFlows.isEmpty());
+
+    String host = "lcoalhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    // only completed flows
+    Assert.assertTrue(queuedFlows.isEmpty());
+
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
+    loader.addActiveExecutableReference(ref);
+    // only running and completed flows
+    Assert.assertTrue(queuedFlows.isEmpty());
+  }
+
+  /* Test fetchQueuedFlows happy case */
+  @Test
+  public void testFetchQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
+
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    loader.uploadExecutableFlow(flow);
+
+    ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
+    loader.addActiveExecutableReference(ref2);
+    ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
+    loader.addActiveExecutableReference(ref);
+
+    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
+    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+
+    // only running and completed flows
+    Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
+      queuedFlows.toArray());
+  }
 
   /* Test all executors fetch from empty executors */
   @Test
@@ -434,7 +578,7 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     try {
       String host = "localhost";
-      int port = 123456;
+      int port = 12345;
       loader.addExecutor(host, port);
       loader.addExecutor(host, port);
       Assert.fail("Expecting exception, but didn't get one");
@@ -457,6 +601,7 @@ public class JdbcExecutorLoaderTest {
     } catch (ExecutorManagerException ex) {
       System.out.println("Test true");
     }
+    clearDB();
   }
 
   /* Test add & fetch by Id Executors */
@@ -587,19 +732,20 @@ public class JdbcExecutorLoaderTest {
     }
 
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = createExecutableFlow("exec1");
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
+    Executor executor = new Executor(2, "test", 1, true);
     ExecutionReference ref1 =
-        new ExecutionReference(flow1.getExecutionId(), "test", 1);
+        new ExecutionReference(flow1.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref1);
 
-    ExecutableFlow flow2 = createExecutableFlow("exec1");
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow2);
     ExecutionReference ref2 =
-        new ExecutionReference(flow2.getExecutionId(), "test", 1);
+        new ExecutionReference(flow2.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref2);
 
-    ExecutableFlow flow3 = createExecutableFlow("exec1");
+    ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow3);
 
     Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -643,7 +789,7 @@ public class JdbcExecutorLoaderTest {
 
   @Ignore @Test
   public void testSmallUploadLog() throws ExecutorManagerException {
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
     File[] smalllog =
         { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
             new File(logDir, "log3.log") };
@@ -668,7 +814,7 @@ public class JdbcExecutorLoaderTest {
 
   @Ignore @Test
   public void testLargeUploadLog() throws ExecutorManagerException {
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
     File[] largelog =
@@ -714,7 +860,7 @@ public class JdbcExecutorLoaderTest {
 
     ExecutorLoader loader = createLoader();
 
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
     File[] largelog =
@@ -738,37 +884,10 @@ public class JdbcExecutorLoaderTest {
   }
 
   private ExecutableFlow createExecutableFlow(int executionId, String flowName)
-      throws IOException {
-    File jsonFlowFile = new File(flowDir, flowName + ".flow");
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    throws IOException {
+    ExecutableFlow execFlow =
+      TestUtils.createExecutableFlow("exectest1", flowName);
     execFlow.setExecutionId(executionId);
-
-    return execFlow;
-  }
-
-  private ExecutableFlow createExecutableFlow(String flowName)
-      throws IOException {
-    File jsonFlowFile = new File(flowDir, flowName + ".flow");
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
     return execFlow;
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index f2ffee8..833e0c6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -30,6 +30,8 @@ import azkaban.utils.Props;
 
 public class MockExecutorLoader implements ExecutorLoader {
 
+  HashMap<Integer, Integer> executionExecutorMapping =
+      new HashMap<Integer, Integer>();
   HashMap<Integer, ExecutableFlow> flows =
       new HashMap<Integer, ExecutableFlow>();
   HashMap<String, ExecutableNode> nodes = new HashMap<String, ExecutableNode>();
@@ -291,11 +293,12 @@ public class MockExecutorLoader implements ExecutorLoader {
   @Override
   public Executor addExecutor(String host, int port)
     throws ExecutorManagerException {
-    if (fetchExecutor(host, port) != null) {
-
+    Executor executor = null;
+    if (fetchExecutor(host, port) == null) {
+      executorIdCounter++;
+      executor = new Executor(executorIdCounter, host, port, true);
+      executors.add(executor);
     }
-    executorIdCounter++;
-    Executor executor = new Executor(executorIdCounter, host, port, true);
     return executor;
   }
 
@@ -334,4 +337,35 @@ public class MockExecutorLoader implements ExecutorLoader {
     return executors;
   }
 
+  @Override
+  public void assignExecutor(int executorId, int execId)
+    throws ExecutorManagerException {
+    ExecutionReference ref = refs.get(execId);
+    ref.setExecutor(fetchExecutor(executorId));
+    executionExecutorMapping.put(execId, executorId);
+  }
+
+  @Override
+  public Executor fetchExecutorByExecutionId(int execId) throws ExecutorManagerException {
+    if (executionExecutorMapping.containsKey(execId)) {
+      return fetchExecutor(executionExecutorMapping.get(execId));
+    } else {
+      throw new ExecutorManagerException(
+        "Failed to find executor with execution : " + execId);
+    }
+  }
+
+  @Override
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException {
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+    for (int execId : refs.keySet()) {
+      if (!executionExecutorMapping.containsKey(execId)) {
+        queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
+          .get(execId), flows.get(execId)));
+      }
+    }
+    return queuedFlows;
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
new file mode 100644
index 0000000..94bbd7e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -0,0 +1,208 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+
+public class QueuedExecutionsTest {
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions/exectest1/";
+
+  private File getFlowDir(String flow) {
+    return new File(UNIT_BASE_DIR + flow + ".flow");
+  }
+
+  /*
+   * Helper method to create an (ExecutionReference, ExecutableFlow) from
+   * serialized description
+   */
+  private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
+    String flowName, int execId) throws IOException {
+    File jsonFlowFile = getFlowDir(flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    execFlow.setExecutionId(execId);
+    ExecutionReference ref = new ExecutionReference(execId);
+    return new Pair<ExecutionReference, ExecutableFlow>(ref, execFlow);
+  }
+
+  public List<Pair<ExecutionReference, ExecutableFlow>> getDummyData()
+    throws IOException {
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList =
+      new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+    dataList.add(createExecutablePair("exec1", 1));
+    dataList.add(createExecutablePair("exec2", 2));
+    return dataList;
+  }
+
+  /* Test enqueue method happy case */
+  @Test
+  public void testEnqueueHappyCase() throws IOException,
+    ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      queue.enqueue(pair.getSecond(), pair.getFirst());
+    }
+
+    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+  }
+
+  /* Test enqueue duplicate execution ids */
+  @Test(expected = ExecutorManagerException.class)
+  public void testEnqueueDuplicateExecution() throws IOException,
+    ExecutorManagerException {
+    Pair<ExecutionReference, ExecutableFlow> pair1 =
+      createExecutablePair("exec1", 1);
+    QueuedExecutions queue = new QueuedExecutions(5);
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+  }
+
+  /* Test enqueue more than capacity */
+  @Test(expected = ExecutorManagerException.class)
+  public void testEnqueueOverflow() throws IOException,
+    ExecutorManagerException {
+    Pair<ExecutionReference, ExecutableFlow> pair1 =
+      createExecutablePair("exec1", 1);
+    QueuedExecutions queue = new QueuedExecutions(1);
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+  }
+
+  /* Test EnqueueAll method */
+  @Test
+  public void testEnqueueAll() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+  }
+
+  /* Test size method */
+  @Test
+  public void testSize() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+  }
+
+  /* Test dequeue method */
+  @Test
+  public void testDequeue() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    queue.dequeue(dataList.get(0).getFirst().getExecId());
+    Assert.assertEquals(queue.size(), 1);
+    Assert.assertTrue(queue.getAllEntries().contains(dataList.get(1)));
+  }
+
+  /* Test clear method */
+  @Test
+  public void testClear() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+    queue.clear();
+    Assert.assertEquals(queue.size(), 0);
+  }
+
+  /* Test isEmpty method */
+  @Test
+  public void testIsEmpty() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    Assert.assertTrue(queue.isEmpty());
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+    queue.clear();
+    Assert.assertTrue(queue.isEmpty());
+  }
+
+  /* Test fetchHead method */
+  @Test
+  public void testFetchHead() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    Assert.assertTrue(queue.isEmpty());
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.fetchHead(), dataList.get(0));
+    Assert.assertEquals(queue.fetchHead(), dataList.get(1));
+  }
+
+  /* Test isFull method */
+  @Test
+  public void testIsFull() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertTrue(queue.isFull());
+  }
+
+  /* Test hasExecution method */
+  @Test
+  public void testHasExecution() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertTrue(queue.hasExecution(pair.getFirst().getExecId()));
+    }
+    Assert.assertFalse(queue.hasExecution(5));
+    Assert.assertFalse(queue.hasExecution(7));
+    Assert.assertFalse(queue.hasExecution(15));
+  }
+
+  /* Test getFlow method */
+  @Test
+  public void testGetFlow() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertEquals(pair.getSecond(),
+        queue.getFlow(pair.getFirst().getExecId()));
+    }
+  }
+
+  /* Test getReferences method */
+  @Test
+  public void testGetReferences() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertEquals(pair.getFirst(),
+        queue.getReference(pair.getFirst().getExecId()));
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index a7ef5f3..67efed9 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -28,7 +28,6 @@ import javax.sql.DataSource;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -214,6 +213,96 @@ public class JdbcProjectLoaderTest {
     DbUtils.closeQuietly(connection);
   }
 
+    /** Test case to validated permissions for fetchProjectByName **/
+    @Test
+    public void testPermissionRetrivalByFetchProjectByName()
+        throws ProjectManagerException {
+        if (!isTestSetup()) {
+            return;
+        }
+
+        ProjectLoader loader = createLoader();
+        String projectName = "mytestProject";
+        String projectDescription = "This is my new project";
+        User user = new User("testUser");
+
+        Project project =
+            loader.createNewProject(projectName, projectDescription, user);
+
+        Permission perm = new Permission(0x2);
+        loader.updatePermission(project, user.getUserId(), perm, false);
+        loader.updatePermission(project, "group", perm, true);
+
+        Permission permOverride = new Permission(0x6);
+        loader.updatePermission(project, user.getUserId(), permOverride, false);
+
+        Project fetchedProject = loader.fetchProjectByName(project.getName());
+        assertProjectMemberEquals(project, fetchedProject);
+        Assert.assertEquals(permOverride,
+            fetchedProject.getUserPermission(user.getUserId()));
+    }
+
+    /** Default Test case for fetchProjectByName **/
+    @Test
+    public void testProjectRetrievalByFetchProjectByName()
+        throws ProjectManagerException {
+        if (!isTestSetup()) {
+            return;
+        }
+
+        ProjectLoader loader = createLoader();
+        String projectName = "mytestProject";
+        String projectDescription = "This is my new project";
+        User user = new User("testUser");
+
+        Project project =
+            loader.createNewProject(projectName, projectDescription, user);
+
+        Project fetchedProject = loader.fetchProjectByName(project.getName());
+        assertProjectMemberEquals(project, fetchedProject);
+    }
+
+    /** Default Test case for fetchProjectByName **/
+    @Test
+    public void testDuplicateRetrivalByFetchProjectByName()
+        throws ProjectManagerException {
+        if (!isTestSetup()) {
+            return;
+        }
+
+        ProjectLoader loader = createLoader();
+        String projectName = "mytestProject";
+        String projectDescription = "This is my new project";
+        User user = new User("testUser");
+
+        Project project =
+            loader.createNewProject(projectName, projectDescription, user);
+
+        loader.removeProject(project, user.getUserId());
+
+        Project newProject =
+            loader.createNewProject(projectName, projectDescription, user);
+
+        Project fetchedProject = loader.fetchProjectByName(project.getName());
+        Assert.assertEquals(newProject.getId(), fetchedProject.getId());
+
+    }
+
+    /** Test case for NonExistantProject project fetch **/
+    @Test
+    public void testInvalidProjectByFetchProjectByName() {
+        if (!isTestSetup()) {
+            return;
+        }
+        ProjectLoader loader = createLoader();
+        try {
+            loader.fetchProjectByName("NonExistantProject");
+        } catch (ProjectManagerException ex) {
+            System.out.println("Test true");
+        }
+        Assert.fail("Expecting exception, but didn't get one");
+    }
+
   @Test
   public void testCreateProject() throws ProjectManagerException {
     if (!isTestSetup()) {
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 909b82e..42d57b7 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -243,4 +243,10 @@ public class MockProjectLoader implements ProjectLoader {
     // TODO Auto-generated method stub
 
   }
+
+@Override
+public Project fetchProjectByName(String name) throws ProjectManagerException {
+    // TODO Auto-generated method stub
+    return null;
+}
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
new file mode 100644
index 0000000..e51b575
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+
+/**
+ * Commonly used utils method for unit/integration tests
+ */
+public class TestUtils {
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions";
+
+  public static File getFlowDir(String projectName, String flow) {
+    return new File(String.format("%s/%s/%s.flow", UNIT_BASE_DIR, projectName,
+      flow));
+  }
+
+  public static User getTestUser() {
+    return new User("testUser");
+  }
+
+  /* Helper method to create an ExecutableFlow from serialized description */
+  public static ExecutableFlow createExecutableFlow(String projectName, String flowName)
+    throws IOException {
+    File jsonFlowFile = getFlowDir(projectName, flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+    return execFlow;
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index e66a586..1d9af6d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -72,7 +72,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Handle all get request to Stats Servlet {@inheritDoc}
-   * 
+   *
    * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
    *      javax.servlet.http.HttpServletResponse)
    */
@@ -176,7 +176,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Get metric snapshots for a metric and date specification
-   * 
+   *
    * @throws ServletException
    */
   private void handleGetMetricHistory(HttpServletRequest req,
@@ -252,7 +252,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Update tracking interval for a given metrics
-   * 
+   *
    * @throws ServletException
    */
   private void handleChangeMetricInterval(HttpServletRequest req,
diff --git a/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
new file mode 100644
index 0000000..dcb4ec5
--- /dev/null
+++ b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE active_executing_flows DROP COLUMN host;
+ALTER TABLE active_executing_flows DROP COLUMN port;
\ No newline at end of file
diff --git a/azkaban-sql/src/sql/update.execution_flows.3.0.sql b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
new file mode 100644
index 0000000..2935810
--- /dev/null
+++ b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE execution_flows ADD COLUMN executor_id INT DEFAULT NULL;
+CREATE INDEX executor_id ON execution_flows(executor_id);
\ No newline at end of file
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 2923d5e..f3e8c8f 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -34,6 +34,7 @@ import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
@@ -47,8 +48,11 @@ import azkaban.server.HttpRequestUtils;
 import azkaban.server.session.Session;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
+import azkaban.user.Role;
 import azkaban.user.User;
+import azkaban.user.UserManager;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.plugin.PluginRegistry;
 import azkaban.webapp.plugin.ViewerPlugin;
@@ -59,11 +63,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private ExecutorManagerAdapter executorManager;
   private ScheduleManager scheduleManager;
   private ExecutorVelocityHelper velocityHelper;
+  private UserManager userManager;
 
   @Override
   public void init(ServletConfig config) throws ServletException {
     super.init(config);
     AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    userManager = server.getUserManager();
     projectManager = server.getProjectManager();
     executorManager = server.getExecutorManager();
     scheduleManager = server.getScheduleManager();
@@ -225,7 +231,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
         newPage(req, resp, session,
             "azkaban/webapp/servlet/velocity/executionspage.vm");
 
-    List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
+    List<Pair<ExecutableFlow, Executor>> runningFlows =
+      executorManager.getActiveFlowsWithExecutor();
     page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
 
     List<ExecutableFlow> finishedFlows =
@@ -806,6 +813,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     if (!options.isSuccessEmailsOverridden()) {
       options.setSuccessEmails(flow.getSuccessEmails());
     }
+    fixFlowPriorityByPermission(options, user);
     options.setMailCreator(flow.getMailCreator());
 
     try {
@@ -821,6 +829,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     ret.put("execid", exflow.getExecutionId());
   }
 
+  /* Reset flow priority if submitting user is not a Azkaban admin */
+  private void fixFlowPriorityByPermission(ExecutionOptions options, User user) {
+    if (!(options.getFlowParameters().containsKey(
+      ExecutionOptions.FLOW_PRIORITY) && hasPermission(user, Type.ADMIN))) {
+      options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
+        String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
+    }
+  }
+
   public class ExecutorVelocityHelper {
     public String getProjectName(int id) {
       Project project = projectManager.getProject(id);
@@ -831,4 +848,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       return project.getName();
     }
   }
+
+  /* returns true if user has access of type */
+  protected boolean hasPermission(User user, Permission.Type type) {
+    for (String roleName : user.getRoles()) {
+      Role role = userManager.getRole(roleName);
+      if (role.getPermission().isPermissionSet(type)
+        || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 435e1f9..dc0ef87 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -148,6 +148,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         handleFlowPage(req, resp, session);
       } else if (hasParam(req, "delete")) {
         handleRemoveProject(req, resp, session);
+      } else if (hasParam(req, "purge")) {
+        handlePurgeProject(req, resp, session);
       } else if (hasParam(req, "download")) {
         handleDownloadProject(req, resp, session);
       } else {
@@ -518,6 +520,56 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 
   }
 
+    /**
+     * validate readiness of a project and user permission and use
+     * projectManager to purge the project if things looks good
+     **/
+    private void handlePurgeProject(HttpServletRequest req,
+        HttpServletResponse resp, Session session) throws ServletException,
+        IOException {
+        User user = session.getUser();
+        String projectName = getParam(req, "project");
+        HashMap<String, Object> ret = new HashMap<String, Object>();
+        boolean isOperationSuccessful = true;
+
+        if (projectManager.isActiveProject(projectName)) {
+            ret.put("error", "Project " + projectName
+                + " should be deleted before purging");
+            isOperationSuccessful = false;
+        }
+
+        try {
+            Project project = null;
+
+            // project is already deleted
+            if (isOperationSuccessful) {
+                project = projectManager.getProject(projectName);
+                if (project == null) {
+                    ret.put("error", "no project with name : " + projectName);
+                    isOperationSuccessful = false;
+                }
+            }
+
+            // only eligible users can purge a project
+            if (isOperationSuccessful
+                && !hasPermission(project, user, Type.ADMIN)) {
+                ret.put("error", "Cannot purge. User '" + user.getUserId()
+                    + "' is not an ADMIN.");
+                isOperationSuccessful = false;
+            }
+
+            if (isOperationSuccessful) {
+                projectManager.purgeProject(project, user);
+            }
+        } catch (Exception e) {
+            ret.put("error", e.getMessage());
+            isOperationSuccessful = false;
+        }
+
+        ret.put("success", isOperationSuccessful);
+        this.writeJSON(resp, ret);
+    }
+
   private void handleRemoveProject(HttpServletRequest req,
       HttpServletResponse resp, Session session) throws ServletException,
       IOException {
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 6d14839..b2b3487 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -17,6 +17,7 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -29,7 +30,9 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import azkaban.executor.ConnectorParams;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.server.session.Session;
 import azkaban.user.Permission;
 import azkaban.user.Role;
@@ -67,55 +70,105 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
   private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
       throws ServletException, IOException {
     HashMap<String, Object> ret = new HashMap<String, Object>();
+    int executorId = getIntParam(req, ConnectorParams.EXECUTOR_ID_PARAM);
     String actionName = getParam(req, ConnectorParams.ACTION_PARAM);
 
     if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
-      handleGetMetricHistory(req, ret, session.getUser());
+      handleGetMetricHistory(executorId, req, ret, session.getUser());
+    } else if (actionName.equals(ConnectorParams.STATS_GET_ALLMETRICSNAME)) {
+      handleGetAllMetricName(executorId, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_DISABLEMETRICS)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
     }
 
     writeJSON(resp, ret);
   }
 
   /**
+   * Get all metrics tracked by the given executor
+   *
+   * @param executorId
+   * @param req
+   * @param ret
+   */
+  private void handleGetAllMetricName(int executorId, HttpServletRequest req,
+    HashMap<String, Object> ret) throws IOException {
+    Map<String, Object> result;
+    try {
+      result =
+        execManager.callExecutorStats(executorId,
+          ConnectorParams.STATS_GET_ALLMETRICSNAME,
+          (Pair<String, String>[]) null);
+
+      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+        ret.put("error", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        ret.put("metricList", result.get("data"));
+      }
+    } catch (ExecutorManagerException e) {
+      ret.put("error", "Failed to fetch metric names for executor : "
+        + executorId);
+    }
+  }
+
+  /**
    * Generic method to facilitate actionName action using Azkaban exec server
+   * @param executorId
    * @param actionName  Name of the action
+   * @throws ExecutorManagerException
    */
-  private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
+  private void handleChangeConfigurationRequest(int executorId, String actionName, HttpServletRequest req, HashMap<String, Object> ret)
       throws ServletException, IOException {
-    Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
-    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
-    } else {
-      ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
+    try {
+      Map<String, Object> result =
+        execManager
+          .callExecutorStats(executorId, actionName, getAllParams(req));
+      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+        ret.put(ConnectorParams.RESPONSE_ERROR,
+          result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        ret.put(ConnectorParams.STATUS_PARAM,
+          result.get(ConnectorParams.STATUS_PARAM));
+      }
+    } catch (ExecutorManagerException ex) {
+      ret.put("error", "Failed to change config change");
     }
   }
 
   /**
    * Get metric snapshots for a metric and date specification
+   * @param executorId
    * @throws ServletException
+   * @throws ExecutorManagerException
    */
-  private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
-      throws IOException, ServletException {
-    Map<String, Object> result =
-        execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
-    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
-    } else {
-      ret.put("data", result.get("data"));
+  private void handleGetMetricHistory(int executorId, HttpServletRequest req,
+    HashMap<String, Object> ret, User user) throws IOException,
+    ServletException {
+    try {
+      Map<String, Object> result =
+        execManager.callExecutorStats(executorId,
+          ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+        ret.put(ConnectorParams.RESPONSE_ERROR,
+          result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        ret.put("data", result.get("data"));
+      }
+    } catch (ExecutorManagerException ex) {
+      ret.put("error", "Failed to fetch metric history");
     }
   }
 
   /**
+   * @throws ExecutorManagerException
    *
    */
   private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
@@ -128,14 +181,20 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     }
 
     try {
+      Collection<Executor> executors = execManager.getAllActiveExecutors();
+      page.add("executorList", executors);
+
       Map<String, Object> result =
-          execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+        execManager.callExecutorStats(executors.iterator().next().getId(),
+          ConnectorParams.STATS_GET_ALLMETRICSNAME,
+          (Pair<String, String>[]) null);
       if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-        page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+        page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR)
+          .toString());
       } else {
         page.add("metricList", result.get("data"));
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       page.add("errorMsg", "Failed to get a response from Azkaban exec server");
     }
 
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
index 49d1085..9c2a6fb 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -68,6 +68,7 @@
               <tr>
                 <th>#</th>
                 <th class="execid">Execution Id</th>
+                <th >Executor Id</th>
                 <th>Flow</th>
                 <th>Project</th>
                 <th class="user">User</th>
@@ -80,25 +81,33 @@
               </tr>
             </thead>
             <tbody>
-#if ($runningFlows)
+
+#if ( !$null.isNull(${runningFlows}))
   #foreach ($flow in $runningFlows)
               <tr>
                  <td class="tb-name">
                    $velocityCount
                 </td>
                 <td class="tb-name">
-                  <a href="${context}/executor?execid=${flow.executionId}">${flow.executionId}</a>
+                  <a href="${context}/executor?execid=${flow.getFirst().executionId}">${flow.getFirst().executionId}</a>
                 </td>
-                <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})&flow=${flow.flowId}">${flow.flowId}</a></td>
                 <td>
-                  <a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
+                   #if (${flow.getSecond()})
+                       ${flow.getSecond().getId()}											
+                   #else
+                       -
+                   #end
                 </td>
-                <td>${flow.submitUser}</td>
-                <td>${flow.proxyUsers}</td>
-                <td>$utils.formatDate(${flow.startTime})</td>
-                <td>$utils.formatDate(${flow.endTime})</td>
-                <td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
-                <td><div class="status ${flow.status}">$utils.formatStatus(${flow.status})</div></td>
+                <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})&flow=${flow.getFirst().flowId}">${flow.getFirst().flowId}</a></td>
+                <td>
+                  <a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})">$vmutils.getProjectName(${flow.getFirst().projectId})</a>
+                </td>
+                <td>${flow.getFirst().submitUser}</td>
+                <td>${flow.getFirst().proxyUsers}</td>
+                <td>$utils.formatDate(${flow.getFirst().startTime})</td>
+                <td>$utils.formatDate(${flow.getFirst().endTime})</td>
+                <td>$utils.formatDuration(${flow.getFirst().startTime}, ${flow.getFirst().endTime})</td>
+                <td><div class="status ${flow.getFirst().status}">$utils.formatStatus(${flow.getFirst().status})</div></td>
                 <td></td>
               </tr>
   #end
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index 4596ac2..d77bc52 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -29,6 +29,25 @@
          var currentTime = ${currentTime};
          var timezone = "${timezone}";
 
+         function refreshMetricList() {
+               var requestURL = '/stats';
+               var requestData = {
+                 'action': 'getAllMetricNames',
+                 'executorId':  $('#executorName').val()
+               };
+               var successHandler = function(responseData) {
+                 if(responseData.error != null) {
+                   $('#reportedMetric').html(responseData.error);
+                 } else {
+                    $('#metricName').empty();
+                    for(var index = 0; index < responseData.metricList.length; index++) {
+                      $('#metricName').append($('<option value="1">' + responseData.metricList[index] + '</option>'));
+                    }
+                  }
+               };
+               $.get(requestURL, requestData, successHandler, 'json');
+         }
+
          function refreshMetricChart() {
                var requestURL = '/stats';
                var requestData = {
@@ -36,7 +55,8 @@
                  'from': new Date($('#datetimebegin').val()).toUTCString(),
                  'to'  : new Date($('#datetimeend').val()).toUTCString(),
                  'metricName': $('#metricName').val(),
-                 'useStats': $("#useStats").is(':checked')
+                 'useStats': $("#useStats").is(':checked'),
+                 'executorId':  $('#executorName').val()
                };
                var successHandler = function(responseData) {
                  if(responseData.error != null) {
@@ -67,6 +87,7 @@
                  $('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
                });
              $('#retrieve').click(refreshMetricChart);
+             $('#executorName').click(refreshMetricList);
          });
 
       </script>
@@ -84,8 +105,16 @@
                <div class="header-title" style="width: 17%;">
                   <h1><a href="${context}/stats">Statistics</a></h1>
                </div>
-               <div class="header-control" style="width: 900px; padding-top: 5px;">
+               <div class="header-control" style="width: 1300px; padding-top: 5px;">
                   <form id="metric-form" method="get">
+                     <label for="executorLabel" >Executor</label>
+                     #if (!$executorList.isEmpty())
+                     <select id="executorName"  name="executorName" style="width:200px">
+                        #foreach ($executor in $executorList)
+                        <option value="${executor.getId()}" style="width:200px">${executor.getHost()}:${executor.getPort()}</option>
+                        #end
+                     </select>
+                     #end
                      <label for="metricLabel" >Metric</label>
                      #if (!$metricList.isEmpty())
                      <select id="metricName"  name="metricName" style="width:200px">
@@ -119,4 +148,4 @@
       <!-- /container-full -->
       #end
    </body>
-   <html>
\ No newline at end of file
+   <html>
diff --git a/azkaban-webserver/src/web/js/azkaban/view/exflow.js b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
index dbb8ae4..026a946 100644
--- a/azkaban-webserver/src/web/js/azkaban/view/exflow.js
+++ b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
@@ -178,8 +178,11 @@ azkaban.FlowTabView = Backbone.View.extend({
 		$("#retrybtn").hide();
 
 		if (data.status == "SUCCEEDED") {
-      $("#executebtn").show();
+                        $("#executebtn").show();
 		}
+                else if (data.status == "PREPARING") {
+                        $("#cancelbtn").show();
+                }
 		else if (data.status == "FAILED") {
 			$("#executebtn").show();
 		}