azkaban-aplcache

Improve dispatch request handling of a previously submitted

11/16/2018 11:27:44 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 3fb6eb0..5b7bd5c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -275,13 +275,35 @@ public class FlowRunnerManager implements EventListener,
   }
 
   public void submitFlow(final int execId) throws ExecutorManagerException {
-    // Load file and submit
+    if (isAlreadyRunning(execId)) {
+      return;
+    }
+    final FlowRunner runner = createFlowRunner(execId);
+    // Check again.
+    if (isAlreadyRunning(execId)) {
+      return;
+    }
+    submitFlowRunner(runner);
+  }
+
+  private boolean isAlreadyRunning(int execId) throws ExecutorManagerException {
     if (this.runningFlows.containsKey(execId)) {
-      throw new ExecutorManagerException("Execution " + execId
-          + " is already running.");
+      logger.info("Execution " + execId + " is already in running.");
+      if (!this.submittedFlows.containsValue(execId)) {
+        // Execution had been added to running flows but not submitted - something's wrong.
+        // Return a response with error: this is a cue for the dispatcher to retry or finalize the
+        // execution as failed.
+        throw new ExecutorManagerException("Execution " + execId +
+            " is in runningFlows but not in submittedFlows. Most likely submission had failed.");
+      }
+      // Already running, everything seems fine. Report as a successful submission.
+      return true;
     }
+    return false;
+  }
 
-    ExecutableFlow flow = null;
+  private FlowRunner createFlowRunner(final int execId) throws ExecutorManagerException {
+    final ExecutableFlow flow;
     flow = this.executorLoader.fetchExecutableFlow(execId);
     if (flow == null) {
       throw new ExecutorManagerException("Error loading flow with exec "
@@ -335,16 +357,11 @@ public class FlowRunnerManager implements EventListener,
         .setNumJobThreads(numJobThreads).addListener(this);
 
     configureFlowLevelMetrics(runner);
+    return runner;
+  }
 
-    // Check again.
-    if (this.runningFlows.containsKey(execId)) {
-      throw new ExecutorManagerException("Execution " + execId
-          + " is already running.");
-    }
-
-    // Finally, queue the sucker.
-    this.runningFlows.put(execId, runner);
-
+  private void submitFlowRunner(final FlowRunner runner) throws ExecutorManagerException {
+    this.runningFlows.put(runner.getExecutionId(), runner);
     try {
       // The executorService already has a queue.
       // The submit method below actually returns an instance of FutureTask,
@@ -356,6 +373,7 @@ public class FlowRunnerManager implements EventListener,
       // update the last submitted time.
       this.lastFlowSubmittedDate = System.currentTimeMillis();
     } catch (final RejectedExecutionException re) {
+      this.runningFlows.remove(runner.getExecutionId());
       final StringBuffer errorMsg = new StringBuffer(
           "Azkaban executor can't execute any more flows. ");
       if (this.executorService.isShutdown()) {