azkaban-aplcache

Added flow priority specification for admins; refractored

9/3/2015 12:43:14 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
new file mode 100644
index 0000000..7e6ab80
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+public final class ExecutableFlowPriorityComparator implements
+  Comparator<Pair<ExecutionReference, ExecutableFlow>> {
+  private static Logger logger = Logger
+    .getLogger(ExecutableFlowPriorityComparator.class);
+
+  @Override
+  public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
+    Pair<ExecutionReference, ExecutableFlow> pair2) {
+    ExecutableFlow exflow1 = null, exflow2 = null;
+    if (pair1 != null && pair1.getSecond() != null) {
+      exflow1 = pair1.getSecond();
+    }
+    if (pair2 != null && pair2.getSecond() != null) {
+      exflow2 = pair2.getSecond();
+    }
+    if (exflow1 == null && exflow2 == null)
+      return 0;
+    else if (exflow1 == null)
+      return -1;
+    else if (exflow2 == null)
+      return 1;
+    else {
+      // descending order of priority
+      int diff = getPriority(exflow2) - getPriority(exflow1);
+      if (diff == 0) {
+        // increasing order of execution id, if same priority
+        diff = exflow1.getExecutionId() - exflow2.getExecutionId();
+      }
+      return diff;
+    }
+  }
+
+  /* Helper method to fetch flow priority from flow props */
+  private int getPriority(ExecutableFlow exflow) {
+    ExecutionOptions options = exflow.getExecutionOptions();
+    int priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+    if (options != null
+      && options.getFlowParameters() != null
+      && options.getFlowParameters()
+        .containsKey(ExecutionOptions.FLOW_PRIORITY)) {
+      try {
+        priority =
+          Integer.valueOf(options.getFlowParameters().get(
+            ExecutionOptions.FLOW_PRIORITY));
+      } catch (NumberFormatException ex) {
+        priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+        logger.error("Failed to parse flow priority for exec_id = "
+          + exflow.getExecutionId());
+      }
+    }
+    return priority;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d4cb262..312be44 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -33,6 +33,8 @@ public class ExecutionOptions {
   public static final String CONCURRENT_OPTION_SKIP = "skip";
   public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
   public static final String CONCURRENT_OPTION_IGNORE = "ignore";
+  public static final String FLOW_PRIORITY = "flowPriority";
+  public static final int DEFAULT_FLOW_PRIORITY = 5;
 
   private static final String FLOW_PARAMETERS = "flowParameters";
   private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6ab1bb6..c9633ba 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -24,13 +24,16 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.client.HttpClient;
@@ -63,6 +66,8 @@ public class ExecutorManager extends EventHandler implements
   ExecutorManagerAdapter {
   static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
     "azkaban.use.multiple.executors";
+  static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+    "azkaban.webserver.queue.size";
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   final private ExecutorLoader executorLoader;
 
@@ -77,8 +82,9 @@ public class ExecutorManager extends EventHandler implements
   final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
     new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
   /* web server side queue */
-  final private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
-    new ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>>();
+  final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+    new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+      new ExecutableFlowPriorityComparator());
 
   final private Set<Executor> activeExecutors = new HashSet<Executor>();
 
@@ -89,7 +95,7 @@ public class ExecutorManager extends EventHandler implements
     * 24 * 60 * 60 * 1000l;
 
   private long lastCleanerThreadCheckTime = -1;
-
+  final private long webserverQueueCapacity;
   private long lastThreadCheckTime = -1;
   private String updaterStage = "not started";
 
@@ -121,6 +127,8 @@ public class ExecutorManager extends EventHandler implements
     long executionLogsRetentionMs =
       props.getLong("execution.logs.retention.ms",
         DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+    webserverQueueCapacity =
+      props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
     cleanerThread = new CleanerThread(executionLogsRetentionMs);
     cleanerThread.start();
   }
@@ -140,8 +148,7 @@ public class ExecutorManager extends EventHandler implements
    * @throws ExecutorManagerException
    */
   public void setupExecutors() throws ExecutorManagerException {
-    // clear all active executors
-    activeExecutors.clear();
+    Set<Executor> newExecutors = new HashSet<Executor>();
 
     // Add local executor, if specified as per properties
     if (azkProps.containsKey("executor.port")) {
@@ -155,16 +162,21 @@ public class ExecutorManager extends EventHandler implements
         executor.setActive(true);
         executorLoader.updateExecutor(executor);
       }
-      activeExecutors.add(new Executor(executor.getId(), executorHost,
+      newExecutors.add(new Executor(executor.getId(), executorHost,
         executorPort, true));
     }
 
     if (azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false)) {
-      activeExecutors.addAll(executorLoader.fetchActiveExecutors());
+      newExecutors.addAll(executorLoader.fetchActiveExecutors());
     }
 
-    if (activeExecutors.isEmpty()) {
+    if (newExecutors.isEmpty()) {
       throw new ExecutorManagerException("No active executor found");
+    } else {
+      // clear all active executors, only if we have at least one new active
+      // executors
+      activeExecutors.clear();
+      activeExecutors.addAll(newExecutors);
     }
   }
 
@@ -277,9 +289,9 @@ public class ExecutorManager extends EventHandler implements
    * any executor
    */
   private void loadQueuedFlows() throws ExecutorManagerException {
-    queuedFlows.addAll(executorLoader.fetchQueuedFlows());
-    for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlows) {
-      queuedFlowMap.put(ref.getSecond().getExecutionId(), ref);
+    for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
+      .fetchQueuedFlows()) {
+      enqueueFlow(pair.getSecond(), pair.getFirst());
     }
   }
 
@@ -640,8 +652,7 @@ public class ExecutorManager extends EventHandler implements
         Pair<ExecutionReference, ExecutableFlow> pair =
           queuedFlowMap.get(exFlow.getExecutionId());
         synchronized (pair) {
-          queuedFlows.remove(pair);
-          queuedFlowMap.remove(exFlow.getExecutionId());
+          dequeueFlow(exFlow.getExecutionId());
           finalizeFlows(exFlow);
         }
       } else {
@@ -807,74 +818,103 @@ public class ExecutorManager extends EventHandler implements
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
     throws ExecutorManagerException {
     synchronized (exflow) {
-      logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
-        + userId);
-
-      int projectId = exflow.getProjectId();
       String flowId = exflow.getFlowId();
-      exflow.setSubmitUser(userId);
-      exflow.setSubmitTime(System.currentTimeMillis());
 
-      List<Integer> running = getRunningFlows(projectId, flowId);
-
-      ExecutionOptions options = exflow.getExecutionOptions();
-      if (options == null) {
-        options = new ExecutionOptions();
-      }
+      logger.info("Submitting execution flow " + flowId + " by " + userId);
 
       String message = "";
-      if (options.getDisabledJobs() != null) {
-        applyDisabledJobs(options.getDisabledJobs(), exflow);
-      }
+      if (queuedFlows.size() >= webserverQueueCapacity) {
+        message =
+          String
+            .format(
+              "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+              flowId, exflow.getProjectName());
+        logger.error(message);
+      } else {
+        int projectId = exflow.getProjectId();
+        exflow.setSubmitUser(userId);
+        exflow.setSubmitTime(System.currentTimeMillis());
 
-      if (!running.isEmpty()) {
-        if (options.getConcurrentOption().equals(
-          ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
-          Collections.sort(running);
-          Integer runningExecId = running.get(running.size() - 1);
-
-          options.setPipelineExecutionId(runningExecId);
-          message =
-            "Flow " + flowId + " is already running with exec id "
-              + runningExecId + ". Pipelining level "
-              + options.getPipelineLevel() + ". \n";
-        } else if (options.getConcurrentOption().equals(
-          ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
-          throw new ExecutorManagerException("Flow " + flowId
-            + " is already running. Skipping execution.",
-            ExecutorManagerException.Reason.SkippedExecution);
-        } else {
-          // The settings is to run anyways.
-          message =
-            "Flow " + flowId + " is already running with exec id "
-              + StringUtils.join(running, ",")
-              + ". Will execute concurrently. \n";
+        List<Integer> running = getRunningFlows(projectId, flowId);
+
+        ExecutionOptions options = exflow.getExecutionOptions();
+        if (options == null) {
+          options = new ExecutionOptions();
         }
+        if (options.getDisabledJobs() != null) {
+          applyDisabledJobs(options.getDisabledJobs(), exflow);
+        }
+
+        if (!running.isEmpty()) {
+          if (options.getConcurrentOption().equals(
+            ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+            Collections.sort(running);
+            Integer runningExecId = running.get(running.size() - 1);
+
+            options.setPipelineExecutionId(runningExecId);
+            message =
+              "Flow " + flowId + " is already running with exec id "
+                + runningExecId + ". Pipelining level "
+                + options.getPipelineLevel() + ". \n";
+          } else if (options.getConcurrentOption().equals(
+            ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+            throw new ExecutorManagerException("Flow " + flowId
+              + " is already running. Skipping execution.",
+              ExecutorManagerException.Reason.SkippedExecution);
+          } else {
+            // The settings is to run anyways.
+            message =
+              "Flow " + flowId + " is already running with exec id "
+                + StringUtils.join(running, ",")
+                + ". Will execute concurrently. \n";
+          }
+        }
+
+        boolean memoryCheck =
+          !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+            ProjectWhitelist.WhitelistType.MemoryCheck);
+        options.setMemoryCheck(memoryCheck);
+
+        // The exflow id is set by the loader. So it's unavailable until after
+        // this call.
+        executorLoader.uploadExecutableFlow(exflow);
+
+        // We create an active flow reference in the datastore. If the upload
+        // fails, we remove the reference.
+        ExecutionReference reference =
+          new ExecutionReference(exflow.getExecutionId());
+        // Added to db queue
+        executorLoader.addActiveExecutableReference(reference);
+        enqueueFlow(exflow, reference);
+        message +=
+          "Execution submitted successfully with exec id "
+            + exflow.getExecutionId();
       }
+      return message;
+    }
+  }
 
-      boolean memoryCheck =
-        !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
-          ProjectWhitelist.WhitelistType.MemoryCheck);
-      options.setMemoryCheck(memoryCheck);
-
-      // The exflow id is set by the loader. So it's unavailable until after
-      // this call.
-      executorLoader.uploadExecutableFlow(exflow);
-
-      // We create an active flow reference in the datastore. If the upload
-      // fails, we remove the reference.
-      ExecutionReference reference =
-        new ExecutionReference(exflow.getExecutionId());
-      // Added to db queue
-      executorLoader.addActiveExecutableReference(reference);
-      Pair<ExecutionReference, ExecutableFlow> pair =
-        new Pair<ExecutionReference, ExecutableFlow>(reference, exflow);
+  /* Helper method to have a single point of deletion in the queued flows */
+  private void dequeueFlow(int executionId) {
+    if (queuedFlowMap.contains(executionId)) {
+      queuedFlows.remove(queuedFlowMap.get(executionId));
+      queuedFlowMap.remove(executionId);
+    }
+  }
+
+  /* Helper method to have a single point of insertion in the queued flows */
+  private void enqueueFlow(ExecutableFlow exflow, ExecutionReference ref)
+    throws ExecutorManagerException {
+    Pair<ExecutionReference, ExecutableFlow> pair =
+      new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+    try {
       queuedFlowMap.put(exflow.getExecutionId(), pair);
-      queuedFlows.add(pair);
-      message +=
-        "Execution submitted successfully with exec id "
-          + exflow.getExecutionId();
-      return message;
+      queuedFlows.put(pair);
+    } catch (InterruptedException e) {
+      String errMsg = "Failed to queue flow " + exflow.getExecutionId();
+      logger.error(errMsg, e);
+      finalizeFlows(exflow);
+      throw new ExecutorManagerException(errMsg);
     }
   }
 
@@ -1546,9 +1586,8 @@ public class ExecutorManager extends EventHandler implements
    */
   private class QueueProcessorThread extends Thread {
     private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+    private static final long ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS = 1000;
     private boolean shutdown = false;
-    private long lastProcessingTime = -1;
-    private long maxContinousSubmission = 10;
     private boolean isActive = true;
 
     public QueueProcessorThread() {
@@ -1563,29 +1602,24 @@ public class ExecutorManager extends EventHandler implements
       return isActive;
     }
 
-    @SuppressWarnings("unused")
     public void shutdown() {
       shutdown = true;
       this.interrupt();
     }
 
     public void run() {
+      // Loops till QueueProcessorThread is shutdown
       while (!shutdown) {
         synchronized (this) {
           try {
-            long currentTime = System.currentTimeMillis();
-            if (isActive
-              && currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
-              // Refresh executor stats to be used by selector
-              refreshExecutors();
-              // process upto a maximum of maxContinousSubmission from queued
-              // flows
-              processQueuedFlows(maxContinousSubmission);
-              lastProcessingTime = currentTime;
+            // start processing queue if active, other wait for sometime
+            if (isActive) {
+              processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS);
             }
             wait(QUEUE_PROCESSOR_WAIT_IN_MS);
           } catch (InterruptedException e) {
-            logger.info("Interrupted. Probably to shut down.");
+            logger.info(
+              "QueueProcessorThread Interrupted. Probably to shut down.", e);
           }
         }
       }
@@ -1598,49 +1632,55 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
-  private void processQueuedFlows(long maxContinousSubmission) {
-    try {
-      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
-      int submissionNum = 0;
-      while (submissionNum < maxContinousSubmission
-        && (runningCandidate = queuedFlows.peek()) != null) {
-        synchronized (runningCandidate) {
+  private void processQueuedFlows(long activeExecutorsRefreshWindow)
+    throws InterruptedException {
+    long lastProcessingTime = System.currentTimeMillis();
+    Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+    while ((runningCandidate = queuedFlows.take()) != null) {
+      // stop queue processing, if queueProcessor is marked inactive
+      if (!queueProcessor.isActive())
+        return;
 
-          ExecutionReference reference = runningCandidate.getFirst();
-          ExecutableFlow exflow = runningCandidate.getSecond();
+      long currentTime = System.currentTimeMillis();
+      if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow) {
+        // Refresh executor stats to be used by selector
+        refreshExecutors();
+        lastProcessingTime = currentTime;
+      }
 
-          // TODO: use dispatcher
-          Executor choosenExecutor;
+      ExecutionReference reference = runningCandidate.getFirst();
+      ExecutableFlow exflow = runningCandidate.getSecond();
+      synchronized (exflow) {
+        Executor choosenExecutor;
 
-          synchronized (activeExecutors) {
-            choosenExecutor = activeExecutors.iterator().next();
-          }
+        // TODO: use dispatcher
+        synchronized (activeExecutors) {
+          choosenExecutor = activeExecutors.iterator().next();
+        }
 
-          if (choosenExecutor != null) {
-            queuedFlows.poll();
-            queuedFlowMap.remove(exflow.getExecutionId());
+        if (choosenExecutor != null) {
 
-            try {
-              reference.setExecutor(choosenExecutor);
-              executorLoader.assignExecutor(choosenExecutor.getId(),
-                exflow.getExecutionId());
-              // TODO: ADD rest call to do an actual dispatch
-              callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
-              runningFlows
-                .put(exflow.getExecutionId(),
-                  new Pair<ExecutionReference, ExecutableFlow>(reference,
-                    exflow));
-            } catch (ExecutorManagerException e) {
-              logger.error("Failed to process queued flow", e);
-              // TODO: allow N errors and re-try
-              finalizeFlows(exflow);
-            }
+          try {
+            // TODO: ADD rest call to do an actual dispatch
+            callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+
+            executorLoader.assignExecutor(exflow.getExecutionId(),
+              choosenExecutor.getId());
+            reference.setExecutor(choosenExecutor);
+
+            // move from queuedFlows to running flows
+            dequeueFlow(exflow.getExecutionId());
+            runningFlows.put(exflow.getExecutionId(),
+              new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+          } catch (ExecutorManagerException e) {
+            logger.error("Failed to process queued flow", e);
+            // TODO: allow N errors and re-try
+            finalizeFlows(exflow);
           }
+        } else {
+          // TODO: handle scenario where dispatcher didn't assigned any executor
         }
-        submissionNum++;
       }
-    } catch (Throwable th) {
-      logger.error("Failed to process the queue", th);
     }
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
new file mode 100644
index 0000000..09abade
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Test class for ExecutableFlowPriorityComparator
+ * */
+
+public class ExecutableFlowPriorityComparatorTest {
+  /* Directory with serialized description of test flows */
+  private static final String UNIT_BASE_DIR =
+    "../azkaban-test/src/test/resources/executions/exectest1/";
+
+  private File getFlowDir(String flow) {
+    return new File(UNIT_BASE_DIR + flow + ".flow");
+  }
+
+  /* Helper method to create an ExecutableFlow from serialized description */
+  private ExecutableFlow createExecutableFlow(String flowName, int execId,
+    int priority) throws IOException {
+    File jsonFlowFile = getFlowDir(flowName);
+    @SuppressWarnings("unchecked")
+    HashMap<String, Object> flowObj =
+      (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+    Flow flow = Flow.flowFromObject(flowObj);
+    Project project = new Project(1, "flow");
+    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+    flowMap.put(flow.getId(), flow);
+    project.setFlows(flowMap);
+    ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+    execFlow.setExecutionId(execId);
+    if (priority > 0) {
+      execFlow.getExecutionOptions().getFlowParameters()
+        .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
+    }
+    return execFlow;
+  }
+
+  /* priority queue order when all priorities are explicitly specified */
+  @Test
+  public void testExplicitlySpecifiedPriorities() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 5);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 6);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", 3, 2);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow2, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow3, queue.take().getSecond());
+  }
+
+  /* priority queue order when some priorities are implicitly specified */
+  @Test
+  public void testMixedSpecifiedPriorities() throws IOException,
+    InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 3);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", 3, -2);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+
+  /*
+   * priority queue order when some priorities are equal, execution Id is used
+   * in this case
+   */
+  @Test
+  public void testEqualPriorities() throws IOException, InterruptedException {
+    ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 3);
+    ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
+    ExecutableFlow flow3 = createExecutableFlow("exec3", 3, -2);
+    ExecutableFlow flow4 = createExecutableFlow("exec3", 4, 3);
+    ExecutionReference dummyRef = new ExecutionReference(0);
+
+    BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+      new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+        new ExecutableFlowPriorityComparator());
+
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+    queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+    Assert.assertEquals(flow3, queue.take().getSecond());
+    Assert.assertEquals(flow1, queue.take().getSecond());
+    Assert.assertEquals(flow4, queue.take().getSecond());
+    Assert.assertEquals(flow2, queue.take().getSecond());
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 3db34aa..7fa7f03 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 3d3f760..f3e8c8f 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -48,7 +48,9 @@ import azkaban.server.HttpRequestUtils;
 import azkaban.server.session.Session;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
+import azkaban.user.Role;
 import azkaban.user.User;
+import azkaban.user.UserManager;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
@@ -61,11 +63,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private ExecutorManagerAdapter executorManager;
   private ScheduleManager scheduleManager;
   private ExecutorVelocityHelper velocityHelper;
+  private UserManager userManager;
 
   @Override
   public void init(ServletConfig config) throws ServletException {
     super.init(config);
     AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    userManager = server.getUserManager();
     projectManager = server.getProjectManager();
     executorManager = server.getExecutorManager();
     scheduleManager = server.getScheduleManager();
@@ -809,6 +813,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     if (!options.isSuccessEmailsOverridden()) {
       options.setSuccessEmails(flow.getSuccessEmails());
     }
+    fixFlowPriorityByPermission(options, user);
     options.setMailCreator(flow.getMailCreator());
 
     try {
@@ -824,6 +829,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     ret.put("execid", exflow.getExecutionId());
   }
 
+  /* Reset flow priority if submitting user is not a Azkaban admin */
+  private void fixFlowPriorityByPermission(ExecutionOptions options, User user) {
+    if (!(options.getFlowParameters().containsKey(
+      ExecutionOptions.FLOW_PRIORITY) && hasPermission(user, Type.ADMIN))) {
+      options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
+        String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
+    }
+  }
+
   public class ExecutorVelocityHelper {
     public String getProjectName(int id) {
       Project project = projectManager.getProject(id);
@@ -834,4 +848,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       return project.getName();
     }
   }
+
+  /* returns true if user has access of type */
+  protected boolean hasPermission(User user, Permission.Type type) {
+    for (String roleName : user.getRoles()) {
+      Role role = userManager.getRole(roleName);
+      if (role.getPermission().isPermissionSet(type)
+        || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }