azkaban-uncached

Major changes in ExecutorManager and new methods in ExecutorLoader

8/26/2015 9:43:11 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index e314206..0897555 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -18,16 +18,22 @@ package azkaban.executor;
 
 public class ExecutionReference {
   private final int execId;
-  private final String host;
-  private final int port;
+  private Executor executor;
   private long updateTime;
   private long nextCheckTime = -1;
   private int numErrors = 0;
 
-  public ExecutionReference(int execId, String host, int port) {
+  public ExecutionReference(int execId) {
     this.execId = execId;
-    this.host = host;
-    this.port = port;
+  }
+
+  public ExecutionReference(int execId, Executor executor) {
+    this.execId = execId;
+    this.executor = executor;
+  }
+
+  public Executor getExecutor() {
+    return executor;
   }
 
   public void setUpdateTime(long updateTime) {
@@ -51,11 +57,11 @@ public class ExecutionReference {
   }
 
   public String getHost() {
-    return host;
+    return executor.getHost();
   }
 
   public int getPort() {
-    return port;
+    return executor.getPort();
   }
 
   public int getNumErrors() {
@@ -65,4 +71,8 @@ public class ExecutionReference {
   public void setNumErrors(int numErrors) {
     this.numErrors = numErrors;
   }
+
+  public void setExecutor(Executor executor) {
+    this.executor = executor;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index e2d4fac..ec493bb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -130,6 +130,25 @@ public interface ExecutorLoader {
   public void removeActiveExecutableReference(int execId)
       throws ExecutorManagerException;
 
+  /**
+   * Set executorId for a flow execution
+   *
+   * @param executorId
+   * @param execId
+   * @throws ExecutorManagerException
+   */
+  public void assignExecutor(int executorId, int execId)
+    throws ExecutorManagerException;
+
+  /**
+   * Fetch executorId for a given flow execution
+   *
+   * @param execId
+   * @return
+   * @throws ExecutorManagerException
+   */
+  public int fetchExecutorId(int execId) throws ExecutorManagerException;
+
   public boolean updateExecutableReference(int execId, long updateTime)
       throws ExecutorManagerException;
 
@@ -182,4 +201,12 @@ public interface ExecutorLoader {
 
   public int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
+
+  /**
+   * Fetch queued flows which have not yet dispatched
+   * @return List of queued flows and corresponding execution reference
+   * @throws ExecutorManagerException
+   */
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c86648e..0bc4f39 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.client.HttpClient;
@@ -58,23 +59,30 @@ import azkaban.utils.Props;
  *
  */
 public class ExecutorManager extends EventHandler implements
-    ExecutorManagerAdapter {
+  ExecutorManagerAdapter {
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
-  private String executorHost;
-  private int executorPort;
 
   private CleanerThread cleanerThread;
 
   private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
-      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+    new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
   private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
-      new ConcurrentHashMap<Integer, ExecutableFlow>();
+    new ConcurrentHashMap<Integer, ExecutableFlow>();
+  /* all flows ExecutorManager is currently dealing with */
+  private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
+    new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+  private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+    new ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>>();
+
+  private Set<Executor> activeExecutors = new HashSet<Executor>();
 
   private ExecutingManagerUpdaterThread executingManager;
+  private QueueProcessorThread queueProcessor;
 
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
-      * 24 * 60 * 60 * 1000l;
+    * 24 * 60 * 60 * 1000l;
+
   private long lastCleanerThreadCheckTime = -1;
 
   private long lastThreadCheckTime = -1;
@@ -85,11 +93,12 @@ public class ExecutorManager extends EventHandler implements
   File cacheDir;
 
   public ExecutorManager(Props props, ExecutorLoader loader,
-      Map<String, Alerter> alters) throws ExecutorManagerException {
+    Map<String, Alerter> alters) throws ExecutorManagerException {
     this.executorLoader = loader;
     this.loadRunningFlows();
-    executorHost = props.getString("executor.host", "localhost");
-    executorPort = props.getInt("executor.port");
+    this.loadQueuedFlows();
+
+    setupExecutors(props);
 
     alerters = alters;
 
@@ -98,14 +107,43 @@ public class ExecutorManager extends EventHandler implements
     executingManager = new ExecutingManagerUpdaterThread();
     executingManager.start();
 
+    queueProcessor = new QueueProcessorThread();
+    queueProcessor.start();
+
     long executionLogsRetentionMs =
-        props.getLong("execution.logs.retention.ms",
-            DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+      props.getLong("execution.logs.retention.ms",
+        DEFAULT_EXECUTION_LOGS_RETENTION_MS);
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
 
   }
 
+  /* setup activeExecutors using azkaban.properties and database executors */
+  private void setupExecutors(Props props) throws ExecutorManagerException {
+    // Add local executor, if specified as per properties
+    if (props.containsKey("executor.port")) {
+      String executorHost = props.getString("executor.host", "localhost");
+      int executorPort = props.getInt("executor.port");
+      Executor executor =
+        executorLoader.fetchExecutor(executorHost, executorPort);
+      if (executor == null) {
+        executor = executorLoader.addExecutor(executorHost, executorPort);
+      } else if (!executor.isActive()) {
+        executorLoader.activateExecutor(executor.getId());
+      }
+      activeExecutors.add(new Executor(executor.getId(), executorHost,
+        executorPort));
+    }
+
+    if (props.getBoolean("azkaban.multiple.executors", false)) {
+      activeExecutors.addAll(executorLoader.fetchActiveExecutors());
+    }
+
+    if (activeExecutors.isEmpty()) {
+      throw new ExecutorManagerException("No active executor found");
+    }
+  }
+
   @Override
   public State getExecutorManagerThreadState() {
     return executingManager.getState();
@@ -131,9 +169,11 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public Set<String> getPrimaryServerHosts() {
-    // Only one for now. More probably later.
+    // TODO: do we want to have a primary
     HashSet<String> ports = new HashSet<String>();
-    ports.add(executorHost + ":" + executorPort);
+    for (Executor executor : activeExecutors) {
+      ports.add(executor.getHost() + ":" + executor.getPort());
+    }
     return ports;
   }
 
@@ -141,11 +181,8 @@ public class ExecutorManager extends EventHandler implements
   public Set<String> getAllActiveExecutorServerHosts() {
     // Includes non primary server/hosts
     HashSet<String> ports = new HashSet<String>();
-    ports.add(executorHost + ":" + executorPort);
-    for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
-        .values()) {
-      ExecutionReference ref = running.getFirst();
-      ports.add(ref.getHost() + ":" + ref.getPort());
+    for (Executor executor : activeExecutors) {
+      ports.add(executor.getHost() + ":" + executor.getPort());
     }
 
     return ports;
@@ -155,50 +192,115 @@ public class ExecutorManager extends EventHandler implements
     runningFlows.putAll(executorLoader.fetchActiveFlows());
   }
 
+  /*
+   * load queued flows i.e with active_execution_reference and not assigned to
+   * any executor
+   */
+  private void loadQueuedFlows() throws ExecutorManagerException {
+    queuedFlows.addAll(executorLoader.fetchQueuedFlows());
+    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlows) {
+      queuedFlowMap.put(ref.getSecond().getExecutionId(), ref);
+    }
+  }
+
+  /**
+   * Gets a list of all the active (running, non-dispatched) executions for a
+   * given project and flow {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
+   *      java.lang.String)
+   */
   @Override
   public List<Integer> getRunningFlows(int projectId, String flowId) {
     ArrayList<Integer> executionIds = new ArrayList<Integer>();
+    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+      if (ref.getSecond().getFlowId().equals(flowId)
+        && ref.getSecond().getProjectId() == projectId) {
+        executionIds.add(ref.getFirst().getExecId());
+      }
+    }
     for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
       if (ref.getSecond().getFlowId().equals(flowId)
-          && ref.getSecond().getProjectId() == projectId) {
+        && ref.getSecond().getProjectId() == projectId) {
         executionIds.add(ref.getFirst().getExecId());
       }
     }
     return executionIds;
   }
 
+  /**
+   * Checks whether the given flow has an active (running, non-dispatched)
+   * executions {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
+   *      java.lang.String)
+   */
   @Override
   public boolean isFlowRunning(int projectId, String flowId) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+      if (ref.getSecond().getProjectId() == projectId
+        && ref.getSecond().getFlowId().equals(flowId)) {
+        return true;
+      }
+    }
     for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
       if (ref.getSecond().getProjectId() == projectId
-          && ref.getSecond().getFlowId().equals(flowId)) {
+        && ref.getSecond().getFlowId().equals(flowId)) {
         return true;
       }
     }
     return false;
   }
 
+  /**
+   * Fetch ExecutableFlow from an active (running, non-dispatched) or from
+   * database {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
+   */
   @Override
   public ExecutableFlow getExecutableFlow(int execId)
-      throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
-    if (active == null) {
+    throws ExecutorManagerException {
+    if (runningFlows.containsKey(execId)) {
+      return runningFlows.get(execId).getSecond();
+    } else if (queuedFlowMap.containsKey(execId)) {
+      return queuedFlowMap.get(execId).getSecond();
+    } else {
       return executorLoader.fetchExecutableFlow(execId);
     }
-    return active.getSecond();
   }
 
+  /**
+   * Get all active (running, non-dispatched) flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
     ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
+    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+      flows.add(ref.getSecond());
+    }
     for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
       flows.add(ref.getSecond());
     }
     return flows;
   }
 
+  /**
+   * Get execution Ids of all active (running, non-dispatched) flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
   public String getRunningFlowIds() {
     List<Integer> allIds = new ArrayList<Integer>();
+    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
+      allIds.add(ref.getSecond().getExecutionId());
+    }
     for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
       allIds.add(ref.getSecond().getExecutionId());
     }
@@ -206,136 +308,141 @@ public class ExecutorManager extends EventHandler implements
     return allIds.toString();
   }
 
+  /**
+   * Get recently finished flows {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRecentlyFinishedFlows()
+   */
   public List<ExecutableFlow> getRecentlyFinishedFlows() {
     return new ArrayList<ExecutableFlow>(recentlyFinished.values());
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(Project project,
-      String flowId, int skip, int size) throws ExecutorManagerException {
+    String flowId, int skip, int size) throws ExecutorManagerException {
     List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
+      executorLoader.fetchFlowHistory(project.getId(), flowId, skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(int skip, int size)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(String flowIdContains,
-      int skip, int size) throws ExecutorManagerException {
+    int skip, int size) throws ExecutorManagerException {
     List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
-            0, -1, -1, skip, size);
+      executorLoader.fetchFlowHistory(null, '%' + flowIdContains + '%', null,
+        0, -1, -1, skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableFlow> getExecutableFlows(String projContain,
-      String flowContain, String userContain, int status, long begin, long end,
-      int skip, int size) throws ExecutorManagerException {
+    String flowContain, String userContain, int status, long begin, long end,
+    int skip, int size) throws ExecutorManagerException {
     List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
-            status, begin, end, skip, size);
+      executorLoader.fetchFlowHistory(projContain, flowContain, userContain,
+        status, begin, end, skip, size);
     return flows;
   }
 
   @Override
   public List<ExecutableJobInfo> getExecutableJobs(Project project,
-      String jobId, int skip, int size) throws ExecutorManagerException {
+    String jobId, int skip, int size) throws ExecutorManagerException {
     List<ExecutableJobInfo> nodes =
-        executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
+      executorLoader.fetchJobHistory(project.getId(), jobId, skip, size);
     return nodes;
   }
 
   @Override
   public int getNumberOfJobExecutions(Project project, String jobId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     return executorLoader.fetchNumExecutableNodes(project.getId(), jobId);
   }
 
   @Override
   public int getNumberOfExecutions(Project project, String flowId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     return executorLoader.fetchNumExecutableFlows(project.getId(), flowId);
   }
 
   @Override
   public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset,
-      int length) throws ExecutorManagerException {
+    int length) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+      runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "flow");
       Pair<String, String> offsetParam =
-          new Pair<String, String>("offset", String.valueOf(offset));
+        new Pair<String, String>("offset", String.valueOf(offset));
       Pair<String, String> lengthParam =
-          new Pair<String, String>("length", String.valueOf(length));
+        new Pair<String, String>("length", String.valueOf(length));
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-          callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
-              typeParam, offsetParam, lengthParam);
+        callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+          typeParam, offsetParam, lengthParam);
       return LogData.createLogDataFromObject(result);
     } else {
       LogData value =
-          executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset,
-              length);
+        executorLoader
+          .fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
       return value;
     }
   }
 
   @Override
   public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId,
-      int offset, int length, int attempt) throws ExecutorManagerException {
+    int offset, int length, int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+      runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
-          new Pair<String, String>("jobId", jobId);
+        new Pair<String, String>("jobId", jobId);
       Pair<String, String> offsetParam =
-          new Pair<String, String>("offset", String.valueOf(offset));
+        new Pair<String, String>("offset", String.valueOf(offset));
       Pair<String, String> lengthParam =
-          new Pair<String, String>("length", String.valueOf(length));
+        new Pair<String, String>("length", String.valueOf(length));
       Pair<String, String> attemptParam =
-          new Pair<String, String>("attempt", String.valueOf(attempt));
+        new Pair<String, String>("attempt", String.valueOf(attempt));
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-          callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
-              typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+        callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION,
+          typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return LogData.createLogDataFromObject(result);
     } else {
       LogData value =
-          executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
-              offset, length);
+        executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt,
+          offset, length);
       return value;
     }
   }
 
   @Override
   public List<Object> getExecutionJobStats(ExecutableFlow exFlow, String jobId,
-      int attempt) throws ExecutorManagerException {
+    int attempt) throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+      runningFlows.get(exFlow.getExecutionId());
     if (pair == null) {
       return executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
-          attempt);
+        attempt);
     }
 
     Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
     Pair<String, String> attemptParam =
-        new Pair<String, String>("attempt", String.valueOf(attempt));
+      new Pair<String, String>("attempt", String.valueOf(attempt));
 
     @SuppressWarnings("unchecked")
     Map<String, Object> result =
-        callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
-            jobIdParam, attemptParam);
+      callExecutorServer(pair.getFirst(), ConnectorParams.ATTACHMENTS_ACTION,
+        jobIdParam, attemptParam);
 
     @SuppressWarnings("unchecked")
     List<Object> jobStats = (List<Object>) result.get("attachments");
@@ -345,57 +452,74 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow,
-      String jobId, int offset, int length, int attempt)
-      throws ExecutorManagerException {
+    String jobId, int offset, int length, int attempt)
+    throws ExecutorManagerException {
     Pair<ExecutionReference, ExecutableFlow> pair =
-        runningFlows.get(exFlow.getExecutionId());
+      runningFlows.get(exFlow.getExecutionId());
     if (pair != null) {
 
       Pair<String, String> typeParam = new Pair<String, String>("type", "job");
       Pair<String, String> jobIdParam =
-          new Pair<String, String>("jobId", jobId);
+        new Pair<String, String>("jobId", jobId);
       Pair<String, String> offsetParam =
-          new Pair<String, String>("offset", String.valueOf(offset));
+        new Pair<String, String>("offset", String.valueOf(offset));
       Pair<String, String> lengthParam =
-          new Pair<String, String>("length", String.valueOf(length));
+        new Pair<String, String>("length", String.valueOf(length));
       Pair<String, String> attemptParam =
-          new Pair<String, String>("attempt", String.valueOf(attempt));
+        new Pair<String, String>("attempt", String.valueOf(attempt));
 
       @SuppressWarnings("unchecked")
       Map<String, Object> result =
-          callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
-              typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+        callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION,
+          typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
       return JobMetaData.createJobMetaDataFromObject(result);
     } else {
       return null;
     }
   }
 
+  /**
+   * if flows was dispatched to an executor, cancel by calling Executor else if
+   * flow is still in queue, remove from queue and finalize {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
+   *      java.lang.String)
+   */
   @Override
   public void cancelFlow(ExecutableFlow exFlow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
+      if (runningFlows.containsKey(exFlow.getExecutionId())) {
+        Pair<ExecutionReference, ExecutableFlow> pair =
           runningFlows.get(exFlow.getExecutionId());
-      if (pair == null) {
+        callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+          userId);
+      } else if (queuedFlowMap.containsKey(exFlow.getExecutionId())) {
+        Pair<ExecutionReference, ExecutableFlow> pair =
+          queuedFlowMap.get(exFlow.getExecutionId());
+        synchronized (pair) {
+          queuedFlows.remove(pair);
+          queuedFlowMap.remove(exFlow.getExecutionId());
+          finalizeFlows(exFlow);
+        }
+      } else {
         throw new ExecutorManagerException("Execution "
-            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-            + " isn't running.");
+          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+          + " isn't running.");
       }
-      callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
     }
   }
 
   @Override
   public void resumeFlow(ExecutableFlow exFlow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
-            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-            + " isn't running.");
+          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+          + " isn't running.");
       }
       callExecutorServer(pair.getFirst(), ConnectorParams.RESUME_ACTION, userId);
     }
@@ -403,14 +527,14 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void pauseFlow(ExecutableFlow exFlow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
-            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-            + " isn't running.");
+          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+          + " isn't running.");
       }
       callExecutorServer(pair.getFirst(), ConnectorParams.PAUSE_ACTION, userId);
     }
@@ -418,63 +542,63 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void pauseExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+    String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId,
-        jobIds);
+      jobIds);
   }
 
   @Override
   public void resumeExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+    String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId,
-        jobIds);
+      jobIds);
   }
 
   @Override
   public void retryFailures(ExecutableFlow exFlow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
   }
 
   @Override
   public void retryExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+    String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId,
-        jobIds);
+      jobIds);
   }
 
   @Override
   public void disableExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+    String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId,
-        jobIds);
+      jobIds);
   }
 
   @Override
   public void enableExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+    String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId,
-        jobIds);
+      jobIds);
   }
 
   @Override
   public void cancelExecutingJobs(ExecutableFlow exFlow, String userId,
-      String... jobIds) throws ExecutorManagerException {
+    String... jobIds) throws ExecutorManagerException {
     modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId,
-        jobIds);
+      jobIds);
   }
 
   @SuppressWarnings("unchecked")
   private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow,
-      String command, String userId, String... jobIds)
-      throws ExecutorManagerException {
+    String command, String userId, String... jobIds)
+    throws ExecutorManagerException {
     synchronized (exFlow) {
       Pair<ExecutionReference, ExecutableFlow> pair =
-          runningFlows.get(exFlow.getExecutionId());
+        runningFlows.get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
-            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-            + " isn't running.");
+          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+          + " isn't running.");
       }
 
       Map<String, Object> response = null;
@@ -484,24 +608,24 @@ public class ExecutorManager extends EventHandler implements
             ExecutableNode node = exFlow.getExecutableNode(jobId);
             if (node == null) {
               throw new ExecutorManagerException("Job " + jobId
-                  + " doesn't exist in execution " + exFlow.getExecutionId()
-                  + ".");
+                + " doesn't exist in execution " + exFlow.getExecutionId()
+                + ".");
             }
           }
         }
         String ids = StringUtils.join(jobIds, ',');
         response =
-            callExecutorServer(pair.getFirst(),
-                ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
-                new Pair<String, String>(
-                    ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
-                new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+          callExecutorServer(pair.getFirst(),
+            ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+            new Pair<String, String>(
+              ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command),
+            new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
       } else {
         response =
-            callExecutorServer(pair.getFirst(),
-                ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
-                new Pair<String, String>(
-                    ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
+          callExecutorServer(pair.getFirst(),
+            ConnectorParams.MODIFY_EXECUTION_ACTION, userId,
+            new Pair<String, String>(
+              ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
       }
 
       return response;
@@ -509,7 +633,7 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private void applyDisabledJobs(List<Object> disabledJobs,
-      ExecutableFlowBase exflow) {
+    ExecutableFlowBase exflow) {
     for (Object disabled : disabledJobs) {
       if (disabled instanceof String) {
         String nodeName = (String) disabled;
@@ -523,7 +647,7 @@ public class ExecutorManager extends EventHandler implements
         String nodeName = (String) nestedDisabled.get("id");
         @SuppressWarnings("unchecked")
         List<Object> subDisabledJobs =
-            (List<Object>) nestedDisabled.get("children");
+          (List<Object>) nestedDisabled.get("children");
 
         if (nodeName == null || subDisabledJobs == null) {
           return;
@@ -539,10 +663,10 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exflow) {
       logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
-          + userId);
+        + userId);
 
       int projectId = exflow.getProjectId();
       String flowId = exflow.getFlowId();
@@ -563,31 +687,32 @@ public class ExecutorManager extends EventHandler implements
 
       if (!running.isEmpty()) {
         if (options.getConcurrentOption().equals(
-            ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+          ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
           Collections.sort(running);
           Integer runningExecId = running.get(running.size() - 1);
 
           options.setPipelineExecutionId(runningExecId);
           message =
-              "Flow " + flowId + " is already running with exec id "
-                  + runningExecId + ". Pipelining level "
-                  + options.getPipelineLevel() + ". \n";
+            "Flow " + flowId + " is already running with exec id "
+              + runningExecId + ". Pipelining level "
+              + options.getPipelineLevel() + ". \n";
         } else if (options.getConcurrentOption().equals(
-            ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+          ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
           throw new ExecutorManagerException("Flow " + flowId
-              + " is already running. Skipping execution.",
-              ExecutorManagerException.Reason.SkippedExecution);
+            + " is already running. Skipping execution.",
+            ExecutorManagerException.Reason.SkippedExecution);
         } else {
           // The settings is to run anyways.
           message =
-              "Flow " + flowId + " is already running with exec id "
-                  + StringUtils.join(running, ",")
-                  + ". Will execute concurrently. \n";
+            "Flow " + flowId + " is already running with exec id "
+              + StringUtils.join(running, ",")
+              + ". Will execute concurrently. \n";
         }
       }
 
-      boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
-              ProjectWhitelist.WhitelistType.MemoryCheck);
+      boolean memoryCheck =
+        !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+          ProjectWhitelist.WhitelistType.MemoryCheck);
       options.setMemoryCheck(memoryCheck);
 
       // The exflow id is set by the loader. So it's unavailable until after
@@ -597,22 +722,16 @@ public class ExecutorManager extends EventHandler implements
       // We create an active flow reference in the datastore. If the upload
       // fails, we remove the reference.
       ExecutionReference reference =
-          new ExecutionReference(exflow.getExecutionId(), executorHost,
-              executorPort);
+        new ExecutionReference(exflow.getExecutionId());
+      // Added to db queue
       executorLoader.addActiveExecutableReference(reference);
-      try {
-        callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
-        runningFlows.put(exflow.getExecutionId(),
-            new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
-        message +=
-            "Execution submitted successfully with exec id "
-                + exflow.getExecutionId();
-      } catch (ExecutorManagerException e) {
-        executorLoader.removeActiveExecutableReference(reference.getExecId());
-        throw e;
-      }
-
+      Pair<ExecutionReference, ExecutableFlow> pair =
+        new Pair<ExecutionReference, ExecutableFlow>(reference, exflow);
+      queuedFlowMap.put(exflow.getExecutionId(), pair);
+      queuedFlows.add(pair);
+      message +=
+        "Execution submitted successfully with exec id "
+          + exflow.getExecutionId();
       return message;
     }
   }
@@ -627,50 +746,50 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action) throws ExecutorManagerException {
+    String action) throws ExecutorManagerException {
     try {
       return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), null, (Pair<String, String>[]) null);
+        ref.getExecId(), null, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action, String user) throws ExecutorManagerException {
+    String action, String user) throws ExecutorManagerException {
     try {
       return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), user, (Pair<String, String>[]) null);
+        ref.getExecId(), user, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action, Pair<String, String>... params)
-      throws ExecutorManagerException {
+    String action, Pair<String, String>... params)
+    throws ExecutorManagerException {
     try {
       return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), null, params);
+        ref.getExecId(), null, params);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action, String user, Pair<String, String>... params)
-      throws ExecutorManagerException {
+    String action, String user, Pair<String, String>... params)
+    throws ExecutorManagerException {
     try {
       return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), user, params);
+        ref.getExecId(), user, params);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
   }
 
   private Map<String, Object> callExecutorServer(String host, int port,
-      String action, Integer executionId, String user,
-      Pair<String, String>... params) throws IOException {
+    String action, Integer executionId, String user,
+    Pair<String, String>... params) throws IOException {
     URIBuilder builder = new URIBuilder();
     builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
 
@@ -678,7 +797,7 @@ public class ExecutorManager extends EventHandler implements
 
     if (executionId != null) {
       builder.setParameter(ConnectorParams.EXECID_PARAM,
-          String.valueOf(executionId));
+        String.valueOf(executionId));
     }
 
     if (user != null) {
@@ -713,7 +832,7 @@ public class ExecutorManager extends EventHandler implements
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+      (Map<String, Object>) JSONUtils.parseJSONFromString(response);
     String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
     if (error != null) {
       throw new IOException(error);
@@ -725,13 +844,17 @@ public class ExecutorManager extends EventHandler implements
   /**
    * Manage servlet call for stats servlet in Azkaban execution server
    * {@inheritDoc}
-   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+   *      azkaban.utils.Pair[])
    */
   @Override
-  public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
+  public Map<String, Object> callExecutorStats(String action,
+    Pair<String, String>... params) throws IOException {
 
     URIBuilder builder = new URIBuilder();
-    builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+    // TODO: fix to take host and port form user
+    // builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
 
     builder.setParameter(ConnectorParams.ACTION_PARAM, action);
 
@@ -763,20 +886,19 @@ public class ExecutorManager extends EventHandler implements
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+      (Map<String, Object>) JSONUtils.parseJSONFromString(response);
 
     return jsonResponse;
   }
 
-
   @Override
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
-      String mBean) throws IOException {
+    String mBean) throws IOException {
     URIBuilder builder = new URIBuilder();
 
     String[] hostPortSplit = hostPort.split(":");
     builder.setScheme("http").setHost(hostPortSplit[0])
-        .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
+      .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
 
     builder.setParameter(action, "");
     if (mBean != null) {
@@ -805,7 +927,7 @@ public class ExecutorManager extends EventHandler implements
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+      (Map<String, Object>) JSONUtils.parseJSONFromString(response);
     String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
     if (error != null) {
       throw new IOException(error);
@@ -815,6 +937,7 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public void shutdown() {
+    queueProcessor.shutdown();
     executingManager.shutdown();
   }
 
@@ -846,64 +969,64 @@ public class ExecutorManager extends EventHandler implements
           lastThreadCheckTime = System.currentTimeMillis();
           updaterStage = "Starting update all flows.";
 
-          Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
-              getFlowToExecutorMap();
+          Map<Executor, List<ExecutableFlow>> exFlowMap =
+            getFlowToExecutorMap();
           ArrayList<ExecutableFlow> finishedFlows =
-              new ArrayList<ExecutableFlow>();
+            new ArrayList<ExecutableFlow>();
           ArrayList<ExecutableFlow> finalizeFlows =
-              new ArrayList<ExecutableFlow>();
+            new ArrayList<ExecutableFlow>();
 
           if (exFlowMap.size() > 0) {
-            for (Map.Entry<ConnectionInfo, List<ExecutableFlow>> entry : exFlowMap
-                .entrySet()) {
+            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
+              .entrySet()) {
               List<Long> updateTimesList = new ArrayList<Long>();
               List<Integer> executionIdsList = new ArrayList<Integer>();
 
-              ConnectionInfo connection = entry.getKey();
+              Executor executor = entry.getKey();
 
               updaterStage =
-                  "Starting update flows on " + connection.getHost() + ":"
-                      + connection.getPort();
+                "Starting update flows on " + executor.getHost() + ":"
+                  + executor.getPort();
 
               // We pack the parameters of the same host together before we
               // query.
               fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
-                  updateTimesList);
+                updateTimesList);
 
               Pair<String, String> updateTimes =
-                  new Pair<String, String>(
-                      ConnectorParams.UPDATE_TIME_LIST_PARAM,
-                      JSONUtils.toJSON(updateTimesList));
+                new Pair<String, String>(
+                  ConnectorParams.UPDATE_TIME_LIST_PARAM,
+                  JSONUtils.toJSON(updateTimesList));
               Pair<String, String> executionIds =
-                  new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
-                      JSONUtils.toJSON(executionIdsList));
+                new Pair<String, String>(ConnectorParams.EXEC_ID_LIST_PARAM,
+                  JSONUtils.toJSON(executionIdsList));
 
               Map<String, Object> results = null;
               try {
                 results =
-                    callExecutorServer(connection.getHost(),
-                        connection.getPort(), ConnectorParams.UPDATE_ACTION,
-                        null, null, executionIds, updateTimes);
+                  callExecutorServer(executor.getHost(), executor.getPort(),
+                    ConnectorParams.UPDATE_ACTION, null, null, executionIds,
+                    updateTimes);
               } catch (IOException e) {
                 logger.error(e);
                 for (ExecutableFlow flow : entry.getValue()) {
                   Pair<ExecutionReference, ExecutableFlow> pair =
-                      runningFlows.get(flow.getExecutionId());
+                    runningFlows.get(flow.getExecutionId());
 
                   updaterStage =
-                      "Failed to get update. Doing some clean up for flow "
-                          + pair.getSecond().getExecutionId();
+                    "Failed to get update. Doing some clean up for flow "
+                      + pair.getSecond().getExecutionId();
 
                   if (pair != null) {
                     ExecutionReference ref = pair.getFirst();
                     int numErrors = ref.getNumErrors();
                     if (ref.getNumErrors() < this.numErrors) {
                       ref.setNextCheckTime(System.currentTimeMillis()
-                          + errorThreshold);
+                        + errorThreshold);
                       ref.setNumErrors(++numErrors);
                     } else {
                       logger.error("Evicting flow " + flow.getExecutionId()
-                          + ". The executor is unresponsive.");
+                        + ". The executor is unresponsive.");
                       // TODO should send out an unresponsive email here.
                       finalizeFlows.add(pair.getSecond());
                     }
@@ -914,8 +1037,8 @@ public class ExecutorManager extends EventHandler implements
               // We gets results
               if (results != null) {
                 List<Map<String, Object>> executionUpdates =
-                    (List<Map<String, Object>>) results
-                        .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+                  (List<Map<String, Object>>) results
+                    .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
                 for (Map<String, Object> updateMap : executionUpdates) {
                   try {
                     ExecutableFlow flow = updateExecution(updateMap);
@@ -945,16 +1068,16 @@ public class ExecutorManager extends EventHandler implements
             // Add new finished
             for (ExecutableFlow flow : finishedFlows) {
               if (flow.getScheduleId() >= 0
-                  && flow.getStatus() == Status.SUCCEEDED) {
+                && flow.getStatus() == Status.SUCCEEDED) {
                 ScheduleStatisticManager.invalidateCache(flow.getScheduleId(),
-                    cacheDir);
+                  cacheDir);
               }
               fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
               recentlyFinished.put(flow.getExecutionId(), flow);
             }
 
             updaterStage =
-                "Finalizing " + finalizeFlows.size() + " error flows.";
+              "Finalizing " + finalizeFlows.size() + " error flows.";
 
             // Kill error flows
             for (ExecutableFlow flow : finalizeFlows) {
@@ -1015,6 +1138,7 @@ public class ExecutorManager extends EventHandler implements
 
       updaterStage = "finalizing flow " + execId + " cleaning from memory";
       runningFlows.remove(execId);
+
       fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
       recentlyFinished.put(execId, dsFlow);
 
@@ -1032,12 +1156,12 @@ public class ExecutorManager extends EventHandler implements
     Alerter mailAlerter = alerters.get("email");
     if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
       if (options.getFailureEmails() != null
-          && !options.getFailureEmails().isEmpty()) {
+        && !options.getFailureEmails().isEmpty()) {
         try {
           mailAlerter
-              .alertOnError(
-                  flow,
-                  "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+            .alertOnError(
+              flow,
+              "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
         } catch (Exception e) {
           logger.error(e);
         }
@@ -1048,9 +1172,9 @@ public class ExecutorManager extends EventHandler implements
         if (alerter != null) {
           try {
             alerter
-                .alertOnError(
-                    flow,
-                    "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
+              .alertOnError(
+                flow,
+                "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
           } catch (Exception e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
@@ -1058,12 +1182,12 @@ public class ExecutorManager extends EventHandler implements
           }
         } else {
           logger.error("Alerter type " + alertType
-              + " doesn't exist. Failed to alert.");
+            + " doesn't exist. Failed to alert.");
         }
       }
     } else {
       if (options.getSuccessEmails() != null
-          && !options.getSuccessEmails().isEmpty()) {
+        && !options.getSuccessEmails().isEmpty()) {
         try {
 
           mailAlerter.alertOnSuccess(flow);
@@ -1084,7 +1208,7 @@ public class ExecutorManager extends EventHandler implements
           }
         } else {
           logger.error("Alerter type " + alertType
-              + " doesn't exist. Failed to alert.");
+            + " doesn't exist. Failed to alert.");
         }
       }
     }
@@ -1127,7 +1251,7 @@ public class ExecutorManager extends EventHandler implements
 
   private void evictOldRecentlyFinished(long ageMs) {
     ArrayList<Integer> recentlyFinishedKeys =
-        new ArrayList<Integer>(recentlyFinished.keySet());
+      new ArrayList<Integer>(recentlyFinished.keySet());
     long oldAgeThreshold = System.currentTimeMillis() - ageMs;
     for (Integer key : recentlyFinishedKeys) {
       ExecutableFlow flow = recentlyFinished.get(key);
@@ -1140,20 +1264,20 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private ExecutableFlow updateExecution(Map<String, Object> updateData)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
 
     Integer execId =
-        (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+      (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
     if (execId == null) {
       throw new ExecutorManagerException(
-          "Response is malformed. Need exec id to update.");
+        "Response is malformed. Need exec id to update.");
     }
 
     Pair<ExecutionReference, ExecutableFlow> refPair =
-        this.runningFlows.get(execId);
+      this.runningFlows.get(execId);
     if (refPair == null) {
       throw new ExecutorManagerException(
-          "No running flow found with the execution id. Removing " + execId);
+        "No running flow found with the execution id. Removing " + execId);
     }
 
     ExecutionReference ref = refPair.getFirst();
@@ -1195,7 +1319,7 @@ public class ExecutorManager extends EventHandler implements
           }
         } else {
           logger.error("Alerter type " + alertType
-              + " doesn't exist. Failed to alert.");
+            + " doesn't exist. Failed to alert.");
         }
       }
     }
@@ -1215,22 +1339,22 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private void fillUpdateTimeAndExecId(List<ExecutableFlow> flows,
-      List<Integer> executionIds, List<Long> updateTimes) {
+    List<Integer> executionIds, List<Long> updateTimes) {
     for (ExecutableFlow flow : flows) {
       executionIds.add(flow.getExecutionId());
       updateTimes.add(flow.getUpdateTime());
     }
   }
 
-  private Map<ConnectionInfo, List<ExecutableFlow>> getFlowToExecutorMap() {
-    HashMap<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
-        new HashMap<ConnectionInfo, List<ExecutableFlow>>();
+  private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
+    HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+      new HashMap<Executor, List<ExecutableFlow>>();
 
-    ConnectionInfo lastPort = new ConnectionInfo(executorHost, executorPort);
     for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
-        .values()) {
+      .values()) {
       ExecutionReference ref = runningFlow.getFirst();
       ExecutableFlow flow = runningFlow.getSecond();
+      Executor executor = ref.getExecutor();
 
       // We can set the next check time to prevent the checking of certain
       // flows.
@@ -1238,16 +1362,10 @@ public class ExecutorManager extends EventHandler implements
         continue;
       }
 
-      // Just a silly way to reduce object creation construction of objects
-      // since it's most likely that the values will be the same.
-      if (!lastPort.isEqual(ref.getHost(), ref.getPort())) {
-        lastPort = new ConnectionInfo(ref.getHost(), ref.getPort());
-      }
-
-      List<ExecutableFlow> flows = exFlowMap.get(lastPort);
+      List<ExecutableFlow> flows = exFlowMap.get(executor);
       if (flows == null) {
         flows = new ArrayList<ExecutableFlow>();
-        exFlowMap.put(lastPort, flows);
+        exFlowMap.put(executor, flows);
       }
 
       flows.add(flow);
@@ -1256,76 +1374,110 @@ public class ExecutorManager extends EventHandler implements
     return exFlowMap;
   }
 
-  private static class ConnectionInfo {
-    private String host;
-    private int port;
+  @Override
+  public int getExecutableFlows(int projectId, String flowId, int from,
+    int length, List<ExecutableFlow> outputList)
+    throws ExecutorManagerException {
+    List<ExecutableFlow> flows =
+      executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+    outputList.addAll(flows);
+    return executorLoader.fetchNumExecutableFlows(projectId, flowId);
+  }
 
-    public ConnectionInfo(String host, int port) {
-      this.host = host;
-      this.port = port;
+  @Override
+  public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
+    int from, int length, Status status) throws ExecutorManagerException {
+    return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
+      status);
+  }
+
+  /*
+   * This thread is responsible for processing queued flows.
+   */
+  private class QueueProcessorThread extends Thread {
+    private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+    private boolean shutdown = false;
+    private long lastProcessingTime = -1;
+    private long maxContinousSubmission = 10;
+
+    public QueueProcessorThread() {
+      this.setName("AzkabanWebServer-QueueProcessor-Thread");
     }
 
     @SuppressWarnings("unused")
-    private ConnectionInfo getOuterType() {
-      return ConnectionInfo.this;
+    public void shutdown() {
+      shutdown = true;
+      this.interrupt();
     }
 
-    public boolean isEqual(String host, int port) {
-      return this.port == port && this.host.equals(host);
+    public void run() {
+      while (!shutdown) {
+        synchronized (this) {
+          try {
+            refreshExecutors();
+            lastCleanerThreadCheckTime = System.currentTimeMillis();
+            long currentTime = System.currentTimeMillis();
+            if (currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
+              processQueuedFlows(maxContinousSubmission);
+              lastProcessingTime = currentTime;
+            }
+            wait(QUEUE_PROCESSOR_WAIT_IN_MS);
+          } catch (InterruptedException e) {
+            logger.info("Interrupted. Probably to shut down.");
+          }
+        }
+      }
     }
+  }
 
-    public String getHost() {
-      return host;
-    }
+  private void refreshExecutors() {
+    // TODO: rest api call to refresh executor stats
+  }
 
-    public int getPort() {
-      return port;
-    }
+  private void processQueuedFlows(long maxContinousSubmission) {
+    try {
+      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+      int submissionNum = 0;
+      while (submissionNum < maxContinousSubmission
+        && (runningCandidate = queuedFlows.peek()) != null) {
+        synchronized (runningCandidate) {
 
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((host == null) ? 0 : host.hashCode());
-      result = prime * result + port;
-      return result;
-    }
+          ExecutionReference reference = runningCandidate.getFirst();
+          ExecutableFlow exflow = runningCandidate.getSecond();
 
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      ConnectionInfo other = (ConnectionInfo) obj;
-      if (host == null) {
-        if (other.host != null)
-          return false;
-      } else if (!host.equals(other.host))
-        return false;
-      if (port != other.port)
-        return false;
-      return true;
-    }
-  }
+          // TODO: use dispatcher
+          Executor choosenExecutor;
 
-  @Override
-  public int getExecutableFlows(int projectId, String flowId, int from,
-      int length, List<ExecutableFlow> outputList)
-      throws ExecutorManagerException {
-    List<ExecutableFlow> flows =
-        executorLoader.fetchFlowHistory(projectId, flowId, from, length);
-    outputList.addAll(flows);
-    return executorLoader.fetchNumExecutableFlows(projectId, flowId);
-  }
+          synchronized (activeExecutors) {
+            choosenExecutor = activeExecutors.iterator().next();
+          }
 
-  @Override
-  public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId,
-      int from, int length, Status status) throws ExecutorManagerException {
-    return executorLoader.fetchFlowHistory(projectId, flowId, from, length,
-        status);
+          if (choosenExecutor != null) {
+            queuedFlows.poll();
+            queuedFlowMap.remove(exflow.getExecutionId());
+
+            try {
+              reference.setExecutor(choosenExecutor);
+              executorLoader.assignExecutor(choosenExecutor.getId(),
+                exflow.getExecutionId());
+              // TODO: ADD rest call to do an actual dispatch
+              callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+              runningFlows
+                .put(exflow.getExecutionId(),
+                  new Pair<ExecutionReference, ExecutableFlow>(reference,
+                    exflow));
+            } catch (ExecutorManagerException e) {
+              logger.error("Failed to process queued flow", e);
+              // TODO: allow N errors and re-try
+              finalizeFlows(exflow);
+            }
+          }
+        }
+        submissionNum++;
+      }
+    } catch (Throwable th) {
+      logger.error("Failed to process the queue", th);
+    }
   }
 
   /*
@@ -1336,7 +1488,7 @@ public class ExecutorManager extends EventHandler implements
 
     // check every day
     private static final long CLEANER_THREAD_WAIT_INTERVAL_MS =
-        24 * 60 * 60 * 1000;
+      24 * 60 * 60 * 1000;
 
     private final long executionLogsRetentionMs;
 
@@ -1379,9 +1531,9 @@ public class ExecutorManager extends EventHandler implements
       logger.info("Cleaning old logs from execution_logs");
       long cutoff = DateTime.now().getMillis() - executionLogsRetentionMs;
       logger.info("Cleaning old log files before "
-          + new DateTime(cutoff).toString());
+        + new DateTime(cutoff).toString());
       cleanOldExecutionLogs(DateTime.now().getMillis()
-          - executionLogsRetentionMs);
+        - executionLogsRetentionMs);
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 9b12724..62102a2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -180,6 +180,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#fetchQueuedFlows()
+   */
+  @Override
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
+
+    try {
+      List<Pair<ExecutionReference, ExecutableFlow>> flows =
+        runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
+          flowHandler);
+      return flows;
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows", e);
+    }
+  }
+
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
@@ -383,12 +404,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       throws ExecutorManagerException {
     final String INSERT =
         "INSERT INTO active_executing_flows "
-            + "(exec_id, host, port, update_time) values (?,?,?,?)";
+            + "(exec_id, update_time) values (?,?)";
     QueryRunner runner = createQueryRunner();
 
     try {
-      runner.update(INSERT, reference.getExecId(), reference.getHost(),
-          reference.getPort(), reference.getUpdateTime());
+      runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
     } catch (SQLException e) {
       throw new ExecutorManagerException(
           "Error updating active flow reference " + reference.getExecId(), e);
@@ -1177,13 +1197,77 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  private static class FetchQueuedExecutableFlows implements
+    ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+    // Select queued unassigned flows
+    private static String FETCH_QUEUED_EXECUTABLE_FLOW =
+      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
+        + " ax.update_time axUpdateTime FROM execution_flows ex"
+        + " INNER JOIN"
+        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+        + " Where ex.executor_id is NULL";
+
+    @Override
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+      throws SQLException {
+      if (!rs.next()) {
+        return Collections
+          .<Pair<ExecutionReference, ExecutableFlow>> emptyList();
+      }
+
+      List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+        new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+      do {
+        int id = rs.getInt(1);
+        int encodingType = rs.getInt(2);
+        byte[] data = rs.getBytes(3);
+        long updateTime = rs.getLong(4);
+
+        if (data == null) {
+          logger.error("Found a flow with empty data blob exec_id: " + id);
+        } else {
+          EncodingType encType = EncodingType.fromInteger(encodingType);
+          Object flowObj;
+          try {
+            // Convoluted way to inflate strings. Should find common package or
+            // helper function.
+            if (encType == EncodingType.GZIP) {
+              // Decompress the sucker.
+              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            ExecutableFlow exFlow =
+              ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            ExecutionReference ref = new ExecutionReference(id);
+            ref.setUpdateTime(updateTime);
+
+            execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref,
+              exFlow));
+          } catch (IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
   private static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+    // Select running and executor assigned flows
     private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
-        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data "
-            + "flow_data, ax.host host, ax.port port, ax.update_time "
-            + "axUpdateTime " + "FROM execution_flows ex "
-            + "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
+        + "et.host host, et.port port, ax.update_time axUpdateTime, et.id executorId"
+        + " FROM execution_flows ex"
+        + " INNER JOIN "
+        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+        + " INNER JOIN "
+        + " executors et ON ex.executor_id = et.id";
 
     @Override
     public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
@@ -1202,6 +1286,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         String host = rs.getString(4);
         int port = rs.getInt(5);
         long updateTime = rs.getLong(6);
+        int executorId = rs.getInt(7);
 
         if (data == null) {
           execFlows.put(id, null);
@@ -1222,7 +1307,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
             ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            ExecutionReference ref = new ExecutionReference(id, host, port);
+            Executor executor = new Executor(executorId, host, port);
+            ExecutionReference ref = new ExecutionReference(id, executor);
             ref.setUpdateTime(updateTime);
 
             execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref,
@@ -1308,6 +1394,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
     private static String NUM_JOB_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+    private static String FETCH_EXECUTOR_ID =
+        "SELECT executor_id FROM execution_flows WHERE exec_id=?";
 
     @Override
     public Integer handle(ResultSet rs) throws SQLException {
@@ -1402,4 +1490,38 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       return events;
     }
   }
+
+  @Override
+  public void assignExecutor(int executorId, int execId)
+    throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=? where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows = runner.update(UPDATE, executorId, execId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to update executor Id: %d to execution : %d  ", executorId,
+          execId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating executor id "
+        + executorId, e);
+    }
+  }
+
+  @Override
+  public int fetchExecutorId(int executionId) throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    IntHandler intHandler = new IntHandler();
+    try {
+      int executorId =
+        runner.query(IntHandler.FETCH_EXECUTOR_ID, intHandler, executionId);
+      return executorId;
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(
+        "Error fetching executorId for exec_id : " + executionId, e);
+    }
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index fed5c6b..55963ad 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -546,14 +546,15 @@ public class JdbcExecutorLoaderTest {
     ExecutorLoader loader = createLoader();
     ExecutableFlow flow1 = createExecutableFlow("exec1");
     loader.uploadExecutableFlow(flow1);
+    Executor executor = new Executor(2, "test", 1);
     ExecutionReference ref1 =
-        new ExecutionReference(flow1.getExecutionId(), "test", 1);
+        new ExecutionReference(flow1.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref1);
 
     ExecutableFlow flow2 = createExecutableFlow("exec1");
     loader.uploadExecutableFlow(flow2);
     ExecutionReference ref2 =
-        new ExecutionReference(flow2.getExecutionId(), "test", 1);
+        new ExecutionReference(flow2.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref2);
 
     ExecutableFlow flow3 = createExecutableFlow("exec1");
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index 7f6eeff..5b8b1a7 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -30,6 +30,8 @@ import azkaban.utils.Props;
 
 public class MockExecutorLoader implements ExecutorLoader {
 
+  HashMap<Integer, Integer> executionExecutorMapping =
+      new HashMap<Integer, Integer>();
   HashMap<Integer, ExecutableFlow> flows =
       new HashMap<Integer, ExecutableFlow>();
   HashMap<String, ExecutableNode> nodes = new HashMap<String, ExecutableNode>();
@@ -342,4 +344,35 @@ public class MockExecutorLoader implements ExecutorLoader {
     }
   }
 
+  @Override
+  public void assignExecutor(int executorId, int execId)
+    throws ExecutorManagerException {
+    ExecutionReference ref = refs.get(execId);
+    ref.setExecutor(fetchExecutor(executorId));
+    executionExecutorMapping.put(execId, executorId);
+  }
+
+  @Override
+  public int fetchExecutorId(int execId) throws ExecutorManagerException {
+    if (executionExecutorMapping.containsKey(execId)) {
+      return executionExecutorMapping.get(execId);
+    } else {
+      throw new ExecutorManagerException(
+        "Failed to find executor with execution : " + execId);
+    }
+  }
+
+  @Override
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException {
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+    for (int execId : refs.keySet()) {
+      if (executionExecutorMapping.containsKey(execId)) {
+        queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
+          .get(execId), flows.get(execId)));
+      }
+    }
+    return queuedFlows;
+  }
 }
diff --git a/azkaban-sql/src/sql/update.active_executing_flows.2.7.sql b/azkaban-sql/src/sql/update.active_executing_flows.2.7.sql
new file mode 100644
index 0000000..dcb4ec5
--- /dev/null
+++ b/azkaban-sql/src/sql/update.active_executing_flows.2.7.sql
@@ -0,0 +1,2 @@
+ALTER TABLE active_executing_flows DROP COLUMN host;
+ALTER TABLE active_executing_flows DROP COLUMN port;
\ No newline at end of file
diff --git a/azkaban-sql/src/sql/update.execution_flows.2.7.sql b/azkaban-sql/src/sql/update.execution_flows.2.7.sql
new file mode 100644
index 0000000..d725922
--- /dev/null
+++ b/azkaban-sql/src/sql/update.execution_flows.2.7.sql
@@ -0,0 +1 @@
+ALTER TABLE execution_flows ADD COLUMN executor_id INT DEFAULT NULL;
\ No newline at end of file