diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
new file mode 100644
index 0000000..7e6ab80
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowPriorityComparator.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Pair;
+
+public final class ExecutableFlowPriorityComparator implements
+ Comparator<Pair<ExecutionReference, ExecutableFlow>> {
+ private static Logger logger = Logger
+ .getLogger(ExecutableFlowPriorityComparator.class);
+
+ @Override
+ public int compare(Pair<ExecutionReference, ExecutableFlow> pair1,
+ Pair<ExecutionReference, ExecutableFlow> pair2) {
+ ExecutableFlow exflow1 = null, exflow2 = null;
+ if (pair1 != null && pair1.getSecond() != null) {
+ exflow1 = pair1.getSecond();
+ }
+ if (pair2 != null && pair2.getSecond() != null) {
+ exflow2 = pair2.getSecond();
+ }
+ if (exflow1 == null && exflow2 == null)
+ return 0;
+ else if (exflow1 == null)
+ return -1;
+ else if (exflow2 == null)
+ return 1;
+ else {
+ // descending order of priority
+ int diff = getPriority(exflow2) - getPriority(exflow1);
+ if (diff == 0) {
+ // increasing order of execution id, if same priority
+ diff = exflow1.getExecutionId() - exflow2.getExecutionId();
+ }
+ return diff;
+ }
+ }
+
+ /* Helper method to fetch the flow priority from the flow parameters */
+ private int getPriority(ExecutableFlow exflow) {
+ ExecutionOptions options = exflow.getExecutionOptions();
+ int priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+ if (options != null
+ && options.getFlowParameters() != null
+ && options.getFlowParameters()
+ .containsKey(ExecutionOptions.FLOW_PRIORITY)) {
+ try {
+ priority =
+ Integer.valueOf(options.getFlowParameters().get(
+ ExecutionOptions.FLOW_PRIORITY));
+ } catch (NumberFormatException ex) {
+ priority = ExecutionOptions.DEFAULT_FLOW_PRIORITY;
+ logger.error("Failed to parse flow priority for exec_id = "
+ + exflow.getExecutionId());
+ }
+ }
+ return priority;
+ }
+}
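
For reference, a self-contained sketch of the ordering rule the comparator implements: higher priority comes out of the queue first, and the smaller execution id wins when priorities tie. It uses a stand-in class instead of the real ExecutableFlow (which needs a Project and Flow to construct); the stub names are purely illustrative.

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderingSketch {
  static class FlowStub {
    final int execId;
    final int priority;
    FlowStub(int execId, int priority) {
      this.execId = execId;
      this.priority = priority;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Comparator<FlowStub> comparator = new Comparator<FlowStub>() {
      public int compare(FlowStub a, FlowStub b) {
        int diff = b.priority - a.priority; // descending order of priority
        if (diff == 0) {
          diff = a.execId - b.execId; // ascending execution id on ties
        }
        return diff;
      }
    };
    PriorityBlockingQueue<FlowStub> queue =
        new PriorityBlockingQueue<FlowStub>(10, comparator);
    queue.put(new FlowStub(1, 5));
    queue.put(new FlowStub(2, 6));
    queue.put(new FlowStub(3, 5));
    // Dequeues exec 2 (priority 6), then exec 1, then exec 3.
    while (!queue.isEmpty()) {
      FlowStub next = queue.take();
      System.out.println("exec " + next.execId + " priority " + next.priority);
    }
  }
}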
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index d4cb262..312be44 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -33,6 +33,8 @@ public class ExecutionOptions {
public static final String CONCURRENT_OPTION_SKIP = "skip";
public static final String CONCURRENT_OPTION_PIPELINE = "pipeline";
public static final String CONCURRENT_OPTION_IGNORE = "ignore";
+ public static final String FLOW_PRIORITY = "flowPriority";
+ public static final int DEFAULT_FLOW_PRIORITY = 5;
private static final String FLOW_PARAMETERS = "flowParameters";
private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
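
With these constants a flow can carry an explicit priority as an ordinary flow parameter keyed by FLOW_PRIORITY; anything unset or unparsable falls back to DEFAULT_FLOW_PRIORITY (5). A minimal sketch of attaching one to a submission, assuming getFlowParameters() returns a mutable, non-null map, which is how the test code later in this change uses it:

import azkaban.executor.ExecutionOptions;

public class FlowPrioritySketch {
  // Hypothetical helper, for illustration only: build ExecutionOptions that ask
  // for a non-default priority. Values that fail to parse as an int are treated
  // as DEFAULT_FLOW_PRIORITY by ExecutableFlowPriorityComparator.getPriority().
  static ExecutionOptions withPriority(int priority) {
    ExecutionOptions options = new ExecutionOptions();
    // Assumes getFlowParameters() is initialized to a mutable map.
    options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
        String.valueOf(priority));
    return options;
  }
}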
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6ab1bb6..c9633ba 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -24,13 +24,16 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.HttpClient;
@@ -63,6 +66,8 @@ public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
"azkaban.use.multiple.executors";
+ static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
+ "azkaban.webserver.queue.size";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
final private ExecutorLoader executorLoader;
@@ -77,8 +82,9 @@ public class ExecutorManager extends EventHandler implements
final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
/* web server side queue */
- final private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
- new ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>>();
+ final private BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
final private Set<Executor> activeExecutors = new HashSet<Executor>();
@@ -89,7 +95,7 @@ public class ExecutorManager extends EventHandler implements
* 24 * 60 * 60 * 1000l;
private long lastCleanerThreadCheckTime = -1;
-
+ final private long webserverQueueCapacity;
private long lastThreadCheckTime = -1;
private String updaterStage = "not started";
@@ -121,6 +127,8 @@ public class ExecutorManager extends EventHandler implements
long executionLogsRetentionMs =
props.getLong("execution.logs.retention.ms",
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+ webserverQueueCapacity =
+ props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
}
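
The web-server queue is now capped by azkaban.webserver.queue.size, defaulting to 100000 entries. A small sketch of the property lookup, assuming the azkaban.utils.Props API already used in this constructor (put(String, String) and getLong(String, long)):

import azkaban.utils.Props;

public class QueueCapacityConfigSketch {
  public static void main(String[] args) {
    // The capacity is read with Props.getLong(key, default), so an unset
    // azkaban.webserver.queue.size falls back to 100000, matching the
    // constructor change above.
    Props props = new Props();
    System.out.println(props.getLong("azkaban.webserver.queue.size", 100000)); // 100000
    props.put("azkaban.webserver.queue.size", "50000");
    System.out.println(props.getLong("azkaban.webserver.queue.size", 100000)); // 50000
  }
}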
@@ -140,8 +148,7 @@ public class ExecutorManager extends EventHandler implements
* @throws ExecutorManagerException
*/
public void setupExecutors() throws ExecutorManagerException {
- // clear all active executors
- activeExecutors.clear();
+ Set<Executor> newExecutors = new HashSet<Executor>();
// Add local executor, if specified as per properties
if (azkProps.containsKey("executor.port")) {
@@ -155,16 +162,21 @@ public class ExecutorManager extends EventHandler implements
executor.setActive(true);
executorLoader.updateExecutor(executor);
}
- activeExecutors.add(new Executor(executor.getId(), executorHost,
+ newExecutors.add(new Executor(executor.getId(), executorHost,
executorPort, true));
}
if (azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false)) {
- activeExecutors.addAll(executorLoader.fetchActiveExecutors());
+ newExecutors.addAll(executorLoader.fetchActiveExecutors());
}
- if (activeExecutors.isEmpty()) {
+ if (newExecutors.isEmpty()) {
throw new ExecutorManagerException("No active executor found");
+ } else {
+ // clear all active executors only if we have at least one new active
+ // executor
+ activeExecutors.clear();
+ activeExecutors.addAll(newExecutors);
}
}
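
setupExecutors() now builds the candidate set first and only replaces activeExecutors once at least one executor is found, so a failed refresh can no longer leave the manager with an empty executor set. The same pattern in isolation, with a stand-in element type purely for illustration:

import java.util.HashSet;
import java.util.Set;

public class SafeReloadSketch {
  private final Set<String> active = new HashSet<String>();

  void reload(Set<String> fetched) {
    // Build the replacement first; swap it in only when it is non-empty.
    Set<String> fresh = new HashSet<String>(fetched);
    if (fresh.isEmpty()) {
      throw new IllegalStateException("No active executor found");
    }
    active.clear();
    active.addAll(fresh);
  }
}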
@@ -277,9 +289,9 @@ public class ExecutorManager extends EventHandler implements
* any executor
*/
private void loadQueuedFlows() throws ExecutorManagerException {
- queuedFlows.addAll(executorLoader.fetchQueuedFlows());
- for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlows) {
- queuedFlowMap.put(ref.getSecond().getExecutionId(), ref);
+ for (Pair<ExecutionReference, ExecutableFlow> pair : executorLoader
+ .fetchQueuedFlows()) {
+ enqueueFlow(pair.getSecond(), pair.getFirst());
}
}
@@ -640,8 +652,7 @@ public class ExecutorManager extends EventHandler implements
Pair<ExecutionReference, ExecutableFlow> pair =
queuedFlowMap.get(exFlow.getExecutionId());
synchronized (pair) {
- queuedFlows.remove(pair);
- queuedFlowMap.remove(exFlow.getExecutionId());
+ dequeueFlow(exFlow.getExecutionId());
finalizeFlows(exFlow);
}
} else {
@@ -807,74 +818,103 @@ public class ExecutorManager extends EventHandler implements
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
throws ExecutorManagerException {
synchronized (exflow) {
- logger.info("Submitting execution flow " + exflow.getFlowId() + " by "
- + userId);
-
- int projectId = exflow.getProjectId();
String flowId = exflow.getFlowId();
- exflow.setSubmitUser(userId);
- exflow.setSubmitTime(System.currentTimeMillis());
- List<Integer> running = getRunningFlows(projectId, flowId);
-
- ExecutionOptions options = exflow.getExecutionOptions();
- if (options == null) {
- options = new ExecutionOptions();
- }
+ logger.info("Submitting execution flow " + flowId + " by " + userId);
String message = "";
- if (options.getDisabledJobs() != null) {
- applyDisabledJobs(options.getDisabledJobs(), exflow);
- }
+ if (queuedFlows.size() >= webserverQueueCapacity) {
+ message =
+ String
+ .format(
+ "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
+ flowId, exflow.getProjectName());
+ logger.error(message);
+ } else {
+ int projectId = exflow.getProjectId();
+ exflow.setSubmitUser(userId);
+ exflow.setSubmitTime(System.currentTimeMillis());
- if (!running.isEmpty()) {
- if (options.getConcurrentOption().equals(
- ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
- Collections.sort(running);
- Integer runningExecId = running.get(running.size() - 1);
-
- options.setPipelineExecutionId(runningExecId);
- message =
- "Flow " + flowId + " is already running with exec id "
- + runningExecId + ". Pipelining level "
- + options.getPipelineLevel() + ". \n";
- } else if (options.getConcurrentOption().equals(
- ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
- throw new ExecutorManagerException("Flow " + flowId
- + " is already running. Skipping execution.",
- ExecutorManagerException.Reason.SkippedExecution);
- } else {
- // The settings is to run anyways.
- message =
- "Flow " + flowId + " is already running with exec id "
- + StringUtils.join(running, ",")
- + ". Will execute concurrently. \n";
+ List<Integer> running = getRunningFlows(projectId, flowId);
+
+ ExecutionOptions options = exflow.getExecutionOptions();
+ if (options == null) {
+ options = new ExecutionOptions();
}
+ if (options.getDisabledJobs() != null) {
+ applyDisabledJobs(options.getDisabledJobs(), exflow);
+ }
+
+ if (!running.isEmpty()) {
+ if (options.getConcurrentOption().equals(
+ ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
+ Collections.sort(running);
+ Integer runningExecId = running.get(running.size() - 1);
+
+ options.setPipelineExecutionId(runningExecId);
+ message =
+ "Flow " + flowId + " is already running with exec id "
+ + runningExecId + ". Pipelining level "
+ + options.getPipelineLevel() + ". \n";
+ } else if (options.getConcurrentOption().equals(
+ ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
+ throw new ExecutorManagerException("Flow " + flowId
+ + " is already running. Skipping execution.",
+ ExecutorManagerException.Reason.SkippedExecution);
+ } else {
+ // The setting is to run anyway.
+ message =
+ "Flow " + flowId + " is already running with exec id "
+ + StringUtils.join(running, ",")
+ + ". Will execute concurrently. \n";
+ }
+ }
+
+ boolean memoryCheck =
+ !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck);
+ options.setMemoryCheck(memoryCheck);
+
+ // The exflow id is set by the loader. So it's unavailable until after
+ // this call.
+ executorLoader.uploadExecutableFlow(exflow);
+
+ // We create an active flow reference in the datastore. If the upload
+ // fails, we remove the reference.
+ ExecutionReference reference =
+ new ExecutionReference(exflow.getExecutionId());
+ // Added to db queue
+ executorLoader.addActiveExecutableReference(reference);
+ enqueueFlow(exflow, reference);
+ message +=
+ "Execution submitted successfully with exec id "
+ + exflow.getExecutionId();
}
+ return message;
+ }
+ }
- boolean memoryCheck =
- !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
- ProjectWhitelist.WhitelistType.MemoryCheck);
- options.setMemoryCheck(memoryCheck);
-
- // The exflow id is set by the loader. So it's unavailable until after
- // this call.
- executorLoader.uploadExecutableFlow(exflow);
-
- // We create an active flow reference in the datastore. If the upload
- // fails, we remove the reference.
- ExecutionReference reference =
- new ExecutionReference(exflow.getExecutionId());
- // Added to db queue
- executorLoader.addActiveExecutableReference(reference);
- Pair<ExecutionReference, ExecutableFlow> pair =
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow);
+ /* Helper method to have a single point of deletion in the queued flows */
+ private void dequeueFlow(int executionId) {
+ if (queuedFlowMap.containsKey(executionId)) {
+ queuedFlows.remove(queuedFlowMap.get(executionId));
+ queuedFlowMap.remove(executionId);
+ }
+ }
+
+ /* Helper method to have a single point of insertion in the queued flows */
+ private void enqueueFlow(ExecutableFlow exflow, ExecutionReference ref)
+ throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ new Pair<ExecutionReference, ExecutableFlow>(ref, exflow);
+ try {
queuedFlowMap.put(exflow.getExecutionId(), pair);
- queuedFlows.add(pair);
- message +=
- "Execution submitted successfully with exec id "
- + exflow.getExecutionId();
- return message;
+ queuedFlows.put(pair);
+ } catch (InterruptedException e) {
+ String errMsg = "Failed to queue flow " + exflow.getExecutionId();
+ logger.error(errMsg, e);
+ finalizeFlows(exflow);
+ throw new ExecutorManagerException(errMsg);
}
}
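
Note that PriorityBlockingQueue is unbounded: the 10 passed to its constructor only sizes the initial backing array, and put() never blocks. The webserverQueueCapacity limit is therefore enforced by the size() check in submitExecutableFlow() rather than by the queue itself, and the InterruptedException branch in enqueueFlow() is effectively defensive. A small sketch of that behaviour:

import java.util.concurrent.PriorityBlockingQueue;

public class UnboundedQueueSketch {
  public static void main(String[] args) {
    // The initial capacity only pre-sizes the backing array; the queue grows
    // freely, so put() applies no back pressure of its own.
    PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(10);
    for (int i = 0; i < 100; i++) {
      queue.put(i); // grows past the initial capacity without blocking
    }
    System.out.println("queued " + queue.size() + " items"); // queued 100 items
  }
}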
@@ -1546,9 +1586,8 @@ public class ExecutorManager extends EventHandler implements
*/
private class QueueProcessorThread extends Thread {
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
+ private static final long ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS = 1000;
private boolean shutdown = false;
- private long lastProcessingTime = -1;
- private long maxContinousSubmission = 10;
private boolean isActive = true;
public QueueProcessorThread() {
@@ -1563,29 +1602,24 @@ public class ExecutorManager extends EventHandler implements
return isActive;
}
- @SuppressWarnings("unused")
public void shutdown() {
shutdown = true;
this.interrupt();
}
public void run() {
+ // Loop until the QueueProcessorThread is shut down
while (!shutdown) {
synchronized (this) {
try {
- long currentTime = System.currentTimeMillis();
- if (isActive
- && currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
- // Refresh executor stats to be used by selector
- refreshExecutors();
- // process upto a maximum of maxContinousSubmission from queued
- // flows
- processQueuedFlows(maxContinousSubmission);
- lastProcessingTime = currentTime;
+ // start processing the queue if active, otherwise wait for some time
+ if (isActive) {
+ processQueuedFlows(ACTIVE_EXECUTOR_REFRESH_WINDOW_IN_MS);
}
wait(QUEUE_PROCESSOR_WAIT_IN_MS);
} catch (InterruptedException e) {
- logger.info("Interrupted. Probably to shut down.");
+ logger.info(
+ "QueueProcessorThread Interrupted. Probably to shut down.", e);
}
}
}
@@ -1598,49 +1632,55 @@ public class ExecutorManager extends EventHandler implements
}
}
- private void processQueuedFlows(long maxContinousSubmission) {
- try {
- Pair<ExecutionReference, ExecutableFlow> runningCandidate;
- int submissionNum = 0;
- while (submissionNum < maxContinousSubmission
- && (runningCandidate = queuedFlows.peek()) != null) {
- synchronized (runningCandidate) {
+ private void processQueuedFlows(long activeExecutorsRefreshWindow)
+ throws InterruptedException {
+ long lastProcessingTime = System.currentTimeMillis();
+ Pair<ExecutionReference, ExecutableFlow> runningCandidate;
+ while ((runningCandidate = queuedFlows.take()) != null) {
+ // stop queue processing, if queueProcessor is marked inactive
+ if (!queueProcessor.isActive())
+ return;
- ExecutionReference reference = runningCandidate.getFirst();
- ExecutableFlow exflow = runningCandidate.getSecond();
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastProcessingTime > activeExecutorsRefreshWindow) {
+ // Refresh executor stats to be used by selector
+ refreshExecutors();
+ lastProcessingTime = currentTime;
+ }
- // TODO: use dispatcher
- Executor choosenExecutor;
+ ExecutionReference reference = runningCandidate.getFirst();
+ ExecutableFlow exflow = runningCandidate.getSecond();
+ synchronized (exflow) {
+ Executor choosenExecutor;
- synchronized (activeExecutors) {
- choosenExecutor = activeExecutors.iterator().next();
- }
+ // TODO: use dispatcher
+ synchronized (activeExecutors) {
+ choosenExecutor = activeExecutors.iterator().next();
+ }
- if (choosenExecutor != null) {
- queuedFlows.poll();
- queuedFlowMap.remove(exflow.getExecutionId());
+ if (choosenExecutor != null) {
- try {
- reference.setExecutor(choosenExecutor);
- executorLoader.assignExecutor(choosenExecutor.getId(),
- exflow.getExecutionId());
- // TODO: ADD rest call to do an actual dispatch
- callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
- runningFlows
- .put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference,
- exflow));
- } catch (ExecutorManagerException e) {
- logger.error("Failed to process queued flow", e);
- // TODO: allow N errors and re-try
- finalizeFlows(exflow);
- }
+ try {
+ // TODO: ADD rest call to do an actual dispatch
+ callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
+ reference.setExecutor(choosenExecutor);
+
+ // move from queuedFlows to running flows
+ dequeueFlow(exflow.getExecutionId());
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ } catch (ExecutorManagerException e) {
+ logger.error("Failed to process queued flow", e);
+ // TODO: allow N errors and re-try
+ finalizeFlows(exflow);
}
+ } else {
+ // TODO: handle scenario where the dispatcher didn't assign any executor
}
- submissionNum++;
}
- } catch (Throwable th) {
- logger.error("Failed to process the queue", th);
}
}
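
processQueuedFlows() is now a blocking consumer: take() waits for an element instead of returning null, so the != null loop condition never ends the loop by itself; the isActive check and thread interruption are the exit paths. A stand-alone sketch of the take()-based consumer pattern, with plain strings standing in for the reference/flow pairs:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class QueueConsumerSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new PriorityBlockingQueue<String>();
    queue.put("flow-2");
    queue.put("flow-1");
    while (!queue.isEmpty()) {
      // take() would block here if the queue were empty, instead of returning null.
      String next = queue.take();
      System.out.println("dispatching " + next); // flow-1, then flow-2
    }
  }
}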
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
new file mode 100644
index 0000000..09abade
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowPriorityComparatorTest.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.user.User;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/**
+ * Test class for ExecutableFlowPriorityComparator
+ * */
+
+public class ExecutableFlowPriorityComparatorTest {
+ /* Directory with serialized description of test flows */
+ private static final String UNIT_BASE_DIR =
+ "../azkaban-test/src/test/resources/executions/exectest1/";
+
+ private File getFlowDir(String flow) {
+ return new File(UNIT_BASE_DIR + flow + ".flow");
+ }
+
+ /* Helper method to create an ExecutableFlow from serialized description */
+ private ExecutableFlow createExecutableFlow(String flowName, int execId,
+ int priority) throws IOException {
+ File jsonFlowFile = getFlowDir(flowName);
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> flowObj =
+ (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+
+ Flow flow = Flow.flowFromObject(flowObj);
+ Project project = new Project(1, "flow");
+ HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+ flowMap.put(flow.getId(), flow);
+ project.setFlows(flowMap);
+ ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+
+ execFlow.setExecutionId(execId);
+ if (priority > 0) {
+ execFlow.getExecutionOptions().getFlowParameters()
+ .put(ExecutionOptions.FLOW_PRIORITY, String.valueOf(priority));
+ }
+ return execFlow;
+ }
+
+ /* priority queue order when all priorities are explicitly specified */
+ @Test
+ public void testExplicitlySpecifiedPriorities() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 5);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 6);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", 3, 2);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ }
+
+ /* priority queue order when some priorities are implicitly specified */
+ @Test
+ public void testMixedSpecifiedPriorities() throws IOException,
+ InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 3);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", 3, -2);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+
+ /*
+ * priority queue order when some priorities are equal, execution Id is used
+ * in this case
+ */
+ @Test
+ public void testEqualPriorities() throws IOException, InterruptedException {
+ ExecutableFlow flow1 = createExecutableFlow("exec1", 1, 3);
+ ExecutableFlow flow2 = createExecutableFlow("exec2", 2, 2);
+ ExecutableFlow flow3 = createExecutableFlow("exec3", 3, -2);
+ ExecutableFlow flow4 = createExecutableFlow("exec3", 4, 3);
+ ExecutionReference dummyRef = new ExecutionReference(0);
+
+ BlockingQueue<Pair<ExecutionReference, ExecutableFlow>> queue =
+ new PriorityBlockingQueue<Pair<ExecutionReference, ExecutableFlow>>(10,
+ new ExecutableFlowPriorityComparator());
+
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow4));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow1));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow2));
+ queue.put(new Pair<ExecutionReference, ExecutableFlow>(dummyRef, flow3));
+
+ Assert.assertEquals(flow3, queue.take().getSecond());
+ Assert.assertEquals(flow1, queue.take().getSecond());
+ Assert.assertEquals(flow4, queue.take().getSecond());
+ Assert.assertEquals(flow2, queue.take().getSecond());
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 3db34aa..7fa7f03 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -21,10 +21,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang.ArrayUtils;
import org.junit.Assert;
import org.junit.Test;
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 3d3f760..f3e8c8f 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -48,7 +48,9 @@ import azkaban.server.HttpRequestUtils;
import azkaban.server.session.Session;
import azkaban.user.Permission;
import azkaban.user.Permission.Type;
+import azkaban.user.Role;
import azkaban.user.User;
+import azkaban.user.UserManager;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
@@ -61,11 +63,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
private ExecutorVelocityHelper velocityHelper;
+ private UserManager userManager;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ userManager = server.getUserManager();
projectManager = server.getProjectManager();
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
@@ -809,6 +813,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (!options.isSuccessEmailsOverridden()) {
options.setSuccessEmails(flow.getSuccessEmails());
}
+ fixFlowPriorityByPermission(options, user);
options.setMailCreator(flow.getMailCreator());
try {
@@ -824,6 +829,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("execid", exflow.getExecutionId());
}
+ /* Reset the flow priority if the submitting user is not an Azkaban admin */
+ private void fixFlowPriorityByPermission(ExecutionOptions options, User user) {
+ if (!(options.getFlowParameters().containsKey(
+ ExecutionOptions.FLOW_PRIORITY) && hasPermission(user, Type.ADMIN))) {
+ options.getFlowParameters().put(ExecutionOptions.FLOW_PRIORITY,
+ String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
+ }
+ }
+
public class ExecutorVelocityHelper {
public String getProjectName(int id) {
Project project = projectManager.getProject(id);
@@ -834,4 +848,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
return project.getName();
}
}
+
+ /* Returns true if the user has a role granting the given permission type */
+ protected boolean hasPermission(User user, Permission.Type type) {
+ for (String roleName : user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type)
+ || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
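
On the web-server side, an explicit flowPriority only survives when the submitting user holds the ADMIN permission; in every other case, including when no priority was supplied at all, the parameter is overwritten with DEFAULT_FLOW_PRIORITY. The condition restated on a plain map, with stand-in types and a hypothetical method name, for illustration only:

import java.util.Map;
import azkaban.executor.ExecutionOptions;

public class PriorityGateSketch {
  // Hypothetical restatement of fixFlowPriorityByPermission() for readability.
  static void resetPriorityUnlessAdmin(Map<String, String> flowParams,
      boolean submitterIsAdmin) {
    boolean keepExplicitPriority =
        flowParams.containsKey(ExecutionOptions.FLOW_PRIORITY) && submitterIsAdmin;
    if (!keepExplicitPriority) {
      flowParams.put(ExecutionOptions.FLOW_PRIORITY,
          String.valueOf(ExecutionOptions.DEFAULT_FLOW_PRIORITY));
    }
  }
}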