azkaban-aplcache

Fix the race condition where the same flow can be submitted twice at the same time

4/4/2017 5:04:45 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 930f9c0..a018542 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -19,6 +19,7 @@ package azkaban.executor;
 import azkaban.metrics.CommonMetrics;
 import azkaban.constants.ServerProperties;
 import azkaban.utils.FlowUtils;
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
@@ -101,6 +102,7 @@ public class ExecutorManager extends EventHandler implements
 
   final private Set<Executor> activeExecutors = new HashSet<Executor>();
   private QueueProcessorThread queueProcessor;
+  private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
 
   private ExecutingManagerUpdaterThread executingManager;
   // 12 weeks
@@ -462,6 +464,11 @@ public class ExecutorManager extends EventHandler implements
     List<Integer> executionIds = new ArrayList<Integer>();
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
       queuedFlows.getAllEntries()));
+    // An execution may currently be the runningCandidate, i.e. it is being dispatched and is in neither queuedFlows nor runningFlows,
+    // so check the runningCandidate as well.
+    if (runningCandidate != null) {
+      executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
+    }
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
       runningFlows.values()));
     Collections.sort(executionIds);
@@ -934,7 +941,11 @@ public class ExecutorManager extends EventHandler implements
   @Override
   public String submitExecutableFlow(ExecutableFlow exflow, String userId)
     throws ExecutorManagerException {
-    synchronized (exflow) {
+
+    String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
+    // Lock on a key built from the project name and exflow.getId() to prevent a race when the same flow is submitted by the API and by a schedule at the same time,
+    // which would otherwise let two submissions of the same flow enter this section concurrently.
+    synchronized (exFlowKey.intern()) {
       String flowId = exflow.getFlowId();
 
       logger.info("Submitting execution flow " + flowId + " by " + userId);
@@ -1791,18 +1802,17 @@ public class ExecutorManager extends EventHandler implements
       int maxContinuousFlowProcessed) throws InterruptedException,
       ExecutorManagerException {
       long lastExecutorRefreshTime = 0;
-      Pair<ExecutionReference, ExecutableFlow> runningCandidate;
       int currentContinuousFlowProcessed = 0;
 
       while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
         ExecutionReference reference = runningCandidate.getFirst();
         ExecutableFlow exflow = runningCandidate.getSecond();
-
         long currentTime = System.currentTimeMillis();
 
         // if we have dispatched more than maxContinuousFlowProcessed or
         // It has been more then activeExecutorsRefreshWindow millisec since we
         // refreshed
+
         if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
           || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
           // Refresh executorInfo for all activeExecutors
@@ -1811,6 +1821,7 @@ public class ExecutorManager extends EventHandler implements
           currentContinuousFlowProcessed = 0;
         }
 
+
         /**
          * <pre>
          *  TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
@@ -1827,6 +1838,7 @@ public class ExecutorManager extends EventHandler implements
         if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
           // put back in the queue
           queuedFlows.enqueue(exflow, reference);
+          runningCandidate = null;
           long sleepInterval =
             activeExecutorsRefreshWindow
               - (currentTime - lastExecutorRefreshTime);
@@ -1836,6 +1848,7 @@ public class ExecutorManager extends EventHandler implements
           exflow.setUpdateTime(currentTime);
           // process flow with current snapshot of activeExecutors
           selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+          runningCandidate = null;
         }
 
         // do not count failed flow processsing (flows still in queue)
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 2a1fd9f..3419235 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -240,6 +240,7 @@ public class ExecuteFlowAction implements TriggerAction {
     exflow.setExecutionOptions(executionOptions);
 
     try {
+      logger.info("Invoking flow " + project.getName() + "." + flowName);
       executorManager.submitExecutableFlow(exflow, submitUser);
       logger.info("Invoked flow " + project.getName() + "." + flowName);
     } catch (ExecutorManagerException e) {