diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4c8d2f5..0ad8c19 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1953,9 +1953,17 @@ public class ExecutorManager extends EventHandler implements
"Reached handleDispatchExceptionCase stage for exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
- if (reference.getNumErrors() > this.maxDispatchingErrors
- || remainingExecutors.size() <= 1) {
- logger.error("Failed to process queued flow");
+ String giveUpReason = null;
+ if (reference.getNumErrors() >= this.maxDispatchingErrors) {
+ giveUpReason = "reached " + Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
+ + " (tried " + reference.getNumErrors() + " executors)";
+ } else if (remainingExecutors.size() <= 1) {
+ giveUpReason = "tried calling all executors (total: "
+ + ExecutorManager.this.activeExecutors.size() + ") but all failed";
+ }
+ if (giveUpReason != null) {
+ logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
+ + giveUpReason);
finalizeFlows(exflow);
} else {
remainingExecutors.remove(lastSelectedExecutor);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 67467fa..302e3ed 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.Constants;
+import azkaban.alert.Alerter;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
import azkaban.user.User;
@@ -49,6 +50,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
/**
* Test class for executor manager
@@ -66,11 +68,14 @@ public class ExecutorManagerTest {
private ExecutableFlow flow2;
private AlerterHolder alertHolder;
private ExecutorApiGateway apiGateway;
+ private Alerter mailAlerter;
@Before
public void setup() {
this.props = new Props();
+ this.mailAlerter = mock(Alerter.class);
this.alertHolder = mock(AlerterHolder.class);
+ when(this.alertHolder.get("email")).thenReturn(this.mailAlerter);
this.loader = new MockExecutorLoader();
}
@@ -276,15 +281,64 @@ public class ExecutorManagerTest {
this.manager.start();
final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+ mockFlowDoesNotExist();
+ this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+ final ExecutableFlow fetchedFlow = waitFlowFinished(flow1);
+ Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+ }
+
+ /**
+ * 1. Executor 1 throws an exception when trying to dispatch to it
+ * 2. ExecutorManager should try next executor
+ * 3. Executor 2 accepts the dispatched execution
+ */
+ @Test
+ public void testDispatchException() throws Exception {
+ testSetUpForRunningFlows();
+ this.manager.start();
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+ mockFlowDoesNotExist();
+ when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
+ .thenThrow(new ExecutorManagerException("Mocked dispatch exception"))
+ .thenReturn(null);
+ this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+ waitFlowFinished(flow1);
+ verify(this.apiGateway)
+ .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
+ verify(this.apiGateway)
+ .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
+ verify(this.loader, Mockito.times(1)).unassignExecutor(-1);
+ }
+ /**
+ * ExecutorManager should try to dispatch to all executors & when both fail it should remove the
+ * execution from queue and finalize it.
+ */
+ @Test
+ public void testDispatchFailed() throws Exception {
+ testSetUpForRunningFlows();
+ this.manager.start();
+ final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+ flow1.getExecutionOptions().setFailureEmails(Arrays.asList("test@example.com"));
+ when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+ when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
+ .thenThrow(new ExecutorManagerException("Mocked dispatch exception"));
+ this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+ waitFlowFinished(flow1);
+ verify(this.apiGateway)
+ .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
+ verify(this.apiGateway)
+ .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
+ verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
+ verify(this.mailAlerter).alertOnError(flow1);
+ }
+
+ private void mockFlowDoesNotExist() throws Exception {
mockUpdateResponse(ImmutableMap.of(ConnectorParams.RESPONSE_UPDATED_FLOWS,
Collections.singletonList(ImmutableMap.of(
ConnectorParams.UPDATE_MAP_EXEC_ID, -1,
"error", "Flow does not exist"))));
-
- this.manager.submitExecutableFlow(flow1, this.user.getUserId());
- final ExecutableFlow fetchedFlow = waitFlowFinished(flow1);
- Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
}
// Suppress "unchecked generic array creation for varargs parameter".