azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
index 7e6ab80..eb3f834 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -47,7 +47,11 @@ public final class ExecutableFlowPriorityComparator implements
       // descending order of priority
       int diff = getPriority(exflow2) - getPriority(exflow1);
       if (diff == 0) {
-        // increasing order of execution id, if same priority
+        // increasing order of update time, if same priority
+        diff = (int) (exflow1.getUpdateTime() - exflow2.getUpdateTime());
+      }
+      if (diff == 0) {
+        // increasing order of execution id, if same priority and updateTime
         diff = exflow1.getExecutionId() - exflow2.getExecutionId();
       }
       return diff;
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index f47fe7b..82d6e80 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -23,6 +23,7 @@ public class ExecutionReference {
   private long nextCheckTime = -1;
   private int numErrors = 0;
 
+
   public ExecutionReference(int execId) {
     this.execId = execId;
   }
@@ -32,10 +33,6 @@ public class ExecutionReference {
     this.executor = executor;
   }
 
-  public Executor getExecutor() {
-    return executor;
-  }
-
   public void setUpdateTime(long updateTime) {
     this.updateTime = updateTime;
   }
@@ -56,6 +53,14 @@ public class ExecutionReference {
     return execId;
   }
 
+  public String getHost() {
+    return executor.getHost();
+  }
+
+  public int getPort() {
+    return executor.getPort();
+  }
+
   public int getNumErrors() {
     return numErrors;
   }
@@ -67,4 +72,8 @@ public class ExecutionReference {
   public void setExecutor(Executor executor) {
     this.executor = executor;
   }
-}
+
+  public Executor getExecutor() {
+    return executor;
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 31070bf..32176e9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -107,4 +107,8 @@ public class Executor {
   public void setActive(boolean isActive) {
     this.isActive = isActive;
   }
+
+  public String toString() {
+    return String.format("%s:%d", host, port);
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index ab7a561..6120002 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -198,7 +198,7 @@ public interface ExecutorLoader {
    * @return fetched Executor
    * @throws ExecutorManagerException
    */
-  public Executor fetchExecutorByExecution(int executionId)
+  public Executor fetchExecutorByExecutionId(int executionId)
     throws ExecutorManagerException;
 
   /**
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c9633ba..ed64364 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -24,7 +24,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -32,7 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.commons.lang.StringUtils;
@@ -63,50 +62,53 @@ import azkaban.utils.Props;
  *
  */
 public class ExecutorManager extends EventHandler implements
-  ExecutorManagerAdapter {
+    ExecutorManagerAdapter {
+  static final String AZKABAN_QUEUEPROCESSING_ENABLED =
+    "azkaban.queueprocessing.enabled";
   static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
     "azkaban.use.multiple.executors";
-  static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+  private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
     "azkaban.webserver.queue.size";
+
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
-  final private ExecutorLoader executorLoader;
+  private ExecutorLoader executorLoader;
 
-  final private CleanerThread cleanerThread;
+  private CleanerThread cleanerThread;
 
-  final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
-    new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
-  final private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
-    new ConcurrentHashMap<Integer, ExecutableFlow>();
+  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+  private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
+      new ConcurrentHashMap<Integer, ExecutableFlow>();
 
   /* map to easily access queued flows */
   final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
     new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
   /* web server side queue */
-  final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+  final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList =
     new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
       new ExecutableFlowPriorityComparator());
 
   final private Set<Executor> activeExecutors = new HashSet<Executor>();
+  final private long webserverQueueCapacity;
+  private QueueProcessorThread queueProcessor;
 
-  final private ExecutingManagerUpdaterThread executingManager;
-  final private QueueProcessorThread queueProcessor;
-
+  private ExecutingManagerUpdaterThread executingManager;
+  // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
-    * 24 * 60 * 60 * 1000l;
-
+      * 24 * 60 * 60 * 1000l;
   private long lastCleanerThreadCheckTime = -1;
-  final private long webserverQueueCapacity;
+
   private long lastThreadCheckTime = -1;
   private String updaterStage = "not started";
 
-  final private Map<String, Alerter> alerters;
-
-  final Props azkProps;
+  private Map<String, Alerter> alerters;
 
   File cacheDir;
 
+  final Props azkProps;
+
   public ExecutorManager(Props props, ExecutorLoader loader,
-    Map<String, Alerter> alters) throws ExecutorManagerException {
+      Map<String, Alerter> alters) throws ExecutorManagerException {
     azkProps = props;
 
     this.executorLoader = loader;
@@ -121,28 +123,34 @@ public class ExecutorManager extends EventHandler implements
     executingManager = new ExecutingManagerUpdaterThread();
     executingManager.start();
 
-    queueProcessor = new QueueProcessorThread();
-    queueProcessor.start();
+    if(isMultiExecutorMode()) {
+      queueProcessor =
+        new QueueProcessorThread(azkProps.getBoolean(
+          AZKABAN_QUEUEPROCESSING_ENABLED, true));
+      queueProcessor.start();
+    }
 
     long executionLogsRetentionMs =
-      props.getLong("execution.logs.retention.ms",
-        DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+        props.getLong("execution.logs.retention.ms",
+            DEFAULT_EXECUTION_LOGS_RETENTION_MS);
     webserverQueueCapacity =
-      props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
+        props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
+
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
+
   }
 
   /**
    * <pre>
    * Setup activeExecutors using azkaban.properties and database executors
    * Note:
-   * 1. If a local executor is specified and it is missing from db,
+   * 1. If azkaban.use.multiple.executors is set true, this method will
+   *    load all active executors
+   * 2. In local mode, If a local executor is specified and it is missing from db,
    *    this method add local executor as active in DB
-   * 2. If a local executor is specified and it is marked inactive in db,
+   * 3. In local mode, If a local executor is specified and it is marked inactive in db,
    *    this method will convert local executor as active in DB
-   * 3. If azkaban.use.multiple.executors is set true, this method will
-   *    load all active projects
    * </pre>
    *
    * @throws ExecutorManagerException
@@ -150,10 +158,15 @@ public class ExecutorManager extends EventHandler implements
   public void setupExecutors() throws ExecutorManagerException {
     Set<Executor> newExecutors = new HashSet<Executor>();
 
-    // Add local executor, if specified as per properties
-    if (azkProps.containsKey("executor.port")) {
+    if (isMultiExecutorMode()) {
+      logger.info("Initializing multi executors from database");
+      newExecutors.addAll(executorLoader.fetchActiveExecutors());
+    } else if (azkProps.containsKey("executor.port")) {
+      // Add local executor, if specified as per properties
       String executorHost = azkProps.getString("executor.host", "localhost");
       int executorPort = azkProps.getInt("executor.port");
+      logger.info(String.format("Initializing local executor %s:%d",
+        executorHost, executorPort));
       Executor executor =
         executorLoader.fetchExecutor(executorHost, executorPort);
       if (executor == null) {
@@ -166,12 +179,10 @@ public class ExecutorManager extends EventHandler implements
         executorPort, true));
     }
 
-    if (azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false)) {
-      newExecutors.addAll(executorLoader.fetchActiveExecutors());
-    }
-
     if (newExecutors.isEmpty()) {
       throw new ExecutorManagerException("No active executor found");
+    } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+      throw new ExecutorManagerException("Multiple local executors specified");
     } else {
       // clear all active executors, only if we have at least one new active
       // executors
@@ -180,22 +191,52 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  private boolean isMultiExecutorMode() {
+    return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+  }
+
+  /**
+   * Refresh Executor stats for all the actie executors in this executorManager
+   */
+  private void refreshExecutors() {
+    synchronized (activeExecutors) {
+      // TODO: rest api call to refresh executor stats
+    }
+  }
+
   /**
    * Disable flow dispatching in QueueProcessor
+   *
+   * @throws ExecutorManagerException
    */
-  public void disableQueueProcessorThread() {
-    queueProcessor.setActive(false);
+  public void disableQueueProcessorThread() throws ExecutorManagerException {
+    if (isMultiExecutorMode()) {
+      queueProcessor.setActive(false);
+    } else {
+      throw new ExecutorManagerException(
+        "Cannot disable QueueProcessor in local mode");
+    }
   }
 
   /**
    * Enable flow dispatching in QueueProcessor
+   *
+   * @throws ExecutorManagerException
    */
-  public void enableQueueProcessorThread() {
-    queueProcessor.setActive(true);
+  public void enableQueueProcessorThread() throws ExecutorManagerException {
+    if (isMultiExecutorMode()) {
+      queueProcessor.setActive(true);
+    } else {
+      throw new ExecutorManagerException(
+        "Cannot enable QueueProcessor in local mode");
+    }
   }
 
   public State getQueueProcessorThreadState() {
-    return queueProcessor.getState();
+    if (isMultiExecutorMode())
+      return queueProcessor.getState();
+    else
+      return State.NEW; // not started in local mode
   }
 
   /**
@@ -205,7 +246,10 @@ public class ExecutorManager extends EventHandler implements
    * @return
    */
   public boolean isQueueProcessorThreadActive() {
-    return queueProcessor.isActive();
+    if (isMultiExecutorMode())
+      return queueProcessor.isActive();
+    else
+      return false;
   }
 
   @Override
@@ -232,8 +276,8 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
-  public Set<Executor> getAllActiveExecutors() {
-    return activeExecutors;
+  public Collection<Executor> getAllActiveExecutors() {
+    return Collections.unmodifiableCollection(activeExecutors);
   }
 
   /**
@@ -254,7 +298,7 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public Set<String> getPrimaryServerHosts() {
-    // TODO: do we want to have a primary
+    // Only one for now. More probably later.
     HashSet<String> ports = new HashSet<String>();
     for (Executor executor : activeExecutors) {
       ports.add(executor.getHost() + ":" + executor.getPort());
@@ -296,32 +340,33 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /**
-   * Gets a list of all the active (running, non-dispatched) executions for a
-   * given project and flow {@inheritDoc}
+   * Gets a list of all the active (running flows and non-dispatched flows)
+   * executions for a given project and flow {@inheritDoc}
    *
    * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
    *      java.lang.String)
    */
   @Override
   public List<Integer> getRunningFlows(int projectId, String flowId) {
-    ArrayList<Integer> executionIds = new ArrayList<Integer>();
-    getRunningFlowsHelper(projectId, flowId, executionIds,
-      queuedFlowMap.values());
-    getRunningFlowsHelper(projectId, flowId, executionIds,
-      runningFlows.values());
+    List<Integer> executionIds = new ArrayList<Integer>();
+    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+      queuedFlowMap.values()));
+    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+      runningFlows.values()));
     return executionIds;
   }
 
   /* Helper method for getRunningFlows */
-  private void getRunningFlowsHelper(int projectId, String flowId,
-    ArrayList<Integer> executionIds,
+  private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
     Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    List<Integer> executionIds = new ArrayList<Integer>();
     for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getFlowId().equals(flowId)
         && ref.getSecond().getProjectId() == projectId) {
         executionIds.add(ref.getFirst().getExecId());
       }
     }
+    return executionIds;
   }
 
   /**
@@ -462,141 +507,136 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  /**
-   * Get recently finished flows {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorManagerAdapter#getRecentlyFinishedFlows()
-   */
   public List<ExecutableFlow> getRecentlyFinishedFlows() {
     return new ArrayList<ExecutableFlow>(recentlyFinished.values());
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(Project project,
-    String flowId, int skip, int size) throws ExecutorManagerException {
+      String flowId, int skip, int size) throws ExecutorManagerException {
     List<ExecutableFlow> flows =
-      executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+        executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(int skip, int size)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
-    int skip, int size) throws ExecutorManagerException {
+      int skip, int size) throws ExecutorManagerException {
     List<ExecutableFlow> flows =
-      executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
-        0, -1, -1, skip, size);
+        executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+            0, -1, -1, skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(String projContain,
-    String flowContain, String userContain, int status, long begin, long end,
-    int skip, int size) throws ExecutorManagerException {
+      String flowContain, String userContain, int status, long begin, long end,
+      int skip, int size) throws ExecutorManagerException {
     List<ExecutableFlow> flows =
-      executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
-        status, begin, end, skip, size);
+        executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+            status, begin, end, skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableJobInfo> getExecutableJobs(Project project,
-    String jobId, int skip, int size) throws ExecutorManagerException {
+      String jobId, int skip, int size) throws ExecutorManagerException {
     List<ExecutableJobInfo> nodes =
-      executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+        executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
     return nodes;
   }
 
   @Override
   public int getNumberOfJobExecutions(Project project, String jobId)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
   }
 
   @Override
   public int getNumberOfExecutions(Project project, String flowId)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
   }
 
   @Override
   public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
-    int length) throws ExecutorManagerException {
+      int length) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-      runningFlows.get(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
       Pair<String, String> offsetParam =
-        new Pair<String, String>("offset", String.valueOf(offset));
+          new Pair<String, String>("offset", String.valueOf(offset));
       Pair<String, String> lengthParam =
-        new Pair<String, String>("length", String.valueOf(length));
+          new Pair<String, String>("length", String.valueOf(length));
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-        callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
-          typeParam, offsetParam, lengthParam);
+          callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+              typeParam, offsetParam, lengthParam);
       return LogData.createLogDataFromObject(result);
     } else {
       LogData value =
-        executorLoader
-          .fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
+          executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
+              length);
       return value;
     }
   }
 
   @Override
   public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
-    int offset, int length, int attempt) throws ExecutorManagerException {
+      int offset, int length, int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-      runningFlows.get(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
-        new Pair<String, String>("jobId", jobId);
+          new Pair<String, String>("jobId", jobId);
       Pair<String, String> offsetParam =
-        new Pair<String, String>("offset", String.valueOf(offset));
+          new Pair<String, String>("offset", String.valueOf(offset));
       Pair<String, String> lengthParam =
-        new Pair<String, String>("length", String.valueOf(length));
+          new Pair<String, String>("length", String.valueOf(length));
       Pair<String, String> attemptParam =
-        new Pair<String, String>("attempt", String.valueOf(attempt));
+          new Pair<String, String>("attempt", String.valueOf(attempt));
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-        callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
-          typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+          callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+              typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return LogData.createLogDataFromObject(result);
     } else {
       LogData value =
-        executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
-          offset, length);
+          executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+              offset, length);
       return value;
     }
   }
 
   @Override
   public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
-    int attempt) throws ExecutorManagerException {
+      int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-      runningFlows.get(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
     if (pair == null) {
       return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
-        attempt);
+          attempt);
     }
 
     Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
     Pair<String, String> attemptParam =
-      new Pair<String, String>("attempt", String.valueOf(attempt));
+        new Pair<String, String>("attempt", String.valueOf(attempt));
 
     @SuppressWarnings("unchecked")
     Map<String, Object> result =
-      callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
-        jobIdParam, attemptParam);
+        callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+            jobIdParam, attemptParam);
 
     @SuppressWarnings("unchecked")
     List<Object> jobStats = (List<Object>) result.get("attachments");
@@ -606,26 +646,26 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
-    String jobId, int offset, int length, int attempt)
-    throws ExecutorManagerException {
+      String jobId, int offset, int length, int attempt)
+      throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-      runningFlows.get(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
 
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
-        new Pair<String, String>("jobId", jobId);
+          new Pair<String, String>("jobId", jobId);
       Pair<String, String> offsetParam =
-        new Pair<String, String>("offset", String.valueOf(offset));
+          new Pair<String, String>("offset", String.valueOf(offset));
       Pair<String, String> lengthParam =
-        new Pair<String, String>("length", String.valueOf(length));
+          new Pair<String, String>("length", String.valueOf(length));
       Pair<String, String> attemptParam =
-        new Pair<String, String>("attempt", String.valueOf(attempt));
+          new Pair<String, String>("attempt", String.valueOf(attempt));
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-        callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
-          typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+          callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+              typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return JobMetaData.createJobMetaDataFromObject(result);
     } else {
       return null;
@@ -665,14 +705,14 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void resumeFlow(ExecutableFlow exFlow, String userId)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+          runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
-          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-          + " isn't running.");
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
       }
       callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
     }
@@ -680,14 +720,14 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void pauseFlow(ExecutableFlow exFlow, String userId)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+          runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
-          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-          + " isn't running.");
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
       }
       callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
     }
@@ -695,63 +735,63 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
-    String... jobIds) throws ExecutorManagerException {
+      String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
-      jobIds);
+        jobIds);
   }
 
   @Override
   public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
-    String... jobIds) throws ExecutorManagerException {
+      String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
-      jobIds);
+        jobIds);
   }
 
   @Override
   public void retryFailures(ExecutableFlow exFlow, String userId)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
   }
 
   @Override
   public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
-    String... jobIds) throws ExecutorManagerException {
+      String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
-      jobIds);
+        jobIds);
   }
 
   @Override
   public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
-    String... jobIds) throws ExecutorManagerException {
+      String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
-      jobIds);
+        jobIds);
   }
 
   @Override
   public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
-    String... jobIds) throws ExecutorManagerException {
+      String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
-      jobIds);
+        jobIds);
   }
 
   @Override
   public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
-    String... jobIds) throws ExecutorManagerException {
+      String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
-      jobIds);
+        jobIds);
   }
 
   @SuppressWarnings("unchecked")
   private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
-    String command, String userId, String... jobIds)
-    throws ExecutorManagerException {
+      String command, String userId, String... jobIds)
+      throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+          runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
-          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-          + " isn't running.");
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
       }
 
       Map<String, Object> response = null;
@@ -761,24 +801,24 @@ public class ExecutorManager extends EventHandler implements
             ExecutableNode node = exFlow.getExecutableNode(jobId);
             if (node == null) {
               throw new ExecutorManagerException("Job " + jobId
-                + " doesn't exist in execution " + exFlow.getExecutionId()
-                + ".");
+                  + " doesn't exist in execution " + exFlow.getExecutionId()
+                  + ".");
             }
           }
         }
         String ids = StringUtils.join(jobIds, ',');
         response =
-          callExecutorServer(pair.getFirst(),
-            ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
-            new Pair<String, String>(
-              ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
-            new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+            callExecutorServer(pair.getFirst(),
+                ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+                new Pair<String, String>(
+                    ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+                new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
       } else {
         response =
-          callExecutorServer(pair.getFirst(),
-            ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
-            new Pair<String, String>(
-              ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+            callExecutorServer(pair.getFirst(),
+                ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+                new Pair<String, String>(
+                    ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
       }
 
       return response;
@@ -786,7 +826,7 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private void applyDisabledJobs(List<Object> disabledJobs,
-    ExecutableFlowBase exflow) {
+      ExecutableFlowBase exflow) {
     for (Object disabled : disabledJobs) {
       if (disabled instanceof String) {
         String nodeName = (String) disabled;
@@ -800,7 +840,7 @@ public class ExecutorManager extends EventHandler implements
         String nodeName = (String) nestedDisabled.get("id");
         @SuppressWarnings("unchecked")
         List<Object> subDisabledJobs =
-          (List<Object>) nestedDisabled.get("children");
+            (List<Object>) nestedDisabled.get("children");
 
         if (nodeName == null || subDisabledJobs == null) {
           return;
@@ -823,7 +863,7 @@ public class ExecutorManager extends EventHandler implements
       logger.info("Submitting execution flow " + flowId + " by " + userId);
 
       String message = "";
-      if (queuedFlows.size() >= webserverQueueCapacity) {
+      if (queuedFlowList.size() >= webserverQueueCapacity) {
         message =
           String
             .format(
@@ -841,6 +881,7 @@ public class ExecutorManager extends EventHandler implements
         if (options == null) {
           options = new ExecutionOptions();
         }
+
         if (options.getDisabledJobs() != null) {
           applyDisabledJobs(options.getDisabledJobs(), exflow);
         }
@@ -883,9 +924,25 @@ public class ExecutorManager extends EventHandler implements
         // fails, we remove the reference.
         ExecutionReference reference =
           new ExecutionReference(exflow.getExecutionId());
-        // Added to db queue
-        executorLoader.addActiveExecutableReference(reference);
-        enqueueFlow(exflow, reference);
+
+        if (isMultiExecutorMode()) {
+          //Take MultiExecutor route
+          executorLoader.addActiveExecutableReference(reference);
+          enqueueFlow(exflow, reference);
+        } else {
+          // assign only local executor we have
+          reference.setExecutor(activeExecutors.iterator().next());
+          executorLoader.addActiveExecutableReference(reference);
+          try {
+            callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+            runningFlows.put(exflow.getExecutionId(),
+              new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+          } catch (ExecutorManagerException e) {
+            executorLoader.removeActiveExecutableReference(reference
+              .getExecId());
+            throw e;
+          }
+        }
         message +=
           "Execution submitted successfully with exec id "
             + exflow.getExecutionId();
@@ -894,10 +951,27 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+
+  /**
+   * Wraps BoundedQueue Take method to have a corresponding update in
+   * queuedFlowMap lookup table
+   *
+   * @return
+   * @throws InterruptedException
+   */
+  private Pair<ExecutionReference, ExecutableFlow> waitAndFetchQueueHead()
+    throws InterruptedException {
+    Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+    if (pair != null && pair.getFirst() != null) {
+      queuedFlowMap.remove(pair.getFirst().getExecId());
+    }
+    return pair;
+  }
+
   /* Helper method to have a single point of deletion in the queued flows */
   private void dequeueFlow(int executionId) {
-    if (queuedFlowMap.contains(executionId)) {
-      queuedFlows.remove(queuedFlowMap.get(executionId));
+    if (queuedFlowMap.containsKey(executionId)) {
+      queuedFlowList.remove(queuedFlowMap.get(executionId));
       queuedFlowMap.remove(executionId);
     }
   }
@@ -909,7 +983,7 @@ public class ExecutorManager extends EventHandler implements
       new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
     try {
       queuedFlowMap.put(exflow.getExecutionId(), pair);
-      queuedFlows.put(pair);
+      queuedFlowList.put(pair);
     } catch (InterruptedException e) {
       String errMsg = "Failed to queue flow " + exflow.getExecutionId();
       logger.error(errMsg, e);
@@ -928,54 +1002,50 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-    String action) throws ExecutorManagerException {
+      String action) throws ExecutorManagerException {
     try {
-      Executor executor = ref.getExecutor();
-      return callExecutorServer(executor.getHost(), executor.getPort(), action,
-        ref.getExecId(), null, (Pair<String, String>[]) null);
+      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+          ref.getExecId(), null, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-    String action, String user) throws ExecutorManagerException {
+      String action, String user) throws ExecutorManagerException {
     try {
-      Executor executor = ref.getExecutor();
-      return callExecutorServer(executor.getHost(), executor.getPort(), action,
-        ref.getExecId(), user, (Pair<String, String>[]) null);
+      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+          ref.getExecId(), user, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-    String action, Pair<String, String>... params)
-    throws ExecutorManagerException {
+      String action, Pair<String, String>... params)
+      throws ExecutorManagerException {
     try {
-      Executor executor = ref.getExecutor();
-      return callExecutorServer(executor.getHost(), executor.getPort(), action,
-        ref.getExecId(), null, params);
+      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+          ref.getExecId(), null, params);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-    String action, String user, Pair<String, String>... params)
-    throws ExecutorManagerException {
+      String action, String user, Pair<String, String>... params)
+      throws ExecutorManagerException {
     try {
-      Executor executor = ref.getExecutor();
-      return callExecutorServer(executor.getHost(), executor.getPort(), action,
-        ref.getExecId(), user, params);
+      return callExecutorServer(ref.getHost(), ref.getPort(), action,
+          ref.getExecId(), user, params);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(String host, int port,
-    String action, Integer executionId, String user,
-    Pair<String, String>... params) throws IOException {
+      String action, Integer executionId, String user,
+      Pair<String, String>... params) throws IOException {
     URIBuilder builder = new URIBuilder();
     builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
 
@@ -983,7 +1053,7 @@ public class ExecutorManager extends EventHandler implements
 
     if (executionId != null) {
       builder.setParameter(ConnectorParams.EXECID_PARAM,
-        String.valueOf(executionId));
+          String.valueOf(executionId));
     }
 
     if (user != null) {
@@ -1018,7 +1088,7 @@ public class ExecutorManager extends EventHandler implements
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-      (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
     String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
     if (error != null) {
       throw new IOException(error);
@@ -1038,8 +1108,7 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public Map<String, Object> callExecutorStats(int executorId, String action,
-    Pair<String, String>... params) throws IOException,
-    ExecutorManagerException {
+    Pair<String, String>... params) throws IOException, ExecutorManagerException {
 
     URIBuilder builder = new URIBuilder();
     Executor executor = fetchExecutor(executorId);
@@ -1081,14 +1150,15 @@ public class ExecutorManager extends EventHandler implements
     return jsonResponse;
   }
 
+
   @Override
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
-    String mBean) throws IOException {
+      String mBean) throws IOException {
     URIBuilder builder = new URIBuilder();
 
     String[] hostPortSplit = hostPort.split(":");
     builder.setScheme("http").setHost(hostPortSplit[0])
-      .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
+        .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
 
     builder.setParameter(action, "");
     if (mBean != null) {
@@ -1117,7 +1187,7 @@ public class ExecutorManager extends EventHandler implements
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-      (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
     String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
     if (error != null) {
       throw new IOException(error);
@@ -1127,10 +1197,175 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void shutdown() {
-    queueProcessor.shutdown();
+    if (isMultiExecutorMode()) {
+      queueProcessor.shutdown();
+    }
     executingManager.shutdown();
   }
 
+  private class ExecutingManagerUpdaterThread extends Thread {
+    private boolean shutdown = false;
+
+    public ExecutingManagerUpdaterThread() {
+      this.setName("ExecutorManagerUpdaterThread");
+    }
+
+    // 10 mins recently finished threshold.
+    private long recentlyFinishedLifetimeMs = 600000;
+    private int waitTimeIdleMs = 2000;
+    private int waitTimeMs = 500;
+
+    // When we have an http error, for that flow, we'll check every 10 secs, 6
+    // times (1 mins) before we evict.
+    private int numErrors = 6;
+    private long errorThreshold = 10000;
+
+    private void shutdown() {
+      shutdown = true;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void run() {
+      while (!shutdown) {
+        try {
+          lastThreadCheckTime = System.currentTimeMillis();
+          updaterStage = "Starting update all flows.";
+
+          Map<Executor, List<ExecutableFlow>> exFlowMap =
+              getFlowToExecutorMap();
+          ArrayList<ExecutableFlow> finishedFlows =
+              new ArrayList<ExecutableFlow>();
+          ArrayList<ExecutableFlow> finalizeFlows =
+              new ArrayList<ExecutableFlow>();
+
+          if (exFlowMap.size() > 0) {
+            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+                .entrySet()) {
+              List<Long> updateTimesList = new ArrayList<Long>();
+              List<Integer> executionIdsList = new ArrayList<Integer>();
+
+              Executor executor = entry.getKey();
+
+              updaterStage =
+                  "Starting update flows on " + executor.getHost() + ":"
+                      + executor.getPort();
+
+              // We pack the parameters of the same host together before we
+              // query.
+              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+                  updateTimesList);
+
+              Pair<String, String> updateTimes =
+                  new Pair<String, String>(
+                      ConnectorParams.UPDATE_TIME_LIST_PARAM,
+                      JSONUtils.toJSON(updateTimesList));
+              Pair<String, String> executionIds =
+                  new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
+                      JSONUtils.toJSON(executionIdsList));
+
+              Map<String, Object> results = null;
+              try {
+                results =
+                    callExecutorServer(executor.getHost(),
+                      executor.getPort(), ConnectorParams.UPDATE_ACTION,
+                        null, null, executionIds, updateTimes);
+              } catch (IOException e) {
+                logger.error(e);
+                for (ExecutableFlow flow : entry.getValue()) {
+                  Pair<ExecutionReference, ExecutableFlow> pair =
+                      runningFlows.get(flow.getExecutionId());
+
+                  updaterStage =
+                      "Failed to get update. Doing some clean up for flow "
+                          + pair.getSecond().getExecutionId();
+
+                  if (pair != null) {
+                    ExecutionReference ref = pair.getFirst();
+                    int numErrors = ref.getNumErrors();
+                    if (ref.getNumErrors() < this.numErrors) {
+                      ref.setNextCheckTime(System.currentTimeMillis()
+                          + errorThreshold);
+                      ref.setNumErrors(++numErrors);
+                    } else {
+                      logger.error("Evicting flow " + flow.getExecutionId()
+                          + ". The executor is unresponsive.");
+                      // TODO should send out an unresponsive email here.
+                      finalizeFlows.add(pair.getSecond());
+                    }
+                  }
+                }
+              }
+
+              // We gets results
+              if (results != null) {
+                List<Map<String, Object>> executionUpdates =
+                    (List<Map<String, Object>>) results
+                        .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+                for (Map<String, Object> updateMap : executionUpdates) {
+                  try {
+                    ExecutableFlow flow = updateExecution(updateMap);
+
+                    updaterStage = "Updated flow " + flow.getExecutionId();
+
+                    if (isFinished(flow)) {
+                      finishedFlows.add(flow);
+                      finalizeFlows.add(flow);
+                    }
+                  } catch (ExecutorManagerException e) {
+                    ExecutableFlow flow = e.getExecutableFlow();
+                    logger.error(e);
+
+                    if (flow != null) {
+                      logger.error("Finalizing flow " + flow.getExecutionId());
+                      finalizeFlows.add(flow);
+                    }
+                  }
+                }
+              }
+            }
+
+            updaterStage = "Evicting old recently finished flows.";
+
+            evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
+            // Add new finished
+            for (ExecutableFlow flow : finishedFlows) {
+              if (flow.getScheduleId() >= 0
+                  && flow.getStatus() == Status.SUCCEEDED) {
+                ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
+                    cacheDir);
+              }
+              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
+              recentlyFinished.put(flow.getExecutionId(), flow);
+            }
+
+            updaterStage =
+                "Finalizing " + finalizeFlows.size() + " error flows.";
+
+            // Kill error flows
+            for (ExecutableFlow flow : finalizeFlows) {
+              finalizeFlows(flow);
+            }
+          }
+
+          updaterStage = "Updated all active flows. Waiting for next round.";
+
+          synchronized (this) {
+            try {
+              if (runningFlows.size() > 0) {
+                this.wait(waitTimeMs);
+              } else {
+                this.wait(waitTimeIdleMs);
+              }
+            } catch (InterruptedException e) {
+            }
+          }
+        } catch (Exception e) {
+          logger.error(e);
+        }
+      }
+    }
+  }
+
   private void finalizeFlows(ExecutableFlow flow) {
 
     int execId = flow.getExecutionId();
@@ -1165,7 +1400,6 @@ public class ExecutorManager extends EventHandler implements
 
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
       runningFlows.remove(execId);
-
       fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
       recentlyFinished.put(execId, dsFlow);
 
@@ -1183,12 +1417,12 @@ public class ExecutorManager extends EventHandler implements
     Alerter mailAlerter = alerters.get("email");
     if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
       if (options.getFailureEmails() != null
-        && !options.getFailureEmails().isEmpty()) {
+          && !options.getFailureEmails().isEmpty()) {
         try {
           mailAlerter
-            .alertOnError(
-              flow,
-              "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+              .alertOnError(
+                  flow,
+                  "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
         } catch (Exception e) {
           logger.error(e);
         }
@@ -1199,9 +1433,9 @@ public class ExecutorManager extends EventHandler implements
         if (alerter != null) {
           try {
             alerter
-              .alertOnError(
-                flow,
-                "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+                .alertOnError(
+                    flow,
+                    "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
           } catch (Exception e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
@@ -1209,12 +1443,12 @@ public class ExecutorManager extends EventHandler implements
           }
         } else {
           logger.error("Alerter type " + alertType
-            + " doesn't exist. Failed to alert.");
+              + " doesn't exist. Failed to alert.");
         }
       }
     } else {
       if (options.getSuccessEmails() != null
-        && !options.getSuccessEmails().isEmpty()) {
+          && !options.getSuccessEmails().isEmpty()) {
         try {
 
           mailAlerter.alertOnSuccess(flow);
@@ -1235,7 +1469,7 @@ public class ExecutorManager extends EventHandler implements
           }
         } else {
           logger.error("Alerter type " + alertType
-            + " doesn't exist. Failed to alert.");
+              + " doesn't exist. Failed to alert.");
         }
       }
     }
@@ -1278,7 +1512,7 @@ public class ExecutorManager extends EventHandler implements
 
   private void evictOldRecentlyFinished(long ageMs) {
     ArrayList<Integer> recentlyFinishedKeys =
-      new ArrayList<Integer>(recentlyFinished.keySet());
+        new ArrayList<Integer>(recentlyFinished.keySet());
     long oldAgeThreshold = System.currentTimeMillis() - ageMs;
     for (Integer key : recentlyFinishedKeys) {
       ExecutableFlow flow = recentlyFinished.get(key);
@@ -1291,20 +1525,20 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private ExecutableFlow updateExecution(Map<String, Object> updateData)
-    throws ExecutorManagerException {
+      throws ExecutorManagerException {
 
     Integer execId =
-      (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+        (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
     if (execId == null) {
       throw new ExecutorManagerException(
-        "Response is malformed. Need exec id to update.");
+          "Response is malformed. Need exec id to update.");
     }
 
     Pair<ExecutionReference, ExecutableFlow> refPair =
-      this.runningFlows.get(execId);
+        this.runningFlows.get(execId);
     if (refPair == null) {
       throw new ExecutorManagerException(
-        "No running flow found with the execution id. Removing " + execId);
+          "No running flow found with the execution id. Removing " + execId);
     }
 
     ExecutionReference ref = refPair.getFirst();
@@ -1346,7 +1580,7 @@ public class ExecutorManager extends EventHandler implements
           }
         } else {
           logger.error("Alerter type " + alertType
-            + " doesn't exist. Failed to alert.");
+              + " doesn't exist. Failed to alert.");
         }
       }
     }
@@ -1366,13 +1600,14 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
-    List<Integer> executionIds, List<Long> updateTimes) {
+      List<Integer> executionIds, List<Long> updateTimes) {
     for (ExecutableFlow flow : flows) {
       executionIds.add(flow.getExecutionId());
       updateTimes.add(flow.getUpdateTime());
     }
   }
 
+  /* Group Executable flow by Executors to reduce number of REST calls */
   private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
     HashMap<Executor, List<ExecutableFlow>> exFlowMap =
       new HashMap<Executor, List<ExecutableFlow>>();
@@ -1403,194 +1638,93 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public int getExecutableFlows(int projectId, String flowId, int from,
-    int length, List<ExecutableFlow> outputList)
-    throws ExecutorManagerException {
+      int length, List<ExecutableFlow> outputList)
+      throws ExecutorManagerException {
     List<ExecutableFlow> flows =
-      executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+        executorLoader.fetchFlowHistory(projectId, flowId, from, length);
     outputList.addAll(flows);
     return executorLoader.fetchNumExecutableFlows(projectId, flowId);
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
-    int from, int length, Status status) throws ExecutorManagerException {
+      int from, int length, Status status) throws ExecutorManagerException {
     return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
-      status);
+        status);
   }
 
-  private class ExecutingManagerUpdaterThread extends Thread {
-    private boolean shutdown = false;
+  /*
+   * cleaner thread to clean up execution_logs, etc in DB. Runs every day.
+   */
+  private class CleanerThread extends Thread {
+    // log file retention is 1 month.
 
-    public ExecutingManagerUpdaterThread() {
-      this.setName("ExecutorManagerUpdaterThread");
-    }
+    // check every day
+    private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
+        24 * 60 * 60 * 1000;
 
-    // 10 mins recently finished threshold.
-    private long recentlyFinishedLifetimeMs = 600000;
-    private int waitTimeIdleMs = 2000;
-    private int waitTimeMs = 500;
+    private final long executionLogsRetentionMs;
 
-    // When we have an http error, for that flow, we'll check every 10 secs, 6
-    // times (1 mins) before we evict.
-    private int numErrors = 6;
-    private long errorThreshold = 10000;
+    private boolean shutdown = false;
+    private long lastLogCleanTime = -1;
 
-    private void shutdown() {
+    public CleanerThread(long executionLogsRetentionMs) {
+      this.executionLogsRetentionMs = executionLogsRetentionMs;
+      this.setName("AzkabanWebServer-Cleaner-Thread");
+    }
+
+    @SuppressWarnings("unused")
+    public void shutdown() {
       shutdown = true;
+      this.interrupt();
     }
 
-    @SuppressWarnings("unchecked")
     public void run() {
       while (!shutdown) {
-        try {
-          lastThreadCheckTime = System.currentTimeMillis();
-          updaterStage = "Starting update all flows.";
-
-          Map<Executor, List<ExecutableFlow>> exFlowMap =
-            getFlowToExecutorMap();
-          ArrayList<ExecutableFlow> finishedFlows =
-            new ArrayList<ExecutableFlow>();
-          ArrayList<ExecutableFlow> finalizeFlows =
-            new ArrayList<ExecutableFlow>();
-
-          if (exFlowMap.size() > 0) {
-            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
-              .entrySet()) {
-              List<Long> updateTimesList = new ArrayList<Long>();
-              List<Integer> executionIdsList = new ArrayList<Integer>();
-
-              Executor executor = entry.getKey();
-
-              updaterStage =
-                "Starting update flows on " + executor.getHost() + ":"
-                  + executor.getPort();
-
-              // We pack the parameters of the same host together before we
-              // query.
-              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
-                updateTimesList);
-
-              Pair<String, String> updateTimes =
-                new Pair<String, String>(
-                  ConnectorParams.UPDATE_TIME_LIST_PARAM,
-                  JSONUtils.toJSON(updateTimesList));
-              Pair<String, String> executionIds =
-                new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
-                  JSONUtils.toJSON(executionIdsList));
-
-              Map<String, Object> results = null;
-              try {
-                results =
-                  callExecutorServer(executor.getHost(), executor.getPort(),
-                    ConnectorParams.UPDATE_ACTION, null, null, executionIds,
-                    updateTimes);
-              } catch (IOException e) {
-                logger.error(e);
-                for (ExecutableFlow flow : entry.getValue()) {
-                  Pair<ExecutionReference, ExecutableFlow> pair =
-                    runningFlows.get(flow.getExecutionId());
-
-                  updaterStage =
-                    "Failed to get update. Doing some clean up for flow "
-                      + pair.getSecond().getExecutionId();
-
-                  if (pair != null) {
-                    ExecutionReference ref = pair.getFirst();
-                    int numErrors = ref.getNumErrors();
-                    if (ref.getNumErrors() < this.numErrors) {
-                      ref.setNextCheckTime(System.currentTimeMillis()
-                        + errorThreshold);
-                      ref.setNumErrors(++numErrors);
-                    } else {
-                      logger.error("Evicting flow " + flow.getExecutionId()
-                        + ". The executor is unresponsive.");
-                      // TODO should send out an unresponsive email here.
-                      finalizeFlows.add(pair.getSecond());
-                    }
-                  }
-                }
-              }
-
-              // We gets results
-              if (results != null) {
-                List<Map<String, Object>> executionUpdates =
-                  (List<Map<String, Object>>) results
-                    .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
-                for (Map<String, Object> updateMap : executionUpdates) {
-                  try {
-                    ExecutableFlow flow = updateExecution(updateMap);
-
-                    updaterStage = "Updated flow " + flow.getExecutionId();
-
-                    if (isFinished(flow)) {
-                      finishedFlows.add(flow);
-                      finalizeFlows.add(flow);
-                    }
-                  } catch (ExecutorManagerException e) {
-                    ExecutableFlow flow = e.getExecutableFlow();
-                    logger.error(e);
-
-                    if (flow != null) {
-                      logger.error("Finalizing flow " + flow.getExecutionId());
-                      finalizeFlows.add(flow);
-                    }
-                  }
-                }
-              }
-            }
-
-            updaterStage = "Evicting old recently finished flows.";
-
-            evictOldRecentlyFinished(recentlyFinishedLifetimeMs);
-            // Add new finished
-            for (ExecutableFlow flow : finishedFlows) {
-              if (flow.getScheduleId() >= 0
-                && flow.getStatus() == Status.SUCCEEDED) {
-                ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
-                  cacheDir);
-              }
-              fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
-              recentlyFinished.put(flow.getExecutionId(), flow);
-            }
-
-            updaterStage =
-              "Finalizing " + finalizeFlows.size() + " error flows.";
+        synchronized (this) {
+          try {
+            lastCleanerThreadCheckTime = System.currentTimeMillis();
 
-            // Kill error flows
-            for (ExecutableFlow flow : finalizeFlows) {
-              finalizeFlows(flow);
+            // Cleanup old stuff.
+            long currentTime = System.currentTimeMillis();
+            if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
+              cleanExecutionLogs();
+              lastLogCleanTime = currentTime;
             }
-          }
 
-          updaterStage = "Updated all active flows. Waiting for next round.";
-
-          synchronized (this) {
-            try {
-              if (runningFlows.size() > 0) {
-                this.wait(waitTimeMs);
-              } else {
-                this.wait(waitTimeIdleMs);
-              }
-            } catch (InterruptedException e) {
-            }
+            wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
+          } catch (InterruptedException e) {
+            logger.info("Interrupted. Probably to shut down.");
           }
-        } catch (Exception e) {
-          logger.error(e);
         }
       }
     }
+
+    private void cleanExecutionLogs() {
+      logger.info("Cleaning old logs from execution_logs");
+      long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
+      logger.info("Cleaning old log files before "
+          + new DateTime(cutoff).toString());
+      cleanOldExecutionLogs(DateTime.now().getMillis()
+          - executionLogsRetentionMs);
+    }
   }
 
+
   /*
    * This thread is responsible for processing queued flows.
    */
   private class QueueProcessorThread extends Thread {
     private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
     private static final long ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS = 1000;
+    private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+    private static final int MAX_CONTINUOUS_FLOW_PROCESSED = 10;
+
     private boolean shutdown = false;
     private boolean isActive = true;
 
-    public QueueProcessorThread() {
+    public QueueProcessorThread(boolean isActive) {
+      setActive(isActive);
       this.setName("AzkabanWebServer-QueueProcessor-Thread");
     }
 
@@ -1614,130 +1748,117 @@ public class ExecutorManager extends EventHandler implements
           try {
             // start processing queue if active, other wait for sometime
             if (isActive) {
-              processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS);
+              processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS,
+                MAX_CONTINUOUS_FLOW_PROCESSED);
             }
             wait(QUEUE_PROCESSOR_WAIT_IN_MS);
-          } catch (InterruptedException e) {
+          } catch (Exception e) {
             logger.info(
               "QueueProcessorThread Interrupted. Probably to shut down.", e);
           }
         }
       }
     }
-  }
 
-  private void refreshExecutors() {
-    synchronized (activeExecutors) {
-      // TODO: rest api call to refresh executor stats
-    }
-  }
+    /* Method responsible for processing the non-dispatched flows */
+    private void processQueuedFlows(long activeExecutorsRefreshWindow,
+      int maxContinuousFlowProcessed) throws InterruptedException,
+      ExecutorManagerException {
+      long lastProcessingTime = System.currentTimeMillis();
+      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+      int currentContinuousFlowProcessed = 0;
+
+      while (isActive() && (runningCandidate = waitAndFetchQueueHead()) != null) {
+        ExecutionReference reference = runningCandidate.getFirst();
+        ExecutableFlow exflow = runningCandidate.getSecond();
+
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow
+              || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+          refreshExecutors(); // Refresh executor stats to be used by selector
+          lastProcessingTime = currentTime;
+          currentContinuousFlowProcessed = 0;
+        }
 
-  private void processQueuedFlows(long activeExecutorsRefreshWindow)
-    throws InterruptedException {
-    long lastProcessingTime = System.currentTimeMillis();
-    Pair<ExecutionReference, ExecutableFlow> runningCandidate;
-    while ((runningCandidate = queuedFlows.take()) != null) {
-      // stop queue processing, if queueProcessor is marked inactive
-      if (!queueProcessor.isActive())
-        return;
-
-      long currentTime = System.currentTimeMillis();
-      if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow) {
-        // Refresh executor stats to be used by selector
-        refreshExecutors();
-        lastProcessingTime = currentTime;
+        // process flow with current snapshot of activeExecutors
+        processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        currentContinuousFlowProcessed++;
       }
+    }
 
-      ExecutionReference reference = runningCandidate.getFirst();
-      ExecutableFlow exflow = runningCandidate.getSecond();
+    /* process flow with a snapshot of available Executors */
+    private void processFlow(ExecutionReference reference,
+      ExecutableFlow exflow, Set<Executor> availableExecutors)
+      throws ExecutorManagerException {
       synchronized (exflow) {
-        Executor choosenExecutor;
-
-        // TODO: use dispatcher
-        synchronized (activeExecutors) {
-          choosenExecutor = activeExecutors.iterator().next();
-        }
-
-        if (choosenExecutor != null) {
-
+        Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+        if (selectedExecutor != null) {
           try {
-            // TODO: ADD rest call to do an actual dispatch
-            callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
-
-            executorLoader.assignExecutor(exflow.getExecutionId(),
-              choosenExecutor.getId());
-            reference.setExecutor(choosenExecutor);
-
-            // move from queuedFlows to running flows
-            dequeueFlow(exflow.getExecutionId());
-            runningFlows.put(exflow.getExecutionId(),
-              new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+            dispatch(reference, exflow, selectedExecutor);
           } catch (ExecutorManagerException e) {
-            logger.error("Failed to process queued flow", e);
-            // TODO: allow N errors and re-try
-            finalizeFlows(exflow);
+            logger.debug(String.format(
+              "Executor %s responded with exception for exec: %d",
+              selectedExecutor, exflow.getExecutionId()), e);
+            handleDispatchExceptionCase(reference, exflow, selectedExecutor,
+              availableExecutors);
           }
         } else {
-          // TODO: handle scenario where dispatcher didn't assigned any executor
+          handleNoExecutorSelectedCase(reference, exflow);
         }
       }
     }
-  }
-
-  /*
-   * cleaner thread to clean up execution_logs, etc in DB. Runs every day.
-   */
-  private class CleanerThread extends Thread {
-    // log file retention is 1 month.
 
-    // check every day
-    private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
-      24 * 60 * 60 * 1000;
-
-    private final long executionLogsRetentionMs;
-
-    private boolean shutdown = false;
-    private long lastLogCleanTime = -1;
-
-    public CleanerThread(long executionLogsRetentionMs) {
-      this.executionLogsRetentionMs = executionLogsRetentionMs;
-      this.setName("AzkabanWebServer-Cleaner-Thread");
+    /* Choose Executor for exflow among the available executors */
+    private Executor selectExecutor(ExecutableFlow exflow,
+      Set<Executor> availableExecutors) {
+      Executor choosenExecutor;
+      // TODO: use dispatcher
+      choosenExecutor = availableExecutors.iterator().next();
+      return choosenExecutor;
+    }
+
+    private void handleDispatchExceptionCase(ExecutionReference reference,
+      ExecutableFlow exflow, Executor lastSelectedExecutor,
+      Set<Executor> remainingExecutors) throws ExecutorManagerException {
+      reference.setNumErrors(reference.getNumErrors() + 1);
+      reference.setExecutor(null);
+      if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+        || remainingExecutors.size() <= 1) {
+        logger.error("Failed to process queued flow");
+        finalizeFlows(exflow);
+      } else {
+        remainingExecutors.remove(lastSelectedExecutor);
+        // try other executors except chosenExecutor
+        processFlow(reference, exflow, remainingExecutors);
+      }
     }
 
-    @SuppressWarnings("unused")
-    public void shutdown() {
-      shutdown = true;
-      this.interrupt();
+    private void handleNoExecutorSelectedCase(ExecutionReference reference,
+      ExecutableFlow exflow) throws ExecutorManagerException {
+      reference.setNumErrors(reference.getNumErrors() + 1);
+      // Scenario: when dispatcher didn't assigned any executor
+      if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
+        finalizeFlows(exflow);
+      } else {
+        // again queue this flow
+        enqueueFlow(exflow, reference);
+      }
     }
 
-    public void run() {
-      while (!shutdown) {
-        synchronized (this) {
-          try {
-            lastCleanerThreadCheckTime = System.currentTimeMillis();
+    private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+      Executor choosenExecutor) throws ExecutorManagerException {
+      exflow.setUpdateTime(System.currentTimeMillis());
 
-            // Cleanup old stuff.
-            long currentTime = System.currentTimeMillis();
-            if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > lastLogCleanTime) {
-              cleanExecutionLogs();
-              lastLogCleanTime = currentTime;
-            }
-
-            wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
-          } catch (InterruptedException e) {
-            logger.info("Interrupted. Probably to shut down.");
-          }
-        }
-      }
-    }
+      // to be moved after db update once we integrate rest api changes
+      reference.setExecutor(choosenExecutor);
+      // TODO: ADD rest call to do an actual dispatch
+      callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+      executorLoader.assignExecutor(exflow.getExecutionId(),
+        choosenExecutor.getId());
 
-    private void cleanExecutionLogs() {
-      logger.info("Cleaning old logs from execution_logs");
-      long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
-      logger.info("Cleaning old log files before "
-        + new DateTime(cutoff).toString());
-      cleanOldExecutionLogs(DateTime.now().getMillis()
-        - executionLogsRetentionMs);
+      // move from flow to running flows
+      runningFlows.put(exflow.getExecutionId(),
+        new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
     }
   }
-}
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 6991c20..2b47293 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -18,6 +18,7 @@ package azkaban.executor;
 
 import java.io.IOException;
 import java.lang.Thread.State;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -210,11 +211,12 @@ public interface ExecutorManagerAdapter {
   public Set<? extends String> getPrimaryServerHosts();
 
   /**
-   * Returns a set of all the active executors maintained by active executors
+   * Returns a collection of all the active executors maintained by active
+   * executors
    *
    * @return
    */
-  public Set<Executor> getAllActiveExecutors();
+  public Collection<Executor> getAllActiveExecutors();
 
   /**
    * <pre>
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 3d14a17..7740163 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -1033,10 +1033,10 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
    *
    * {@inheritDoc}
    *
-   * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecution(int)
+   * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
    */
   @Override
-  public Executor fetchExecutorByExecution(int executionId)
+  public Executor fetchExecutorByExecutionId(int executionId)
     throws ExecutorManagerException {
     QueryRunner runner = createQueryRunner();
     FetchExecutorHandler executorHandler = new FetchExecutorHandler();
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
index 09abade..319f314 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -50,8 +50,8 @@ public class ExecutableFlowPriorityComparatorTest {
   }
 
   /* Helper method to create an ExecutableFlow from serialized description */
-  private ExecutableFlow createExecutableFlow(String flowName, int execId,
-    int priority) throws IOException {
+  private ExecutableFlow createExecutableFlow(String flowName, int priority,
+    long updateTime) throws IOException {
     File jsonFlowFile = getFlowDir(flowName);
     @SuppressWarnings("unchecked")
     HashMap<String, Object> flowObj =
@@ -64,7 +64,7 @@ public class ExecutableFlowPriorityComparatorTest {
     project.setFlows(flowMap);
     ExecutableFlow execFlow = new ExecutableFlow(project, flow);
 
-    execFlow.setExecutionId(execId);
+    execFlow.setUpdateTime(updateTime);
     if (priority > 0) {
       execFlow.getExecutionOptions().getFlowParameters()
         .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
@@ -76,9 +76,9 @@ public class ExecutableFlowPriorityComparatorTest {
   @Test
   public void testExplicitlySpecifiedPriorities() throws IOException,
     InterruptedException {
-    ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 5);
-    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 6);
-    ExecutableFlow flow3 = createExecutableFlow("exec3", 3, 2);
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3);
     ExecutionReference dummyRef = new ExecutionReference(0);
 
     BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
@@ -97,9 +97,9 @@ public class ExecutableFlowPriorityComparatorTest {
   @Test
   public void testMixedSpecifiedPriorities() throws IOException,
     InterruptedException {
-    ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 3);
-    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
-    ExecutableFlow flow3 = createExecutableFlow("exec3", 3, -2);
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3);
     ExecutionReference dummyRef = new ExecutionReference(0);
 
     BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
@@ -120,10 +120,10 @@ public class ExecutableFlowPriorityComparatorTest {
    */
   @Test
   public void testEqualPriorities() throws IOException, InterruptedException {
-    ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 3);
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1);
     ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
-    ExecutableFlow flow3 = createExecutableFlow("exec3", 3, -2);
-    ExecutableFlow flow4 = createExecutableFlow("exec3", 4, 3);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4);
     ExecutionReference dummyRef = new ExecutionReference(0);
 
     BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 7fa7f03..d77b734 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -19,6 +19,7 @@ package azkaban.executor;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -64,6 +65,8 @@ public class ExecutorManagerTest {
     ExecutorLoader loader) throws ExecutorManagerException {
     Props props = new Props();
     props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
+
     loader.addExecutor("localhost", 12345);
     loader.addExecutor("localhost", 12346);
     return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
@@ -117,7 +120,8 @@ public class ExecutorManagerTest {
     ExecutorLoader loader = new MockExecutorLoader();
     ExecutorManager manager =
       new ExecutorManager(props, loader, new HashMap<String, Alerter>());
-    Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
 
     Assert.assertEquals(activeExecutors.size(), 1);
     Executor executor = activeExecutors.iterator().next();
@@ -140,7 +144,8 @@ public class ExecutorManagerTest {
 
     ExecutorManager manager =
       new ExecutorManager(props, loader, new HashMap<String, Alerter>());
-    Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
     Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
       executor1, executor2 });
   }
@@ -157,8 +162,7 @@ public class ExecutorManagerTest {
 
     ExecutorManager manager =
       new ExecutorManager(props, loader, new HashMap<String, Alerter>());
-    Set<Executor> activeExecutors = manager.getAllActiveExecutors();
-    Assert.assertArrayEquals(activeExecutors.toArray(),
+    Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
       new Executor[] { executor1 });
 
     // mark older executor as inactive
@@ -168,8 +172,8 @@ public class ExecutorManagerTest {
     Executor executor3 = loader.addExecutor("localhost", 12347);
     manager.setupExecutors();
 
-    Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
-      executor2, executor3 });
+    Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+      new Executor[] { executor2, executor3 });
   }
 
   /*
@@ -186,7 +190,8 @@ public class ExecutorManagerTest {
 
       ExecutorManager manager =
         new ExecutorManager(props, loader, new HashMap<String, Alerter>());
-      Set<Executor> activeExecutors = manager.getAllActiveExecutors();
+      Set<Executor> activeExecutors =
+        new HashSet(manager.getAllActiveExecutors());
       Assert.assertArrayEquals(activeExecutors.toArray(),
         new Executor[] { executor1 });
 
@@ -204,6 +209,8 @@ public class ExecutorManagerTest {
   @Test
   public void testDisablingQueueProcessThread() throws ExecutorManagerException {
     ExecutorManager manager = createMultiExecutorManagerInstance();
+    manager.enableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
     manager.disableQueueProcessorThread();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
   }
@@ -212,9 +219,6 @@ public class ExecutorManagerTest {
   @Test
   public void testEnablingQueueProcessThread() throws ExecutorManagerException {
     ExecutorManager manager = createMultiExecutorManagerInstance();
-
-    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
-    manager.disableQueueProcessorThread();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
     manager.enableQueueProcessorThread();
     Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
@@ -229,7 +233,6 @@ public class ExecutorManagerTest {
     flow1.setExecutionId(1);
     ExecutableFlow flow2 = createExecutableFlow("exec2");
     flow2.setExecutionId(2);
-    manager.disableQueueProcessorThread();
 
     User testUser = getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
@@ -266,11 +269,12 @@ public class ExecutorManagerTest {
       ExecutableFlow flow1 = createExecutableFlow("exec1");
       flow1.getExecutionOptions().setConcurrentOption(
         ExecutionOptions.CONCURRENT_OPTION_SKIP);
-      manager.disableQueueProcessorThread();
 
       User testUser = getTestUser();
       manager.submitExecutableFlow(flow1, testUser.getUserId());
       manager.submitExecutableFlow(flow1, testUser.getUserId());
+      manager.enableQueueProcessorThread();
+      manager.submitExecutableFlow(flow1, testUser.getUserId());
       Assert.fail("Expecting exception, but didn't get one");
     } catch (ExecutorManagerException ex) {
       System.out.println("Test true");
@@ -286,7 +290,6 @@ public class ExecutorManagerTest {
     ExecutorLoader loader = new MockExecutorLoader();
     ExecutorManager manager = createMultiExecutorManagerInstance(loader);
     ExecutableFlow flow1 = createExecutableFlow("exec1");
-    manager.disableQueueProcessorThread();
     User testUser = getTestUser();
     manager.submitExecutableFlow(flow1, testUser.getUserId());
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 8768f7e..e725a8d 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -380,7 +380,7 @@ public class JdbcExecutorLoaderTest {
       return;
     }
     ExecutorLoader loader = createLoader();
-    Assert.assertEquals(loader.fetchExecutorByExecution(1), null);
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(1), null);
   }
 
   /* Test null return when for a non-dispatched execution */
@@ -393,7 +393,7 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     ExecutableFlow flow = createExecutableFlow("exec1");
     loader.uploadExecutableFlow(flow);
-    Assert.assertEquals(loader.fetchExecutorByExecution(flow.getExecutionId()),
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
       null);
   }
 
@@ -411,7 +411,7 @@ public class JdbcExecutorLoaderTest {
     ExecutableFlow flow = createExecutableFlow("exec1");
     loader.uploadExecutableFlow(flow);
     loader.assignExecutor(executor.getId(), flow.getExecutionId());
-    Assert.assertEquals(loader.fetchExecutorByExecution(flow.getExecutionId()),
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
       executor);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index fec183c..833e0c6 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -346,7 +346,7 @@ public class MockExecutorLoader implements ExecutorLoader {
   }
 
   @Override
-  public Executor fetchExecutorByExecution(int execId) throws ExecutorManagerException {
+  public Executor fetchExecutorByExecutionId(int execId) throws ExecutorManagerException {
     if (executionExecutorMapping.containsKey(execId)) {
       return fetchExecutor(executionExecutorMapping.get(execId));
     } else {
diff --git a/azkaban-sql/src/sql/update.execution_flows.3.0.sql b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
index d725922..2935810 100644
--- a/azkaban-sql/src/sql/update.execution_flows.3.0.sql
+++ b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
@@ -1 +1,2 @@
-ALTER TABLE execution_flows ADD COLUMN executor_id INT DEFAULT NULL;
\ No newline at end of file
+ALTER TABLE execution_flows ADD COLUMN executor_id INT DEFAULT NULL;
+CREATE INDEX executor_id ON execution_flows(executor_id);
\ No newline at end of file
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 06b6f86..b2b3487 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -17,12 +17,12 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -181,7 +181,7 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     }
 
     try {
-      Set<Executor> executors = execManager.getAllActiveExecutors();
+      Collection<Executor> executors = execManager.getAllActiveExecutors();
       page.add("executorList", executors);
 
       Map<String, Object> result =