azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 60c4f16..47c7bde 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -17,6 +17,7 @@ package azkaban.executor;
 
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.event.EventHandler;
+import azkaban.executor.selector.ExecutionControllerUtils;
 import azkaban.flow.FlowUtils;
 import azkaban.project.Project;
 import azkaban.project.ProjectWhitelist;
@@ -54,13 +55,15 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
   private final ExecutorLoader executorLoader;
   private final ExecutorApiGateway apiGateway;
+  private final AlerterHolder alerterHolder;
   private final int maxConcurrentRunsOneFlow;
 
   @Inject
-  ExecutionController(final Props azkProps, final ExecutorLoader executorLoader, final
-  ExecutorApiGateway apiGateway) {
+  ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
+      final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder) {
     this.executorLoader = executorLoader;
     this.apiGateway = apiGateway;
+    this.alerterHolder = alerterHolder;
     this.maxConcurrentRunsOneFlow = getMaxConcurrentRunsOneFlow(azkProps);
   }
 
@@ -407,14 +410,33 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   }
 
   /**
-   * If a flow was dispatched to an executor, cancel by calling Executor. Else if it's still
-   * queued in database, remove it from database queue and finalize. {@inheritDoc}
+   * If a flow is already dispatched to an executor, cancel by calling Executor. Else if it's still
+   * queued in DB, remove it from DB queue and finalize. {@inheritDoc}
    */
   @Override
   public void cancelFlow(final ExecutableFlow exFlow, final String userId)
       throws ExecutorManagerException {
-    // Todo: call executor to cancel the flow if it's running or remove from DB queue if it
-    // hasn't started
+    synchronized (exFlow) {
+      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = this.executorLoader
+          .fetchUnfinishedFlows();
+      if (unfinishedFlows.containsKey(exFlow.getExecutionId())) {
+        final Pair<ExecutionReference, ExecutableFlow> pair = unfinishedFlows
+            .get(exFlow.getExecutionId());
+        if (pair.getFirst().getExecutor().isPresent()) {
+          // Flow is already dispatched to an executor, so call that executor to cancel the flow.
+          this.apiGateway
+              .callWithReferenceByUser(pair.getFirst(), ConnectorParams.CANCEL_ACTION, userId);
+        } else {
+          // Flow is still queued, need to finalize it and update the status in DB.
+          ExecutionControllerUtils.finalizeFlow(this.executorLoader, this.alerterHolder, exFlow,
+              "Cancelled before dispatching to executor", null);
+        }
+      } else {
+        throw new ExecutorManagerException("Execution "
+            + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
+            + " isn't running.");
+      }
+    }
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
index 53a23d3..a587fbe 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
@@ -16,12 +16,9 @@
 
 package azkaban.executor;
 
-import azkaban.alert.Alerter;
-import java.util.LinkedList;
-import java.util.List;
+import azkaban.executor.selector.ExecutionControllerUtils;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -59,12 +56,11 @@ public class ExecutionFinalizer {
 
     final int execId = flow.getExecutionId();
     boolean alertUser = true;
-    final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
     this.updaterStage.set("finalizing flow " + execId);
     // First we check if the execution in the datastore is complete
     try {
       final ExecutableFlow dsFlow;
-      if (ExecutorManager.isFinished(flow)) {
+      if (ExecutionControllerUtils.isFinished(flow)) {
         dsFlow = flow;
       } else {
         this.updaterStage.set("finalizing flow " + execId + " loading from db");
@@ -72,9 +68,9 @@ public class ExecutionFinalizer {
 
         // If it's marked finished, we're good. If not, we fail everything and
         // then mark it finished.
-        if (!ExecutorManager.isFinished(dsFlow)) {
+        if (!ExecutionControllerUtils.isFinished(dsFlow)) {
           this.updaterStage.set("finalizing flow " + execId + " failing the flow");
-          failEverything(dsFlow);
+          ExecutionControllerUtils.failEverything(dsFlow);
           this.executorLoader.updateExecutableFlow(dsFlow);
         }
       }
@@ -99,98 +95,10 @@ public class ExecutionFinalizer {
 
     this.updaterStage.set("finalizing flow " + execId + " alerting and emailing");
     if (alertUser) {
-      final ExecutionOptions options = flow.getExecutionOptions();
-      // But we can definitely email them.
-      final Alerter mailAlerter = this.alerterHolder.get("email");
-      if (flow.getStatus() != Status.SUCCEEDED) {
-        if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
-          try {
-            mailAlerter.alertOnError(flow, extraReasons);
-          } catch (final Exception e) {
-            logger.error(e);
-          }
-        }
-        if (options.getFlowParameters().containsKey("alert.type")) {
-          final String alertType = options.getFlowParameters().get("alert.type");
-          final Alerter alerter = this.alerterHolder.get(alertType);
-          if (alerter != null) {
-            try {
-              alerter.alertOnError(flow, extraReasons);
-            } catch (final Exception e) {
-              logger.error("Failed to alert by " + alertType, e);
-            }
-          } else {
-            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
-          }
-        }
-      } else {
-        if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
-          try {
-
-            mailAlerter.alertOnSuccess(flow);
-          } catch (final Exception e) {
-            logger.error(e);
-          }
-        }
-        if (options.getFlowParameters().containsKey("alert.type")) {
-          final String alertType = options.getFlowParameters().get("alert.type");
-          final Alerter alerter = this.alerterHolder.get(alertType);
-          if (alerter != null) {
-            try {
-              alerter.alertOnSuccess(flow);
-            } catch (final Exception e) {
-              logger.error("Failed to alert by " + alertType, e);
-            }
-          } else {
-            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
-          }
-        }
-      }
-    }
-
-  }
-
-  private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
-    final List<String> reasons = new LinkedList<>();
-    reasons.add(reason);
-    if (originalError != null) {
-      reasons.add(ExceptionUtils.getStackTrace(originalError));
+      ExecutionControllerUtils.alertUser(flow, this.alerterHolder,
+          ExecutionControllerUtils.getFinalizeFlowReasons(reason,
+              originalError));
     }
-    return reasons.toArray(new String[reasons.size()]);
-  }
-
-  private void failEverything(final ExecutableFlow exFlow) {
-    final long time = System.currentTimeMillis();
-    for (final ExecutableNode node : exFlow.getExecutableNodes()) {
-      switch (node.getStatus()) {
-        case SUCCEEDED:
-        case FAILED:
-        case KILLED:
-        case SKIPPED:
-        case DISABLED:
-          continue;
-          // case UNKNOWN:
-        case READY:
-          node.setStatus(Status.KILLING);
-          break;
-        default:
-          node.setStatus(Status.FAILED);
-          break;
-      }
-
-      if (node.getStartTime() == -1) {
-        node.setStartTime(time);
-      }
-      if (node.getEndTime() == -1) {
-        node.setEndTime(time);
-      }
-    }
-
-    if (exFlow.getEndTime() == -1) {
-      exFlow.setEndTime(time);
-    }
-
-    exFlow.setStatus(Status.FAILED);
   }
 
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 2947e7a..41146ad 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -134,18 +134,6 @@ public class ExecutorManager extends EventHandler implements
     this.executorInfoRefresherService = createExecutorInfoRefresherService();
   }
 
-  // TODO move to some common place
-  static boolean isFinished(final ExecutableFlow flow) {
-    switch (flow.getStatus()) {
-      case SUCCEEDED:
-      case FAILED:
-      case KILLED:
-        return true;
-      default:
-        return false;
-    }
-  }
-
   private int getMaxConcurrentRunsOneFlow(final Props azkProps) {
     // The default threshold is set to 30 for now, in case some users are affected. We may
     // decrease this number in future, to better prevent DDos attacks.
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index 1e934f8..78251d3 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -17,6 +17,7 @@
 package azkaban.executor;
 
 import azkaban.alert.Alerter;
+import azkaban.executor.selector.ExecutionControllerUtils;
 import azkaban.metrics.CommonMetrics;
 import azkaban.utils.Pair;
 import java.util.ArrayList;
@@ -35,13 +36,12 @@ import org.joda.time.DateTime;
 public class RunningExecutionsUpdater {
 
   private static final Logger logger = Logger.getLogger(RunningExecutionsUpdater.class);
-
-  // When we have an http error, for that flow, we'll check every 10 secs, 360
-  // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
-  private final int numErrorsBetweenUnresponsiveEmail = 360;
   // First email is sent after 1 minute of unresponsiveness
   final int numErrorsBeforeUnresponsiveEmail = 6;
   final long errorThreshold = 10000;
+  // When we have an http error, for that flow, we'll check every 10 secs, 360
+  // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
+  private final int numErrorsBetweenUnresponsiveEmail = 360;
   private final ExecutorManagerUpdaterStage updaterStage;
   private final AlerterHolder alerterHolder;
   private final CommonMetrics commonMetrics;
@@ -108,7 +108,7 @@ public class RunningExecutionsUpdater {
 
             this.updaterStage.set("Updated flow " + flow.getExecutionId());
 
-            if (ExecutorManager.isFinished(flow)) {
+            if (ExecutionControllerUtils.isFinished(flow)) {
               finalizeFlows.add(flow);
             }
           } catch (final ExecutorManagerException e) {
@@ -169,7 +169,7 @@ public class RunningExecutionsUpdater {
   }
 
   private boolean isExecutorRemoved(final int id) {
-    Executor fetchedExecutor;
+    final Executor fetchedExecutor;
     try {
       fetchedExecutor = this.executorLoader.fetchExecutor(id);
     } catch (final ExecutorManagerException e) {
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutionControllerUtils.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutionControllerUtils.java
new file mode 100644
index 0000000..a22b909
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutionControllerUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import azkaban.alert.Alerter;
+import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import java.util.LinkedList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils for controlling executions.
+ */
+public class ExecutionControllerUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(ExecutionControllerUtils.class);
+
+  /**
+   * If the current status of the execution is not one of the finished statuses, mark the execution
+   * as failed in the DB.
+   *
+   * @param executorLoader the executor loader
+   * @param alerterHolder the alerter holder
+   * @param flow the execution
+   * @param reason reason for finalizing the execution
+   * @param originalError the cause, if execution is being finalized because of an error
+   */
+  public static void finalizeFlow(final ExecutorLoader executorLoader, final AlerterHolder
+      alerterHolder, final ExecutableFlow flow, final String reason,
+      @Nullable final Throwable originalError) {
+    boolean alertUser = true;
+
+    // First check if the execution in the datastore is finished.
+    try {
+      final ExecutableFlow dsFlow;
+      if (isFinished(flow)) {
+        dsFlow = flow;
+      } else {
+        dsFlow = executorLoader.fetchExecutableFlow(flow.getExecutionId());
+
+        // If it's marked finished, we're good. If not, we fail everything and then mark it
+        // finished.
+        if (!isFinished(dsFlow)) {
+          failEverything(dsFlow);
+          executorLoader.updateExecutableFlow(dsFlow);
+        }
+      }
+
+      if (flow.getEndTime() == -1) {
+        flow.setEndTime(System.currentTimeMillis());
+        executorLoader.updateExecutableFlow(dsFlow);
+      }
+    } catch (final ExecutorManagerException e) {
+      // If failed due to azkaban internal error, do not alert user.
+      alertUser = false;
+      logger.error("Failed to finalize flow " + flow.getExecutionId() + ", do not alert user.", e);
+    }
+
+    if (alertUser) {
+      alertUser(flow, alerterHolder, getFinalizeFlowReasons(reason, originalError));
+    }
+  }
+
+  /**
+   * When a flow is finished, alert the user as is configured in the execution options.
+   *
+   * @param flow the execution
+   * @param alerterHolder the alerter holder
+   * @param extraReasons the extra reasons for alerting
+   */
+  public static void alertUser(final ExecutableFlow flow, final AlerterHolder alerterHolder,
+      final String[] extraReasons) {
+    final ExecutionOptions options = flow.getExecutionOptions();
+    final Alerter mailAlerter = alerterHolder.get("email");
+    if (flow.getStatus() != Status.SUCCEEDED) {
+      if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
+        try {
+          mailAlerter.alertOnError(flow, extraReasons);
+        } catch (final Exception e) {
+          logger.error("Failed to alert on error for execution " + flow.getExecutionId(), e);
+        }
+      }
+      if (options.getFlowParameters().containsKey("alert.type")) {
+        final String alertType = options.getFlowParameters().get("alert.type");
+        final Alerter alerter = alerterHolder.get(alertType);
+        if (alerter != null) {
+          try {
+            alerter.alertOnError(flow, extraReasons);
+          } catch (final Exception e) {
+            logger.error("Failed to alert on error by " + alertType + " for execution " + flow
+                .getExecutionId(), e);
+          }
+        } else {
+          logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+        }
+      }
+    } else {
+      if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
+        try {
+          mailAlerter.alertOnSuccess(flow);
+        } catch (final Exception e) {
+          logger.error("Failed to alert on success for execution " + flow.getExecutionId(), e);
+        }
+      }
+      if (options.getFlowParameters().containsKey("alert.type")) {
+        final String alertType = options.getFlowParameters().get("alert.type");
+        final Alerter alerter = alerterHolder.get(alertType);
+        if (alerter != null) {
+          try {
+            alerter.alertOnSuccess(flow);
+          } catch (final Exception e) {
+            logger.error("Failed to alert on success by " + alertType + " for execution " + flow
+                .getExecutionId(), e);
+          }
+        } else {
+          logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the reasons to finalize the flow.
+   *
+   * @param reason the reason
+   * @param originalError the original error
+   * @return the reasons to finalize the flow
+   */
+  public static String[] getFinalizeFlowReasons(final String reason, final Throwable
+      originalError) {
+    final List<String> reasons = new LinkedList<>();
+    reasons.add(reason);
+    if (originalError != null) {
+      reasons.add(ExceptionUtils.getStackTrace(originalError));
+    }
+    return reasons.toArray(new String[reasons.size()]);
+  }
+
+  /**
+   * Set the flow status to failed and fail every node inside the flow.
+   *
+   * @param exFlow the executable flow
+   */
+  public static void failEverything(final ExecutableFlow exFlow) {
+    final long time = System.currentTimeMillis();
+    for (final ExecutableNode node : exFlow.getExecutableNodes()) {
+      switch (node.getStatus()) {
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+        case SKIPPED:
+        case DISABLED:
+          continue;
+          // case UNKNOWN:
+        case READY:
+          node.setStatus(Status.KILLING);
+          break;
+        default:
+          node.setStatus(Status.FAILED);
+          break;
+      }
+
+      if (node.getStartTime() == -1) {
+        node.setStartTime(time);
+      }
+      if (node.getEndTime() == -1) {
+        node.setEndTime(time);
+      }
+    }
+
+    if (exFlow.getEndTime() == -1) {
+      exFlow.setEndTime(time);
+    }
+
+    exFlow.setStatus(Status.FAILED);
+  }
+
+  /**
+   * Check if the flow status is finished.
+   *
+   * @param flow the executable flow
+   * @return the boolean
+   */
+  public static boolean isFinished(final ExecutableFlow flow) {
+    switch (flow.getStatus()) {
+      case SUCCEEDED:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
+    }
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index 3102761..c1c376f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -48,6 +48,7 @@ public class ExecutionControllerTest {
   private ExecutionController controller;
   private ExecutorLoader loader;
   private ExecutorApiGateway apiGateway;
+  private AlerterHolder alertHolder;
   private Props props;
   private User user;
   private ExecutableFlow flow1;
@@ -65,7 +66,9 @@ public class ExecutionControllerTest {
     this.loader = mock(ExecutorLoader.class);
     this.apiGateway = mock(ExecutorApiGateway.class);
     this.props.put(Constants.ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 1);
-    this.controller = new ExecutionController(this.props, this.loader, this.apiGateway);
+    this.alertHolder = mock(AlerterHolder.class);
+    this.controller = new ExecutionController(this.props, this.loader, this.apiGateway,
+        this.alertHolder);
 
     final Executor executor1 = new Executor(1, "localhost", 12345, true);
     final Executor executor2 = new Executor(2, "localhost", 12346, true);
@@ -90,12 +93,6 @@ public class ExecutionControllerTest {
         .of(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
             this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
     when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
-
-    this.unfinishedFlows = ImmutableMap
-        .of(this.flow1.getExecutionId(), new Pair<>(this.ref1, this.flow1),
-            this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
-            this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
-    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
   }
 
   @After
@@ -107,6 +104,7 @@ public class ExecutionControllerTest {
 
   @Test
   public void testFetchAllActiveFlows() throws Exception {
+    initializeUnfinishedFlows();
     final List<ExecutableFlow> flows = this.controller.getRunningFlows();
     this.unfinishedFlows.values()
         .forEach(pair -> assertThat(flows.contains(pair.getSecond())).isTrue());
@@ -114,6 +112,7 @@ public class ExecutionControllerTest {
 
   @Test
   public void testFetchActiveFlowByProject() throws Exception {
+    initializeUnfinishedFlows();
     final List<Integer> executions = this.controller
         .getRunningFlows(this.flow2.getProjectId(), this.flow2.getFlowId());
     assertThat(executions.contains(this.flow2.getExecutionId())).isTrue();
@@ -126,6 +125,7 @@ public class ExecutionControllerTest {
 
   @Test
   public void testFetchActiveFlowWithExecutor() throws Exception {
+    initializeUnfinishedFlows();
     final List<Pair<ExecutableFlow, Optional<Executor>>> activeFlowsWithExecutor =
         this.controller.getActiveFlowsWithExecutor();
     this.unfinishedFlows.values().forEach(pair -> assertThat(activeFlowsWithExecutor
@@ -149,16 +149,8 @@ public class ExecutionControllerTest {
 
   @Test
   public void testSubmitFlowsExceedingMaxConcurrentRuns() throws Exception {
-    this.unfinishedFlows = new HashMap<>();
-    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
-    this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
-    verify(this.loader).uploadExecutableFlow(this.flow2);
-    this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
-
-    this.controller.submitExecutableFlow(this.flow3, this.user.getUserId());
-    verify(this.loader).uploadExecutableFlow(this.flow3);
-    this.unfinishedFlows.put(this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
-
+    submitFlow(this.flow2, this.ref2);
+    submitFlow(this.flow3, this.ref3);
     assertThatThrownBy(() -> this.controller.submitExecutableFlow(this.flow4, this.user.getUserId
         ())).isInstanceOf(ExecutorManagerException.class).hasMessageContaining("Flow " + this
         .flow4.getId() + " has more than 1 concurrent runs. Skipping");
@@ -166,14 +158,51 @@ public class ExecutionControllerTest {
 
   @Test
   public void testSubmitFlowsWithSkipOption() throws Exception {
-    this.unfinishedFlows = new HashMap<>();
-    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
-    this.controller.submitExecutableFlow(this.flow2, this.user.getUserId());
-    this.unfinishedFlows.put(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2));
+    submitFlow(this.flow2, this.ref2);
     this.flow3.getExecutionOptions().setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
     assertThatThrownBy(
         () -> this.controller.submitExecutableFlow(this.flow3, this.user.getUserId()))
         .isInstanceOf(ExecutorManagerException.class).hasMessageContaining(
         "Flow " + this.flow3.getId() + " is already running. Skipping execution.");
   }
+
+  @Test
+  public void testKillQueuedFlow() throws Exception {
+    // Flow1 is not assigned to any executor and is in PREPARING status.
+    submitFlow(this.flow1, this.ref1);
+    this.flow1.setStatus(Status.PREPARING);
+    this.controller.cancelFlow(this.flow1, this.user.getUserId());
+    // Verify that the status of flow1 is finalized.
+    assertThat(this.flow1.getStatus()).isEqualTo(Status.FAILED);
+    this.flow1.getExecutableNodes().forEach(node -> {
+      assertThat(node.getStatus()).isEqualTo(Status.KILLING);
+    });
+  }
+
+  @Test
+  public void testKillRunningFlow() throws Exception {
+    // Flow2 is assigned to executor2 and is in RUNNING status.
+    submitFlow(this.flow2, this.ref2);
+    this.flow2.setStatus(Status.RUNNING);
+    this.controller.cancelFlow(this.flow2, this.user.getUserId());
+    // Verify that executor is called to cancel flow2.
+    verify(this.apiGateway).callWithReferenceByUser(this.ref2, ConnectorParams.CANCEL_ACTION,
+        this.user.getUserId());
+  }
+
+  private void submitFlow(final ExecutableFlow flow, final ExecutionReference ref) throws
+      Exception {
+    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+    when(this.loader.fetchExecutableFlow(flow.getExecutionId())).thenReturn(flow);
+    this.controller.submitExecutableFlow(flow, this.user.getUserId());
+    this.unfinishedFlows.put(flow.getExecutionId(), new Pair<>(ref, flow));
+  }
+
+  private void initializeUnfinishedFlows() throws Exception {
+    this.unfinishedFlows = ImmutableMap
+        .of(this.flow1.getExecutionId(), new Pair<>(this.ref1, this.flow1),
+            this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
+            this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
+    when(this.loader.fetchUnfinishedFlows()).thenReturn(this.unfinishedFlows);
+  }
 }