diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3740227..29d99ec 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -22,6 +22,7 @@ import java.lang.Thread.State;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,24 +62,26 @@ import azkaban.utils.Props;
public class ExecutorManager extends EventHandler implements
ExecutorManagerAdapter {
private static Logger logger = Logger.getLogger(ExecutorManager.class);
- private ExecutorLoader executorLoader;
+ final private ExecutorLoader executorLoader;
- private CleanerThread cleanerThread;
+ final private CleanerThread cleanerThread;
- private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
+ final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
- private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
+ final private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished =
new ConcurrentHashMap<Integer, ExecutableFlow>();
- /* all flows ExecutorManager is currently dealing with */
- private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
+
+ /* map to easily access queued flows */
+ final private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> queuedFlowMap =
new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
- private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
+ /* web server side queue */
+ final private ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>> queuedFlows =
new ConcurrentLinkedQueue<Pair<ExecutionReference, ExecutableFlow>>();
- private Set<Executor> activeExecutors = new HashSet<Executor>();
+ final private Set<Executor> activeExecutors = new HashSet<Executor>();
- private ExecutingManagerUpdaterThread executingManager;
- private QueueProcessorThread queueProcessor;
+ final private ExecutingManagerUpdaterThread executingManager;
+ final private QueueProcessorThread queueProcessor;
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000l;
@@ -88,14 +91,18 @@ public class ExecutorManager extends EventHandler implements
private long lastThreadCheckTime = -1;
private String updaterStage = "not started";
- private Map<String, Alerter> alerters;
+ final private Map<String, Alerter> alerters;
+
+ final Props azkProps;
File cacheDir;
public ExecutorManager(Props props, ExecutorLoader loader,
Map<String, Alerter> alters) throws ExecutorManagerException {
+ azkProps = props;
+
this.executorLoader = loader;
- this.setupExecutors(props);
+ this.setupExecutors();
this.loadRunningFlows();
this.loadQueuedFlows();
@@ -114,15 +121,31 @@ public class ExecutorManager extends EventHandler implements
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
cleanerThread.start();
-
+ disableQueueProcessorThread();
}
- /* setup activeExecutors using azkaban.properties and database executors */
- private void setupExecutors(Props props) throws ExecutorManagerException {
+ /**
+ * <pre>
+ * Setup activeExecutors using azkaban.properties and database executors
+ * Note:
+ * 1. If a local executor is specified and it is missing from db,
+ * this method add local executor as active in DB
+ * 2. If a local executor is specified and it is marked inactive in db,
+ * this method will convert local executor as active in DB
+ * 3. If azkaban.use.multiple.executors is set true, this method will
+ * load all active projects
+ * </pre>
+ *
+ * @throws ExecutorManagerException
+ */
+ public void setupExecutors() throws ExecutorManagerException {
+ // clear all active executors
+ activeExecutors.clear();
+
// Add local executor, if specified as per properties
- if (props.containsKey("executor.port")) {
- String executorHost = props.getString("executor.host", "localhost");
- int executorPort = props.getInt("executor.port");
+ if (azkProps.containsKey("executor.port")) {
+ String executorHost = azkProps.getString("executor.host", "localhost");
+ int executorPort = azkProps.getInt("executor.port");
Executor executor =
executorLoader.fetchExecutor(executorHost, executorPort);
if (executor == null) {
@@ -135,7 +158,7 @@ public class ExecutorManager extends EventHandler implements
executorPort, true));
}
- if (props.getBoolean("azkaban.multiple.executors", false)) {
+ if (azkProps.getBoolean("azkaban.use.multiple.executors", false)) {
activeExecutors.addAll(executorLoader.fetchActiveExecutors());
}
@@ -144,6 +167,22 @@ public class ExecutorManager extends EventHandler implements
}
}
+ public void disableQueueProcessorThread() {
+ queueProcessor.setActive(false);
+ }
+
+ public void enableQueueProcessorThread() {
+ queueProcessor.setActive(true);
+ }
+
+ public State getQueueProcessorThreadState() {
+ return queueProcessor.getState();
+ }
+
+ public boolean isQueueProcessorThreadActive() {
+ return queueProcessor.isActive();
+ }
+
@Override
public State getExecutorManagerThreadState() {
return executingManager.getState();
@@ -172,6 +211,12 @@ public class ExecutorManager extends EventHandler implements
return activeExecutors;
}
+ /**
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#fetchExecutor(int)
+ */
@Override
public Executor fetchExecutor(int executorId) throws ExecutorManagerException {
for (Executor executor : activeExecutors) {
@@ -235,24 +280,29 @@ public class ExecutorManager extends EventHandler implements
@Override
public List<Integer> getRunningFlows(int projectId, String flowId) {
ArrayList<Integer> executionIds = new ArrayList<Integer>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
- if (ref.getSecond().getFlowId().equals(flowId)
- && ref.getSecond().getProjectId() == projectId) {
- executionIds.add(ref.getFirst().getExecId());
- }
- }
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ getRunningFlowsHelper(projectId, flowId, executionIds,
+ queuedFlowMap.values());
+ getRunningFlowsHelper(projectId, flowId, executionIds,
+ runningFlows.values());
+ return executionIds;
+ }
+
+ /* Helper method for getRunningFlows */
+ private void getRunningFlowsHelper(int projectId, String flowId,
+ ArrayList<Integer> executionIds,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getFlowId().equals(flowId)
&& ref.getSecond().getProjectId() == projectId) {
executionIds.add(ref.getFirst().getExecId());
}
}
- return executionIds;
}
/**
*
* {@inheritDoc}
+ *
* @see azkaban.executor.ExecutorManagerAdapter#getActiveFlowsWithExecutor()
*/
@Override
@@ -260,15 +310,19 @@ public class ExecutorManager extends EventHandler implements
throws IOException {
List<Pair<ExecutableFlow, Executor>> flows =
new ArrayList<Pair<ExecutableFlow, Executor>>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
- flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
- .getFirst().getExecutor()));
- }
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ getActiveFlowsWithExecutorHelper(flows, queuedFlowMap.values());
+ getActiveFlowsWithExecutorHelper(flows, runningFlows.values());
+ return flows;
+ }
+
+ /* Helper method for getActiveFlowsWithExecutor */
+ private void getActiveFlowsWithExecutorHelper(
+ List<Pair<ExecutableFlow, Executor>> flows,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
flows.add(new Pair<ExecutableFlow, Executor>(ref.getSecond(), ref
.getFirst().getExecutor()));
}
- return flows;
}
/**
@@ -280,13 +334,20 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public boolean isFlowRunning(int projectId, String flowId) {
- for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
- if (ref.getSecond().getProjectId() == projectId
- && ref.getSecond().getFlowId().equals(flowId)) {
- return true;
- }
- }
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ boolean isRunning = false;
+ isRunning =
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, queuedFlowMap.values());
+ isRunning =
+ isRunning
+ || isFlowRunningHelper(projectId, flowId, runningFlows.values());
+ return false;
+ }
+
+ /* Search a running flow in a collection */
+ private boolean isFlowRunningHelper(int projectId, String flowId,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
if (ref.getSecond().getProjectId() == projectId
&& ref.getSecond().getFlowId().equals(flowId)) {
return true;
@@ -323,13 +384,20 @@ public class ExecutorManager extends EventHandler implements
@Override
public List<ExecutableFlow> getRunningFlows() {
ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
- flows.add(ref.getSecond());
- }
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+ getActiveFlowHelper(flows, queuedFlowMap.values());
+ getActiveFlowHelper(flows, runningFlows.values());
+ return flows;
+ }
+
+ /*
+ * Helper method to get all running flows from a Pair<ExecutionReference,
+ * ExecutableFlow collection
+ */
+ private void getActiveFlowHelper(ArrayList<ExecutableFlow> flows,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
flows.add(ref.getSecond());
}
- return flows;
}
/**
@@ -341,16 +409,34 @@ public class ExecutorManager extends EventHandler implements
*/
public String getRunningFlowIds() {
List<Integer> allIds = new ArrayList<Integer>();
- for (Pair<ExecutionReference, ExecutableFlow> ref : queuedFlowMap.values()) {
- allIds.add(ref.getSecond().getExecutionId());
- }
- for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
- allIds.add(ref.getSecond().getExecutionId());
- }
+ getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
+ getRunningFlowsIdsHelper(allIds, runningFlows.values());
+ Collections.sort(allIds);
+ return allIds.toString();
+ }
+
+ /**
+ * Get execution Ids of all non-dispatched flows
+ *
+ * {@inheritDoc}
+ *
+ * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ */
+ public String getQueuedFlowIds() {
+ List<Integer> allIds = new ArrayList<Integer>();
+ getRunningFlowsIdsHelper(allIds, queuedFlowMap.values());
Collections.sort(allIds);
return allIds.toString();
}
+ /* Helper method to flow ids of all running flows */
+ private void getRunningFlowsIdsHelper(List<Integer> allIds,
+ Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ for (Pair<ExecutionReference, ExecutableFlow> ref : collection) {
+ allIds.add(ref.getSecond().getExecutionId());
+ }
+ }
+
/**
* Get recently finished flows {@inheritDoc}
*
@@ -791,7 +877,8 @@ public class ExecutorManager extends EventHandler implements
private Map<String, Object> callExecutorServer(ExecutionReference ref,
String action) throws ExecutorManagerException {
try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ Executor executor = ref.getExecutor();
+ return callExecutorServer(executor.getHost(), executor.getPort(), action,
ref.getExecId(), null, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
@@ -801,7 +888,8 @@ public class ExecutorManager extends EventHandler implements
private Map<String, Object> callExecutorServer(ExecutionReference ref,
String action, String user) throws ExecutorManagerException {
try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ Executor executor = ref.getExecutor();
+ return callExecutorServer(executor.getHost(), executor.getPort(), action,
ref.getExecId(), user, (Pair<String, String>[]) null);
} catch (IOException e) {
throw new ExecutorManagerException(e);
@@ -812,7 +900,8 @@ public class ExecutorManager extends EventHandler implements
String action, Pair<String, String>... params)
throws ExecutorManagerException {
try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ Executor executor = ref.getExecutor();
+ return callExecutorServer(executor.getHost(), executor.getPort(), action,
ref.getExecId(), null, params);
} catch (IOException e) {
throw new ExecutorManagerException(e);
@@ -823,7 +912,8 @@ public class ExecutorManager extends EventHandler implements
String action, String user, Pair<String, String>... params)
throws ExecutorManagerException {
try {
- return callExecutorServer(ref.getHost(), ref.getPort(), action,
+ Executor executor = ref.getExecutor();
+ return callExecutorServer(executor.getHost(), executor.getPort(), action,
ref.getExecId(), user, params);
} catch (IOException e) {
throw new ExecutorManagerException(e);
@@ -887,6 +977,7 @@ public class ExecutorManager extends EventHandler implements
/**
* Manage servlet call for stats servlet in Azkaban execution server
* {@inheritDoc}
+ *
* @throws ExecutorManagerException
*
* @see azkaban.executor.ExecutorManagerAdapter#callExecutorStats(java.lang.String,
@@ -894,11 +985,13 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public Map<String, Object> callExecutorStats(int executorId, String action,
- Pair<String, String>... params) throws IOException, ExecutorManagerException {
+ Pair<String, String>... params) throws IOException,
+ ExecutorManagerException {
URIBuilder builder = new URIBuilder();
Executor executor = fetchExecutor(executorId);
- builder.setScheme("http").setHost(executor.getHost()).setPort(executor.getPort()).setPath("/stats");
+ builder.setScheme("http").setHost(executor.getHost())
+ .setPort(executor.getPort()).setPath("/stats");
builder.setParameter(ConnectorParams.ACTION_PARAM, action);
@@ -1443,11 +1536,20 @@ public class ExecutorManager extends EventHandler implements
private boolean shutdown = false;
private long lastProcessingTime = -1;
private long maxContinousSubmission = 10;
+ private boolean isActive = true;
public QueueProcessorThread() {
this.setName("AzkabanWebServer-QueueProcessor-Thread");
}
+ public void setActive(boolean isActive) {
+ this.isActive = isActive;
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
@SuppressWarnings("unused")
public void shutdown() {
shutdown = true;
@@ -1458,10 +1560,13 @@ public class ExecutorManager extends EventHandler implements
while (!shutdown) {
synchronized (this) {
try {
- refreshExecutors();
- lastCleanerThreadCheckTime = System.currentTimeMillis();
long currentTime = System.currentTimeMillis();
- if (currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
+ if (isActive
+ && currentTime - QUEUE_PROCESSOR_WAIT_IN_MS > lastProcessingTime) {
+ // Refresh executor stats to be used by selector
+ refreshExecutors();
+ // process upto a maximum of maxContinousSubmission from queued
+ // flows
processQueuedFlows(maxContinousSubmission);
lastProcessingTime = currentTime;
}