azkaban-aplcache

Merge pull request #487 from logiclord/multipleexecutors Integrating

9/15/2015 3:59:38 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index 312be44..d8b10f1 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -34,6 +34,8 @@ public class ExecutionOptions {
   public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
   public static final String CONCURRENT_OPTION_IGNORE = "ignore";
   public static final String FLOW_PRIORITY = "flowPriority";
+  /* override dispatcher selection and use executor id specified */
+  public static final String USE_EXECUTOR = "useExecutor";
   public static final int DEFAULT_FLOW_PRIORITY = 5;
 
   private static final String FLOW_PARAMETERS = "flowParameters";
diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index e21ae2a..f0600ab 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -92,9 +92,9 @@ public class Executor implements Comparable<Executor> {
 
   @Override
   public String toString(){
-    return String.format("%s (id: %s)",
-        null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
-        this.id);
+    return String.format("%s:%s (id: %s)",
+      null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+      this.port, this.id);
   }
 
   public String getHost() {
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index c97ef00..d1120ea 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -20,8 +20,8 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,18 +29,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
@@ -48,6 +46,9 @@ import azkaban.alert.Alerter;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutorComparator;
+import azkaban.executor.selector.ExecutorFilter;
+import azkaban.executor.selector.ExecutorSelector;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
 import azkaban.scheduler.ScheduleStatisticManager;
@@ -63,6 +64,10 @@ import azkaban.utils.Props;
  */
 public class ExecutorManager extends EventHandler implements
     ExecutorManagerAdapter {
+  static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
+      "azkaban.executorselector.filters";
+  static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
+      "azkaban.executorselector.comparator.";
   static final String AZKABAN_QUEUEPROCESSING_ENABLED =
     "azkaban.queueprocessing.enabled";
   static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
@@ -71,8 +76,10 @@ public class ExecutorManager extends EventHandler implements
     "azkaban.webserver.queue.size";
   private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
     "azkaban.activeexecutor.refresh.milisecinterval";
-  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW =
+  private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
     "azkaban.activeexecutor.refresh.flowinterval";
+  private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
+      "azkaban.executorinfo.refresh.maxThreads";
 
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
@@ -102,15 +109,21 @@ public class ExecutorManager extends EventHandler implements
 
   File cacheDir;
 
-  final Props azkProps;
+  private final Props azkProps;
+  private List<String> filterList;
+  private Map<String, Integer> comparatorWeightsMap;
+  private long lastSuccessfulExecutorInfoRefresh;
+  private ExecutorService executorInforRefresherService;
 
   public ExecutorManager(Props props, ExecutorLoader loader,
       Map<String, Alerter> alters) throws ExecutorManagerException {
     azkProps = props;
-
     this.executorLoader = loader;
     this.setupExecutors();
     this.loadRunningFlows();
+
+    queuedFlows =
+        new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
     this.loadQueuedFlows();
 
     alerters = alters;
@@ -121,41 +134,56 @@ public class ExecutorManager extends EventHandler implements
     executingManager.start();
 
     if(isMultiExecutorMode()) {
-      queueProcessor =
-        new QueueProcessorThread(azkProps.getBoolean(
-          AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
-          AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000),
-          azkProps
-            .getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_FLOW, 1000));
-      queueProcessor.start();
+      setupMultiExecutorMode();
     }
 
     long executionLogsRetentionMs =
       props.getLong("execution.logs.retention.ms",
         DEFAULT_EXECUTION_LOGS_RETENTION_MS);
 
-    queuedFlows =
-      new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
-
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
 
   }
 
+  private void setupMultiExecutorMode() {
+    // initliatize hard filters for executor selector from azkaban.properties
+    String filters = azkProps.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
+    if (filters != null) {
+      filterList = Arrays.asList(StringUtils.split(filters, ","));
+    }
+
+    // initliatize comparator feature weights for executor selector from
+    // azkaban.properties
+    Map<String, String> compListStrings =
+      azkProps.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
+    if (compListStrings != null) {
+      comparatorWeightsMap = new TreeMap<String, Integer>();
+      for (Map.Entry<String, String> entry : compListStrings.entrySet()) {
+        comparatorWeightsMap.put(entry.getKey(), Integer.valueOf(entry.getValue()));
+      }
+    }
+
+    executorInforRefresherService =
+        Executors.newFixedThreadPool(azkProps.getInt(
+          AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
+
+    // configure queue processor
+    queueProcessor =
+      new QueueProcessorThread(azkProps.getBoolean(
+        AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+
+    queueProcessor.start();
+  }
+
   /**
-   * <pre>
-   * Setup activeExecutors using azkaban.properties and database executors
-   * Note:
-   * 1. If azkaban.use.multiple.executors is set true, this method will
-   *    load all active executors
-   * 2. In local mode, If a local executor is specified and it is missing from db,
-   *    this method add local executor as active in DB
-   * 3. In local mode, If a local executor is specified and it is marked inactive in db,
-   *    this method will convert local executor as active in DB
-   * </pre>
    *
-   * @throws ExecutorManagerException
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#setupExecutors()
    */
+  @Override
   public void setupExecutors() throws ExecutorManagerException {
     Set<Executor> newExecutors = new HashSet<Executor>();
 
@@ -181,8 +209,10 @@ public class ExecutorManager extends EventHandler implements
     }
 
     if (newExecutors.isEmpty()) {
+      logger.error("No active executor found");
       throw new ExecutorManagerException("No active executor found");
     } else if(newExecutors.size() > 1 && !isMultiExecutorMode()) {
+      logger.error("Multiple local executors specified");
       throw new ExecutorManagerException("Multiple local executors specified");
     } else {
       // clear all active executors, only if we have at least one new active
@@ -201,15 +231,56 @@ public class ExecutorManager extends EventHandler implements
    */
   private void refreshExecutors() {
     synchronized (activeExecutors) {
-      // TODO: rest api call to refresh executor stats
+
+      List<Pair<Executor, Future<String>>> futures =
+        new ArrayList<Pair<Executor, Future<String>>>();
+      for (final Executor executor : activeExecutors) {
+        // execute each executorInfo refresh task to fetch
+        Future<String> fetchExecutionInfo =
+          executorInforRefresherService.submit(new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+              return callExecutorForJsonString(executor.getHost(),
+                executor.getPort(), "/serverstastics", null);
+            }
+          });
+        futures.add(new Pair<Executor, Future<String>>(executor,
+          fetchExecutionInfo));
+      }
+
+      boolean wasSuccess = true;
+      for (Pair<Executor, Future<String>> refreshPair : futures) {
+        Executor executor = refreshPair.getFirst();
+        try {
+          // max 5 secs
+          String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
+          executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
+          logger.info("Successfully refreshed ExecutorInfo for executor: "
+            + executor);
+        } catch (TimeoutException e) {
+          wasSuccess = false;
+          logger.error("Timed out while waiting for ExecutorInfo refresh"
+            + executor, e);
+        } catch (Exception e) {
+          wasSuccess = false;
+          logger.error("Failed to update ExecutorInfo for executor : "
+            + executor, e);
+        }
+      }
+
+      // update is successful for all executors
+      if (wasSuccess) {
+        lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
+      }
     }
   }
 
   /**
-   * Disable flow dispatching in QueueProcessor
-   *
-   * @throws ExecutorManagerException
+   * Throws exception if running in local mode
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#disableQueueProcessorThread()
    */
+  @Override
   public void disableQueueProcessorThread() throws ExecutorManagerException {
     if (isMultiExecutorMode()) {
       queueProcessor.setActive(false);
@@ -220,10 +291,11 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /**
-   * Enable flow dispatching in QueueProcessor
-   *
-   * @throws ExecutorManagerException
+   * Throws exception if running in local mode
+   * {@inheritDoc}
+   * @see azkaban.executor.ExecutorManagerAdapter#enableQueueProcessorThread()
    */
+  @Override
   public void enableQueueProcessorThread() throws ExecutorManagerException {
     if (isMultiExecutorMode()) {
       queueProcessor.setActive(true);
@@ -253,6 +325,34 @@ public class ExecutorManager extends EventHandler implements
       return false;
   }
 
+  /**
+   * Return last Successful ExecutorInfo Refresh for all active executors
+   *
+   * @return
+   */
+  public long getLastSuccessfulExecutorInfoRefresh() {
+    return this.lastSuccessfulExecutorInfoRefresh;
+  }
+
+  /**
+   * Get currently supported Comparators available to use via azkaban.properties
+   *
+   * @return
+   */
+  public Set<String> getAvailableExecutorComparatorNames() {
+    return ExecutorComparator.getAvailableComparatorNames();
+
+  }
+
+  /**
+   * Get currently supported filters available to use via azkaban.properties
+   *
+   * @return
+   */
+  public Set<String> getAvailableExecutorFilterNames() {
+    return ExecutorFilter.getAvailableFilterNames();
+  }
+
   @Override
   public State getExecutorManagerThreadState() {
     return executingManager.getState();
@@ -334,9 +434,12 @@ public class ExecutorManager extends EventHandler implements
    * any executor
    */
   private void loadQueuedFlows() throws ExecutorManagerException {
-    for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
-      .fetchQueuedFlows()) {
-      queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+    List<Pair<ExecutionReference, ExecutableFlow>> retrievedExecutions =
+      executorLoader.fetchQueuedFlows();
+    if (retrievedExecutions != null) {
+      for (Pair<ExecutionReference, ExecutableFlow> pair : retrievedExecutions) {
+        queuedFlows.enqueue(pair.getSecond(), pair.getFirst());
+      }
     }
   }
 
@@ -927,11 +1030,12 @@ public class ExecutorManager extends EventHandler implements
           executorLoader.addActiveExecutableReference(reference);
           queuedFlows.enqueue(exflow, reference);
         } else {
+          Executor executor = activeExecutors.iterator().next();
           // assign only local executor we have
-          reference.setExecutor(activeExecutors.iterator().next());
+          reference.setExecutor(executor);
           executorLoader.addActiveExecutableReference(reference);
           try {
-            callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+            callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
             runningFlows.put(exflow.getExecutionId(),
               new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
           } catch (ExecutorManagerException e) {
@@ -957,11 +1061,11 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  private Map<String, Object> callExecutorServer(ExecutionReference ref,
-      String action) throws ExecutorManagerException {
+  private Map<String, Object> callExecutorServer(ExecutableFlow exflow,
+    Executor executor, String action) throws ExecutorManagerException {
     try {
-      return callExecutorServer(ref.getHost(), ref.getPort(), action,
-          ref.getExecId(), null, (Pair<String, String>[]) null);
+      return callExecutorServer(executor.getHost(), executor.getPort(), action,
+        exflow.getExecutionId(), null, (Pair<String, String>[]) null);
     } catch (IOException e) {
       throw new ExecutorManagerException(e);
     }
@@ -1002,57 +1106,63 @@ public class ExecutorManager extends EventHandler implements
   private Map<String, Object> callExecutorServer(String host, int port,
       String action, Integer executionId, String user,
       Pair<String, String>... params) throws IOException {
-    URIBuilder builder = new URIBuilder();
-    builder.setScheme("http").setHost(host).setPort(port).setPath("/executor");
-
-    builder.setParameter(ConnectorParams.ACTION_PARAM, action);
-
-    if (executionId != null) {
-      builder.setParameter(ConnectorParams.EXECID_PARAM,
-          String.valueOf(executionId));
-    }
+    List<Pair<String, String>> paramList = new ArrayList<Pair<String,String>>();
 
-    if (user != null) {
-      builder.setParameter(ConnectorParams.USER_PARAM, user);
+    // if params = null
+    if(params != null) {
+      paramList.addAll(Arrays.asList(params));
     }
 
-    if (params != null) {
-      for (Pair<String, String> pair : params) {
-        builder.setParameter(pair.getFirst(), pair.getSecond());
-      }
-    }
+    paramList
+      .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
+    paramList.add(new Pair<String, String>(ConnectorParams.EXECID_PARAM, String
+      .valueOf(executionId)));
+    paramList.add(new Pair<String, String>(ConnectorParams.USER_PARAM, user));
 
-    URI uri = null;
-    try {
-      uri = builder.build();
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
+    Map<String, Object> jsonResponse =
+      callExecutorForJsonObject(host, port, "/executor", paramList);
 
-    ResponseHandler<String> responseHandler = new BasicResponseHandler();
+    return jsonResponse;
+  }
 
-    HttpClient httpclient = new DefaultHttpClient();
-    HttpGet httpget = new HttpGet(uri);
-    String response = null;
-    try {
-      response = httpclient.execute(httpget, responseHandler);
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      httpclient.getConnectionManager().shutdown();
-    }
+  /*
+   * Helper method used by ExecutorManager to call executor and return json
+   * object map
+   */
+  private Map<String, Object> callExecutorForJsonObject(String host, int port,
+    String path, List<Pair<String, String>> paramList) throws IOException {
+    String responseString =
+      callExecutorForJsonString(host, port, path, paramList);
 
     @SuppressWarnings("unchecked")
     Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+      (Map<String, Object>) JSONUtils.parseJSONFromString(responseString);
     String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
     if (error != null) {
       throw new IOException(error);
     }
-
     return jsonResponse;
   }
 
+  /*
+   * Helper method used by ExecutorManager to call executor and return raw json
+   * string
+   */
+  private String callExecutorForJsonString(String host, int port, String path,
+    List<Pair<String, String>> paramList) throws IOException {
+    if (paramList == null) {
+      paramList = new ArrayList<Pair<String, String>>();
+    }
+
+    ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
+    @SuppressWarnings("unchecked")
+    URI uri =
+      ExecutorApiClient.buildUri(host, port, path, true,
+        paramList.toArray(new Pair[0]));
+
+    return apiclient.httpGet(uri, null);
+  }
+
   /**
    * Manage servlet call for stats servlet in Azkaban execution server
    * {@inheritDoc}
@@ -1065,90 +1175,38 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public Map<String, Object> callExecutorStats(int executorId, String action,
     Pair<String, String>... params) throws IOException, ExecutorManagerException {
-
-    URIBuilder builder = new URIBuilder();
     Executor executor = fetchExecutor(executorId);
-    builder.setScheme("http").setHost(executor.getHost())
-      .setPort(executor.getPort()).setPath("/stats");
 
-    builder.setParameter(ConnectorParams.ACTION_PARAM, action);
+    List<Pair<String, String>> paramList =
+      new ArrayList<Pair<String, String>>();
 
+    // if params = null
     if (params != null) {
-      for (Pair<String, String> pair : params) {
-        builder.setParameter(pair.getFirst(), pair.getSecond());
-      }
-    }
-
-    URI uri = null;
-    try {
-      uri = builder.build();
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-
-    ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
-    HttpClient httpclient = new DefaultHttpClient();
-    HttpGet httpget = new HttpGet(uri);
-    String response = null;
-    try {
-      response = httpclient.execute(httpget, responseHandler);
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      httpclient.getConnectionManager().shutdown();
+      paramList.addAll(Arrays.asList(params));
     }
 
-    @SuppressWarnings("unchecked")
-    Map<String, Object> jsonResponse =
-      (Map<String, Object>) JSONUtils.parseJSONFromString(response);
+    paramList
+      .add(new Pair<String, String>(ConnectorParams.ACTION_PARAM, action));
 
-    return jsonResponse;
+    return callExecutorForJsonObject(executor.getHost(), executor.getPort(),
+      "/stats", paramList);
   }
 
 
   @Override
   public Map<String, Object> callExecutorJMX(String hostPort, String action,
       String mBean) throws IOException {
-    URIBuilder builder = new URIBuilder();
+    List<Pair<String, String>> paramList =
+      new ArrayList<Pair<String, String>>();
 
-    String[] hostPortSplit = hostPort.split(":");
-    builder.setScheme("http").setHost(hostPortSplit[0])
-        .setPort(Integer.parseInt(hostPortSplit[1])).setPath("/jmx");
-
-    builder.setParameter(action, "");
-    if (mBean != null) {
-      builder.setParameter(ConnectorParams.JMX_MBEAN, mBean);
-    }
-
-    URI uri = null;
-    try {
-      uri = builder.build();
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-
-    ResponseHandler<String> responseHandler = new BasicResponseHandler();
-
-    HttpClient httpclient = new DefaultHttpClient();
-    HttpGet httpget = new HttpGet(uri);
-    String response = null;
-    try {
-      response = httpclient.execute(httpget, responseHandler);
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      httpclient.getConnectionManager().shutdown();
+    paramList.add(new Pair<String, String>(action, ""));
+    if(mBean != null) {
+      paramList.add(new Pair<String, String>(ConnectorParams.JMX_MBEAN, mBean));
     }
 
-    @SuppressWarnings("unchecked")
-    Map<String, Object> jsonResponse =
-        (Map<String, Object>) JSONUtils.parseJSONFromString(response);
-    String error = (String) jsonResponse.get(ConnectorParams.RESPONSE_ERROR);
-    if (error != null) {
-      throw new IOException(error);
-    }
-    return jsonResponse;
+    String[] hostPortSplit = hostPort.split(":");
+    return callExecutorForJsonObject(hostPortSplit[0],
+      Integer.valueOf(hostPortSplit[1]), "/jmx", paramList);
   }
 
   @Override
@@ -1748,6 +1806,7 @@ public class ExecutorManager extends EventHandler implements
           currentContinuousFlowProcessed = 0;
         }
 
+        exflow.setUpdateTime(currentTime);
         // process flow with current snapshot of activeExecutors
         processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
         currentContinuousFlowProcessed++;
@@ -1776,12 +1835,57 @@ public class ExecutorManager extends EventHandler implements
       }
     }
 
+    /* Helper method to fetch  overriding Executor, if a valid user has specifed otherwise return null */
+    private Executor getUserSpecifiedExecutor(ExecutionOptions options,
+      int executionId) {
+      Executor executor = null;
+      if (options != null
+        && options.getFlowParameters() != null
+        && options.getFlowParameters().containsKey(
+          ExecutionOptions.USE_EXECUTOR)) {
+        try {
+          int executorId =
+            Integer.valueOf(options.getFlowParameters().get(
+              ExecutionOptions.USE_EXECUTOR));
+          executor = fetchExecutor(executorId);
+
+          if (executor == null) {
+            logger
+              .warn(String
+                .format(
+                  "User specified executor id: %d for execution id: %d is not active, Looking up db.",
+                  executorId, executionId));
+            executor = executorLoader.fetchExecutor(executorId);
+            if (executor == null) {
+              logger
+                .warn(String
+                  .format(
+                    "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
+                    executorId, executionId));
+            }
+          }
+        } catch (ExecutorManagerException ex) {
+          logger.error("Failed to fetch user specified executor for exec_id = "
+            + executionId, ex);
+        }
+      }
+      return executor;
+    }
+
     /* Choose Executor for exflow among the available executors */
     private Executor selectExecutor(ExecutableFlow exflow,
       Set<Executor> availableExecutors) {
-      Executor choosenExecutor;
-      // TODO: use dispatcher
-      choosenExecutor = availableExecutors.iterator().next();
+      Executor choosenExecutor =
+        getUserSpecifiedExecutor(exflow.getExecutionOptions(),
+          exflow.getExecutionId());
+
+      // If no executor was specified by admin
+      if (choosenExecutor == null) {
+        logger.info("Using dispatcher for execution id :"
+          + exflow.getExecutionId());
+        ExecutorSelector selector = new ExecutorSelector(filterList, comparatorWeightsMap);
+        choosenExecutor = selector.getBest(activeExecutors, exflow);
+      }
       return choosenExecutor;
     }
 
@@ -1794,7 +1898,6 @@ public class ExecutorManager extends EventHandler implements
             "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
             exflow.getExecutionId(), reference.getNumErrors()));
       reference.setNumErrors(reference.getNumErrors() + 1);
-      reference.setExecutor(null);
       if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
         || remainingExecutors.size() <= 1) {
         logger.error("Failed to process queued flow");
@@ -1812,27 +1915,20 @@ public class ExecutorManager extends EventHandler implements
       .info(String
         .format(
           "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
-          exflow.getExecutionId(), reference.getNumErrors()));
-      reference.setNumErrors(reference.getNumErrors() + 1);
-      // Scenario: when dispatcher didn't assigned any executor
-      if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED) {
-        finalizeFlows(exflow);
-      } else {
-        // again queue this flow
-        queuedFlows.enqueue(exflow, reference);
-      }
+            exflow.getExecutionId(), reference.getNumErrors()));
+      // TODO: handle scenario where a high priority flow failing to get
+      // schedule can starve all others
+      queuedFlows.enqueue(exflow, reference);
     }
 
     private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
       Executor choosenExecutor) throws ExecutorManagerException {
       exflow.setUpdateTime(System.currentTimeMillis());
-
-      // to be moved after db update once we integrate rest api changes
+      callExecutorServer(exflow, choosenExecutor,
+        ConnectorParams.EXECUTE_ACTION);
+      executorLoader.assignExecutor(choosenExecutor.getId(),
+        exflow.getExecutionId());
       reference.setExecutor(choosenExecutor);
-      // TODO: ADD rest call to do an actual dispatch
-      callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
-      executorLoader.assignExecutor(exflow.getExecutionId(),
-        choosenExecutor.getId());
 
       // move from flow to running flows
       runningFlows.put(exflow.getExecutionId(),
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index 2b47293..c50b0bc 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -231,4 +231,33 @@ public interface ExecutorManagerAdapter {
    */
   public Executor fetchExecutor(int executorId) throws ExecutorManagerException;
 
+  /**
+   * <pre>
+   * Setup activeExecutors using azkaban.properties and database executors
+   * Note:
+   * 1. If azkaban.use.multiple.executors is set true, this method will
+   *    load all active executors
+   * 2. In local mode, If a local executor is specified and it is missing from db,
+   *    this method add local executor as active in DB
+   * 3. In local mode, If a local executor is specified and it is marked inactive in db,
+   *    this method will convert local executor as active in DB
+   * </pre>
+   *
+   * @throws ExecutorManagerException
+   */
+   public void setupExecutors() throws ExecutorManagerException;
+
+   /**
+    * Enable flow dispatching in QueueProcessor
+    *
+    * @throws ExecutorManagerException
+    */
+   public void enableQueueProcessorThread() throws ExecutorManagerException;
+
+   /**
+    * Disable flow dispatching in QueueProcessor
+    *
+    * @throws ExecutorManagerException
+    */
+   public void disableQueueProcessorThread() throws ExecutorManagerException;
 }
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
index bbd642a..d459ae9 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManager.java
@@ -78,4 +78,19 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
     return manager.getQueueProcessorThreadState().toString();
   }
 
+  @Override
+  public List<String> getAvailableExecutorComparatorNames() {
+    return new ArrayList<String>(manager.getAvailableExecutorComparatorNames());
+  }
+
+  @Override
+  public List<String> getAvailableExecutorFilterNames() {
+    return new ArrayList<String>(manager.getAvailableExecutorFilterNames());
+  }
+
+  @Override
+  public long getLastSuccessfulExecutorInfoRefresh() {
+    return manager.getLastSuccessfulExecutorInfoRefresh();
+  }
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
index 94012e0..69e401c 100644
--- a/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
+++ b/azkaban-common/src/main/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -49,4 +49,13 @@ public interface JmxExecutorManagerMBean {
   @DisplayName("OPERATION: getQueueProcessorThreadState")
   public String getQueueProcessorThreadState();
 
+  @DisplayName("OPERATION: getAvailableExecutorComparatorNames")
+  List<String> getAvailableExecutorComparatorNames();
+
+  @DisplayName("OPERATION: getAvailableExecutorFilterNames")
+  List<String> getAvailableExecutorFilterNames();
+
+  @DisplayName("OPERATION: getLastSuccessfulExecutorInfoRefresh")
+  long getLastSuccessfulExecutorInfoRefresh();
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
index 80eff74..fc159ec 100644
--- a/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
+++ b/azkaban-common/src/main/java/azkaban/server/HttpRequestUtils.java
@@ -25,9 +25,17 @@ import java.util.Map;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.commons.lang.StringUtils;
+
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
 import azkaban.utils.JSONUtils;
 
 public class HttpRequestUtils {
@@ -113,6 +121,68 @@ public class HttpRequestUtils {
   }
 
   /**
+   * <pre>
+   * Remove following flow param if submitting user is not an Azkaban admin
+   * FLOW_PRIORITY
+   * USE_EXECUTOR
+   * @param userManager
+   * @param options
+   * @param user
+   * </pre>
+   */
+  public static void filterAdminOnlyFlowParams(UserManager userManager,
+    ExecutionOptions options, User user)  throws ExecutorManagerException {
+    if (options == null || options.getFlowParameters() == null)
+      return;
+
+    Map<String, String> params = options.getFlowParameters();
+    // is azkaban Admin
+    if (!hasPermission(userManager, user, Type.ADMIN)) {
+      params.remove(ExecutionOptions.FLOW_PRIORITY);
+      params.remove(ExecutionOptions.USE_EXECUTOR);
+    } else {
+      validateIntegerParam(params, ExecutionOptions.FLOW_PRIORITY);
+      validateIntegerParam(params, ExecutionOptions.USE_EXECUTOR);
+    }
+  }
+
+  /**
+   * parse a string as number and throws exception if parsed value is not a
+   * valid integer
+   * @param params
+   * @param paramName
+   * @throws ExecutorManagerException if paramName is not a valid integer
+   */
+  public static boolean validateIntegerParam(Map<String, String> params,
+    String paramName) throws ExecutorManagerException {
+    if (params != null && params.containsKey(paramName)
+      && !StringUtils.isNumeric(params.get(paramName))) {
+      throw new ExecutorManagerException(paramName + " should be an integer");
+    }
+    return true;
+  }
+
+  /**
+   * returns true if user has access of type
+   *
+   * @param userManager
+   * @param user
+   * @param type
+   * @return
+   */
+  public static boolean hasPermission(UserManager userManager, User user,
+    Permission.Type type) {
+    for (String roleName : user.getRoles()) {
+      Role role = userManager.getRole(roleName);
+      if (role.getPermission().isPermissionSet(type)
+        || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Checks for the existance of the parameter in the request
    *
    * @param request
diff --git a/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
new file mode 100644
index 0000000..24f6ef7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/server/HttpRequestUtilsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.utils.TestUtils;
+
+/**
+ * Test class for HttpRequestUtils
+ */
+public final class HttpRequestUtilsTest {
+  /* Helper method to get a test flow and add required properties */
+  public static ExecutableFlow createExecutableFlow() throws IOException {
+    ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow.getExecutionOptions().getFlowParameters()
+      .put(ExecutionOptions.FLOW_PRIORITY, "1");
+    flow.getExecutionOptions().getFlowParameters()
+      .put(ExecutionOptions.USE_EXECUTOR, "2");
+    return flow;
+  }
+
+  /* Test that flow properties are removed for non-admin user */
+  @Test
+  public void TestFilterNonAdminOnlyFlowParams() throws IOException,
+    ExecutorManagerException, UserManagerException {
+    ExecutableFlow flow = createExecutableFlow();
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User user = manager.getUser("testUser", "testUser");
+
+    HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+      flow.getExecutionOptions(), user);
+
+    Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.FLOW_PRIORITY));
+    Assert.assertFalse(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.USE_EXECUTOR));
+  }
+
+  /* Test that flow properties are retained for admin user */
+  @Test
+  public void TestFilterAdminOnlyFlowParams() throws IOException,
+    ExecutorManagerException, UserManagerException {
+    ExecutableFlow flow = createExecutableFlow();
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User user = manager.getUser("testAdmin", "testAdmin");
+
+    HttpRequestUtils.filterAdminOnlyFlowParams(manager,
+      flow.getExecutionOptions(), user);
+
+    Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.FLOW_PRIORITY));
+    Assert.assertTrue(flow.getExecutionOptions().getFlowParameters()
+      .containsKey(ExecutionOptions.USE_EXECUTOR));
+  }
+
+  /* Test exception, if param is a valid integer */
+  @Test
+  public void testvalidIntegerParam() throws ExecutorManagerException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put("param1", "123");
+    HttpRequestUtils.validateIntegerParam(params, "param1");
+  }
+
+  /* Test exception, if param is not a valid integer */
+  @Test(expected = ExecutorManagerException.class)
+  public void testInvalidIntegerParam() throws ExecutorManagerException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put("param1", "1dff2");
+    HttpRequestUtils.validateIntegerParam(params, "param1");
+  }
+
+  /* Verify permission for admin user */
+  @Test
+  public void testHasAdminPermission() throws UserManagerException {
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User adminUser = manager.getUser("testAdmin", "testAdmin");
+    Assert.assertTrue(HttpRequestUtils.hasPermission(manager, adminUser,
+      Type.ADMIN));
+  }
+
+  /* verify permission for non-admin user */
+  @Test
+  public void testHasOrdinaryPermission() throws UserManagerException {
+    UserManager manager = TestUtils.createTestXmlUserManager();
+    User testUser = manager.getUser("testUser", "testUser");
+    Assert.assertFalse(HttpRequestUtils.hasPermission(manager, testUser,
+      Type.ADMIN));
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
index e51b575..68b10ee 100644
--- a/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
+++ b/azkaban-common/src/test/java/azkaban/utils/TestUtils.java
@@ -24,17 +24,22 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.XmlUserManager;
 
 /**
  * Commonly used utils method for unit/integration tests
  */
 public class TestUtils {
+  /* Base  resource direcotyr for unit tests */
+  private static final String UNIT_RESOURCE_DIR =
+      "../azkaban-test/src/test/resources";
   /* Directory with serialized description of test flows */
-  private static final String UNIT_BASE_DIR =
-    "../azkaban-test/src/test/resources/executions";
+  private static final String UNIT_EXECUTION_DIR =
+      UNIT_RESOURCE_DIR + "/executions";
 
   public static File getFlowDir(String projectName, String flow) {
-    return new File(String.format("%s/%s/%s.flow", UNIT_BASE_DIR, projectName,
+    return new File(String.format("%s/%s/%s.flow", UNIT_EXECUTION_DIR, projectName,
       flow));
   }
 
@@ -43,8 +48,8 @@ public class TestUtils {
   }
 
   /* Helper method to create an ExecutableFlow from serialized description */
-  public static ExecutableFlow createExecutableFlow(String projectName, String flowName)
-    throws IOException {
+  public static ExecutableFlow createExecutableFlow(String projectName,
+    String flowName) throws IOException {
     File jsonFlowFile = getFlowDir(projectName, flowName);
     @SuppressWarnings("unchecked")
     HashMap<String, Object> flowObj =
@@ -59,4 +64,13 @@ public class TestUtils {
 
     return execFlow;
   }
+
+  /* Helper method to create an XmlUserManager from XML_FILE_PARAM file */
+  public static UserManager createTestXmlUserManager() {
+    Props props = new Props();
+    props.put(XmlUserManager.XML_FILE_PARAM, UNIT_RESOURCE_DIR
+      + "/azkaban-users.xml");
+    UserManager manager = new XmlUserManager(props);
+    return manager;
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 235989d..8f5df1a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimeZone;
@@ -501,4 +502,18 @@ public class AzkabanExecutorServer {
       return null;
     }
   }
+
+  /**
+   * Returns host:port combination for currently running executor
+   * @return
+   */
+  public String getExecutorHostPort() {
+    String host = "unkownHost";
+    try {
+      host = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (Exception e) {
+      logger.error("Failed to fetch LocalHostName");
+    }
+    return host + ":" + props.getInt("executor.port", DEFAULT_PORT_NUMBER);
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index da98b99..7eedee4 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -271,6 +271,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.watcher.setLogger(logger);
     }
 
+    logger.info("Assigned executor : " + AzkabanExecutorServer.getApp().getExecutorHostPort());
     logger.info("Running execid:" + execId + " flow:" + flowId + " project:"
         + projectId + " version:" + version);
     if (pipelineExecId != null) {
@@ -840,7 +841,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Configure Azkaban metrics tracking for a new jobRunner instance
-   * 
+   *
    * @param jobRunner
    */
   private void configureJobLevelMetrics(JobRunner jobRunner) {
diff --git a/azkaban-test/src/test/resources/azkaban-users.xml b/azkaban-test/src/test/resources/azkaban-users.xml
new file mode 100644
index 0000000..55941a7
--- /dev/null
+++ b/azkaban-test/src/test/resources/azkaban-users.xml
@@ -0,0 +1,5 @@
+<azkaban-users>
+	<user username="testAdmin" password="testAdmin" roles="admin" groups="azkaban" />
+	<user username="testUser" password="testUser" />
+	<role name="admin" permissions="ADMIN" />
+</azkaban-users>
diff --git a/azkaban-webserver/.gitignore b/azkaban-webserver/.gitignore
new file mode 100644
index 0000000..5e56e04
--- /dev/null
+++ b/azkaban-webserver/.gitignore
@@ -0,0 +1 @@
+/bin
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index f3e8c8f..4970fce 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.lang.StringEscapeUtils;
 
+import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
@@ -135,6 +136,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
           ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
         }
       }
+    } else if (ajaxName.equals("reloadExecutors")) {
+      ajaxReloadExecutors(req, resp, ret, session.getUser());
+    } else if (ajaxName.equals("enableQueueProcessor")) {
+      ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), true);
+    } else if (ajaxName.equals("disableQueueProcessor")) {
+      ajaxUpdateQueueProcessor(req, resp, ret, session.getUser(), false);
     } else if (ajaxName.equals("getRunning")) {
       String projectName = getParam(req, "project");
       String flowName = getParam(req, "flow");
@@ -158,6 +165,63 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
+  /**
+   * <pre>
+   * Enables queueProcessor if @param status is true
+   * disables queueProcessor if @param status is false.
+   * </pre>
+   */
+  private void ajaxUpdateQueueProcessor(HttpServletRequest req,
+    HttpServletResponse resp, HashMap<String, Object> returnMap, User user,
+    boolean enableQueue) {
+    boolean wasSuccess = false;
+    if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+      try {
+        if (enableQueue) {
+          executorManager.enableQueueProcessorThread();
+        } else {
+          executorManager.disableQueueProcessorThread();
+        }
+        returnMap.put(ConnectorParams.STATUS_PARAM,
+          ConnectorParams.RESPONSE_SUCCESS);
+        wasSuccess = true;
+      } catch (ExecutorManagerException e) {
+        returnMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
+      }
+    } else {
+      returnMap.put(ConnectorParams.RESPONSE_ERROR,
+        "Only Admins are allowed to update queue processor");
+    }
+    if (!wasSuccess) {
+      returnMap.put(ConnectorParams.STATUS_PARAM,
+        ConnectorParams.RESPONSE_ERROR);
+    }
+  }
+
+  /* Reloads executors from DB and azkaban.properties via executorManager */
+  private void ajaxReloadExecutors(HttpServletRequest req,
+    HttpServletResponse resp, HashMap<String, Object> returnMap, User user) {
+    boolean wasSuccess = false;
+    if (HttpRequestUtils.hasPermission(userManager, user, Type.ADMIN)) {
+      try {
+        executorManager.setupExecutors();
+        returnMap.put(ConnectorParams.STATUS_PARAM,
+          ConnectorParams.RESPONSE_SUCCESS);
+        wasSuccess = true;
+      } catch (ExecutorManagerException e) {
+        returnMap.put(ConnectorParams.RESPONSE_ERROR,
+          "Failed to refresh the executors " + e.getMessage());
+      }
+    } else {
+      returnMap.put(ConnectorParams.RESPONSE_ERROR,
+        "Only Admins are allowed to refresh the executors");
+    }
+    if (!wasSuccess) {
+      returnMap.put(ConnectorParams.STATUS_PARAM,
+        ConnectorParams.RESPONSE_ERROR);
+    }
+  }
+
   @Override
   protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
       Session session) throws ServletException, IOException {
@@ -813,14 +877,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     if (!options.isSuccessEmailsOverridden()) {
       options.setSuccessEmails(flow.getSuccessEmails());
     }
-    fixFlowPriorityByPermission(options, user);
     options.setMailCreator(flow.getMailCreator());
 
     try {
+      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
       String message =
           executorManager.submitExecutableFlow(exflow, user.getUserId());
       ret.put("message", message);
-    } catch (ExecutorManagerException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       ret.put("error",
           "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
@@ -829,15 +893,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     ret.put("execid", exflow.getExecutionId());
   }
 
-  /* Reset flow priority if submitting user is not a Azkaban admin */
-  private void fixFlowPriorityByPermission(ExecutionOptions options, User user) {
-    if (!(options.getFlowParameters().containsKey(
-      ExecutionOptions.FLOW_PRIORITY) && hasPermission(user, Type.ADMIN))) {
-      options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
-        String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
-    }
-  }
-
   public class ExecutorVelocityHelper {
     public String getProjectName(int id) {
       Project project = projectManager.getProject(id);
@@ -848,16 +903,4 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       return project.getName();
     }
   }
-
-  /* returns true if user has access of type */
-  protected boolean hasPermission(User user, Permission.Type type) {
-    for (String roleName : user.getRoles()) {
-      Role role = userManager.getRole(roleName);
-      if (role.getPermission().isPermissionSet(type)
-        || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
-        return true;
-      }
-    }
-    return false;
-  }
 }
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index f688820..c6445c0 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -62,6 +62,7 @@ import azkaban.sla.SlaOption;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
+import azkaban.user.UserManager;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.SplitterOutputStream;
 import azkaban.utils.Utils;
@@ -73,11 +74,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
   private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
   private ProjectManager projectManager;
   private ScheduleManager scheduleManager;
+  private UserManager userManager;
 
   @Override
   public void init(ServletConfig config) throws ServletException {
     super.init(config);
     AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    userManager = server.getUserManager();
     projectManager = server.getProjectManager();
     scheduleManager = server.getScheduleManager();
   }
@@ -656,6 +659,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     ExecutionOptions flowOptions = null;
     try {
       flowOptions = HttpRequestUtils.parseFlowOptions(req);
+      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
     } catch (Exception e) {
       ret.put("error", e.getMessage());
     }
diff --git a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
index d77bc52..27be342 100644
--- a/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
+++ b/azkaban-webserver/src/main/resources/azkaban/webapp/servlet/velocity/statsPage.vm
@@ -41,7 +41,7 @@
                  } else {
                     $('#metricName').empty();
                     for(var index = 0; index < responseData.metricList.length; index++) {
-                      $('#metricName').append($('<option value="1">' + responseData.metricList[index] + '</option>'));
+                      $('#metricName').append($('<option value="' + responseData.metricList[index] +'">' + responseData.metricList[index] + '</option>'));
                     }
                   }
                };