diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6589b48..e622a15 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -173,7 +173,7 @@ public class ExecutorManager extends EventHandler implements
new QueueProcessorThread(azkProps.getBoolean(
AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
- AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+ AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 10));
queueProcessor.start();
}
@@ -255,8 +255,9 @@ public class ExecutorManager extends EventHandler implements
// max 5 secs
String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
- logger.info("Successfully refreshed ExecutorInfo for executor: "
- + executor);
+ logger.info(String.format(
+ "Successfully refreshed executor: %s with executor info : %s",
+ executor, jsonString));
} catch (TimeoutException e) {
wasSuccess = false;
logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -1809,7 +1810,7 @@ public class ExecutorManager extends EventHandler implements
private void processQueuedFlows(long activeExecutorsRefreshWindow,
int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
- long lastExecutorRefreshTime = System.currentTimeMillis();
+ long lastExecutorRefreshTime = 0;
Pair<ExecutionReference, ExecutableFlow> runningCandidate;
int currentContinuousFlowProcessed = 0;
@@ -1830,9 +1831,32 @@ public class ExecutorManager extends EventHandler implements
currentContinuousFlowProcessed = 0;
}
- exflow.setUpdateTime(currentTime);
- // process flow with current snapshot of activeExecutors
- processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ /**
+ * <pre>
+ * TODO: Workaround until Filters have a notion of GlobalSystemState.
+ * Currently we try each queued flow once to infer a global busy state.
+ * Possible improvements:-
+ * 1. Move system-level filters into refreshExecutors and sleep if all executors are still busy after a refresh
+ * 2. Implement GlobalSystemState in the selector or in a third place to manage system filters, i.e.
+ * take out of the Selector all the filters which do not depend on the flow.
+ * Assumptions:-
+ * 1. No one except QueueProcessor updates the ExecutableFlow update time
+ * 2. Re-attempting a flow (one that has been tried before) is taken to mean that all executors are busy
+ * </pre>
+ */
+ if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+ // put back in the queue
+ queuedFlows.enqueue(exflow, reference);
+ long sleepInterval =
+ activeExecutorsRefreshWindow
+ - (currentTime - lastExecutorRefreshTime);
+ // wait till next executor refresh
+ sleep(sleepInterval);
+ } else {
+ exflow.setUpdateTime(currentTime);
+ // process flow with current snapshot of activeExecutors
+ processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ }
currentContinuousFlowProcessed++;
}
}