diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 39dd831..6589b48 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -117,8 +117,10 @@ public class ExecutorManager extends EventHandler implements
public ExecutorManager(Props props, ExecutorLoader loader,
Map<String, Alerter> alters) throws ExecutorManagerException {
+ alerters = alters;
azkProps = props;
this.executorLoader = loader;
+
this.setupExecutors();
this.loadRunningFlows();
@@ -126,8 +128,6 @@ public class ExecutorManager extends EventHandler implements
new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
this.loadQueuedFlows();
- alerters = alters;
-
cacheDir = new File(props.getString("cache.directory", "cache"));
executingManager = new ExecutingManagerUpdaterThread();
@@ -1035,14 +1035,11 @@ public class ExecutorManager extends EventHandler implements
executorLoader.addActiveExecutableReference(reference);
queuedFlows.enqueue(exflow, reference);
} else {
- Executor executor = activeExecutors.iterator().next();
// assign only local executor we have
- reference.setExecutor(executor);
+ Executor choosenExecutor = activeExecutors.iterator().next();
executorLoader.addActiveExecutableReference(reference);
try {
- callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+ dispatch(reference, exflow, choosenExecutor);
} catch (ExecutorManagerException e) {
executorLoader.removeActiveExecutableReference(reference
.getExecId());
@@ -1729,6 +1726,28 @@ public class ExecutorManager extends EventHandler implements
}
}
+ /**
+ * Calls executor to dispatch the flow, update db to assign the executor and
+ * in-memory state of executableFlow
+ */
+ private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+ Executor choosenExecutor) throws ExecutorManagerException {
+ exflow.setUpdateTime(System.currentTimeMillis());
+ callExecutorServer(exflow, choosenExecutor,
+ ConnectorParams.EXECUTE_ACTION);
+ executorLoader.assignExecutor(choosenExecutor.getId(),
+ exflow.getExecutionId());
+ reference.setExecutor(choosenExecutor);
+
+ // move from flow to running flows
+ runningFlows.put(exflow.getExecutionId(),
+ new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+ logger.info(String.format(
+ "Successfully dispatched exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ }
+
/*
* This thread is responsible for processing queued flows using dispatcher and
* making rest api calls to executor server
@@ -1925,23 +1944,5 @@ public class ExecutorManager extends EventHandler implements
// schedule can starve all others
queuedFlows.enqueue(exflow, reference);
}
-
- private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
- Executor choosenExecutor) throws ExecutorManagerException {
- exflow.setUpdateTime(System.currentTimeMillis());
- callExecutorServer(exflow, choosenExecutor,
- ConnectorParams.EXECUTE_ACTION);
- executorLoader.assignExecutor(choosenExecutor.getId(),
- exflow.getExecutionId());
- reference.setExecutor(choosenExecutor);
-
- // move from flow to running flows
- runningFlows.put(exflow.getExecutionId(),
- new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
- logger.info(String.format(
- "Successfully dispatched exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
- }
}
}
\ No newline at end of file