azkaban-aplcache

No recursion in dispatch error handling (#1922) The code

9/13/2018 5:58:12 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6ec5c8e..a69297f 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1853,8 +1853,7 @@ public class ExecutorManager extends EventHandler implements
         } else {
           exflow.setUpdateTime(currentTime);
           // process flow with current snapshot of activeExecutors
-          selectExecutorAndDispatchFlow(reference, exflow, new HashSet<>(
-              ExecutorManager.this.activeExecutors));
+          selectExecutorAndDispatchFlow(reference, exflow);
           ExecutorManager.this.runningCandidate = null;
         }
 
@@ -1867,29 +1866,69 @@ public class ExecutorManager extends EventHandler implements
 
     /* process flow with a snapshot of available Executors */
     private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
-        final ExecutableFlow exflow, final Set<Executor> availableExecutors)
+        final ExecutableFlow exflow)
         throws ExecutorManagerException {
+      final Set<Executor> remainingExecutors = new HashSet<>(ExecutorManager.this.activeExecutors);
       synchronized (exflow) {
-        final Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
-        if (selectedExecutor != null) {
-          try {
-            dispatch(reference, exflow, selectedExecutor);
-            ExecutorManager.this.commonMetrics.markDispatchSuccess();
-          } catch (final ExecutorManagerException e) {
-            ExecutorManager.this.commonMetrics.markDispatchFail();
-            logger.warn(String.format(
-                "Executor %s responded with exception for exec: %d",
-                selectedExecutor, exflow.getExecutionId()), e);
-            handleDispatchExceptionCase(reference, exflow, selectedExecutor,
-                availableExecutors);
+        for (int i = 0; i <= this.maxDispatchingErrors; i++) { // bounded loop replaces recursion
+          final String giveUpReason = checkGiveUpDispatching(reference, remainingExecutors);
+          if (giveUpReason != null) { // a non-null reason means stop retrying
+            logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
+                + giveUpReason);
+            finalizeFlows(exflow);
+            // GIVE UP DISPATCHING - exit
+            return;
+          } else {
+            final Executor selectedExecutor = selectExecutor(exflow, remainingExecutors);
+            if (selectedExecutor == null) {
+              ExecutorManager.this.commonMetrics.markDispatchFail();
+              handleNoExecutorSelectedCase(reference, exflow);
+              // RE-QUEUED - exit
+              return;
+            } else {
+              try {
+                dispatch(reference, exflow, selectedExecutor);
+                ExecutorManager.this.commonMetrics.markDispatchSuccess();
+                // SUCCESS - exit
+                return;
+              } catch (final ExecutorManagerException e) {
+                logFailedDispatchAttempt(reference, exflow, selectedExecutor, e);
+                ExecutorManager.this.commonMetrics.markDispatchFail();
+                reference.setNumErrors(reference.getNumErrors() + 1); // feeds the give-up check
+                // FAILED ATTEMPT - try other executors except selectedExecutor
+                remainingExecutors.remove(selectedExecutor); // shrinks the local copy only
+              }
+            }
           }
-        } else {
-          ExecutorManager.this.commonMetrics.markDispatchFail();
-          handleNoExecutorSelectedCase(reference, exflow);
         }
+        throw new IllegalStateException( // reached only if the loop ends without returning
+            "Unexpected error in dispatching " + exflow.getExecutionId());
+      }
+    }
+
+    private String checkGiveUpDispatching(final ExecutionReference reference,
+        final Set<Executor> remainingExecutors) { // returns a give-up reason, or null
+      if (reference.getNumErrors() >= this.maxDispatchingErrors) { // error budget used up
+        return "reached " + ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
+            + " (tried " + reference.getNumErrors() + " executors)";
+      } else if (remainingExecutors.isEmpty()) { // no candidate executors left to try
+        return "tried calling all executors (total: "
+            + ExecutorManager.this.activeExecutors.size() + ") but all failed";
+      } else {
+        return null; // caller keeps dispatching
       }
     }
 
+    private void logFailedDispatchAttempt(final ExecutionReference reference, final ExecutableFlow exflow,
+        final Executor selectedExecutor, final ExecutorManagerException e) {
+      logger.warn(String.format( // warn carries the executor, the execution id and the exception
+          "Executor %s responded with exception for exec: %d",
+          selectedExecutor, exflow.getExecutionId()), e);
+      logger.info(String.format( // NOTE(review): text names the removed handleDispatchExceptionCase
+          "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+          exflow.getExecutionId(), reference.getNumErrors())); // caller increments afterwards
+    }
+
     /* Helper method to fetch  overriding Executor, if a valid user has specifed otherwise return null */
     private Executor getUserSpecifiedExecutor(final ExecutionOptions options,
         final int executionId) {
@@ -1945,34 +1984,6 @@ public class ExecutorManager extends EventHandler implements
       return choosenExecutor;
     }
 
-    private void handleDispatchExceptionCase(final ExecutionReference reference,
-        final ExecutableFlow exflow, final Executor lastSelectedExecutor,
-        final Set<Executor> remainingExecutors) throws ExecutorManagerException {
-      logger
-          .info(String
-              .format(
-                  "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
-                  exflow.getExecutionId(), reference.getNumErrors()));
-      reference.setNumErrors(reference.getNumErrors() + 1);
-      String giveUpReason = null;
-      if (reference.getNumErrors() >= this.maxDispatchingErrors) {
-        giveUpReason = "reached " + Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED
-            + " (tried " + reference.getNumErrors() + " executors)";
-      } else if (remainingExecutors.size() <= 1) {
-        giveUpReason = "tried calling all executors (total: "
-            + ExecutorManager.this.activeExecutors.size() + ") but all failed";
-      }
-      if (giveUpReason != null) {
-        logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
-            + giveUpReason);
-        finalizeFlows(exflow);
-      } else {
-        remainingExecutors.remove(lastSelectedExecutor);
-        // try other executors except chosenExecutor
-        selectExecutorAndDispatchFlow(reference, exflow, remainingExecutors);
-      }
-    }
-
     private void handleNoExecutorSelectedCase(final ExecutionReference reference,
         final ExecutableFlow exflow) throws ExecutorManagerException {
       logger