azkaban-aplcache

Add unit tests for execution dispatch failures (#1919) Add

8/16/2018 2:29:21 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4c8d2f5..0ad8c19 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1953,9 +1953,17 @@ public class ExecutorManager extends EventHandler implements
                   "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
                   exflow.getExecutionId(), reference.getNumErrors()));
       reference.setNumErrors(reference.getNumErrors() + 1);
-      if (reference.getNumErrors() > this.maxDispatchingErrors
-          || remainingExecutors.size() <= 1) {
-        logger.error("Failed to process queued flow");
+      String giveUpReason = null;
+      if (reference.getNumErrors() >= this.maxDispatchingErrors) {
+        giveUpReason = "reached " + Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
+            + " (tried " + reference.getNumErrors() + " executors)";
+      } else if (remainingExecutors.size() <= 1) {
+        giveUpReason = "tried calling all executors (total: "
+            + ExecutorManager.this.activeExecutors.size() + ") but all failed";
+      }
+      if (giveUpReason != null) {
+        logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
+            + giveUpReason);
         finalizeFlows(exflow);
       } else {
         remainingExecutors.remove(lastSelectedExecutor);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 67467fa..302e3ed 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.Constants;
+import azkaban.alert.Alerter;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.user.User;
@@ -49,6 +50,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Test class for executor manager
@@ -66,11 +68,14 @@ public class ExecutorManagerTest {
   private ExecutableFlow flow2;
   private AlerterHolder alertHolder;
   private ExecutorApiGateway apiGateway;
+  private Alerter mailAlerter;
 
   @Before
   public void setup() {
     this.props = new Props();
+    this.mailAlerter = mock(Alerter.class);
     this.alertHolder = mock(AlerterHolder.class);
+    when(this.alertHolder.get("email")).thenReturn(this.mailAlerter);
     this.loader = new MockExecutorLoader();
   }
 
@@ -276,15 +281,64 @@ public class ExecutorManagerTest {
     this.manager.start();
     final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
     when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+    mockFlowDoesNotExist();
+    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+    final ExecutableFlow fetchedFlow = waitFlowFinished(flow1);
+    Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
+  }
+
+  /**
+   * 1. Executor 1 throws an exception when trying to dispatch to it
+   * 2. ExecutorManager should try next executor
+   * 3. Executor 2 accepts the dispatched execution
+   */
+  @Test
+  public void testDispatchException() throws Exception {
+    testSetUpForRunningFlows();
+    this.manager.start();
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+    mockFlowDoesNotExist();
+    when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
+        .thenThrow(new ExecutorManagerException("Mocked dispatch exception"))
+        .thenReturn(null);
+    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+    waitFlowFinished(flow1);
+    verify(this.apiGateway)
+        .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
+    verify(this.apiGateway)
+        .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
+    verify(this.loader, Mockito.times(1)).unassignExecutor(-1);
+  }
 
+  /**
+   * ExecutorManager should try to dispatch to all executors & when both fail it should remove the
+   * execution from queue and finalize it.
+   */
+  @Test
+  public void testDispatchFailed() throws Exception {
+    testSetUpForRunningFlows();
+    this.manager.start();
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    flow1.getExecutionOptions().setFailureEmails(Arrays.asList("test@example.com"));
+    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+    when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
+        .thenThrow(new ExecutorManagerException("Mocked dispatch exception"));
+    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+    waitFlowFinished(flow1);
+    verify(this.apiGateway)
+        .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
+    verify(this.apiGateway)
+        .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
+    verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
+    verify(this.mailAlerter).alertOnError(flow1);
+  }
+
+  private void mockFlowDoesNotExist() throws Exception {
     mockUpdateResponse(ImmutableMap.of(ConnectorParams.RESPONSE_UPDATED_FLOWS,
         Collections.singletonList(ImmutableMap.of(
             ConnectorParams.UPDATE_MAP_EXEC_ID, -1,
             "error", "Flow does not exist"))));
-
-    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
-    final ExecutableFlow fetchedFlow = waitFlowFinished(flow1);
-    Assert.assertEquals(fetchedFlow.getStatus(), Status.FAILED);
   }
 
   // Suppress "unchecked generic array creation for varargs parameter".