diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 39dd831..cca687b 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -74,9 +74,9 @@ public class ExecutorManager extends EventHandler implements
"azkaban.use.multiple.executors";
private static final String AZKABAN_WEBSERVER_QUEUE_SIZE =
"azkaban.webserver.queue.size";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS =
"azkaban.activeexecutor.refresh.milisecinterval";
- private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW =
+ private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
@@ -115,20 +115,19 @@ public class ExecutorManager extends EventHandler implements
private long lastSuccessfulExecutorInfoRefresh;
private ExecutorService executorInforRefresherService;
- public ExecutorManager(Props props, ExecutorLoader loader,
- Map<String, Alerter> alters) throws ExecutorManagerException {
- azkProps = props;
+ public ExecutorManager(Props azkProps, ExecutorLoader loader,
+ Map<String, Alerter> alerters) throws ExecutorManagerException {
+ this.alerters = alerters;
+ this.azkProps = azkProps;
this.executorLoader = loader;
this.setupExecutors();
this.loadRunningFlows();
queuedFlows =
- new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
+ new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
- alerters = alters;
-
- cacheDir = new File(props.getString("cache.directory", "cache"));
+ cacheDir = new File(azkProps.getString("cache.directory", "cache"));
executingManager = new ExecutingManagerUpdaterThread();
executingManager.start();
@@ -138,7 +137,7 @@ public class ExecutorManager extends EventHandler implements
}
long executionLogsRetentionMs =
- props.getLong("execution.logs.retention.ms",
+ azkProps.getLong("execution.logs.retention.ms",
DEFAULT_EXECUTION_LOGS_RETENTION_MS);
cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -172,8 +171,8 @@ public class ExecutorManager extends EventHandler implements
queueProcessor =
new QueueProcessorThread(azkProps.getBoolean(
AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 1000), azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 10));
queueProcessor.start();
}
@@ -255,8 +254,9 @@ public class ExecutorManager extends EventHandler implements
// max 5 secs
String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
- logger.info("Successfully refreshed ExecutorInfo for executor: "
- + executor);
+ logger.info(String.format(
+ "Successfully refreshed executor: %s with executor info : %s",
+ executor, jsonString));
} catch (TimeoutException e) {
wasSuccess = false;
logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -1035,14 +1035,11 @@ public class ExecutorManager extends EventHandler implements
executorLoader.addActiveExecutableReference(reference);
queuedFlows.enqueue(exflow, reference);
} else {
- Executor executor = activeExecutors.iterator().next();
// assign only local executor we have
- reference.setExecutor(executor);
+ Executor choosenExecutor = activeExecutors.iterator().next();
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ dispatch(reference, exflow, choosenExecutor);
} catch (ExecutorManagerException e) {
executorLoader.removeActiveExecutableReference(reference
.getExecId());
@@ -1729,6 +1726,36 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /**
+ * Dispatches a flow to the chosen executor: records the executor assignment
+ * through executorLoader, calls the executor server with EXECUTE_ACTION, and
+ * on success updates the in-memory state (the reference's executor and the
+ * runningFlows map).
+ *
+ * If the call to the executor server throws ExecutorManagerException, the
+ * assignment is undone with unassignExecutor and the failure is rethrown
+ * wrapped in a new ExecutorManagerException; in that case neither the
+ * reference nor runningFlows is modified.
+ *
+ * NOTE(review): an exception raised by assignExecutor or unassignExecutor is
+ * not caught here and propagates to the caller as-is.
+ *
+ * @param reference execution reference that receives the chosen executor
+ * @param exflow flow to dispatch; its update time is set to the current time
+ * @param choosenExecutor executor the flow is sent to
+ * @throws ExecutorManagerException if the call to the executor server fails
+ */
+ private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+ Executor choosenExecutor) throws ExecutorManagerException {
+ // stamp the flow with the time of this dispatch attempt
+ exflow.setUpdateTime(System.currentTimeMillis());
+
+ // record the assignment before contacting the executor; it is undone in
+ // the catch block below if the call fails
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
+ try {
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ } catch (ExecutorManagerException ex) {
+ logger.error("Rolling back executor assignment for execution id:"
+ + exflow.getExecutionId(), ex);
+ executorLoader.unassignExecutor(exflow.getExecutionId());
+ throw new ExecutorManagerException(ex);
+ }
+ // only reached when the executor call succeeded
+ reference.setExecutor(choosenExecutor);
+
+ // track the execution in runningFlows, keyed by its execution id
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+ logger.info(String.format(
+ "Successfully dispatched exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ }
+
/*
* This thread is responsible for processing queued flows using dispatcher and
* making rest api calls to executor server
@@ -1790,7 +1817,7 @@ public class ExecutorManager extends EventHandler implements
private void processQueuedFlows(long activeExecutorsRefreshWindow,
int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
- long lastExecutorRefreshTime = System.currentTimeMillis();
+ long lastExecutorRefreshTime = 0;
Pair<ExecutionReference, ExecutableFlow> runningCandidate;
int currentContinuousFlowProcessed = 0;
@@ -1811,15 +1838,38 @@ public class ExecutorManager extends EventHandler implements
currentContinuousFlowProcessed = 0;
}
- exflow.setUpdateTime(currentTime);
- // process flow with current snapshot of activeExecutors
- processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ /**
+ * <pre>
+ * TODO: Workaround until Filters have a notion of GlobalSystemState.
+ * Currently we try each queued flow once to infer a global busy state.
+ * Possible improvements:-
+ * 1. Move system-level filters into refreshExecutors and sleep if all executors are busy after a refresh
+ * 2. Implement GlobalSystemState in the selector or in a third place to manage system filters, i.e.
+ * take out all the filters which do not depend on the flow but are still part of Selector.
+ * Assumptions:-
+ * 1. nothing except QueueProcessor updates the ExecutableFlow update time
+ * 2. re-attempting a flow (one that has been tried before) is taken to mean that all executors are busy
+ * </pre>
+ */
+ if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+ // put back in the queue
+ queuedFlows.enqueue(exflow, reference);
+ long sleepInterval =
+ activeExecutorsRefreshWindow
+ - (currentTime - lastExecutorRefreshTime);
+ // wait till next executor refresh
+ sleep(sleepInterval);
+ } else {
+ exflow.setUpdateTime(currentTime);
+ // process flow with current snapshot of activeExecutors
+ selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ }
currentContinuousFlowProcessed++;
}
}
/* process flow with a snapshot of available Executors */
- private void processFlow(ExecutionReference reference,
+ private void selectExecutorAndDispatchFlow(ExecutionReference reference,
ExecutableFlow exflow, Set<Executor> availableExecutors)
throws ExecutorManagerException {
synchronized (exflow) {
@@ -1910,7 +1960,7 @@ public class ExecutorManager extends EventHandler implements
} else {
remainingExecutors.remove(lastSelectedExecutor);
// try other executors except chosenExecutor
- processFlow(reference, exflow, remainingExecutors);
+ selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
}
}
@@ -1925,23 +1975,5 @@ public class ExecutorManager extends EventHandler implements
// schedule can starve all others
queuedFlows.enqueue(exflow, reference);
}
-
- private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
- Executor choosenExecutor) throws ExecutorManagerException {
- exflow.setUpdateTime(System.currentTimeMillis());
- callExecutorServer(exflow, choosenExecutor,
- ConnectorParams.EXECUTE_ACTION);
- executorLoader.assignExecutor(choosenExecutor.getId(),
- exflow.getExecutionId());
- reference.setExecutor(choosenExecutor);
-
- // move from flow to running flows
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
- logger.info(String.format(
- "Successfully dispatched exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
- }
}
}
\ No newline at end of file