azkaban-aplcache

merged with multi-executor branch

10/5/2015 3:20:38 PM

Changes

azkaban-sql/src/sql/update.execution_logs.2.1.sql 6(+0 -6)

azkaban-sql/src/sql/update.project_properties.2.1.sql 3(+0 -3)

Details

diff --git a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
index b31e1eb..9c82f2a 100644
--- a/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
+++ b/azkaban-common/src/main/java/azkaban/database/AzkabanDatabaseSetup.java
@@ -242,9 +242,15 @@ public class AzkabanDatabaseSetup {
         upgradeList.put(key, upgradeVersions);
       }
     }
+    for (String key : missingTables) {
+      List<String> upgradeVersions = findOutOfDateTable(key, "");
+      if (upgradeVersions != null && !upgradeVersions.isEmpty()) {
+        upgradeList.put(key, upgradeVersions);
+      }
+    }
   }
 
-  private List<String> findOutOfDateTable(String table, String version) {
+  private List<String> findOutOfDateTable(String table, String currentVersion) {
     File directory = new File(scriptPath);
     ArrayList<String> versions = new ArrayList<String>();
 
@@ -255,30 +261,29 @@ public class AzkabanDatabaseSetup {
       return null;
     }
 
-    String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + version;
+    String updateFileNameVersion = UPDATE_SCRIPT_PREFIX + table + "." + currentVersion;
     for (File file : createScripts) {
       String fileName = file.getName();
       if (fileName.compareTo(updateFileNameVersion) > 0) {
-        if (fileName.startsWith(updateFileNameVersion)) {
-          continue;
-        }
-
         String[] split = fileName.split("\\.");
-        String versionNum = "";
+        String updateScriptVersion = "";
 
         for (int i = 2; i < split.length - 1; ++i) {
           try {
             Integer.parseInt(split[i]);
-            versionNum += split[i] + ".";
+            updateScriptVersion += split[i] + ".";
           } catch (NumberFormatException e) {
             break;
           }
         }
-        if (versionNum.endsWith(".")) {
-          versionNum = versionNum.substring(0, versionNum.length() - 1);
-
-          if (versionNum.compareTo(version) == 0) {
-            versions.add(versionNum);
+        if (updateScriptVersion.endsWith(".")) {
+          updateScriptVersion = updateScriptVersion.substring(0, updateScriptVersion.length() - 1);
+
+          // add to update list if updateScript will update above current
+          // version and up to targetVersion in database.properties
+          if (updateScriptVersion.compareTo(currentVersion) > 0
+            && updateScriptVersion.compareTo(this.version) <= 0) {
+            versions.add(updateScriptVersion);
           }
         }
       }
@@ -300,6 +305,8 @@ public class AzkabanDatabaseSetup {
       for (String table : missingTables) {
         if (!table.equals("properties")) {
           runTableScripts(conn, table, version, dataSource.getDBType(), false);
+          // update version as we have created a new table
+          installedVersions.put(table, version);
         }
       }
     } finally {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
index 8bc725f..8abbd61 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ConnectorParams.java
@@ -17,6 +17,7 @@
 package azkaban.executor;
 
 public interface ConnectorParams {
+  public static final String EXECUTOR_ID_PARAM = "executorId";
   public static final String ACTION_PARAM = "action";
   public static final String EXECID_PARAM = "execid";
   public static final String SHAREDTOKEN_PARAM = "token";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
new file mode 100644
index 0000000..3050a8d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * Comparator implicitly used in priority queue for QueuedExecutions.
+ */
+public final class ExecutableFlowPriorityComparator implements
+  Comparator<Pair<ExecutionReference, ExecutableFlow>> {
+  private static Logger logger = Logger
+    .getLogger(ExecutableFlowPriorityComparator.class);
+
+  /**
+   * <pre>
+   * Sorting order is determined by:-
+   * 1. descending order of priority
+   * 2. if same priority, ascending order of update time
+   * 3. if same priority and updateTime, ascending order of execution id
+   * </pre>
+   *
+   * {@inheritDoc}
+   *
+   * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+   */
+  @Override
+  public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
+    Pair<ExecutionReference, ExecutableFlow> pair2) {
+    ExecutableFlow exflow1 = null, exflow2 = null;
+    if (pair1 != null && pair1.getSecond() != null) {
+      exflow1 = pair1.getSecond();
+    }
+    if (pair2 != null && pair2.getSecond() != null) {
+      exflow2 = pair2.getSecond();
+    }
+    if (exflow1 == null && exflow2 == null)
+      return 0;
+    else if (exflow1 == null)
+      return -1;
+    else if (exflow2 == null)
+      return 1;
+    else {
+      // descending order of priority
+      int diff = getPriority(exflow2) - getPriority(exflow1);
+      if (diff == 0) {
+        // ascending order of update time, if same priority
+        diff = (int) (exflow1.getUpdateTime() - exflow2.getUpdateTime());
+      }
+      if (diff == 0) {
+        // ascending order of execution id, if same priority and updateTime
+        diff = exflow1.getExecutionId() - exflow2.getExecutionId();
+      }
+      return diff;
+    }
+  }
+
+  /* Helper method to fetch flow priority from flow props */
+  private int getPriority(ExecutableFlow exflow) {
+    ExecutionOptions options = exflow.getExecutionOptions();
+    int priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+    if (options != null
+      && options.getFlowParameters() != null
+      && options.getFlowParameters()
+        .containsKey(ExecutionOptions.FLOW_PRIORITY)) {
+      try {
+        priority =
+          Integer.valueOf(options.getFlowParameters().get(
+            ExecutionOptions.FLOW_PRIORITY));
+      } catch (NumberFormatException ex) {
+        priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+        logger.error(
+          "Failed to parse flow priority for exec_id = "
+            + exflow.getExecutionId(), ex);
+      }
+    }
+    return priority;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d4cb262..d8b10f1 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -33,6 +33,10 @@ public class ExecutionOptions {
   public static final String CONCURRENT_OPTION_SKIP = "skip";
   public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
   public static final String CONCURRENT_OPTION_IGNORE = "ignore";
+  public static final String FLOW_PRIORITY = "flowPriority";
+  /* override dispatcher selection and use executor id specified */
+  public static final String USE_EXECUTOR = "useExecutor";
+  public static final int DEFAULT_FLOW_PRIORITY = 5;
 
   private static final String FLOW_PARAMETERS = "flowParameters";
   private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
index e314206..9d93476 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionReference.java
@@ -18,16 +18,23 @@ package azkaban.executor;
 
 public class ExecutionReference {
   private final int execId;
-  private final String host;
-  private final int port;
+  private Executor executor;
   private long updateTime;
   private long nextCheckTime = -1;
   private int numErrors = 0;
 
-  public ExecutionReference(int execId, String host, int port) {
+
+  public ExecutionReference(int execId) {
+    this.execId = execId;
+  }
+
+  public ExecutionReference(int execId, Executor executor) {
+    if (executor == null) {
+      throw new IllegalArgumentException(String.format(
+        "Executor cannot be null for exec id: %d ExecutionReference", execId));
+    }
     this.execId = execId;
-    this.host = host;
-    this.port = port;
+    this.executor = executor;
   }
 
   public void setUpdateTime(long updateTime) {
@@ -51,11 +58,11 @@ public class ExecutionReference {
   }
 
   public String getHost() {
-    return host;
+    return executor.getHost();
   }
 
   public int getPort() {
-    return port;
+    return executor.getPort();
   }
 
   public int getNumErrors() {
@@ -65,4 +72,12 @@ public class ExecutionReference {
   public void setNumErrors(int numErrors) {
     this.numErrors = numErrors;
   }
-}
+
+  public void setExecutor(Executor executor) {
+    this.executor = executor;
+  }
+
+  public Executor getExecutor() {
+    return executor;
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
new file mode 100644
index 0000000..f0600ab
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+import azkaban.utils.Utils;
+
+/**
+ * Class to represent an AzkabanExecutorServer details for ExecutorManager
+ *
+ * @author gaggarwa
+ */
+public class Executor implements Comparable<Executor> {
+  private final int id;
+  private final String host;
+  private final int port;
+  private boolean isActive;
+  // cached copy of the latest statistics from  the executor.
+  private ExecutorInfo cachedExecutorStats;
+  private Date lastStatsUpdatedTime;
+
+  /**
+   * <pre>
+   * Construct an Executor Object
+   * Note: port should be within the unsigned 2 byte
+   * integer range
+   * </pre>
+   *
+   * @param id
+   * @param host
+   * @param port
+   */
+  public Executor(int id, String host, int port, boolean isActive) {
+    if (!Utils.isValidPort(port)) {
+      throw new IllegalArgumentException(String.format(
+        "Invalid port number %d for host %s, executor_id %d", port, host, id));
+    }
+
+    this.id = id;
+    this.host = host;
+    this.port = port;
+    this.isActive = isActive;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (isActive ? 1231 : 1237);
+    result = prime * result + ((host == null) ? 0 : host.hashCode());
+    result = prime * result + id;
+    result = prime * result + port;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof Executor))
+      return false;
+    Executor other = (Executor) obj;
+    if (isActive != other.isActive)
+      return false;
+    if (host == null) {
+      if (other.host != null)
+        return false;
+    } else if (!host.equals(other.host))
+      return false;
+    if (id != other.id)
+      return false;
+    if (port != other.port)
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString(){
+    return String.format("%s:%s (id: %s)",
+      null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+      this.port, this.id);
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public boolean isActive() {
+    return isActive;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public ExecutorInfo getExecutorInfo() {
+    return this.cachedExecutorStats;
+  }
+
+  public void setExecutorInfo(ExecutorInfo info) {
+    this.cachedExecutorStats = info;
+    this.lastStatsUpdatedTime = new Date();
+  }
+
+  /**
+   * Gets the timestamp when the executor info is last updated.
+   * @return date object represents the timestamp, null if the executor info of this
+   *         specific executor is never refreshed.
+   * */
+  public Date getLastStatsUpdatedTime(){
+    return this.lastStatsUpdatedTime;
+  }
+
+  public void setActive(boolean isActive) {
+    this.isActive = isActive;
+  }
+
+  @Override
+  public int compareTo(Executor o) {
+    return null == o ? 1 : this.hashCode() - o.hashCode();
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
new file mode 100644
index 0000000..2ab814d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.util.EntityUtils;
+import azkaban.utils.RestfulApiClient;
+
+/** Client class that will be used to handle all Restful API calls between Executor and the host application.
+ * */
+public class ExecutorApiClient extends RestfulApiClient<String> {
+  private static ExecutorApiClient instance = null;
+  private ExecutorApiClient(){}
+
+  /**
+   * Singleton method to return the instance of the current object.
+   * */
+  public static ExecutorApiClient getInstance(){
+    if (null == instance){
+      instance = new ExecutorApiClient();
+    }
+
+    return instance;
+  }
+
+  /**Implementing the parseResponse function to return de-serialized Json object.
+   * @param response  the returned response from the HttpClient.
+   * @return de-serialized object from Json or null if the response doesn't have a body.
+   * */
+  @Override
+  protected String parseResponse(HttpResponse response)
+      throws HttpResponseException, IOException {
+    final StatusLine statusLine = response.getStatusLine();
+    String responseBody = response.getEntity() != null ?
+        EntityUtils.toString(response.getEntity()) : "";
+
+    if (statusLine.getStatusCode() >= 300) {
+
+        logger.error(String.format("unable to parse response as the response status is %s",
+            statusLine.getStatusCode()));
+
+        throw new HttpResponseException(statusLine.getStatusCode(),responseBody);
+    }
+
+    return responseBody;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
new file mode 100644
index 0000000..03de432
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+ /** Class that exposes the statistics from the executor server.
+  *  List of the statistics -
+  *  remainingMemoryPercent;
+  *  remainingMemory;
+  *  remainingFlowCapacity;
+  *  numberOfAssignedFlows;
+  *  lastDispatchedTime;
+  *  cpuUsage;
+  *
+  * */
+  public class ExecutorInfo implements java.io.Serializable{
+    private static final long serialVersionUID = 3009746603773371263L;
+    private double remainingMemoryPercent;
+    private long   remainingMemoryInMB;
+    private int    remainingFlowCapacity;
+    private int    numberOfAssignedFlows;
+    private long   lastDispatchedTime;
+    private double cpuUsage;
+
+    public double getCpuUsage() {
+      return this.cpuUsage;
+    }
+
+    public void setCpuUpsage(double value){
+      this.cpuUsage = value;
+    }
+
+    public double getRemainingMemoryPercent() {
+      return this.remainingMemoryPercent;
+    }
+
+    public void setRemainingMemoryPercent(double value){
+      this.remainingMemoryPercent = value;
+    }
+
+    public long getRemainingMemoryInMB(){
+      return this.remainingMemoryInMB;
+    }
+
+    public void setRemainingMemoryInMB(long value){
+      this.remainingMemoryInMB = value;
+    }
+
+    public int getRemainingFlowCapacity(){
+      return this.remainingFlowCapacity;
+    }
+
+    public void setRemainingFlowCapacity(int value){
+      this.remainingFlowCapacity = value;
+    }
+
+    public long getLastDispatchedTime(){
+      return this.lastDispatchedTime;
+    }
+
+    public void setLastDispatchedTime(long value){
+      this.lastDispatchedTime = value;
+    }
+
+    public int getNumberOfAssignedFlows () {
+      return this.numberOfAssignedFlows;
+    }
+
+    public void setNumberOfAssignedFlows (int value) {
+      this.numberOfAssignedFlows = value;
+    }
+
+    public ExecutorInfo(){}
+
+    public ExecutorInfo (double remainingMemoryPercent,
+        long remainingMemory,
+        int remainingFlowCapacity,
+        long lastDispatched,
+        double cpuUsage,
+        int numberOfAssignedFlows){
+      this.remainingMemoryInMB = remainingMemory;
+      this.cpuUsage = cpuUsage;
+      this.remainingFlowCapacity = remainingFlowCapacity;
+      this.remainingMemoryPercent = remainingMemoryPercent;
+      this.lastDispatchedTime = lastDispatched;
+      this.numberOfAssignedFlows = numberOfAssignedFlows;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj instanceof ExecutorInfo)
+        {
+          boolean result = true;
+          ExecutorInfo stat = (ExecutorInfo) obj;
+
+          result &=this.remainingMemoryInMB == stat.remainingMemoryInMB;
+          result &=this.cpuUsage == stat.cpuUsage;
+          result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
+          result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
+          result &=this.numberOfAssignedFlows == stat.numberOfAssignedFlows;
+          result &= this.lastDispatchedTime == stat.lastDispatchedTime;
+          return result;
+        }
+        return false;
+    }
+
+    /**
+     * Helper function to get an ExecutorInfo instance from the JSon String serialized from another object.
+     * @param  jsonString the string that will be de-serialized from.
+     * @return instance of the object if the parsing is successful, null otherwise.
+     * @throws JsonParseException,JsonMappingException,IOException
+     * */
+    public static ExecutorInfo fromJSONString(String jsonString) throws
+    JsonParseException,
+    JsonMappingException,
+    IOException{
+      if (null == jsonString || jsonString.length() == 0) return null;
+      return new ObjectMapper().readValue(jsonString, ExecutorInfo.class);
+    }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 6dc0e11..80e8167 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.util.List;
 import java.util.Map;
 
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -47,12 +48,187 @@ public interface ExecutorLoader {
       String flowContains, String userNameContains, int status, long startData,
       long endData, int skip, int num) throws ExecutorManagerException;
 
+  /**
+   * <pre>
+   * Fetch all executors from executors table
+   * Note:-
+   * 1 throws an Exception in case of a SQL issue
+   * 2 returns an empty list in case of no executor
+   * </pre>
+   *
+   * @return List<Executor>
+   * @throws ExecutorManagerException
+   */
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch all executors from executors table with active = true
+   * Note:-
+   * 1 throws an Exception in case of a SQL issue
+   * 2 returns an empty list in case of no active executor
+   * </pre>
+   *
+   * @return List<Executor>
+   * @throws ExecutorManagerException
+   */
+  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch executor from executors with a given (host, port)
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found
+   * with the given (host,port)
+   * </pre>
+   *
+   * @return Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor fetchExecutor(String host, int port)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch executor from executors with a given executorId
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @return Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * create an executor and insert in executors table.
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception if an executor with (host, port) already exists
+   * 3. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @return Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor addExecutor(String host, int port)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * update an existing executor entry in the executors table.
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception if there is no executor with the given id
+   * 3. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @param executorId
+   * @throws ExecutorManagerException
+   */
+  public void updateExecutor(Executor executor) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Log an event in the executor_event audit table.
+   * Note:-
+   * throws an Exception in case of a SQL issue
+   * </pre>
+   *
+   * @param executor
+   * @param type
+   * @param user
+   * @param message
+   * @throws ExecutorManagerException
+   */
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * This method is to fetch events recorded in executor audit table, inserted
+   * by postExecutorEvents with a given executor, starting from skip
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. Returns an empty list in case of no events
+   * </pre>
+   *
+   * @param executor
+   * @param num
+   * @param offset
+   * @return List<ExecutorLogEvent>
+   * @throws ExecutorManagerException
+   */
+  List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+    int offset) throws ExecutorManagerException;
+
   public void addActiveExecutableReference(ExecutionReference ref)
       throws ExecutorManagerException;
 
   public void removeActiveExecutableReference(int execId)
       throws ExecutorManagerException;
 
+
+  /**
+   * <pre>
+   * Unset executor Id for an execution
+   * Note:-
+   * throws an Exception in case of a SQL issue
+   * </pre>
+   *
+   * @param executionId
+   *
+   * @throws ExecutorManagerException
+   */
+  public void unassignExecutor(int executionId) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Set an executor Id to an execution
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. throws an Exception in case executionId or executorId do not exist
+   * </pre>
+   *
+   * @param executorId
+   * @param execId
+   * @throws ExecutorManagerException
+   */
+  public void assignExecutor(int executorId, int execId)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetches an executor corresponding to a given execution
+   * Note:-
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executionId
+   * </pre>
+   *
+   * @param executionId
+   * @return fetched Executor
+   * @throws ExecutorManagerException
+   */
+  public Executor fetchExecutorByExecutionId(int executionId)
+    throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Fetch queued flows which have not yet dispatched
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return empty list when no queued execution is found
+   * </pre>
+   *
+   * @return List of queued flows and corresponding execution reference
+   * @throws ExecutorManagerException
+   */
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException;
+
   public boolean updateExecutableReference(int execId, long updateTime)
       throws ExecutorManagerException;
 
@@ -105,5 +281,4 @@ public interface ExecutorLoader {
 
   public int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
new file mode 100644
index 0000000..5598a0d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLogEvent.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Date;
+
+/**
+ * Class to represent events on Azkaban executors
+ *
+ * @author gaggarwa
+ */
+public class ExecutorLogEvent {
+  /**
+   * Log event type messages. Do not change the numeric representation of each
+   * enum. Only represent from 0 to 255 different codes.
+   */
+  public static enum EventType {
+    ERROR(128), HOST_UPDATE(1), PORT_UPDATE(2), ACTIVATION(3), INACTIVATION(4),
+    CREATED(5);
+
+    private int numVal;
+
+    EventType(int numVal) {
+      this.numVal = numVal;
+    }
+
+    public int getNumVal() {
+      return numVal;
+    }
+
+    public static EventType fromInteger(int x)
+        throws IllegalArgumentException {
+      switch (x) {
+      case 1:
+        return HOST_UPDATE;
+      case 2:
+        return PORT_UPDATE;
+      case 3:
+        return ACTIVATION;
+      case 4:
+        return INACTIVATION;
+      case 5:
+        return CREATED;
+      case 128:
+        return ERROR;
+      default:
+        throw new IllegalArgumentException(String.format(
+          "invalid status code %d", x));
+      }
+    }
+  }
+
+  private final int executorId;
+  private final String user;
+  private final Date time;
+  private final EventType type;
+  private final String message;
+
+  public ExecutorLogEvent(int executorId, String user, Date time,
+    EventType type, String message) {
+    this.executorId = executorId;
+    this.user = user;
+    this.time = time;
+    this.type = type;
+    this.message = message;
+  }
+
+  public int getExecutorId() {
+    return executorId;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public Date getTime() {
+    return time;
+  }
+
+  public EventType getType() {
+    return type;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 7fb61bd..1a46230 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -20,23 +20,25 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
@@ -44,6 +46,9 @@ import azkaban.alert.Alerter;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
+import azkaban.executor.selector.ExecutorSelector;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
 import azkaban.scheduler.ScheduleStatisticManager;
@@ -59,10 +64,27 @@ import azkaban.utils.Props;
  */
 public class ExecutorManager extends EventHandler implements
     ExecutorManagerAdapter {
+  static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
+      "azkaban.executorselector.filters";
+  static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
+      "azkaban.executorselector.comparator.";
+  static final String AZKABAN_QUEUEPROCESSING_ENABLED =
+    "azkaban.queueprocessing.enabled";
+  static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
+    "azkaban.use.multiple.executors";
+  private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+    "azkaban.webserver.queue.size";
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
+    "azkaban.activeexecutor.refresh.milisecinterval";
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
+    "azkaban.activeexecutor.refresh.flowinterval";
+  private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
+      "azkaban.executorinfo.refresh.maxThreads";
+  private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
+    "azkaban.maxDispatchingErrors";
+
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
-  private String executorHost;
-  private int executorPort;
 
   private CleanerThread cleanerThread;
 
@@ -71,8 +93,13 @@ public class ExecutorManager extends EventHandler implements
   private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
       new ConcurrentHashMap<Integer, ExecutableFlow>();
 
-  private ExecutingManagerUpdaterThread executingManager;
+  QueuedExecutions queuedFlows;
+
+  final private Set<Executor> activeExecutors = new HashSet<Executor>();
+  private QueueProcessorThread queueProcessor;
 
+  private ExecutingManagerUpdaterThread executingManager;
+  // 12 weeks
   private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
       * 24 * 60 * 60 * 1000L;
   private long lastCleanerThreadCheckTime = -1;
@@ -84,28 +111,252 @@ public class ExecutorManager extends EventHandler implements
 
   File cacheDir;
 
-  public ExecutorManager(Props props, ExecutorLoader loader,
-      Map<String, Alerter> alters) throws ExecutorManagerException {
+  private final Props azkProps;
+  private List<String> filterList;
+  private Map<String, Integer> comparatorWeightsMap;
+  private long lastSuccessfulExecutorInfoRefresh;
+  private ExecutorService executorInforRefresherService;
+
+  public ExecutorManager(Props azkProps, ExecutorLoader loader,
+      Map<String, Alerter> alerters) throws ExecutorManagerException {
+    this.alerters = alerters;
+    this.azkProps = azkProps;
     this.executorLoader = loader;
+    this.setupExecutors();
     this.loadRunningFlows();
-    executorHost = props.getString("executor.host", "localhost");
-    executorPort = props.getInt("executor.port");
 
-    alerters = alters;
+    queuedFlows =
+        new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+    this.loadQueuedFlows();
 
-    cacheDir = new File(props.getString("cache.directory", "cache"));
+    cacheDir = new File(azkProps.getString("cache.directory", "cache"));
 
     executingManager = new ExecutingManagerUpdaterThread();
     executingManager.start();
 
+    if(isMultiExecutorMode()) {
+      setupMultiExecutorMode();
+    }
+
     long executionLogsRetentionMs =
-        props.getLong("execution.logs.retention.ms",
-            DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+        azkProps.getLong("execution.logs.retention.ms",
+        DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
 
   }
 
+  private void setupMultiExecutorMode() {
+    // initialize hard filters for executor selector from azkaban.properties
+    String filters = azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+    if (filters != null) {
+      filterList = Arrays.asList(StringUtils.split(filters, ","));
+    }
+
+    // initialize comparator feature weights for executor selector from
+    // azkaban.properties
+    Map<String, String> compListStrings =
+      azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+    if (compListStrings != null) {
+      comparatorWeightsMap = new TreeMap<String, Integer>();
+      for (Map.Entry<String, String> entry : compListStrings.entrySet()) {
+        comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
+      }
+    }
+
+    executorInforRefresherService =
+        Executors.newFixedThreadPool(azkProps.getInt(
+          AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+
+    // configure queue processor
+    queueProcessor =
+      new QueueProcessorThread(azkProps.getBoolean(
+        AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
+        AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
+
+    queueProcessor.start();
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
+   */
+  @Override
+  public void setupExecutors() throws ExecutorManagerException {
+    Set<Executor> newExecutors = new HashSet<Executor>();
+
+    if (isMultiExecutorMode()) {
+      logger.info("Initializing multi executors from database");
+      newExecutors.addAll(executorLoader.fetchActiveExecutors());
+    } else if (azkProps.containsKey("executor.port")) {
+      // Add local executor, if specified as per properties
+      String executorHost = azkProps.getString("executor.host", "localhost");
+      int executorPort = azkProps.getInt("executor.port");
+      logger.info(String.format("Initializing local executor %s:%d",
+        executorHost, executorPort));
+      Executor executor =
+        executorLoader.fetchExecutor(executorHost, executorPort);
+      if (executor == null) {
+        executor = executorLoader.addExecutor(executorHost, executorPort);
+      } else if (!executor.isActive()) {
+        executor.setActive(true);
+        executorLoader.updateExecutor(executor);
+      }
+      newExecutors.add(new Executor(executor.getId(), executorHost,
+        executorPort, true));
+    }
+
+    if (newExecutors.isEmpty()) {
+      logger.error("No active executor found");
+      throw new ExecutorManagerException("No active executor found");
+    } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+      logger.error("Multiple local executors specified");
+      throw new ExecutorManagerException("Multiple local executors specified");
+    } else {
+      // clear all active executors, only if we have at least one new active
+      // executors
+      activeExecutors.clear();
+      activeExecutors.addAll(newExecutors);
+    }
+  }
+
+  private boolean isMultiExecutorMode() {
+    return azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
+  }
+
+  /**
+   * Refresh Executor stats for all the active executors in this executorManager
+   */
+  private void refreshExecutors() {
+    synchronized (activeExecutors) {
+
+      List<Pair<Executor, Future<String>>> futures =
+        new ArrayList<Pair<Executor, Future<String>>>();
+      for (final Executor executor : activeExecutors) {
+        // execute each executorInfo refresh task to fetch
+        Future<String> fetchExecutionInfo =
+          executorInforRefresherService.submit(new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+              return callExecutorForJsonString(executor.getHost(),
+                executor.getPort(), "/serverStatistics", null);
+            }
+          });
+        futures.add(new Pair<Executor, Future<String>>(executor,
+          fetchExecutionInfo));
+      }
+
+      boolean wasSuccess = true;
+      for (Pair<Executor, Future<String>> refreshPair : futures) {
+        Executor executor = refreshPair.getFirst();
+        executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
+        try {
+          // max 5 secs
+          String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+          executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+          logger.info(String.format(
+            "Successfully refreshed executor: %s with executor info : %s",
+            executor, jsonString));
+        } catch (TimeoutException e) {
+          wasSuccess = false;
+          logger.error("Timed out while waiting for ExecutorInfo refresh"
+            + executor, e);
+        } catch (Exception e) {
+          wasSuccess = false;
+          logger.error("Failed to update ExecutorInfo for executor : "
+            + executor, e);
+        }
+      }
+
+      // update is successful for all executors
+      if (wasSuccess) {
+        lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
+      }
+    }
+  }
+
+  /**
+   * Throws exception if running in local mode
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
+   */
+  @Override
+  public void disableQueueProcessorThread() throws ExecutorManagerException {
+    if (isMultiExecutorMode()) {
+      queueProcessor.setActive(false);
+    } else {
+      throw new ExecutorManagerException(
+        "Cannot disable QueueProcessor in local mode");
+    }
+  }
+
+  /**
+   * Throws exception if running in local mode
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
+   */
+  @Override
+  public void enableQueueProcessorThread() throws ExecutorManagerException {
+    if (isMultiExecutorMode()) {
+      queueProcessor.setActive(true);
+    } else {
+      throw new ExecutorManagerException(
+        "Cannot enable QueueProcessor in local mode");
+    }
+  }
+
+  public State getQueueProcessorThreadState() {
+    if (isMultiExecutorMode())
+      return queueProcessor.getState();
+    else
+      return State.NEW; // not started in local mode
+  }
+
+  /**
+   * Returns the state of the QueueProcessor: false if no flow is being
+   * dispatched, true if flows are being dispatched as expected.
+   *
+   * @return
+   */
+  public boolean isQueueProcessorThreadActive() {
+    if (isMultiExecutorMode())
+      return queueProcessor.isActive();
+    else
+      return false;
+  }
+
+  /**
+   * Return last Successful ExecutorInfo Refresh for all active executors
+   *
+   * @return
+   */
+  public long getLastSuccessfulExecutorInfoRefresh() {
+    return this.lastSuccessfulExecutorInfoRefresh;
+  }
+
+  /**
+   * Get currently supported Comparators available to use via azkaban.properties
+   *
+   * @return
+   */
+  public Set<String> getAvailableExecutorComparatorNames() {
+    return ExecutorComparator.getAvailableComparatorNames();
+
+  }
+
+  /**
+   * Get currently supported filters available to use via azkaban.properties
+   *
+   * @return
+   */
+  public Set<String> getAvailableExecutorFilterNames() {
+    return ExecutorFilter.getAvailableFilterNames();
+  }
+
   @Override
   public State getExecutorManagerThreadState() {
     return executingManager.getState();
@@ -130,10 +381,33 @@ public class ExecutorManager extends EventHandler implements
   }
 
   @Override
+  public Collection<Executor> getAllActiveExecutors() {
+    return Collections.unmodifiableCollection(activeExecutors);
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
+   */
+  @Override
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+    for (Executor executor : activeExecutors) {
+      if (executor.getId() == executorId) {
+        return executor;
+      }
+    }
+    return executorLoader.fetchExecutor(executorId);
+  }
+
+  @Override
   public Set<String> getPrimaryServerHosts() {
     // Only one for now. More probably later.
     HashSet<String> ports = new HashSet<String>();
-    ports.add(executorHost + ":" + executorPort);
+    for (Executor executor : activeExecutors) {
+      ports.add(executor.getHost() + ":" + executor.getPort());
+    }
     return ports;
   }
 
@@ -141,71 +415,211 @@ public class ExecutorManager extends EventHandler implements
   public Set<String> getAllActiveExecutorServerHosts() {
     // Includes non primary server/hosts
     HashSet<String> ports = new HashSet<String>();
-    ports.add(executorHost + ":" + executorPort);
+    for (Executor executor : activeExecutors) {
+      ports.add(executor.getHost() + ":" + executor.getPort());
+    }
+    // include executor which were initially active and still has flows running
     for (Pair<ExecutionReference, ExecutableFlow> running : runningFlows
-        .values()) {
+      .values()) {
       ExecutionReference ref = running.getFirst();
       ports.add(ref.getHost() + ":" + ref.getPort());
     }
-
     return ports;
   }
 
   private void loadRunningFlows() throws ExecutorManagerException {
     runningFlows.putAll(executorLoader.fetchActiveFlows());
+    // Finalize all flows which were running on an executor which is now
+    // inactive
+    for (Pair<ExecutionReference, ExecutableFlow> pair : runningFlows.values()) {
+      if (!activeExecutors.contains(pair.getFirst().getExecutor())) {
+        finalizeFlows(pair.getSecond());
+      }
+    }
   }
 
+  /*
+   * load queued flows i.e. with active_execution_reference and not assigned to
+   * any executor
+   */
+  private void loadQueuedFlows() throws ExecutorManagerException {
+    List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
+      executorLoader.fetchQueuedFlows();
+    if (retrievedExecutions != null) {
+      for (Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
+        queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+      }
+    }
+  }
+
+  /**
+   * Gets a list of all the active (running flows and non-dispatched flows)
+   * executions for a given project and flow {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
+   *      java.lang.String)
+   */
   @Override
   public List<Integer> getRunningFlows(int projectId, String flowId) {
-    ArrayList<Integer> executionIds = new ArrayList<Integer>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    List<Integer> executionIds = new ArrayList<Integer>();
+    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+      queuedFlows.getAllEntries()));
+    executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
+      runningFlows.values()));
+    return executionIds;
+  }
+
+  /* Helper method for getRunningFlows */
+  private List<Integer> getRunningFlowsHelper(int projectId, String flowId,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    List<Integer> executionIds = new ArrayList<Integer>();
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getFlowId().equals(flowId)
-          && ref.getSecond().getProjectId() == projectId) {
+        && ref.getSecond().getProjectId() == projectId) {
         executionIds.add(ref.getFirst().getExecId());
       }
     }
     return executionIds;
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
+   */
+  @Override
+  public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+    throws IOException {
+    List<Pair<ExecutableFlow, Executor>> flows =
+      new ArrayList<Pair<ExecutableFlow, Executor>>();
+    getActiveFlowsWithExecutorHelper(flows, queuedFlows.getAllEntries());
+    getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+    return flows;
+  }
+
+  /* Helper method for getActiveFlowsWithExecutor */
+  private void getActiveFlowsWithExecutorHelper(
+    List<Pair<ExecutableFlow, Executor>> flows,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
+        .getFirst().getExecutor()));
+    }
+  }
+
+  /**
+   * Checks whether the given flow has an active (running, non-dispatched)
+   * executions {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#isFlowRunning(int,
+   *      java.lang.String)
+   */
   @Override
   public boolean isFlowRunning(int projectId, String flowId) {
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    boolean isRunning = false;
+    isRunning =
+      isRunning
+        || isFlowRunningHelper(projectId, flowId, queuedFlows.getAllEntries());
+    isRunning =
+      isRunning
+        || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+    return isRunning;
+  }
+
+  /* Search a running flow in a collection */
+  private boolean isFlowRunningHelper(int projectId, String flowId,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       if (ref.getSecond().getProjectId() == projectId
-          && ref.getSecond().getFlowId().equals(flowId)) {
+        && ref.getSecond().getFlowId().equals(flowId)) {
         return true;
       }
     }
     return false;
   }
 
+  /**
+   * Fetch ExecutableFlow from an active (running, non-dispatched) or from
+   * database {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getExecutableFlow(int)
+   */
   @Override
   public ExecutableFlow getExecutableFlow(int execId)
-      throws ExecutorManagerException {
-    Pair<ExecutionReference, ExecutableFlow> active = runningFlows.get(execId);
-    if (active == null) {
+    throws ExecutorManagerException {
+    if (runningFlows.containsKey(execId)) {
+      return runningFlows.get(execId).getSecond();
+    } else if (queuedFlows.hasExecution(execId)) {
+      return queuedFlows.getFlow(execId);
+    } else {
       return executorLoader.fetchExecutableFlow(execId);
     }
-    return active.getSecond();
   }
 
+  /**
+   * Get all active (running, non-dispatched) flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
     ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+    getActiveFlowHelper(flows, queuedFlows.getAllEntries());
+    getActiveFlowHelper(flows, runningFlows.values());
+    return flows;
+  }
+
+  /*
+   * Helper method to get all running flows from a Pair<ExecutionReference,
+   * ExecutableFlow> collection
+   */
+  private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
       flows.add(ref.getSecond());
     }
-    return flows;
   }
 
+  /**
+   * Get execution Ids of all active (running, non-dispatched) flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
   public String getRunningFlowIds() {
     List<Integer> allIds = new ArrayList<Integer>();
-    for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
-      allIds.add(ref.getSecond().getExecutionId());
-    }
+    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+    getRunningFlowsIdsHelper(allIds, runningFlows.values());
     Collections.sort(allIds);
     return allIds.toString();
   }
 
+  /**
+   * Get execution Ids of all non-dispatched flows
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   */
+  public String getQueuedFlowIds() {
+    List<Integer> allIds = new ArrayList<Integer>();
+    getRunningFlowsIdsHelper(allIds, queuedFlows.getAllEntries());
+    Collections.sort(allIds);
+    return allIds.toString();
+  }
+
+  /* Helper method to get flow ids of all running flows */
+  private void getRunningFlowsIdsHelper(List<Integer> allIds,
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+      allIds.add(ref.getSecond().getExecutionId());
+    }
+  }
+
   public List<ExecutableFlow> getRecentlyFinishedFlows() {
     return new ArrayList<ExecutableFlow>(recentlyFinished.values());
   }
@@ -371,18 +785,30 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  /**
+   * if flows was dispatched to an executor, cancel by calling Executor else if
+   * flow is still in queue, remove from queue and finalize {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#cancelFlow(azkaban.executor.ExecutableFlow,
+   *      java.lang.String)
+   */
   @Override
   public void cancelFlow(ExecutableFlow exFlow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exFlow) {
-      Pair<ExecutionReference, ExecutableFlow> pair =
+      if (runningFlows.containsKey(exFlow.getExecutionId())) {
+        Pair<ExecutionReference, ExecutableFlow> pair =
           runningFlows.get(exFlow.getExecutionId());
-      if (pair == null) {
+        callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
+          userId);
+      } else if (queuedFlows.hasExecution(exFlow.getExecutionId())) {
+        queuedFlows.dequeue(exFlow.getExecutionId());
+        finalizeFlows(exFlow);
+      } else {
         throw new ExecutorManagerException("Execution "
-            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
-            + " isn't running.");
+          + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+          + " isn't running.");
       }
-      callExecutorServer(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
     }
   }
 
@@ -539,80 +965,95 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
-      throws ExecutorManagerException {
+    throws ExecutorManagerException {
     synchronized (exflow) {
-      logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
-          + userId);
-
-      int projectId = exflow.getProjectId();
       String flowId = exflow.getFlowId();
-      exflow.setSubmitUser(userId);
-      exflow.setSubmitTime(System.currentTimeMillis());
 
-      List<Integer> running = getRunningFlows(projectId, flowId);
-
-      ExecutionOptions options = exflow.getExecutionOptions();
-      if (options == null) {
-        options = new ExecutionOptions();
-      }
+      logger.info("Submitting execution flow " + flowId + " by " + userId);
 
       String message = "";
-      if (options.getDisabledJobs() != null) {
-        applyDisabledJobs(options.getDisabledJobs(), exflow);
-      }
+      if (queuedFlows.isFull()) {
+        message =
+          String
+            .format(
+              "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+              flowId, exflow.getProjectName());
+        logger.error(message);
+      } else {
+        int projectId = exflow.getProjectId();
+        exflow.setSubmitUser(userId);
+        exflow.setSubmitTime(System.currentTimeMillis());
+
+        List<Integer> running = getRunningFlows(projectId, flowId);
+
+        ExecutionOptions options = exflow.getExecutionOptions();
+        if (options == null) {
+          options = new ExecutionOptions();
+        }
 
-      if (!running.isEmpty()) {
-        if (options.getConcurrentOption().equals(
+        if (options.getDisabledJobs() != null) {
+          applyDisabledJobs(options.getDisabledJobs(), exflow);
+        }
+
+        if (!running.isEmpty()) {
+          if (options.getConcurrentOption().equals(
             ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
-          Collections.sort(running);
-          Integer runningExecId = running.get(running.size() - 1);
+            Collections.sort(running);
+            Integer runningExecId = running.get(running.size() - 1);
 
-          options.setPipelineExecutionId(runningExecId);
-          message =
+            options.setPipelineExecutionId(runningExecId);
+            message =
               "Flow " + flowId + " is already running with exec id "
-                  + runningExecId + ". Pipelining level "
-                  + options.getPipelineLevel() + ". \n";
-        } else if (options.getConcurrentOption().equals(
+                + runningExecId + ". Pipelining level "
+                + options.getPipelineLevel() + ". \n";
+          } else if (options.getConcurrentOption().equals(
             ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
-          throw new ExecutorManagerException("Flow " + flowId
+            throw new ExecutorManagerException("Flow " + flowId
               + " is already running. Skipping execution.",
               ExecutorManagerException.Reason.SkippedExecution);
-        } else {
-          // The settings is to run anyways.
-          message =
+          } else {
+            // The settings is to run anyways.
+            message =
               "Flow " + flowId + " is already running with exec id "
-                  + StringUtils.join(running, ",")
-                  + ". Will execute concurrently. \n";
+                + StringUtils.join(running, ",")
+                + ". Will execute concurrently. \n";
+          }
         }
-      }
 
-      boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
-              ProjectWhitelist.WhitelistType.MemoryCheck);
-      options.setMemoryCheck(memoryCheck);
-
-      // The exflow id is set by the loader. So it's unavailable until after
-      // this call.
-      executorLoader.uploadExecutableFlow(exflow);
-
-      // We create an active flow reference in the datastore. If the upload
-      // fails, we remove the reference.
-      ExecutionReference reference =
-          new ExecutionReference(exflow.getExecutionId(), executorHost,
-              executorPort);
-      executorLoader.addActiveExecutableReference(reference);
-      try {
-        callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
-        runningFlows.put(exflow.getExecutionId(),
-            new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+        boolean memoryCheck =
+          !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+            ProjectWhitelist.WhitelistType.MemoryCheck);
+        options.setMemoryCheck(memoryCheck);
 
+        // The exflow id is set by the loader. So it's unavailable until after
+        // this call.
+        executorLoader.uploadExecutableFlow(exflow);
+
+        // We create an active flow reference in the datastore. If the upload
+        // fails, we remove the reference.
+        ExecutionReference reference =
+          new ExecutionReference(exflow.getExecutionId());
+
+        if (isMultiExecutorMode()) {
+          //Take MultiExecutor route
+          executorLoader.addActiveExecutableReference(reference);
+          queuedFlows.enqueue(exflow, reference);
+        } else {
+          // assign only local executor we have
+          Executor choosenExecutor = activeExecutors.iterator().next();
+          executorLoader.addActiveExecutableReference(reference);
+          try {
+            dispatch(reference, exflow, choosenExecutor);
+          } catch (ExecutorManagerException e) {
+            executorLoader.removeActiveExecutableReference(reference
+              .getExecId());
+            throw e;
+          }
+        }
         message +=
-            "Execution submitted successfully with exec id "
-                + exflow.getExecutionId();
-      } catch (ExecutorManagerException e) {
-        executorLoader.removeActiveExecutableReference(reference.getExecId());
-        throw e;
+          "Execution submitted successfully with exec id "
+            + exflow.getExecutionId();
       }
-
       return message;
     }
   }
@@ -626,11 +1067,11 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action) throws ExecutorManagerException {
+  private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
+    Executor executor, String action) throws ExecutorManagerException {
     try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), null, (Pair<String, String>[]) null);
+      return callExecutorServer(executor.getHost(), executor.getPort(), action,
+        exflow.getExecutionId(), null, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
@@ -671,150 +1112,114 @@ public class ExecutorManager extends EventHandler implements
   private Map<String, Object> callExecutorServer(String host, int port,
       String action, Integer executionId, String user,
       Pair<String, String>... params) throws IOException {
-    URIBuilder builder = new URIBuilder();
-    builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
-
-    builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+    List<Pair<String, String>> paramList = new ArrayList<Pair<String,String>>();
 
-    if (executionId != null) {
-      builder.setParameter(ConnectorParams.EXECID_PARAM,
-          String.valueOf(executionId));
+    // params is a varargs array and may be null when no params are passed
+    if(params != null) {
+      paramList.addAll(Arrays.asList(params));
     }
 
-    if (user != null) {
-      builder.setParameter(ConnectorParams.USER_PARAM, user);
-    }
-
-    if (params != null) {
-      for (Pair<String, String> pair : params) {
-        builder.setParameter(pair.getFirst(), pair.getSecond());
-      }
-    }
+    paramList
+      .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+    paramList.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM, String
+      .valueOf(executionId)));
+    paramList.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));
 
-    URI uri = null;
-    try {
-      uri = builder.build();
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
+    Map<String, Object> jsonResponse =
+      callExecutorForJsonObject(host, port, "/executor", paramList);
 
-    ResponseHandler<String> responseHandler = new BasicResponseHandler();
+    return jsonResponse;
+  }
 
-    HttpClient httpclient = new DefaultHttpClient();
-    HttpGet httpget = new HttpGet(uri);
-    String response = null;
-    try {
-      response = httpclient.execute(httpget, responseHandler);
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      httpclient.getConnectionManager().shutdown();
-    }
+  /*
+   * Helper method used by ExecutorManager to call executor and return json
+   * object map
+   */
+  private Map<String, Object> callExecutorForJsonObject(String host, int port,
+    String path, List<Pair<String, String>> paramList) throws IOException {
+    String responseString =
+      callExecutorForJsonString(host, port, path, paramList);
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+      (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
     String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
     if (error != null) {
       throw new IOException(error);
     }
-
     return jsonResponse;
   }
 
+  /*
+   * Helper method used by ExecutorManager to call executor and return raw json
+   * string
+   */
+  private String callExecutorForJsonString(String host, int port, String path,
+    List<Pair<String, String>> paramList) throws IOException {
+    if (paramList == null) {
+      paramList = new ArrayList<Pair<String, String>>();
+    }
+
+    ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
+    @SuppressWarnings("unchecked")
+    URI uri =
+      ExecutorApiClient.buildUri(host, port, path, true,
+        paramList.toArray(new Pair[0]));
+
+    return apiclient.httpGet(uri, null);
+  }
+
   /**
    * Manage servlet call for stats servlet in Azkaban execution server
    * {@inheritDoc}
-   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String, azkaban.utils.Pair[])
+   *
+   * @throws ExecutorManagerException
+   *
+   * @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
+   *      azkaban.utils.Pair[])
    */
   @Override
-  public Map<String, Object> callExecutorStats(String action, Pair<String, String>... params) throws IOException {
-
-    URIBuilder builder = new URIBuilder();
-    builder.setScheme("http").setHost(executorHost).setPort(executorPort).setPath("/stats");
+  public Map<String, Object> callExecutorStats(int executorId, String action,
+    Pair<String, String>... params) throws IOException, ExecutorManagerException {
+    Executor executor = fetchExecutor(executorId);
 
-    builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+    List<Pair<String, String>> paramList =
+      new ArrayList<Pair<String, String>>();
 
+    // params is a varargs array and may be null when no params are passed
     if (params != null) {
-      for (Pair<String, String> pair : params) {
-        builder.setParameter(pair.getFirst(), pair.getSecond());
-      }
+      paramList.addAll(Arrays.asList(params));
     }
 
-    URI uri = null;
-    try {
-      uri = builder.build();
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-
-    ResponseHandler<String> responseHandler = new BasicResponseHandler();
+    paramList
+      .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
 
-    HttpClient httpclient = new DefaultHttpClient();
-    HttpGet httpget = new HttpGet(uri);
-    String response = null;
-    try {
-      response = httpclient.execute(httpget, responseHandler);
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      httpclient.getConnectionManager().shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
-
-    return jsonResponse;
+    return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
+      "/stats", paramList);
   }
 
 
   @Override
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
       String mBean) throws IOException {
-    URIBuilder builder = new URIBuilder();
-
-    String[] hostPortSplit = hostPort.split(":");
-    builder.setScheme("http").setHost(hostPortSplit[0])
-        .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
+    List<Pair<String, String>> paramList =
+      new ArrayList<Pair<String, String>>();
 
-    builder.setParameter(action, "");
-    if (mBean != null) {
-      builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
+    paramList.add(new Pair<String, String>(action, ""));
+    if(mBean != null) {
+      paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
     }
 
-    URI uri = null;
-    try {
-      uri = builder.build();
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-
-    ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
-    HttpClient httpclient = new DefaultHttpClient();
-    HttpGet httpget = new HttpGet(uri);
-    String response = null;
-    try {
-      response = httpclient.execute(httpget, responseHandler);
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      httpclient.getConnectionManager().shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
-    String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
-    if (error != null) {
-      throw new IOException(error);
-    }
-    return jsonResponse;
+    String[] hostPortSplit = hostPort.split(":");
+    return callExecutorForJsonObject(hostPortSplit[0],
+      Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
   }
 
   @Override
   public void shutdown() {
+    if (isMultiExecutorMode()) {
+      queueProcessor.shutdown();
+    }
     executingManager.shutdown();
   }
 
@@ -846,7 +1251,7 @@ public class ExecutorManager extends EventHandler implements
           lastThreadCheckTime = System.currentTimeMillis();
           updaterStage = "Starting update all flows.";
 
-          Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
+          Map<Executor, List<ExecutableFlow>> exFlowMap =
               getFlowToExecutorMap();
           ArrayList<ExecutableFlow> finishedFlows =
               new ArrayList<ExecutableFlow>();
@@ -854,16 +1259,16 @@ public class ExecutorManager extends EventHandler implements
               new ArrayList<ExecutableFlow>();
 
           if (exFlowMap.size() > 0) {
-            for (Map.Entry<ConnectionInfo, List<ExecutableFlow>> entry : exFlowMap
+            for (Map.Entry<Executor, List<ExecutableFlow>> entry : exFlowMap
                 .entrySet()) {
               List<Long> updateTimesList = new ArrayList<Long>();
               List<Integer> executionIdsList = new ArrayList<Integer>();
 
-              ConnectionInfo connection = entry.getKey();
+              Executor executor = entry.getKey();
 
               updaterStage =
-                  "Starting update flows on " + connection.getHost() + ":"
-                      + connection.getPort();
+                  "Starting update flows on " + executor.getHost() + ":"
+                      + executor.getPort();
 
               // We pack the parameters of the same host together before we
               // query.
@@ -881,8 +1286,8 @@ public class ExecutorManager extends EventHandler implements
               Map<String, Object> results = null;
               try {
                 results =
-                    callExecutorServer(connection.getHost(),
-                        connection.getPort(), ConnectorParams.UPDATE_ACTION,
+                    callExecutorServer(executor.getHost(),
+                      executor.getPort(), ConnectorParams.UPDATE_ACTION,
                         null, null, executionIds, updateTimes);
               } catch (IOException e) {
                 logger.error(e);
@@ -1222,15 +1627,16 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  private Map<ConnectionInfo, List<ExecutableFlow>> getFlowToExecutorMap() {
-    HashMap<ConnectionInfo, List<ExecutableFlow>> exFlowMap =
-        new HashMap<ConnectionInfo, List<ExecutableFlow>>();
+  /* Group Executable flow by Executors to reduce number of REST calls */
+  private Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
+    HashMap<Executor, List<ExecutableFlow>> exFlowMap =
+      new HashMap<Executor, List<ExecutableFlow>>();
 
-    ConnectionInfo lastPort = new ConnectionInfo(executorHost, executorPort);
     for (Pair<ExecutionReference, ExecutableFlow> runningFlow : runningFlows
-        .values()) {
+      .values()) {
       ExecutionReference ref = runningFlow.getFirst();
       ExecutableFlow flow = runningFlow.getSecond();
+      Executor executor = ref.getExecutor();
 
       // We can set the next check time to prevent the checking of certain
       // flows.
@@ -1238,16 +1644,10 @@ public class ExecutorManager extends EventHandler implements
         continue;
       }
 
-      // Just a silly way to reduce object creation construction of objects
-      // since it's most likely that the values will be the same.
-      if (!lastPort.isEqual(ref.getHost(), ref.getPort())) {
-        lastPort = new ConnectionInfo(ref.getHost(), ref.getPort());
-      }
-
-      List<ExecutableFlow> flows = exFlowMap.get(lastPort);
+      List<ExecutableFlow> flows = exFlowMap.get(executor);
       if (flows == null) {
         flows = new ArrayList<ExecutableFlow>();
-        exFlowMap.put(lastPort, flows);
+        exFlowMap.put(executor, flows);
       }
 
       flows.add(flow);
@@ -1256,61 +1656,6 @@ public class ExecutorManager extends EventHandler implements
     return exFlowMap;
   }
 
-  private static class ConnectionInfo {
-    private String host;
-    private int port;
-
-    public ConnectionInfo(String host, int port) {
-      this.host = host;
-      this.port = port;
-    }
-
-    @SuppressWarnings("unused")
-    private ConnectionInfo getOuterType() {
-      return ConnectionInfo.this;
-    }
-
-    public boolean isEqual(String host, int port) {
-      return this.port == port && this.host.equals(host);
-    }
-
-    public String getHost() {
-      return host;
-    }
-
-    public int getPort() {
-      return port;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((host == null) ? 0 : host.hashCode());
-      result = prime * result + port;
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      ConnectionInfo other = (ConnectionInfo) obj;
-      if (host == null) {
-        if (other.host != null)
-          return false;
-      } else if (!host.equals(other.host))
-        return false;
-      if (port != other.port)
-        return false;
-      return true;
-    }
-  }
-
   @Override
   public int getExecutableFlows(int projectId, String flowId, int from,
       int length, List<ExecutableFlow> outputList)
@@ -1384,4 +1729,257 @@ public class ExecutorManager extends EventHandler implements
           - executionLogsRetentionMs);
     }
   }
+
+  /**
+   * Calls executor to dispatch the flow, update db to assign the executor and
+   * in-memory state of executableFlow
+   */
+  private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+    Executor choosenExecutor) throws ExecutorManagerException {
+    exflow.setUpdateTime(System.currentTimeMillis());
+
+    executorLoader.assignExecutor(choosenExecutor.getId(),
+      exflow.getExecutionId());
+    try {
+      callExecutorServer(exflow, choosenExecutor,
+        ConnectorParams.EXECUTE_ACTION);
+    } catch (ExecutorManagerException ex) {
+      logger.error("Rolling back executor assignment for execution id:"
+        + exflow.getExecutionId(), ex);
+      executorLoader.unassignExecutor(exflow.getExecutionId());
+      throw new ExecutorManagerException(ex);
+    }
+    reference.setExecutor(choosenExecutor);
+
+    // move from flow to running flows
+    runningFlows.put(exflow.getExecutionId(),
+      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+    logger.info(String.format(
+      "Successfully dispatched exec %d with error count %d",
+      exflow.getExecutionId(), reference.getNumErrors()));
+  }
+
+  /*
+   * This thread is responsible for processing queued flows using dispatcher and
+   * making rest api calls to executor server
+   */
+  private class QueueProcessorThread extends Thread {
+    private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+    private final int maxDispatchingErrors;
+    private final long activeExecutorRefreshWindowInMilisec;
+    private final int activeExecutorRefreshWindowInFlows;
+
+    private volatile boolean shutdown = false;
+    private volatile boolean isActive = true;
+
+    public QueueProcessorThread(boolean isActive,
+      long activeExecutorRefreshWindowInTime,
+      int activeExecutorRefreshWindowInFlows,
+      int maxDispatchingErrors) {
+      setActive(isActive);
+      this.maxDispatchingErrors = maxDispatchingErrors;
+      this.activeExecutorRefreshWindowInFlows =
+        activeExecutorRefreshWindowInFlows;
+      this.activeExecutorRefreshWindowInMilisec =
+        activeExecutorRefreshWindowInTime;
+      this.setName("AzkabanWebServer-QueueProcessor-Thread");
+    }
+
+    public void setActive(boolean isActive) {
+      this.isActive = isActive;
+      logger.info("QueueProcessorThread active turned " + this.isActive);
+    }
+
+    public boolean isActive() {
+      return isActive;
+    }
+
+    public void shutdown() {
+      shutdown = true;
+      this.interrupt();
+    }
+
+    public void run() {
+      // Loops till QueueProcessorThread is shutdown
+      while (!shutdown) {
+        synchronized (this) {
+          try {
+            // start processing the queue if active; otherwise wait for some time
+            if (isActive) {
+              processQueuedFlows(activeExecutorRefreshWindowInMilisec,
+                activeExecutorRefreshWindowInFlows);
+            }
+            wait(QUEUE_PROCESSOR_WAIT_IN_MS);
+          } catch (Exception e) {
+            logger.error(
+              "QueueProcessorThread Interrupted. Probably to shut down.", e);
+          }
+        }
+      }
+    }
+
+    /* Method responsible for processing the non-dispatched flows */
+    private void processQueuedFlows(long activeExecutorsRefreshWindow,
+      int maxContinuousFlowProcessed) throws InterruptedException,
+      ExecutorManagerException {
+      long lastExecutorRefreshTime = 0;
+      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+      int currentContinuousFlowProcessed = 0;
+
+      while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
+        ExecutionReference reference = runningCandidate.getFirst();
+        ExecutableFlow exflow = runningCandidate.getSecond();
+
+        long currentTime = System.currentTimeMillis();
+
+        // if we have dispatched more than maxContinuousFlowProcessed flows, or
+        // it has been more than activeExecutorsRefreshWindow milliseconds
+        // since we last refreshed
+        if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
+          || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
+          // Refresh executorInfo for all activeExecutors
+          refreshExecutors();
+          lastExecutorRefreshTime = currentTime;
+          currentContinuousFlowProcessed = 0;
+        }
+
+        /**
+         * <pre>
+         *  TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
+         *        Currently we try each queued flow once to infer a global busy state
+         * Possible improvements:-
+         *   1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
+         *   2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
+         *      taking out all the filters which do not depend on the flow but are still being part of Selector.
+         * Assumptions:-
+         *   1. no one else except QueueProcessor is updating ExecutableFlow update time
+         *   2. re-attempting a flow (which has been tried before) is considered as all executors are busy
+         * </pre>
+         */
+        if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+          // put back in the queue
+          queuedFlows.enqueue(exflow, reference);
+          long sleepInterval =
+            activeExecutorsRefreshWindow
+              - (currentTime - lastExecutorRefreshTime);
+          // wait till next executor refresh
+          sleep(sleepInterval);
+        } else {
+          exflow.setUpdateTime(currentTime);
+          // process flow with current snapshot of activeExecutors
+          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        }
+        currentContinuousFlowProcessed++;
+      }
+    }
+
+    /* process flow with a snapshot of available Executors */
+    private void selectExecutorAndDispatchFlow(ExecutionReference reference,
+      ExecutableFlow exflow, Set<Executor> availableExecutors)
+      throws ExecutorManagerException {
+      synchronized (exflow) {
+        Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
+        if (selectedExecutor != null) {
+          try {
+            dispatch(reference, exflow, selectedExecutor);
+          } catch (ExecutorManagerException e) {
+            logger.warn(String.format(
+              "Executor %s responded with exception for exec: %d",
+              selectedExecutor, exflow.getExecutionId()), e);
+            handleDispatchExceptionCase(reference, exflow, selectedExecutor,
+              availableExecutors);
+          }
+        } else {
+          handleNoExecutorSelectedCase(reference, exflow);
+        }
+      }
+    }
+
+    /* Helper method to fetch the overriding Executor if the user has specified a valid one; otherwise returns null */
+    private Executor getUserSpecifiedExecutor(ExecutionOptions options,
+      int executionId) {
+      Executor executor = null;
+      if (options != null
+        && options.getFlowParameters() != null
+        && options.getFlowParameters().containsKey(
+          ExecutionOptions.USE_EXECUTOR)) {
+        try {
+          int executorId =
+            Integer.valueOf(options.getFlowParameters().get(
+              ExecutionOptions.USE_EXECUTOR));
+          executor = fetchExecutor(executorId);
+
+          if (executor == null) {
+            logger
+              .warn(String
+                .format(
+                  "User specified executor id: %d for execution id: %d is not active, Looking up db.",
+                  executorId, executionId));
+            executor = executorLoader.fetchExecutor(executorId);
+            if (executor == null) {
+              logger
+                .warn(String
+                  .format(
+                    "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+                    executorId, executionId));
+            }
+          }
+        } catch (ExecutorManagerException ex) {
+          logger.error("Failed to fetch user specified executor for exec_id = "
+            + executionId, ex);
+        }
+      }
+      return executor;
+    }
+
+    /* Choose Executor for exflow among the available executors */
+    private Executor selectExecutor(ExecutableFlow exflow,
+      Set<Executor> availableExecutors) {
+      Executor choosenExecutor =
+        getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+          exflow.getExecutionId());
+
+      // If no executor was specified by admin
+      if (choosenExecutor == null) {
+        logger.info("Using dispatcher for execution id :"
+          + exflow.getExecutionId());
+        ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
+        choosenExecutor = selector.getBest(availableExecutors, exflow);
+      }
+      return choosenExecutor;
+    }
+
+    private void handleDispatchExceptionCase(ExecutionReference reference,
+      ExecutableFlow exflow, Executor lastSelectedExecutor,
+      Set<Executor> remainingExecutors) throws ExecutorManagerException {
+      logger
+        .info(String
+          .format(
+            "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+            exflow.getExecutionId(), reference.getNumErrors()));
+      reference.setNumErrors(reference.getNumErrors() + 1);
+      if (reference.getNumErrors() > this.maxDispatchingErrors
+        || remainingExecutors.size() <= 1) {
+        logger.error("Failed to process queued flow");
+        finalizeFlows(exflow);
+      } else {
+        remainingExecutors.remove(lastSelectedExecutor);
+        // try other executors except chosenExecutor
+        selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
+      }
+    }
+
+    private void handleNoExecutorSelectedCase(ExecutionReference reference,
+      ExecutableFlow exflow) throws ExecutorManagerException {
+      logger
+      .info(String
+        .format(
+          "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
+            exflow.getExecutionId(), reference.getNumErrors()));
+      // TODO: handle scenario where a high priority flow failing to get
+      // schedule can starve all others
+      queuedFlows.enqueue(exflow, reference);
+    }
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 10379ec..c50b0bc 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -18,6 +18,7 @@ package azkaban.executor;
 
 import java.io.IOException;
 import java.lang.Thread.State;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -88,6 +89,18 @@ public interface ExecutorManagerAdapter {
 
   public List<ExecutableFlow> getRunningFlows() throws IOException;
 
+  /**
+   * <pre>
+   * Returns all running flows (with their executors) and all queued flows.
+   * Note: returns an empty list if there are no running or queued flows.
+   * </pre>
+   *
+   * @return
+   * @throws IOException
+   */
+  public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor()
+    throws IOException;
+
   public List<ExecutableFlow> getRecentlyFinishedFlows();
 
   public List<ExecutableFlow> getExecutableFlows(Project project,
@@ -177,9 +190,10 @@ public interface ExecutorManagerAdapter {
    * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_ENABLEMETRICS}<li>
    * <li>{@link azkaban.executor.ConnectorParams#STATS_SET_DISABLEMETRICS}<li>
    * </ul>
+   * @throws ExecutorManagerException
    */
-  public Map<String, Object> callExecutorStats(String action,
-      Pair<String, String>... params) throws IOException;
+  public Map<String, Object> callExecutorStats(int executorId, String action,
+    Pair<String, String>... param) throws IOException, ExecutorManagerException;
 
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
       String mBean) throws IOException;
@@ -196,4 +210,54 @@ public interface ExecutorManagerAdapter {
 
   public Set<? extends String> getPrimaryServerHosts();
 
+  /**
+   * Returns a collection of all the currently active executors maintained
+   * in memory
+   *
+   * @return
+   */
+  public Collection<Executor> getAllActiveExecutors();
+
+  /**
+   * <pre>
+   * Fetch executor from executors with a given executorId
+   * Note:
+   * 1. throws an Exception in case of a SQL issue
+   * 2. return null when no executor is found with the given executorId
+   * </pre>
+   *
+   * @throws ExecutorManagerException
+   *
+   */
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
+
+  /**
+   * <pre>
+   * Setup activeExecutors using azkaban.properties and database executors
+   * Note:
+   * 1. If azkaban.use.multiple.executors is set true, this method will
+   *    load all active executors
+   * 2. In local mode, If a local executor is specified and it is missing from db,
+   *    this method add local executor as active in DB
+   * 3. In local mode, If a local executor is specified and it is marked inactive in db,
+   *    this method will convert local executor as active in DB
+   * </pre>
+   *
+   * @throws ExecutorManagerException
+   */
+   public void setupExecutors() throws ExecutorManagerException;
+
+   /**
+    * Enable flow dispatching in QueueProcessor
+    *
+    * @throws ExecutorManagerException
+    */
+   public void enableQueueProcessorThread() throws ExecutorManagerException;
+
+   /**
+    * Disable flow dispatching in QueueProcessor
+    *
+    * @throws ExecutorManagerException
+    */
+   public void disableQueueProcessorThread() throws ExecutorManagerException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 25e61c3..a588cca 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -21,12 +21,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.annotation.Inherited;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.database.AbstractJdbcLoader;
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.GZIPUtils;
@@ -177,6 +180,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#fetchQueuedFlows()
+   */
+  @Override
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchQueuedExecutableFlows flowHandler = new FetchQueuedExecutableFlows();
+
+    try {
+      List<Pair<ExecutionReference, ExecutableFlow>> flows =
+        runner.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW,
+          flowHandler);
+      return flows;
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows", e);
+    }
+  }
+
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
@@ -380,12 +404,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       throws ExecutorManagerException {
     final String INSERT =
         "INSERT INTO active_executing_flows "
-            + "(exec_id, host, port, update_time) values (?,?,?,?)";
+            + "(exec_id, update_time) values (?,?)";
     QueryRunner runner = createQueryRunner();
 
     try {
-      runner.update(INSERT, reference.getExecId(), reference.getHost(),
-          reference.getPort(), reference.getUpdateTime());
+      runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
     } catch (SQLException e) {
       throw new ExecutorManagerException(
           "Error updating active flow reference " + reference.getExecId(), e);
@@ -773,6 +796,265 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     return connection;
   }
 
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchAllExecutors()
+   */
+  @Override
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_ALL_EXECUTORS, executorHandler);
+      return executors;
+    } catch (Exception e) {
+      throw new ExecutorManagerException("Error fetching executors", e);
+    }
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchActiveExecutors()
+   */
+  @Override
+  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_ACTIVE_EXECUTORS,
+          executorHandler);
+      return executors;
+    } catch (Exception e) {
+      throw new ExecutorManagerException("Error fetching active executors", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchExecutor(java.lang.String, int)
+   */
+  @Override
+  public Executor fetchExecutor(String host, int port)
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_HOST_PORT,
+          executorHandler, host, port);
+      if (executors.isEmpty()) {
+        return null;
+      } else {
+        return executors.get(0);
+      }
+    } catch (Exception e) {
+      throw new ExecutorManagerException(String.format(
+        "Error fetching executor %s:%d", host, port), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchExecutor(int)
+   */
+  @Override
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_EXECUTOR_BY_ID,
+          executorHandler, executorId);
+      if (executors.isEmpty()) {
+        return null;
+      } else {
+        return executors.get(0);
+      }
+    } catch (Exception e) {
+      throw new ExecutorManagerException(String.format(
+        "Error fetching executor with id: %d", executorId), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#updateExecutor(azkaban.executor.Executor)
+   */
+  @Override
+  public void updateExecutor(Executor executor) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE executors SET host=?, port=?, active=? where id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows =
+        runner.update(UPDATE, executor.getHost(), executor.getPort(),
+          executor.isActive(), executor.getId());
+      if (rows == 0) {
+        throw new ExecutorManagerException("No executor with id :"
+          + executor.getId());
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error inactivating executor "
+        + executor.getId(), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#addExecutor(java.lang.String, int)
+   */
+  @Override
+  public Executor addExecutor(String host, int port)
+    throws ExecutorManagerException {
+    // verify, if executor already exists
+    Executor executor = fetchExecutor(host, port);
+    if (executor != null) {
+      throw new ExecutorManagerException(String.format(
+        "Executor %s:%d already exist", host, port));
+    }
+    // add new executor
+    addExecutorHelper(host, port);
+    // fetch newly added executor
+    executor = fetchExecutor(host, port);
+
+    return executor;
+  }
+
+  private void addExecutorHelper(String host, int port)
+    throws ExecutorManagerException {
+    final String INSERT = "INSERT INTO executors (host, port) values (?,?)";
+    QueryRunner runner = createQueryRunner();
+    try {
+      runner.update(INSERT, host, port);
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(String.format("Error adding %s:%d ",
+        host, port), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#postExecutorEvent(azkaban.executor.Executor,
+   *      azkaban.executor.ExecutorLogEvent.EventType, java.lang.String,
+   *      java.lang.String)
+   */
+  @Override
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException{
+    QueryRunner runner = createQueryRunner();
+
+    final String INSERT_PROJECT_EVENTS =
+      "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+    Date updateDate = new Date();
+    try {
+      runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+        updateDate, user, message);
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Failed to post executor event", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#getExecutorEvents(azkaban.executor.Executor,
+   *      int, int)
+   */
+  @Override
+  public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+    int offset) throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+
+    ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+    List<ExecutorLogEvent> events = null;
+    try {
+      events =
+        runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+          logHandler, executor.getId(), num, offset);
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(
+        "Failed to fetch events for executor id : " + executor.getId(), e);
+    }
+
+    return events;
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#assignExecutor(int, int)
+   */
+  @Override
+  public void assignExecutor(int executorId, int executionId)
+    throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=? where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      Executor executor = fetchExecutor(executorId);
+      if (executor == null) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to assign non-existent executor Id: %d to execution : %d  ",
+          executorId, executionId));
+      }
+
+      int rows = runner.update(UPDATE, executorId, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to assign executor Id: %d to non-existent execution : %d  ",
+          executorId, executionId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating executor id "
+        + executorId, e);
+    }
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   * @see azkaban.executor.ExecutorLoader#fetchExecutorByExecutionId(int)
+   */
+  @Override
+  public Executor fetchExecutorByExecutionId(int executionId)
+    throws ExecutorManagerException {
+    QueryRunner runner = createQueryRunner();
+    FetchExecutorHandler executorHandler = new FetchExecutorHandler();
+    Executor executor = null;
+    try {
+      List<Executor> executors =
+        runner.query(FetchExecutorHandler.FETCH_EXECUTION_EXECUTOR,
+          executorHandler, executionId);
+      if (executors.size() > 0) {
+        executor = executors.get(0);
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException(
+        "Error fetching executor for exec_id : " + executionId, e);
+    }
+    return executor;
+  }
+
   private static class LastInsertID implements ResultSetHandler<Long> {
     private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
 
@@ -975,13 +1257,80 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
+  /**
+   * JDBC ResultSetHandler to fetch queued executions
+   */
+  private static class FetchQueuedExecutableFlows implements
+    ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+    // Select queued unassigned flows
+    private static String FETCH_QUEUED_EXECUTABLE_FLOW =
+      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, "
+        + " ax.update_time axUpdateTime FROM execution_flows ex"
+        + " INNER JOIN"
+        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+        + " Where ex.executor_id is NULL";
+
+    @Override
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs)
+      throws SQLException {
+      if (!rs.next()) {
+        return Collections
+          .<Pair<ExecutionReference, ExecutableFlow>> emptyList();
+      }
+
+      List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+        new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+      do {
+        int id = rs.getInt(1);
+        int encodingType = rs.getInt(2);
+        byte[] data = rs.getBytes(3);
+        long updateTime = rs.getLong(4);
+
+        if (data == null) {
+          logger.error("Found a flow with empty data blob exec_id: " + id);
+        } else {
+          EncodingType encType = EncodingType.fromInteger(encodingType);
+          Object flowObj;
+          try {
+            // Convoluted way to inflate strings. Should find common package or
+            // helper function.
+            if (encType == EncodingType.GZIP) {
+              // Decompress the sucker.
+              String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            ExecutableFlow exFlow =
+              ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            ExecutionReference ref = new ExecutionReference(id);
+            ref.setUpdateTime(updateTime);
+
+            execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref,
+              exFlow));
+          } catch (IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
   private static class FetchActiveExecutableFlows implements
       ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+    // Select running and executor assigned flows
     private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
-        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data "
-            + "flow_data, ax.host host, ax.port port, ax.update_time "
-            + "axUpdateTime " + "FROM execution_flows ex "
-            + "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
+      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+        + "et.port port, ax.update_time axUpdateTime, et.id executorId, et.active executorStatus"
+        + " FROM execution_flows ex"
+        + " INNER JOIN "
+        + " active_executing_flows ax ON ex.exec_id = ax.exec_id"
+        + " INNER JOIN "
+        + " executors et ON ex.executor_id = et.id";
 
     @Override
     public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
@@ -1000,6 +1349,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         String host = rs.getString(4);
         int port = rs.getInt(5);
         long updateTime = rs.getLong(6);
+        int executorId = rs.getInt(7);
+        boolean executorStatus = rs.getBoolean(8);
 
         if (data == null) {
           execFlows.put(id, null);
@@ -1020,7 +1371,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
             ExecutableFlow exFlow =
                 ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            ExecutionReference ref = new ExecutionReference(id, host, port);
+            Executor executor = new Executor(executorId, host, port, executorStatus);
+            ExecutionReference ref = new ExecutionReference(id, executor);
             ref.setUpdateTime(updateTime);
 
             execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref,
@@ -1106,6 +1458,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
         "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
     private static String NUM_JOB_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";
+    private static String FETCH_EXECUTOR_ID =
+        "SELECT executor_id FROM execution_flows WHERE exec_id=?";
 
     @Override
     public Integer handle(ResultSet rs) throws SQLException {
@@ -1134,4 +1488,98 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
 
     return updateNum;
   }
+
+  /**
+   * JDBC ResultSetHandler to fetch records from executors table
+   */
+  private static class FetchExecutorHandler implements
+    ResultSetHandler<List<Executor>> {
+    private static String FETCH_ALL_EXECUTORS =
+      "SELECT id, host, port, active FROM executors";
+    private static String FETCH_ACTIVE_EXECUTORS =
+      "SELECT id, host, port, active FROM executors where active=true";
+    private static String FETCH_EXECUTOR_BY_ID =
+      "SELECT id, host, port, active FROM executors where id=?";
+    private static String FETCH_EXECUTOR_BY_HOST_PORT =
+      "SELECT id, host, port, active FROM executors where host=? AND port=?";
+    private static String FETCH_EXECUTION_EXECUTOR =
+      "SELECT ex.id, ex.host, ex.port, ex.active FROM "
+        + " executors ex INNER JOIN execution_flows ef "
+        + "on ex.id = ef.executor_id  where exec_id=?";
+
+    @Override
+    public List<Executor> handle(ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.<Executor> emptyList();
+      }
+
+      List<Executor> executors = new ArrayList<Executor>();
+      do {
+        int id = rs.getInt(1);
+        String host = rs.getString(2);
+        int port = rs.getInt(3);
+        boolean active = rs.getBoolean(4);
+        Executor executor = new Executor(id, host, port, active);
+        executors.add(executor);
+      } while (rs.next());
+
+      return executors;
+    }
+  }
+
+  /**
+   * JDBC ResultSetHandler to fetch records from executor_events table
+   */
+  private static class ExecutorLogsResultHandler implements
+    ResultSetHandler<List<ExecutorLogEvent>> {
+    private static String SELECT_EXECUTOR_EVENTS_ORDER =
+      "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
+        + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
+
+    @Override
+    public List<ExecutorLogEvent> handle(ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.<ExecutorLogEvent> emptyList();
+      }
+
+      ArrayList<ExecutorLogEvent> events = new ArrayList<ExecutorLogEvent>();
+      do {
+        int executorId = rs.getInt(1);
+        int eventType = rs.getInt(2);
+        Date eventTime = rs.getDate(3);
+        String username = rs.getString(4);
+        String message = rs.getString(5);
+
+        ExecutorLogEvent event =
+          new ExecutorLogEvent(executorId, username, eventTime,
+            EventType.fromInteger(eventType), message);
+        events.add(event);
+      } while (rs.next());
+
+      return events;
+    }
+  }
+
+  /**
+   *
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorLoader#unassignExecutor(int)
+   */
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    final String UPDATE =
+      "UPDATE execution_flows SET executor_id=NULL where exec_id=?";
+
+    QueryRunner runner = createQueryRunner();
+    try {
+      int rows = runner.update(UPDATE, executionId);
+      if (rows == 0) {
+        throw new ExecutorManagerException(String.format(
+          "Failed to unassign executor for execution : %d  ", executionId));
+      }
+    } catch (SQLException e) {
+      throw new ExecutorManagerException("Error updating execution id "
+        + executionId, e);
+    }
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
new file mode 100644
index 0000000..641ffae
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/QueuedExecutions.java
@@ -0,0 +1,201 @@
+package azkaban.executor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ * Composite data structure to represent non-dispatched flows in webserver.
+ * This data structure wraps a blocking queue and a concurrent hashmap.
+ * </pre>
+ */
+public class QueuedExecutions {
+  private static Logger logger = Logger.getLogger(QueuedExecutions.class);
+  final long capacity;
+
+  /* map to easily access queued flows */
+  final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap;
+  /* actual queue */
+  final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlowList;
+
+  public QueuedExecutions(long capacity) {
+    this.capacity = capacity;
+    queuedFlowMap =
+      new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+    queuedFlowList =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+  }
+
+  /**
+   * Wraps the blocking queue's take method to have a corresponding update in
+   * the queuedFlowMap lookup table.
+   *
+   * @return
+   * @throws InterruptedException
+   */
+  public Pair<ExecutionReference, ExecutableFlow> fetchHead()
+    throws InterruptedException {
+    Pair<ExecutionReference, ExecutableFlow> pair = queuedFlowList.take();
+    if (pair != null && pair.getFirst() != null) {
+      queuedFlowMap.remove(pair.getFirst().getExecId());
+    }
+    return pair;
+  }
+
+  /**
+   * Helper method to have a single point of deletion in the queued flows
+   *
+   * @param executionId
+   */
+  public void dequeue(int executionId) {
+    if (queuedFlowMap.containsKey(executionId)) {
+      queuedFlowList.remove(queuedFlowMap.get(executionId));
+      queuedFlowMap.remove(executionId);
+    }
+  }
+
+  /**
+   * <pre>
+   * Helper method to have a single point of insertion in the queued flows
+   *
+   * @param exflow
+   *          flow to be enqueued
+   * @param ref
+   *          reference to be enqueued
+   * @throws ExecutorManagerException
+   *           case 1: if blocking queue put method fails due to
+   *           InterruptedException
+   *           case 2: if there is already an element with
+   *           the same execution Id
+   * </pre>
+   */
+  public void enqueue(ExecutableFlow exflow, ExecutionReference ref)
+    throws ExecutorManagerException {
+    if (hasExecution(exflow.getExecutionId())) {
+      String errMsg = "Flow already in queue " + exflow.getExecutionId();
+      throw new ExecutorManagerException(errMsg);
+    }
+
+    Pair<ExecutionReference, ExecutableFlow> pair =
+      new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+    try {
+      queuedFlowMap.put(exflow.getExecutionId(), pair);
+      queuedFlowList.put(pair);
+    } catch (InterruptedException e) {
+      String errMsg = "Failed to insert flow " + exflow.getExecutionId();
+      logger.error(errMsg, e);
+      throw new ExecutorManagerException(errMsg);
+    }
+  }
+
+  /**
+   * <pre>
+   * Enqueues all the elements of a collection
+   *
+   * @param collection
+   *
+   * @throws ExecutorManagerException
+   *           case 1: if blocking queue put method fails due to
+   *           InterruptedException
+   *           case 2: if there is already an element with
+   *           the same execution Id
+   * </pre>
+   */
+  public void enqueueAll(
+    Collection<Pair<ExecutionReference, ExecutableFlow>> collection)
+    throws ExecutorManagerException {
+    for (Pair<ExecutionReference, ExecutableFlow> pair : collection) {
+      enqueue(pair.getSecond(), pair.getFirst());
+    }
+  }
+
+  /**
+   * Returns a read only collection of all the queued (flows, reference) pairs
+   *
+   * @return
+   */
+  public Collection<Pair<ExecutionReference, ExecutableFlow>> getAllEntries() {
+    return Collections.unmodifiableCollection(queuedFlowMap.values());
+  }
+
+  /**
+   * Checks if an execution is queued or not
+   *
+   * @param executionId
+   * @return
+   */
+  public boolean hasExecution(int executionId) {
+    return queuedFlowMap.containsKey(executionId);
+  }
+
+  /**
+   * Fetch flow for an execution. Returns null, if execution not in queue
+   *
+   * @param executionId
+   * @return
+   */
+  public ExecutableFlow getFlow(int executionId) {
+    if (hasExecution(executionId)) {
+      return queuedFlowMap.get(executionId).getSecond();
+    }
+    return null;
+  }
+
+  /**
+   * Fetch the active ExecutionReference for an execution. Returns null if the
+   * execution is not in the queue.
+   *
+   * @param executionId
+   * @return
+   */
+  public ExecutionReference getReference(int executionId) {
+    if (hasExecution(executionId)) {
+      return queuedFlowMap.get(executionId).getFirst();
+    }
+    return null;
+  }
+
+  /**
+   * Size of the queue
+   *
+   * @return
+   */
+  public long size() {
+    return queuedFlowList.size();
+  }
+
+  /**
+   * Verify, if queue is full as per initialized capacity
+   *
+   * @return
+   */
+  public boolean isFull() {
+    return size() >= capacity;
+  }
+
+  /**
+   * Verify, if queue is empty or not
+   *
+   * @return
+   */
+  public boolean isEmpty() {
+    return queuedFlowList.isEmpty() && queuedFlowMap.isEmpty();
+  }
+
+  /**
+   * Empties queue by dequeuing all the elements
+   */
+  public void clear() {
+    for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowMap.values()) {
+      dequeue(pair.getFirst().getExecId());
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
new file mode 100644
index 0000000..f83788f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+/**
+ * <pre>
+ *  Abstract class for a candidate comparator.
+ *  This class contains implementations of most of the core logic. Implementing classes are expected only to
+ *  register factor comparators using the provided register function.
+ * </pre>
+ */
+public abstract class CandidateComparator<T> implements Comparator<T> {
+  protected static Logger logger = Logger.getLogger(CandidateComparator.class);
+
+  // internal repository of the registered comparators .
+  private Map<String,FactorComparator<T>> factorComparatorList =
+      new ConcurrentHashMap<String,FactorComparator<T>>();
+
+  /** gets the name of the current implementation of the candidate comparator.
+   * @return name of the comparator.
+   * */
+  public abstract String getName();
+
+  /** tieBreak method which will kick in when the comparator list generated an equality result for
+   *  both sides. the tieBreak method will try best to make sure a stable result is returned.
+   * */
+  protected boolean tieBreak(T object1, T object2){
+    if (null == object2) return true;
+    if (null == object1) return false;
+    return object1.hashCode() >= object2.hashCode();
+  }
+
+  /** function to register a factorComparator to the internal Map for future reference.
+   * @param comparator the comparator object to be registered.
+   * @throws IllegalArgumentException
+   * */
+  protected void registerFactorComparator(FactorComparator<T> comparator){
+      if (null == comparator ||
+          Integer.MAX_VALUE - this.getTotalWeight() < comparator.getWeight() ) {
+        throw new IllegalArgumentException("unable to register comparator."+
+          " The passed comparator is null or has an invalid weight value.");
+      }
+
+      // add or replace the Comparator.
+      this.factorComparatorList.put(comparator.getFactorName(),comparator);
+      logger.debug(String.format("Factor comparator added for '%s'. Weight = '%s'",
+          comparator.getFactorName(), comparator.getWeight()));
+  }
+
+  /** function returns the total weight of the registered comparators.
+   * @return the value of total weight.
+   * */
+  public int getTotalWeight(){
+    int totalWeight = 0 ;
+
+    // save out a copy of the values as HashMap.values() takes o(n) to return the value.
+    Collection<FactorComparator<T>> allValues = this.factorComparatorList.values();
+    for (FactorComparator<T> item : allValues){
+      if (item != null){
+        totalWeight += item.getWeight();
+      }
+    }
+
+    return totalWeight;
+  }
+
+  /**
+   * <pre>
+   * function to actually calculate the scores for the two objects that are being compared.
+   *  the comparison follows the following logic -
+   *  1. if both objects are equal return 0 score for both.
+   *  2. if one side is null, the other side gets all the score.
+   *  3. if both sides are non-null value, both values will be passed to all the registered FactorComparators
+   *     each factor comparator will generate a result based off its sole logic; the weight of the comparator will be
+   *     added to the winning side. If equal, no value will be added to either side.
+   *  4. final result will be returned in a Pair container.
+   *
+   * </pre>
+   * @param object1  the first  object (left side)  to be compared.
+   * @param object2  the second object (right side) to be compared.
+   * @return a pair structure contains the score for both sides.
+   * */
+  public Pair<Integer,Integer> getComparisonScore(T object1, T object2){
+    logger.debug(String.format("start comparing '%s' with '%s',  total weight = %s ",
+        object1 == null ? "(null)" : object1.toString(),
+        object2 == null ? "(null)" : object2.toString(),
+        this.getTotalWeight()));
+
+    int result1 = 0 ;
+    int result2 = 0 ;
+
+    // short cut if object equals.
+    if (object1 ==  object2){
+      logger.debug("[Comparator] same object.");
+    } else
+    // left side is null.
+    if (object1 == null){
+      logger.debug("[Comparator] left side is null, right side gets total weight.");
+      result2 = this.getTotalWeight();
+    } else
+    // right side is null.
+    if (object2 == null){
+      logger.debug("[Comparator] right side is null, left side gets total weight.");
+      result1 = this.getTotalWeight();
+    } else
+    // both side is not null,put them thru the full loop
+    {
+      Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
+      for (FactorComparator<T> comparator :comparatorList){
+        int result = comparator.compare(object1, object2);
+        result1  = result1 + (result > 0 ? comparator.getWeight() : 0);
+        result2  = result2 + (result < 0 ? comparator.getWeight() : 0);
+        logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
+            comparator.getFactorName(), result, result1, result2));
+      }
+    }
+    // in case of same score, use tie-breaker to stabilize the result.
+    if (result1 == result2){
+      boolean result = this.tieBreak(object1, object2);
+      logger.debug("[TieBreaker] TieBreaker chose " +
+      (result? String.format("left side (%s)",  null== object1 ? "null": object1.toString()) :
+               String.format("right side (%s)", null== object2 ? "null": object2.toString()) ));
+      if (result) result1++; else result2++;
+    }
+
+    logger.debug(String.format("Result : %s vs %s ",result1,result2));
+    return new Pair<Integer,Integer>(result1,result2);
+  }
+
+  @Override
+  public int compare(T o1, T o2) {
+    Pair<Integer,Integer> result = this.getComparisonScore(o1,o2);
+    return result.getFirst() == result.getSecond() ? 0 :
+                                result.getFirst() > result.getSecond() ? 1 : -1;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
new file mode 100644
index 0000000..f927a2a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+
+/** Abstract class for a candidate filter.
+ *  This class contains implementations of most of the core logic. Implementing classes are expected only to
+ *  register filters using the provided register function.
+ */
+public abstract class CandidateFilter<T,V>  {
+  protected static Logger logger = Logger.getLogger(CandidateFilter.class);
+
+  // internal repository of the registered filters .
+  private Map<String,FactorFilter<T,V>> factorFilterList =
+      new ConcurrentHashMap<String,FactorFilter<T,V>>();
+
+  /** gets the name of the current implementation of the candidate filter.
+   * @return : name of the filter.
+   * */
+  public abstract String getName();
+
+  /** function to register a factorFilter to the internal Map for future reference.
+   * @param filter the filter object to be registered.
+   * @throws IllegalArgumentException
+   * */
+  protected void registerFactorFilter(FactorFilter<T,V> filter){
+      if (null == filter ) {
+        throw new IllegalArgumentException("unable to register factor filter. " +
+                  "The passed comaractor is null or has an invalid weight value.");
+      }
+
+      // add or replace the filter.
+      this.factorFilterList.put(filter.getFactorName(),filter);
+      logger.debug(String.format("Factor filter added for '%s'.",
+          filter.getFactorName()));
+  }
+
+  /** function to analyze the target item according to the reference object to decide whether the item should be filtered.
+   * @param filteringTarget:   object to be checked.
+   * @param referencingObject: object which contains statistics based on which a decision is made whether
+   *                      the object being checked need to be filtered or not.
+   * @return true if the check passed, false if check failed, which means the item need to be filtered.
+   * */
+  public boolean filterTarget(T filteringTarget, V referencingObject){
+    logger.debug(String.format("start filtering '%s' with factor filter for '%s'",
+        filteringTarget == null ? "(null)" : filteringTarget.toString(),
+        this.getName()));
+
+    Collection<FactorFilter<T,V>> filterList = this.factorFilterList.values();
+    boolean result = true;
+    for (FactorFilter<T,V> filter : filterList){
+      result &= filter.filterTarget(filteringTarget,referencingObject);
+      logger.debug(String.format("[Factor: %s] filter result : %s ",
+          filter.getFactorName(), result));
+      if (!result){
+        break;
+      }
+    }
+    logger.debug(String.format("Final filtering result : %s ",result));
+    return result;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
new file mode 100644
index 0000000..8fa91d0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -0,0 +1,91 @@
/*
 * Copyright 2015 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor.selector;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.log4j.Logger;

/** Implementation of the CandidateSelector.
 *  @param <K> executor object type.
 *  @param <V> dispatching object type.
 * */
public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K, V> {
  // BUG FIX: the logger was obtained for CandidateComparator.class, which made
  // this class's log output appear under the wrong logger category.
  private static Logger logger = Logger.getLogger(CandidateSelector.class);

  private final CandidateFilter<K,V> filter;
  private final CandidateComparator<K> comparator;

  /** Constructor of the class.
   * @param filter CandidateFilter object used to perform the candidate filtering;
   *               may be null, in which case filtering is skipped.
   * @param comparator CandidateComparator object used to find the best suited candidate
   *                   from the filtered list; may be null, in which case the candidates'
   *                   natural ordering is used (K extends Comparable).
   * */
  public CandidateSelector(CandidateFilter<K,V> filter,
      CandidateComparator<K> comparator){
    this.filter = filter;
    this.comparator = comparator;
  }

  @Override
  public K getBest(Collection<K> candidateList, V dispatchingObject) {

     // shortcut if the candidateList is empty.
     if ( null == candidateList || candidateList.size() == 0){
       logger.error("failed to getNext candidate as the passed candidateList is null or empty.");
       return null;
     }

     logger.debug("start candidate selection logic.");
     logger.debug(String.format("candidate count before filtering: %s", candidateList.size()));

     // to keep the input untouched, we will form up a new list based off the filtering result.
     Collection<K> filteredList = new ArrayList<K>();

     if (null != this.filter){
       for (K candidateInfo : candidateList){
         if (filter.filterTarget(candidateInfo,dispatchingObject)){
           filteredList.add(candidateInfo);
         }
       }
     } else{
       filteredList = candidateList;
       // message typo fixed ("specifed" -> "specified").
       logger.debug("skipping the candidate filtering as the filter object is not specified.");
     }

     logger.debug(String.format("candidate count after filtering: %s", filteredList.size()));
     if (filteredList.size() == 0){
       logger.debug("failed to select candidate as the filtered candidate list is empty.");
       return null;
     }

     if (null == comparator){
       // MESSAGE FIX: Collections.max with a null comparator uses the elements'
       // natural ordering, not a "hash code comparator" as previously logged.
       logger.debug("candidate comparator is not specified, the candidates' natural ordering will be used.");
     }

     // final work - find the best candidate from the filtered list.
     K executor = Collections.max(filteredList, comparator);
     logger.debug(String.format("candidate selected %s",
         null == executor ? "(null)" : executor.toString()));
     return executor;
  }

  @Override
  public String getName() {
    return "CandidateSelector";
  }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
new file mode 100644
index 0000000..978bcb9
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -0,0 +1,246 @@
/*
 * Copyright 2015 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor.selector;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import azkaban.executor.Executor;
import azkaban.executor.ExecutorInfo;


/**
 * De-normalized version of the CandidateComparator, which also contains the implementation
 * of the factor comparators.
 * */
public class ExecutorComparator extends CandidateComparator<Executor> {
  private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;

  /**
   * Gets the name list of all available comparators.
   * @return the list of the names.
   * */
  public static Set<String> getAvailableComparatorNames(){
    return comparatorCreatorRepository.keySet();
  }

  // factor comparator names
  private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
  private static final String MEMORY_COMPARATOR_NAME = "Memory";
  private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
  private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";

  /**
   * static initializer of the class.
   * We will build the comparator repository here.
   * when a new comparator is added, please do remember to register it here.
   * */
  static {
    comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();

    // register the creator for number of assigned flow comparator.
    comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, new ComparatorCreator(){
      public FactorComparator<Executor> create(int weight) { return getNumberOfAssignedFlowComparator(weight); }});

    // register the creator for memory comparator.
    comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
      public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});

    // register the creator for last dispatched time comparator.
    comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
      public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});

    // register the creator for CPU Usage comparator.
    comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
      public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
  }


  /**
   * constructor of the ExecutorComparator.
   * @param comparatorList   map from comparator factor name to its weight; must be a
   *                         non-null, non-empty map whose keys all have registered creators.
   * @throws IllegalArgumentException when the map is null/empty or contains an unknown factor name.
   * */
  public ExecutorComparator(Map<String,Integer> comparatorList) {
    if (null == comparatorList || comparatorList.size() == 0){
      // MESSAGE FIX: added the missing space between the concatenated halves.
      throw new IllegalArgumentException("failed to initialize executor comparator " +
                                         "as the passed comparator list is invalid or empty.");
    }

    // register the comparators; we will NOT throw here if the weight is invalid,
    // that is handled in the super class.
    for (Entry<String,Integer> entry : comparatorList.entrySet()){
      if (comparatorCreatorRepository.containsKey(entry.getKey())){
        this.registerFactorComparator(comparatorCreatorRepository.
            get(entry.getKey()).
            create(entry.getValue()));
      } else {
        throw new IllegalArgumentException(String.format("failed to initialize executor comparator " +
                                        "as the comparator implementation for requested factor '%s' doesn't exist.",
                                        entry.getKey()));
      }
    }
  }

  @Override
  public String getName() {
    return "ExecutorComparator";
  }

  private interface ComparatorCreator{
    FactorComparator<Executor> create(int weight);
  }

  /**<pre>
   * Helper that inspects the two statistics objects and, when one or both are missing,
   * yields the comparison result directly so the caller can short-circuit.
   * </pre>
   * @param statisticsObj1  the left-hand statistics object to be checked.
   * @param statisticsObj2  the right-hand statistics object to be checked.
   * @param caller          the name of the calling comparator, for logging purposes.
   * @return null when both statistics objects are present (the caller must perform the
   *         real comparison); otherwise the shortcut result: 0 when both are missing,
   *         1 when only the right side is missing (prefer left), and -1 when only the
   *         left side is missing (prefer right).
   * */
  // BUG FIX: the original tried to pass the shortcut out through an Integer
  // parameter; Java passes object references by value, so those assignments
  // were lost and executors with missing stats always compared as equal.
  // The shortcut is now the return value.
  private static Integer statisticsObjectCheck(ExecutorInfo statisticsObj1,
                                               ExecutorInfo statisticsObj2, String caller){
    // both don't expose the info
    if (null == statisticsObj1 && null == statisticsObj2){
      logger.debug(String.format("%s : neither of the executors exposed statistics info.",
          caller));
      return 0;
    }

    // right side doesn't expose the info.
    if (null == statisticsObj2){
      logger.debug(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
          caller));
      return 1;
    }

    // left side doesn't expose the info.
    if (null == statisticsObj1){
      logger.debug(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
          caller));
      return -1;
    }

    // both are valid - no shortcut applies.
    return null;
  }

  /**
   * function defines the number of assigned flow comparator.
   * @param weight weight of the comparator.
   * */
  private static FactorComparator<Executor> getNumberOfAssignedFlowComparator(int weight){
    return FactorComparator.create(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        ExecutorInfo stat1 = o1.getExecutorInfo();
        ExecutorInfo stat2 = o2.getExecutorInfo();

        Integer shortcut = statisticsObjectCheck(stat1, stat2, NUMOFASSIGNEDFLOW_COMPARATOR_NAME);
        if (shortcut != null){
          return shortcut;
        }
        // more remaining flow capacity is better.
        return Integer.compare(stat1.getRemainingFlowCapacity(), stat2.getRemainingFlowCapacity());
      }});
  }

  /**
   * function defines the cpuUsage comparator.
   * @param weight weight of the comparator.
   * */
  private static FactorComparator<Executor> getCpuUsageComparator(int weight){
    return FactorComparator.create(CPUUSAGE_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        ExecutorInfo stat1 = o1.getExecutorInfo();
        ExecutorInfo stat2 = o2.getExecutorInfo();

        Integer shortcut = statisticsObjectCheck(stat1, stat2, CPUUSAGE_COMPARATOR_NAME);
        if (shortcut != null){
          return shortcut;
        }

        // CPU usage - the lesser the value is, the better (note the swapped operands).
        return Double.compare(stat2.getCpuUsage(), stat1.getCpuUsage());
      }});
  }


  /**
   * function defines the last dispatched time comparator.
   * @param weight weight of the comparator.
   * */
  private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
    return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
        ExecutorInfo stat1 = o1.getExecutorInfo();
        ExecutorInfo stat2 = o2.getExecutorInfo();

        Integer shortcut = statisticsObjectCheck(stat1, stat2, LSTDISPATCHED_COMPARATOR_NAME);
        if (shortcut != null){
          return shortcut;
        }
        // Note: an earlier dispatch time indicates higher weight (swapped operands).
        return Long.compare(stat2.getLastDispatchedTime(), stat1.getLastDispatchedTime());
      }});
  }


  /**<pre>
   * function defines the Memory comparator.
   * Note: the comparator first compares the absolute value of the remaining memory;
   *       when both sides are equal it goes further and checks the percentage of the
   *       remaining memory.
   * </pre>
   * @param weight weight of the comparator.
   * */
  private static FactorComparator<Executor> getMemoryComparator(int weight){
    return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){

      @Override
      public int compare(Executor o1, Executor o2) {
       ExecutorInfo stat1 = o1.getExecutorInfo();
       ExecutorInfo stat2 = o2.getExecutorInfo();

       Integer shortcut = statisticsObjectCheck(stat1, stat2, MEMORY_COMPARATOR_NAME);
       if (shortcut != null){
         return shortcut;
       }

       if (stat1.getRemainingMemoryInMB() != stat2.getRemainingMemoryInMB()){
         return stat1.getRemainingMemoryInMB() > stat2.getRemainingMemoryInMB() ? 1 : -1;
       }

       return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
      }});
  }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
new file mode 100644
index 0000000..a47bc16
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -0,0 +1,174 @@
/*
 * Copyright 2015 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor.selector;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorInfo;

/**
 * De-normalized version of the CandidateFilter, which also contains the implementation
 * of the factor filters.
 * */
public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
  private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;

  /**
   * Gets the name list of all available filters.
   * @return the list of the names.
   * */
  public static Set<String> getAvailableFilterNames(){
    return filterRepository.keySet();
  }


  // factor filter names.
  private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
  private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory";
  private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";

  /**<pre>
   * static initializer of the class.
   * We will build the filter repository here.
   * when a new filter is added, please do remember to register it here.
   * </pre>
   * */
  static {
    filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
    filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
    filterRepository.put(MINIMUMFREEMEMORY_FILTER_NAME, getMinimumReservedMemoryFilter());
    filterRepository.put(CPUSTATUS_FILTER_NAME, getCpuStatusFilter());
  }

  /**
   * constructor of the ExecutorFilter.
   * @param filterList  the list of filters to be registered; must be a non-null,
   *                    non-empty list whose entries all have registered implementations.
   * @throws IllegalArgumentException when the list is null/empty or names an unknown filter.
   * */
  public ExecutorFilter(Collection<String> filterList) {
    // shortcut if the filter list is invalid. A little bit ugly to have to throw in constructor.
    if (null == filterList || filterList.size() == 0){
      logger.error("failed to initialize executor filter as the passed filter list is invalid or empty.");
      throw new IllegalArgumentException("filterList");
    }

    // register the filters according to the list.
    for (String filterName : filterList){
      if (filterRepository.containsKey(filterName)){
        this.registerFactorFilter(filterRepository.get(filterName));
      } else {
        logger.error(String.format("failed to initialize executor filter "+
                                   "as the filter implementation for requested factor '%s' doesn't exist.",
                                   filterName));
        throw new IllegalArgumentException("filterList");
      }
    }
  }

  @Override
  public String getName() {
    return "ExecutorFilter";
  }

  /**<pre>
   * function to register the static remaining flow size filter.
   * NOTE : this is a static filter, which means the filtering is based on a system
   *        standard rather than on anything coming from the passed flow.
   *        This filter makes sure only executors that haven't reached the max allowed
   *        number of executing flows pass.
   *</pre>
   * */
  private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
    return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
        if (null == filteringTarget){
          logger.debug(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
          return false;
        }

        ExecutorInfo stats = filteringTarget.getExecutorInfo();
        if (null == stats) {
          logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
              STATICREMAININGFLOWSIZE_FILTER_NAME,
              filteringTarget.toString()));
          return false;
        }
        return stats.getRemainingFlowCapacity() > 0 ;
       }
    });
  }

  /**<pre>
   * function to register the static minimum reserved memory filter.
   * NOTE : this is a static filter, which means the filtering is based on a system
   *        standard rather than on anything coming from the passed flow.
   *        This filter will filter out any executors whose remaining memory is below 6G.
   *</pre>
   * */
  private static FactorFilter<Executor, ExecutableFlow> getMinimumReservedMemoryFilter(){
    return FactorFilter.create(MINIMUMFREEMEMORY_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
      // 6 GB expressed in MB, matching getRemainingMemoryInMB().
      private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
        if (null == filteringTarget){
          logger.debug(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
          return false;
        }

        ExecutorInfo stats = filteringTarget.getExecutorInfo();
        if (null == stats) {
          logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
              MINIMUMFREEMEMORY_FILTER_NAME,
              filteringTarget.toString()));
          return false;
        }
        return stats.getRemainingMemoryInMB() > MINIMUM_FREE_MEMORY ;
       }
    });
  }


  /**
   * <pre>
   * function to register the static CPU status filter.
   * (Comment fix: this block was previously mis-titled "Minimum Reserved Memory filter".)
   * NOTE : this is a static filter, which means the filtering is based on a system
   *        standard rather than on anything coming from the passed flow.
   *        This filter will filter out any executors whose current CPU usage exceeds 95%.
   * </pre>
   * */
  private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
    return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
      private static final int MAX_CPU_CURRENT_USAGE = 95;
      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
        if (null == filteringTarget){
          logger.debug(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
          return false;
        }

        ExecutorInfo stats = filteringTarget.getExecutorInfo();
        if (null == stats) {
          // BUG FIX: this log line previously reported MINIMUMFREEMEMORY_FILTER_NAME,
          // mis-attributing the rejection to the memory filter.
          logger.debug(String.format("%s : filtering out %s as it's stats is unavailable.",
              CPUSTATUS_FILTER_NAME,
              filteringTarget.toString()));
          return false;
        }
        return stats.getCpuUsage() < MAX_CPU_CURRENT_USAGE ;
       }
    });
  }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
new file mode 100644
index 0000000..2405c28
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -0,0 +1,45 @@
/*
 * Copyright 2015 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor.selector;

import java.util.Collection;
import java.util.Map;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.Executor;

/**<pre>
 * Executor selector class implementation.
 * NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
 *       clean and convenient constructor that takes filter and comparator name lists and
 *       builds the instance from them.
 *</pre>
 * */
public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {

  /**
   * Constructor of the class.
   * @param filterList      name list of the filters to be registered;
   *                        the filter feature is disabled when a null or empty value is passed.
   * @param comparatorList  name/weight pair list of the comparators to be registered;
   *                        again, the comparator feature is disabled when a null or empty value is passed.
   * */
  public ExecutorSelector(Collection<String> filterList, Map<String,Integer> comparatorList) {
    super(buildFilter(filterList), buildComparator(comparatorList));
  }

  // Builds the ExecutorFilter, or returns null when filtering is disabled.
  private static ExecutorFilter buildFilter(Collection<String> filterList) {
    if (filterList == null || filterList.isEmpty()) {
      return null;
    }
    return new ExecutorFilter(filterList);
  }

  // Builds the ExecutorComparator, or returns null when comparison is disabled.
  private static ExecutorComparator buildComparator(Map<String,Integer> comparatorList) {
    if (comparatorList == null || comparatorList.isEmpty()) {
      return null;
    }
    return new ExecutorComparator(comparatorList);
  }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
new file mode 100644
index 0000000..6d4d2e0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -0,0 +1,76 @@
/*
 * Copyright 2015 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor.selector;

import java.util.Comparator;
import org.apache.log4j.Logger;

/** wrapper class for a factor comparator.
 *@param <T> the type of the objects to be compared.
 */
public final class FactorComparator<T>{
  // BUG FIX: the logger was obtained for CandidateComparator.class; using this
  // class attributes log output to the right category.
  private static Logger logger = Logger.getLogger(FactorComparator.class);

  private final String factorName;
  private int weight;
  private final Comparator<T> comparator;

  /** private constructor of the class. Instances are created through the static
   *  factory method provided below.
   * @param factorName : the factor name.
   * @param weight : the weight of the comparator.
   * @param comparator : function provided by the user describing how the comparison is made.
   * */
  private FactorComparator(String factorName, int weight, Comparator<T> comparator){
    this.factorName = factorName;
    this.weight = weight;
    this.comparator = comparator;
  }

  /** static function to generate an instance of the class.
   *  refer to the constructor for the param definitions.
   * @return the new instance, or null when any parameter is invalid
   *         (null/empty name, negative weight, or null comparator).
   * */
  public static <T> FactorComparator<T> create(String factorName, int weight, Comparator<T> comparator){

    if (null == factorName || factorName.length() == 0 || weight < 0 || null == comparator){
      logger.error("failed to create instance of FactorComparator, at least one of the input paramters are invalid");
      return null;
    }

    return new FactorComparator<T>(factorName,weight,comparator);
  }

  /** Returns the factor name. */
  public String getFactorName(){
    return this.factorName;
  }

  /** Returns the weight value. */
  public int getWeight(){
    return this.weight;
  }

  /** Updates the weight value.
   *  (Comment fix: the original comment wrongly said this returns the weight.) */
  public void updateWeight(int value){
    this.weight = value;
  }

  /** The actual compare function, delegating to the user defined comparator. */
  public int compare(T object1, T object2){
    return this.comparator.compare(object1, object2);
  }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
new file mode 100644
index 0000000..04b04b7
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import org.apache.log4j.Logger;
+
+/** wrapper class for a factor Filter .
+ *@param T: the type of the objects to be compared.
+ *@param V: the type of the object to be used for filtering.
+ */
+public final class FactorFilter<T,V>{
+  private static Logger logger = Logger.getLogger(FactorFilter.class);
+
+  private String factorName;
+  private Filter<T,V> filter;
+
+  /** private constructor of the class. User will create the instance of the class by calling the static
+   *  method provided below.
+   * @param factorName : the factor name .
+   * @param filter : user defined function specifying how the filtering should be implemented.
+   * */
+  private FactorFilter(String factorName, Filter<T,V> filter){
+    this.factorName = factorName;
+    this.filter = filter;
+  }
+
+  /** static function to generate an instance of the class.
+   *  refer to the constructor for the param definitions.
+   * */
+  public static <T,V> FactorFilter<T,V> create(String factorName, Filter<T,V> filter){
+
+    if (null == factorName || factorName.length() == 0 || null == filter){
+      logger.error("failed to create instance of FactorFilter, at least one of the input paramters are invalid");
+      return null;
+    }
+
+    return new FactorFilter<T,V>(factorName,filter);
+  }
+
+  // function to return the factor name.
+  public String getFactorName(){
+    return this.factorName;
+  }
+
+  // the actual check function, which will leverage the logic defined by user.
+  public boolean filterTarget(T filteringTarget, V referencingObject){
+    return this.filter.filterTarget(filteringTarget, referencingObject);
+  }
+
+  // interface of the filter.
+  public interface Filter<T,V>{
+
+    /**function to analyze the target item according to the reference object to decide whether the item should be filtered.
+     * @param filteringTarget   object to be checked.
+     * @param referencingObject object which contains statistics based on which a decision is made whether
+     *                      the object being checked need to be filtered or not.
+     * @return true if the check passed, false if check failed, which means the item need to be filtered.
+     * */
+    public boolean filterTarget(T filteringTarget, V referencingObject);
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
new file mode 100644
index 0000000..a56b41a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -0,0 +1,43 @@
/*
 * Copyright 2015 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor.selector;

import java.util.Collection;


/**<pre>
 *  Definition of the selector interface.
 *  An implementation of the selector interface provides the functionality
 *  to return the candidate from the candidateList that best suits the dispatchingObject.
 * </pre>
 *  @param <K> type of the candidate.
 *  @param <V> type of the dispatching object.
 */
public interface Selector <K extends Comparable<K>,V> {

  /** Returns the best suited candidate from the candidateList for the dispatching object.
   *  @param  candidateList list of the candidates to select from.
   *  @param  dispatchingObject the object to be dispatched.
   *  @return candidate from the candidate list that best suits the dispatching object.
   * */
  public K getBest(Collection<K> candidateList, V dispatchingObject);

  /** Returns the name of the current selector implementation.
   *  @return name of the selector.
   * */
  public String getName();
}
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index 08e5534..d459ae9 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -62,4 +62,35 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
   public String getRunningFlows() {
     return manager.getRunningFlowIds();
   }
+
+  @Override
+  public boolean isQueueProcessorActive() {
+    return manager.isQueueProcessorThreadActive();
+  }
+
+  @Override
+  public String getQueuedFlows() {
+    return manager.getQueuedFlowIds();
+  }
+
+  @Override
+  public String getQueueProcessorThreadState() {
+    return manager.getQueueProcessorThreadState().toString();
+  }
+
+  @Override
+  public List<String> getAvailableExecutorComparatorNames() {
+    return new ArrayList<String>(manager.getAvailableExecutorComparatorNames());
+  }
+
+  @Override
+  public List<String> getAvailableExecutorFilterNames() {
+    return new ArrayList<String>(manager.getAvailableExecutorFilterNames());
+  }
+
+  @Override
+  public long getLastSuccessfulExecutorInfoRefresh() {
+    return manager.getLastSuccessfulExecutorInfoRefresh();
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 9bc1175..69e401c 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -39,4 +39,23 @@ public interface JmxExecutorManagerMBean {
 
   @DisplayName("OPERATION: getPrimaryExecutorHostPorts")
   public List<String> getPrimaryExecutorHostPorts();
+
+  @DisplayName("OPERATION: isQueueProcessorActive")
+  public boolean isQueueProcessorActive();
+
+  @DisplayName("OPERATION: getQueuedFlows")
+  public String getQueuedFlows();
+
+  @DisplayName("OPERATION: getQueueProcessorThreadState")
+  public String getQueueProcessorThreadState();
+
+  @DisplayName("OPERATION: getAvailableExecutorComparatorNames")
+  List<String> getAvailableExecutorComparatorNames();
+
+  @DisplayName("OPERATION: getAvailableExecutorFilterNames")
+  List<String> getAvailableExecutorFilterNames();
+
+  @DisplayName("OPERATION: getLastSuccessfulExecutorInfoRefresh")
+  long getLastSuccessfulExecutorInfoRefresh();
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index 80eff74..fc159ec 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -25,9 +25,17 @@ import java.util.Map;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.commons.lang.StringUtils;
+
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
 import azkaban.utils.JSONUtils;
 
 public class HttpRequestUtils {
@@ -113,6 +121,68 @@ public class HttpRequestUtils {
   }
 
   /**
+   * <pre>
+   * Removes the following flow params if the submitting user is not an Azkaban admin:
+   * FLOW_PRIORITY
+   * USE_EXECUTOR
+   * @param userManager
+   * @param options
+   * @param user
+   * </pre>
+   */
+  public static void filterAdminOnlyFlowParams(UserManager userManager,
+    ExecutionOptions options, User user)  throws ExecutorManagerException {
+    if (options == null || options.getFlowParameters() == null)
+      return;
+
+    Map<String, String> params = options.getFlowParameters();
+    // is azkaban Admin
+    if (!hasPermission(userManager, user, Type.ADMIN)) {
+      params.remove(ExecutionOptions.FLOW_PRIORITY);
+      params.remove(ExecutionOptions.USE_EXECUTOR);
+    } else {
+      validateIntegerParam(params, ExecutionOptions.FLOW_PRIORITY);
+      validateIntegerParam(params, ExecutionOptions.USE_EXECUTOR);
+    }
+  }
+
+  /**
+   * Parses a string as a number and throws an exception if the parsed value
+   * is not a valid integer.
+   * @param params
+   * @param paramName
+   * @throws ExecutorManagerException if paramName is not a valid integer
+   */
+  public static boolean validateIntegerParam(Map<String, String> params,
+    String paramName) throws ExecutorManagerException {
+    if (params != null && params.containsKey(paramName)
+      && !StringUtils.isNumeric(params.get(paramName))) {
+      throw new ExecutorManagerException(paramName + " should be an integer");
+    }
+    return true;
+  }
+
+  /**
+   * Returns true if the user has a permission of the given type, or is an Azkaban admin.
+   *
+   * @param userManager
+   * @param user
+   * @param type
+   * @return
+   */
+  public static boolean hasPermission(UserManager userManager, User user,
+    Permission.Type type) {
+    for (String roleName : user.getRoles()) {
+      Role role = userManager.getRole(roleName);
+      if (role.getPermission().isPermissionSet(type)
+        || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Checks for the existance of the parameter in the request
    *
    * @param request
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index 7227b0b..138b80f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -111,6 +111,19 @@ public class Utils {
     System.exit(exitCode);
   }
 
+  /**
+   * Tests whether a port is valid or not
+   *
+   * @param port
+   * @return true, if port is valid
+   */
+  public static boolean isValidPort(int port) {
+    if (port >= 1 && port <= 65535) {
+      return true;
+    }
+    return false;
+  }
+
   public static File createTempDir() {
     return createTempDir(new File(System.getProperty("java.io.tmpdir")));
   }
@@ -410,7 +423,7 @@ public class Utils {
   }
 
   /**
-   * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000 
+   * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
    * @return : long value of memory amount in kb
    */
   public static long parseMemString(String strMemSize) {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
new file mode 100644
index 0000000..ed6543f
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for ExecutableFlowPriorityComparator
+ * */
+
+public class ExecutableFlowPriorityComparatorTest {
+
+  /* Helper method to create an ExecutableFlow from serialized description */
+  private ExecutableFlow createExecutableFlow(String flowName, int priority,
+    long updateTime, int executionId) throws IOException {
+    ExecutableFlow execFlow =
+      TestUtils.createExecutableFlow("exectest1", flowName);
+
+    execFlow.setUpdateTime(updateTime);
+    execFlow.setExecutionId(executionId);
+    if (priority > 0) {
+      execFlow.getExecutionOptions().getFlowParameters()
+        .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
+    }
+    return execFlow;
+  }
+
+  /* priority queue order when all priorities are explicitly specified */
+  @Test
+  public void testExplicitlySpecifiedPriorities() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 5, 3, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 6, 3, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", 2, 3, 3);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow2, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow3, queue.take().getSecond());
+  }
+
+  /* priority queue order when some priorities are implicitly specified */
+  @Test
+  public void testMixedSpecifiedPriorities() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 3, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 3, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+
+  /*
+   * priority queue order when some priorities are equal; update time is used
+   * as the tie-breaker in this case
+   */
+  @Test
+  public void testEqualPriorities() throws IOException, InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 3, 3);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow4, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+
+  /*
+   * priority queue order when some priorities and update times are equal;
+   * execution id is used as the tie-breaker in this case
+   */
+  @Test
+  public void testEqualUpdateTimeAndPriority() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 3, 1, 1);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", -2, 2, 3);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 3, 4, 4);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow4, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
new file mode 100644
index 0000000..62d187b
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for executor manager
+ */
+public class ExecutorManagerTest {
+
+  /* Helper method to create a ExecutorManager Instance */
+  private ExecutorManager createMultiExecutorManagerInstance()
+    throws ExecutorManagerException {
+    return createMultiExecutorManagerInstance(new MockExecutorLoader());
+  }
+
+  /*
+   * Helper method to create a ExecutorManager Instance with the given
+   * ExecutorLoader
+   */
+  private ExecutorManager createMultiExecutorManagerInstance(
+    ExecutorLoader loader) throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "false");
+
+    loader.addExecutor("localhost", 12345);
+    loader.addExecutor("localhost", 12346);
+    return new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+  }
+
+  /*
+   * Test create an executor manager instance without any executor local or
+   * remote
+   */
+  @Test(expected = ExecutorManagerException.class)
+  public void testNoExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    @SuppressWarnings("unused")
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+  }
+
+  /*
+   * Test backward compatibility with just local executor
+   */
+  @Test
+  public void testLocalExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put("executor.port", 12345);
+
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
+
+    Assert.assertEquals(activeExecutors.size(), 1);
+    Executor executor = activeExecutors.iterator().next();
+    Assert.assertEquals(executor.getHost(), "localhost");
+    Assert.assertEquals(executor.getPort(), 12345);
+    Assert.assertArrayEquals(activeExecutors.toArray(), loader
+      .fetchActiveExecutors().toArray());
+  }
+
+  /*
+   * Test executor manager initialization with multiple executors
+   */
+  @Test
+  public void testMultipleExecutorScenario() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+    Executor executor2 = loader.addExecutor("localhost", 12346);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
+    Assert.assertArrayEquals(activeExecutors.toArray(), new Executor[] {
+      executor1, executor2 });
+  }
+
+  /*
+   * Test executor manager active executor reload
+   */
+  @Test
+  public void testSetupExecutorsSucess() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+      new Executor[] { executor1 });
+
+    // mark older executor as inactive
+    executor1.setActive(false);
+    loader.updateExecutor(executor1);
+    Executor executor2 = loader.addExecutor("localhost", 12346);
+    Executor executor3 = loader.addExecutor("localhost", 12347);
+    manager.setupExecutors();
+
+    Assert.assertArrayEquals(manager.getAllActiveExecutors().toArray(),
+      new Executor[] { executor2, executor3 });
+  }
+
+  /*
+   * Test executor manager active executor reload and resulting in no active
+   * executors
+   */
+  @Test(expected = ExecutorManagerException.class)
+  public void testSetupExecutorsException() throws ExecutorManagerException {
+    Props props = new Props();
+    props.put(ExecutorManager.AZKABAN_USE_MULTIPLE_EXECUTORS, "true");
+    ExecutorLoader loader = new MockExecutorLoader();
+    Executor executor1 = loader.addExecutor("localhost", 12345);
+
+    ExecutorManager manager =
+      new ExecutorManager(props, loader, new HashMap<String, Alerter>());
+    Set<Executor> activeExecutors =
+      new HashSet(manager.getAllActiveExecutors());
+    Assert.assertArrayEquals(activeExecutors.toArray(),
+      new Executor[] { executor1 });
+
+    // mark older executor as inactive
+    executor1.setActive(false);
+    loader.updateExecutor(executor1);
+    manager.setupExecutors();
+  }
+
+  /* Test disabling queue process thread to pause dispatching */
+  @Test
+  public void testDisablingQueueProcessThread() throws ExecutorManagerException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    manager.enableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+    manager.disableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+  }
+
+  /* Test re-enabling queue process thread to restart dispatching */
+  @Test
+  public void testEnablingQueueProcessThread() throws ExecutorManagerException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), false);
+    manager.enableQueueProcessorThread();
+    Assert.assertEquals(manager.isQueueProcessorThreadActive(), true);
+  }
+
+  /* Test submit a non-dispatched flow */
+  @Test
+  public void testQueuedFlows() throws ExecutorManagerException, IOException {
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow1.setExecutionId(1);
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    flow2.setExecutionId(2);
+
+    User testUser = TestUtils.getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+    manager.submitExecutableFlow(flow2, testUser.getUserId());
+
+    List<ExecutableFlow> testFlows = new LinkedList<ExecutableFlow>();
+    testFlows.add(flow1);
+    testFlows.add(flow2);
+
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
+      loader.fetchQueuedFlows();
+    Assert.assertEquals(queuedFlowsDB.size(), testFlows.size());
+    // Verify things are correctly setup in db
+    for (Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
+      Assert.assertTrue(testFlows.contains(pair.getSecond()));
+    }
+
+    // Verify running flows using old definition of "running" flows i.e. a
+    // non-dispatched flow is also considered running
+    List<ExecutableFlow> managerActiveFlows = manager.getRunningFlows();
+    Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
+      && testFlows.containsAll(managerActiveFlows));
+
+    // Verify getQueuedFlowIds method
+    Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
+  }
+
+  /* Test submit duplicate flow when previous instance is not dispatched */
+  @Test(expected = ExecutorManagerException.class)
+  public void testDuplicateQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    ExecutorManager manager = createMultiExecutorManagerInstance();
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow1.getExecutionOptions().setConcurrentOption(
+      ExecutionOptions.CONCURRENT_OPTION_SKIP);
+
+    User testUser = TestUtils.getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+  }
+
+  /*
+   * Test killing a job in preparation stage at webserver side i.e. a
+   * non-dispatched flow
+   */
+  @Test
+  public void testKillQueuedFlow() throws ExecutorManagerException, IOException {
+    ExecutorLoader loader = new MockExecutorLoader();
+    ExecutorManager manager = createMultiExecutorManagerInstance(loader);
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    User testUser = TestUtils.getTestUser();
+    manager.submitExecutableFlow(flow1, testUser.getUserId());
+
+    manager.cancelFlow(flow1, testUser.getUserId());
+    ExecutableFlow fetchedFlow =
+      loader.fetchExecutableFlow(flow1.getExecutionId());
+    Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+
+    Assert.assertFalse(manager.getRunningFlows().contains(flow1));
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index f453f0c..dd32302 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -21,8 +21,11 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import javax.sql.DataSource;
@@ -30,34 +33,37 @@ import javax.sql.DataSource;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-
 import org.joda.time.DateTime;
-
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
+import azkaban.user.User;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
 
 public class JdbcExecutorLoaderTest {
   private static boolean testDBExists;
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions";
   // @TODO remove this and turn into local host.
-  private static final String host = "cyu-ld.linkedin.biz";
+  private static final String host = "localhost";
   private static final int port = 3306;
   private static final String database = "azkaban2";
   private static final String user = "azkaban";
   private static final String password = "azkaban";
   private static final int numConnections = 10;
 
-  private File flowDir = new File("unit/executions/exectest1");
-
   @BeforeClass
   public static void setupDB() {
     DataSource dataSource =
@@ -117,12 +123,31 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
-    DbUtils.closeQuietly(connection);
+    try {
+      runner.query(connection, "SELECT COUNT(1) FROM executors",
+          countHandler);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
 
-    clearDB();
+    try {
+      runner.query(connection, "SELECT COUNT(1) FROM executor_events",
+          countHandler);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
+
+    DbUtils.closeQuietly(connection);
   }
 
-  private static void clearDB() {
+  @After
+  public void clearDB() {
     if (!testDBExists) {
       return;
     }
@@ -178,6 +203,24 @@ public class JdbcExecutorLoaderTest {
       return;
     }
 
+    try {
+      runner.update(connection, "DELETE FROM executors");
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
+
+    try {
+      runner.update(connection, "DELETE FROM executor_events");
+    } catch (SQLException e) {
+      e.printStackTrace();
+      testDBExists = false;
+      DbUtils.closeQuietly(connection);
+      return;
+    }
+
     DbUtils.closeQuietly(connection);
   }
 
@@ -186,9 +229,8 @@ public class JdbcExecutorLoaderTest {
     if (!isTestSetup()) {
       return;
     }
-
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
@@ -217,7 +259,7 @@ public class JdbcExecutorLoaderTest {
     }
 
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow = createExecutableFlow("exec1");
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
 
     loader.uploadExecutableFlow(flow);
 
@@ -256,7 +298,7 @@ public class JdbcExecutorLoaderTest {
     ExecutableFlow flow = createExecutableFlow(10, "exec1");
     flow.setExecutionId(10);
 
-    File jobFile = new File(flowDir, "job10.job");
+    File jobFile = new File(UNIT_BASE_DIR + "/exectest1", "job10.job");
     Props props = new Props(null, jobFile);
     props.put("test", "test2");
     ExecutableNode oldNode = flow.getExecutableNode("job10");
@@ -293,6 +335,433 @@ public class JdbcExecutorLoaderTest {
 
   }
 
+  /* Test exception when unassigning a missing execution */
+  @Test
+  public void testUnassignExecutorException() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      loader.unassignExecutor(2);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test happy case when unassigning executor for a flow execution */
+  @Test
+  public void testUnassignExecutor() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    Assert.assertEquals(
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()), executor);
+    loader.unassignExecutor(flow.getExecutionId());
+    Assert.assertEquals(
+      loader.fetchExecutorByExecutionId(flow.getExecutionId()), null);
+  }
+
+  /* Test exception when assigning a non-existent executor to a flow */
+  @Test
+  public void testAssignExecutorInvalidExecutor()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    try {
+      loader.assignExecutor(flow.getExecutionId(), 1);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test exception when assigning an executor to a non-existent flow execution */
+  @Test
+  public void testAssignExecutorInvalidExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    try {
+      loader.assignExecutor(2, executor.getId());
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test null return when fetching the executor of an invalid execution */
+  @Test
+  public void testFetchMissingExecutorByExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(1), null);
+  }
+
+  /* Test null return for a non-dispatched (queued) execution */
+  @Test
+  public void testFetchExecutorByQueuedExecution()
+    throws ExecutorManagerException, IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+      null);
+  }
+
+  /* Test happy case when assigning and fetching an executor to a flow execution */
+  @Test
+  public void testAssignAndFetchExecutor() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    Assert.assertEquals(loader.fetchExecutorByExecutionId(flow.getExecutionId()),
+      executor);
+  }
+
+  /* Test fetchQueuedFlows when there are no queued flows */
+  @Test
+  public void testFetchNoQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      loader.fetchQueuedFlows();
+
+    // no execution flows at all i.e. no running, completed or queued flows
+    Assert.assertTrue(queuedFlows.isEmpty());
+
+    String host = "localhost";
+    int port = 12345;
+    Executor executor = loader.addExecutor(host, port);
+
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    loader.assignExecutor(executor.getId(), flow.getExecutionId());
+    // only completed flows
+    Assert.assertTrue(loader.fetchQueuedFlows().isEmpty());
+
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    loader.uploadExecutableFlow(flow2);
+    loader.assignExecutor(executor.getId(), flow2.getExecutionId());
+    ExecutionReference ref = new ExecutionReference(flow2.getExecutionId());
+    loader.addActiveExecutableReference(ref);
+    // only running and completed flows
+    Assert.assertTrue(loader.fetchQueuedFlows().isEmpty());
+  }
+
+  /* Test fetchQueuedFlows happy case */
+  @Test
+  public void testFetchQueuedFlows() throws ExecutorManagerException,
+    IOException {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      new LinkedList<Pair<ExecutionReference, ExecutableFlow>>();
+
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    loader.uploadExecutableFlow(flow);
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    loader.uploadExecutableFlow(flow2);
+
+    ExecutionReference ref2 = new ExecutionReference(flow2.getExecutionId());
+    loader.addActiveExecutableReference(ref2);
+    ExecutionReference ref = new ExecutionReference(flow.getExecutionId());
+    loader.addActiveExecutableReference(ref);
+
+    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, flow));
+    queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref2, flow2));
+
+    // only running and completed flows
+    Assert.assertArrayEquals(loader.fetchQueuedFlows().toArray(),
+      queuedFlows.toArray());
+  }
+
+  /* Test all executors fetch from empty executors */
+  @Test
+  public void testFetchEmptyExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = loader.fetchAllExecutors();
+    Assert.assertEquals(executors.size(), 0);
+  }
+
+  /* Test active executors fetch from empty executors */
+  @Test
+  public void testFetchEmptyActiveExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = loader.fetchActiveExecutors();
+    Assert.assertEquals(executors.size(), 0);
+  }
+
+  /* Test missing executor fetch with search by executor id */
+  @Test
+  public void testFetchMissingExecutorId() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.fetchExecutor(0);
+    Assert.assertEquals(executor, null);
+  }
+
+  /* Test missing executor fetch with search by host:port */
+  @Test
+  public void testFetchMissingExecutorHostPort() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.fetchExecutor("localhost", 12345);
+    Assert.assertEquals(executor, null);
+  }
+
+  /* Test executor events fetch from with no logged executor */
+  @Test
+  public void testFetchEmptyExecutorEvents() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    Executor executor = new Executor(1, "localhost", 12345, true);
+    List<ExecutorLogEvent> executorEvents =
+      loader.getExecutorEvents(executor, 5, 0);
+    Assert.assertEquals(executorEvents.size(), 0);
+  }
+
+  /* Test logging ExecutorEvents */
+  @Test
+  public void testExecutorEvents() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    int skip = 1;
+    User user = new User("testUser");
+    Executor executor = new Executor(1, "localhost", 12345, true);
+    String message = "My message ";
+    EventType[] events =
+      { EventType.CREATED, EventType.HOST_UPDATE, EventType.INACTIVATION };
+
+    for (EventType event : events) {
+      loader.postExecutorEvent(executor, event, user.getUserId(),
+        message + event.getNumVal());
+    }
+
+    List<ExecutorLogEvent> eventLogs =
+      loader.getExecutorEvents(executor, 10, skip);
+    Assert.assertTrue(eventLogs.size() == 2);
+
+    for (int index = 0; index < eventLogs.size(); ++index) {
+      ExecutorLogEvent eventLog = eventLogs.get(index);
+      Assert.assertEquals(eventLog.getExecutorId(), executor.getId());
+      Assert.assertEquals(eventLog.getUser(), user.getUserId());
+      Assert.assertEquals(eventLog.getType(), events[index + skip]);
+      Assert.assertEquals(eventLog.getMessage(),
+        message + events[index + skip].getNumVal());
+    }
+  }
+
+  /* Test to add duplicate executors */
+  @Test
+  public void testDuplicateAddExecutor() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      String host = "localhost";
+      int port = 12345;
+      loader.addExecutor(host, port);
+      loader.addExecutor(host, port);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+  }
+
+  /* Test to try update a non-existent executor */
+  @Test
+  public void testMissingExecutorUpdate() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    try {
+      Executor executor = new Executor(1, "localhost", 1234, true);
+      loader.updateExecutor(executor);
+      Assert.fail("Expecting exception, but didn't get one");
+    } catch (ExecutorManagerException ex) {
+      System.out.println("Test true");
+    }
+    clearDB();
+  }
+
+  /* Test add & fetch by Id Executors */
+  @Test
+  public void testSingleExecutorFetchById() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+    for (Executor executor : executors) {
+      Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+      Assert.assertEquals(executor, fetchedExecutor);
+    }
+  }
+
+  /* Test fetch all executors */
+  @Test
+  public void testFetchAllExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+
+    executors.get(0).setActive(false);
+    loader.updateExecutor(executors.get(0));
+
+    List<Executor> fetchedExecutors = loader.fetchAllExecutors();
+    Assert.assertEquals(executors.size(), fetchedExecutors.size());
+
+    Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+  }
+
+  /* Test fetch only active executors */
+  @Test
+  public void testFetchActiveExecutors() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+
+    executors.get(0).setActive(false);
+    loader.updateExecutor(executors.get(0));
+
+    List<Executor> fetchedExecutors = loader.fetchActiveExecutors();
+    Assert.assertEquals(executors.size(), fetchedExecutors.size() + 1);
+    executors.remove(0);
+
+    Assert.assertArrayEquals(executors.toArray(), fetchedExecutors.toArray());
+  }
+
+  /* Test add & fetch by host:port Executors */
+  @Test
+  public void testSingleExecutorFetchHostPort() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+    ExecutorLoader loader = createLoader();
+    List<Executor> executors = addTestExecutors(loader);
+    for (Executor executor : executors) {
+      Executor fetchedExecutor =
+        loader.fetchExecutor(executor.getHost(), executor.getPort());
+      Assert.assertEquals(executor, fetchedExecutor);
+    }
+  }
+
+  /* Helper method used in methods testing jdbc interface for executors table */
+  private List<Executor> addTestExecutors(ExecutorLoader loader)
+    throws ExecutorManagerException {
+    List<Executor> executors = new ArrayList<Executor>();
+    executors.add(loader.addExecutor("localhost1", 12345));
+    executors.add(loader.addExecutor("localhost2", 12346));
+    executors.add(loader.addExecutor("localhost1", 12347));
+    return executors;
+  }
+
+  /* Test Executor Inactivation */
+  @Test
+  public void testExecutorInactivation() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.addExecutor("localhost1", 12345);
+    Assert.assertTrue(executor.isActive());
+
+    executor.setActive(false);
+    loader.updateExecutor(executor);
+
+    Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+    Assert.assertEquals(executor.getHost(), fetchedExecutor.getHost());
+    Assert.assertEquals(executor.getId(), fetchedExecutor.getId());
+    Assert.assertEquals(executor.getPort(), fetchedExecutor.getPort());
+    Assert.assertFalse(fetchedExecutor.isActive());
+  }
+
+  /* Test Executor reactivation */
+  @Test
+  public void testExecutorActivation() throws Exception {
+    if (!isTestSetup()) {
+      return;
+    }
+
+    ExecutorLoader loader = createLoader();
+    Executor executor = loader.addExecutor("localhost1", 12345);
+    Assert.assertTrue(executor.isActive());
+
+    executor.setActive(false);
+    loader.updateExecutor(executor);
+    Executor fetchedExecutor = loader.fetchExecutor(executor.getId());
+    Assert.assertFalse(fetchedExecutor.isActive());
+
+    executor.setActive(true);
+    loader.updateExecutor(executor);
+    fetchedExecutor = loader.fetchExecutor(executor.getId());
+
+    Assert.assertEquals(executor, fetchedExecutor);
+  }
+
   @Test
   public void testActiveReference() throws Exception {
     if (!isTestSetup()) {
@@ -300,19 +769,20 @@ public class JdbcExecutorLoaderTest {
     }
 
     ExecutorLoader loader = createLoader();
-    ExecutableFlow flow1 = createExecutableFlow("exec1");
+    ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow1);
+    Executor executor = new Executor(2, "test", 1, true);
     ExecutionReference ref1 =
-        new ExecutionReference(flow1.getExecutionId(), "test", 1);
+        new ExecutionReference(flow1.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref1);
 
-    ExecutableFlow flow2 = createExecutableFlow("exec1");
+    ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow2);
     ExecutionReference ref2 =
-        new ExecutionReference(flow2.getExecutionId(), "test", 1);
+        new ExecutionReference(flow2.getExecutionId(), executor);
     loader.addActiveExecutableReference(ref2);
 
-    ExecutableFlow flow3 = createExecutableFlow("exec1");
+    ExecutableFlow flow3 = TestUtils.createExecutableFlow("exectest1", "exec1");
     loader.uploadExecutableFlow(flow3);
 
     Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
@@ -356,7 +826,7 @@ public class JdbcExecutorLoaderTest {
 
   @Ignore @Test
   public void testSmallUploadLog() throws ExecutorManagerException {
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
     File[] smalllog =
         { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
             new File(logDir, "log3.log") };
@@ -381,7 +851,7 @@ public class JdbcExecutorLoaderTest {
 
   @Ignore @Test
   public void testLargeUploadLog() throws ExecutorManagerException {
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
     File[] largelog =
@@ -427,7 +897,7 @@ public class JdbcExecutorLoaderTest {
 
     ExecutorLoader loader = createLoader();
 
-    File logDir = new File("unit/executions/logtest");
+    File logDir = new File(UNIT_BASE_DIR + "logtest");
 
     // Multiple of 255 for Henry the Eigth
     File[] largelog =
@@ -451,37 +921,10 @@ public class JdbcExecutorLoaderTest {
   }
 
   private ExecutableFlow createExecutableFlow(int executionId, String flowName)
-      throws IOException {
-    File jsonFlowFile = new File(flowDir, flowName + ".flow");
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    throws IOException {
+    ExecutableFlow execFlow =
+      TestUtils.createExecutableFlow("exectest1", flowName);
     execFlow.setExecutionId(executionId);
-
-    return execFlow;
-  }
-
-  private ExecutableFlow createExecutableFlow(String flowName)
-      throws IOException {
-    File jsonFlowFile = new File(flowDir, flowName + ".flow");
-    @SuppressWarnings("unchecked")
-    HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    Flow flow = Flow.flowFromObject(flowObj);
-    Project project = new Project(1, "flow");
-    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
-    flowMap.put(flow.getId(), flow);
-    project.setFlows(flowMap);
-    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-
     return execFlow;
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index b9ad178..0db2de5 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -17,16 +17,21 @@
 package azkaban.executor;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
 public class MockExecutorLoader implements ExecutorLoader {
 
+  HashMap<Integer, Integer> executionExecutorMapping =
+      new HashMap<Integer, Integer>();
   HashMap<Integer, ExecutableFlow> flows =
       new HashMap<Integer, ExecutableFlow>();
   HashMap<String, ExecutableNode> nodes = new HashMap<String, ExecutableNode>();
@@ -36,6 +41,10 @@ public class MockExecutorLoader implements ExecutorLoader {
   HashMap<String, Integer> jobUpdateCount = new HashMap<String, Integer>();
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows =
       new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
+  List<Executor> executors = new ArrayList<Executor>();
+  int executorIdCounter = 0;
+  Map<Integer, ArrayList<ExecutorLogEvent>> executorEvents =
+    new HashMap<Integer, ArrayList<ExecutorLogEvent>>();
 
   @Override
   public void uploadExecutableFlow(ExecutableFlow flow)
@@ -249,4 +258,119 @@ public class MockExecutorLoader implements ExecutorLoader {
 
   }
 
+  @Override
+  public List<Executor> fetchActiveExecutors() throws ExecutorManagerException {
+    List<Executor> activeExecutors = new ArrayList<Executor>();
+    for (Executor executor : executors) {
+      if (executor.isActive()) {
+        activeExecutors.add(executor);
+      }
+    }
+    return activeExecutors;
+  }
+
+  @Override
+  public Executor fetchExecutor(String host, int port)
+    throws ExecutorManagerException {
+    for (Executor executor : executors) {
+      if (executor.getHost().equals(host) && executor.getPort() == port) {
+        return executor;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
+    for (Executor executor : executors) {
+      if (executor.getId() == executorId) {
+        return executor;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Executor addExecutor(String host, int port)
+    throws ExecutorManagerException {
+    // match JdbcExecutorLoader: adding a duplicate host:port must throw
+    if (fetchExecutor(host, port) != null) {
+      throw new ExecutorManagerException("Executor already exists: " + host
+        + ":" + port);
+    }
+    Executor executor = new Executor(++executorIdCounter, host, port, true);
+    executors.add(executor);
+  }
+
+  @Override
+  public void postExecutorEvent(Executor executor, EventType type, String user,
+    String message) throws ExecutorManagerException {
+    ExecutorLogEvent event =
+      new ExecutorLogEvent(executor.getId(), user, new Date(), type, message);
+
+    if (!executorEvents.containsKey(executor.getId())) {
+      executorEvents.put(executor.getId(), new ArrayList<ExecutorLogEvent>());
+    }
+
+    executorEvents.get(executor.getId()).add(event);
+  }
+
+  @Override
+  public List<ExecutorLogEvent> getExecutorEvents(Executor executor, int num,
+    int skip) throws ExecutorManagerException {
+    if (executorEvents.containsKey(executor.getId())) {
+      List<ExecutorLogEvent> events = executorEvents.get(executor.getId());
+      return events.subList(skip, Math.min(num + skip, events.size()));
+    }
+    return new ArrayList<ExecutorLogEvent>();
+  }
+
+  @Override
+  public void updateExecutor(Executor executor) throws ExecutorManagerException {
+    Executor oldExecutor = fetchExecutor(executor.getId());
+    executors.remove(oldExecutor);
+    executors.add(executor);
+  }
+
+  @Override
+  public List<Executor> fetchAllExecutors() throws ExecutorManagerException {
+    return executors;
+  }
+
+  @Override
+  public void assignExecutor(int executorId, int execId)
+    throws ExecutorManagerException {
+    ExecutionReference ref = refs.get(execId);
+    ref.setExecutor(fetchExecutor(executorId));
+    executionExecutorMapping.put(execId, executorId);
+  }
+
+  @Override
+  public Executor fetchExecutorByExecutionId(int execId) throws ExecutorManagerException {
+    if (executionExecutorMapping.containsKey(execId)) {
+      return fetchExecutor(executionExecutorMapping.get(execId));
+    } else {
+      // match JdbcExecutorLoader: null when no executor is assigned
+      return null;
+    }
+  }
+
+  @Override
+  public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows()
+    throws ExecutorManagerException {
+    List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+      new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+    for (int execId : refs.keySet()) {
+      if (!executionExecutorMapping.containsKey(execId)) {
+        queuedFlows.add(new Pair<ExecutionReference, ExecutableFlow>(refs
+          .get(execId), flows.get(execId)));
+      }
+    }
+    return queuedFlows;
+  }
+
+  @Override
+  public void unassignExecutor(int executionId) throws ExecutorManagerException {
+    executionExecutorMapping.remove(executionId);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
new file mode 100644
index 0000000..94bbd7e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/QueuedExecutionsTest.java
@@ -0,0 +1,208 @@
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+
+public class QueuedExecutionsTest {
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions/exectest1/";
+
+  private File getFlowDir(String flow) {
+    return new File(UNIT_BASE_DIR + flow + ".flow");
+  }
+
+  /*
+   * Helper method to create an (ExecutionReference, ExecutableFlow) from
+   * serialized description
+   */
+  private Pair<ExecutionReference, ExecutableFlow> createExecutablePair(
+    String flowName, int execId) throws IOException {
+    File jsonFlowFile = getFlowDir(flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    execFlow.setExecutionId(execId);
+    ExecutionReference ref = new ExecutionReference(execId);
+    return new Pair<ExecutionReference, ExecutableFlow>(ref, execFlow);
+  }
+
+  public List<Pair<ExecutionReference, ExecutableFlow>> getDummyData()
+    throws IOException {
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList =
+      new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
+    dataList.add(createExecutablePair("exec1", 1));
+    dataList.add(createExecutablePair("exec2", 2));
+    return dataList;
+  }
+
+  /* Test enqueue method happy case */
+  @Test
+  public void testEnqueueHappyCase() throws IOException,
+    ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      queue.enqueue(pair.getSecond(), pair.getFirst());
+    }
+
+    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+  }
+
+  /* Test enqueue duplicate execution ids */
+  @Test(expected = ExecutorManagerException.class)
+  public void testEnqueueDuplicateExecution() throws IOException,
+    ExecutorManagerException {
+    Pair<ExecutionReference, ExecutableFlow> pair1 =
+      createExecutablePair("exec1", 1);
+    QueuedExecutions queue = new QueuedExecutions(5);
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+  }
+
+  /* Test enqueue more than capacity */
+  @Test(expected = ExecutorManagerException.class)
+  public void testEnqueueOverflow() throws IOException,
+    ExecutorManagerException {
+    Pair<ExecutionReference, ExecutableFlow> pair1 =
+      createExecutablePair("exec1", 1);
+    QueuedExecutions queue = new QueuedExecutions(1);
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+    queue.enqueue(pair1.getSecond(), pair1.getFirst());
+  }
+
+  /* Test EnqueueAll method */
+  @Test
+  public void testEnqueueAll() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertTrue(queue.getAllEntries().containsAll(dataList));
+    Assert.assertTrue(dataList.containsAll(queue.getAllEntries()));
+  }
+
+  /* Test size method */
+  @Test
+  public void testSize() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+  }
+
+  /* Test dequeue method */
+  @Test
+  public void testDequeue() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    queue.dequeue(dataList.get(0).getFirst().getExecId());
+    Assert.assertEquals(queue.size(), 1);
+    Assert.assertTrue(queue.getAllEntries().contains(dataList.get(1)));
+  }
+
+  /* Test clear method */
+  @Test
+  public void testClear() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+    queue.clear();
+    Assert.assertEquals(queue.size(), 0);
+  }
+
+  /* Test isEmpty method */
+  @Test
+  public void testIsEmpty() throws IOException, ExecutorManagerException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    Assert.assertTrue(queue.isEmpty());
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.size(), 2);
+    queue.clear();
+    Assert.assertTrue(queue.isEmpty());
+  }
+
+  /* Test fetchHead method */
+  @Test
+  public void testFetchHead() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(5);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    Assert.assertTrue(queue.isEmpty());
+    queue.enqueueAll(dataList);
+    Assert.assertEquals(queue.fetchHead(), dataList.get(0));
+    Assert.assertEquals(queue.fetchHead(), dataList.get(1));
+  }
+
+  /* Test isFull method */
+  @Test
+  public void testIsFull() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    Assert.assertTrue(queue.isFull());
+  }
+
+  /* Test hasExecution method */
+  @Test
+  public void testHasExecution() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertTrue(queue.hasExecution(pair.getFirst().getExecId()));
+    }
+    Assert.assertFalse(queue.hasExecution(5));
+    Assert.assertFalse(queue.hasExecution(7));
+    Assert.assertFalse(queue.hasExecution(15));
+  }
+
+  /* Test getFlow method */
+  @Test
+  public void testGetFlow() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertEquals(pair.getSecond(),
+        queue.getFlow(pair.getFirst().getExecId()));
+    }
+  }
+
+  /* Test getReferences method */
+  @Test
+  public void testGetReferences() throws IOException, ExecutorManagerException,
+    InterruptedException {
+    QueuedExecutions queue = new QueuedExecutions(2);
+    List<Pair<ExecutionReference, ExecutableFlow>> dataList = getDummyData();
+    queue.enqueueAll(dataList);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : dataList) {
+      Assert.assertEquals(pair.getFirst(),
+        queue.getReference(pair.getFirst().getExecId()));
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
new file mode 100644
index 0000000..e54b182
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -0,0 +1,738 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.BasicConfigurator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.executor.selector.*;
+import azkaban.utils.JSONUtils;
+
+public class SelectorTest {
+  // mock executor object.
+  protected class MockExecutorObject implements Comparable <MockExecutorObject>{
+    public String name;
+    public int    port;
+    public double percentOfRemainingMemory;
+    public int    amountOfRemainingMemory;
+    public int    priority;
+    public Date   lastAssigned;
+    public double percentOfRemainingFlowcapacity;
+    public int    remainingTmp;
+
+    public MockExecutorObject(String name,
+        int port,
+        double percentOfRemainingMemory,
+        int amountOfRemainingMemory,
+        int priority,
+        Date lastAssigned,
+        double percentOfRemainingFlowcapacity,
+        int remainingTmp)
+    {
+      this.name = name;
+      this.port = port;
+      this.percentOfRemainingMemory = percentOfRemainingMemory;
+      this.amountOfRemainingMemory =amountOfRemainingMemory;
+      this.priority = priority;
+      this.lastAssigned = lastAssigned;
+      this.percentOfRemainingFlowcapacity = percentOfRemainingFlowcapacity;
+      this.remainingTmp = remainingTmp;
+    }
+
+    @Override
+    public String toString()
+    {
+      return this.name;
+    }
+
+    @Override
+    public int compareTo(MockExecutorObject o) {
+      return null == o ? 1 : this.hashCode() - o.hashCode();
+    }
+  }
+
+  // Mock flow object.
+  protected class MockFlowObject{
+    public String name;
+    public int    requiredRemainingMemory;
+    public int    requiredTotalMemory;
+    public int    requiredRemainingTmpSpace;
+    public int    priority;
+
+    public MockFlowObject(String name,
+        int requiredTotalMemory,
+        int requiredRemainingMemory,
+        int requiredRemainingTmpSpace,
+        int priority)
+    {
+      this.name = name;
+      this.requiredTotalMemory = requiredTotalMemory;
+      this.requiredRemainingMemory = requiredRemainingMemory;
+      this.requiredRemainingTmpSpace = requiredRemainingTmpSpace;
+      this.priority = priority;
+    }
+
+    @Override
+    public String toString()
+    {
+      return this.name;
+    }
+  }
+
+  // mock Filter class.
+  protected class MockFilter
+  extends CandidateFilter<MockExecutorObject,MockFlowObject>{
+
+    @Override
+    public String getName() {
+      return "Mockfilter";
+    }
+
+    public MockFilter(){
+    }
+
+    // function to register the remainingMemory filter.
+    // for test purpose the registration is put in a separated method, in production the work should be done
+    // in the constructor.
+    public void registerFilterforTotalMemory(){
+      this.registerFactorFilter(FactorFilter.create("requiredTotalMemory",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+          public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+            // REAL LOGIC COMES HERE -
+            if (null == itemToCheck || null == sourceObject){
+              return false;
+            }
+
+            // Box has infinite memory.:)
+            if (itemToCheck.percentOfRemainingMemory == 0) {
+              return true;
+            }
+
+            // calculate the memory and return.
+            return itemToCheck.amountOfRemainingMemory / itemToCheck.percentOfRemainingMemory * 100 >
+                   sourceObject.requiredTotalMemory;
+          }}));
+    }
+
+    public void registerFilterforRemainingMemory(){
+      this.registerFactorFilter(FactorFilter.create("requiredRemainingMemory",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+         }
+         return itemToCheck.amountOfRemainingMemory > sourceObject.requiredRemainingMemory;
+        }}));
+    }
+
+    public void registerFilterforPriority(){
+      this.registerFactorFilter(FactorFilter.create("requiredProprity",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+          }
+
+          // priority value, the bigger the lower.
+          return itemToCheck.priority >= sourceObject.priority;
+        }}));
+    }
+
+    public void registerFilterforRemainingTmpSpace(){
+      this.registerFactorFilter(FactorFilter.create("requiredRemainingTmpSpace",
+          new FactorFilter.Filter<MockExecutorObject,MockFlowObject>() {
+        public boolean filterTarget(MockExecutorObject itemToCheck, MockFlowObject sourceObject) {
+          // REAL LOGIC COMES HERE -
+          if (null == itemToCheck || null == sourceObject){
+            return false;
+          }
+
+         return itemToCheck.remainingTmp > sourceObject.requiredRemainingTmpSpace;
+        }}));
+    }
+
+  }
+
+  // mock comparator class.
+  protected class MockComparator
+  extends CandidateComparator<MockExecutorObject>{
+
+    @Override
+    public String getName() {
+      return "MockComparator";
+    }
+
+    @Override
+    protected boolean tieBreak(MockExecutorObject object1, MockExecutorObject object2){
+      if (null == object2) return true;
+      if (null == object1) return false;
+      return object1.name.compareTo(object2.name) >= 0;
+    }
+
+    public MockComparator(){
+    }
+
+    public void registerComparerForMemory(int weight){
+      this.registerFactorComparator(FactorComparator.create("Memory", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check remaining amount of memory.
+          result = o1.amountOfRemainingMemory - o2.amountOfRemainingMemory;
+          if (result != 0){
+            return result > 0 ? 1 : -1;
+          }
+
+          // check remaining % .
+          result = (int)(o1.percentOfRemainingMemory - o2.percentOfRemainingMemory);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+
+    public void registerComparerForRemainingSpace(int weight){
+      this.registerFactorComparator(FactorComparator.create("RemainingTmp", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check remaining % .
+          result = (int)(o1.remainingTmp - o2.remainingTmp);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+
+    public void registerComparerForPriority(int weight){
+      this.registerFactorComparator(FactorComparator.create("Priority", weight, new Comparator<MockExecutorObject>(){
+        public int compare(MockExecutorObject o1, MockExecutorObject o2) {
+          int result = 0 ;
+
+          // check priority, bigger the better.
+          result = (int)(o1.priority - o2.priority);
+          return result == 0 ? 0 : result > 0 ? 1 : -1;
+
+        } }));
+    }
+  }
+
+  // test samples.
+  protected ArrayList<MockExecutorObject> executorList = new ArrayList<MockExecutorObject>();
+
+  @BeforeClass public static void onlyOnce() {
+    BasicConfigurator.configure();
+   }
+
+  @Before
+  public void setUp() throws Exception {
+    executorList.clear();
+    executorList.add(new MockExecutorObject("Executor1",8080,50.0,2048,5,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor2",8080,50.0,2048,4,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor3",8080,40.0,2048,1,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor4",8080,50.0,2048,4,new Date(), 20, 6400));
+    executorList.add(new MockExecutorObject("Executor5",8080,50.0,1024,5,new Date(), 90, 6400));
+    executorList.add(new MockExecutorObject("Executor6",8080,50.0,1024,5,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor7",8080,50.0,1024,5,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor8",8080,50.0,2048,1,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor9",8080,50.0,2050,5,new Date(), 90, 4200));
+    executorList.add(new MockExecutorObject("Executor10",8080,00.0,1024,1,new Date(), 90, 3200));
+    executorList.add(new MockExecutorObject("Executor11",8080,20.0,2096,3,new Date(), 90, 2400));
+    executorList.add(new MockExecutorObject("Executor12",8080,90.0,2050,5,new Date(), 60, 2500));
+
+
+    // make sure each time the order is different.
+    Collections.shuffle(this.executorList);
+  }
+
+  private MockExecutorObject  getExecutorByName(String name){
+    MockExecutorObject returnVal = null;
+    for (MockExecutorObject item : this.executorList){
+      if (item.name.equals(name)){
+        returnVal = item;
+        break;
+      }
+    }
+    return returnVal;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testExecutorFilter() throws Exception {
+
+      // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+      MockFlowObject  dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+      MockFilter mFilter = new MockFilter();
+      mFilter.registerFilterforRemainingMemory();
+
+      // expect true.
+      boolean result = mFilter.filterTarget(this.getExecutorByName("Executor1"), dispatchingObj);
+      Assert.assertTrue(result);
+
+      //expect true.
+      result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+      /*
+      1 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : true
+      */
+      Assert.assertTrue(result);
+
+      // add the priority filter.
+      mFilter.registerFilterforPriority();
+      result = mFilter.filterTarget(this.getExecutorByName("Executor3"), dispatchingObj);
+      // expect false, for priority.
+      /*
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor3' with factor filter for 'Mockfilter'
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredProprity] filter result : false
+      2 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : false
+      */
+      Assert.assertFalse(result);
+
+      // add the remaining space filter.
+      mFilter.registerFilterforRemainingTmpSpace();
+
+      // expect pass.
+      result = mFilter.filterTarget(this.getExecutorByName("Executor2"), dispatchingObj);
+      /*
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor2' with factor filter for 'Mockfilter'
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : true
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredProprity] filter result : true
+      3 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : true
+      */
+      Assert.assertTrue(result);
+
+      // expect false, remaining tmp, priority will also fail but the logic shortcuts when the Tmp size check Fails.
+      result = mFilter.filterTarget(this.getExecutorByName("Executor8"), dispatchingObj);
+      /*
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - start checking 'Executor8' with factor filter for 'Mockfilter'
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingMemory] filter result : true
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - [Factor: requiredRemainingTmpSpace] filter result : false
+      4 [main] INFO azkaban.executor.Selector.CandidateFilter  - Final checking result : false
+      */
+      Assert.assertFalse(result);
+
+  }
+
+  @Test
+  public void testExecutorFilterWithNullInputs() throws Exception {
+    MockFilter filter = new MockFilter();
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+    boolean result = false;
+    try {
+        result = filter.filterTarget(this.getExecutorByName("Executor1"), null);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null value is passed to the filter.");
+      }
+    // note : the FactorFilter logic will decide whether true or false should be returned when null value
+    //        is passed, for the Mock class it returns false.
+    Assert.assertFalse(result);
+
+    try {
+        result = filter.filterTarget(null, null);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null value is passed to the filter.");
+      }
+    // note : the FactorFilter logic will decide whether true or false should be returned when null value
+    //        is passed, for the Mock class it returns false.
+    Assert.assertFalse(result);
+  }
+
+  @Test
+  public void testExecutorComparer() throws Exception {
+    MockComparator comparator = new MockComparator();
+    comparator.registerComparerForMemory(5);
+
+    MockExecutorObject nextExecutor = Collections.max(this.executorList, comparator);
+
+    // expect the first item to be selected, memory wise it is the max.
+    Assert.assertEquals(this.getExecutorByName("Executor11"),nextExecutor);
+
+    // add the priority factor.
+    // expect again the #9 item to be selected.
+    comparator.registerComparerForPriority(6);
+    nextExecutor = Collections.max(this.executorList, comparator);
+    Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+
+    // add the remaining space factor.
+    // expect the #12 item to be returned.
+    comparator.registerComparerForRemainingSpace(3);
+    nextExecutor = Collections.max(this.executorList, comparator);
+    Assert.assertEquals(this.getExecutorByName("Executor12"),nextExecutor);
+  }
+
+  @Test
+  public void testExecutorComparerResisterComparerWInvalidWeight() throws Exception {
+    MockComparator comparator = new MockComparator();
+    comparator.registerComparerForMemory(0);
+  }
+
+  @Test
+  public void testSelector() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(5);
+    comparator.registerComparerForRemainingSpace(3);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    // mock object, remaining memory 11500, total memory 3095, remainingTmpSpace 4200, priority 2.
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",3096, 1500,4200,2);
+
+    // expected selection = #12
+    MockExecutorObject nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+    Assert.assertEquals(this.getExecutorByName("Executor1"),nextExecutor);
+
+   // remaining memory 11500, total memory 3095, remainingTmpSpace 14200, priority 2.
+   dispatchingObj = new MockFlowObject("flow1",3096, 1500,14200,2);
+   // all candidates should be filtered by the remaining memory.
+   nextExecutor = morkSelector.getBest(this.executorList, dispatchingObj);
+   Assert.assertEquals(null,nextExecutor);
+  }
+
+  @Test
+  public void testSelectorsignleCandidate() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> signleExecutorList = new ArrayList<MockExecutorObject>();
+    MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+    signleExecutorList.add(signleExecutor);
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+    MockExecutorObject executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+    // expected to see null result, as the only executor is filtered out .
+    Assert.assertTrue(null == executor);
+
+    // adjust the priority to let the executor pass the filter.
+    dispatchingObj.priority = 3;
+    executor = morkSelector.getBest(signleExecutorList, dispatchingObj);
+    Assert.assertEquals(signleExecutor, executor);
+  }
+
+  @Test
+  public void testSelectorListWithItemsThatAreReferenceEqual() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject signleExecutor = new MockExecutorObject("ExecutorX",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(signleExecutor);
+    list.add(signleExecutor);
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+    Assert.assertTrue(signleExecutor == executor);
+  }
+
+  @Test
+  public void testSelectorListWithItemsThatAreEqualInValue() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    // note - as the tieBreaker set in the MockComparator uses the name value of the executor to do the
+    //        final diff therefore we need to set the name differently to make a meaningful test, in real
+    //        scenario we may want to use something else (say hash code) to be the bottom line for the tieBreaker
+    //        to make a final decision, the purpose of the test here is to prove that for two candidates with
+    //        exact value (in the case of test, all values except for the name) the decision result is stable.
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+    MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(executor1);
+    list.add(executor2);
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor = morkSelector.getBest(list, dispatchingObj);
+    Assert.assertTrue(executor2 == executor);
+
+    // shuffle and test again.
+    list.remove(0);
+    list.add(executor1);
+    executor = morkSelector.getBest(list, dispatchingObj);
+    Assert.assertTrue(executor2 == executor);
+  }
+
+  @Test
+  public void testSelectorEmptyList() throws Exception {
+    MockFilter filter = new MockFilter();
+    MockComparator comparator = new MockComparator();
+
+    filter.registerFilterforPriority();
+    filter.registerFilterforRemainingMemory();
+    filter.registerFilterforRemainingTmpSpace();
+    filter.registerFilterforTotalMemory();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(filter,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,5);
+
+    MockExecutorObject executor  = null;
+
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an empty list is passed to the Selector.");
+      }
+
+    // expected to see null result.
+    Assert.assertTrue(null == executor);
+
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when null is passed to the Selector as the candidate list.");
+      }
+
+      // expected to see null result, as the only executor is filtered out .
+      Assert.assertTrue(null == executor);
+
+  }
+
+  @Test
+  public void testSelectorListWithNullValue() throws Exception {
+    MockComparator comparator = new MockComparator();
+
+    comparator.registerComparerForMemory(3);
+    comparator.registerComparerForPriority(4);
+    comparator.registerComparerForRemainingSpace(1);
+
+    CandidateSelector<MockExecutorObject,MockFlowObject> morkSelector =
+        new CandidateSelector<MockExecutorObject,MockFlowObject>(null,comparator);
+
+    ArrayList<MockExecutorObject> list = new ArrayList<MockExecutorObject>();
+    MockExecutorObject executor1 = new MockExecutorObject("ExecutorX", 8080,50.0,2048,3,new Date(), 20, 6400);
+    MockExecutorObject executor2 = new MockExecutorObject("ExecutorX2",8080,50.0,2048,3,new Date(), 20, 6400);
+    list.add(executor1);
+    list.add(executor2);
+    list.add(null);
+
+    MockFlowObject  dispatchingObj = new MockFlowObject("flow1",100, 100,100,3);
+    MockExecutorObject executor  = null;
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an List contains null value.");
+      }
+    Assert.assertTrue(executor2 == executor);
+
+    // try to compare null vs null, no exception is expected.
+    list.clear();
+    list.add(null);
+    list.add(null);
+    try {
+      executor = morkSelector.getBest(list, dispatchingObj);
+      } catch (Exception ex){
+        Assert.fail("no exception should be thrown when an List contains multiple null values.");
+      }
+    Assert.assertTrue(null == executor);
+
+  }
+
+  @Test
+  public void testCreatingExectorfilterObject() throws Exception{
+    List<String> validList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+    try {
+      new ExecutorFilter(validList);
+    }catch (Exception ex){
+      Assert.fail("creating ExecutorFilter with valid list throws exception . ex -" + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreatingExectorfilterObjectWInvalidList() throws Exception{
+    List<String> invalidList = new ArrayList<String>();
+    invalidList.add("notExistingFilter");
+    Exception result = null;
+    try {
+      new ExecutorFilter(invalidList);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObject() throws Exception{
+   Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+   for (String name : ExecutorComparator.getAvailableComparatorNames()){
+     comparatorMap.put(name, 1);
+   }
+   try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      Assert.fail("creating ExecutorComparator with valid list throws exception . ex -" + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObjectWInvalidName() throws Exception{
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    comparatorMap.put("invalidName", 0);
+    Exception result = null;
+    try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObjectWInvalidWeight() throws Exception{
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    for (String name : ExecutorComparator.getAvailableComparatorNames()){
+      comparatorMap.put(name, -1);
+    }
+    Exception result = null;
+    try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExecutorSelectorWithEmptyFilterComparatorList() throws Exception{
+    List<Executor> executorList = new ArrayList<Executor>();
+    executorList.add(new Executor(1, "host1", 80, true));
+    executorList.add(new Executor(2, "host2", 80, true));
+    executorList.add(new Executor(3, "host3", 80, true));
+
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
+
+    ExecutableFlow flow = new ExecutableFlow();
+
+    ExecutorSelector selector = new ExecutorSelector(null , null);
+    Executor executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(2), executor);
+  }
+
+
+  @Test
+  public void testExecutorSelectorE2E() throws Exception{
+    List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    List<Executor> executorList = new ArrayList<Executor>();
+    executorList.add(new Executor(1, "host1", 80, true));
+    executorList.add(new Executor(2, "host2", 80, true));
+    executorList.add(new Executor(3, "host3", 80, true));
+
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
+
+    ExecutableFlow flow = new ExecutableFlow();
+
+    for (String name : ExecutorComparator.getAvailableComparatorNames()){
+      comparatorMap.put(name, 1);
+    }
+    ExecutorSelector selector = new ExecutorSelector(filterList,comparatorMap);
+    Executor executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(0), executor);
+
+    // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
+    // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
+    executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(2), executor);
+  }
+
+  @Test
+  public void  testExecutorInfoJsonParser() throws Exception{
+    ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 10);
+    String json = JSONUtils.toJSON(exeInfo);
+    ExecutorInfo exeInfo2 = ExecutorInfo.fromJSONString(json);
+    Assert.assertTrue(exeInfo.equals(exeInfo2));
+  }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
new file mode 100644
index 0000000..24f6ef7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for HttpRequestUtils
+ */
+public final class HttpRequestUtilsTest {
+  /* Helper method to get a test flow and add required properties */
+  public static ExecutableFlow createExecutableFlow() throws IOException {
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow.getExecutionOptions().getFlowParameters()
+      .put(ExecutionOptions.FLOW_PRIORITY, "1");
+    flow.getExecutionOptions().getFlowParameters()
+      .put(ExecutionOptions.USE_EXECUTOR, "2");
+    return flow;
+  }
+
+  /* Test that flow properties are removed for non-admin user */
+  @Test
+  public void TestFilterNonAdminOnlyFlowParams() throws IOException,
+    ExecutorManagerException, UserManagerException {
+    ExecutableFlow flow = createExecutableFlow();
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User user = manager.getUser("testUser", "testUser");
+
+    HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+      flow.getExecutionOptions(), user);
+
+    Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.FLOW_PRIORITY));
+    Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.USE_EXECUTOR));
+  }
+
+  /* Test that flow properties are retained for admin user */
+  @Test
+  public void TestFilterAdminOnlyFlowParams() throws IOException,
+    ExecutorManagerException, UserManagerException {
+    ExecutableFlow flow = createExecutableFlow();
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User user = manager.getUser("testAdmin", "testAdmin");
+
+    HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+      flow.getExecutionOptions(), user);
+
+    Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.FLOW_PRIORITY));
+    Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.USE_EXECUTOR));
+  }
+
+  /* Test exception, if param is a valid integer */
+  @Test
+  public void testvalidIntegerParam() throws ExecutorManagerException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put("param1", "123");
+    HttpRequestUtils.validateIntegerParam(params, "param1");
+  }
+
+  /* Test exception, if param is not a valid integer */
+  @Test(expected = ExecutorManagerException.class)
+  public void testInvalidIntegerParam() throws ExecutorManagerException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put("param1", "1dff2");
+    HttpRequestUtils.validateIntegerParam(params, "param1");
+  }
+
+  /* Verify permission for admin user */
+  @Test
+  public void testHasAdminPermission() throws UserManagerException {
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User adminUser = manager.getUser("testAdmin", "testAdmin");
+    Assert.assertTrue(HttpRequestUtils.hasPermission(manager, adminUser,
+      Type.ADMIN));
+  }
+
+  /* verify permission for non-admin user */
+  @Test
+  public void testHasOrdinaryPermission() throws UserManagerException {
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User testUser = manager.getUser("testUser", "testUser");
+    Assert.assertFalse(HttpRequestUtils.hasPermission(manager, testUser,
+      Type.ADMIN));
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
index c01d3f8..55dd5d9 100644
--- a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
@@ -84,9 +84,9 @@ public class CacheTest {
 
   @Test
   public void testTimeToLiveExpiry() {
-    CacheManager.setUpdateFrequency(200);
     CacheManager manager = CacheManager.getInstance();
     Cache cache = manager.createCache();
+    CacheManager.setUpdateFrequency(200);
 
     cache.setUpdateFrequencyMs(200);
     cache.setEjectionPolicy(EjectionPolicy.FIFO);
@@ -122,9 +122,9 @@ public class CacheTest {
 
   @Test
   public void testIdleExpireExpiry() {
-    CacheManager.setUpdateFrequency(250);
     CacheManager manager = CacheManager.getInstance();
     Cache cache = manager.createCache();
+    CacheManager.setUpdateFrequency(250);
 
     cache.setUpdateFrequencyMs(250);
     cache.setEjectionPolicy(EjectionPolicy.FIFO);
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
new file mode 100644
index 0000000..68b10ee
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.XmlUserManager;
+
+/**
+ * Commonly used utils method for unit/integration tests
+ */
+public class TestUtils {
+  /* Base resource directory for unit tests */
+  private static final String UNIT_RESOURCE_DIR =
+      "../azkaban-test/src/test/resources";
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_EXECUTION_DIR =
+      UNIT_RESOURCE_DIR + "/executions";
+
+  public static File getFlowDir(String projectName, String flow) {
+    return new File(String.format("%s/%s/%s.flow", UNIT_EXECUTION_DIR, projectName,
+      flow));
+  }
+
+  public static User getTestUser() {
+    return new User("testUser");
+  }
+
+  /* Helper method to create an ExecutableFlow from serialized description */
+  public static ExecutableFlow createExecutableFlow(String projectName,
+    String flowName) throws IOException {
+    File jsonFlowFile = getFlowDir(projectName, flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+    return execFlow;
+  }
+
+  /* Helper method to create an XmlUserManager from XML_FILE_PARAM file */
+  public static UserManager createTestXmlUserManager() {
+    Props props = new Props();
+    props.put(XmlUserManager.XML_FILE_PARAM, UNIT_RESOURCE_DIR
+      + "/azkaban-users.xml");
+    UserManager manager = new XmlUserManager(props);
+    return manager;
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
new file mode 100644
index 0000000..4ea0930
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/UtilsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for azkaban.utils.Utils
+ */
+public class UtilsTest {
+
+  /* Test negative port case */
+  @Test
+  public void testNegativePort() {
+    Assert.assertFalse(Utils.isValidPort(-1));
+    Assert.assertFalse(Utils.isValidPort(-10));
+  }
+
+  /* Test zero port case */
+  @Test
+  public void testZeroPort() {
+    Assert.assertFalse(Utils.isValidPort(0));
+  }
+
+  /* Test port beyond limit */
+  @Test
+  public void testOverflowPort() {
+    Assert.assertFalse(Utils.isValidPort(70000));
+    Assert.assertFalse(Utils.isValidPort(65536));
+  }
+
+  /* Test happy isValidPort case*/
+  @Test
+  public void testValidPort() {
+    Assert.assertTrue(Utils.isValidPort(1023));
+    Assert.assertTrue(Utils.isValidPort(10000));
+    Assert.assertTrue(Utils.isValidPort(3030));
+    Assert.assertTrue(Utils.isValidPort(1045));
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 6e012ab..47fbfad 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimeZone;
@@ -131,6 +132,7 @@ public class AzkabanExecutorServer {
     root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
     root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
 
     root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
 
@@ -175,7 +177,7 @@ public class AzkabanExecutorServer {
 
   /**
    * Configure Metric Reporting as per azkaban.properties settings
-   * 
+   *
    * @throws MetricException
    */
   private void configureMetricReports() throws MetricException {
@@ -224,13 +226,13 @@ public class AzkabanExecutorServer {
   /**
    * Load a custom class, which is provided by a configuration
    * CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
-   * 
+   *
    * This method will try to instantiate an instance of this custom class and
    * with given properties as the argument in the constructor.
-   * 
+   *
    * Basically the custom class must have a constructor that takes an argument
    * with type Properties.
-   * 
+   *
    * @param props
    */
   private void loadCustomJMXAttributeProcessor(Props props) {
@@ -500,4 +502,18 @@ public class AzkabanExecutorServer {
       return null;
     }
   }
+
+  /**
+   * Returns host:port combination for currently running executor
+   * @return
+   */
+  public String getExecutorHostPort() {
+    String host = "unkownHost";
+    try {
+      host = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (Exception e) {
+      logger.error("Failed to fetch LocalHostName");
+    }
+    return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index da98b99..7eedee4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -271,6 +271,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.watcher.setLogger(logger);
     }
 
+    logger.info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
     logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
         + projectId + " version:" + version);
     if (pipelineExecId != null) {
@@ -840,7 +841,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Configure Azkaban metrics tracking for a new jobRunner instance
-   * 
+   *
    * @param jobRunner
    */
   private void configureJobLevelMetrics(JobRunner jobRunner) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 78de829..01ab373 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -144,6 +145,9 @@ public class FlowRunnerManager implements EventListener,
 
   private Object executionDirDeletionSync = new Object();
 
+  // date time of the last flow submitted.
+  private long lastFlowSubmittedDate = 0;
+
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, ClassLoader parentClassLoader)
       throws IOException {
@@ -256,6 +260,13 @@ public class FlowRunnerManager implements EventListener,
     return allProjects;
   }
 
+  public long getLastFlowSubmittedTime(){
+    // Note: this is not thread safe and may result in providing dirty data.
+    //       we will provide this data as is for now and will revisit if there
+    //       is a strong justification for change.
+    return lastFlowSubmittedDate;
+  }
+
   public Props getGlobalProps() {
     return globalProps;
   }
@@ -511,6 +522,8 @@ public class FlowRunnerManager implements EventListener,
       Future<?> future = executorService.submit(runner);
       // keep track of this future
       submittedFlows.put(future, runner.getExecutionId());
+      // update the last submitted time.
+      this.lastFlowSubmittedDate = System.currentTimeMillis();
     } catch (RejectedExecutionException re) {
       throw new ExecutorManagerException(
           "Azkaban server can't execute any more flows. "
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
new file mode 100644
index 0000000..e07eac5
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutorInfo;
+import azkaban.utils.JSONUtils;
+
+
+public class ServerStatisticsServlet extends HttpServlet {
+  private static final long serialVersionUID = 1L;
+  private static final int cacheTimeInMilliseconds = 1000;
+  private static final Logger logger = Logger.getLogger(ServerStatisticsServlet.class);
+  private static final String noCacheParamName = "nocache";
+  private static final boolean exists_Bash = new File("/bin/bash").exists();
+  private static final boolean exists_Cat = new File("/bin/cat").exists();
+  private static final boolean exists_Grep = new File("/bin/grep").exists();
+  private static final boolean exists_Meminfo = new File("/proc/meminfo").exists();
+  private static final boolean exists_LoadAvg = new File("/proc/loadavg").exists();
+
+  protected static long lastRefreshedTime = 0;
+  protected static ExecutorInfo cachedstats = null;
+
+  /**
+   * Handle all get request to Statistics Servlet {@inheritDoc}
+   *
+   * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
+   *      javax.servlet.http.HttpServletResponse)
+   */
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+
+    boolean noCache = null != req && Boolean.valueOf(req.getParameter(noCacheParamName));
+
+    if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
+      this.populateStatistics(noCache);
+    }
+
+    JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
+  }
+
+  /**
+   * fill the result set with the percent of the remaining system memory on the server.
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work on the property "remainingMemory" and "remainingMemoryPercent".
+   *
+   * NOTE:
+   * a double value will be used to present the remaining memory,
+   *         a returning value of '55.6' means 55.6%
+   */
+  protected void fillRemainingMemoryPercent(ExecutorInfo stats) {
+    if (exists_Bash && exists_Cat && exists_Grep && exists_Meminfo) {
+      java.lang.ProcessBuilder processBuilder =
+          new java.lang.ProcessBuilder("/bin/bash", "-c",
+              "/bin/cat /proc/meminfo | grep -E \"^MemTotal:|^MemFree:|^Buffers:|^Cached:|^SwapCached:\"");
+      try {
+        ArrayList<String> output = new ArrayList<String>();
+        Process process = processBuilder.start();
+        process.waitFor();
+        InputStream inputStream = process.getInputStream();
+        try {
+          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            output.add(line);
+          }
+        } finally {
+          inputStream.close();
+        }
+
+        long totalMemory = 0;
+        long totalFreeMemory = 0;
+        Long parsedResult = (long) 0;
+
+        // process the output from bash call.
+        // we expect the result from the bash call to be something like following -
+        // MemTotal:       65894264 kB
+        // MemFree:        57753844 kB
+        // Buffers:          305552 kB
+        // Cached:          3802432 kB
+        // SwapCached:            0 kB
+        // Note : total free memory = freeMemory + cached + buffers + swapCached
+        // TODO : think about merging the logic in systemMemoryInfo as the logic is similar
+        if (output.size() == 5) {
+          for (String result : output) {
+            // find the total memory and value the variable.
+            parsedResult = extractMemoryInfo("MemTotal", result);
+            if (null != parsedResult) {
+              totalMemory = parsedResult;
+              continue;
+            }
+
+            // find the free memory.
+            parsedResult = extractMemoryInfo("MemFree", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
+            }
+
+            // find the Buffers.
+            parsedResult = extractMemoryInfo("Buffers", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
+            }
+
+            // find the SwapCached.
+            parsedResult = extractMemoryInfo("SwapCached", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
+            }
+
+            // find the Cached.
+            parsedResult = extractMemoryInfo("Cached", result);
+            if (null != parsedResult) {
+              totalFreeMemory += parsedResult;
+              continue;
+            }
+          }
+        } else {
+          logger.error("failed to get total/free memory info as the bash call returned invalid result."
+              + String.format(" Output from the bash call - %s ", output.toString()));
+        }
+
+        // the number got from the proc file is in KBs we want to see the number in MBs so we are dividing it by 1024.
+        stats.setRemainingMemoryInMB(totalFreeMemory / 1024);
+        stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double) totalFreeMemory / (double) totalMemory) * 100);
+      } catch (Exception ex) {
+        logger.error("failed fetch system memory info "
+            + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+      }
+    } else {
+      logger.error("failed fetch system memory info, one or more files from the following list are missing -  "
+          + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
+    }
+  }
+
+  private Long extractMemoryInfo(String field, String result) {
+    Long returnResult = null;
+    if (null != result && null != field && result.matches(String.format("^%s:.*", field))
+        && result.split("\\s+").length > 2) {
+      try {
+        returnResult = Long.parseLong(result.split("\\s+")[1]);
+        logger.debug(field + ":" + returnResult);
+      } catch (NumberFormatException e) {
+        returnResult = 0l;
+        logger.error(String.format("yielding 0 for %s as output is invalid - %s", field, result));
+      }
+    }
+    return returnResult;
+  }
+
+  /**
+   * call the data providers to fill the returning data container for statistics data.
+   * This function refreshes the static cached copy of data in case if necessary.
+   * */
+  protected synchronized void populateStatistics(boolean noCache) {
+    //check again before starting the work.
+    if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds) {
+      final ExecutorInfo stats = new ExecutorInfo();
+
+      fillRemainingMemoryPercent(stats);
+      fillRemainingFlowCapacityAndLastDispatchedTime(stats);
+      fillCpuUsage(stats);
+
+      cachedstats = stats;
+      lastRefreshedTime = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * fill the result set with the remaining flow capacity .
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work on the property "remainingFlowCapacity".
+   */
+  protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats) {
+
+    AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
+    if (server != null) {
+      FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
+      int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
+      stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
+      stats.setNumberOfAssignedFlows(assignedFlows);
+      stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+    } else {
+      logger.error("failed to get data for remaining flow capacity or LastDispatchedTime"
+          + " as the AzkabanExecutorServer has yet been initialized.");
+    }
+  }
+
+  /**<pre>
+   * fill the result set with the CPU usage .
+   * Note : As the 'Top' bash call doesn't yield accurate result for the system load,
+   *        the implementation has been changed to load from the "proc/loadavg" which keeps
+   *        the moving average of the system load, we are pulling the average for the recent 1 min.
+   *</pre>
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work on the property "cpuUsage".
+   */
+  protected void fillCpuUsage(ExecutorInfo stats) {
+    if (exists_Bash && exists_Cat && exists_LoadAvg) {
+      java.lang.ProcessBuilder processBuilder =
+          new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/loadavg");
+      try {
+        ArrayList<String> output = new ArrayList<String>();
+        Process process = processBuilder.start();
+        process.waitFor();
+        InputStream inputStream = process.getInputStream();
+        try {
+          java.io.BufferedReader reader = new java.io.BufferedReader(new InputStreamReader(inputStream));
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            output.add(line);
+          }
+        } finally {
+          inputStream.close();
+        }
+
+        // process the output from bash call.
+        if (output.size() > 0) {
+          String[] splitedresult = output.get(0).split("\\s+");
+          double cpuUsage = 0.0;
+
+          try {
+            cpuUsage = Double.parseDouble(splitedresult[0]);
+          } catch (NumberFormatException e) {
+            logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
+          }
+          logger.info("System load : " + cpuUsage);
+          stats.setCpuUpsage(cpuUsage);
+        }
+      } catch (Exception ex) {
+        logger.error("failed fetch system load info "
+            + "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+      }
+    } else {
+      logger.error("failed fetch system load info, one or more files from the following list are missing -  "
+          + "'/bin/bash'," + "'/bin/cat'," + "'/proc/loadavg'");
+    }
+  }
+}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
index e66a586..1d9af6d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatsServlet.java
@@ -72,7 +72,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Handle all get request to Stats Servlet {@inheritDoc}
-   * 
+   *
    * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
    *      javax.servlet.http.HttpServletResponse)
    */
@@ -176,7 +176,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Get metric snapshots for a metric and date specification
-   * 
+   *
    * @throws ServletException
    */
   private void handleGetMetricHistory(HttpServletRequest req,
@@ -252,7 +252,7 @@ public class StatsServlet extends HttpServlet implements ConnectorParams {
 
   /**
    * Update tracking interval for a given metrics
-   * 
+   *
    * @throws ServletException
    */
   private void handleChangeMetricInterval(HttpServletRequest req,
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
new file mode 100644
index 0000000..15078b4
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -0,0 +1,81 @@
+package azkaban.execapp;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import azkaban.executor.ExecutorInfo;
+
+@Ignore
+public class StatisticsServletTest {
+  private class MockStatisticsServlet extends ServerStatisticsServlet {
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    public ExecutorInfo getStastics() {
+      return cachedstats;
+    }
+
+    public long getUpdatedTime() {
+      return lastRefreshedTime;
+    }
+
+    public void callPopulateStatistics() {
+      this.populateStatistics(false);
+    }
+
+    public void callFillCpuUsage(ExecutorInfo stats) {
+      this.fillCpuUsage(stats);
+    }
+
+    public void callFillRemainingMemoryPercent(ExecutorInfo stats) {
+      this.fillRemainingMemoryPercent(stats);
+    }
+  }
+  private MockStatisticsServlet statServlet = new MockStatisticsServlet();
+
+  @Test
+  public void testFillMemory() {
+    ExecutorInfo stats = new ExecutorInfo();
+    this.statServlet.callFillRemainingMemoryPercent(stats);
+    // assume any machine that runs this test should
+    // have bash and top available and at least got some remaining memory.
+    Assert.assertTrue(stats.getRemainingMemoryInMB() > 0);
+    Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
+  }
+
+  @Test
+  public void testFillCpu() {
+    ExecutorInfo stats = new ExecutorInfo();
+    this.statServlet.callFillCpuUsage(stats);
+    Assert.assertTrue(stats.getCpuUsage() > 0);
+  }
+
+  @Test
+  public void testPopulateStatistics() {
+    this.statServlet.callPopulateStatistics();
+    Assert.assertNotNull(this.statServlet.getStastics());
+    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
+    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
+    Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
+  }
+
+  @Test
+  public void testPopulateStatisticsCache() {
+    this.statServlet.callPopulateStatistics();
+    final long updatedTime = this.statServlet.getUpdatedTime();
+    while (System.currentTimeMillis() - updatedTime < 1000) {
+      this.statServlet.callPopulateStatistics();
+      Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
+    }
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+    }
+
+    // make sure cache expires after timeout.
+    this.statServlet.callPopulateStatistics();
+    Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
+  }
+}
diff --git a/azkaban-sql/src/sql/create.executor_events.sql b/azkaban-sql/src/sql/create.executor_events.sql
new file mode 100644
index 0000000..1ff4799
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executor_events.sql
@@ -0,0 +1,9 @@
+CREATE TABLE executor_events (
+  executor_id INT NOT NULL,
+  event_type TINYINT NOT NULL,
+  event_time DATETIME NOT NULL,
+  username VARCHAR(64),
+  message VARCHAR(512)
+);
+
+CREATE INDEX executor_log ON executor_events(executor_id, event_time);
\ No newline at end of file
diff --git a/azkaban-sql/src/sql/create.executors.sql b/azkaban-sql/src/sql/create.executors.sql
new file mode 100644
index 0000000..8e59ce3
--- /dev/null
+++ b/azkaban-sql/src/sql/create.executors.sql
@@ -0,0 +1,10 @@
+CREATE TABLE executors (
+  id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
+  host VARCHAR(64) NOT NULL,
+  port INT NOT NULL,
+  active BOOLEAN DEFAULT true,
+  UNIQUE (host, port),
+  UNIQUE INDEX executor_id (id)
+);
+
+CREATE INDEX executor_connection ON executors(host, port);
diff --git a/azkaban-sql/src/sql/database.properties b/azkaban-sql/src/sql/database.properties
index b68be28..b6802bc 100644
--- a/azkaban-sql/src/sql/database.properties
+++ b/azkaban-sql/src/sql/database.properties
@@ -1 +1 @@
-version=
+version=3.0
diff --git a/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
new file mode 100644
index 0000000..dcb4ec5
--- /dev/null
+++ b/azkaban-sql/src/sql/update.active_executing_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE active_executing_flows DROP COLUMN host;
+ALTER TABLE active_executing_flows DROP COLUMN port;
\ No newline at end of file
diff --git a/azkaban-sql/src/sql/update.execution_flows.3.0.sql b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
new file mode 100644
index 0000000..2935810
--- /dev/null
+++ b/azkaban-sql/src/sql/update.execution_flows.3.0.sql
@@ -0,0 +1,2 @@
+ALTER TABLE execution_flows ADD COLUMN executor_id INT DEFAULT NULL;
+CREATE INDEX executor_id ON execution_flows(executor_id);
\ No newline at end of file
diff --git a/azkaban-test/src/test/resources/azkaban-users.xml b/azkaban-test/src/test/resources/azkaban-users.xml
new file mode 100644
index 0000000..55941a7
--- /dev/null
+++ b/azkaban-test/src/test/resources/azkaban-users.xml
@@ -0,0 +1,5 @@
+<azkaban-users>
+	<user username="testAdmin" password="testAdmin" roles="admin" groups="azkaban" />
+	<user username="testUser" password="testUser" />
+	<role name="admin" permissions="ADMIN" />
+</azkaban-users>
diff --git a/azkaban-webserver/.gitignore b/azkaban-webserver/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/azkaban-webserver/.gitignore
@@ -0,0 +1 @@
+/bin
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 132a9f1..ef588f7 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -29,11 +29,13 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.lang.StringEscapeUtils;
 
+import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
@@ -47,8 +49,11 @@ import azkaban.server.HttpRequestUtils;
 import azkaban.server.session.Session;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
+import azkaban.user.Role;
 import azkaban.user.User;
+import azkaban.user.UserManager;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.plugin.PluginRegistry;
 import azkaban.webapp.plugin.ViewerPlugin;
@@ -59,11 +64,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private ExecutorManagerAdapter executorManager;
   private ScheduleManager scheduleManager;
   private ExecutorVelocityHelper velocityHelper;
+  private UserManager userManager;
 
   @Override
   public void init(ServletConfig config) throws ServletException {
     super.init(config);
     AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    userManager = server.getUserManager();
     projectManager = server.getProjectManager();
     executorManager = server.getExecutorManager();
     scheduleManager = server.getScheduleManager();
@@ -129,6 +136,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
           ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
         }
       }
+    } else if (ajaxName.equals("reloadExecutors")) {
+      ajaxReloadExecutors(req, resp, ret, session.getUser());
+    } else if (ajaxName.equals("enableQueueProcessor")) {
+      ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), true);
+    } else if (ajaxName.equals("disableQueueProcessor")) {
+      ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), false);
     } else if (ajaxName.equals("getRunning")) {
       String projectName = getParam(req, "project");
       String flowName = getParam(req, "flow");
@@ -152,6 +165,63 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
+  /**
+   * <pre>
+   * Enables the queue processor if {@code enableQueue} is true;
+   * disables the queue processor if {@code enableQueue} is false.
+   * </pre>
+   */
+  private void ajaxUpdateQueueProcessor(HttpServletRequest req,
+    HttpServletResponse resp, HashMap<String, Object> returnMap, User user,
+    boolean enableQueue) {
+    boolean wasSuccess = false;
+    if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+      try {
+        if (enableQueue) {
+          executorManager.enableQueueProcessorThread();
+        } else {
+          executorManager.disableQueueProcessorThread();
+        }
+        returnMap.put(ConnectorParams.STATUS_PARAM,
+          ConnectorParams.RESPONSE_SUCCESS);
+        wasSuccess = true;
+      } catch (ExecutorManagerException e) {
+        returnMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
+      }
+    } else {
+      returnMap.put(ConnectorParams.RESPONSE_ERROR,
+        "Only Admins are allowed to update queue processor");
+    }
+    if (!wasSuccess) {
+      returnMap.put(ConnectorParams.STATUS_PARAM,
+        ConnectorParams.RESPONSE_ERROR);
+    }
+  }
+
+  /* Reloads executors from DB and azkaban.properties via executorManager */
+  private void ajaxReloadExecutors(HttpServletRequest req,
+    HttpServletResponse resp, HashMap<String, Object> returnMap, User user) {
+    boolean wasSuccess = false;
+    if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+      try {
+        executorManager.setupExecutors();
+        returnMap.put(ConnectorParams.STATUS_PARAM,
+          ConnectorParams.RESPONSE_SUCCESS);
+        wasSuccess = true;
+      } catch (ExecutorManagerException e) {
+        returnMap.put(ConnectorParams.RESPONSE_ERROR,
+          "Failed to refresh the executors " + e.getMessage());
+      }
+    } else {
+      returnMap.put(ConnectorParams.RESPONSE_ERROR,
+        "Only Admins are allowed to refresh the executors");
+    }
+    if (!wasSuccess) {
+      returnMap.put(ConnectorParams.STATUS_PARAM,
+        ConnectorParams.RESPONSE_ERROR);
+    }
+  }
+
   @Override
   protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
       Session session) throws ServletException, IOException {
@@ -225,7 +295,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
         newPage(req, resp, session,
             "azkaban/webapp/servlet/velocity/executionspage.vm");
 
-    List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
+    List<Pair<ExecutableFlow, Executor>> runningFlows =
+      executorManager.getActiveFlowsWithExecutor();
     page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
 
     List<ExecutableFlow> finishedFlows =
@@ -809,10 +880,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     options.setMailCreator(flow.getMailCreator());
 
     try {
+      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
       String message =
           executorManager.submitExecutableFlow(exflow, user.getUserId());
       ret.put("message", message);
-    } catch (ExecutorManagerException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       ret.put("error",
           "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index f688820..c6445c0 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -62,6 +62,7 @@ import azkaban.sla.SlaOption;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
+import azkaban.user.UserManager;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.SplitterOutputStream;
 import azkaban.utils.Utils;
@@ -73,11 +74,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
   private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
   private ProjectManager projectManager;
   private ScheduleManager scheduleManager;
+  private UserManager userManager;
 
   @Override
   public void init(ServletConfig config) throws ServletException {
     super.init(config);
     AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    userManager = server.getUserManager();
     projectManager = server.getProjectManager();
     scheduleManager = server.getScheduleManager();
   }
@@ -656,6 +659,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     ExecutionOptions flowOptions = null;
     try {
       flowOptions = HttpRequestUtils.parseFlowOptions(req);
+      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
     } catch (Exception e) {
       ret.put("error", e.getMessage());
     }
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
index 6d14839..b2b3487 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/StatsServlet.java
@@ -17,6 +17,7 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -29,7 +30,9 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import azkaban.executor.ConnectorParams;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.server.session.Session;
 import azkaban.user.Permission;
 import azkaban.user.Role;
@@ -67,55 +70,105 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
   private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session)
       throws ServletException, IOException {
     HashMap<String, Object> ret = new HashMap<String, Object>();
+    int executorId = getIntParam(req, ConnectorParams.EXECUTOR_ID_PARAM);
     String actionName = getParam(req, ConnectorParams.ACTION_PARAM);
 
     if (actionName.equals(ConnectorParams.STATS_GET_METRICHISTORY)) {
-      handleGetMetricHistory(req, ret, session.getUser());
+      handleGetMetricHistory(executorId, req, ret, session.getUser());
+    } else if (actionName.equals(ConnectorParams.STATS_GET_ALLMETRICSNAME)) {
+      handleGetAllMetricName(executorId, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_REPORTINGINTERVAL)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_REPORTINGINTERVAL, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_CLEANINGINTERVAL)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_CLEANINGINTERVAL, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_MAXREPORTERPOINTS)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_MAXREPORTERPOINTS, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_ENABLEMETRICS)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_ENABLEMETRICS, req, ret);
     } else if (actionName.equals(ConnectorParams.STATS_SET_DISABLEMETRICS)) {
-      handleChangeConfigurationRequest(ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
+      handleChangeConfigurationRequest(executorId, ConnectorParams.STATS_SET_DISABLEMETRICS, req, ret);
     }
 
     writeJSON(resp, ret);
   }
 
   /**
+   * Get all metrics tracked by the given executor
+   *
+   * @param executorId id of the executor to query for metric names
+   * @param req the incoming HTTP request
+   * @param ret map populated with the metric list, or an error message on failure
+   */
+  private void handleGetAllMetricName(int executorId, HttpServletRequest req,
+    HashMap<String, Object> ret) throws IOException {
+    Map<String, Object> result;
+    try {
+      result =
+        execManager.callExecutorStats(executorId,
+          ConnectorParams.STATS_GET_ALLMETRICSNAME,
+          (Pair<String, String>[]) null);
+
+      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+        ret.put("error", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        ret.put("metricList", result.get("data"));
+      }
+    } catch (ExecutorManagerException e) {
+      ret.put("error", "Failed to fetch metric names for executor : "
+        + executorId);
+    }
+  }
+
+  /**
    * Generic method to facilitate actionName action using Azkaban exec server
+   * @param executorId id of the executor to contact
    * @param actionName  Name of the action
+   * @param ret map populated with the response status, or an error message on failure
    */
-  private void handleChangeConfigurationRequest(String actionName, HttpServletRequest req, HashMap<String, Object> ret)
+  private void handleChangeConfigurationRequest(int executorId, String actionName, HttpServletRequest req, HashMap<String, Object> ret)
       throws ServletException, IOException {
-    Map<String, Object> result = execManager.callExecutorStats(actionName, getAllParams(req));
-    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
-    } else {
-      ret.put(ConnectorParams.STATUS_PARAM, result.get(ConnectorParams.STATUS_PARAM));
+    try {
+      Map<String, Object> result =
+        execManager
+          .callExecutorStats(executorId, actionName, getAllParams(req));
+      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+        ret.put(ConnectorParams.RESPONSE_ERROR,
+          result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        ret.put(ConnectorParams.STATUS_PARAM,
+          result.get(ConnectorParams.STATUS_PARAM));
+      }
+    } catch (ExecutorManagerException ex) {
+      ret.put("error", "Failed to change configuration");
     }
   }
 
   /**
    * Get metric snapshots for a metric and date specification
+   * @param executorId id of the executor whose metric history is fetched
    * @throws ServletException
+   * @throws IOException
    */
-  private void handleGetMetricHistory(HttpServletRequest req, HashMap<String, Object> ret, User user)
-      throws IOException, ServletException {
-    Map<String, Object> result =
-        execManager.callExecutorStats(ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
-    if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-      ret.put(ConnectorParams.RESPONSE_ERROR, result.get(ConnectorParams.RESPONSE_ERROR).toString());
-    } else {
-      ret.put("data", result.get("data"));
+  private void handleGetMetricHistory(int executorId, HttpServletRequest req,
+    HashMap<String, Object> ret, User user) throws IOException,
+    ServletException {
+    try {
+      Map<String, Object> result =
+        execManager.callExecutorStats(executorId,
+          ConnectorParams.STATS_GET_METRICHISTORY, getAllParams(req));
+      if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+        ret.put(ConnectorParams.RESPONSE_ERROR,
+          result.get(ConnectorParams.RESPONSE_ERROR).toString());
+      } else {
+        ret.put("data", result.get("data"));
+      }
+    } catch (ExecutorManagerException ex) {
+      ret.put("error", "Failed to fetch metric history");
     }
   }
 
   /**
+   * Renders the stats page, populating the executor list and the metric list.
    *
    */
   private void handleStatePageLoad(HttpServletRequest req, HttpServletResponse resp, Session session)
@@ -128,14 +181,20 @@ public class StatsServlet extends LoginAbstractAzkabanServlet {
     }
 
     try {
+      Collection<Executor> executors = execManager.getAllActiveExecutors();
+      page.add("executorList", executors);
+
       Map<String, Object> result =
-          execManager.callExecutorStats(ConnectorParams.STATS_GET_ALLMETRICSNAME, (Pair<String, String>[]) null);
+        execManager.callExecutorStats(executors.iterator().next().getId(),
+          ConnectorParams.STATS_GET_ALLMETRICSNAME,
+          (Pair<String, String>[]) null);
       if (result.containsKey(ConnectorParams.RESPONSE_ERROR)) {
-        page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR).toString());
+        page.add("errorMsg", result.get(ConnectorParams.RESPONSE_ERROR)
+          .toString());
       } else {
         page.add("metricList", result.get("data"));
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       page.add("errorMsg", "Failed to get a response from Azkaban exec server");
     }
 
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
index 49d1085..9c2a6fb 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -68,6 +68,7 @@
               <tr>
                 <th>#</th>
                 <th class="execid">Execution Id</th>
+                <th>Executor Id</th>
                 <th>Flow</th>
                 <th>Project</th>
                 <th class="user">User</th>
@@ -80,25 +81,33 @@
               </tr>
             </thead>
             <tbody>
-#if ($runningFlows)
+
+#if ( !$null.isNull(${runningFlows}))
   #foreach ($flow in $runningFlows)
               <tr>
                  <td class="tb-name">
                    $velocityCount
                 </td>
                 <td class="tb-name">
-                  <a href="${context}/executor?execid=${flow.executionId}">${flow.executionId}</a>
+                  <a href="${context}/executor?execid=${flow.getFirst().executionId}">${flow.getFirst().executionId}</a>
                 </td>
-                <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})&flow=${flow.flowId}">${flow.flowId}</a></td>
                 <td>
-                  <a href="${context}/manager?project=$vmutils.getProjectName(${flow.projectId})">$vmutils.getProjectName(${flow.projectId})</a>
+                   #if (${flow.getSecond()})
+                       ${flow.getSecond().getId()}
+                   #else
+                       -
+                   #end
                 </td>
-                <td>${flow.submitUser}</td>
-                <td>${flow.proxyUsers}</td>
-                <td>$utils.formatDate(${flow.startTime})</td>
-                <td>$utils.formatDate(${flow.endTime})</td>
-                <td>$utils.formatDuration(${flow.startTime}, ${flow.endTime})</td>
-                <td><div class="status ${flow.status}">$utils.formatStatus(${flow.status})</div></td>
+                <td><a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})&flow=${flow.getFirst().flowId}">${flow.getFirst().flowId}</a></td>
+                <td>
+                  <a href="${context}/manager?project=$vmutils.getProjectName(${flow.getFirst().projectId})">$vmutils.getProjectName(${flow.getFirst().projectId})</a>
+                </td>
+                <td>${flow.getFirst().submitUser}</td>
+                <td>${flow.getFirst().proxyUsers}</td>
+                <td>$utils.formatDate(${flow.getFirst().startTime})</td>
+                <td>$utils.formatDate(${flow.getFirst().endTime})</td>
+                <td>$utils.formatDuration(${flow.getFirst().startTime}, ${flow.getFirst().endTime})</td>
+                <td><div class="status ${flow.getFirst().status}">$utils.formatStatus(${flow.getFirst().status})</div></td>
                 <td></td>
               </tr>
   #end
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index 4596ac2..27be342 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -29,6 +29,25 @@
          var currentTime = ${currentTime};
          var timezone = "${timezone}";
 
+         function refreshMetricList() {
+               var requestURL = '/stats';
+               var requestData = {
+                 'action': 'getAllMetricNames',
+                 'executorId':  $('#executorName').val()
+               };
+               var successHandler = function(responseData) {
+                 if(responseData.error != null) {
+                   $('#reportedMetric').html(responseData.error);
+                 } else {
+                    $('#metricName').empty();
+                    for(var index = 0; index < responseData.metricList.length; index++) {
+                      $('#metricName').append($('<option value="' + responseData.metricList[index] +'">' + responseData.metricList[index] + '</option>'));
+                    }
+                  }
+               };
+               $.get(requestURL, requestData, successHandler, 'json');
+         }
+
          function refreshMetricChart() {
                var requestURL = '/stats';
                var requestData = {
@@ -36,7 +55,8 @@
                  'from': new Date($('#datetimebegin').val()).toUTCString(),
                  'to'  : new Date($('#datetimeend').val()).toUTCString(),
                  'metricName': $('#metricName').val(),
-                 'useStats': $("#useStats").is(':checked')
+                 'useStats': $("#useStats").is(':checked'),
+                 'executorId':  $('#executorName').val()
                };
                var successHandler = function(responseData) {
                  if(responseData.error != null) {
@@ -67,6 +87,7 @@
                  $('#datetimebegin').data('DateTimePicker').setEndDate(e.date);
                });
              $('#retrieve').click(refreshMetricChart);
+             $('#executorName').click(refreshMetricList);
          });
 
       </script>
@@ -84,8 +105,16 @@
                <div class="header-title" style="width: 17%;">
                   <h1><a href="${context}/stats">Statistics</a></h1>
                </div>
-               <div class="header-control" style="width: 900px; padding-top: 5px;">
+               <div class="header-control" style="width: 1300px; padding-top: 5px;">
                   <form id="metric-form" method="get">
+                     <label for="executorLabel" >Executor</label>
+                     #if (!$executorList.isEmpty())
+                     <select id="executorName"  name="executorName" style="width:200px">
+                        #foreach ($executor in $executorList)
+                        <option value="${executor.getId()}" style="width:200px">${executor.getHost()}:${executor.getPort()}</option>
+                        #end
+                     </select>
+                     #end
                      <label for="metricLabel" >Metric</label>
                      #if (!$metricList.isEmpty())
                      <select id="metricName"  name="metricName" style="width:200px">
@@ -119,4 +148,4 @@
       <!-- /container-full -->
       #end
    </body>
-   <html>
\ No newline at end of file
+   </html>
diff --git a/azkaban-webserver/src/web/js/azkaban/view/exflow.js b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
index dbb8ae4..026a946 100644
--- a/azkaban-webserver/src/web/js/azkaban/view/exflow.js
+++ b/azkaban-webserver/src/web/js/azkaban/view/exflow.js
@@ -178,8 +178,11 @@ azkaban.FlowTabView = Backbone.View.extend({
 		$("#retrybtn").hide();
 
 		if (data.status == "SUCCEEDED") {
-      $("#executebtn").show();
+                        $("#executebtn").show();
 		}
+                else if (data.status == "PREPARING") {
+                        $("#cancelbtn").show();
+                }
 		else if (data.status == "FAILED") {
 			$("#executebtn").show();
 		}