azkaban-aplcache

Allow more dispatch retries (#1953) Allow more dispatch

10/16/2018 3:28:15 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 349c682..3fede00 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -33,6 +33,7 @@ import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.File;
@@ -108,6 +109,7 @@ public class ExecutorManager extends EventHandler implements
   private Map<String, Integer> comparatorWeightsMap;
   private long lastSuccessfulExecutorInfoRefresh;
   private ExecutorService executorInforRefresherService;
+  private Duration sleepAfterDispatchFailure = Duration.ofSeconds(1L);
 
   @Inject
   public ExecutorManager(final Props azkProps, final ExecutorLoader executorLoader,
@@ -219,7 +221,8 @@ public class ExecutorManager extends EventHandler implements
                 Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
             this.azkProps.getInt(
                 Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
-                this.activeExecutors.getAll().size()));
+                this.activeExecutors.getAll().size()),
+            this.sleepAfterDispatchFailure);
   }
 
   /**
@@ -1324,6 +1327,7 @@ public class ExecutorManager extends EventHandler implements
     private final int maxDispatchingErrors;
     private final long activeExecutorRefreshWindowInMillisec;
     private final int activeExecutorRefreshWindowInFlows;
+    private final Duration sleepAfterDispatchFailure;
 
     private volatile boolean shutdown = false;
     private volatile boolean isActive = true;
@@ -1331,13 +1335,15 @@ public class ExecutorManager extends EventHandler implements
     public QueueProcessorThread(final boolean isActive,
         final long activeExecutorRefreshWindowInTime,
         final int activeExecutorRefreshWindowInFlows,
-        final int maxDispatchingErrors) {
+        final int maxDispatchingErrors,
+        final Duration sleepAfterDispatchFailure) {
       setActive(isActive);
       this.maxDispatchingErrors = maxDispatchingErrors;
       this.activeExecutorRefreshWindowInFlows =
           activeExecutorRefreshWindowInFlows;
       this.activeExecutorRefreshWindowInMillisec =
           activeExecutorRefreshWindowInTime;
+      this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
       this.setName("AzkabanWebServer-QueueProcessor-Thread");
     }
 
@@ -1442,56 +1448,69 @@ public class ExecutorManager extends EventHandler implements
         throws ExecutorManagerException {
       final Set<Executor> remainingExecutors = new HashSet<>(
           ExecutorManager.this.activeExecutors.getAll());
+      Throwable lastError;
       synchronized (exflow) {
-        for (int i = 0; i <= this.maxDispatchingErrors; i++) {
-          final String giveUpReason = checkGiveUpDispatching(reference, remainingExecutors);
-          if (giveUpReason != null) {
-            ExecutorManager.logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
-                + giveUpReason);
-            ExecutorManager.this.executionFinalizer
-                .finalizeFlow(exflow, "Failed to dispatch because " + giveUpReason, null);
-            // GIVE UP DISPATCHING - exit
+        do {
+          final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
+          if (selectedExecutor == null) {
+            ExecutorManager.this.commonMetrics.markDispatchFail();
+            handleNoExecutorSelectedCase(reference, exflow);
+            // RE-QUEUED - exit
             return;
           } else {
-            final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
-            if (selectedExecutor == null) {
-              ExecutorManager.this.commonMetrics.markDispatchFail();
-              handleNoExecutorSelectedCase(reference, exflow);
-              // RE-QUEUED - exit
+            try {
+              dispatch(reference, exflow, selectedExecutor);
+              ExecutorManager.this.commonMetrics.markDispatchSuccess();
+              // SUCCESS - exit
               return;
-            } else {
-              try {
-                dispatch(reference, exflow, selectedExecutor);
-                ExecutorManager.this.commonMetrics.markDispatchSuccess();
-                // SUCCESS - exit
-                return;
-              } catch (final ExecutorManagerException e) {
-                logFailedDispatchAttempt(reference, exflow, selectedExecutor, e);
-                ExecutorManager.this.commonMetrics.markDispatchFail();
-                reference.setNumErrors(reference.getNumErrors() + 1);
-                // FAILED ATTEMPT - try other executors except selectedExecutor
-                remainingExecutors.remove(selectedExecutor);
-              }
+            } catch (final ExecutorManagerException e) {
+              lastError = e;
+              logFailedDispatchAttempt(reference, exflow, selectedExecutor, e);
+              ExecutorManager.this.commonMetrics.markDispatchFail();
+              reference.setNumErrors(reference.getNumErrors() + 1);
+              // FAILED ATTEMPT - try other executors except selectedExecutor
+              updateRemainingExecutorsAndSleep(remainingExecutors, selectedExecutor);
             }
           }
-        }
-        throw new IllegalStateException(
-            "Unexpected error in dispatching " + exflow.getExecutionId());
+        } while (reference.getNumErrors() < this.maxDispatchingErrors);
+        // GAVE UP DISPATCHING
+        final String message = "Failed to dispatch queued execution " + exflow.getId() + " because "
+            + "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
+            + " (tried " + reference.getNumErrors() + " executors)";
+        ExecutorManager.logger.error(message);
+        ExecutorManager.this.executionFinalizer.finalizeFlow(exflow, message, lastError);
       }
     }
 
-    private String checkGiveUpDispatching(final ExecutionReference reference,
-        final Set<Executor> remainingExecutors) {
-      if (reference.getNumErrors() >= this.maxDispatchingErrors) {
-        return "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
-            + " (tried " + reference.getNumErrors() + " executors)";
-      } else if (remainingExecutors.isEmpty()) {
-        return "tried calling all executors (total: "
-            // TODO rather use the original size (activeExecutors may have been reloaded in the
-            // meanwhile)
-            + ExecutorManager.this.activeExecutors.getAll().size() + ") but all failed";
-      } else {
-        return null;
+    /**
+     * Removes the executor whose dispatch attempt just failed from the candidate set. When no
+     * candidates are left, refills the set with all currently active executors and sleeps before
+     * the caller starts the next round of attempts.
+     *
+     * @param remainingExecutors executors not yet tried in the current round; modified in place
+     * @param selectedExecutor the executor whose dispatch attempt just failed
+     */
+    private void updateRemainingExecutorsAndSleep(final Set<Executor> remainingExecutors,
+        final Executor selectedExecutor) {
+      remainingExecutors.remove(selectedExecutor);
+      if (remainingExecutors.isEmpty()) {
+        remainingExecutors.addAll(ExecutorManager.this.activeExecutors.getAll());
+        sleepAfterDispatchFailure();
+      }
+    }
+
+    /**
+     * Sleeps for the configured {@code sleepAfterDispatchFailure} duration. If the sleep is
+     * interrupted, a warning is logged and the thread's interrupt status is restored so that
+     * callers further up the stack can still react to the interruption.
+     */
+    private void sleepAfterDispatchFailure() {
+      try {
+        Thread.sleep(this.sleepAfterDispatchFailure.toMillis());
+      } catch (final InterruptedException e1) {
+        ExecutorManager.logger.warn("Sleep after dispatch failure was interrupted - ignoring");
+        // Thread.sleep cleared the interrupt flag when it threw; set it again.
+        Thread.currentThread().interrupt();
       }
     }
 
@@ -1573,4 +1577,18 @@ public class ExecutorManager extends EventHandler implements
       ExecutorManager.this.queuedFlows.enqueue(exflow, reference);
     }
   }
+
+  /**
+   * Overrides the time to sleep after a dispatch failure; the default is one second.
+   *
+   * <p>NOTE(review): QueueProcessorThread copies the value it is given into a final field in its
+   * constructor, so this presumably has to be called before that thread is created - confirm
+   * against the call order.
+   *
+   * @param sleepAfterDispatchFailure the sleep duration to use
+   */
+  @VisibleForTesting
+  void setSleepAfterDispatchFailure(final Duration sleepAfterDispatchFailure) {
+    this.sleepAfterDispatchFailure = sleepAfterDispatchFailure;
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 49f639c..fedc383 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -17,6 +17,7 @@
 package azkaban.executor;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
@@ -35,6 +36,7 @@ import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -154,12 +156,14 @@ public class ExecutorManagerTest {
     final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
         new RunningExecutionsUpdater(
             this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
-            this.runningExecutions, executionFinalizer), runningExecutions);
+            this.runningExecutions, executionFinalizer), this.runningExecutions);
     updaterThread.waitTimeIdleMs = 0;
     updaterThread.waitTimeMs = 0;
-    return new ExecutorManager(this.props, this.loader, this.commonMetrics, this.apiGateway,
-        this.runningExecutions, activeExecutors, this.updaterStage, executionFinalizer,
-        updaterThread);
+    final ExecutorManager executorManager = new ExecutorManager(this.props, this.loader,
+        this.commonMetrics, this.apiGateway, this.runningExecutions, activeExecutors,
+        this.updaterStage, executionFinalizer, updaterThread);
+    executorManager.setSleepAfterDispatchFailure(Duration.ZERO);
+    return executorManager;
   }
 
   /*
@@ -311,7 +315,7 @@ public class ExecutorManagerTest {
     testSetUpForRunningFlows();
     this.manager.start();
     final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
-    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+    doReturn(flow1).when(this.loader).fetchExecutableFlow(-1);
     mockFlowDoesNotExist();
     when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
         .thenThrow(new ExecutorManagerException("Mocked dispatch exception"))
@@ -346,7 +350,9 @@ public class ExecutorManagerTest {
         .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
     verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
     verify(this.mailAlerter).alertOnError(eq(flow1),
-        eq("Failed to dispatch because reached azkaban.maxDispatchingErrors (tried 2 executors)"));
+        eq("Failed to dispatch queued execution derived-member-data because reached "
+            + "azkaban.maxDispatchingErrors (tried 2 executors)"),
+        contains("Mocked dispatch exception"));
   }
 
   private void mockFlowDoesNotExist() throws Exception {
@@ -447,6 +453,43 @@ public class ExecutorManagerTest {
         activeExecutorServerHosts.contains(executor2.getHost() + ":" + executor2.getPort()));
   }
 
+  /**
+   * Dispatch should be retried after failures until an attempt succeeds (here: on the 3rd call).
+   */
+  @Test
+  public void testDispatchMultipleRetries() throws Exception {
+    this.props.put(Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED, 4);
+    testSetUpForRunningFlows();
+    this.manager.start();
+    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
+    flow1.getExecutionOptions().setFailureEmails(Arrays.asList("test@example.com"));
+    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
+
+    // the first 2 dispatch calls throw, the 3rd returns normally
+    when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
+        .thenThrow(new ExecutorManagerException("Mocked dispatch exception 1"))
+        .thenThrow(new ExecutorManagerException("Mocked dispatch exception 2"))
+        .thenReturn(null);
+
+    // this is just to clean up the execution as FAILED after it has been submitted
+    mockFlowDoesNotExist();
+
+    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
+    waitFlowFinished(flow1);
+
+    // the executor picked for each attempt is random, but both must have been called at least once
+    verify(this.apiGateway, Mockito.atLeast(1))
+        .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
+    verify(this.apiGateway, Mockito.atLeast(1))
+        .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
+
+    // exactly 3 dispatch calls in total: the 2 mocked failures plus the successful one
+    verify(this.apiGateway, Mockito.times(3))
+        .callWithExecutable(eq(flow1), any(), eq(ConnectorParams.EXECUTE_ACTION));
+
+    verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
+  }
+
   /*
    * TODO: will move below method to setUp() and run before every test for both runningFlows and queuedFlows
    */