azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 6589b48..e622a15 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -173,7 +173,7 @@ public class ExecutorManager extends EventHandler implements
       new QueueProcessorThread(azkProps.getBoolean(
         AZKABAN_QUEUEPROCESSING_ENABLED, true), azkProps.getLong(
         AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_MS, 1000), azkProps.getInt(
-        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 1000));
+        AZKABAN_ACTIVE_EXECUTOR_REFRESHINTERVAL_IN_NUM_FLOW, 10));
 
     queueProcessor.start();
   }
@@ -255,8 +255,9 @@ public class ExecutorManager extends EventHandler implements
           // max 5 secs
           String jsonString = refreshPair.getSecond().get(5, TimeUnit.SECONDS);
           executor.setExecutorInfo(ExecutorInfo.fromJSONString(jsonString));
-          logger.info("Successfully refreshed ExecutorInfo for executor: "
-            + executor);
+          logger.info(String.format(
+            "Successfully refreshed executor: %s with executor info : %s",
+            executor, jsonString));
         } catch (TimeoutException e) {
           wasSuccess = false;
           logger.error("Timed out while waiting for ExecutorInfo refresh"
@@ -1809,7 +1810,7 @@ public class ExecutorManager extends EventHandler implements
     private void processQueuedFlows(long activeExecutorsRefreshWindow,
       int maxContinuousFlowProcessed) throws InterruptedException,
       ExecutorManagerException {
-      long lastExecutorRefreshTime = System.currentTimeMillis();
+      long lastExecutorRefreshTime = 0;
       Pair<ExecutionReference, ExecutableFlow> runningCandidate;
       int currentContinuousFlowProcessed = 0;
 
@@ -1830,9 +1831,32 @@ public class ExecutorManager extends EventHandler implements
           currentContinuousFlowProcessed = 0;
         }
 
-        exflow.setUpdateTime(currentTime);
-        // process flow with current snapshot of activeExecutors
-        processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        /**
+         * <pre>
+         *  TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
+         *        Currently we try each queued flow once to infer a global busy state
+         * Possible improvements:-
+         *   1. Move system level filters in refreshExecutors and sleep if we have all executors busy after refresh
+         *   2. Implement GlobalSystemState in selector or in a third place to manage system filters. Basically
+         *      taking out all the filters which do not depend on the flow but are still being part of Selector.
+         * Assumptions:-
+         *   1. no one else except QueueProcessor is updating ExecutableFlow update time
+         *   2. re-attempting a flow (which has been tried before) is considered as all executors are busy
+         * </pre>
+         */
+        if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
+          // put back in the queue
+          queuedFlows.enqueue(exflow, reference);
+          long sleepInterval =
+            activeExecutorsRefreshWindow
+              - (currentTime - lastExecutorRefreshTime);
+          // wait till next executor refresh
+          sleep(sleepInterval);
+        } else {
+          exflow.setUpdateTime(currentTime);
+          // process flow with current snapshot of activeExecutors
+          processFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+        }
         currentContinuousFlowProcessed++;
       }
     }