diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 930f9c0..a018542 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -19,6 +19,7 @@ package azkaban.executor;
import azkaban.metrics.CommonMetrics;
import azkaban.constants.ServerProperties;
import azkaban.utils.FlowUtils;
+import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
@@ -101,6 +102,7 @@ public class ExecutorManager extends EventHandler implements
final private Set<Executor> activeExecutors = new HashSet<Executor>();
private QueueProcessorThread queueProcessor;
+ private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
private ExecutingManagerUpdaterThread executingManager;
// 12 weeks
@@ -462,6 +464,11 @@ public class ExecutorManager extends EventHandler implements
List<Integer> executionIds = new ArrayList<Integer>();
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
queuedFlows.getAllEntries()));
+ // An execution may be the runningCandidate, i.e. in the dispatching state and in neither queuedFlows nor runningFlows,
+ // so check the runningCandidate as well.
+ if (runningCandidate != null) {
+ executionIds.addAll(getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(runningCandidate)));
+ }
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
runningFlows.values()));
Collections.sort(executionIds);
@@ -934,7 +941,11 @@ public class ExecutorManager extends EventHandler implements
@Override
public String submitExecutableFlow(ExecutableFlow exflow, String userId)
throws ExecutorManagerException {
- synchronized (exflow) {
+
+ String exFlowKey = exflow.getProjectName() + "." + exflow.getId() + ".submitFlow";
+ // Lock on a key built from the project and flow name to prevent a race condition when the same flow is submitted by the API and a schedule at the same time,
+ // which would otherwise let two submissions of the same flow enter this section concurrently.
+ synchronized (exFlowKey.intern()) {
String flowId = exflow.getFlowId();
logger.info("Submitting execution flow " + flowId + " by " + userId);
@@ -1791,18 +1802,17 @@ public class ExecutorManager extends EventHandler implements
int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
long lastExecutorRefreshTime = 0;
- Pair<ExecutionReference, ExecutableFlow> runningCandidate;
int currentContinuousFlowProcessed = 0;
while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
ExecutionReference reference = runningCandidate.getFirst();
ExecutableFlow exflow = runningCandidate.getSecond();
-
long currentTime = System.currentTimeMillis();
// if we have dispatched more than maxContinuousFlowProcessed or
// It has been more then activeExecutorsRefreshWindow millisec since we
// refreshed
+
if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
// Refresh executorInfo for all activeExecutors
@@ -1811,6 +1821,7 @@ public class ExecutorManager extends EventHandler implements
currentContinuousFlowProcessed = 0;
}
+
/**
* <pre>
* TODO: Work around till we improve Filters to have a notion of GlobalSystemState.
@@ -1827,6 +1838,7 @@ public class ExecutorManager extends EventHandler implements
if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
// put back in the queue
queuedFlows.enqueue(exflow, reference);
+ runningCandidate = null;
long sleepInterval =
activeExecutorsRefreshWindow
- (currentTime - lastExecutorRefreshTime);
@@ -1836,6 +1848,7 @@ public class ExecutorManager extends EventHandler implements
exflow.setUpdateTime(currentTime);
// process flow with current snapshot of activeExecutors
selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
+ runningCandidate = null;
}
// do not count failed flow processsing (flows still in queue)
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 2a1fd9f..3419235 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -240,6 +240,7 @@ public class ExecuteFlowAction implements TriggerAction {
exflow.setExecutionOptions(executionOptions);
try {
+ logger.info("Invoking flow " + project.getName() + "." + flowName);
executorManager.submitExecutableFlow(exflow, submitUser);
logger.info("Invoked flow " + project.getName() + "." + flowName);
} catch (ExecutorManagerException e) {