diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 678a51f..2e07828 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -80,6 +80,8 @@ public class ExecutorManager extends EventHandler implements
"azkaban.activeexecutor.refresh.flowinterval";
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
+ private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
+ "azkaban.maxDispatchingErrors";
private static Logger logger = Logger.getLogger(ExecutorManager.class);
private ExecutorLoader executorLoader;
@@ -171,8 +173,9 @@ public class ExecutorManager extends EventHandler implements
queueProcessor =
new QueueProcessorThread(azkProps.getBoolean(
AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
- AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 10000), azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 10));
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000), azkProps.getInt(
+ AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5), azkProps.getInt(
+ AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, activeExecutors.size()));
queueProcessor.start();
}
@@ -250,6 +253,7 @@ public class ExecutorManager extends EventHandler implements
boolean wasSuccess = true;
for (Pair<Executor, Future<String>> refreshPair : futures) {
Executor executor = refreshPair.getFirst();
+ executor.setExecutorInfo(null); // invalidate cached EXecutorInfo
try {
// max 5 secs
String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
@@ -1762,7 +1766,7 @@ public class ExecutorManager extends EventHandler implements
*/
private class QueueProcessorThread extends Thread {
private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
- private static final int MAX_DISPATCHING_ERRORS_PERMITTED = 5;
+ private final int maxDispatchingErrors;
private final long activeExecutorRefreshWindowInMilisec;
private final int activeExecutorRefreshWindowInFlows;
@@ -1771,8 +1775,10 @@ public class ExecutorManager extends EventHandler implements
public QueueProcessorThread(boolean isActive,
long activeExecutorRefreshWindowInTime,
- int activeExecutorRefreshWindowInFlows) {
+ int activeExecutorRefreshWindowInFlows,
+ int maxDispatchingErrors) {
setActive(isActive);
+ this.maxDispatchingErrors = maxDispatchingErrors;
this.activeExecutorRefreshWindowInFlows =
activeExecutorRefreshWindowInFlows;
this.activeExecutorRefreshWindowInMilisec =
@@ -1953,7 +1959,7 @@ public class ExecutorManager extends EventHandler implements
"Reached handleDispatchExceptionCase stage for exec %d with error count %d",
exflow.getExecutionId(), reference.getNumErrors()));
reference.setNumErrors(reference.getNumErrors() + 1);
- if (reference.getNumErrors() >= MAX_DISPATCHING_ERRORS_PERMITTED
+ if (reference.getNumErrors() > this.maxDispatchingErrors
|| remainingExecutors.size() <= 1) {
logger.error("Failed to process queued flow");
finalizeFlows(exflow);