azkaban-aplcache

flow trigger service (#1627) This PR added flow trigger service,

2/6/2018 3:06:33 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 039dbf7..0e41840 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -45,11 +45,11 @@ public class FlowTrigger implements Serializable {
    */
   public FlowTrigger(final CronSchedule schedule, final List<FlowTriggerDependency> dependencies,
       final Duration maxWaitDuration) {
+    // will perform some basic validation here, and futher validation will be performed on
+    // parsing time when NodeBeanLoader parses the XML to flow trigger.
     Preconditions.checkNotNull(schedule, "schedule cannot be null");
     Preconditions.checkNotNull(dependencies, "dependency cannot be null");
     Preconditions.checkNotNull(maxWaitDuration, "max wait time cannot be null");
-    Preconditions.checkArgument(maxWaitDuration.toMinutes() >= 1, "max wait time should be "
-        + "longer than 1 min");
     validateDependencies(dependencies);
     this.schedule = schedule;
     final ImmutableMap.Builder builder = new Builder();
@@ -87,9 +87,8 @@ public class FlowTrigger implements Serializable {
       final Map<String, String> props = dep.getProps();
       // set.add() returns false when there exists duplicate
       Preconditions.checkArgument(seen.add(dep.getType() + ":" + props.toString()), String.format
-          ("duplicate "
-              + "dependency"
-              + "config %s found, dependency config should be unique", dep.getName()));
+          ("duplicate dependency config %s found, dependency config should be unique",
+              dep.getName()));
     }
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
index 350c779..9603015 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
@@ -51,13 +51,9 @@ public class FlowTriggerTest {
     final List<FlowTriggerDependency> validDependencyList = new ArrayList<>();
     final List<FlowTriggerDependency> invalidDependencyList = null;
     final Duration validDuration = Duration.ofMinutes(10);
-    final Duration invalidDuration = Duration.ofMinutes(-1);
 
     assertThatThrownBy(() -> new FlowTrigger(validSchedule, invalidDependencyList, validDuration))
         .isInstanceOf(NullPointerException.class);
-
-    assertThatThrownBy(() -> new FlowTrigger(validSchedule, validDependencyList, invalidDuration))
-        .isInstanceOf(IllegalArgumentException.class);
   }
 
   @Test
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java
index 9cc5aae..43cbdf3 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java
@@ -20,6 +20,7 @@ public enum CancellationCause {
   NONE, //no cancellation occurred
   TIMEOUT, // cancellation is issued due to exceeding max wait time
   MANUAL, // cancellation is issued by user
-  FAILURE, // cancellation is issued by dependency instance failure
-  CASCADING // cancelled by cascading failure
+  FAILURE, // cancellation is caused by dependency instance failure(e.x invalid input)
+  CASCADING // cancellation is caused by cascading failure(e.x one dependency instance failure
+  // leads to other dependency instances being cancelled)
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
new file mode 100644
index 0000000..ea0ce1b
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -0,0 +1,520 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.Constants;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import azkaban.project.Project;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * FlowTriggerService is a singleton class in the AZ web server to
+ * process all trigger-related operations. Externally it provides following
+ * operations -
+ * 1. Create a trigger instance based on trigger definition.
+ * 2. Cancel a trigger instance.
+ * 3. Query running and historic trigger instances.
+ * 4. Recover incomplete trigger instances.
+ *
+ * Internally, it
+ * 1. maintains the list of running trigger instance in memory.
+ * 2. updates status, starttime/endtime of trigger instance.
+ * 3. persists trigger instance to DB.
+ *
+ * FlowTriggerService will be leveraged by Quartz scheduler, our new AZ scheduler to schedule
+ * triggers.
+ */
+
+@SuppressWarnings("FutureReturnValueIgnored")
+@Singleton
+public class FlowTriggerService {
+
+  private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
+  //private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(5);
+  private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
+  private static final String START_TIME = "starttime";
+  private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
+  private final ExecutorService executorService;
+  private final List<TriggerInstance> runningTriggers;
+  private final ScheduledExecutorService timeoutService;
+  private final FlowTriggerDependencyPluginManager triggerPluginManager;
+  private final TriggerInstanceProcessor triggerProcessor;
+  private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
+  private final DependencyInstanceProcessor dependencyProcessor;
+
+  @Inject
+  public FlowTriggerService(final FlowTriggerDependencyPluginManager pluginManager,
+      final TriggerInstanceProcessor triggerProcessor, final DependencyInstanceProcessor
+      dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader) {
+    // Give the thread a name to make debugging easier.
+    final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("FlowTrigger-service").build();
+    this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+    this.timeoutService = Executors.newScheduledThreadPool(8);
+    this.runningTriggers = new ArrayList<>();
+    this.triggerPluginManager = pluginManager;
+    this.triggerProcessor = triggerProcessor;
+    this.dependencyProcessor = dependencyProcessor;
+    this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
+  }
+
+  private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
+      starttimeInMills) throws Exception {
+    final DependencyCheck dependencyCheck = this.triggerPluginManager
+        .getDependencyCheck(dep.getType());
+    final DependencyInstanceCallback callback = new DependencyInstanceCallbackImpl(this);
+    final DependencyInstanceConfigImpl config = new DependencyInstanceConfigImpl(dep.getProps());
+    final DependencyInstanceRuntimeProps runtimeProps = new DependencyInstanceRuntimePropsImpl
+        (ImmutableMap.of(START_TIME, String.valueOf(starttimeInMills)));
+    return dependencyCheck.run(config, runtimeProps, callback);
+  }
+
+  private TriggerInstance createTriggerInstance(final FlowTrigger flowTrigger, final String flowId,
+      final int flowVersion, final String submitUser, final Project project) {
+    final String triggerInstId = generateId();
+    logger.info(
+        String.format("Starting the flow trigger %s[execId %s] by %s", flowTrigger, triggerInstId,
+            submitUser));
+    final long startTime = System.currentTimeMillis();
+    // create a list of dependency instances
+    final List<DependencyInstance> depInstList = new ArrayList<>();
+    for (final FlowTriggerDependency dep : flowTrigger.getDependencies()) {
+      final String depName = dep.getName();
+      final Date startDate = new Date(startTime);
+      DependencyInstanceContext context = null;
+      try {
+        context = createDepContext(dep, startTime);
+      } catch (final Exception ex) {
+        logger.error(String.format("unable to create dependency context for trigger instance[id ="
+            + " %s]", triggerInstId), ex);
+      }
+      // if dependency instance context fails to be created, then its status is cancelled and
+      // cause is failure
+      final Status status = context == null ? Status.CANCELLED : Status.RUNNING;
+      final CancellationCause cause =
+          context == null ? CancellationCause.FAILURE : CancellationCause.NONE;
+      final Date endTime = context == null ? new Date() : null;
+      final DependencyInstance depInst = new DependencyInstance(depName, startDate, endTime,
+          context, status, cause);
+      depInstList.add(depInst);
+    }
+
+    final TriggerInstance triggerInstance = new TriggerInstance(triggerInstId, flowTrigger,
+        flowId, flowVersion, submitUser, depInstList, Constants.UNASSIGNED_EXEC_ID, project);
+
+    return triggerInstance;
+  }
+
+  private String generateId() {
+    return UUID.randomUUID().toString();
+  }
+
+  private void scheduleKill(final TriggerInstance triggerInst, final Duration duration, final
+  CancellationCause cause) {
+    logger
+        .info(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
+            .getSeconds
+                ()));
+    this.timeoutService.schedule(() -> {
+      cancel(triggerInst, cause);
+    }, duration.toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * @return the list of running trigger instances
+   */
+  public Collection<TriggerInstance> getRunningTriggers() {
+    final Future future = this.executorService.submit(
+        (Callable) () -> FlowTriggerService.this.runningTriggers);
+
+    List<TriggerInstance> triggerInstanceList = new ArrayList<>();
+    try {
+      triggerInstanceList = (List<TriggerInstance>) future.get();
+    } catch (final Exception ex) {
+      logger.error("error in getting running triggers", ex);
+    }
+    return triggerInstanceList;
+  }
+
+  /**
+   * @return the list of running trigger instances
+   */
+  public Collection<TriggerInstance> getRecentlyFinished() {
+    return this.flowTriggerInstanceLoader.getRecentlyFinished(RECENTLY_FINISHED_TRIGGER_LIMIT);
+  }
+
+  public TriggerInstance findTriggerInstanceById(final String triggerInstanceId) {
+    return this.flowTriggerInstanceLoader.getTriggerInstanceById(triggerInstanceId);
+  }
+
+  private boolean isDoneButFlowNotExecuted(final TriggerInstance triggerInstance) {
+    return triggerInstance.getStatus() == Status.SUCCEEDED && triggerInstance.getFlowExecId() ==
+        Constants.UNASSIGNED_EXEC_ID;
+  }
+
+  private void recoverRunningOrCancelling(final TriggerInstance triggerInstance) {
+    final FlowTrigger flowTrigger = triggerInstance.getFlowTrigger();
+    for (final DependencyInstance depInst : triggerInstance.getDepInstances()) {
+      if (depInst.getStatus() == Status.RUNNING || depInst.getStatus() == Status.CANCELLING) {
+        final FlowTriggerDependency dependency = flowTrigger
+            .getDependencyByName(depInst.getDepName());
+        DependencyInstanceContext context = null;
+        try {
+          //recreate dependency instance context
+          context = createDepContext(dependency, depInst.getStartTime().getTime());
+        } catch (final Exception ex) {
+          logger
+              .error(
+                  String.format("unable to create dependency context for trigger instance[id ="
+                      + " %s]", triggerInstance.getId()), ex);
+        }
+        depInst.setDependencyInstanceContext(context);
+        if (context == null) {
+          depInst.setStatus(Status.CANCELLED);
+          depInst.setCancellationCause(CancellationCause.FAILURE);
+        }
+      }
+    }
+
+    if (triggerInstance.getStatus() == Status.CANCELLING) {
+      addToRunningListAndCancel(triggerInstance);
+    } else if (triggerInstance.getStatus() == Status.RUNNING) {
+      final long remainingTime = remainingTimeBeforeTimeout(triggerInstance);
+      addToRunningListAndScheduleKill(triggerInstance, Duration.ofMillis(remainingTime).plus
+          (CANCELLING_GRACE_PERIOD_AFTER_RESTART), CancellationCause.TIMEOUT);
+    }
+  }
+
+  private void recover(final TriggerInstance triggerInstance) {
+    this.executorService.submit(() -> {
+      logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
+          ()));
+      if (isDoneButFlowNotExecuted(triggerInstance)) {
+        // if trigger instance succeeds but the associated flow hasn't been started, then start
+        // the flow
+        this.triggerProcessor.processSucceed(triggerInstance);
+      } else {
+        recoverRunningOrCancelling(triggerInstance);
+      }
+    });
+  }
+
+  /**
+   * Resume executions of all incomplete trigger instances by recovering the state from db.
+   */
+  public void recoverIncompleteTriggerInstances() {
+    final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
+        .getIncompleteTriggerInstances();
+    //todo chengren311: what if flow trigger is not found?
+    for (final TriggerInstance triggerInstance : unfinishedTriggerInstances) {
+      if (triggerInstance.getFlowTrigger() != null) {
+        recover(triggerInstance);
+      } else {
+        logger.info(String.format("cannot recover the trigger instance %s, flow trigger is null ",
+            triggerInstance.getId()));
+      }
+    }
+  }
+
+  private void addToRunningListAndScheduleKill(final TriggerInstance triggerInst, final
+  Duration durationBeforeKill, final CancellationCause cause) {
+    // if trigger instance is already done
+    if (!Status.isDone(triggerInst.getStatus())) {
+      this.runningTriggers.add(triggerInst);
+      scheduleKill(triggerInst, durationBeforeKill, cause);
+    }
+  }
+
+  private CancellationCause getCancelleationCause(final TriggerInstance triggerInst) {
+    final Set<CancellationCause> causes = triggerInst.getDepInstances().stream()
+        .map(DependencyInstance::getCancellationCause).collect(Collectors.toSet());
+
+    if (causes.contains(CancellationCause.FAILURE) || causes
+        .contains(CancellationCause.CASCADING)) {
+      return CancellationCause.CASCADING;
+    } else if (causes.contains(CancellationCause.TIMEOUT)) {
+      return CancellationCause.TIMEOUT;
+    } else if (causes.contains(CancellationCause.MANUAL)) {
+      return CancellationCause.MANUAL;
+    } else {
+      return CancellationCause.NONE;
+    }
+  }
+
+
+  private void cancelTriggerInstance(final TriggerInstance triggerInst) {
+    logger.debug("cancelling trigger instance of exec id" + triggerInst.getId());
+    final CancellationCause cause = getCancelleationCause(triggerInst);
+    for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+      if (depInst.getStatus() == Status.CANCELLING) {
+        depInst.getContext().cancel();
+      } else if (depInst.getStatus() == Status.RUNNING) {
+        // sometimes dependency instances of trigger instance in cancelling status can be running.
+        // e.x. dep inst1: failure, dep inst2: running -> trigger inst is in killing
+        this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
+        depInst.getContext().cancel();
+      }
+    }
+  }
+
+  private void addToRunningListAndCancel(final TriggerInstance triggerInst) {
+    this.runningTriggers.add(triggerInst);
+    cancelTriggerInstance(triggerInst);
+  }
+
+
+  private void updateDepInstStatus(final DependencyInstance depInst, final Status newStatus) {
+    depInst.setStatus(newStatus);
+    if (Status.isDone(depInst.getStatus())) {
+      depInst.setEndTime(new Date());
+    }
+  }
+
+  private void processStatusUpdate(final DependencyInstance depInst, final Status newStatus) {
+    logger.debug("process status update for " + depInst);
+    updateDepInstStatus(depInst, newStatus);
+    this.dependencyProcessor.processStatusUpdate(depInst);
+  }
+
+  private void processStatusAndCancelCauseUpdate(final DependencyInstance depInst, final Status
+      newStatus, final CancellationCause cause) {
+    depInst.setCancellationCause(cause);
+    updateDepInstStatus(depInst, newStatus);
+    this.dependencyProcessor.processStatusUpdate(depInst);
+  }
+
+
+  private long remainingTimeBeforeTimeout(final TriggerInstance triggerInst) {
+    final long now = System.currentTimeMillis();
+    return Math.max(0, triggerInst.getFlowTrigger().getMaxWaitDuration().toMillis() - (now -
+        triggerInst.getStartTime().getTime()));
+  }
+
+  /**
+   * Start the trigger. The method will be scheduled to invoke by azkaban scheduler.
+   */
+  public void startTrigger(final FlowTrigger flowTrigger, final String flowId,
+      final int flowVersion, final String submitUser, final Project project) {
+    this.executorService.submit(() -> {
+      final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
+          submitUser, project);
+
+      logger.info(String.format("Starting trigger instance[id: %s]", triggerInst.getId()));
+
+      this.triggerProcessor.processNewInstance(triggerInst);
+      if (triggerInst.getStatus() == Status.CANCELLED) {
+        // all dependency instances failed
+        logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
+                + "instances failed to be created",
+            triggerInst.getId()));
+        this.triggerProcessor.processTermination(triggerInst);
+      } else if (triggerInst.getStatus() == Status.CANCELLING) {
+        // some of the dependency instances failed
+        logger.info(
+            String.format("Trigger instance[id: %s] is being cancelled ", triggerInst.getId()));
+        addToRunningListAndCancel(triggerInst);
+      } else {
+        // todo chengren311: it's possible web server restarts before the db update, then
+        // new instance will not be recoverable from db.
+        logger.info(
+            String.format("Trigger instance[id: %s] is successfully created", triggerInst.getId()));
+        addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
+            .getMaxWaitDuration(), CancellationCause.TIMEOUT);
+      }
+    });
+  }
+
+  private FlowTriggerDependency getFlowTriggerDepByName(final FlowTrigger flowTrigger,
+      final String depName) {
+    return flowTrigger.getDependencies().stream().filter(ftd -> ftd.getName().equals(depName))
+        .findFirst().orElse(null);
+  }
+
+  public TriggerInstance findRunningTriggerInstById(final String triggerInstId) {
+    //todo chengren311: make the method single threaded
+    final Future<TriggerInstance> future = this.executorService.submit(
+        () -> this.runningTriggers.stream()
+            .filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
+            .orElse(null)
+    );
+    try {
+      return future.get();
+    } catch (final Exception e) {
+      logger.error("exception when finding trigger instance by id" + triggerInstId, e);
+      return null;
+    }
+  }
+
+  private void removeRunningTriggerInstById(final String triggerInstId) {
+    for (final Iterator<TriggerInstance> it = this.runningTriggers.iterator(); it.hasNext(); ) {
+      if (triggerInstId.equals(it.next().getId())) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Cancel a trigger instance
+   *
+   * @param triggerInst trigger instance to be cancelled
+   * @param cause cause of cancelling
+   */
+  public void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
+    this.executorService.submit(
+        () -> {
+          logger.info(
+              String.format("cancelling trigger instance with id %s", triggerInst.getId()));
+          if (triggerInst != null) {
+            for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+              // cancel only running dependencies, no need to cancel a killed/successful dependency
+              // instance
+              if (depInst.getStatus() == Status.RUNNING) {
+                this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
+                depInst.getContext().cancel();
+              }
+            }
+          } else {
+            logger.debug(String
+                .format("unable to cancel a trigger instance in non-running state with id %s",
+                    triggerInst.getId()));
+          }
+        }
+    );
+  }
+
+  private DependencyInstance findDependencyInstanceByContext(
+      final DependencyInstanceContext context) {
+    return this.runningTriggers.stream()
+        .flatMap(triggerInst -> triggerInst.getDepInstances().stream()).filter(
+            depInst -> depInst.getContext() != null && depInst.getContext() == context)
+        .findFirst().orElse(null);
+  }
+
+  /**
+   * Mark the dependency instance context as success
+   */
+  public void markDependencySuccess(final DependencyInstanceContext context) {
+    this.executorService.submit(() -> {
+      final DependencyInstance depInst = findDependencyInstanceByContext(context);
+      if (depInst != null) {
+        logger.info(
+            String.format("setting dependency instance[id: %s, name: %s] status to success",
+                depInst.getTriggerInstance().getId(), depInst.getDepName()));
+
+        if (Status.isDone(depInst.getStatus())) {
+          logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
+              depInst.getTriggerInstance().getId(), depInst.getDepName()));
+          return;
+        }
+
+        processStatusUpdate(depInst, Status.SUCCEEDED);
+        // if associated trigger instance becomes success, then remove it from running list
+        if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
+          logger.info(String.format("trigger instance with execId %s succeeded",
+              depInst.getTriggerInstance().getId()));
+          this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
+          this.runningTriggers.remove(depInst.getTriggerInstance());
+        }
+      } else {
+        logger.debug(String.format("unable to find trigger instance with context %s when marking "
+                + "it success",
+            context));
+      }
+    });
+  }
+
+  private boolean cancelledByAzkaban(final DependencyInstance depInst) {
+    return depInst.getStatus() == Status.CANCELLING && (
+        depInst.getCancellationCause() == CancellationCause
+            .MANUAL || depInst.getCancellationCause() == CancellationCause.TIMEOUT || depInst
+            .getCancellationCause() == CancellationCause.CASCADING);
+  }
+
+  private boolean cancelledByDependencyPlugin(final DependencyInstance depInst) {
+    // When onKill is called by the dependency plugin not through flowTriggerService, we treat it
+    // as cancelled by dependency due to failure on dependency side. In this case, cancel cause
+    // remains unset.
+    return depInst.getStatus() == Status.CANCELLED && (depInst.getCancellationCause()
+        == CancellationCause.NONE);
+  }
+
+  public void markDependencyCancelled(final DependencyInstanceContext context) {
+    this.executorService.submit(() -> {
+      final DependencyInstance depInst = findDependencyInstanceByContext(context);
+      if (depInst != null) {
+        logger.info(
+            String.format("setting dependency instance[id: %s, name: %s] status to cancelled",
+                depInst.getTriggerInstance().getId(), depInst.getDepName()));
+        if (cancelledByDependencyPlugin(depInst)) {
+          processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, CancellationCause.FAILURE);
+          cancelTriggerInstance(depInst.getTriggerInstance());
+        } else if (cancelledByAzkaban(depInst)) {
+          processStatusUpdate(depInst, Status.CANCELLED);
+        } else {
+          logger.warn(String.format("OnCancel of dependency instance[id: %s, name: %s] is ignored",
+              depInst.getTriggerInstance().getId(), depInst.getDepName()));
+          return;
+        }
+
+        if (depInst.getTriggerInstance().getStatus() == Status.CANCELLED) {
+          logger.info(
+              String.format("trigger instance with execId %s is cancelled",
+                  depInst.getTriggerInstance().getId()));
+          this.triggerProcessor.processTermination(depInst.getTriggerInstance());
+          removeRunningTriggerInstById(depInst.getTriggerInstance().getId());
+        }
+      } else {
+        logger.warn(String.format("unable to find trigger instance with context %s when marking "
+            + "it cancelled", context));
+      }
+    });
+  }
+
+  /**
+   * Shuts down the service immediately.
+   */
+  public void shutdown() {
+    this.executorService.shutdown(); // Disable new tasks from being submitted
+    this.executorService.shutdownNow(); // Cancel currently executing tasks
+    this.triggerPluginManager.shutdown();
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
new file mode 100644
index 0000000..f0195a7
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.executor.ExecutorManager;
+import azkaban.flow.Flow;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import azkaban.flowtrigger.testplugin.TestDependencyCheck;
+import azkaban.flowtrigger.util.TestUtil;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import azkaban.project.Project;
+import azkaban.utils.Emailer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+
+public class FlowTriggerServiceTest {
+
+  private static final FlowTriggerInstanceLoader flowTriggerInstanceLoader = new
+      MockFlowTriggerInstanceLoader();
+  private static TestDependencyCheck testDepCheck;
+  private static FlowTriggerService flowTriggerService;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    testDepCheck = new TestDependencyCheck();
+    final FlowTriggerDependencyPluginManager pluginManager = mock(FlowTriggerDependencyPluginManager
+        .class);
+    when(pluginManager.getDependencyCheck(ArgumentMatchers.eq("TestDependencyCheck")))
+        .thenReturn(testDepCheck);
+
+    final ExecutorManager executorManager = mock(ExecutorManager.class);
+    when(executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
+
+    final Emailer emailer = mock(Emailer.class);
+    Mockito.doNothing().when(emailer).sendEmail(any(), anyString(), anyString());
+
+    final TriggerInstanceProcessor triggerInstProcessor = new TriggerInstanceProcessor(
+        executorManager,
+        flowTriggerInstanceLoader, emailer);
+    final DependencyInstanceProcessor depInstProcessor = new DependencyInstanceProcessor
+        (flowTriggerInstanceLoader);
+
+    flowTriggerService = new FlowTriggerService(pluginManager,
+        triggerInstProcessor, depInstProcessor, flowTriggerInstanceLoader);
+  }
+
+  @Before
+  public void cleanup() {
+    ((MockFlowTriggerInstanceLoader) flowTriggerInstanceLoader).clear();
+  }
+
+  private Project createProject() {
+    final Project project = new Project(1, "project1");
+    project.setVersion(1);
+    final Flow flow = new Flow("testflow");
+    final Map<String, Flow> flowMap = new HashMap<>();
+    flowMap.put("testflow", flow);
+    project.setFlows(flowMap);
+    return project;
+  }
+
+  @Test
+  public void testStartTriggerCancelledByTimeout() throws InterruptedException {
+
+    final List<FlowTriggerDependency> deps = new ArrayList<>();
+    deps.add(TestUtil.createTestDependency("2secs", 2, false));
+    deps.add(TestUtil.createTestDependency("8secs", 8, false));
+    deps.add(TestUtil.createTestDependency("9secs", 9, false));
+    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(5));
+    for (int i = 0; i < 30; i++) {
+      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+    }
+    Thread.sleep(Duration.ofSeconds(6).toMillis());
+    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+    assertThat(triggerInstances).hasSize(30);
+    for (final TriggerInstance inst : triggerInstances) {
+      assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
+      for (final DependencyInstance depInst : inst.getDepInstances()) {
+        if (depInst.getDepName().equals("2secs")) {
+          assertThat(depInst.getStatus()).isEqualTo(Status.SUCCEEDED);
+        } else if (depInst.getDepName().equals("8secs")) {
+          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.TIMEOUT);
+        } else if (depInst.getDepName().equals("9secs")) {
+          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.TIMEOUT);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testStartTriggerCancelledManually() throws InterruptedException {
+    final List<FlowTriggerDependency> deps = new ArrayList<>();
+    deps.add(TestUtil.createTestDependency("2secs", 2, false));
+    deps.add(TestUtil.createTestDependency("8secs", 8, false));
+    deps.add(TestUtil.createTestDependency("9secs", 9, false));
+    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(5));
+    for (int i = 0; i < 30; i++) {
+      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+    }
+
+    Thread.sleep(Duration.ofMillis(500).toMillis());
+    for (final TriggerInstance runningTrigger : flowTriggerService.getRunningTriggers()) {
+      flowTriggerService.cancel(runningTrigger, CancellationCause.MANUAL);
+    }
+    Thread.sleep(Duration.ofMillis(500).toMillis());
+    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+    assertThat(triggerInstances).hasSize(30);
+    for (final TriggerInstance inst : triggerInstances) {
+      assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
+      for (final DependencyInstance depInst : inst.getDepInstances()) {
+        assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+        assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.MANUAL);
+      }
+    }
+  }
+
+  @Test
+  public void testStartTriggerCancelledByFailure() throws InterruptedException {
+    final List<FlowTriggerDependency> deps = new ArrayList<>();
+    deps.add(TestUtil.createTestDependency("2secs", 2, true));
+    deps.add(TestUtil.createTestDependency("8secs", 8, false));
+    deps.add(TestUtil.createTestDependency("9secs", 9, false));
+    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+    for (int i = 0; i < 30; i++) {
+      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+    }
+    Thread.sleep(Duration.ofSeconds(1).toMillis());
+    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+    assertThat(triggerInstances).hasSize(30);
+    for (final TriggerInstance inst : triggerInstances) {
+      assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
+      for (final DependencyInstance depInst : inst.getDepInstances()) {
+        if (depInst.getDepName().equals("2secs")) {
+          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.FAILURE);
+        } else {
+          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.CASCADING);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testStartTriggerSuccess() throws InterruptedException {
+    final List<FlowTriggerDependency> deps = new ArrayList<>();
+    deps.add(TestUtil.createTestDependency("2secs", 2, false));
+    deps.add(TestUtil.createTestDependency("3secs", 3, false));
+    deps.add(TestUtil.createTestDependency("4secs", 4, false));
+    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+    for (int i = 0; i < 30; i++) {
+      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+    }
+    Thread.sleep(Duration.ofSeconds(5).toMillis());
+    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+    assertThat(triggerInstances).hasSize(30);
+    for (final TriggerInstance inst : triggerInstances) {
+      assertThat(inst.getStatus()).isEqualTo(Status.SUCCEEDED);
+    }
+  }
+
+  @Test
+  public void testRecovery() throws Exception {
+    final List<FlowTriggerDependency> deps = new ArrayList<>();
+    deps.add(TestUtil.createTestDependency("2secs", 2, false));
+    deps.add(TestUtil.createTestDependency("3secs", 3, false));
+    deps.add(TestUtil.createTestDependency("4secs", 4, false));
+    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+    for (int i = 0; i < 30; i++) {
+      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+    }
+    Thread.sleep(Duration.ofSeconds(1).toMillis());
+    flowTriggerService.shutdown();
+    setup();
+    flowTriggerService.recoverIncompleteTriggerInstances();
+    Thread.sleep(Duration.ofSeconds(5).toMillis());
+    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+    assertThat(triggerInstances).hasSize(30);
+    for (final TriggerInstance inst : triggerInstances) {
+      assertThat(inst.getStatus()).isEqualTo(Status.SUCCEEDED);
+    }
+  }
+}
+
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
new file mode 100644
index 0000000..d8c2999
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader {
+
+  private final List<TriggerInstance> triggerInstances = Collections.synchronizedList(new
+      ArrayList<TriggerInstance>());
+
+  public void clear() {
+    this.triggerInstances.clear();
+  }
+
+  @Override
+  public void uploadTriggerInstance(final TriggerInstance triggerInstance) {
+    this.triggerInstances.add(triggerInstance);
+  }
+
+  @Override
+  public void updateDependencyExecutionStatus(final DependencyInstance depInst) {
+    for (final TriggerInstance inst : this.triggerInstances) {
+      if (inst.getId().equals(depInst.getTriggerInstance().getId())) {
+        for (final DependencyInstance dep : inst.getDepInstances()) {
+          if (dep.getDepName().equals(depInst.getDepName())) {
+            dep.setEndTime(depInst.getEndTime());
+            dep.setStatus(depInst.getStatus());
+            dep.setCancellationCause(depInst.getCancellationCause());
+            break;
+          }
+        }
+        break;
+      }
+    }
+  }
+
+  @Override
+  public Collection<TriggerInstance> getIncompleteTriggerInstances() {
+    final List<TriggerInstance> res = new ArrayList<>();
+    for (final TriggerInstance inst : this.triggerInstances) {
+      if (inst.getStatus() == Status.CANCELLING || inst.getStatus() == Status.RUNNING) {
+        res.add(inst);
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public void updateAssociatedFlowExecId(final TriggerInstance triggerInst) {
+    for (final TriggerInstance inst : this.triggerInstances) {
+      if (triggerInst.getId().equals(triggerInst.getId())) {
+        inst.setFlowExecId(triggerInst.getFlowExecId());
+        break;
+      }
+    }
+  }
+
+  @Override
+  public Collection<TriggerInstance> getRecentlyFinished(final int limit) {
+    final List<TriggerInstance> res = new ArrayList<>();
+    for (final TriggerInstance inst : this.triggerInstances) {
+      if (Status.isDone(inst.getStatus())) {
+        res.add(inst);
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
+    for (final TriggerInstance inst : this.triggerInstances) {
+      if (inst.getId().equals(triggerInstanceId)) {
+        return inst;
+      }
+    }
+    return null;
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
index d3f4784..8e43c1e 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
@@ -25,13 +25,19 @@ import azkaban.flowtrigger.DependencyPluginConfig;
 
 public class TestDependencyCheck implements DependencyCheck {
 
+  public static final String FAILURE_FLAG = "failure";
+  public static final String RUN_TIME = "runtime";
   private DependencyPluginConfig config;
 
   @Override
   public DependencyInstanceContext run(final DependencyInstanceConfig config,
       final DependencyInstanceRuntimeProps runtimeProps,
       final DependencyInstanceCallback callback) {
-    return new TestDependencyInstanceContext(config, runtimeProps, callback);
+    if (config.get(FAILURE_FLAG).equals("true")) {
+      throw new RuntimeException("dependency instance creation failure");
+    } else {
+      return new TestDependencyInstanceContext(config, runtimeProps, callback);
+    }
   }
 
   @Override
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
index 638c421..6b267a2 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
@@ -20,15 +20,32 @@ import azkaban.flowtrigger.DependencyInstanceCallback;
 import azkaban.flowtrigger.DependencyInstanceConfig;
 import azkaban.flowtrigger.DependencyInstanceContext;
 import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+@SuppressWarnings("FutureReturnValueIgnored")
 public class TestDependencyInstanceContext implements DependencyInstanceContext {
 
+  private static final ScheduledExecutorService scheduleSerivce = Executors
+      .newScheduledThreadPool(1);
+
+  private final DependencyInstanceCallback callback;
+
   public TestDependencyInstanceContext(final DependencyInstanceConfig config,
       final DependencyInstanceRuntimeProps runtimeProps,
       final DependencyInstanceCallback callback) {
+    final long expectedRunTime = Long.valueOf(config.get("runtime"));
+    this.callback = callback;
+    scheduleSerivce.schedule(this::onSuccess, expectedRunTime, TimeUnit.SECONDS);
+  }
+
+  private void onSuccess() {
+    this.callback.onSuccess(this);
   }
 
   @Override
   public void cancel() {
+    this.callback.onCancel(this);
   }
 }
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/util/TestUtil.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/util/TestUtil.java
new file mode 100644
index 0000000..a34931d
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/util/TestUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.util;
+
+import azkaban.flowtrigger.testplugin.TestDependencyCheck;
+import azkaban.project.CronSchedule;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestUtil {
+
+  public static FlowTriggerDependency createTestDependency(final String name, final long
+      runtimeInSec, final boolean boundToFail) {
+    final Map<String, String> props = new HashMap<>();
+    props.put(TestDependencyCheck.RUN_TIME, String.valueOf(runtimeInSec));
+    props.put(TestDependencyCheck.FAILURE_FLAG, String.valueOf(boundToFail));
+    return new FlowTriggerDependency(name, "TestDependencyCheck", props);
+  }
+
+  public static FlowTrigger createTestFlowTrigger(final List<FlowTriggerDependency> deps,
+      final Duration maxWaitDuration) {
+    return new FlowTrigger(
+        new CronSchedule("* * * * ? *"), deps, maxWaitDuration);
+  }
+}