azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 39dd831..6589b48 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -117,8 +117,10 @@ public class ExecutorManager extends EventHandler implements
 
   public ExecutorManager(Props props, ExecutorLoader loader,
       Map<String, Alerter> alters) throws ExecutorManagerException {
+    alerters = alters;
     azkProps = props;
     this.executorLoader = loader;
+
     this.setupExecutors();
     this.loadRunningFlows();
 
@@ -126,8 +128,6 @@ public class ExecutorManager extends EventHandler implements
         new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));
     this.loadQueuedFlows();
 
-    alerters = alters;
-
     cacheDir = new File(props.getString("cache.directory", "cache"));
 
     executingManager = new ExecutingManagerUpdaterThread();
@@ -1035,14 +1035,11 @@ public class ExecutorManager extends EventHandler implements
           executorLoader.addActiveExecutableReference(reference);
           queuedFlows.enqueue(exflow, reference);
         } else {
-          Executor executor = activeExecutors.iterator().next();
           // assign only local executor we have
-          reference.setExecutor(executor);
+          Executor choosenExecutor = activeExecutors.iterator().next();
           executorLoader.addActiveExecutableReference(reference);
           try {
-            callExecutorServer(exflow, executor, ConnectorParams.EXECUTE_ACTION);
-            runningFlows.put(exflow.getExecutionId(),
-              new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+            dispatch(reference, exflow, choosenExecutor);
           } catch (ExecutorManagerException e) {
             executorLoader.removeActiveExecutableReference(reference
               .getExecId());
@@ -1729,6 +1726,28 @@ public class ExecutorManager extends EventHandler implements
     }
   }
 
+  /**
+   * Calls executor to dispatch the flow, update db to assign the executor and
+   * in-memory state of executableFlow
+   */
+  private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
+    Executor choosenExecutor) throws ExecutorManagerException {
+    exflow.setUpdateTime(System.currentTimeMillis());
+    callExecutorServer(exflow, choosenExecutor,
+      ConnectorParams.EXECUTE_ACTION);
+    executorLoader.assignExecutor(choosenExecutor.getId(),
+      exflow.getExecutionId());
+    reference.setExecutor(choosenExecutor);
+
+    // move from flow to running flows
+    runningFlows.put(exflow.getExecutionId(),
+      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
+
+    logger.info(String.format(
+      "Successfully dispatched exec %d with error count %d",
+      exflow.getExecutionId(), reference.getNumErrors()));
+  }
+
   /*
    * This thread is responsible for processing queued flows using dispatcher and
    * making rest api calls to executor server
@@ -1925,23 +1944,5 @@ public class ExecutorManager extends EventHandler implements
       // schedule can starve all others
       queuedFlows.enqueue(exflow, reference);
     }
-
-    private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
-      Executor choosenExecutor) throws ExecutorManagerException {
-      exflow.setUpdateTime(System.currentTimeMillis());
-      callExecutorServer(exflow, choosenExecutor,
-        ConnectorParams.EXECUTE_ACTION);
-      executorLoader.assignExecutor(choosenExecutor.getId(),
-        exflow.getExecutionId());
-      reference.setExecutor(choosenExecutor);
-
-      // move from flow to running flows
-      runningFlows.put(exflow.getExecutionId(),
-        new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
-
-      logger.info(String.format(
-        "Successfully dispatched exec %d with error count %d",
-        exflow.getExecutionId(), reference.getNumErrors()));
-    }
   }
 }
\ No newline at end of file