diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6ec5c8e..a69297f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1853,8 +1853,7 @@ public class ExecutorManager extends EventHandler implements
} else {
exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
- selectExecutorAndDispatchFlow(reference, exflow, new HashSet<>(
- ExecutorManager.this.activeExecutors));
+ selectExecutorAndDispatchFlow(reference, exflow);
ExecutorManager.this.runningCandidate = null;
}
@@ -1867,29 +1866,69 @@ public class ExecutorManager extends EventHandler implements
/* process flow with a snapshot of available Executors */
private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
- final ExecutableFlow exflow, final Set<Executor> availableExecutors)
+ final ExecutableFlow exflow)
throws ExecutorManagerException {
+ final Set<Executor> remainingExecutors = new HashSet<>(ExecutorManager.this.activeExecutors);
synchronized (exflow) {
- final Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
- if (selectedExecutor != null) {
- try {
- dispatch(reference, exflow, selectedExecutor);
- ExecutorManager.this.commonMetrics.markDispatchSuccess();
- } catch (final ExecutorManagerException e) {
- ExecutorManager.this.commonMetrics.markDispatchFail();
- logger.warn(String.format(
- "Executor %s responded with exception for exec: %d",
- selectedExecutor, exflow.getExecutionId()), e);
- handleDispatchExceptionCase(reference, exflow, selectedExecutor,
- availableExecutors);
+ for (int i = 0; i <= this.maxDispatchingErrors; i++) {
+ final String giveUpReason = checkGiveUpDispatching(reference, remainingExecutors);
+ if (giveUpReason != null) {
+ logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
+ + giveUpReason);
+ finalizeFlows(exflow);
+ // GIVE UP DISPATCHING - exit
+ return;
+ } else {
+ final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
+ if (selectedExecutor == null) {
+ ExecutorManager.this.commonMetrics.markDispatchFail();
+ handleNoExecutorSelectedCase(reference, exflow);
+ // RE-QUEUED - exit
+ return;
+ } else {
+ try {
+ dispatch(reference, exflow, selectedExecutor);
+ ExecutorManager.this.commonMetrics.markDispatchSuccess();
+ // SUCCESS - exit
+ return;
+ } catch (final ExecutorManagerException e) {
+ logFailedDispatchAttempt(reference, exflow, selectedExecutor, e);
+ ExecutorManager.this.commonMetrics.markDispatchFail();
+ reference.setNumErrors(reference.getNumErrors() + 1);
+ // FAILED ATTEMPT - try other executors except selectedExecutor
+ remainingExecutors.remove(selectedExecutor);
+ }
+ }
}
- } else {
- ExecutorManager.this.commonMetrics.markDispatchFail();
- handleNoExecutorSelectedCase(reference, exflow);
}
+ throw new IllegalStateException(
+ "Unexpected error in dispatching " + exflow.getExecutionId());
+ }
+ }
+
+ private String checkGiveUpDispatching(final ExecutionReference reference,
+ final Set<Executor> remainingExecutors) {
+ if (reference.getNumErrors() >= this.maxDispatchingErrors) {
+ return "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
+ + " (tried " + reference.getNumErrors() + " executors)";
+ } else if (remainingExecutors.isEmpty()) {
+ return "tried calling all executors (total: "
+ + ExecutorManager.this.activeExecutors.size() + ") but all failed";
+ } else {
+ return null;
}
}
+ private void logFailedDispatchAttempt(final ExecutionReference reference, final ExecutableFlow exflow,
+ final Executor selectedExecutor, final ExecutorManagerException e) {
+ logger.warn(String.format(
+ "Executor %s responded with exception for exec: %d",
+ selectedExecutor, exflow.getExecutionId()), e);
+ logger.info(String.format(
+ "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+ exflow.getExecutionId(), reference.getNumErrors()));
+ }
+
/* Helper method to fetch overriding Executor, if a valid user has specifed otherwise return null */
private Executor getUserSpecifiedExecutor(final ExecutionOptions options,
final int executionId) {
@@ -1945,34 +1984,6 @@ public class ExecutorManager extends EventHandler implements
return choosenExecutor;
}
- private void handleDispatchExceptionCase(final ExecutionReference reference,
- final ExecutableFlow exflow, final Executor lastSelectedExecutor,
- final Set<Executor> remainingExecutors) throws ExecutorManagerException {
- logger
- .info(String
- .format(
- "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
- exflow.getExecutionId(), reference.getNumErrors()));
- reference.setNumErrors(reference.getNumErrors() + 1);
- String giveUpReason = null;
- if (reference.getNumErrors() >= this.maxDispatchingErrors) {
- giveUpReason = "reached " + Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
- + " (tried " + reference.getNumErrors() + " executors)";
- } else if (remainingExecutors.size() <= 1) {
- giveUpReason = "tried calling all executors (total: "
- + ExecutorManager.this.activeExecutors.size() + ") but all failed";
- }
- if (giveUpReason != null) {
- logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
- + giveUpReason);
- finalizeFlows(exflow);
- } else {
- remainingExecutors.remove(lastSelectedExecutor);
- // try other executors except chosenExecutor
- selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
- }
- }
-
private void handleNoExecutorSelectedCase(final ExecutionReference reference,
final ExecutableFlow exflow) throws ExecutorManagerException {
logger