Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 60c4f16..47c7bde 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -17,6 +17,7 @@ package azkaban.executor;
import azkaban.Constants.ConfigurationKeys;
import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutionControllerUtils;
import azkaban.flow.FlowUtils;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
@@ -54,13 +55,15 @@ public class ExecutionController extends EventHandler implements ExecutorManager
private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
private final ExecutorLoader executorLoader;
private final ExecutorApiGateway apiGateway;
+ private final AlerterHolder alerterHolder;
private final int maxConcurrentRunsOneFlow;
@Inject
- ExecutionController(final Props azkProps, final ExecutorLoader executorLoader, final
- ExecutorApiGateway apiGateway) {
+ ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
+ final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder) {
this.executorLoader = executorLoader;
this.apiGateway = apiGateway;
+ this.alerterHolder = alerterHolder;
this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
}
@@ -407,14 +410,33 @@ public class ExecutionController extends EventHandler implements ExecutorManager
}
/**
- * If a flow was dispatched to an executor, cancel by calling Executor. Else if it's still
- * queued in database, remove it from database queue and finalize. {@inheritDoc}
+ * If a flow is already dispatched to an executor, cancel by calling Executor. Else if it's still
+ * queued in DB, remove it from DB queue and finalize. {@inheritDoc}
*/
@Override
public void cancelFlow(final ExecutableFlow exFlow, final String userId)
throws ExecutorManagerException {
- // Todo: call executor to cancel the flow if it's running or remove from DB queue if it
- // hasn't started
+ synchronized (exFlow) {
+ final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = this.executorLoader
+ .fetchUnfinishedFlows();
+ if (unfinishedFlows.containsKey(exFlow.getExecutionId())) {
+ final Pair<ExecutionReference, ExecutableFlow> pair = unfinishedFlows
+ .get(exFlow.getExecutionId());
+ if (pair.getFirst().getExecutor().isPresent()) {
+ // Flow is already dispatched to an executor, so call that executor to cancel the flow.
+ this.apiGateway
+ .callWithReferenceByUser(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
+ } else {
+ // Flow is still queued, need to finalize it and update the status in DB.
+ ExecutionControllerUtils.finalizeFlow(this.executorLoader, this.alerterHolder, exFlow,
+ "Cancelled before dispatching to executor", null);
+ }
+ } else {
+ throw new ExecutorManagerException("Execution "
+ + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+ + " isn't running.");
+ }
+ }
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
index 53a23d3..a587fbe 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
@@ -16,12 +16,9 @@
package azkaban.executor;
-import azkaban.alert.Alerter;
-import java.util.LinkedList;
-import java.util.List;
+import azkaban.executor.selector.ExecutionControllerUtils;
import javax.annotation.Nullable;
import javax.inject.Inject;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;
/**
@@ -59,12 +56,11 @@ public class ExecutionFinalizer {
final int execId = flow.getExecutionId();
boolean alertUser = true;
- final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
this.updaterStage.set("finalizing flow " + execId);
// First we check if the execution in the datastore is complete
try {
final ExecutableFlow dsFlow;
- if (ExecutorManager.isFinished(flow)) {
+ if (ExecutionControllerUtils.isFinished(flow)) {
dsFlow = flow;
} else {
this.updaterStage.set("finalizing flow " + execId + " loading from db");
@@ -72,9 +68,9 @@ public class ExecutionFinalizer {
// If it's marked finished, we're good. If not, we fail everything and
// then mark it finished.
- if (!ExecutorManager.isFinished(dsFlow)) {
+ if (!ExecutionControllerUtils.isFinished(dsFlow)) {
this.updaterStage.set("finalizing flow " + execId + " failing the flow");
- failEverything(dsFlow);
+ ExecutionControllerUtils.failEverything(dsFlow);
this.executorLoader.updateExecutableFlow(dsFlow);
}
}
@@ -99,98 +95,10 @@ public class ExecutionFinalizer {
this.updaterStage.set("finalizing flow " + execId + " alerting and emailing");
if (alertUser) {
- final ExecutionOptions options = flow.getExecutionOptions();
- // But we can definitely email them.
- final Alerter mailAlerter = this.alerterHolder.get("email");
- if (flow.getStatus() != Status.SUCCEEDED) {
- if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
- try {
- mailAlerter.alertOnError(flow, extraReasons);
- } catch (final Exception e) {
- logger.error(e);
- }
- }
- if (options.getFlowParameters().containsKey("alert.type")) {
- final String alertType = options.getFlowParameters().get("alert.type");
- final Alerter alerter = this.alerterHolder.get(alertType);
- if (alerter != null) {
- try {
- alerter.alertOnError(flow, extraReasons);
- } catch (final Exception e) {
- logger.error("Failed to alert by " + alertType, e);
- }
- } else {
- logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
- }
- }
- } else {
- if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
- try {
-
- mailAlerter.alertOnSuccess(flow);
- } catch (final Exception e) {
- logger.error(e);
- }
- }
- if (options.getFlowParameters().containsKey("alert.type")) {
- final String alertType = options.getFlowParameters().get("alert.type");
- final Alerter alerter = this.alerterHolder.get(alertType);
- if (alerter != null) {
- try {
- alerter.alertOnSuccess(flow);
- } catch (final Exception e) {
- logger.error("Failed to alert by " + alertType, e);
- }
- } else {
- logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
- }
- }
- }
- }
-
- }
-
- private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
- final List<String> reasons = new LinkedList<>();
- reasons.add(reason);
- if (originalError != null) {
- reasons.add(ExceptionUtils.getStackTrace(originalError));
+ ExecutionControllerUtils.alertUser(flow, this.alerterHolder,
+ ExecutionControllerUtils.getFinalizeFlowReasons(reason,
+ originalError));
}
- return reasons.toArray(new String[reasons.size()]);
- }
-
- private void failEverything(final ExecutableFlow exFlow) {
- final long time = System.currentTimeMillis();
- for (final ExecutableNode node : exFlow.getExecutableNodes()) {
- switch (node.getStatus()) {
- case SUCCEEDED:
- case FAILED:
- case KILLED:
- case SKIPPED:
- case DISABLED:
- continue;
- // case UNKNOWN:
- case READY:
- node.setStatus(Status.KILLING);
- break;
- default:
- node.setStatus(Status.FAILED);
- break;
- }
-
- if (node.getStartTime() == -1) {
- node.setStartTime(time);
- }
- if (node.getEndTime() == -1) {
- node.setEndTime(time);
- }
- }
-
- if (exFlow.getEndTime() == -1) {
- exFlow.setEndTime(time);
- }
-
- exFlow.setStatus(Status.FAILED);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 2947e7a..41146ad 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -134,18 +134,6 @@ public class ExecutorManager extends EventHandler implements
this.executorInfoRefresherService = createExecutorInfoRefresherService();
}
- // TODO move to some common place
- static boolean isFinished(final ExecutableFlow flow) {
- switch (flow.getStatus()) {
- case SUCCEEDED:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
- }
- }
-
private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
// The default threshold is set to 30 for now, in case some users are affected. We may
// decrease this number in future, to better prevent DDos attacks.
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index 1e934f8..78251d3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -17,6 +17,7 @@
package azkaban.executor;
import azkaban.alert.Alerter;
+import azkaban.executor.selector.ExecutionControllerUtils;
import azkaban.metrics.CommonMetrics;
import azkaban.utils.Pair;
import java.util.ArrayList;
@@ -35,13 +36,12 @@ import org.joda.time.DateTime;
public class RunningExecutionsUpdater {
private static final Logger logger = Logger.getLogger(RunningExecutionsUpdater.class);
-
- // When we have an http error, for that flow, we'll check every 10 secs, 360
- // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
- private final int numErrorsBetweenUnresponsiveEmail = 360;
// First email is sent after 1 minute of unresponsiveness
final int numErrorsBeforeUnresponsiveEmail = 6;
final long errorThreshold = 10000;
+ // When we have an http error, for that flow, we'll check every 10 secs, 360
+ // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
+ private final int numErrorsBetweenUnresponsiveEmail = 360;
private final ExecutorManagerUpdaterStage updaterStage;
private final AlerterHolder alerterHolder;
private final CommonMetrics commonMetrics;
@@ -108,7 +108,7 @@ public class RunningExecutionsUpdater {
this.updaterStage.set("Updated flow " + flow.getExecutionId());
- if (ExecutorManager.isFinished(flow)) {
+ if (ExecutionControllerUtils.isFinished(flow)) {
finalizeFlows.add(flow);
}
} catch (final ExecutorManagerException e) {
@@ -169,7 +169,7 @@ public class RunningExecutionsUpdater {
}
private boolean isExecutorRemoved(final int id) {
- Executor fetchedExecutor;
+ final Executor fetchedExecutor;
try {
fetchedExecutor = this.executorLoader.fetchExecutor(id);
} catch (final ExecutorManagerException e) {
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutionControllerUtils.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutionControllerUtils.java
new file mode 100644
index 0000000..a22b909
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutionControllerUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import azkaban.alert.Alerter;
+import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import java.util.LinkedList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils for controlling executions.
+ */
+public class ExecutionControllerUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(ExecutionControllerUtils.class);
+
+ /**
+ * If the current status of the execution is not one of the finished statuses, mark the execution
+ * as failed in the DB.
+ *
+ * @param executorLoader the executor loader
+ * @param alerterHolder the alerter holder
+ * @param flow the execution
+ * @param reason reason for finalizing the execution
+ * @param originalError the cause, if execution is being finalized because of an error
+ */
+ public static void finalizeFlow(final ExecutorLoader executorLoader, final AlerterHolder
+ alerterHolder, final ExecutableFlow flow, final String reason,
+ @Nullable final Throwable originalError) {
+ boolean alertUser = true;
+
+ // First check if the execution in the datastore is finished.
+ try {
+ final ExecutableFlow dsFlow;
+ if (isFinished(flow)) {
+ dsFlow = flow;
+ } else {
+ dsFlow = executorLoader.fetchExecutableFlow(flow.getExecutionId());
+
+ // If it's marked finished, we're good. If not, we fail everything and then mark it
+ // finished.
+ if (!isFinished(dsFlow)) {
+ failEverything(dsFlow);
+ executorLoader.updateExecutableFlow(dsFlow);
+ }
+ }
+
+ if (flow.getEndTime() == -1) {
+ flow.setEndTime(System.currentTimeMillis());
+ executorLoader.updateExecutableFlow(dsFlow);
+ }
+ } catch (final ExecutorManagerException e) {
+ // If failed due to azkaban internal error, do not alert user.
+ alertUser = false;
+ logger.error("Failed to finalize flow " + flow.getExecutionId() + ", do not alert user.", e);
+ }
+
+ if (alertUser) {
+ alertUser(flow, alerterHolder, getFinalizeFlowReasons(reason, originalError));
+ }
+ }
+
+ /**
+ * When a flow is finished, alert the user as is configured in the execution options.
+ *
+ * @param flow the execution
+ * @param alerterHolder the alerter holder
+ * @param extraReasons the extra reasons for alerting
+ */
+ public static void alertUser(final ExecutableFlow flow, final AlerterHolder alerterHolder,
+ final String[] extraReasons) {
+ final ExecutionOptions options = flow.getExecutionOptions();
+ final Alerter mailAlerter = alerterHolder.get("email");
+ if (flow.getStatus() != Status.SUCCEEDED) {
+ if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
+ try {
+ mailAlerter.alertOnError(flow, extraReasons);
+ } catch (final Exception e) {
+ logger.error("Failed to alert on error for execution " + flow.getExecutionId(), e);
+ }
+ }
+ if (options.getFlowParameters().containsKey("alert.type")) {
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = alerterHolder.get(alertType);
+ if (alerter != null) {
+ try {
+ alerter.alertOnError(flow, extraReasons);
+ } catch (final Exception e) {
+ logger.error("Failed to alert on error by " + alertType + " for execution " + flow
+ .getExecutionId(), e);
+ }
+ } else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
+ } else {
+ if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
+ try {
+ mailAlerter.alertOnSuccess(flow);
+ } catch (final Exception e) {
+ logger.error("Failed to alert on success for execution " + flow.getExecutionId(), e);
+ }
+ }
+ if (options.getFlowParameters().containsKey("alert.type")) {
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = alerterHolder.get(alertType);
+ if (alerter != null) {
+ try {
+ alerter.alertOnSuccess(flow);
+ } catch (final Exception e) {
+ logger.error("Failed to alert on success by " + alertType + " for execution " + flow
+ .getExecutionId(), e);
+ }
+ } else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the reasons to finalize the flow.
+ *
+ * @param reason the reason
+ * @param originalError the original error
+ * @return the reasons to finalize the flow
+ */
+ public static String[] getFinalizeFlowReasons(final String reason, final Throwable
+ originalError) {
+ final List<String> reasons = new LinkedList<>();
+ reasons.add(reason);
+ if (originalError != null) {
+ reasons.add(ExceptionUtils.getStackTrace(originalError));
+ }
+ return reasons.toArray(new String[reasons.size()]);
+ }
+
+ /**
+ * Set the flow status to failed and fail every node inside the flow.
+ *
+ * @param exFlow the executable flow
+ */
+ public static void failEverything(final ExecutableFlow exFlow) {
+ final long time = System.currentTimeMillis();
+ for (final ExecutableNode node : exFlow.getExecutableNodes()) {
+ switch (node.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ case SKIPPED:
+ case DISABLED:
+ continue;
+ // case UNKNOWN:
+ case READY:
+ node.setStatus(Status.KILLING);
+ break;
+ default:
+ node.setStatus(Status.FAILED);
+ break;
+ }
+
+ if (node.getStartTime() == -1) {
+ node.setStartTime(time);
+ }
+ if (node.getEndTime() == -1) {
+ node.setEndTime(time);
+ }
+ }
+
+ if (exFlow.getEndTime() == -1) {
+ exFlow.setEndTime(time);
+ }
+
+ exFlow.setStatus(Status.FAILED);
+ }
+
+ /**
+ * Check if the flow status is finished.
+ *
+ * @param flow the executable flow
+ * @return the boolean
+ */
+ public static boolean isFinished(final ExecutableFlow flow) {
+ switch (flow.getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index 3102761..c1c376f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -48,6 +48,7 @@ public class ExecutionControllerTest {
private ExecutionController controller;
private ExecutorLoader loader;
private ExecutorApiGateway apiGateway;
+ private AlerterHolder alertHolder;
private Props props;
private User user;
private ExecutableFlow flow1;
@@ -65,7 +66,9 @@ public class ExecutionControllerTest {
this.loader = mock(ExecutorLoader.class);
this.apiGateway = mock(ExecutorApiGateway.class);
this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
- this.controller = new ExecutionController(this.props, this.loader, this.apiGateway);
+ this.alertHolder = mock(AlerterHolder.class);
+ this.controller = new ExecutionController(this.props, this.loader, this.apiGateway,
+ this.alertHolder);
final Executor executor1 = new Executor(1, "localhost", 12345, true);
final Executor executor2 = new Executor(2, "localhost", 12346, true);
@@ -90,12 +93,6 @@ public class ExecutionControllerTest {
.of(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
-
- this.unfinishedFlows = ImmutableMap
- .of(this.flow1.getExecutionId(), new Pair<>(this.ref1, this.flow1),
- this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
- this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
- when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
}
@After
@@ -107,6 +104,7 @@ public class ExecutionControllerTest {
@Test
public void testFetchAllActiveFlows() throws Exception {
+ initializeUnfinishedFlows();
final List<ExecutableFlow> flows = this.controller.getRunningFlows();
this.unfinishedFlows.values()
.forEach(pair -> assertThat(flows.contains(pair.getSecond())).isTrue());
@@ -114,6 +112,7 @@ public class ExecutionControllerTest {
@Test
public void testFetchActiveFlowByProject() throws Exception {
+ initializeUnfinishedFlows();
final List<Integer> executions = this.controller
.getRunningFlows(this.flow2.getProjectId(), this.flow2.getFlowId());
assertThat(executions.contains(this.flow2.getExecutionId())).isTrue();
@@ -126,6 +125,7 @@ public class ExecutionControllerTest {
@Test
public void testFetchActiveFlowWithExecutor() throws Exception {
+ initializeUnfinishedFlows();
final List<Pair<ExecutableFlow, Optional<Executor>>> activeFlowsWithExecutor =
this.controller.getActiveFlowsWithExecutor();
this.unfinishedFlows.values().forEach(pair -> assertThat(activeFlowsWithExecutor
@@ -149,16 +149,8 @@ public class ExecutionControllerTest {
@Test
public void testSubmitFlowsExceedingMaxConcurrentRuns() throws Exception {
- this.unfinishedFlows = new HashMap<>();
- when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
- this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
- verify(this.loader).uploadExecutableFlow(this.flow2);
- this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
-
- this.controller.submitExecutableFlow(this.flow3, this.user.getUserId());
- verify(this.loader).uploadExecutableFlow(this.flow3);
- this.unfinishedFlows.put(this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
-
+ submitFlow(this.flow2, this.ref2);
+ submitFlow(this.flow3, this.ref3);
assertThatThrownBy(() -> this.controller.submitExecutableFlow(this.flow4, this.user.getUserId
())).isInstanceOf(ExecutorManagerException.class).hasMessageContaining("Flow " + this
.flow4.getId() + " has more than 1 concurrent runs. Skipping");
@@ -166,14 +158,51 @@ public class ExecutionControllerTest {
@Test
public void testSubmitFlowsWithSkipOption() throws Exception {
- this.unfinishedFlows = new HashMap<>();
- when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
- this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
- this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
+ submitFlow(this.flow2, this.ref2);
this.flow3.getExecutionOptions().setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
assertThatThrownBy(
() -> this.controller.submitExecutableFlow(this.flow3, this.user.getUserId()))
.isInstanceOf(ExecutorManagerException.class).hasMessageContaining(
"Flow " + this.flow3.getId() + " is already running. Skipping execution.");
}
+
+ @Test
+ public void testKillQueuedFlow() throws Exception {
+ // Flow1 is not assigned to any executor and is in PREPARING status.
+ submitFlow(this.flow1, this.ref1);
+ this.flow1.setStatus(Status.PREPARING);
+ this.controller.cancelFlow(this.flow1, this.user.getUserId());
+ // Verify that the status of flow1 is finalized.
+ assertThat(this.flow1.getStatus()).isEqualTo(Status.FAILED);
+ this.flow1.getExecutableNodes().forEach(node -> {
+ assertThat(node.getStatus()).isEqualTo(Status.KILLING);
+ });
+ }
+
+ @Test
+ public void testKillRunningFlow() throws Exception {
+ // Flow2 is assigned to executor2 and is in RUNNING status.
+ submitFlow(this.flow2, this.ref2);
+ this.flow2.setStatus(Status.RUNNING);
+ this.controller.cancelFlow(this.flow2, this.user.getUserId());
+ // Verify that executor is called to cancel flow2.
+ verify(this.apiGateway).callWithReferenceByUser(this.ref2, ConnectorParams.CANCEL_ACTION,
+ this.user.getUserId());
+ }
+
+ private void submitFlow(final ExecutableFlow flow, final ExecutionReference ref) throws
+ Exception {
+ when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+ when(this.loader.fetchExecutableFlow(flow.getExecutionId())).thenReturn(flow);
+ this.controller.submitExecutableFlow(flow, this.user.getUserId());
+ this.unfinishedFlows.put(flow.getExecutionId(), new Pair<>(ref, flow));
+ }
+
+ private void initializeUnfinishedFlows() throws Exception {
+ this.unfinishedFlows = ImmutableMap
+ .of(this.flow1.getExecutionId(), new Pair<>(this.ref1, this.flow1),
+ this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
+ this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
+ when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+ }
}