azkaban-aplcache

Refactor ExecutorManager (#1971) * Refactoring: move ExecutingManagerUpdaterThread

10/15/2018 3:18:11 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
new file mode 100644
index 0000000..28b8bb0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.utils.Props;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+/**
+ * Loads & provides executors.
+ */
+public class ActiveExecutors {
+
+  private static final Logger logger = Logger.getLogger(ExecutorManager.class);
+
+  private volatile ImmutableSet<Executor> activeExecutors;
+  private final Props azkProps;
+  private final ExecutorLoader executorLoader;
+
+  @Inject
+  public ActiveExecutors(final Props azkProps, final ExecutorLoader executorLoader) {
+    this.azkProps = azkProps;
+    this.executorLoader = executorLoader;
+  }
+
+  /**
+   * Loads executors. Can be also used to reload executors if there have been changes in the DB.
+   *
+   * @throws ExecutorManagerException if no active executors are found or if loading executors
+   * fails.
+   */
+  public void setupExecutors() throws ExecutorManagerException {
+    final ImmutableSet<Executor> newExecutors;
+    if (ExecutorManager.isMultiExecutorMode(this.azkProps)) {
+      newExecutors = setupMultiExecutors();
+    } else {
+      // TODO remove this - switch everything to use multi-executor mode
+      newExecutors = setupSingleExecutor();
+    }
+    if (newExecutors.isEmpty()) {
+      final String error = "No active executors found";
+      logger.error(error);
+      throw new ExecutorManagerException(error);
+    } else {
+      this.activeExecutors = newExecutors;
+    }
+  }
+
+  /**
+   * Returns all executors. The result is cached. To reload, call {@link #setupExecutors()}.
+   *
+   * @return all executors
+   */
+  public Collection<Executor> getAll() {
+    return this.activeExecutors;
+  }
+
+  private ImmutableSet<Executor> setupMultiExecutors() throws ExecutorManagerException {
+    logger.info("Initializing multi executors from database.");
+    return ImmutableSet.copyOf(this.executorLoader.fetchActiveExecutors());
+  }
+
+  private ImmutableSet<Executor> setupSingleExecutor() throws ExecutorManagerException {
+    if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
+      return getOrAddSingleExecutor();
+    } else {
+      // throw exception when in single executor mode and no executor port specified in azkaban
+      // properties
+      //todo chengren311: convert to slf4j and parameterized logging
+      final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
+      logger.error(error);
+      throw new ExecutorManagerException(error);
+    }
+  }
+
+  private ImmutableSet<Executor> getOrAddSingleExecutor() throws ExecutorManagerException {
+    // add single executor, if specified as per properties
+    final String executorHost = this.azkProps
+        .getString(ConfigurationKeys.EXECUTOR_HOST, "localhost");
+    final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
+    logger.info(String.format("Initializing single executor %s:%d", executorHost, executorPort));
+    Executor executor = this.executorLoader.fetchExecutor(executorHost, executorPort);
+    if (executor == null) {
+      executor = this.executorLoader.addExecutor(executorHost, executorPort);
+    } else if (!executor.isActive()) {
+      executor.setActive(true);
+      this.executorLoader.updateExecutor(executor);
+    }
+    return ImmutableSet.of(new Executor(executor.getId(), executorHost, executorPort, true));
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
new file mode 100644
index 0000000..53a23d3
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.alert.Alerter;
+import java.util.LinkedList;
+import java.util.List;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles removing of running executions (after they have been deemed to be be done or orphaned).
+ */
+public class ExecutionFinalizer {
+
+  private static final Logger logger = Logger.getLogger(ExecutionFinalizer.class);
+
+  private final ExecutorLoader executorLoader;
+  private final ExecutorManagerUpdaterStage updaterStage;
+  private final AlerterHolder alerterHolder;
+  private final RunningExecutions runningExecutions;
+
+  @Inject
+  public ExecutionFinalizer(final ExecutorLoader executorLoader,
+      final ExecutorManagerUpdaterStage updaterStage,
+      final AlerterHolder alerterHolder, final RunningExecutions runningExecutions) {
+    this.executorLoader = executorLoader;
+    this.updaterStage = updaterStage;
+    this.alerterHolder = alerterHolder;
+    this.runningExecutions = runningExecutions;
+  }
+
+  /**
+   * If the current status of the execution is not one of the finished statuses, marks the execution
+   * as failed in the DB. Removes the execution from the running executions cache.
+   *
+   * @param flow the execution
+   * @param reason reason for finalizing the execution
+   * @param originalError the cause, if execution is being finalized because of an error
+   */
+  public void finalizeFlow(final ExecutableFlow flow, final String reason,
+      @Nullable final Throwable originalError) {
+
+    final int execId = flow.getExecutionId();
+    boolean alertUser = true;
+    final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
+    this.updaterStage.set("finalizing flow " + execId);
+    // First we check if the execution in the datastore is complete
+    try {
+      final ExecutableFlow dsFlow;
+      if (ExecutorManager.isFinished(flow)) {
+        dsFlow = flow;
+      } else {
+        this.updaterStage.set("finalizing flow " + execId + " loading from db");
+        dsFlow = this.executorLoader.fetchExecutableFlow(execId);
+
+        // If it's marked finished, we're good. If not, we fail everything and
+        // then mark it finished.
+        if (!ExecutorManager.isFinished(dsFlow)) {
+          this.updaterStage.set("finalizing flow " + execId + " failing the flow");
+          failEverything(dsFlow);
+          this.executorLoader.updateExecutableFlow(dsFlow);
+        }
+      }
+
+      this.updaterStage.set("finalizing flow " + execId + " deleting active reference");
+
+      // Delete the executing reference.
+      if (flow.getEndTime() == -1) {
+        flow.setEndTime(System.currentTimeMillis());
+        this.executorLoader.updateExecutableFlow(dsFlow);
+      }
+      this.executorLoader.removeActiveExecutableReference(execId);
+
+      this.updaterStage.set("finalizing flow " + execId + " cleaning from memory");
+      this.runningExecutions.get().remove(execId);
+    } catch (final ExecutorManagerException e) {
+      alertUser = false; // failed due to azkaban internal error, not to alert user
+      logger.error(e);
+    }
+
+    // TODO append to the flow log that we marked this flow as failed + the extraReasons
+
+    this.updaterStage.set("finalizing flow " + execId + " alerting and emailing");
+    if (alertUser) {
+      final ExecutionOptions options = flow.getExecutionOptions();
+      // But we can definitely email them.
+      final Alerter mailAlerter = this.alerterHolder.get("email");
+      if (flow.getStatus() != Status.SUCCEEDED) {
+        if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
+          try {
+            mailAlerter.alertOnError(flow, extraReasons);
+          } catch (final Exception e) {
+            logger.error(e);
+          }
+        }
+        if (options.getFlowParameters().containsKey("alert.type")) {
+          final String alertType = options.getFlowParameters().get("alert.type");
+          final Alerter alerter = this.alerterHolder.get(alertType);
+          if (alerter != null) {
+            try {
+              alerter.alertOnError(flow, extraReasons);
+            } catch (final Exception e) {
+              logger.error("Failed to alert by " + alertType, e);
+            }
+          } else {
+            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+          }
+        }
+      } else {
+        if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
+          try {
+
+            mailAlerter.alertOnSuccess(flow);
+          } catch (final Exception e) {
+            logger.error(e);
+          }
+        }
+        if (options.getFlowParameters().containsKey("alert.type")) {
+          final String alertType = options.getFlowParameters().get("alert.type");
+          final Alerter alerter = this.alerterHolder.get(alertType);
+          if (alerter != null) {
+            try {
+              alerter.alertOnSuccess(flow);
+            } catch (final Exception e) {
+              logger.error("Failed to alert by " + alertType, e);
+            }
+          } else {
+            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+          }
+        }
+      }
+    }
+
+  }
+
+  private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
+    final List<String> reasons = new LinkedList<>();
+    reasons.add(reason);
+    if (originalError != null) {
+      reasons.add(ExceptionUtils.getStackTrace(originalError));
+    }
+    return reasons.toArray(new String[reasons.size()]);
+  }
+
+  private void failEverything(final ExecutableFlow exFlow) {
+    final long time = System.currentTimeMillis();
+    for (final ExecutableNode node : exFlow.getExecutableNodes()) {
+      switch (node.getStatus()) {
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+        case SKIPPED:
+        case DISABLED:
+          continue;
+          // case UNKNOWN:
+        case READY:
+          node.setStatus(Status.KILLING);
+          break;
+        default:
+          node.setStatus(Status.FAILED);
+          break;
+      }
+
+      if (node.getStartTime() == -1) {
+        node.setStartTime(time);
+      }
+      if (node.getEndTime() == -1) {
+        node.setEndTime(time);
+      }
+    }
+
+    if (exFlow.getEndTime() == -1) {
+      exFlow.setEndTime(time);
+    }
+
+    exFlow.setStatus(Status.FAILED);
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3641e5e..c6fe2a2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull;
 
 import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
-import azkaban.alert.Alerter;
 import azkaban.event.EventHandler;
 import azkaban.executor.selector.ExecutorComparator;
 import azkaban.executor.selector.ExecutorFilter;
@@ -32,10 +31,8 @@ import azkaban.project.ProjectWhitelist;
 import azkaban.utils.AuthenticationUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.File;
@@ -50,15 +47,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -69,7 +63,6 @@ import java.util.regex.Pattern;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
@@ -96,42 +89,46 @@ public class ExecutorManager extends EventHandler implements
       * 24 * 60 * 60 * 1000L;
   private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
   private static final Logger logger = Logger.getLogger(ExecutorManager.class);
-  private final AlerterHolder alerterHolder;
+  private final RunningExecutions runningExecutions;
   private final Props azkProps;
   private final CommonMetrics commonMetrics;
   private final ExecutorLoader executorLoader;
   private final CleanerThread cleanerThread;
-  private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
-      new ConcurrentHashMap<>();
-  private final ExecutingManagerUpdaterThread executingManager;
+  private final RunningExecutionsUpdaterThread updaterThread;
   private final ExecutorApiGateway apiGateway;
   private final int maxConcurrentRunsOneFlow;
+  private final ExecutorManagerUpdaterStage updaterStage;
+  private final ExecutionFinalizer executionFinalizer;
+  private final ActiveExecutors activeExecutors;
   QueuedExecutions queuedFlows;
   File cacheDir;
-  //make it immutable to ensure threadsafety
-  private volatile ImmutableSet<Executor> activeExecutors = null;
   private QueueProcessorThread queueProcessor;
   private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
-  private long lastCleanerThreadCheckTime = -1;
-  private long lastThreadCheckTime = -1;
-  private String updaterStage = "not started";
   private List<String> filterList;
   private Map<String, Integer> comparatorWeightsMap;
   private long lastSuccessfulExecutorInfoRefresh;
   private ExecutorService executorInforRefresherService;
 
   @Inject
-  public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
-      final AlerterHolder alerterHolder,
+  public ExecutorManager(final Props azkProps, final ExecutorLoader executorLoader,
       final CommonMetrics commonMetrics,
-      final ExecutorApiGateway apiGateway) throws ExecutorManagerException {
-    this.alerterHolder = alerterHolder;
+      final ExecutorApiGateway apiGateway,
+      final RunningExecutions runningExecutions,
+      final ActiveExecutors activeExecutors,
+      final ExecutorManagerUpdaterStage updaterStage,
+      final ExecutionFinalizer executionFinalizer,
+      final RunningExecutionsUpdaterThread updaterThread) throws ExecutorManagerException {
     this.azkProps = azkProps;
     this.commonMetrics = commonMetrics;
-    this.executorLoader = loader;
+    this.executorLoader = executorLoader;
     this.apiGateway = apiGateway;
+    this.runningExecutions = runningExecutions;
+    this.activeExecutors = activeExecutors;
+    this.updaterStage = updaterStage;
+    this.executionFinalizer = executionFinalizer;
+    this.updaterThread = updaterThread;
     this.setupExecutors();
-    this.loadRunningFlows();
+    this.loadRunningExecutions();
 
     this.queuedFlows = new QueuedExecutions(
         azkProps.getLong(Constants.ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
@@ -145,8 +142,6 @@ public class ExecutorManager extends EventHandler implements
 
     this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
 
-    this.executingManager = new ExecutingManagerUpdaterThread();
-
     if (isMultiExecutorMode()) {
       setupMultiExecutorMode();
     }
@@ -158,8 +153,25 @@ public class ExecutorManager extends EventHandler implements
     this.cleanerThread = new CleanerThread(executionLogsRetentionMs);
   }
 
+  // TODO move to some common place
+  static boolean isFinished(final ExecutableFlow flow) {
+    switch (flow.getStatus()) {
+      case SUCCEEDED:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  // TODO switch to always use "multi executor mode" - even for single server
+  public static boolean isMultiExecutorMode(final Props props) {
+    return props.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
+  }
+
   public void start() {
-    this.executingManager.start();
+    this.updaterThread.start();
     this.cleanerThread.start();
     if (isMultiExecutorMode()) {
       this.queueProcessor.start();
@@ -177,15 +189,14 @@ public class ExecutorManager extends EventHandler implements
   }
 
   private void setupMultiExecutorMode() {
-    // initliatize hard filters for executor selector from azkaban.properties
+    // initialize hard filters for executor selector from azkaban.properties
     final String filters = this.azkProps
         .getString(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
     if (filters != null) {
       this.filterList = Arrays.asList(StringUtils.split(filters, ","));
     }
 
-    // initliatize comparator feature weights for executor selector from
-    // azkaban.properties
+    // initialize comparator feature weights for executor selector from azkaban.properties
     final Map<String, String> compListStrings = this.azkProps
         .getMapByPrefix(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
     if (compListStrings != null) {
@@ -208,7 +219,7 @@ public class ExecutorManager extends EventHandler implements
                 Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
             this.azkProps.getInt(
                 Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
-                this.activeExecutors.size()));
+                this.activeExecutors.getAll().size()));
   }
 
   /**
@@ -218,48 +229,11 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public void setupExecutors() throws ExecutorManagerException {
-    final Set<Executor> newExecutors = new HashSet<>();
-
-    if (isMultiExecutorMode()) {
-      logger.info("Initializing multi executors from database.");
-      newExecutors.addAll(this.executorLoader.fetchActiveExecutors());
-    } else if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
-      // add local executor, if specified as per properties
-      final String executorHost = this.azkProps
-          .getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
-      final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
-      logger.info(String.format("Initializing local executor %s:%d",
-          executorHost, executorPort));
-      Executor executor =
-          this.executorLoader.fetchExecutor(executorHost, executorPort);
-      if (executor == null) {
-        executor = this.executorLoader.addExecutor(executorHost, executorPort);
-      } else if (!executor.isActive()) {
-        executor.setActive(true);
-        this.executorLoader.updateExecutor(executor);
-      }
-      newExecutors.add(new Executor(executor.getId(), executorHost,
-          executorPort, true));
-    } else {
-      // throw exception when in single executor mode and no executor port specified in azkaban
-      // properties
-      //todo chengren311: convert to slf4j and parameterized logging
-      final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
-      logger.error(error);
-      throw new ExecutorManagerException(error);
-    }
-
-    if (newExecutors.isEmpty()) {
-      final String error = "No active executor found";
-      logger.error(error);
-      throw new ExecutorManagerException(error);
-    } else {
-      this.activeExecutors = ImmutableSet.copyOf(newExecutors);
-    }
+    this.activeExecutors.setupExecutors();
   }
 
   private boolean isMultiExecutorMode() {
-    return this.azkProps.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
+    return isMultiExecutorMode(this.azkProps);
   }
 
   /**
@@ -269,7 +243,7 @@ public class ExecutorManager extends EventHandler implements
 
     final List<Pair<Executor, Future<ExecutorInfo>>> futures =
         new ArrayList<>();
-    for (final Executor executor : this.activeExecutors) {
+    for (final Executor executor : this.activeExecutors.getAll()) {
       // execute each executorInfo refresh task to fetch
       final Future<ExecutorInfo> fetchExecutionInfo =
           this.executorInforRefresherService.submit(
@@ -382,26 +356,26 @@ public class ExecutorManager extends EventHandler implements
 
   @Override
   public State getExecutorManagerThreadState() {
-    return this.executingManager.getState();
+    return this.updaterThread.getState();
   }
 
   public String getExecutorThreadStage() {
-    return this.updaterStage;
+    return this.updaterStage.get();
   }
 
   @Override
   public boolean isExecutorManagerThreadActive() {
-    return this.executingManager.isAlive();
+    return this.updaterThread.isAlive();
   }
 
   @Override
   public long getLastExecutorManagerThreadCheckTime() {
-    return this.lastThreadCheckTime;
+    return this.updaterThread.getLastThreadCheckTime();
   }
 
   @Override
   public Collection<Executor> getAllActiveExecutors() {
-    return Collections.unmodifiableCollection(this.activeExecutors);
+    return Collections.unmodifiableCollection(this.activeExecutors.getAll());
   }
 
   /**
@@ -411,7 +385,7 @@ public class ExecutorManager extends EventHandler implements
    */
   @Override
   public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
-    for (final Executor executor : this.activeExecutors) {
+    for (final Executor executor : this.activeExecutors.getAll()) {
       if (executor.getId() == executorId) {
         return executor;
       }
@@ -423,7 +397,7 @@ public class ExecutorManager extends EventHandler implements
   public Set<String> getPrimaryServerHosts() {
     // Only one for now. More probably later.
     final HashSet<String> ports = new HashSet<>();
-    for (final Executor executor : this.activeExecutors) {
+    for (final Executor executor : this.activeExecutors.getAll()) {
       ports.add(executor.getHost() + ":" + executor.getPort());
     }
     return ports;
@@ -433,11 +407,11 @@ public class ExecutorManager extends EventHandler implements
   public Set<String> getAllActiveExecutorServerHosts() {
     // Includes non primary server/hosts
     final HashSet<String> ports = new HashSet<>();
-    for (final Executor executor : this.activeExecutors) {
+    for (final Executor executor : this.activeExecutors.getAll()) {
       ports.add(executor.getHost() + ":" + executor.getPort());
     }
     // include executor which were initially active and still has flows running
-    for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows
+    for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningExecutions.get()
         .values()) {
       final ExecutionReference ref = running.getFirst();
       if (ref.getExecutor().isPresent()) {
@@ -448,12 +422,12 @@ public class ExecutorManager extends EventHandler implements
     return ports;
   }
 
-  private void loadRunningFlows() throws ExecutorManagerException {
+  private void loadRunningExecutions() throws ExecutorManagerException {
     logger.info("Loading running flows from database..");
     final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = this.executorLoader
         .fetchActiveFlows();
     logger.info("Loaded " + activeFlows.size() + " running flows");
-    this.runningFlows.putAll(activeFlows);
+    this.runningExecutions.get().putAll(activeFlows);
   }
 
   /*
@@ -490,7 +464,7 @@ public class ExecutorManager extends EventHandler implements
               getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(this.runningCandidate)));
     }
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
-        this.runningFlows.values()));
+        this.runningExecutions.get().values()));
     Collections.sort(executionIds);
     return executionIds;
   }
@@ -519,7 +493,7 @@ public class ExecutorManager extends EventHandler implements
     final List<Pair<ExecutableFlow, Optional<Executor>>> flows =
         new ArrayList<>();
     getActiveFlowsWithExecutorHelper(flows, this.queuedFlows.getAllEntries());
-    getActiveFlowsWithExecutorHelper(flows, this.runningFlows.values());
+    getActiveFlowsWithExecutorHelper(flows, this.runningExecutions.get().values());
     return flows;
   }
 
@@ -546,7 +520,7 @@ public class ExecutorManager extends EventHandler implements
             || isFlowRunningHelper(projectId, flowId, this.queuedFlows.getAllEntries());
     isRunning =
         isRunning
-            || isFlowRunningHelper(projectId, flowId, this.runningFlows.values());
+            || isFlowRunningHelper(projectId, flowId, this.runningExecutions.get().values());
     return isRunning;
   }
 
@@ -584,7 +558,7 @@ public class ExecutorManager extends EventHandler implements
   public List<ExecutableFlow> getRunningFlows() {
     final ArrayList<ExecutableFlow> flows = new ArrayList<>();
     getActiveFlowHelper(flows, this.queuedFlows.getAllEntries());
-    getActiveFlowHelper(flows, this.runningFlows.values());
+    getActiveFlowHelper(flows, this.runningExecutions.get().values());
     return flows;
   }
 
@@ -609,7 +583,7 @@ public class ExecutorManager extends EventHandler implements
   public String getRunningFlowIds() {
     final List<Integer> allIds = new ArrayList<>();
     getRunningFlowsIdsHelper(allIds, this.queuedFlows.getAllEntries());
-    getRunningFlowsIdsHelper(allIds, this.runningFlows.values());
+    getRunningFlowsIdsHelper(allIds, this.runningExecutions.get().values());
     Collections.sort(allIds);
     return allIds.toString();
   }
@@ -628,7 +602,6 @@ public class ExecutorManager extends EventHandler implements
     return allIds.toString();
   }
 
-
   public long getQueuedFlowSize() {
     return this.queuedFlows.size();
   }
@@ -713,7 +686,7 @@ public class ExecutorManager extends EventHandler implements
   public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
       final int length) throws ExecutorManagerException {
     final Pair<ExecutionReference, ExecutableFlow> pair =
-        this.runningFlows.get(exFlow.getExecutionId());
+        this.runningExecutions.get().get(exFlow.getExecutionId());
     if (pair != null) {
       final Pair<String, String> typeParam = new Pair<>("type", "flow");
       final Pair<String, String> offsetParam =
@@ -737,7 +710,7 @@ public class ExecutorManager extends EventHandler implements
   public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
       final int offset, final int length, final int attempt) throws ExecutorManagerException {
     final Pair<ExecutionReference, ExecutableFlow> pair =
-        this.runningFlows.get(exFlow.getExecutionId());
+        this.runningExecutions.get().get(exFlow.getExecutionId());
     if (pair != null) {
       final Pair<String, String> typeParam = new Pair<>("type", "job");
       final Pair<String, String> jobIdParam =
@@ -765,7 +738,7 @@ public class ExecutorManager extends EventHandler implements
   public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
       final int attempt) throws ExecutorManagerException {
     final Pair<ExecutionReference, ExecutableFlow> pair =
-        this.runningFlows.get(exFlow.getExecutionId());
+        this.runningExecutions.get().get(exFlow.getExecutionId());
     if (pair == null) {
       return this.executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
           attempt);
@@ -883,7 +856,7 @@ public class ExecutorManager extends EventHandler implements
       final String jobId, final int offset, final int length, final int attempt)
       throws ExecutorManagerException {
     final Pair<ExecutionReference, ExecutableFlow> pair =
-        this.runningFlows.get(exFlow.getExecutionId());
+        this.runningExecutions.get().get(exFlow.getExecutionId());
     if (pair != null) {
 
       final Pair<String, String> typeParam = new Pair<>("type", "job");
@@ -916,14 +889,15 @@ public class ExecutorManager extends EventHandler implements
   public void cancelFlow(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
     synchronized (exFlow) {
-      if (this.runningFlows.containsKey(exFlow.getExecutionId())) {
+      if (this.runningExecutions.get().containsKey(exFlow.getExecutionId())) {
         final Pair<ExecutionReference, ExecutableFlow> pair =
-            this.runningFlows.get(exFlow.getExecutionId());
+            this.runningExecutions.get().get(exFlow.getExecutionId());
         this.apiGateway.callWithReferenceByUser(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
             userId);
       } else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
         this.queuedFlows.dequeue(exFlow.getExecutionId());
-        finalizeFlows(exFlow, "Cancelled before dispatching to executor", null);
+        this.executionFinalizer
+            .finalizeFlow(exFlow, "Cancelled before dispatching to executor", null);
       } else {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -937,7 +911,7 @@ public class ExecutorManager extends EventHandler implements
       throws ExecutorManagerException {
     synchronized (exFlow) {
       final Pair<ExecutionReference, ExecutableFlow> pair =
-          this.runningFlows.get(exFlow.getExecutionId());
+          this.runningExecutions.get().get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -953,7 +927,7 @@ public class ExecutorManager extends EventHandler implements
       throws ExecutorManagerException {
     synchronized (exFlow) {
       final Pair<ExecutionReference, ExecutableFlow> pair =
-          this.runningFlows.get(exFlow.getExecutionId());
+          this.runningExecutions.get().get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1018,7 +992,7 @@ public class ExecutorManager extends EventHandler implements
       throws ExecutorManagerException {
     synchronized (exFlow) {
       final Pair<ExecutionReference, ExecutableFlow> pair =
-          this.runningFlows.get(exFlow.getExecutionId());
+          this.runningExecutions.get().get(exFlow.getExecutionId());
       if (pair == null) {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1142,7 +1116,7 @@ public class ExecutorManager extends EventHandler implements
           this.queuedFlows.enqueue(exflow, reference);
         } else {
           // assign only local executor we have
-          final Executor choosenExecutor = this.activeExecutors.iterator().next();
+          final Executor choosenExecutor = this.activeExecutors.getAll().iterator().next();
           this.executorLoader.addActiveExecutableReference(reference);
           try {
             dispatch(reference, exflow, choosenExecutor);
@@ -1153,7 +1127,7 @@ public class ExecutorManager extends EventHandler implements
             // this logic is only implemented in multiExecutorMode but
             // missed in single executor case.
             this.commonMetrics.markDispatchFail();
-            finalizeFlows(exflow, "Dispatching failed", e);
+            this.executionFinalizer.finalizeFlow(exflow, "Dispatching failed", e);
             throw e;
           }
         }
@@ -1204,7 +1178,6 @@ public class ExecutorManager extends EventHandler implements
         "/stats", paramList);
   }
 
-
   @Override
   public Map<String, Object> callExecutorJMX(final String hostPort, final String action,
       final String mBean) throws IOException {
@@ -1226,259 +1199,7 @@ public class ExecutorManager extends EventHandler implements
     if (isMultiExecutorMode()) {
       this.queueProcessor.shutdown();
     }
-    this.executingManager.shutdown();
-  }
-
-  private void finalizeFlows(final ExecutableFlow flow, final String reason,
-      final Throwable originalError) {
-
-    final int execId = flow.getExecutionId();
-    boolean alertUser = true;
-    final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
-    this.updaterStage = "finalizing flow " + execId;
-    // First we check if the execution in the datastore is complete
-    try {
-      final ExecutableFlow dsFlow;
-      if (isFinished(flow)) {
-        dsFlow = flow;
-      } else {
-        this.updaterStage = "finalizing flow " + execId + " loading from db";
-        dsFlow = this.executorLoader.fetchExecutableFlow(execId);
-
-        // If it's marked finished, we're good. If not, we fail everything and
-        // then mark it finished.
-        if (!isFinished(dsFlow)) {
-          this.updaterStage = "finalizing flow " + execId + " failing the flow";
-          failEverything(dsFlow);
-          this.executorLoader.updateExecutableFlow(dsFlow);
-        }
-      }
-
-      this.updaterStage = "finalizing flow " + execId + " deleting active reference";
-
-      // Delete the executing reference.
-      if (flow.getEndTime() == -1) {
-        flow.setEndTime(System.currentTimeMillis());
-        this.executorLoader.updateExecutableFlow(dsFlow);
-      }
-      this.executorLoader.removeActiveExecutableReference(execId);
-
-      this.updaterStage = "finalizing flow " + execId + " cleaning from memory";
-      this.runningFlows.remove(execId);
-    } catch (final ExecutorManagerException e) {
-      alertUser = false; // failed due to azkaban internal error, not to alert user
-      logger.error(e);
-    }
-
-    // TODO append to the flow log that we marked this flow as failed + the extraReasons
-
-    this.updaterStage = "finalizing flow " + execId + " alerting and emailing";
-    if (alertUser) {
-      final ExecutionOptions options = flow.getExecutionOptions();
-      // But we can definitely email them.
-      final Alerter mailAlerter = this.alerterHolder.get("email");
-      if (flow.getStatus() != Status.SUCCEEDED) {
-        if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
-          try {
-            mailAlerter.alertOnError(flow, extraReasons);
-          } catch (final Exception e) {
-            logger.error(e);
-          }
-        }
-        if (options.getFlowParameters().containsKey("alert.type")) {
-          final String alertType = options.getFlowParameters().get("alert.type");
-          final Alerter alerter = this.alerterHolder.get(alertType);
-          if (alerter != null) {
-            try {
-              alerter.alertOnError(flow, extraReasons);
-            } catch (final Exception e) {
-              logger.error("Failed to alert by " + alertType, e);
-            }
-          } else {
-            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
-          }
-        }
-      } else {
-        if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
-          try {
-
-            mailAlerter.alertOnSuccess(flow);
-          } catch (final Exception e) {
-            logger.error(e);
-          }
-        }
-        if (options.getFlowParameters().containsKey("alert.type")) {
-          final String alertType = options.getFlowParameters().get("alert.type");
-          final Alerter alerter = this.alerterHolder.get(alertType);
-          if (alerter != null) {
-            try {
-              alerter.alertOnSuccess(flow);
-            } catch (final Exception e) {
-              logger.error("Failed to alert by " + alertType, e);
-            }
-          } else {
-            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
-          }
-        }
-      }
-    }
-
-  }
-
-  private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
-    final List<String> reasons = new LinkedList<>();
-    reasons.add(reason);
-    if (originalError != null) {
-      reasons.add(ExceptionUtils.getStackTrace(originalError));
-    }
-    return reasons.toArray(new String[reasons.size()]);
-  }
-
-  private void failEverything(final ExecutableFlow exFlow) {
-    final long time = System.currentTimeMillis();
-    for (final ExecutableNode node : exFlow.getExecutableNodes()) {
-      switch (node.getStatus()) {
-        case SUCCEEDED:
-        case FAILED:
-        case KILLED:
-        case SKIPPED:
-        case DISABLED:
-          continue;
-          // case UNKNOWN:
-        case READY:
-          node.setStatus(Status.KILLING);
-          break;
-        default:
-          node.setStatus(Status.FAILED);
-          break;
-      }
-
-      if (node.getStartTime() == -1) {
-        node.setStartTime(time);
-      }
-      if (node.getEndTime() == -1) {
-        node.setEndTime(time);
-      }
-    }
-
-    if (exFlow.getEndTime() == -1) {
-      exFlow.setEndTime(time);
-    }
-
-    exFlow.setStatus(Status.FAILED);
-  }
-
-  private ExecutableFlow updateExecution(final Map<String, Object> updateData)
-      throws ExecutorManagerException {
-
-    final Integer execId =
-        (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
-    if (execId == null) {
-      throw new ExecutorManagerException(
-          "Response is malformed. Need exec id to update.");
-    }
-
-    final Pair<ExecutionReference, ExecutableFlow> refPair =
-        this.runningFlows.get(execId);
-    if (refPair == null) {
-      throw new ExecutorManagerException(
-          "No running flow found with the execution id. Removing " + execId);
-    }
-
-    final ExecutionReference ref = refPair.getFirst();
-    final ExecutableFlow flow = refPair.getSecond();
-    if (updateData.containsKey("error")) {
-      // The flow should be finished here.
-      throw new ExecutorManagerException((String) updateData.get("error"), flow);
-    }
-
-    // Reset errors.
-    ref.setNextCheckTime(0);
-    ref.setNumErrors(0);
-    final Status oldStatus = flow.getStatus();
-    flow.applyUpdateObject(updateData);
-    final Status newStatus = flow.getStatus();
-
-    if (oldStatus != newStatus && newStatus == Status.FAILED) {
-      this.commonMetrics.markFlowFail();
-    }
-
-    final ExecutionOptions options = flow.getExecutionOptions();
-    if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
-      // We want to see if we should give an email status on first failure.
-      if (options.getNotifyOnFirstFailure()) {
-        final Alerter mailAlerter = this.alerterHolder.get("email");
-        try {
-          mailAlerter.alertOnFirstError(flow);
-        } catch (final Exception e) {
-          logger.error("Failed to send first error email." + e.getMessage(), e);
-        }
-      }
-      if (options.getFlowParameters().containsKey("alert.type")) {
-        final String alertType = options.getFlowParameters().get("alert.type");
-        final Alerter alerter = this.alerterHolder.get(alertType);
-        if (alerter != null) {
-          try {
-            alerter.alertOnFirstError(flow);
-          } catch (final Exception e) {
-            logger.error("Failed to alert by " + alertType, e);
-          }
-        } else {
-          logger.error("Alerter type " + alertType
-              + " doesn't exist. Failed to alert.");
-        }
-      }
-    }
-
-    return flow;
-  }
-
-  public boolean isFinished(final ExecutableFlow flow) {
-    switch (flow.getStatus()) {
-      case SUCCEEDED:
-      case FAILED:
-      case KILLED:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
-      final List<Integer> executionIds, final List<Long> updateTimes) {
-    for (final ExecutableFlow flow : flows) {
-      executionIds.add(flow.getExecutionId());
-      updateTimes.add(flow.getUpdateTime());
-    }
-  }
-
-  /* Group Executable flow by Executors to reduce number of REST calls */
-  private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
-    final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
-        new HashMap<>();
-
-    for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningFlows
-        .values()) {
-      final ExecutionReference ref = runningFlow.getFirst();
-      final ExecutableFlow flow = runningFlow.getSecond();
-      final Optional<Executor> executor = ref.getExecutor();
-
-      // We can set the next check time to prevent the checking of certain
-      // flows.
-      if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
-        continue;
-      }
-
-      List<ExecutableFlow> flows = exFlowMap.get(executor);
-      if (flows == null) {
-        flows = new ArrayList<>();
-        exFlowMap.put(executor, flows);
-      }
-
-      flows.add(flow);
-    }
-
-    return exFlowMap;
+    this.updaterThread.shutdown();
   }
 
   @Override
@@ -1520,13 +1241,16 @@ public class ExecutorManager extends EventHandler implements
     reference.setExecutor(choosenExecutor);
 
     // move from flow to running flows
-    this.runningFlows.put(exflow.getExecutionId(),
-        new Pair<>(reference, exflow));
-    synchronized (this) {
-      // Wake up ExecutingManagerUpdaterThread from wait() so that it will immediately check status
+    this.runningExecutions.get().put(exflow.getExecutionId(), new Pair<>(reference, exflow));
+    synchronized (this.runningExecutions.get()) {
+      // Wake up RunningExecutionsUpdaterThread from wait() so that it will immediately check status
       // from executor(s). Normally flows will run at least some time and can't be cleaned up
       // immediately, so there will be another wait round (or many, actually), but for unit tests
       // this is significant to let them run quickly.
+      this.runningExecutions.get().notifyAll();
+    }
+    synchronized (this) {
+      // wake up all internal waiting threads, too
       this.notifyAll();
     }
 
@@ -1535,160 +1259,6 @@ public class ExecutorManager extends EventHandler implements
         exflow.getExecutionId(), reference.getNumErrors()));
   }
 
-  private class ExecutingManagerUpdaterThread extends Thread {
-
-    private final int waitTimeIdleMs = 2000;
-    private final int waitTimeMs = 500;
-    // When we have an http error, for that flow, we'll check every 10 secs, 360
-    // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
-    private final int numErrorsBetweenUnresponsiveEmail = 360;
-    // First email is sent after 1 minute of unresponsiveness
-    private final int numErrorsBeforeUnresponsiveEmail = 6;
-    private final long errorThreshold = 10000;
-    private boolean shutdown = false;
-
-    public ExecutingManagerUpdaterThread() {
-      this.setName("ExecutorManagerUpdaterThread");
-    }
-
-    private void shutdown() {
-      this.shutdown = true;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void run() {
-      while (!this.shutdown) {
-        try {
-          ExecutorManager.this.lastThreadCheckTime = System.currentTimeMillis();
-          ExecutorManager.this.updaterStage = "Starting update all flows.";
-
-          final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
-              getFlowToExecutorMap();
-          final ArrayList<ExecutableFlow> finalizeFlows =
-              new ArrayList<>();
-
-          if (exFlowMap.size() > 0) {
-            for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
-                .entrySet()) {
-              final List<Long> updateTimesList = new ArrayList<>();
-              final List<Integer> executionIdsList = new ArrayList<>();
-
-              final Optional<Executor> executorOption = entry.getKey();
-              if (!executorOption.isPresent()) {
-                for (final ExecutableFlow flow : entry.getValue()) {
-                  logger.warn("Finalizing execution " + flow.getExecutionId()
-                      + ". Executor id of this execution doesn't exist");
-                  finalizeFlows.add(flow);
-                }
-                continue;
-              }
-              final Executor executor = executorOption.get();
-
-              ExecutorManager.this.updaterStage =
-                  "Starting update flows on " + executor.getHost() + ":"
-                      + executor.getPort();
-
-              // We pack the parameters of the same host together before we
-              // query.
-              fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
-                  updateTimesList);
-
-              final Pair<String, String> updateTimes =
-                  new Pair<>(
-                      ConnectorParams.UPDATE_TIME_LIST_PARAM,
-                      JSONUtils.toJSON(updateTimesList));
-              final Pair<String, String> executionIds =
-                  new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
-                      JSONUtils.toJSON(executionIdsList));
-
-              Map<String, Object> results = null;
-              try {
-                results =
-                    ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(),
-                        executor.getPort(), ConnectorParams.UPDATE_ACTION,
-                        null, null, executionIds, updateTimes);
-              } catch (final ExecutorManagerException e) {
-                logger.error("Failed to get update from executor " + executor.getHost(), e);
-                boolean sendUnresponsiveEmail = false;
-                for (final ExecutableFlow flow : entry.getValue()) {
-                  final Pair<ExecutionReference, ExecutableFlow> pair =
-                      ExecutorManager.this.runningFlows.get(flow.getExecutionId());
-                  // TODO can runningFlows.get ever return null, causing NPE below?
-
-                  ExecutorManager.this.updaterStage =
-                      "Failed to get update for flow " + pair.getSecond().getExecutionId();
-
-                  final ExecutionReference ref = pair.getFirst();
-                  ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
-                  ref.setNumErrors(ref.getNumErrors() + 1);
-                  if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
-                      || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
-                    // if any of the executions has failed many enough updates, alert
-                    sendUnresponsiveEmail = true;
-                  }
-                }
-                if (sendUnresponsiveEmail) {
-                  final Alerter mailAlerter = ExecutorManager.this.alerterHolder.get("email");
-                  mailAlerter.alertOnFailedUpdate(executor, entry.getValue(), e);
-                }
-              }
-
-              // We gets results
-              if (results != null) {
-                final List<Map<String, Object>> executionUpdates =
-                    (List<Map<String, Object>>) results
-                        .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
-                for (final Map<String, Object> updateMap : executionUpdates) {
-                  try {
-                    final ExecutableFlow flow = updateExecution(updateMap);
-
-                    ExecutorManager.this.updaterStage = "Updated flow " + flow.getExecutionId();
-
-                    if (isFinished(flow)) {
-                      finalizeFlows.add(flow);
-                    }
-                  } catch (final ExecutorManagerException e) {
-                    final ExecutableFlow flow = e.getExecutableFlow();
-                    logger.error(e);
-
-                    if (flow != null) {
-                      logger.warn("Finalizing execution " + flow.getExecutionId());
-                      finalizeFlows.add(flow);
-                    }
-                  }
-                }
-              }
-            }
-
-            ExecutorManager.this.updaterStage =
-                "Finalizing " + finalizeFlows.size() + " error flows.";
-
-            // Kill error flows
-            for (final ExecutableFlow flow : finalizeFlows) {
-              finalizeFlows(flow, "Not running on the assigned executor (any more)", null);
-            }
-          }
-
-          ExecutorManager.this.updaterStage = "Updated all active flows. Waiting for next round.";
-
-          synchronized (ExecutorManager.this) {
-            try {
-              if (ExecutorManager.this.runningFlows.size() > 0) {
-                ExecutorManager.this.wait(this.waitTimeMs);
-              } else {
-                ExecutorManager.this.wait(this.waitTimeIdleMs);
-              }
-            } catch (final InterruptedException e) {
-            }
-          }
-        } catch (final Exception e) {
-          logger.error("Unexpected exception in updating executions", e);
-        }
-      }
-    }
-  }
-
   /*
    * cleaner thread to clean up execution_logs, etc in DB. Runs every hour.
    */
@@ -1719,8 +1289,6 @@ public class ExecutorManager extends EventHandler implements
       while (!this.shutdown) {
         synchronized (this) {
           try {
-            ExecutorManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
-
             // Cleanup old stuff.
             final long currentTime = System.currentTimeMillis();
             if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > this.lastLogCleanTime) {
@@ -1872,14 +1440,16 @@ public class ExecutorManager extends EventHandler implements
     private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
         final ExecutableFlow exflow)
         throws ExecutorManagerException {
-      final Set<Executor> remainingExecutors = new HashSet<>(ExecutorManager.this.activeExecutors);
+      final Set<Executor> remainingExecutors = new HashSet<>(
+          ExecutorManager.this.activeExecutors.getAll());
       synchronized (exflow) {
         for (int i = 0; i <= this.maxDispatchingErrors; i++) {
           final String giveUpReason = checkGiveUpDispatching(reference, remainingExecutors);
           if (giveUpReason != null) {
             logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
                 + giveUpReason);
-            finalizeFlows(exflow, "Failed to dispatch because " + giveUpReason, null);
+            ExecutorManager.this.executionFinalizer
+                .finalizeFlow(exflow, "Failed to dispatch because " + giveUpReason, null);
             // GIVE UP DISPATCHING - exit
             return;
           } else {
@@ -1917,13 +1487,16 @@ public class ExecutorManager extends EventHandler implements
             + " (tried " + reference.getNumErrors() + " executors)";
       } else if (remainingExecutors.isEmpty()) {
         return "tried calling all executors (total: "
-            + ExecutorManager.this.activeExecutors.size() + ") but all failed";
+            // TODO rather use the original size (activeExecutors may have been reloaded in the
+            // meanwhile)
+            + ExecutorManager.this.activeExecutors.getAll().size() + ") but all failed";
       } else {
         return null;
       }
     }
 
-    private void logFailedDispatchAttempt(final ExecutionReference reference, final ExecutableFlow exflow,
+    private void logFailedDispatchAttempt(final ExecutionReference reference,
+        final ExecutableFlow exflow,
         final Executor selectedExecutor, final ExecutorManagerException e) {
       logger.warn(String.format(
           "Executor %s responded with exception for exec: %d",
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerUpdaterStage.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerUpdaterStage.java
new file mode 100644
index 0000000..5f0cbf5
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerUpdaterStage.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+/**
+ * Holds value of execution update state (for monitoring).
+ */
+public class ExecutorManagerUpdaterStage {
+
+  private volatile String value = "not started";
+
+  /**
+   * Get the current value.
+   *
+   * @return the current value.
+   */
+  public String get() {
+    return value;
+  }
+
+  /**
+   * Set the value.
+   *
+   * @param value the new value to set.
+   */
+  public void set(String value) {
+    this.value = value;
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutions.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutions.java
new file mode 100644
index 0000000..74dacb1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutions.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.utils.Pair;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+
+/**
+ * Provides access to running executions.
+ */
+@Singleton
+public class RunningExecutions {
+
+  private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningExecutions =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Get running executions.
+   *
+   * @return executions.
+   */
+  public ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> get() {
+    return runningExecutions;
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
new file mode 100644
index 0000000..e15e0c8
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.alert.Alerter;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+/**
+ * Updates running executions.
+ */
+public class RunningExecutionsUpdater {
+
+  private static final Logger logger = Logger.getLogger(RunningExecutionsUpdater.class);
+
+  // When we have an http error, for that flow, we'll check every 10 secs, 360
+  // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
+  private final int numErrorsBetweenUnresponsiveEmail = 360;
+  // First email is sent after 1 minute of unresponsiveness
+  private final int numErrorsBeforeUnresponsiveEmail = 6;
+  private final long errorThreshold = 10000;
+  private final ExecutorManagerUpdaterStage updaterStage;
+  private final AlerterHolder alerterHolder;
+  private final CommonMetrics commonMetrics;
+  private final ExecutorApiGateway apiGateway;
+  private final RunningExecutions runningExecutions;
+  private final ExecutionFinalizer executionFinalizer;
+
+  @Inject
+  public RunningExecutionsUpdater(final ExecutorManagerUpdaterStage updaterStage,
+      final AlerterHolder alerterHolder, final CommonMetrics commonMetrics,
+      final ExecutorApiGateway apiGateway, final RunningExecutions runningExecutions,
+      final ExecutionFinalizer executionFinalizer) {
+    this.updaterStage = updaterStage;
+    this.alerterHolder = alerterHolder;
+    this.commonMetrics = commonMetrics;
+    this.apiGateway = apiGateway;
+    this.runningExecutions = runningExecutions;
+    this.executionFinalizer = executionFinalizer;
+  }
+
+  /**
+   * Updates running executions.
+   */
+  @SuppressWarnings("unchecked")
+  public void updateExecutions() {
+    this.updaterStage.set("Starting update all flows.");
+    final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
+    final ArrayList<ExecutableFlow> finalizeFlows =
+        new ArrayList<>();
+
+    for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
+        .entrySet()) {
+      final List<Long> updateTimesList = new ArrayList<>();
+      final List<Integer> executionIdsList = new ArrayList<>();
+
+      final Optional<Executor> executorOption = entry.getKey();
+      if (!executorOption.isPresent()) {
+        for (final ExecutableFlow flow : entry.getValue()) {
+          logger.warn("Finalizing execution " + flow.getExecutionId()
+              + ". Executor id of this execution doesn't exist");
+          finalizeFlows.add(flow);
+        }
+        continue;
+      }
+      final Executor executor = executorOption.get();
+
+      this.updaterStage.set("Starting update flows on " + executor.getHost() + ":"
+          + executor.getPort());
+
+      // We pack the parameters of the same host together before we
+      // query.
+      fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+          updateTimesList);
+
+      final Pair<String, String> updateTimes =
+          new Pair<>(
+              ConnectorParams.UPDATE_TIME_LIST_PARAM,
+              JSONUtils.toJSON(updateTimesList));
+      final Pair<String, String> executionIds =
+          new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
+              JSONUtils.toJSON(executionIdsList));
+
+      Map<String, Object> results = null;
+      try {
+        results = this.apiGateway.callWithExecutionId(executor.getHost(),
+            executor.getPort(), ConnectorParams.UPDATE_ACTION,
+            null, null, executionIds, updateTimes);
+      } catch (final ExecutorManagerException e) {
+        handleException(entry, executor, e);
+      }
+
+      if (results != null) {
+        final List<Map<String, Object>> executionUpdates =
+            (List<Map<String, Object>>) results
+                .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+        for (final Map<String, Object> updateMap : executionUpdates) {
+          try {
+            final ExecutableFlow flow = updateExecution(updateMap);
+
+            this.updaterStage.set("Updated flow " + flow.getExecutionId());
+
+            if (ExecutorManager.isFinished(flow)) {
+              finalizeFlows.add(flow);
+            }
+          } catch (final ExecutorManagerException e) {
+            final ExecutableFlow flow = e.getExecutableFlow();
+            logger.error(e);
+
+            if (flow != null) {
+              logger.warn("Finalizing execution " + flow.getExecutionId());
+              finalizeFlows.add(flow);
+            }
+          }
+        }
+      }
+    }
+
+    this.updaterStage.set("Finalizing " + finalizeFlows.size() + " error flows.");
+
+    for (final ExecutableFlow flow : finalizeFlows) {
+      this.executionFinalizer
+          .finalizeFlow(flow, "Not running on the assigned executor (any more)", null);
+    }
+
+    this.updaterStage.set("Updated all active flows. Waiting for next round.");
+  }
+
+  private void handleException(Entry<Optional<Executor>, List<ExecutableFlow>> entry,
+      Executor executor, ExecutorManagerException e) {
+    logger.error("Failed to get update from executor " + executor.getHost(), e);
+    boolean sendUnresponsiveEmail = false;
+    for (final ExecutableFlow flow : entry.getValue()) {
+      final Pair<ExecutionReference, ExecutableFlow> pair =
+          this.runningExecutions.get().get(flow.getExecutionId());
+      // TODO can runningFlows.get ever return null, causing NPE below?
+
+      this.updaterStage
+          .set("Failed to get update for flow " + pair.getSecond().getExecutionId());
+
+      final ExecutionReference ref = pair.getFirst();
+      ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
+      ref.setNumErrors(ref.getNumErrors() + 1);
+      if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
+          || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
+        // if any of the executions has failed many enough updates, alert
+        sendUnresponsiveEmail = true;
+      }
+    }
+    if (sendUnresponsiveEmail) {
+      final Alerter mailAlerter = this.alerterHolder.get("email");
+      mailAlerter.alertOnFailedUpdate(executor, entry.getValue(), e);
+    }
+  }
+
+  /* Group Executable flow by Executors to reduce number of REST calls */
+  private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
+    final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
+        new HashMap<>();
+
+    for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningExecutions.get()
+        .values()) {
+      final ExecutionReference ref = runningFlow.getFirst();
+      final ExecutableFlow flow = runningFlow.getSecond();
+      final Optional<Executor> executor = ref.getExecutor();
+
+      // We can set the next check time to prevent the checking of certain
+      // flows.
+      if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+        continue;
+      }
+
+      List<ExecutableFlow> flows = exFlowMap.get(executor);
+      if (flows == null) {
+        flows = new ArrayList<>();
+        exFlowMap.put(executor, flows);
+      }
+
+      flows.add(flow);
+    }
+
+    return exFlowMap;
+  }
+
+  private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
+      final List<Integer> executionIds, final List<Long> updateTimes) {
+    for (final ExecutableFlow flow : flows) {
+      executionIds.add(flow.getExecutionId());
+      updateTimes.add(flow.getUpdateTime());
+    }
+  }
+
+  private ExecutableFlow updateExecution(final Map<String, Object> updateData)
+      throws ExecutorManagerException {
+
+    final Integer execId =
+        (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+    if (execId == null) {
+      throw new ExecutorManagerException(
+          "Response is malformed. Need exec id to update.");
+    }
+
+    final Pair<ExecutionReference, ExecutableFlow> refPair =
+        this.runningExecutions.get().get(execId);
+    if (refPair == null) {
+      throw new ExecutorManagerException(
+          "No running flow found with the execution id. Removing " + execId);
+    }
+
+    final ExecutionReference ref = refPair.getFirst();
+    final ExecutableFlow flow = refPair.getSecond();
+    if (updateData.containsKey("error")) {
+      // The flow should be finished here.
+      throw new ExecutorManagerException((String) updateData.get("error"), flow);
+    }
+
+    // Reset errors.
+    ref.setNextCheckTime(0);
+    ref.setNumErrors(0);
+    final Status oldStatus = flow.getStatus();
+    flow.applyUpdateObject(updateData);
+    final Status newStatus = flow.getStatus();
+
+    if (oldStatus != newStatus && newStatus == Status.FAILED) {
+      this.commonMetrics.markFlowFail();
+    }
+
+    final ExecutionOptions options = flow.getExecutionOptions();
+    if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
+      // We want to see if we should give an email status on first failure.
+      if (options.getNotifyOnFirstFailure()) {
+        final Alerter mailAlerter = this.alerterHolder.get("email");
+        try {
+          mailAlerter.alertOnFirstError(flow);
+        } catch (final Exception e) {
+          logger.error("Failed to send first error email." + e.getMessage(), e);
+        }
+      }
+      if (options.getFlowParameters().containsKey("alert.type")) {
+        final String alertType = options.getFlowParameters().get("alert.type");
+        final Alerter alerter = this.alerterHolder.get(alertType);
+        if (alerter != null) {
+          try {
+            alerter.alertOnFirstError(flow);
+          } catch (final Exception e) {
+            logger.error("Failed to alert by " + alertType, e);
+          }
+        } else {
+          logger.error("Alerter type " + alertType
+              + " doesn't exist. Failed to alert.");
+        }
+      }
+    }
+
+    return flow;
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdaterThread.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdaterThread.java
new file mode 100644
index 0000000..a52ab09
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdaterThread.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+/**
+ * Updates running executions periodically.
+ */
+public class RunningExecutionsUpdaterThread extends Thread {
+
+  private static final Logger logger = Logger.getLogger(RunningExecutionsUpdaterThread.class);
+
+  volatile int waitTimeIdleMs = 2000;
+  volatile int waitTimeMs = 500;
+
+  private final RunningExecutionsUpdater updater;
+  private final RunningExecutions runningExecutions;
+  private long lastThreadCheckTime = -1;
+  private boolean shutdown = false;
+
+  @Inject
+  public RunningExecutionsUpdaterThread(final RunningExecutionsUpdater updater,
+      final RunningExecutions runningExecutions) {
+    this.updater = updater;
+    this.runningExecutions = runningExecutions;
+    this.setName("ExecutorManagerUpdaterThread");
+  }
+
+  /**
+   * Start the thread: updates running executions periodically.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void run() {
+    while (!this.shutdown) {
+      try {
+        this.lastThreadCheckTime = System.currentTimeMillis();
+        this.updater.updateExecutions();
+        // TODO not sure why it would be important to check the status immediately in case of _new_
+        // executions. This can only optimize finalizing executions that finish super-quickly after
+        // being started.
+        waitForNewExecutions();
+      } catch (final Exception e) {
+        logger.error("Unexpected exception in updating executions", e);
+      }
+    }
+  }
+
+  private void waitForNewExecutions() {
+    synchronized (this.runningExecutions) {
+      try {
+        final int waitTimeMillis =
+            this.runningExecutions.get().size() > 0 ? this.waitTimeMs : this.waitTimeIdleMs;
+        if (waitTimeMillis > 0) {
+          this.runningExecutions.wait(waitTimeMillis);
+        }
+      } catch (final InterruptedException e) {
+      }
+    }
+  }
+
+  void shutdown() {
+    this.shutdown = true;
+  }
+
+  public long getLastThreadCheckTime() {
+    return this.lastThreadCheckTime;
+  }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 7da9c95..49f639c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -69,6 +69,8 @@ public class ExecutorManagerTest {
   private AlerterHolder alertHolder;
   private ExecutorApiGateway apiGateway;
   private Alerter mailAlerter;
+  private RunningExecutions runningExecutions;
+  private ExecutorManagerUpdaterStage updaterStage;
 
   @Before
   public void setup() {
@@ -77,6 +79,8 @@ public class ExecutorManagerTest {
     this.alertHolder = mock(AlerterHolder.class);
     when(this.alertHolder.get("email")).thenReturn(this.mailAlerter);
     this.loader = new MockExecutorLoader();
+    this.runningExecutions = new RunningExecutions();
+    this.updaterStage = new ExecutorManagerUpdaterStage();
   }
 
   @After
@@ -143,8 +147,19 @@ public class ExecutorManagerTest {
 
   private ExecutorManager createExecutorManager()
       throws ExecutorManagerException {
-    return new ExecutorManager(this.props, this.loader, this.alertHolder, this.commonMetrics,
-        this.apiGateway);
+    // TODO rename this test to ExecutorManagerIntegrationTest & create separate unit tests as well?
+    final ActiveExecutors activeExecutors = new ActiveExecutors(this.props, this.loader);
+    final ExecutionFinalizer executionFinalizer = new ExecutionFinalizer(this.loader,
+        this.updaterStage, this.alertHolder, this.runningExecutions);
+    final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
+        new RunningExecutionsUpdater(
+            this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
+            this.runningExecutions, executionFinalizer), runningExecutions);
+    updaterThread.waitTimeIdleMs = 0;
+    updaterThread.waitTimeMs = 0;
+    return new ExecutorManager(this.props, this.loader, this.commonMetrics, this.apiGateway,
+        this.runningExecutions, activeExecutors, this.updaterStage, executionFinalizer,
+        updaterThread);
   }
 
   /*
@@ -288,9 +303,8 @@ public class ExecutorManagerTest {
   }
 
   /**
-   * 1. Executor 1 throws an exception when trying to dispatch to it
-   * 2. ExecutorManager should try next executor
-   * 3. Executor 2 accepts the dispatched execution
+   * 1. Executor 1 throws an exception when trying to dispatch to it 2. ExecutorManager should try
+   * next executor 3. Executor 2 accepts the dispatched execution
    */
   @Test
   public void testDispatchException() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index d6a072e..c8ac487 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -18,12 +18,18 @@ package azkaban.trigger;
 
 import static org.mockito.Mockito.mock;
 
+import azkaban.executor.ActiveExecutors;
 import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutionFinalizer;
 import azkaban.executor.ExecutorApiGateway;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutorManagerUpdaterStage;
 import azkaban.executor.MockExecutorLoader;
+import azkaban.executor.RunningExecutions;
+import azkaban.executor.RunningExecutionsUpdater;
+import azkaban.executor.RunningExecutionsUpdaterThread;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.trigger.builtin.CreateTriggerAction;
@@ -44,6 +50,11 @@ public class TriggerManagerDeadlockTest {
   TriggerManager triggerManager;
   ExecutorLoader execLoader;
   ExecutorApiGateway apiGateway;
+  RunningExecutions runningExecutions;
+  private ExecutorManagerUpdaterStage updaterStage;
+  private AlerterHolder alertHolder;
+  private ExecutionFinalizer executionFinalizer;
+  private CommonMetrics commonMetrics;
 
   @Before
   public void setup() throws ExecutorManagerException, TriggerManagerException {
@@ -53,12 +64,30 @@ public class TriggerManagerDeadlockTest {
     props.put("executor.port", 12321);
     this.execLoader = new MockExecutorLoader();
     this.apiGateway = mock(ExecutorApiGateway.class);
-    final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
-    final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
-        mock(AlerterHolder.class), commonMetrics, this.apiGateway);
+    this.runningExecutions = new RunningExecutions();
+    this.updaterStage = new ExecutorManagerUpdaterStage();
+    this.alertHolder = mock(AlerterHolder.class);
+    this.executionFinalizer = new ExecutionFinalizer(this.execLoader,
+        this.updaterStage, this.alertHolder, this.runningExecutions);
+    this.commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+    final ExecutorManager executorManager = getExecutorManager(props);
     this.triggerManager = new TriggerManager(props, this.loader, executorManager);
   }
 
+  private ExecutorManager getExecutorManager(final Props props) throws ExecutorManagerException {
+    final ActiveExecutors activeExecutors = new ActiveExecutors(props, this.execLoader);
+    final RunningExecutionsUpdaterThread updaterThread = getRunningExecutionsUpdaterThread();
+    return new ExecutorManager(props, this.execLoader, this.commonMetrics, this.apiGateway,
+        this.runningExecutions, activeExecutors, this.updaterStage, this.executionFinalizer,
+        updaterThread);
+  }
+
+  private RunningExecutionsUpdaterThread getRunningExecutionsUpdaterThread() {
+    return new RunningExecutionsUpdaterThread(new RunningExecutionsUpdater(
+        this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
+        this.runningExecutions, this.executionFinalizer), runningExecutions);
+  }
+
   @After
   public void tearDown() {