diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 349c682..3fede00 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -33,6 +33,7 @@ import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
@@ -108,6 +109,7 @@ public class ExecutorManager extends EventHandler implements
private Map<String, Integer> comparatorWeightsMap;
private long lastSuccessfulExecutorInfoRefresh;
private ExecutorService executorInforRefresherService;
+ private Duration sleepAfterDispatchFailure = Duration.ofSeconds(1L);
@Inject
public ExecutorManager(final Props azkProps, final ExecutorLoader executorLoader,
@@ -219,7 +221,8 @@ public class ExecutorManager extends EventHandler implements
Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
this.azkProps.getInt(
Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
- this.activeExecutors.getAll().size()));
+ this.activeExecutors.getAll().size()),
+ this.sleepAfterDispatchFailure);
}
/**
@@ -1324,6 +1327,7 @@ public class ExecutorManager extends EventHandler implements
private final int maxDispatchingErrors;
private final long activeExecutorRefreshWindowInMillisec;
private final int activeExecutorRefreshWindowInFlows;
+ private final Duration sleepAfterDispatchFailure;
private volatile boolean shutdown = false;
private volatile boolean isActive = true;
@@ -1331,13 +1335,15 @@ public class ExecutorManager extends EventHandler implements
public QueueProcessorThread(final boolean isActive,
final long activeExecutorRefreshWindowInTime,
final int activeExecutorRefreshWindowInFlows,
- final int maxDispatchingErrors) {
+ final int maxDispatchingErrors,
+ final Duration sleepAfterDispatchFailure) {
setActive(isActive);
this.maxDispatchingErrors = maxDispatchingErrors;
this.activeExecutorRefreshWindowInFlows =
activeExecutorRefreshWindowInFlows;
this.activeExecutorRefreshWindowInMillisec =
activeExecutorRefreshWindowInTime;
+ this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
this.setName("AzkabanWebServer-QueueProcessor-Thread");
}
@@ -1442,56 +1448,54 @@ public class ExecutorManager extends EventHandler implements
throws ExecutorManagerException {
final Set<Executor> remainingExecutors = new HashSet<>(
ExecutorManager.this.activeExecutors.getAll());
+ Throwable lastError;
synchronized (exflow) {
- for (int i = 0; i <= this.maxDispatchingErrors; i++) {
- final String giveUpReason = checkGiveUpDispatching(reference, remainingExecutors);
- if (giveUpReason != null) {
- ExecutorManager.logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
- + giveUpReason);
- ExecutorManager.this.executionFinalizer
- .finalizeFlow(exflow, "Failed to dispatch because " + giveUpReason, null);
- // GIVE UP DISPATCHING - exit
+ do {
+ final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
+ if (selectedExecutor == null) {
+ ExecutorManager.this.commonMetrics.markDispatchFail();
+ handleNoExecutorSelectedCase(reference, exflow);
+ // RE-QUEUED - exit
return;
} else {
- final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
- if (selectedExecutor == null) {
- ExecutorManager.this.commonMetrics.markDispatchFail();
- handleNoExecutorSelectedCase(reference, exflow);
- // RE-QUEUED - exit
+ try {
+ dispatch(reference, exflow, selectedExecutor);
+ ExecutorManager.this.commonMetrics.markDispatchSuccess();
+ // SUCCESS - exit
return;
- } else {
- try {
- dispatch(reference, exflow, selectedExecutor);
- ExecutorManager.this.commonMetrics.markDispatchSuccess();
- // SUCCESS - exit
- return;
- } catch (final ExecutorManagerException e) {
- logFailedDispatchAttempt(reference, exflow, selectedExecutor, e);
- ExecutorManager.this.commonMetrics.markDispatchFail();
- reference.setNumErrors(reference.getNumErrors() + 1);
- // FAILED ATTEMPT - try other executors except selectedExecutor
- remainingExecutors.remove(selectedExecutor);
- }
+ } catch (final ExecutorManagerException e) {
+ lastError = e;
+ logFailedDispatchAttempt(reference, exflow, selectedExecutor, e);
+ ExecutorManager.this.commonMetrics.markDispatchFail();
+ reference.setNumErrors(reference.getNumErrors() + 1);
+ // FAILED ATTEMPT - try other executors except selectedExecutor
+ updateRemainingExecutorsAndSleep(remainingExecutors, selectedExecutor);
}
}
- }
- throw new IllegalStateException(
- "Unexpected error in dispatching " + exflow.getExecutionId());
+ } while (reference.getNumErrors() < this.maxDispatchingErrors);
+ // GAVE UP DISPATCHING
+ final String message = "Failed to dispatch queued execution " + exflow.getId() + " because "
+ + "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
+ + " (tried " + reference.getNumErrors() + " executors)";
+ ExecutorManager.logger.error(message);
+ executionFinalizer.finalizeFlow(exflow, message, lastError);
}
}
- private String checkGiveUpDispatching(final ExecutionReference reference,
- final Set<Executor> remainingExecutors) {
- if (reference.getNumErrors() >= this.maxDispatchingErrors) {
- return "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
- + " (tried " + reference.getNumErrors() + " executors)";
- } else if (remainingExecutors.isEmpty()) {
- return "tried calling all executors (total: "
- // TODO rather use the original size (activeExecutors may have been reloaded in the
- // meanwhile)
- + ExecutorManager.this.activeExecutors.getAll().size() + ") but all failed";
- } else {
- return null;
+ private void updateRemainingExecutorsAndSleep(final Set<Executor> remainingExecutors,
+ final Executor selectedExecutor) {
+ remainingExecutors.remove(selectedExecutor);
+ if (remainingExecutors.isEmpty()) {
+ remainingExecutors.addAll(activeExecutors.getAll());
+ sleepAfterDispatchFailure();
+ }
+ }
+
+ private void sleepAfterDispatchFailure() {
+ try {
+ Thread.sleep(this.sleepAfterDispatchFailure.toMillis());
+ } catch (final InterruptedException e1) {
+ ExecutorManager.logger.warn("Sleep after dispatch failure was interrupted - ignoring");
}
}
@@ -1573,4 +1577,9 @@ public class ExecutorManager extends EventHandler implements
ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
}
}
+
+ @VisibleForTesting
+ void setSleepAfterDispatchFailure(Duration sleepAfterDispatchFailure) {
+ this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 49f639c..fedc383 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -17,6 +17,7 @@
package azkaban.executor;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
@@ -35,6 +36,7 @@ import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -154,12 +156,14 @@ public class ExecutorManagerTest {
final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
new RunningExecutionsUpdater(
this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
- this.runningExecutions, executionFinalizer), runningExecutions);
+ this.runningExecutions, executionFinalizer), this.runningExecutions);
updaterThread.waitTimeIdleMs = 0;
updaterThread.waitTimeMs = 0;
- return new ExecutorManager(this.props, this.loader, this.commonMetrics, this.apiGateway,
- this.runningExecutions, activeExecutors, this.updaterStage, executionFinalizer,
- updaterThread);
+ final ExecutorManager executorManager = new ExecutorManager(this.props, this.loader,
+ this.commonMetrics, this.apiGateway, this.runningExecutions, activeExecutors,
+ this.updaterStage, executionFinalizer, updaterThread);
+ executorManager.setSleepAfterDispatchFailure(Duration.ZERO);
+ return executorManager;
}
/*
@@ -311,7 +315,7 @@ public class ExecutorManagerTest {
testSetUpForRunningFlows();
this.manager.start();
final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
- when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+ doReturn(flow1).when(this.loader).fetchExecutableFlow(-1);
mockFlowDoesNotExist();
when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
.thenThrow(new ExecutorManagerException("Mocked dispatch exception"))
@@ -346,7 +350,9 @@ public class ExecutorManagerTest {
.callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
verify(this.mailAlerter).alertOnError(eq(flow1),
- eq("Failed to dispatch because reached azkaban.maxDispatchingErrors (tried 2 executors)"));
+ eq("Failed to dispatch queued execution derived-member-data because reached "
+ + "azkaban.maxDispatchingErrors (tried 2 executors)"),
+ contains("Mocked dispatch exception"));
}
private void mockFlowDoesNotExist() throws Exception {
@@ -447,6 +453,43 @@ public class ExecutorManagerTest {
activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
}
+ /**
+ * ExecutorManager should try to dispatch to all executors until it succeeds.
+ */
+ @Test
+ public void testDispatchMultipleRetries() throws Exception {
+ this.props.put(Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED, 4);
+ testSetUpForRunningFlows();
+ this.manager.start();
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ flow1.getExecutionOptions().setFailureEmails(Arrays.asList("test@example.com"));
+ when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+
+ // fail 2 first dispatch attempts, then succeed
+ when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
+ .thenThrow(new ExecutorManagerException("Mocked dispatch exception 1"))
+ .thenThrow(new ExecutorManagerException("Mocked dispatch exception 2"))
+ .thenReturn(null);
+
+ // this is just to clean up the execution as FAILED after it has been submitted
+ mockFlowDoesNotExist();
+
+ this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+ waitFlowFinished(flow1);
+
+ // it's random which executor is chosen each time, but both should have been tried at least once
+ verify(this.apiGateway, Mockito.atLeast(1))
+ .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
+ verify(this.apiGateway, Mockito.atLeast(1))
+ .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
+
+ // verify that there was a 3rd (successful) dispatch call
+ verify(this.apiGateway, Mockito.times(3))
+ .callWithExecutable(eq(flow1), any(), eq(ConnectorParams.EXECUTE_ACTION));
+
+ verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
+ }
+
/*
* TODO: will move below method to setUp() and run before every test for both runningFlows and queuedFlows
*/