azkaban-aplcache
Refactor ExecutorManager (#1971) * Refactoring: move ExecutingManagerUpdaterThread …
10/15/2018 3:18:11 PM
Changes
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
new file mode 100644
index 0000000..28b8bb0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutors.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.utils.Props;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+/**
+ * Loads & provides executors.
+ */
+public class ActiveExecutors {
+
+ private static final Logger logger = Logger.getLogger(ExecutorManager.class);
+
+ private volatile ImmutableSet<Executor> activeExecutors;
+ private final Props azkProps;
+ private final ExecutorLoader executorLoader;
+
+ @Inject
+ public ActiveExecutors(final Props azkProps, final ExecutorLoader executorLoader) {
+ this.azkProps = azkProps;
+ this.executorLoader = executorLoader;
+ }
+
+ /**
+ * Loads executors. Can be also used to reload executors if there have been changes in the DB.
+ *
+ * @throws ExecutorManagerException if no active executors are found or if loading executors
+ * fails.
+ */
+ public void setupExecutors() throws ExecutorManagerException {
+ final ImmutableSet<Executor> newExecutors;
+ if (ExecutorManager.isMultiExecutorMode(this.azkProps)) {
+ newExecutors = setupMultiExecutors();
+ } else {
+ // TODO remove this - switch everything to use multi-executor mode
+ newExecutors = setupSingleExecutor();
+ }
+ if (newExecutors.isEmpty()) {
+ final String error = "No active executors found";
+ logger.error(error);
+ throw new ExecutorManagerException(error);
+ } else {
+ this.activeExecutors = newExecutors;
+ }
+ }
+
+ /**
+ * Returns all executors. The result is cached. To reload, call {@link #setupExecutors()}.
+ *
+ * @return all executors
+ */
+ public Collection<Executor> getAll() {
+ return this.activeExecutors;
+ }
+
+ private ImmutableSet<Executor> setupMultiExecutors() throws ExecutorManagerException {
+ logger.info("Initializing multi executors from database.");
+ return ImmutableSet.copyOf(this.executorLoader.fetchActiveExecutors());
+ }
+
+ private ImmutableSet<Executor> setupSingleExecutor() throws ExecutorManagerException {
+ if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
+ return getOrAddSingleExecutor();
+ } else {
+ // throw exception when in single executor mode and no executor port specified in azkaban
+ // properties
+ //todo chengren311: convert to slf4j and parameterized logging
+ final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
+ logger.error(error);
+ throw new ExecutorManagerException(error);
+ }
+ }
+
+ private ImmutableSet<Executor> getOrAddSingleExecutor() throws ExecutorManagerException {
+ // add single executor, if specified as per properties
+ final String executorHost = this.azkProps
+ .getString(ConfigurationKeys.EXECUTOR_HOST, "localhost");
+ final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
+ logger.info(String.format("Initializing single executor %s:%d", executorHost, executorPort));
+ Executor executor = this.executorLoader.fetchExecutor(executorHost, executorPort);
+ if (executor == null) {
+ executor = this.executorLoader.addExecutor(executorHost, executorPort);
+ } else if (!executor.isActive()) {
+ executor.setActive(true);
+ this.executorLoader.updateExecutor(executor);
+ }
+ return ImmutableSet.of(new Executor(executor.getId(), executorHost, executorPort, true));
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
new file mode 100644
index 0000000..53a23d3
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.alert.Alerter;
+import java.util.LinkedList;
+import java.util.List;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles removing of running executions (after they have been deemed to be be done or orphaned).
+ */
+public class ExecutionFinalizer {
+
+ private static final Logger logger = Logger.getLogger(ExecutionFinalizer.class);
+
+ private final ExecutorLoader executorLoader;
+ private final ExecutorManagerUpdaterStage updaterStage;
+ private final AlerterHolder alerterHolder;
+ private final RunningExecutions runningExecutions;
+
+ @Inject
+ public ExecutionFinalizer(final ExecutorLoader executorLoader,
+ final ExecutorManagerUpdaterStage updaterStage,
+ final AlerterHolder alerterHolder, final RunningExecutions runningExecutions) {
+ this.executorLoader = executorLoader;
+ this.updaterStage = updaterStage;
+ this.alerterHolder = alerterHolder;
+ this.runningExecutions = runningExecutions;
+ }
+
+ /**
+ * If the current status of the execution is not one of the finished statuses, marks the execution
+ * as failed in the DB. Removes the execution from the running executions cache.
+ *
+ * @param flow the execution
+ * @param reason reason for finalizing the execution
+ * @param originalError the cause, if execution is being finalized because of an error
+ */
+ public void finalizeFlow(final ExecutableFlow flow, final String reason,
+ @Nullable final Throwable originalError) {
+
+ final int execId = flow.getExecutionId();
+ boolean alertUser = true;
+ final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
+ this.updaterStage.set("finalizing flow " + execId);
+ // First we check if the execution in the datastore is complete
+ try {
+ final ExecutableFlow dsFlow;
+ if (ExecutorManager.isFinished(flow)) {
+ dsFlow = flow;
+ } else {
+ this.updaterStage.set("finalizing flow " + execId + " loading from db");
+ dsFlow = this.executorLoader.fetchExecutableFlow(execId);
+
+ // If it's marked finished, we're good. If not, we fail everything and
+ // then mark it finished.
+ if (!ExecutorManager.isFinished(dsFlow)) {
+ this.updaterStage.set("finalizing flow " + execId + " failing the flow");
+ failEverything(dsFlow);
+ this.executorLoader.updateExecutableFlow(dsFlow);
+ }
+ }
+
+ this.updaterStage.set("finalizing flow " + execId + " deleting active reference");
+
+ // Delete the executing reference.
+ if (flow.getEndTime() == -1) {
+ flow.setEndTime(System.currentTimeMillis());
+ this.executorLoader.updateExecutableFlow(dsFlow);
+ }
+ this.executorLoader.removeActiveExecutableReference(execId);
+
+ this.updaterStage.set("finalizing flow " + execId + " cleaning from memory");
+ this.runningExecutions.get().remove(execId);
+ } catch (final ExecutorManagerException e) {
+ alertUser = false; // failed due to azkaban internal error, not to alert user
+ logger.error(e);
+ }
+
+ // TODO append to the flow log that we marked this flow as failed + the extraReasons
+
+ this.updaterStage.set("finalizing flow " + execId + " alerting and emailing");
+ if (alertUser) {
+ final ExecutionOptions options = flow.getExecutionOptions();
+ // But we can definitely email them.
+ final Alerter mailAlerter = this.alerterHolder.get("email");
+ if (flow.getStatus() != Status.SUCCEEDED) {
+ if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
+ try {
+ mailAlerter.alertOnError(flow, extraReasons);
+ } catch (final Exception e) {
+ logger.error(e);
+ }
+ }
+ if (options.getFlowParameters().containsKey("alert.type")) {
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = this.alerterHolder.get(alertType);
+ if (alerter != null) {
+ try {
+ alerter.alertOnError(flow, extraReasons);
+ } catch (final Exception e) {
+ logger.error("Failed to alert by " + alertType, e);
+ }
+ } else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
+ } else {
+ if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
+ try {
+
+ mailAlerter.alertOnSuccess(flow);
+ } catch (final Exception e) {
+ logger.error(e);
+ }
+ }
+ if (options.getFlowParameters().containsKey("alert.type")) {
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = this.alerterHolder.get(alertType);
+ if (alerter != null) {
+ try {
+ alerter.alertOnSuccess(flow);
+ } catch (final Exception e) {
+ logger.error("Failed to alert by " + alertType, e);
+ }
+ } else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
+ }
+ }
+
+ }
+
+ private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
+ final List<String> reasons = new LinkedList<>();
+ reasons.add(reason);
+ if (originalError != null) {
+ reasons.add(ExceptionUtils.getStackTrace(originalError));
+ }
+ return reasons.toArray(new String[reasons.size()]);
+ }
+
+ private void failEverything(final ExecutableFlow exFlow) {
+ final long time = System.currentTimeMillis();
+ for (final ExecutableNode node : exFlow.getExecutableNodes()) {
+ switch (node.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ case SKIPPED:
+ case DISABLED:
+ continue;
+ // case UNKNOWN:
+ case READY:
+ node.setStatus(Status.KILLING);
+ break;
+ default:
+ node.setStatus(Status.FAILED);
+ break;
+ }
+
+ if (node.getStartTime() == -1) {
+ node.setStartTime(time);
+ }
+ if (node.getEndTime() == -1) {
+ node.setEndTime(time);
+ }
+ }
+
+ if (exFlow.getEndTime() == -1) {
+ exFlow.setEndTime(time);
+ }
+
+ exFlow.setStatus(Status.FAILED);
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3641e5e..c6fe2a2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull;
import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
-import azkaban.alert.Alerter;
import azkaban.event.EventHandler;
import azkaban.executor.selector.ExecutorComparator;
import azkaban.executor.selector.ExecutorFilter;
@@ -32,10 +31,8 @@ import azkaban.project.ProjectWhitelist;
import azkaban.utils.AuthenticationUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
@@ -50,15 +47,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -69,7 +63,6 @@ import java.util.regex.Pattern;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -96,42 +89,46 @@ public class ExecutorManager extends EventHandler implements
* 24 * 60 * 60 * 1000L;
private static final Duration RECENTLY_FINISHED_LIFETIME = Duration.ofMinutes(10);
private static final Logger logger = Logger.getLogger(ExecutorManager.class);
- private final AlerterHolder alerterHolder;
+ private final RunningExecutions runningExecutions;
private final Props azkProps;
private final CommonMetrics commonMetrics;
private final ExecutorLoader executorLoader;
private final CleanerThread cleanerThread;
- private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows =
- new ConcurrentHashMap<>();
- private final ExecutingManagerUpdaterThread executingManager;
+ private final RunningExecutionsUpdaterThread updaterThread;
private final ExecutorApiGateway apiGateway;
private final int maxConcurrentRunsOneFlow;
+ private final ExecutorManagerUpdaterStage updaterStage;
+ private final ExecutionFinalizer executionFinalizer;
+ private final ActiveExecutors activeExecutors;
QueuedExecutions queuedFlows;
File cacheDir;
- //make it immutable to ensure threadsafety
- private volatile ImmutableSet<Executor> activeExecutors = null;
private QueueProcessorThread queueProcessor;
private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
- private long lastCleanerThreadCheckTime = -1;
- private long lastThreadCheckTime = -1;
- private String updaterStage = "not started";
private List<String> filterList;
private Map<String, Integer> comparatorWeightsMap;
private long lastSuccessfulExecutorInfoRefresh;
private ExecutorService executorInforRefresherService;
@Inject
- public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
- final AlerterHolder alerterHolder,
+ public ExecutorManager(final Props azkProps, final ExecutorLoader executorLoader,
final CommonMetrics commonMetrics,
- final ExecutorApiGateway apiGateway) throws ExecutorManagerException {
- this.alerterHolder = alerterHolder;
+ final ExecutorApiGateway apiGateway,
+ final RunningExecutions runningExecutions,
+ final ActiveExecutors activeExecutors,
+ final ExecutorManagerUpdaterStage updaterStage,
+ final ExecutionFinalizer executionFinalizer,
+ final RunningExecutionsUpdaterThread updaterThread) throws ExecutorManagerException {
this.azkProps = azkProps;
this.commonMetrics = commonMetrics;
- this.executorLoader = loader;
+ this.executorLoader = executorLoader;
this.apiGateway = apiGateway;
+ this.runningExecutions = runningExecutions;
+ this.activeExecutors = activeExecutors;
+ this.updaterStage = updaterStage;
+ this.executionFinalizer = executionFinalizer;
+ this.updaterThread = updaterThread;
this.setupExecutors();
- this.loadRunningFlows();
+ this.loadRunningExecutions();
this.queuedFlows = new QueuedExecutions(
azkProps.getLong(Constants.ConfigurationKeys.WEBSERVER_QUEUE_SIZE, 100000));
@@ -145,8 +142,6 @@ public class ExecutorManager extends EventHandler implements
this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
- this.executingManager = new ExecutingManagerUpdaterThread();
-
if (isMultiExecutorMode()) {
setupMultiExecutorMode();
}
@@ -158,8 +153,25 @@ public class ExecutorManager extends EventHandler implements
this.cleanerThread = new CleanerThread(executionLogsRetentionMs);
}
+ // TODO move to some common place
+ static boolean isFinished(final ExecutableFlow flow) {
+ switch (flow.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ // TODO switch to always use "multi executor mode" - even for single server
+ public static boolean isMultiExecutorMode(final Props props) {
+ return props.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
+ }
+
public void start() {
- this.executingManager.start();
+ this.updaterThread.start();
this.cleanerThread.start();
if (isMultiExecutorMode()) {
this.queueProcessor.start();
@@ -177,15 +189,14 @@ public class ExecutorManager extends EventHandler implements
}
private void setupMultiExecutorMode() {
- // initliatize hard filters for executor selector from azkaban.properties
+ // initialize hard filters for executor selector from azkaban.properties
final String filters = this.azkProps
.getString(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_FILTERS, "");
if (filters != null) {
this.filterList = Arrays.asList(StringUtils.split(filters, ","));
}
- // initliatize comparator feature weights for executor selector from
- // azkaban.properties
+ // initialize comparator feature weights for executor selector from azkaban.properties
final Map<String, String> compListStrings = this.azkProps
.getMapByPrefix(Constants.ConfigurationKeys.EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
if (compListStrings != null) {
@@ -208,7 +219,7 @@ public class ExecutorManager extends EventHandler implements
Constants.ConfigurationKeys.ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5),
this.azkProps.getInt(
Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED,
- this.activeExecutors.size()));
+ this.activeExecutors.getAll().size()));
}
/**
@@ -218,48 +229,11 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public void setupExecutors() throws ExecutorManagerException {
- final Set<Executor> newExecutors = new HashSet<>();
-
- if (isMultiExecutorMode()) {
- logger.info("Initializing multi executors from database.");
- newExecutors.addAll(this.executorLoader.fetchActiveExecutors());
- } else if (this.azkProps.containsKey(ConfigurationKeys.EXECUTOR_PORT)) {
- // add local executor, if specified as per properties
- final String executorHost = this.azkProps
- .getString(Constants.ConfigurationKeys.EXECUTOR_HOST, "localhost");
- final int executorPort = this.azkProps.getInt(ConfigurationKeys.EXECUTOR_PORT);
- logger.info(String.format("Initializing local executor %s:%d",
- executorHost, executorPort));
- Executor executor =
- this.executorLoader.fetchExecutor(executorHost, executorPort);
- if (executor == null) {
- executor = this.executorLoader.addExecutor(executorHost, executorPort);
- } else if (!executor.isActive()) {
- executor.setActive(true);
- this.executorLoader.updateExecutor(executor);
- }
- newExecutors.add(new Executor(executor.getId(), executorHost,
- executorPort, true));
- } else {
- // throw exception when in single executor mode and no executor port specified in azkaban
- // properties
- //todo chengren311: convert to slf4j and parameterized logging
- final String error = "Missing" + ConfigurationKeys.EXECUTOR_PORT + " in azkaban properties.";
- logger.error(error);
- throw new ExecutorManagerException(error);
- }
-
- if (newExecutors.isEmpty()) {
- final String error = "No active executor found";
- logger.error(error);
- throw new ExecutorManagerException(error);
- } else {
- this.activeExecutors = ImmutableSet.copyOf(newExecutors);
- }
+ this.activeExecutors.setupExecutors();
}
private boolean isMultiExecutorMode() {
- return this.azkProps.getBoolean(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, false);
+ return isMultiExecutorMode(this.azkProps);
}
/**
@@ -269,7 +243,7 @@ public class ExecutorManager extends EventHandler implements
final List<Pair<Executor, Future<ExecutorInfo>>> futures =
new ArrayList<>();
- for (final Executor executor : this.activeExecutors) {
+ for (final Executor executor : this.activeExecutors.getAll()) {
// execute each executorInfo refresh task to fetch
final Future<ExecutorInfo> fetchExecutionInfo =
this.executorInforRefresherService.submit(
@@ -382,26 +356,26 @@ public class ExecutorManager extends EventHandler implements
@Override
public State getExecutorManagerThreadState() {
- return this.executingManager.getState();
+ return this.updaterThread.getState();
}
public String getExecutorThreadStage() {
- return this.updaterStage;
+ return this.updaterStage.get();
}
@Override
public boolean isExecutorManagerThreadActive() {
- return this.executingManager.isAlive();
+ return this.updaterThread.isAlive();
}
@Override
public long getLastExecutorManagerThreadCheckTime() {
- return this.lastThreadCheckTime;
+ return this.updaterThread.getLastThreadCheckTime();
}
@Override
public Collection<Executor> getAllActiveExecutors() {
- return Collections.unmodifiableCollection(this.activeExecutors);
+ return Collections.unmodifiableCollection(this.activeExecutors.getAll());
}
/**
@@ -411,7 +385,7 @@ public class ExecutorManager extends EventHandler implements
*/
@Override
public Executor fetchExecutor(final int executorId) throws ExecutorManagerException {
- for (final Executor executor : this.activeExecutors) {
+ for (final Executor executor : this.activeExecutors.getAll()) {
if (executor.getId() == executorId) {
return executor;
}
@@ -423,7 +397,7 @@ public class ExecutorManager extends EventHandler implements
public Set<String> getPrimaryServerHosts() {
// Only one for now. More probably later.
final HashSet<String> ports = new HashSet<>();
- for (final Executor executor : this.activeExecutors) {
+ for (final Executor executor : this.activeExecutors.getAll()) {
ports.add(executor.getHost() + ":" + executor.getPort());
}
return ports;
@@ -433,11 +407,11 @@ public class ExecutorManager extends EventHandler implements
public Set<String> getAllActiveExecutorServerHosts() {
// Includes non primary server/hosts
final HashSet<String> ports = new HashSet<>();
- for (final Executor executor : this.activeExecutors) {
+ for (final Executor executor : this.activeExecutors.getAll()) {
ports.add(executor.getHost() + ":" + executor.getPort());
}
// include executor which were initially active and still has flows running
- for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows
+ for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningExecutions.get()
.values()) {
final ExecutionReference ref = running.getFirst();
if (ref.getExecutor().isPresent()) {
@@ -448,12 +422,12 @@ public class ExecutorManager extends EventHandler implements
return ports;
}
- private void loadRunningFlows() throws ExecutorManagerException {
+ private void loadRunningExecutions() throws ExecutorManagerException {
logger.info("Loading running flows from database..");
final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = this.executorLoader
.fetchActiveFlows();
logger.info("Loaded " + activeFlows.size() + " running flows");
- this.runningFlows.putAll(activeFlows);
+ this.runningExecutions.get().putAll(activeFlows);
}
/*
@@ -490,7 +464,7 @@ public class ExecutorManager extends EventHandler implements
getRunningFlowsHelper(projectId, flowId, Lists.newArrayList(this.runningCandidate)));
}
executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
- this.runningFlows.values()));
+ this.runningExecutions.get().values()));
Collections.sort(executionIds);
return executionIds;
}
@@ -519,7 +493,7 @@ public class ExecutorManager extends EventHandler implements
final List<Pair<ExecutableFlow, Optional<Executor>>> flows =
new ArrayList<>();
getActiveFlowsWithExecutorHelper(flows, this.queuedFlows.getAllEntries());
- getActiveFlowsWithExecutorHelper(flows, this.runningFlows.values());
+ getActiveFlowsWithExecutorHelper(flows, this.runningExecutions.get().values());
return flows;
}
@@ -546,7 +520,7 @@ public class ExecutorManager extends EventHandler implements
|| isFlowRunningHelper(projectId, flowId, this.queuedFlows.getAllEntries());
isRunning =
isRunning
- || isFlowRunningHelper(projectId, flowId, this.runningFlows.values());
+ || isFlowRunningHelper(projectId, flowId, this.runningExecutions.get().values());
return isRunning;
}
@@ -584,7 +558,7 @@ public class ExecutorManager extends EventHandler implements
public List<ExecutableFlow> getRunningFlows() {
final ArrayList<ExecutableFlow> flows = new ArrayList<>();
getActiveFlowHelper(flows, this.queuedFlows.getAllEntries());
- getActiveFlowHelper(flows, this.runningFlows.values());
+ getActiveFlowHelper(flows, this.runningExecutions.get().values());
return flows;
}
@@ -609,7 +583,7 @@ public class ExecutorManager extends EventHandler implements
public String getRunningFlowIds() {
final List<Integer> allIds = new ArrayList<>();
getRunningFlowsIdsHelper(allIds, this.queuedFlows.getAllEntries());
- getRunningFlowsIdsHelper(allIds, this.runningFlows.values());
+ getRunningFlowsIdsHelper(allIds, this.runningExecutions.get().values());
Collections.sort(allIds);
return allIds.toString();
}
@@ -628,7 +602,6 @@ public class ExecutorManager extends EventHandler implements
return allIds.toString();
}
-
public long getQueuedFlowSize() {
return this.queuedFlows.size();
}
@@ -713,7 +686,7 @@ public class ExecutorManager extends EventHandler implements
public LogData getExecutableFlowLog(final ExecutableFlow exFlow, final int offset,
final int length) throws ExecutorManagerException {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
if (pair != null) {
final Pair<String, String> typeParam = new Pair<>("type", "flow");
final Pair<String, String> offsetParam =
@@ -737,7 +710,7 @@ public class ExecutorManager extends EventHandler implements
public LogData getExecutionJobLog(final ExecutableFlow exFlow, final String jobId,
final int offset, final int length, final int attempt) throws ExecutorManagerException {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
if (pair != null) {
final Pair<String, String> typeParam = new Pair<>("type", "job");
final Pair<String, String> jobIdParam =
@@ -765,7 +738,7 @@ public class ExecutorManager extends EventHandler implements
public List<Object> getExecutionJobStats(final ExecutableFlow exFlow, final String jobId,
final int attempt) throws ExecutorManagerException {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
if (pair == null) {
return this.executorLoader.fetchAttachments(exFlow.getExecutionId(), jobId,
attempt);
@@ -883,7 +856,7 @@ public class ExecutorManager extends EventHandler implements
final String jobId, final int offset, final int length, final int attempt)
throws ExecutorManagerException {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
if (pair != null) {
final Pair<String, String> typeParam = new Pair<>("type", "job");
@@ -916,14 +889,15 @@ public class ExecutorManager extends EventHandler implements
public void cancelFlow(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
synchronized (exFlow) {
- if (this.runningFlows.containsKey(exFlow.getExecutionId())) {
+ if (this.runningExecutions.get().containsKey(exFlow.getExecutionId())) {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
this.apiGateway.callWithReferenceByUser(pair.getFirst(), ConnectorParams.CANCEL_ACTION,
userId);
} else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
this.queuedFlows.dequeue(exFlow.getExecutionId());
- finalizeFlows(exFlow, "Cancelled before dispatching to executor", null);
+ this.executionFinalizer
+ .finalizeFlow(exFlow, "Cancelled before dispatching to executor", null);
} else {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -937,7 +911,7 @@ public class ExecutorManager extends EventHandler implements
throws ExecutorManagerException {
synchronized (exFlow) {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -953,7 +927,7 @@ public class ExecutorManager extends EventHandler implements
throws ExecutorManagerException {
synchronized (exFlow) {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1018,7 +992,7 @@ public class ExecutorManager extends EventHandler implements
throws ExecutorManagerException {
synchronized (exFlow) {
final Pair<ExecutionReference, ExecutableFlow> pair =
- this.runningFlows.get(exFlow.getExecutionId());
+ this.runningExecutions.get().get(exFlow.getExecutionId());
if (pair == null) {
throw new ExecutorManagerException("Execution "
+ exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1142,7 +1116,7 @@ public class ExecutorManager extends EventHandler implements
this.queuedFlows.enqueue(exflow, reference);
} else {
// assign only local executor we have
- final Executor choosenExecutor = this.activeExecutors.iterator().next();
+ final Executor choosenExecutor = this.activeExecutors.getAll().iterator().next();
this.executorLoader.addActiveExecutableReference(reference);
try {
dispatch(reference, exflow, choosenExecutor);
@@ -1153,7 +1127,7 @@ public class ExecutorManager extends EventHandler implements
// this logic is only implemented in multiExecutorMode but
// missed in single executor case.
this.commonMetrics.markDispatchFail();
- finalizeFlows(exflow, "Dispatching failed", e);
+ this.executionFinalizer.finalizeFlow(exflow, "Dispatching failed", e);
throw e;
}
}
@@ -1204,7 +1178,6 @@ public class ExecutorManager extends EventHandler implements
"/stats", paramList);
}
-
@Override
public Map<String, Object> callExecutorJMX(final String hostPort, final String action,
final String mBean) throws IOException {
@@ -1226,259 +1199,7 @@ public class ExecutorManager extends EventHandler implements
if (isMultiExecutorMode()) {
this.queueProcessor.shutdown();
}
- this.executingManager.shutdown();
- }
-
- private void finalizeFlows(final ExecutableFlow flow, final String reason,
- final Throwable originalError) {
-
- final int execId = flow.getExecutionId();
- boolean alertUser = true;
- final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
- this.updaterStage = "finalizing flow " + execId;
- // First we check if the execution in the datastore is complete
- try {
- final ExecutableFlow dsFlow;
- if (isFinished(flow)) {
- dsFlow = flow;
- } else {
- this.updaterStage = "finalizing flow " + execId + " loading from db";
- dsFlow = this.executorLoader.fetchExecutableFlow(execId);
-
- // If it's marked finished, we're good. If not, we fail everything and
- // then mark it finished.
- if (!isFinished(dsFlow)) {
- this.updaterStage = "finalizing flow " + execId + " failing the flow";
- failEverything(dsFlow);
- this.executorLoader.updateExecutableFlow(dsFlow);
- }
- }
-
- this.updaterStage = "finalizing flow " + execId + " deleting active reference";
-
- // Delete the executing reference.
- if (flow.getEndTime() == -1) {
- flow.setEndTime(System.currentTimeMillis());
- this.executorLoader.updateExecutableFlow(dsFlow);
- }
- this.executorLoader.removeActiveExecutableReference(execId);
-
- this.updaterStage = "finalizing flow " + execId + " cleaning from memory";
- this.runningFlows.remove(execId);
- } catch (final ExecutorManagerException e) {
- alertUser = false; // failed due to azkaban internal error, not to alert user
- logger.error(e);
- }
-
- // TODO append to the flow log that we marked this flow as failed + the extraReasons
-
- this.updaterStage = "finalizing flow " + execId + " alerting and emailing";
- if (alertUser) {
- final ExecutionOptions options = flow.getExecutionOptions();
- // But we can definitely email them.
- final Alerter mailAlerter = this.alerterHolder.get("email");
- if (flow.getStatus() != Status.SUCCEEDED) {
- if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
- try {
- mailAlerter.alertOnError(flow, extraReasons);
- } catch (final Exception e) {
- logger.error(e);
- }
- }
- if (options.getFlowParameters().containsKey("alert.type")) {
- final String alertType = options.getFlowParameters().get("alert.type");
- final Alerter alerter = this.alerterHolder.get(alertType);
- if (alerter != null) {
- try {
- alerter.alertOnError(flow, extraReasons);
- } catch (final Exception e) {
- logger.error("Failed to alert by " + alertType, e);
- }
- } else {
- logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
- }
- }
- } else {
- if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
- try {
-
- mailAlerter.alertOnSuccess(flow);
- } catch (final Exception e) {
- logger.error(e);
- }
- }
- if (options.getFlowParameters().containsKey("alert.type")) {
- final String alertType = options.getFlowParameters().get("alert.type");
- final Alerter alerter = this.alerterHolder.get(alertType);
- if (alerter != null) {
- try {
- alerter.alertOnSuccess(flow);
- } catch (final Exception e) {
- logger.error("Failed to alert by " + alertType, e);
- }
- } else {
- logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
- }
- }
- }
- }
-
- }
-
- private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
- final List<String> reasons = new LinkedList<>();
- reasons.add(reason);
- if (originalError != null) {
- reasons.add(ExceptionUtils.getStackTrace(originalError));
- }
- return reasons.toArray(new String[reasons.size()]);
- }
-
- private void failEverything(final ExecutableFlow exFlow) {
- final long time = System.currentTimeMillis();
- for (final ExecutableNode node : exFlow.getExecutableNodes()) {
- switch (node.getStatus()) {
- case SUCCEEDED:
- case FAILED:
- case KILLED:
- case SKIPPED:
- case DISABLED:
- continue;
- // case UNKNOWN:
- case READY:
- node.setStatus(Status.KILLING);
- break;
- default:
- node.setStatus(Status.FAILED);
- break;
- }
-
- if (node.getStartTime() == -1) {
- node.setStartTime(time);
- }
- if (node.getEndTime() == -1) {
- node.setEndTime(time);
- }
- }
-
- if (exFlow.getEndTime() == -1) {
- exFlow.setEndTime(time);
- }
-
- exFlow.setStatus(Status.FAILED);
- }
-
- private ExecutableFlow updateExecution(final Map<String, Object> updateData)
- throws ExecutorManagerException {
-
- final Integer execId =
- (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
- if (execId == null) {
- throw new ExecutorManagerException(
- "Response is malformed. Need exec id to update.");
- }
-
- final Pair<ExecutionReference, ExecutableFlow> refPair =
- this.runningFlows.get(execId);
- if (refPair == null) {
- throw new ExecutorManagerException(
- "No running flow found with the execution id. Removing " + execId);
- }
-
- final ExecutionReference ref = refPair.getFirst();
- final ExecutableFlow flow = refPair.getSecond();
- if (updateData.containsKey("error")) {
- // The flow should be finished here.
- throw new ExecutorManagerException((String) updateData.get("error"), flow);
- }
-
- // Reset errors.
- ref.setNextCheckTime(0);
- ref.setNumErrors(0);
- final Status oldStatus = flow.getStatus();
- flow.applyUpdateObject(updateData);
- final Status newStatus = flow.getStatus();
-
- if (oldStatus != newStatus && newStatus == Status.FAILED) {
- this.commonMetrics.markFlowFail();
- }
-
- final ExecutionOptions options = flow.getExecutionOptions();
- if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
- // We want to see if we should give an email status on first failure.
- if (options.getNotifyOnFirstFailure()) {
- final Alerter mailAlerter = this.alerterHolder.get("email");
- try {
- mailAlerter.alertOnFirstError(flow);
- } catch (final Exception e) {
- logger.error("Failed to send first error email." + e.getMessage(), e);
- }
- }
- if (options.getFlowParameters().containsKey("alert.type")) {
- final String alertType = options.getFlowParameters().get("alert.type");
- final Alerter alerter = this.alerterHolder.get(alertType);
- if (alerter != null) {
- try {
- alerter.alertOnFirstError(flow);
- } catch (final Exception e) {
- logger.error("Failed to alert by " + alertType, e);
- }
- } else {
- logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
- }
- }
- }
-
- return flow;
- }
-
- public boolean isFinished(final ExecutableFlow flow) {
- switch (flow.getStatus()) {
- case SUCCEEDED:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
- }
- }
-
- private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
- final List<Integer> executionIds, final List<Long> updateTimes) {
- for (final ExecutableFlow flow : flows) {
- executionIds.add(flow.getExecutionId());
- updateTimes.add(flow.getUpdateTime());
- }
- }
-
- /* Group Executable flow by Executors to reduce number of REST calls */
- private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
- final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
- new HashMap<>();
-
- for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningFlows
- .values()) {
- final ExecutionReference ref = runningFlow.getFirst();
- final ExecutableFlow flow = runningFlow.getSecond();
- final Optional<Executor> executor = ref.getExecutor();
-
- // We can set the next check time to prevent the checking of certain
- // flows.
- if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
- continue;
- }
-
- List<ExecutableFlow> flows = exFlowMap.get(executor);
- if (flows == null) {
- flows = new ArrayList<>();
- exFlowMap.put(executor, flows);
- }
-
- flows.add(flow);
- }
-
- return exFlowMap;
+ this.updaterThread.shutdown();
}
@Override
@@ -1520,13 +1241,16 @@ public class ExecutorManager extends EventHandler implements
reference.setExecutor(choosenExecutor);
// move from flow to running flows
- this.runningFlows.put(exflow.getExecutionId(),
- new Pair<>(reference, exflow));
- synchronized (this) {
- // Wake up ExecutingManagerUpdaterThread from wait() so that it will immediately check status
+ this.runningExecutions.get().put(exflow.getExecutionId(), new Pair<>(reference, exflow));
+ synchronized (this.runningExecutions.get()) {
+ // Wake up RunningExecutionsUpdaterThread from wait() so that it will immediately check status
// from executor(s). Normally flows will run at least some time and can't be cleaned up
// immediately, so there will be another wait round (or many, actually), but for unit tests
// this is significant to let them run quickly.
+ this.runningExecutions.get().notifyAll();
+ }
+ synchronized (this) {
+ // wake up all internal waiting threads, too
this.notifyAll();
}
@@ -1535,160 +1259,6 @@ public class ExecutorManager extends EventHandler implements
exflow.getExecutionId(), reference.getNumErrors()));
}
- private class ExecutingManagerUpdaterThread extends Thread {
-
- private final int waitTimeIdleMs = 2000;
- private final int waitTimeMs = 500;
- // When we have an http error, for that flow, we'll check every 10 secs, 360
- // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
- private final int numErrorsBetweenUnresponsiveEmail = 360;
- // First email is sent after 1 minute of unresponsiveness
- private final int numErrorsBeforeUnresponsiveEmail = 6;
- private final long errorThreshold = 10000;
- private boolean shutdown = false;
-
- public ExecutingManagerUpdaterThread() {
- this.setName("ExecutorManagerUpdaterThread");
- }
-
- private void shutdown() {
- this.shutdown = true;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void run() {
- while (!this.shutdown) {
- try {
- ExecutorManager.this.lastThreadCheckTime = System.currentTimeMillis();
- ExecutorManager.this.updaterStage = "Starting update all flows.";
-
- final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
- getFlowToExecutorMap();
- final ArrayList<ExecutableFlow> finalizeFlows =
- new ArrayList<>();
-
- if (exFlowMap.size() > 0) {
- for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
- .entrySet()) {
- final List<Long> updateTimesList = new ArrayList<>();
- final List<Integer> executionIdsList = new ArrayList<>();
-
- final Optional<Executor> executorOption = entry.getKey();
- if (!executorOption.isPresent()) {
- for (final ExecutableFlow flow : entry.getValue()) {
- logger.warn("Finalizing execution " + flow.getExecutionId()
- + ". Executor id of this execution doesn't exist");
- finalizeFlows.add(flow);
- }
- continue;
- }
- final Executor executor = executorOption.get();
-
- ExecutorManager.this.updaterStage =
- "Starting update flows on " + executor.getHost() + ":"
- + executor.getPort();
-
- // We pack the parameters of the same host together before we
- // query.
- fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
- updateTimesList);
-
- final Pair<String, String> updateTimes =
- new Pair<>(
- ConnectorParams.UPDATE_TIME_LIST_PARAM,
- JSONUtils.toJSON(updateTimesList));
- final Pair<String, String> executionIds =
- new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
- JSONUtils.toJSON(executionIdsList));
-
- Map<String, Object> results = null;
- try {
- results =
- ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(),
- executor.getPort(), ConnectorParams.UPDATE_ACTION,
- null, null, executionIds, updateTimes);
- } catch (final ExecutorManagerException e) {
- logger.error("Failed to get update from executor " + executor.getHost(), e);
- boolean sendUnresponsiveEmail = false;
- for (final ExecutableFlow flow : entry.getValue()) {
- final Pair<ExecutionReference, ExecutableFlow> pair =
- ExecutorManager.this.runningFlows.get(flow.getExecutionId());
- // TODO can runningFlows.get ever return null, causing NPE below?
-
- ExecutorManager.this.updaterStage =
- "Failed to get update for flow " + pair.getSecond().getExecutionId();
-
- final ExecutionReference ref = pair.getFirst();
- ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
- ref.setNumErrors(ref.getNumErrors() + 1);
- if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
- || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
- // if any of the executions has failed many enough updates, alert
- sendUnresponsiveEmail = true;
- }
- }
- if (sendUnresponsiveEmail) {
- final Alerter mailAlerter = ExecutorManager.this.alerterHolder.get("email");
- mailAlerter.alertOnFailedUpdate(executor, entry.getValue(), e);
- }
- }
-
- // We gets results
- if (results != null) {
- final List<Map<String, Object>> executionUpdates =
- (List<Map<String, Object>>) results
- .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
- for (final Map<String, Object> updateMap : executionUpdates) {
- try {
- final ExecutableFlow flow = updateExecution(updateMap);
-
- ExecutorManager.this.updaterStage = "Updated flow " + flow.getExecutionId();
-
- if (isFinished(flow)) {
- finalizeFlows.add(flow);
- }
- } catch (final ExecutorManagerException e) {
- final ExecutableFlow flow = e.getExecutableFlow();
- logger.error(e);
-
- if (flow != null) {
- logger.warn("Finalizing execution " + flow.getExecutionId());
- finalizeFlows.add(flow);
- }
- }
- }
- }
- }
-
- ExecutorManager.this.updaterStage =
- "Finalizing " + finalizeFlows.size() + " error flows.";
-
- // Kill error flows
- for (final ExecutableFlow flow : finalizeFlows) {
- finalizeFlows(flow, "Not running on the assigned executor (any more)", null);
- }
- }
-
- ExecutorManager.this.updaterStage = "Updated all active flows. Waiting for next round.";
-
- synchronized (ExecutorManager.this) {
- try {
- if (ExecutorManager.this.runningFlows.size() > 0) {
- ExecutorManager.this.wait(this.waitTimeMs);
- } else {
- ExecutorManager.this.wait(this.waitTimeIdleMs);
- }
- } catch (final InterruptedException e) {
- }
- }
- } catch (final Exception e) {
- logger.error("Unexpected exception in updating executions", e);
- }
- }
- }
- }
-
/*
* cleaner thread to clean up execution_logs, etc in DB. Runs every hour.
*/
@@ -1719,8 +1289,6 @@ public class ExecutorManager extends EventHandler implements
while (!this.shutdown) {
synchronized (this) {
try {
- ExecutorManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
-
// Cleanup old stuff.
final long currentTime = System.currentTimeMillis();
if (currentTime - CLEANER_THREAD_WAIT_INTERVAL_MS > this.lastLogCleanTime) {
@@ -1872,14 +1440,16 @@ public class ExecutorManager extends EventHandler implements
private void selectExecutorAndDispatchFlow(final ExecutionReference reference,
final ExecutableFlow exflow)
throws ExecutorManagerException {
- final Set<Executor> remainingExecutors = new HashSet<>(ExecutorManager.this.activeExecutors);
+ final Set<Executor> remainingExecutors = new HashSet<>(
+ ExecutorManager.this.activeExecutors.getAll());
synchronized (exflow) {
for (int i = 0; i <= this.maxDispatchingErrors; i++) {
final String giveUpReason = checkGiveUpDispatching(reference, remainingExecutors);
if (giveUpReason != null) {
logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
+ giveUpReason);
- finalizeFlows(exflow, "Failed to dispatch because " + giveUpReason, null);
+ ExecutorManager.this.executionFinalizer
+ .finalizeFlow(exflow, "Failed to dispatch because " + giveUpReason, null);
// GIVE UP DISPATCHING - exit
return;
} else {
@@ -1917,13 +1487,16 @@ public class ExecutorManager extends EventHandler implements
+ " (tried " + reference.getNumErrors() + " executors)";
} else if (remainingExecutors.isEmpty()) {
return "tried calling all executors (total: "
- + ExecutorManager.this.activeExecutors.size() + ") but all failed";
+ // TODO rather use the original size (activeExecutors may have been reloaded in the
+ // meanwhile)
+ + ExecutorManager.this.activeExecutors.getAll().size() + ") but all failed";
} else {
return null;
}
}
- private void logFailedDispatchAttempt(final ExecutionReference reference, final ExecutableFlow exflow,
+ private void logFailedDispatchAttempt(final ExecutionReference reference,
+ final ExecutableFlow exflow,
final Executor selectedExecutor, final ExecutorManagerException e) {
logger.warn(String.format(
"Executor %s responded with exception for exec: %d",
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerUpdaterStage.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerUpdaterStage.java
new file mode 100644
index 0000000..5f0cbf5
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerUpdaterStage.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+/**
+ * Holds value of execution update state (for monitoring).
+ */
+public class ExecutorManagerUpdaterStage {
+
+ private volatile String value = "not started";
+
+ /**
+ * Get the current value.
+ *
+ * @return the current value.
+ */
+ public String get() {
+ return value;
+ }
+
+ /**
+ * Set the value.
+ *
+ * @param value the new value to set.
+ */
+ public void set(String value) {
+ this.value = value;
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutions.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutions.java
new file mode 100644
index 0000000..74dacb1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutions.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.utils.Pair;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+
+/**
+ * Provides access to running executions.
+ */
+@Singleton
+public class RunningExecutions {
+
+ private final ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningExecutions =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Get running executions.
+ *
+ * @return executions.
+ */
+ public ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> get() {
+ return runningExecutions;
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
new file mode 100644
index 0000000..e15e0c8
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.alert.Alerter;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+/**
+ * Updates running executions.
+ */
+public class RunningExecutionsUpdater {
+
+ private static final Logger logger = Logger.getLogger(RunningExecutionsUpdater.class);
+
+ // When we have an http error, for that flow, we'll check every 10 secs, 360
+ // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
+ private final int numErrorsBetweenUnresponsiveEmail = 360;
+ // First email is sent after 1 minute of unresponsiveness
+ private final int numErrorsBeforeUnresponsiveEmail = 6;
+ private final long errorThreshold = 10000;
+ private final ExecutorManagerUpdaterStage updaterStage;
+ private final AlerterHolder alerterHolder;
+ private final CommonMetrics commonMetrics;
+ private final ExecutorApiGateway apiGateway;
+ private final RunningExecutions runningExecutions;
+ private final ExecutionFinalizer executionFinalizer;
+
+ @Inject
+ public RunningExecutionsUpdater(final ExecutorManagerUpdaterStage updaterStage,
+ final AlerterHolder alerterHolder, final CommonMetrics commonMetrics,
+ final ExecutorApiGateway apiGateway, final RunningExecutions runningExecutions,
+ final ExecutionFinalizer executionFinalizer) {
+ this.updaterStage = updaterStage;
+ this.alerterHolder = alerterHolder;
+ this.commonMetrics = commonMetrics;
+ this.apiGateway = apiGateway;
+ this.runningExecutions = runningExecutions;
+ this.executionFinalizer = executionFinalizer;
+ }
+
+ /**
+ * Updates running executions.
+ */
+ @SuppressWarnings("unchecked")
+ public void updateExecutions() {
+ this.updaterStage.set("Starting update all flows.");
+ final Map<Optional<Executor>, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
+ final ArrayList<ExecutableFlow> finalizeFlows =
+ new ArrayList<>();
+
+ for (final Map.Entry<Optional<Executor>, List<ExecutableFlow>> entry : exFlowMap
+ .entrySet()) {
+ final List<Long> updateTimesList = new ArrayList<>();
+ final List<Integer> executionIdsList = new ArrayList<>();
+
+ final Optional<Executor> executorOption = entry.getKey();
+ if (!executorOption.isPresent()) {
+ for (final ExecutableFlow flow : entry.getValue()) {
+ logger.warn("Finalizing execution " + flow.getExecutionId()
+ + ". Executor id of this execution doesn't exist");
+ finalizeFlows.add(flow);
+ }
+ continue;
+ }
+ final Executor executor = executorOption.get();
+
+ this.updaterStage.set("Starting update flows on " + executor.getHost() + ":"
+ + executor.getPort());
+
+ // We pack the parameters of the same host together before we
+ // query.
+ fillUpdateTimeAndExecId(entry.getValue(), executionIdsList,
+ updateTimesList);
+
+ final Pair<String, String> updateTimes =
+ new Pair<>(
+ ConnectorParams.UPDATE_TIME_LIST_PARAM,
+ JSONUtils.toJSON(updateTimesList));
+ final Pair<String, String> executionIds =
+ new Pair<>(ConnectorParams.EXEC_ID_LIST_PARAM,
+ JSONUtils.toJSON(executionIdsList));
+
+ Map<String, Object> results = null;
+ try {
+ results = this.apiGateway.callWithExecutionId(executor.getHost(),
+ executor.getPort(), ConnectorParams.UPDATE_ACTION,
+ null, null, executionIds, updateTimes);
+ } catch (final ExecutorManagerException e) {
+ handleException(entry, executor, e);
+ }
+
+ if (results != null) {
+ final List<Map<String, Object>> executionUpdates =
+ (List<Map<String, Object>>) results
+ .get(ConnectorParams.RESPONSE_UPDATED_FLOWS);
+ for (final Map<String, Object> updateMap : executionUpdates) {
+ try {
+ final ExecutableFlow flow = updateExecution(updateMap);
+
+ this.updaterStage.set("Updated flow " + flow.getExecutionId());
+
+ if (ExecutorManager.isFinished(flow)) {
+ finalizeFlows.add(flow);
+ }
+ } catch (final ExecutorManagerException e) {
+ final ExecutableFlow flow = e.getExecutableFlow();
+ logger.error(e);
+
+ if (flow != null) {
+ logger.warn("Finalizing execution " + flow.getExecutionId());
+ finalizeFlows.add(flow);
+ }
+ }
+ }
+ }
+ }
+
+ this.updaterStage.set("Finalizing " + finalizeFlows.size() + " error flows.");
+
+ for (final ExecutableFlow flow : finalizeFlows) {
+ this.executionFinalizer
+ .finalizeFlow(flow, "Not running on the assigned executor (any more)", null);
+ }
+
+ this.updaterStage.set("Updated all active flows. Waiting for next round.");
+ }
+
+ private void handleException(Entry<Optional<Executor>, List<ExecutableFlow>> entry,
+ Executor executor, ExecutorManagerException e) {
+ logger.error("Failed to get update from executor " + executor.getHost(), e);
+ boolean sendUnresponsiveEmail = false;
+ for (final ExecutableFlow flow : entry.getValue()) {
+ final Pair<ExecutionReference, ExecutableFlow> pair =
+ this.runningExecutions.get().get(flow.getExecutionId());
+ // TODO can runningFlows.get ever return null, causing NPE below?
+
+ this.updaterStage
+ .set("Failed to get update for flow " + pair.getSecond().getExecutionId());
+
+ final ExecutionReference ref = pair.getFirst();
+ ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
+ ref.setNumErrors(ref.getNumErrors() + 1);
+ if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
+ || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
+ // if any of the executions has failed many enough updates, alert
+ sendUnresponsiveEmail = true;
+ }
+ }
+ if (sendUnresponsiveEmail) {
+ final Alerter mailAlerter = this.alerterHolder.get("email");
+ mailAlerter.alertOnFailedUpdate(executor, entry.getValue(), e);
+ }
+ }
+
+ /* Group Executable flow by Executors to reduce number of REST calls */
+ private Map<Optional<Executor>, List<ExecutableFlow>> getFlowToExecutorMap() {
+ final HashMap<Optional<Executor>, List<ExecutableFlow>> exFlowMap =
+ new HashMap<>();
+
+ for (final Pair<ExecutionReference, ExecutableFlow> runningFlow : this.runningExecutions.get()
+ .values()) {
+ final ExecutionReference ref = runningFlow.getFirst();
+ final ExecutableFlow flow = runningFlow.getSecond();
+ final Optional<Executor> executor = ref.getExecutor();
+
+ // We can set the next check time to prevent the checking of certain
+ // flows.
+ if (ref.getNextCheckTime() >= System.currentTimeMillis()) {
+ continue;
+ }
+
+ List<ExecutableFlow> flows = exFlowMap.get(executor);
+ if (flows == null) {
+ flows = new ArrayList<>();
+ exFlowMap.put(executor, flows);
+ }
+
+ flows.add(flow);
+ }
+
+ return exFlowMap;
+ }
+
+ private void fillUpdateTimeAndExecId(final List<ExecutableFlow> flows,
+ final List<Integer> executionIds, final List<Long> updateTimes) {
+ for (final ExecutableFlow flow : flows) {
+ executionIds.add(flow.getExecutionId());
+ updateTimes.add(flow.getUpdateTime());
+ }
+ }
+
+ private ExecutableFlow updateExecution(final Map<String, Object> updateData)
+ throws ExecutorManagerException {
+
+ final Integer execId =
+ (Integer) updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
+ if (execId == null) {
+ throw new ExecutorManagerException(
+ "Response is malformed. Need exec id to update.");
+ }
+
+ final Pair<ExecutionReference, ExecutableFlow> refPair =
+ this.runningExecutions.get().get(execId);
+ if (refPair == null) {
+ throw new ExecutorManagerException(
+ "No running flow found with the execution id. Removing " + execId);
+ }
+
+ final ExecutionReference ref = refPair.getFirst();
+ final ExecutableFlow flow = refPair.getSecond();
+ if (updateData.containsKey("error")) {
+ // The flow should be finished here.
+ throw new ExecutorManagerException((String) updateData.get("error"), flow);
+ }
+
+ // Reset errors.
+ ref.setNextCheckTime(0);
+ ref.setNumErrors(0);
+ final Status oldStatus = flow.getStatus();
+ flow.applyUpdateObject(updateData);
+ final Status newStatus = flow.getStatus();
+
+ if (oldStatus != newStatus && newStatus == Status.FAILED) {
+ this.commonMetrics.markFlowFail();
+ }
+
+ final ExecutionOptions options = flow.getExecutionOptions();
+ if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
+ // We want to see if we should give an email status on first failure.
+ if (options.getNotifyOnFirstFailure()) {
+ final Alerter mailAlerter = this.alerterHolder.get("email");
+ try {
+ mailAlerter.alertOnFirstError(flow);
+ } catch (final Exception e) {
+ logger.error("Failed to send first error email." + e.getMessage(), e);
+ }
+ }
+ if (options.getFlowParameters().containsKey("alert.type")) {
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = this.alerterHolder.get(alertType);
+ if (alerter != null) {
+ try {
+ alerter.alertOnFirstError(flow);
+ } catch (final Exception e) {
+ logger.error("Failed to alert by " + alertType, e);
+ }
+ } else {
+ logger.error("Alerter type " + alertType
+ + " doesn't exist. Failed to alert.");
+ }
+ }
+ }
+
+ return flow;
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdaterThread.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdaterThread.java
new file mode 100644
index 0000000..a52ab09
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdaterThread.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+/**
+ * Updates running executions periodically.
+ */
+public class RunningExecutionsUpdaterThread extends Thread {
+
+ private static final Logger logger = Logger.getLogger(RunningExecutionsUpdaterThread.class);
+
+ volatile int waitTimeIdleMs = 2000;
+ volatile int waitTimeMs = 500;
+
+ private final RunningExecutionsUpdater updater;
+ private final RunningExecutions runningExecutions;
+ private long lastThreadCheckTime = -1;
+ private boolean shutdown = false;
+
+ @Inject
+ public RunningExecutionsUpdaterThread(final RunningExecutionsUpdater updater,
+ final RunningExecutions runningExecutions) {
+ this.updater = updater;
+ this.runningExecutions = runningExecutions;
+ this.setName("ExecutorManagerUpdaterThread");
+ }
+
+ /**
+ * Start the thread: updates running executions periodically.
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ while (!this.shutdown) {
+ try {
+ this.lastThreadCheckTime = System.currentTimeMillis();
+ this.updater.updateExecutions();
+ // TODO not sure why it would be important to check the status immediately in case of _new_
+ // executions. This can only optimize finalizing executions that finish super-quickly after
+ // being started.
+ waitForNewExecutions();
+ } catch (final Exception e) {
+ logger.error("Unexpected exception in updating executions", e);
+ }
+ }
+ }
+
+ private void waitForNewExecutions() {
+ synchronized (this.runningExecutions) {
+ try {
+ final int waitTimeMillis =
+ this.runningExecutions.get().size() > 0 ? this.waitTimeMs : this.waitTimeIdleMs;
+ if (waitTimeMillis > 0) {
+ this.runningExecutions.wait(waitTimeMillis);
+ }
+ } catch (final InterruptedException e) {
+ }
+ }
+ }
+
+ void shutdown() {
+ this.shutdown = true;
+ }
+
+ public long getLastThreadCheckTime() {
+ return this.lastThreadCheckTime;
+ }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 7da9c95..49f639c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -69,6 +69,8 @@ public class ExecutorManagerTest {
private AlerterHolder alertHolder;
private ExecutorApiGateway apiGateway;
private Alerter mailAlerter;
+ private RunningExecutions runningExecutions;
+ private ExecutorManagerUpdaterStage updaterStage;
@Before
public void setup() {
@@ -77,6 +79,8 @@ public class ExecutorManagerTest {
this.alertHolder = mock(AlerterHolder.class);
when(this.alertHolder.get("email")).thenReturn(this.mailAlerter);
this.loader = new MockExecutorLoader();
+ this.runningExecutions = new RunningExecutions();
+ this.updaterStage = new ExecutorManagerUpdaterStage();
}
@After
@@ -143,8 +147,19 @@ public class ExecutorManagerTest {
private ExecutorManager createExecutorManager()
throws ExecutorManagerException {
- return new ExecutorManager(this.props, this.loader, this.alertHolder, this.commonMetrics,
- this.apiGateway);
+ // TODO rename this test to ExecutorManagerIntegrationTest & create separate unit tests as well?
+ final ActiveExecutors activeExecutors = new ActiveExecutors(this.props, this.loader);
+ final ExecutionFinalizer executionFinalizer = new ExecutionFinalizer(this.loader,
+ this.updaterStage, this.alertHolder, this.runningExecutions);
+ final RunningExecutionsUpdaterThread updaterThread = new RunningExecutionsUpdaterThread(
+ new RunningExecutionsUpdater(
+ this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
+ this.runningExecutions, executionFinalizer), runningExecutions);
+ updaterThread.waitTimeIdleMs = 0;
+ updaterThread.waitTimeMs = 0;
+ return new ExecutorManager(this.props, this.loader, this.commonMetrics, this.apiGateway,
+ this.runningExecutions, activeExecutors, this.updaterStage, executionFinalizer,
+ updaterThread);
}
/*
@@ -288,9 +303,8 @@ public class ExecutorManagerTest {
}
/**
- * 1. Executor 1 throws an exception when trying to dispatch to it
- * 2. ExecutorManager should try next executor
- * 3. Executor 2 accepts the dispatched execution
+ * 1. Executor 1 throws an exception when trying to dispatch to it 2. ExecutorManager should try
+ * next executor 3. Executor 2 accepts the dispatched execution
*/
@Test
public void testDispatchException() throws Exception {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index d6a072e..c8ac487 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -18,12 +18,18 @@ package azkaban.trigger;
import static org.mockito.Mockito.mock;
+import azkaban.executor.ActiveExecutors;
import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutionFinalizer;
import azkaban.executor.ExecutorApiGateway;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutorManagerUpdaterStage;
import azkaban.executor.MockExecutorLoader;
+import azkaban.executor.RunningExecutions;
+import azkaban.executor.RunningExecutionsUpdater;
+import azkaban.executor.RunningExecutionsUpdaterThread;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
import azkaban.trigger.builtin.CreateTriggerAction;
@@ -44,6 +50,11 @@ public class TriggerManagerDeadlockTest {
TriggerManager triggerManager;
ExecutorLoader execLoader;
ExecutorApiGateway apiGateway;
+ RunningExecutions runningExecutions;
+ private ExecutorManagerUpdaterStage updaterStage;
+ private AlerterHolder alertHolder;
+ private ExecutionFinalizer executionFinalizer;
+ private CommonMetrics commonMetrics;
@Before
public void setup() throws ExecutorManagerException, TriggerManagerException {
@@ -53,12 +64,30 @@ public class TriggerManagerDeadlockTest {
props.put("executor.port", 12321);
this.execLoader = new MockExecutorLoader();
this.apiGateway = mock(ExecutorApiGateway.class);
- final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
- final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
- mock(AlerterHolder.class), commonMetrics, this.apiGateway);
+ this.runningExecutions = new RunningExecutions();
+ this.updaterStage = new ExecutorManagerUpdaterStage();
+ this.alertHolder = mock(AlerterHolder.class);
+ this.executionFinalizer = new ExecutionFinalizer(this.execLoader,
+ this.updaterStage, this.alertHolder, this.runningExecutions);
+ this.commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+ final ExecutorManager executorManager = getExecutorManager(props);
this.triggerManager = new TriggerManager(props, this.loader, executorManager);
}
+ private ExecutorManager getExecutorManager(final Props props) throws ExecutorManagerException {
+ final ActiveExecutors activeExecutors = new ActiveExecutors(props, this.execLoader);
+ final RunningExecutionsUpdaterThread updaterThread = getRunningExecutionsUpdaterThread();
+ return new ExecutorManager(props, this.execLoader, this.commonMetrics, this.apiGateway,
+ this.runningExecutions, activeExecutors, this.updaterStage, this.executionFinalizer,
+ updaterThread);
+ }
+
+ private RunningExecutionsUpdaterThread getRunningExecutionsUpdaterThread() {
+ return new RunningExecutionsUpdaterThread(new RunningExecutionsUpdater(
+ this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
+ this.runningExecutions, this.executionFinalizer), runningExecutions);
+ }
+
@After
public void tearDown() {