azkaban-aplcache

Removing cached ExecutorInfos and changing default params

9/29/2015 7:33:42 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 678a51f..2e07828 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -80,6 +80,8 @@ public class ExecutorManager extends EventHandler implements
     "azkaban.activeexecutor.refresh.flowinterval";
   private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
       "azkaban.executorinfo.refresh.maxThreads";
+  private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
+    "azkaban.maxDispatchingErrors";
 
   private static Logger logger = Logger.getLogger(ExecutorManager.class);
   private ExecutorLoader executorLoader;
@@ -171,8 +173,9 @@ public class ExecutorManager extends EventHandler implements
     queueProcessor =
       new QueueProcessorThread(azkProps.getBoolean(
         AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 10000), azkProps.getInt(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 10));
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
+        AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
+        AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
 
     queueProcessor.start();
   }
@@ -250,6 +253,7 @@ public class ExecutorManager extends EventHandler implements
       boolean wasSuccess = true;
       for (Pair<Executor, Future<String>> refreshPair : futures) {
         Executor executor = refreshPair.getFirst();
+        executor.setExecutorInfo(null); // invalidate cached ExecutorInfo
         try {
           // max 5 secs
           String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
@@ -1762,7 +1766,7 @@ public class ExecutorManager extends EventHandler implements
    */
   private class QueueProcessorThread extends Thread {
     private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
-    private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+    private final int maxDispatchingErrors;
     private final long activeExecutorRefreshWindowInMilisec;
     private final int activeExecutorRefreshWindowInFlows;
 
@@ -1771,8 +1775,10 @@ public class ExecutorManager extends EventHandler implements
 
     public QueueProcessorThread(boolean isActive,
       long activeExecutorRefreshWindowInTime,
-      int activeExecutorRefreshWindowInFlows) {
+      int activeExecutorRefreshWindowInFlows,
+      int maxDispatchingErrors) {
       setActive(isActive);
+      this.maxDispatchingErrors = maxDispatchingErrors;
       this.activeExecutorRefreshWindowInFlows =
         activeExecutorRefreshWindowInFlows;
       this.activeExecutorRefreshWindowInMilisec =
@@ -1953,7 +1959,7 @@ public class ExecutorManager extends EventHandler implements
             "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
             exflow.getExecutionId(), reference.getNumErrors()));
       reference.setNumErrors(reference.getNumErrors() + 1);
-      if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+      if (reference.getNumErrors() > this.maxDispatchingErrors
         || remainingExecutors.size() <= 1) {
         logger.error("Failed to process queued flow");
         finalizeFlows(exflow);