diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 3fb6eb0..5b7bd5c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -275,13 +275,35 @@ public class FlowRunnerManager implements EventListener,
}
public void submitFlow(final int execId) throws ExecutorManagerException {
- // Load file and submit
+ if (isAlreadyRunning(execId)) {
+ return;
+ }
+ final FlowRunner runner = createFlowRunner(execId);
+ // Check again.
+ if (isAlreadyRunning(execId)) {
+ return;
+ }
+ submitFlowRunner(runner);
+ }
+
+ private boolean isAlreadyRunning(int execId) throws ExecutorManagerException {
if (this.runningFlows.containsKey(execId)) {
- throw new ExecutorManagerException("Execution " + execId
- + " is already running.");
+ logger.info("Execution " + execId + " is already in running.");
+ if (!this.submittedFlows.containsValue(execId)) {
+ // Execution had been added to running flows but not submitted - something's wrong.
+ // Return a response with error: this is a cue for the dispatcher to retry or finalize the
+ // execution as failed.
+ throw new ExecutorManagerException("Execution " + execId +
+ " is in runningFlows but not in submittedFlows. Most likely submission had failed.");
+ }
+ // Already running, everything seems fine. Report as a successful submission.
+ return true;
}
+ return false;
+ }
- ExecutableFlow flow = null;
+ private FlowRunner createFlowRunner(final int execId) throws ExecutorManagerException {
+ final ExecutableFlow flow;
flow = this.executorLoader.fetchExecutableFlow(execId);
if (flow == null) {
throw new ExecutorManagerException("Error loading flow with exec "
@@ -335,16 +357,11 @@ public class FlowRunnerManager implements EventListener,
.setNumJobThreads(numJobThreads).addListener(this);
configureFlowLevelMetrics(runner);
+ return runner;
+ }
- // Check again.
- if (this.runningFlows.containsKey(execId)) {
- throw new ExecutorManagerException("Execution " + execId
- + " is already running.");
- }
-
- // Finally, queue the sucker.
- this.runningFlows.put(execId, runner);
-
+ private void submitFlowRunner(final FlowRunner runner) throws ExecutorManagerException {
+ this.runningFlows.put(runner.getExecutionId(), runner);
try {
// The executorService already has a queue.
// The submit method below actually returns an instance of FutureTask,
@@ -356,6 +373,7 @@ public class FlowRunnerManager implements EventListener,
// update the last submitted time.
this.lastFlowSubmittedDate = System.currentTimeMillis();
} catch (final RejectedExecutionException re) {
+ this.runningFlows.remove(runner.getExecutionId());
final StringBuffer errorMsg = new StringBuffer(
"Azkaban executor can't execute any more flows. ");
if (this.executorService.isShutdown()) {