azkaban-aplcache

block deactivating executor until all flow preparation work

3/1/2019 9:20:17 PM
3.69.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
index b8692e7..e966e11 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
@@ -286,6 +286,23 @@ public class ExecutionFlowDao {
     }
   }
 
+  /**
+   * set executor id to null for the execution id
+   */
+  public void unsetExecutorIdForExecution(final int executionId) throws ExecutorManagerException {
+    final String UNSET_EXECUTOR = "UPDATE execution_flows SET executor_id = null where exec_id = ?";
+
+    final SQLTransaction<Integer> unsetExecutor =
+        transOperator -> transOperator.update(UNSET_EXECUTOR, executionId);
+
+    try {
+      this.dbOperator.transaction(unsetExecutor);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error unsetting executor id for execution " + executionId,
+          e);
+    }
+  }
+
   public int selectAndUpdateExecution(final int executorId, final boolean isActive)
       throws ExecutorManagerException {
     final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ? where exec_id = ?";
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
index 9887911..ac7635a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorLoader.java
@@ -281,6 +281,9 @@ public interface ExecutorLoader {
   int removeExecutionLogsByTime(long millis)
       throws ExecutorManagerException;
 
+  void unsetExecutorIdForExecution(final int executionId) throws ExecutorManagerException;
+
   int selectAndUpdateExecution(final int executorId, boolean isActive)
       throws ExecutorManagerException;
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 6adf826..3eb91bf 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -351,4 +351,9 @@ public class JdbcExecutorLoader implements ExecutorLoader {
       throws ExecutorManagerException {
     return this.executionFlowDao.selectAndUpdateExecution(executorId, isActive);
   }
+
+  @Override
+  public void unsetExecutorIdForExecution(final int executionId) throws ExecutorManagerException {
+    this.executionFlowDao.unsetExecutorIdForExecution(executionId);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
index d1675e8..f3b0c78 100644
--- a/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
+++ b/azkaban-common/src/test/java/azkaban/executor/MockExecutorLoader.java
@@ -456,4 +456,8 @@ public class MockExecutorLoader implements ExecutorLoader {
       throws ExecutorManagerException {
     return 1;
   }
+
+  @Override
+  public void unsetExecutorIdForExecution(final int executionId) {
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
index 0136a93..536f89c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -47,6 +47,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
   public static final String JSON_MIME_TYPE = "application/json";
   private static final Logger logger = Logger.getLogger(ExecutorServlet.class
       .getName());
+  private static final long serialVersionUID = -3528600004096666451L;
   private AzkabanExecutorServer application;
   private FlowRunnerManager flowRunnerManager;
 
@@ -96,55 +97,55 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       throws IOException {
     final HashMap<String, Object> respMap = new HashMap<>();
     try {
-      if (!hasParam(req, ACTION_PARAM)) {
+      if (!hasParam(req, ConnectorParams.ACTION_PARAM)) {
         logger.error("Parameter action not set");
         respMap.put("error", "Parameter action not set");
       } else {
-        final String action = getParam(req, ACTION_PARAM);
-        if (action.equals(UPDATE_ACTION)) {
+        final String action = getParam(req, ConnectorParams.ACTION_PARAM);
+        if (action.equals(ConnectorParams.UPDATE_ACTION)) {
           handleAjaxUpdateRequest(req, respMap);
-        } else if (action.equals(PING_ACTION)) {
-          respMap.put(STATUS_PARAM, RESPONSE_ALIVE);
-        } else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
+        } else if (action.equals(ConnectorParams.PING_ACTION)) {
+          respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE);
+        } else if (action.equals(ConnectorParams.RELOAD_JOBTYPE_PLUGINS_ACTION)) {
           logger.info("Reloading Jobtype plugins");
           handleReloadJobTypePlugins(respMap);
-        } else if (action.equals(ACTIVATE)) {
+        } else if (action.equals(ConnectorParams.ACTIVATE)) {
           logger.warn("Setting ACTIVE flag to true");
           setActive(true, respMap);
-        } else if (action.equals(GET_STATUS)) {
+        } else if (action.equals(ConnectorParams.GET_STATUS)) {
           logger.debug("Get Executor Status: ");
           getStatus(respMap);
-        } else if (action.equals(DEACTIVATE)) {
+        } else if (action.equals(ConnectorParams.DEACTIVATE)) {
           logger.warn("Setting ACTIVE flag to false");
           setActive(false, respMap);
-        } else if (action.equals(SHUTDOWN)) {
+        } else if (action.equals(ConnectorParams.SHUTDOWN)) {
           shutdown(respMap);
         } else {
-          final int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
-          final String user = getParam(req, USER_PARAM, null);
+          final int execid = Integer.parseInt(getParam(req, ConnectorParams.EXECID_PARAM));
+          final String user = getParam(req, ConnectorParams.USER_PARAM, null);
 
           logger.info("User " + user + " has called action " + action + " on "
               + execid);
-          if (action.equals(METADATA_ACTION)) {
+          if (action.equals(ConnectorParams.METADATA_ACTION)) {
             handleFetchMetaDataEvent(execid, req, resp, respMap);
-          } else if (action.equals(LOG_ACTION)) {
+          } else if (action.equals(ConnectorParams.LOG_ACTION)) {
             handleFetchLogEvent(execid, req, resp, respMap);
-          } else if (action.equals(ATTACHMENTS_ACTION)) {
+          } else if (action.equals(ConnectorParams.ATTACHMENTS_ACTION)) {
             handleFetchAttachmentsEvent(execid, req, resp, respMap);
-          } else if (action.equals(EXECUTE_ACTION)) {
+          } else if (action.equals(ConnectorParams.EXECUTE_ACTION)) {
             handleAjaxExecute(req, respMap, execid);
-          } else if (action.equals(STATUS_ACTION)) {
+          } else if (action.equals(ConnectorParams.STATUS_ACTION)) {
             handleAjaxFlowStatus(respMap, execid);
-          } else if (action.equals(CANCEL_ACTION)) {
+          } else if (action.equals(ConnectorParams.CANCEL_ACTION)) {
             logger.info("Cancel called.");
             handleAjaxCancel(respMap, execid, user);
-          } else if (action.equals(PAUSE_ACTION)) {
+          } else if (action.equals(ConnectorParams.PAUSE_ACTION)) {
             logger.info("Paused called.");
             handleAjaxPause(respMap, execid, user);
-          } else if (action.equals(RESUME_ACTION)) {
+          } else if (action.equals(ConnectorParams.RESUME_ACTION)) {
             logger.info("Resume called.");
             handleAjaxResume(respMap, execid, user);
-          } else if (action.equals(MODIFY_EXECUTION_ACTION)) {
+          } else if (action.equals(ConnectorParams.MODIFY_EXECUTION_ACTION)) {
             logger.info("Modify Execution Action");
             handleModifyExecutionRequest(respMap, execid, user, req);
           } else {
@@ -155,7 +156,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       }
     } catch (final Exception e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
     writeJSON(resp, respMap);
     resp.flushBuffer();
@@ -163,13 +164,13 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 
   private void handleModifyExecutionRequest(final Map<String, Object> respMap,
       final int execId, final String user, final HttpServletRequest req) throws ServletException {
-    if (!hasParam(req, MODIFY_EXECUTION_ACTION_TYPE)) {
-      respMap.put(RESPONSE_ERROR, "Modification type not set.");
+    if (!hasParam(req, ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE)) {
+      respMap.put(ConnectorParams.RESPONSE_ERROR, "Modification type not set.");
     }
-    final String modificationType = getParam(req, MODIFY_EXECUTION_ACTION_TYPE);
+    final String modificationType = getParam(req, ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE);
 
     try {
-      if (MODIFY_RETRY_FAILURES.equals(modificationType)) {
+      if (ConnectorParams.MODIFY_RETRY_FAILURES.equals(modificationType)) {
         this.flowRunnerManager.retryFailures(execId, user);
       }
     } catch (final ExecutorManagerException e) {
@@ -195,7 +196,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
         respMap.putAll(result.toObject());
       } catch (final Exception e) {
         logger.error(e.getMessage(), e);
-        respMap.put(RESPONSE_ERROR, e.getMessage());
+        respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
       }
     } else {
       final int attempt = getIntParam(req, "attempt", 0);
@@ -254,10 +255,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       final Map<String, Object> respMap) throws ServletException, IOException {
     final ArrayList<Object> updateTimesList =
         (ArrayList<Object>) JSONUtils.parseJSONFromString(getParam(req,
-            UPDATE_TIME_LIST_PARAM));
+            ConnectorParams.UPDATE_TIME_LIST_PARAM));
     final ArrayList<Object> execIDList =
         (ArrayList<Object>) JSONUtils.parseJSONFromString(getParam(req,
-            EXEC_ID_LIST_PARAM));
+            ConnectorParams.EXEC_ID_LIST_PARAM));
 
     final ArrayList<Object> updateList = new ArrayList<>();
     for (int i = 0; i < execIDList.size(); ++i) {
@@ -267,8 +268,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       final ExecutableFlowBase flow = this.flowRunnerManager.getExecutableFlow(execId);
       if (flow == null) {
         final Map<String, Object> errorResponse = new HashMap<>();
-        errorResponse.put(RESPONSE_ERROR, "Flow does not exist");
-        errorResponse.put(UPDATE_MAP_EXEC_ID, execId);
+        errorResponse.put(ConnectorParams.RESPONSE_ERROR, "Flow does not exist");
+        errorResponse.put(ConnectorParams.UPDATE_MAP_EXEC_ID, execId);
         updateList.add(errorResponse);
         continue;
       }
@@ -278,7 +279,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       }
     }
 
-    respMap.put(RESPONSE_UPDATED_FLOWS, updateList);
+    respMap.put(ConnectorParams.RESPONSE_UPDATED_FLOWS, updateList);
   }
 
   private void handleAjaxExecute(final HttpServletRequest req,
@@ -287,90 +288,90 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       this.flowRunnerManager.submitFlow(execId);
     } catch (final ExecutorManagerException e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
   private void handleAjaxFlowStatus(final Map<String, Object> respMap, final int execid) {
     final ExecutableFlowBase flow = this.flowRunnerManager.getExecutableFlow(execid);
     if (flow == null) {
-      respMap.put(STATUS_PARAM, RESPONSE_NOTFOUND);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_NOTFOUND);
     } else {
-      respMap.put(STATUS_PARAM, flow.getStatus().toString());
-      respMap.put(RESPONSE_UPDATETIME, flow.getUpdateTime());
+      respMap.put(ConnectorParams.STATUS_PARAM, flow.getStatus().toString());
+      respMap.put(ConnectorParams.RESPONSE_UPDATETIME, flow.getUpdateTime());
     }
   }
 
   private void handleAjaxPause(final Map<String, Object> respMap, final int execid,
       final String user) {
     if (user == null) {
-      respMap.put(RESPONSE_ERROR, "user has not been set");
+      respMap.put(ConnectorParams.RESPONSE_ERROR, "user has not been set");
       return;
     }
 
     try {
       this.flowRunnerManager.pauseFlow(execid, user);
-      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
     } catch (final ExecutorManagerException e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
   private void handleAjaxResume(final Map<String, Object> respMap, final int execid,
       final String user) throws ServletException {
     if (user == null) {
-      respMap.put(RESPONSE_ERROR, "user has not been set");
+      respMap.put(ConnectorParams.RESPONSE_ERROR, "user has not been set");
       return;
     }
 
     try {
       this.flowRunnerManager.resumeFlow(execid, user);
-      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
     } catch (final ExecutorManagerException e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
   private void handleAjaxCancel(final Map<String, Object> respMap, final int execid,
       final String user) {
     if (user == null) {
-      respMap.put(RESPONSE_ERROR, "user has not been set");
+      respMap.put(ConnectorParams.RESPONSE_ERROR, "user has not been set");
       return;
     }
 
     try {
       this.flowRunnerManager.cancelFlow(execid, user);
-      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
     } catch (final ExecutorManagerException e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
   private void handleReloadJobTypePlugins(final Map<String, Object> respMap) {
     try {
       this.flowRunnerManager.reloadJobTypePlugins();
-      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
     } catch (final Exception e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
   private void setActive(final boolean value, final Map<String, Object> respMap) {
     try {
       setActiveInternal(value);
-      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
     } catch (final Exception e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
   private void setActiveInternal(final boolean value)
-      throws ExecutorManagerException {
+      throws ExecutorManagerException, InterruptedException {
     this.flowRunnerManager.setExecutorActive(value,
         this.application.getHost(), this.application.getPort());
   }
@@ -387,10 +388,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
       // Set the executor to inactive. Will receive no new flows.
       setActiveInternal(false);
       this.application.shutdown();
-      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
     } catch (final Exception e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
@@ -403,10 +404,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 
       respMap.put("executor_id", Integer.toString(executor.getId()));
       respMap.put("isActive", String.valueOf(executor.isActive()));
-      respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+      respMap.put(ConnectorParams.STATUS_PARAM, ConnectorParams.RESPONSE_SUCCESS);
     } catch (final Exception e) {
       logger.error(e.getMessage(), e);
-      respMap.put(RESPONSE_ERROR, e.getMessage());
+      respMap.put(ConnectorParams.RESPONSE_ERROR, e.getMessage());
     }
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index b9c6bf1..096d4cb 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -58,6 +58,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -72,6 +73,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -119,6 +121,8 @@ public class FlowRunnerManager implements EventListener,
   // in the queue waiting to be executed or in executing state.
   private final Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<>();
   private final Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<>();
+  // keep track of the number of flow being setup({@link createFlowRunner()})
+  private final AtomicInteger preparingFlowCount = new AtomicInteger(0);
   private final Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<>();
   private final TrackingThreadPool executorService;
   private final CleanerThread cleanerThread;
@@ -150,7 +154,7 @@ public class FlowRunnerManager implements EventListener,
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
   // Indicate if the executor is set to active.
-  private boolean active;
+  private volatile boolean active;
 
   @Inject
   public FlowRunnerManager(final Props props,
@@ -277,7 +281,7 @@ public class FlowRunnerManager implements EventListener,
   }
 
   public void setExecutorActive(final boolean isActive, final String host, final int port)
-      throws ExecutorManagerException {
+      throws ExecutorManagerException, InterruptedException {
     final Executor executor = this.executorLoader.fetchExecutor(host, port);
     Preconditions.checkState(executor != null, "Unable to obtain self entry in DB");
     if (executor.isActive() != isActive) {
@@ -288,6 +292,28 @@ public class FlowRunnerManager implements EventListener,
           "Set active action ignored. Executor is already " + (isActive ? "active" : "inactive"));
     }
     this.active = isActive;
+    if (!this.active) {
+      // When deactivating this executor, this call will wait to return until every thread in {@link
+      // #createFlowRunner} has finished. When deploying new executor, old running executor will be
+      // deactivated before new one is activated and only one executor is allowed to
+      // delete/hard-linking project dirs to avoid race condition described in {@link
+      // FlowPreparer#setup}. So to make deactivation process block until flow preparation work
+      // finishes guarantees the old executor won't access {@link FlowPreparer#setup} after
+      // deactivation.
+      waitUntilFlowPreparationFinish();
+    }
+  }
+
+  /**
+   * Wait until ongoing flow preparation work finishes.
+   */
+  private void waitUntilFlowPreparationFinish() throws InterruptedException {
+    final Duration SLEEP_INTERVAL = Duration.ofSeconds(5);
+    while (this.preparingFlowCount.intValue() != 0) {
+      logger.info(this.preparingFlowCount + " flow(s) is/are still being setup before complete "
+          + "deactivation.");
+      Thread.sleep(SLEEP_INTERVAL.toMillis());
+    }
   }
 
   public long getLastFlowSubmittedTime() {
@@ -309,7 +335,9 @@ public class FlowRunnerManager implements EventListener,
     if (isAlreadyRunning(execId)) {
       return;
     }
+
     final FlowRunner runner = createFlowRunner(execId);
+
     // Check again.
     if (isAlreadyRunning(execId)) {
       return;
@@ -333,6 +361,15 @@ public class FlowRunnerManager implements EventListener,
     return false;
   }
 
+  /**
+   * return whether this execution has useExecutor defined. useExecutor is for running test
+   * executions on inactive executor.
+   */
+  private boolean isExecutorSpecified(final ExecutableFlow flow) {
+    return flow.getExecutionOptions().getFlowParameters()
+        .containsKey(ExecutionOptions.USE_EXECUTOR);
+  }
+
   private FlowRunner createFlowRunner(final int execId) throws ExecutorManagerException {
     final ExecutableFlow flow;
     flow = this.executorLoader.fetchExecutableFlow(execId);
@@ -341,6 +378,8 @@ public class FlowRunnerManager implements EventListener,
           + execId);
     }
 
+    // Sets up the project files and execution directory.
+    this.preparingFlowCount.incrementAndGet();
     // Record the time between submission, and when the flow preparation/execution starts.
     // Note that since submit time is recorded on the web server, while flow preparation is on
     // the executor, there could be some inaccuracies due to clock skew.
@@ -348,10 +387,18 @@ public class FlowRunnerManager implements EventListener,
         flow.getExecutableFlow().getSubmitTime());
 
     final Timer.Context flowPrepTimerContext = commonMetrics.getFlowSetupTimerContext();
+
     try {
-      // Sets up the project files and execution directory.
-      this.flowPreparer.setup(flow);
+      if (this.active || isExecutorSpecified(flow)) {
+        this.flowPreparer.setup(flow);
+      } else {
+        // Unset the executor.
+        this.executorLoader.unsetExecutorIdForExecution(execId);
+        throw new ExecutorManagerException("executor became inactive before setting up the "
+            + "flow " + execId);
+      }
     } finally {
+      this.preparingFlowCount.decrementAndGet();
       flowPrepTimerContext.stop();
     }