azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstanceCallbackImpl.java 25(+15 -10)
azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java 17(+17 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 039dbf7..0e41840 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -45,11 +45,11 @@ public class FlowTrigger implements Serializable {
*/
public FlowTrigger(final CronSchedule schedule, final List<FlowTriggerDependency> dependencies,
final Duration maxWaitDuration) {
+ // will perform some basic validation here, and further validation will be performed at
+ // parsing time when NodeBeanLoader parses the XML to flow trigger.
Preconditions.checkNotNull(schedule, "schedule cannot be null");
Preconditions.checkNotNull(dependencies, "dependency cannot be null");
Preconditions.checkNotNull(maxWaitDuration, "max wait time cannot be null");
- Preconditions.checkArgument(maxWaitDuration.toMinutes() >= 1, "max wait time should be "
- + "longer than 1 min");
validateDependencies(dependencies);
this.schedule = schedule;
final ImmutableMap.Builder builder = new Builder();
@@ -87,9 +87,8 @@ public class FlowTrigger implements Serializable {
final Map<String, String> props = dep.getProps();
// set.add() returns false when there exists duplicate
Preconditions.checkArgument(seen.add(dep.getType() + ":" + props.toString()), String.format
- ("duplicate "
- + "dependency"
- + "config %s found, dependency config should be unique", dep.getName()));
+ ("duplicate dependency config %s found, dependency config should be unique",
+ dep.getName()));
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
index 350c779..9603015 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowTriggerTest.java
@@ -51,13 +51,9 @@ public class FlowTriggerTest {
final List<FlowTriggerDependency> validDependencyList = new ArrayList<>();
final List<FlowTriggerDependency> invalidDependencyList = null;
final Duration validDuration = Duration.ofMinutes(10);
- final Duration invalidDuration = Duration.ofMinutes(-1);
assertThatThrownBy(() -> new FlowTrigger(validSchedule, invalidDependencyList, validDuration))
.isInstanceOf(NullPointerException.class);
-
- assertThatThrownBy(() -> new FlowTrigger(validSchedule, validDependencyList, invalidDuration))
- .isInstanceOf(IllegalArgumentException.class);
}
@Test
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java
index 9cc5aae..43cbdf3 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/CancellationCause.java
@@ -20,6 +20,7 @@ public enum CancellationCause {
NONE, //no cancellation occurred
TIMEOUT, // cancellation is issued due to exceeding max wait time
MANUAL, // cancellation is issued by user
- FAILURE, // cancellation is issued by dependency instance failure
- CASCADING // cancelled by cascading failure
+ FAILURE, // cancellation is caused by dependency instance failure (e.g. invalid input)
+ CASCADING // cancellation is caused by cascading failure (e.g. one dependency instance failure
+ // leads to other dependency instances being cancelled)
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
new file mode 100644
index 0000000..ea0ce1b
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -0,0 +1,520 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.Constants;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import azkaban.project.Project;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * FlowTriggerService is a singleton class in the AZ web server to
+ * process all trigger-related operations. Externally it provides following
+ * operations -
+ * 1. Create a trigger instance based on trigger definition.
+ * 2. Cancel a trigger instance.
+ * 3. Query running and historic trigger instances.
+ * 4. Recover incomplete trigger instances.
+ *
+ * Internally, it
+ * 1. maintains the list of running trigger instance in memory.
+ * 2. updates status, starttime/endtime of trigger instance.
+ * 3. persists trigger instance to DB.
+ *
+ * FlowTriggerService will be leveraged by Quartz scheduler, our new AZ scheduler to schedule
+ * triggers.
+ */
+
+@SuppressWarnings("FutureReturnValueIgnored")
+@Singleton
+public class FlowTriggerService {
+
+ private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
+ //private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(5);
+ private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
+ private static final String START_TIME = "starttime";
+ private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
+ private final ExecutorService executorService;
+ private final List<TriggerInstance> runningTriggers;
+ private final ScheduledExecutorService timeoutService;
+ private final FlowTriggerDependencyPluginManager triggerPluginManager;
+ private final TriggerInstanceProcessor triggerProcessor;
+ private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
+ private final DependencyInstanceProcessor dependencyProcessor;
+
+ @Inject
+ public FlowTriggerService(final FlowTriggerDependencyPluginManager pluginManager,
+ final TriggerInstanceProcessor triggerProcessor, final DependencyInstanceProcessor
+ dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader) {
+ // Give the thread a name to make debugging easier.
+ final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("FlowTrigger-service").build();
+ this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+ this.timeoutService = Executors.newScheduledThreadPool(8);
+ this.runningTriggers = new ArrayList<>();
+ this.triggerPluginManager = pluginManager;
+ this.triggerProcessor = triggerProcessor;
+ this.dependencyProcessor = dependencyProcessor;
+ this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
+ }
+
+ private DependencyInstanceContext createDepContext(final FlowTriggerDependency dep, final long
+ starttimeInMills) throws Exception {
+ final DependencyCheck dependencyCheck = this.triggerPluginManager
+ .getDependencyCheck(dep.getType());
+ final DependencyInstanceCallback callback = new DependencyInstanceCallbackImpl(this);
+ final DependencyInstanceConfigImpl config = new DependencyInstanceConfigImpl(dep.getProps());
+ final DependencyInstanceRuntimeProps runtimeProps = new DependencyInstanceRuntimePropsImpl
+ (ImmutableMap.of(START_TIME, String.valueOf(starttimeInMills)));
+ return dependencyCheck.run(config, runtimeProps, callback);
+ }
+
+ private TriggerInstance createTriggerInstance(final FlowTrigger flowTrigger, final String flowId,
+ final int flowVersion, final String submitUser, final Project project) {
+ final String triggerInstId = generateId();
+ logger.info(
+ String.format("Starting the flow trigger %s[execId %s] by %s", flowTrigger, triggerInstId,
+ submitUser));
+ final long startTime = System.currentTimeMillis();
+ // create a list of dependency instances
+ final List<DependencyInstance> depInstList = new ArrayList<>();
+ for (final FlowTriggerDependency dep : flowTrigger.getDependencies()) {
+ final String depName = dep.getName();
+ final Date startDate = new Date(startTime);
+ DependencyInstanceContext context = null;
+ try {
+ context = createDepContext(dep, startTime);
+ } catch (final Exception ex) {
+ logger.error(String.format("unable to create dependency context for trigger instance[id ="
+ + " %s]", triggerInstId), ex);
+ }
+ // if dependency instance context fails to be created, then its status is cancelled and
+ // cause is failure
+ final Status status = context == null ? Status.CANCELLED : Status.RUNNING;
+ final CancellationCause cause =
+ context == null ? CancellationCause.FAILURE : CancellationCause.NONE;
+ final Date endTime = context == null ? new Date() : null;
+ final DependencyInstance depInst = new DependencyInstance(depName, startDate, endTime,
+ context, status, cause);
+ depInstList.add(depInst);
+ }
+
+ final TriggerInstance triggerInstance = new TriggerInstance(triggerInstId, flowTrigger,
+ flowId, flowVersion, submitUser, depInstList, Constants.UNASSIGNED_EXEC_ID, project);
+
+ return triggerInstance;
+ }
+
+ private String generateId() {
+ return UUID.randomUUID().toString();
+ }
+
+ private void scheduleKill(final TriggerInstance triggerInst, final Duration duration, final
+ CancellationCause cause) {
+ logger
+ .info(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
+ .getSeconds
+ ()));
+ this.timeoutService.schedule(() -> {
+ cancel(triggerInst, cause);
+ }, duration.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Returns a snapshot of the trigger instances currently tracked as running.
+  *
+  * The copy is taken on the service's executor thread, the same thread on which the running
+  * list is modified by the tasks this service submits, so callers can iterate the result
+  * without racing against additions or removals.
+  *
+  * @return a copy of the running trigger instances, or an empty collection if the lookup fails
+  */
+ public Collection<TriggerInstance> getRunningTriggers() {
+   final Callable<List<TriggerInstance>> snapshotTask =
+       () -> new ArrayList<>(this.runningTriggers);
+   final Future<List<TriggerInstance>> future = this.executorService.submit(snapshotTask);
+
+   try {
+     return future.get();
+   } catch (final InterruptedException ex) {
+     // keep the interrupt visible to the caller instead of silently clearing it
+     Thread.currentThread().interrupt();
+     logger.error("error in getting running triggers", ex);
+   } catch (final Exception ex) {
+     logger.error("error in getting running triggers", ex);
+   }
+   return new ArrayList<>();
+ }
+
+ /**
+  * @return the recently finished trigger instances, as returned by the trigger instance loader
+  * when queried with {@code RECENTLY_FINISHED_TRIGGER_LIMIT} as the limit
+  */
+ public Collection<TriggerInstance> getRecentlyFinished() {
+ return this.flowTriggerInstanceLoader.getRecentlyFinished(RECENTLY_FINISHED_TRIGGER_LIMIT);
+ }
+
+ public TriggerInstance findTriggerInstanceById(final String triggerInstanceId) {
+ return this.flowTriggerInstanceLoader.getTriggerInstanceById(triggerInstanceId);
+ }
+
+ /**
+  * @return true when the trigger instance has succeeded but its flow execution id is still
+  * {@code Constants.UNASSIGNED_EXEC_ID}
+  */
+ private boolean isDoneButFlowNotExecuted(final TriggerInstance triggerInstance) {
+   if (triggerInstance.getStatus() != Status.SUCCEEDED) {
+     return false;
+   }
+   return triggerInstance.getFlowExecId() == Constants.UNASSIGNED_EXEC_ID;
+ }
+
+ /**
+  * Recovers a trigger instance that was still running or being cancelled.
+  *
+  * For every dependency instance in RUNNING or CANCELLING status a new dependency instance
+  * context is created; if that fails, the dependency instance is marked CANCELLED with cause
+  * FAILURE. Afterwards the trigger instance is either cancelled right away (status CANCELLING)
+  * or put back on the running list with a timeout kill scheduled (status RUNNING).
+  *
+  * @param triggerInstance the trigger instance to recover
+  */
+ private void recoverRunningOrCancelling(final TriggerInstance triggerInstance) {
+ final FlowTrigger flowTrigger = triggerInstance.getFlowTrigger();
+ for (final DependencyInstance depInst : triggerInstance.getDepInstances()) {
+ if (depInst.getStatus() == Status.RUNNING || depInst.getStatus() == Status.CANCELLING) {
+ final FlowTriggerDependency dependency = flowTrigger
+ .getDependencyByName(depInst.getDepName());
+ DependencyInstanceContext context = null;
+ try {
+ //recreate dependency instance context
+ context = createDepContext(dependency, depInst.getStartTime().getTime());
+ } catch (final Exception ex) {
+ // the failure is only logged; context stays null and is handled below
+ logger
+ .error(
+ String.format("unable to create dependency context for trigger instance[id ="
+ + " %s]", triggerInstance.getId()), ex);
+ }
+ depInst.setDependencyInstanceContext(context);
+ if (context == null) {
+ // NOTE(review): status and cause are set directly here; this method does not hand the
+ // change to dependencyProcessor and does not set an end time - confirm that is intended.
+ depInst.setStatus(Status.CANCELLED);
+ depInst.setCancellationCause(CancellationCause.FAILURE);
+ }
+ }
+ }
+
+ // NOTE(review): a trigger instance whose status is neither CANCELLING nor RUNNING at this
+ // point is not handled by either branch below - verify whether that case can occur.
+ if (triggerInstance.getStatus() == Status.CANCELLING) {
+ addToRunningListAndCancel(triggerInstance);
+ } else if (triggerInstance.getStatus() == Status.RUNNING) {
+ // remaining wait time plus a fixed grace period granted after restart
+ final long remainingTime = remainingTimeBeforeTimeout(triggerInstance);
+ addToRunningListAndScheduleKill(triggerInstance, Duration.ofMillis(remainingTime).plus
+ (CANCELLING_GRACE_PERIOD_AFTER_RESTART), CancellationCause.TIMEOUT);
+ }
+ }
+
+ private void recover(final TriggerInstance triggerInstance) {
+ this.executorService.submit(() -> {
+ logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
+ ()));
+ if (isDoneButFlowNotExecuted(triggerInstance)) {
+ // if trigger instance succeeds but the associated flow hasn't been started, then start
+ // the flow
+ this.triggerProcessor.processSucceed(triggerInstance);
+ } else {
+ recoverRunningOrCancelling(triggerInstance);
+ }
+ });
+ }
+
+ /**
+ * Resume executions of all incomplete trigger instances by recovering the state from db.
+ */
+ public void recoverIncompleteTriggerInstances() {
+ final Collection<TriggerInstance> unfinishedTriggerInstances = this.flowTriggerInstanceLoader
+ .getIncompleteTriggerInstances();
+ //todo chengren311: what if flow trigger is not found?
+ for (final TriggerInstance triggerInstance : unfinishedTriggerInstances) {
+ if (triggerInstance.getFlowTrigger() != null) {
+ recover(triggerInstance);
+ } else {
+ logger.info(String.format("cannot recover the trigger instance %s, flow trigger is null ",
+ triggerInstance.getId()));
+ }
+ }
+ }
+
+ private void addToRunningListAndScheduleKill(final TriggerInstance triggerInst, final
+ Duration durationBeforeKill, final CancellationCause cause) {
+ // if trigger instance is already done
+ if (!Status.isDone(triggerInst.getStatus())) {
+ this.runningTriggers.add(triggerInst);
+ scheduleKill(triggerInst, durationBeforeKill, cause);
+ }
+ }
+
+ /**
+  * Derives one cancellation cause from the causes of all dependency instances of the trigger
+  * instance. Precedence: FAILURE or CASCADING yields CASCADING, then TIMEOUT, then MANUAL;
+  * otherwise NONE.
+  *
+  * @param triggerInst trigger instance whose dependency instances are inspected
+  * @return the derived cancellation cause
+  */
+ private CancellationCause getCancelleationCause(final TriggerInstance triggerInst) {
+   boolean sawFailureOrCascading = false;
+   boolean sawTimeout = false;
+   boolean sawManual = false;
+
+   for (final DependencyInstance dependencyInstance : triggerInst.getDepInstances()) {
+     final CancellationCause current = dependencyInstance.getCancellationCause();
+     if (current == CancellationCause.FAILURE || current == CancellationCause.CASCADING) {
+       sawFailureOrCascading = true;
+     } else if (current == CancellationCause.TIMEOUT) {
+       sawTimeout = true;
+     } else if (current == CancellationCause.MANUAL) {
+       sawManual = true;
+     }
+   }
+
+   if (sawFailureOrCascading) {
+     return CancellationCause.CASCADING;
+   }
+   if (sawTimeout) {
+     return CancellationCause.TIMEOUT;
+   }
+   if (sawManual) {
+     return CancellationCause.MANUAL;
+   }
+   return CancellationCause.NONE;
+ }
+
+
+ private void cancelTriggerInstance(final TriggerInstance triggerInst) {
+ logger.debug("cancelling trigger instance of exec id" + triggerInst.getId());
+ final CancellationCause cause = getCancelleationCause(triggerInst);
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ if (depInst.getStatus() == Status.CANCELLING) {
+ depInst.getContext().cancel();
+ } else if (depInst.getStatus() == Status.RUNNING) {
+ // sometimes dependency instances of trigger instance in cancelling status can be running.
+ // e.x. dep inst1: failure, dep inst2: running -> trigger inst is in killing
+ this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
+ depInst.getContext().cancel();
+ }
+ }
+ }
+
+ private void addToRunningListAndCancel(final TriggerInstance triggerInst) {
+ this.runningTriggers.add(triggerInst);
+ cancelTriggerInstance(triggerInst);
+ }
+
+
+ private void updateDepInstStatus(final DependencyInstance depInst, final Status newStatus) {
+ depInst.setStatus(newStatus);
+ if (Status.isDone(depInst.getStatus())) {
+ depInst.setEndTime(new Date());
+ }
+ }
+
+ private void processStatusUpdate(final DependencyInstance depInst, final Status newStatus) {
+ logger.debug("process status update for " + depInst);
+ updateDepInstStatus(depInst, newStatus);
+ this.dependencyProcessor.processStatusUpdate(depInst);
+ }
+
+ private void processStatusAndCancelCauseUpdate(final DependencyInstance depInst, final Status
+ newStatus, final CancellationCause cause) {
+ depInst.setCancellationCause(cause);
+ updateDepInstStatus(depInst, newStatus);
+ this.dependencyProcessor.processStatusUpdate(depInst);
+ }
+
+
+ /**
+  * Computes how many milliseconds of the flow trigger's max wait duration are left, measured
+  * from the trigger instance's start time to the current time.
+  *
+  * @param triggerInst trigger instance to inspect
+  * @return remaining milliseconds, never negative
+  */
+ private long remainingTimeBeforeTimeout(final TriggerInstance triggerInst) {
+   final long now = System.currentTimeMillis();
+   final long maxWaitMillis = triggerInst.getFlowTrigger().getMaxWaitDuration().toMillis();
+   final long elapsedMillis = now - triggerInst.getStartTime().getTime();
+   final long remainingMillis = maxWaitMillis - elapsedMillis;
+   return remainingMillis > 0 ? remainingMillis : 0;
+ }
+
+ /**
+ * Start the trigger. The method will be scheduled to invoke by azkaban scheduler.
+ */
+ public void startTrigger(final FlowTrigger flowTrigger, final String flowId,
+ final int flowVersion, final String submitUser, final Project project) {
+ this.executorService.submit(() -> {
+ final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
+ submitUser, project);
+
+ logger.info(String.format("Starting trigger instance[id: %s]", triggerInst.getId()));
+
+ this.triggerProcessor.processNewInstance(triggerInst);
+ if (triggerInst.getStatus() == Status.CANCELLED) {
+ // all dependency instances failed
+ logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
+ + "instances failed to be created",
+ triggerInst.getId()));
+ this.triggerProcessor.processTermination(triggerInst);
+ } else if (triggerInst.getStatus() == Status.CANCELLING) {
+ // some of the dependency instances failed
+ logger.info(
+ String.format("Trigger instance[id: %s] is being cancelled ", triggerInst.getId()));
+ addToRunningListAndCancel(triggerInst);
+ } else {
+ // todo chengren311: it's possible web server restarts before the db update, then
+ // new instance will not be recoverable from db.
+ logger.info(
+ String.format("Trigger instance[id: %s] is successfully created", triggerInst.getId()));
+ addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
+ .getMaxWaitDuration(), CancellationCause.TIMEOUT);
+ }
+ });
+ }
+
+ private FlowTriggerDependency getFlowTriggerDepByName(final FlowTrigger flowTrigger,
+ final String depName) {
+ return flowTrigger.getDependencies().stream().filter(ftd -> ftd.getName().equals(depName))
+ .findFirst().orElse(null);
+ }
+
+ public TriggerInstance findRunningTriggerInstById(final String triggerInstId) {
+ //todo chengren311: make the method single threaded
+ final Future<TriggerInstance> future = this.executorService.submit(
+ () -> this.runningTriggers.stream()
+ .filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
+ .orElse(null)
+ );
+ try {
+ return future.get();
+ } catch (final Exception e) {
+ logger.error("exception when finding trigger instance by id" + triggerInstId, e);
+ return null;
+ }
+ }
+
+ /**
+  * Removes every trigger instance with the given id from the running list.
+  *
+  * @param triggerInstId id of the trigger instance(s) to drop
+  */
+ private void removeRunningTriggerInstById(final String triggerInstId) {
+   this.runningTriggers.removeIf(running -> triggerInstId.equals(running.getId()));
+ }
+
+ /**
+  * Cancel a trigger instance.
+  *
+  * The work is submitted to the service's executor: every dependency instance that is still
+  * RUNNING is moved to CANCELLING with the given cause and its context is asked to cancel.
+  * Dependency instances in any other status are left untouched. A null trigger instance is
+  * logged and ignored.
+  *
+  * @param triggerInst trigger instance to be cancelled
+  * @param cause cause of cancelling
+  */
+ public void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
+   // Guard before any dereference: previously the id was read before the null check, so a
+   // null argument always ended in a NullPointerException inside the submitted task.
+   if (triggerInst == null) {
+     logger.warn("unable to cancel a null trigger instance");
+     return;
+   }
+
+   this.executorService.submit(
+       () -> {
+         logger.info(
+             String.format("cancelling trigger instance with id %s", triggerInst.getId()));
+         for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+           // cancel only running dependencies, no need to cancel a killed/successful
+           // dependency instance
+           if (depInst.getStatus() == Status.RUNNING) {
+             this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
+             depInst.getContext().cancel();
+           }
+         }
+       }
+   );
+ }
+
+ private DependencyInstance findDependencyInstanceByContext(
+ final DependencyInstanceContext context) {
+ return this.runningTriggers.stream()
+ .flatMap(triggerInst -> triggerInst.getDepInstances().stream()).filter(
+ depInst -> depInst.getContext() != null && depInst.getContext() == context)
+ .findFirst().orElse(null);
+ }
+
+ /**
+ * Mark the dependency instance context as success
+ */
+ public void markDependencySuccess(final DependencyInstanceContext context) {
+ this.executorService.submit(() -> {
+ final DependencyInstance depInst = findDependencyInstanceByContext(context);
+ if (depInst != null) {
+ logger.info(
+ String.format("setting dependency instance[id: %s, name: %s] status to success",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
+
+ if (Status.isDone(depInst.getStatus())) {
+ logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ return;
+ }
+
+ processStatusUpdate(depInst, Status.SUCCEEDED);
+ // if associated trigger instance becomes success, then remove it from running list
+ if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
+ logger.info(String.format("trigger instance with execId %s succeeded",
+ depInst.getTriggerInstance().getId()));
+ this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
+ this.runningTriggers.remove(depInst.getTriggerInstance());
+ }
+ } else {
+ logger.debug(String.format("unable to find trigger instance with context %s when marking "
+ + "it success",
+ context));
+ }
+ });
+ }
+
+ private boolean cancelledByAzkaban(final DependencyInstance depInst) {
+ return depInst.getStatus() == Status.CANCELLING && (
+ depInst.getCancellationCause() == CancellationCause
+ .MANUAL || depInst.getCancellationCause() == CancellationCause.TIMEOUT || depInst
+ .getCancellationCause() == CancellationCause.CASCADING);
+ }
+
+ private boolean cancelledByDependencyPlugin(final DependencyInstance depInst) {
+ // When onKill is called by the dependency plugin not through flowTriggerService, we treat it
+ // as cancelled by dependency due to failure on dependency side. In this case, cancel cause
+ // remains unset.
+ return depInst.getStatus() == Status.CANCELLED && (depInst.getCancellationCause()
+ == CancellationCause.NONE);
+ }
+
+ public void markDependencyCancelled(final DependencyInstanceContext context) {
+ this.executorService.submit(() -> {
+ final DependencyInstance depInst = findDependencyInstanceByContext(context);
+ if (depInst != null) {
+ logger.info(
+ String.format("setting dependency instance[id: %s, name: %s] status to cancelled",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ if (cancelledByDependencyPlugin(depInst)) {
+ processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, CancellationCause.FAILURE);
+ cancelTriggerInstance(depInst.getTriggerInstance());
+ } else if (cancelledByAzkaban(depInst)) {
+ processStatusUpdate(depInst, Status.CANCELLED);
+ } else {
+ logger.warn(String.format("OnCancel of dependency instance[id: %s, name: %s] is ignored",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ return;
+ }
+
+ if (depInst.getTriggerInstance().getStatus() == Status.CANCELLED) {
+ logger.info(
+ String.format("trigger instance with execId %s is cancelled",
+ depInst.getTriggerInstance().getId()));
+ this.triggerProcessor.processTermination(depInst.getTriggerInstance());
+ removeRunningTriggerInstById(depInst.getTriggerInstance().getId());
+ }
+ } else {
+ logger.warn(String.format("unable to find trigger instance with context %s when marking "
+ + "it cancelled", context));
+ }
+ });
+ }
+
+ /**
+  * Shuts down the service immediately: stops the timeout scheduler, the service executor and
+  * the dependency plugin manager.
+  */
+ public void shutdown() {
+   // Stop pending timeout kills first; otherwise the scheduler threads stay alive and a kill
+   // firing later would submit a cancellation to an executor that is already shut down.
+   this.timeoutService.shutdownNow();
+   this.executorService.shutdown(); // Disable new tasks from being submitted
+   this.executorService.shutdownNow(); // Cancel currently executing tasks
+   this.triggerPluginManager.shutdown();
+ }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
new file mode 100644
index 0000000..f0195a7
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.executor.ExecutorManager;
+import azkaban.flow.Flow;
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
+import azkaban.flowtrigger.testplugin.TestDependencyCheck;
+import azkaban.flowtrigger.util.TestUtil;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import azkaban.project.Project;
+import azkaban.utils.Emailer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+
+public class FlowTriggerServiceTest {
+
+ private static final FlowTriggerInstanceLoader flowTriggerInstanceLoader = new
+ MockFlowTriggerInstanceLoader();
+ private static TestDependencyCheck testDepCheck;
+ private static FlowTriggerService flowTriggerService;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ testDepCheck = new TestDependencyCheck();
+ final FlowTriggerDependencyPluginManager pluginManager = mock(FlowTriggerDependencyPluginManager
+ .class);
+ when(pluginManager.getDependencyCheck(ArgumentMatchers.eq("TestDependencyCheck")))
+ .thenReturn(testDepCheck);
+
+ final ExecutorManager executorManager = mock(ExecutorManager.class);
+ when(executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
+
+ final Emailer emailer = mock(Emailer.class);
+ Mockito.doNothing().when(emailer).sendEmail(any(), anyString(), anyString());
+
+ final TriggerInstanceProcessor triggerInstProcessor = new TriggerInstanceProcessor(
+ executorManager,
+ flowTriggerInstanceLoader, emailer);
+ final DependencyInstanceProcessor depInstProcessor = new DependencyInstanceProcessor
+ (flowTriggerInstanceLoader);
+
+ flowTriggerService = new FlowTriggerService(pluginManager,
+ triggerInstProcessor, depInstProcessor, flowTriggerInstanceLoader);
+ }
+
+ @Before
+ public void cleanup() {
+ ((MockFlowTriggerInstanceLoader) flowTriggerInstanceLoader).clear();
+ }
+
+ private Project createProject() {
+ final Project project = new Project(1, "project1");
+ project.setVersion(1);
+ final Flow flow = new Flow("testflow");
+ final Map<String, Flow> flowMap = new HashMap<>();
+ flowMap.put("testflow", flow);
+ project.setFlows(flowMap);
+ return project;
+ }
+
+ @Test
+ public void testStartTriggerCancelledByTimeout() throws InterruptedException {
+
+ final List<FlowTriggerDependency> deps = new ArrayList<>();
+ deps.add(TestUtil.createTestDependency("2secs", 2, false));
+ deps.add(TestUtil.createTestDependency("8secs", 8, false));
+ deps.add(TestUtil.createTestDependency("9secs", 9, false));
+ final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(5));
+ for (int i = 0; i < 30; i++) {
+ flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+ }
+ Thread.sleep(Duration.ofSeconds(6).toMillis());
+ final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+ assertThat(triggerInstances).hasSize(30);
+ for (final TriggerInstance inst : triggerInstances) {
+ assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
+ for (final DependencyInstance depInst : inst.getDepInstances()) {
+ if (depInst.getDepName().equals("2secs")) {
+ assertThat(depInst.getStatus()).isEqualTo(Status.SUCCEEDED);
+ } else if (depInst.getDepName().equals("8secs")) {
+ assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+ assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.TIMEOUT);
+ } else if (depInst.getDepName().equals("9secs")) {
+ assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+ assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.TIMEOUT);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testStartTriggerCancelledManually() throws InterruptedException {
+ final List<FlowTriggerDependency> deps = new ArrayList<>();
+ deps.add(TestUtil.createTestDependency("2secs", 2, false));
+ deps.add(TestUtil.createTestDependency("8secs", 8, false));
+ deps.add(TestUtil.createTestDependency("9secs", 9, false));
+ final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(5));
+ for (int i = 0; i < 30; i++) {
+ flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+ }
+
+ Thread.sleep(Duration.ofMillis(500).toMillis());
+ for (final TriggerInstance runningTrigger : flowTriggerService.getRunningTriggers()) {
+ flowTriggerService.cancel(runningTrigger, CancellationCause.MANUAL);
+ }
+ Thread.sleep(Duration.ofMillis(500).toMillis());
+ final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+ assertThat(triggerInstances).hasSize(30);
+ for (final TriggerInstance inst : triggerInstances) {
+ assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
+ for (final DependencyInstance depInst : inst.getDepInstances()) {
+ assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+ assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.MANUAL);
+ }
+ }
+ }
+
+ @Test
+ public void testStartTriggerCancelledByFailure() throws InterruptedException {
+ final List<FlowTriggerDependency> deps = new ArrayList<>();
+ deps.add(TestUtil.createTestDependency("2secs", 2, true));
+ deps.add(TestUtil.createTestDependency("8secs", 8, false));
+ deps.add(TestUtil.createTestDependency("9secs", 9, false));
+ final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+ for (int i = 0; i < 30; i++) {
+ flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+ }
+ Thread.sleep(Duration.ofSeconds(1).toMillis());
+ final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+ assertThat(triggerInstances).hasSize(30);
+ for (final TriggerInstance inst : triggerInstances) {
+ assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
+ for (final DependencyInstance depInst : inst.getDepInstances()) {
+ if (depInst.getDepName().equals("2secs")) {
+ assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+ assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.FAILURE);
+ } else {
+ assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
+ assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.CASCADING);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testStartTriggerSuccess() throws InterruptedException {
+ final List<FlowTriggerDependency> deps = new ArrayList<>();
+ deps.add(TestUtil.createTestDependency("2secs", 2, false));
+ deps.add(TestUtil.createTestDependency("3secs", 3, false));
+ deps.add(TestUtil.createTestDependency("4secs", 4, false));
+ final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+ for (int i = 0; i < 30; i++) {
+ flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+ }
+ Thread.sleep(Duration.ofSeconds(5).toMillis());
+ final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+ assertThat(triggerInstances).hasSize(30);
+ for (final TriggerInstance inst : triggerInstances) {
+ assertThat(inst.getStatus()).isEqualTo(Status.SUCCEEDED);
+ }
+ }
+
+ @Test
+ public void testRecovery() throws Exception {
+ final List<FlowTriggerDependency> deps = new ArrayList<>();
+ deps.add(TestUtil.createTestDependency("2secs", 2, false));
+ deps.add(TestUtil.createTestDependency("3secs", 3, false));
+ deps.add(TestUtil.createTestDependency("4secs", 4, false));
+ final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+ for (int i = 0; i < 30; i++) {
+ flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+ }
+ Thread.sleep(Duration.ofSeconds(1).toMillis());
+ flowTriggerService.shutdown();
+ setup();
+ flowTriggerService.recoverIncompleteTriggerInstances();
+ Thread.sleep(Duration.ofSeconds(5).toMillis());
+ final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
+ assertThat(triggerInstances).hasSize(30);
+ for (final TriggerInstance inst : triggerInstances) {
+ assertThat(inst.getStatus()).isEqualTo(Status.SUCCEEDED);
+ }
+ }
+}
+
+
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
new file mode 100644
index 0000000..d8c2999
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/MockFlowTriggerInstanceLoader.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * In-memory {@link FlowTriggerInstanceLoader} for tests. Uploaded trigger instances are held in
+ * a synchronized list; nothing is persisted.
+ */
+public class MockFlowTriggerInstanceLoader implements FlowTriggerInstanceLoader {
+
+  // Iterating a Collections.synchronizedList is only safe while holding the list's own
+  // monitor, so every loop below runs inside synchronized (this.triggerInstances).
+  private final List<TriggerInstance> triggerInstances = Collections.synchronizedList(new
+      ArrayList<TriggerInstance>());
+
+  /** Removes every stored trigger instance. */
+  public void clear() {
+    this.triggerInstances.clear();
+  }
+
+  /** Appends the given trigger instance to the in-memory store. */
+  @Override
+  public void uploadTriggerInstance(final TriggerInstance triggerInstance) {
+    this.triggerInstances.add(triggerInstance);
+  }
+
+  /**
+   * Copies end time, status and cancellation cause from {@code depInst} onto the stored
+   * dependency instance that has the same trigger instance id and the same dependency name.
+   * Nothing is changed when no such dependency is stored.
+   */
+  @Override
+  public void updateDependencyExecutionStatus(final DependencyInstance depInst) {
+    synchronized (this.triggerInstances) {
+      for (final TriggerInstance inst : this.triggerInstances) {
+        if (inst.getId().equals(depInst.getTriggerInstance().getId())) {
+          for (final DependencyInstance dep : inst.getDepInstances()) {
+            if (dep.getDepName().equals(depInst.getDepName())) {
+              dep.setEndTime(depInst.getEndTime());
+              dep.setStatus(depInst.getStatus());
+              dep.setCancellationCause(depInst.getCancellationCause());
+              break;
+            }
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the stored trigger instances whose status is CANCELLING or RUNNING.
+   *
+   * @return a new list; empty when there is no such instance
+   */
+  @Override
+  public Collection<TriggerInstance> getIncompleteTriggerInstances() {
+    final List<TriggerInstance> res = new ArrayList<>();
+    synchronized (this.triggerInstances) {
+      for (final TriggerInstance inst : this.triggerInstances) {
+        if (inst.getStatus() == Status.CANCELLING || inst.getStatus() == Status.RUNNING) {
+          res.add(inst);
+        }
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Copies the flow execution id of {@code triggerInst} onto the stored trigger instance that
+   * has the same id. Nothing is changed when no stored instance matches.
+   */
+  @Override
+  public void updateAssociatedFlowExecId(final TriggerInstance triggerInst) {
+    synchronized (this.triggerInstances) {
+      for (final TriggerInstance inst : this.triggerInstances) {
+        // Compare the STORED instance's id with the argument's id. Comparing the argument
+        // with itself is always true and would update whichever instance happens to be first.
+        if (inst.getId().equals(triggerInst.getId())) {
+          inst.setFlowExecId(triggerInst.getFlowExecId());
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns every stored trigger instance whose status satisfies {@code Status.isDone}.
+   *
+   * @param limit not applied by this mock; all finished instances are returned
+   * @return a new list; empty when no instance has finished
+   */
+  @Override
+  public Collection<TriggerInstance> getRecentlyFinished(final int limit) {
+    final List<TriggerInstance> res = new ArrayList<>();
+    synchronized (this.triggerInstances) {
+      for (final TriggerInstance inst : this.triggerInstances) {
+        if (Status.isDone(inst.getStatus())) {
+          res.add(inst);
+        }
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Looks up a stored trigger instance by id.
+   *
+   * @return the first stored instance whose id equals {@code triggerInstanceId}, or
+   *     {@code null} when there is none
+   */
+  @Override
+  public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
+    synchronized (this.triggerInstances) {
+      for (final TriggerInstance inst : this.triggerInstances) {
+        if (inst.getId().equals(triggerInstanceId)) {
+          return inst;
+        }
+      }
+    }
+    return null;
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
index d3f4784..8e43c1e 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyCheck.java
@@ -25,13 +25,19 @@ import azkaban.flowtrigger.DependencyPluginConfig;
public class TestDependencyCheck implements DependencyCheck {
+ public static final String FAILURE_FLAG = "failure";
+ public static final String RUN_TIME = "runtime";
private DependencyPluginConfig config;
+ /**
+  * Creates the context for one dependency instance of the test plugin.
+  *
+  * @return a new TestDependencyInstanceContext built from the given arguments
+  * @throws RuntimeException if the config value stored under FAILURE_FLAG equals "true"
+  */
@Override
public DependencyInstanceContext run(final DependencyInstanceConfig config,
final DependencyInstanceRuntimeProps runtimeProps,
final DependencyInstanceCallback callback) {
- return new TestDependencyInstanceContext(config, runtimeProps, callback);
+ // Constant-first comparison: if the config has no value for the failure flag, the
+ // dependency is treated as "not bound to fail" instead of raising a NullPointerException.
+ if ("true".equals(config.get(FAILURE_FLAG))) {
+   throw new RuntimeException("dependency instance creation failure");
+ }
+ return new TestDependencyInstanceContext(config, runtimeProps, callback);
}
@Override
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
index 638c421..6b267a2 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/testplugin/TestDependencyInstanceContext.java
@@ -20,15 +20,32 @@ import azkaban.flowtrigger.DependencyInstanceCallback;
import azkaban.flowtrigger.DependencyInstanceConfig;
import azkaban.flowtrigger.DependencyInstanceContext;
import azkaban.flowtrigger.DependencyInstanceRuntimeProps;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+/**
+ * Dependency instance context of the test plugin. On construction it schedules a success
+ * callback to fire after the number of seconds configured under TestDependencyCheck.RUN_TIME;
+ * cancel() reports cancellation to the callback immediately. Note that cancel() does not
+ * unschedule the pending success callback, so it still fires once the run time has elapsed.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
public class TestDependencyInstanceContext implements DependencyInstanceContext {
+ // One scheduler with a single thread, shared by every instance of this class.
+ private static final ScheduledExecutorService scheduleService = Executors
+     .newScheduledThreadPool(1);
+
+ private final DependencyInstanceCallback callback;
+
+ /**
+  * Reads the run time (in seconds) from the config and schedules the success callback.
+  *
+  * @throws NumberFormatException if the configured run time is missing or not a valid long
+  */
public TestDependencyInstanceContext(final DependencyInstanceConfig config,
final DependencyInstanceRuntimeProps runtimeProps,
final DependencyInstanceCallback callback) {
+ // parseLong returns the primitive directly; Long.valueOf would box and then unbox.
+ final long expectedRunTime = Long.parseLong(config.get(TestDependencyCheck.RUN_TIME));
+ this.callback = callback;
+ scheduleService.schedule(this::onSuccess, expectedRunTime, TimeUnit.SECONDS);
+ }
+
+ private void onSuccess() {
+ this.callback.onSuccess(this);
}
@Override
public void cancel() {
+ this.callback.onCancel(this);
}
}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/util/TestUtil.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/util/TestUtil.java
new file mode 100644
index 0000000..a34931d
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/util/TestUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.util;
+
+import azkaban.flowtrigger.testplugin.TestDependencyCheck;
+import azkaban.project.CronSchedule;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static factory helpers that build flow trigger fixtures for tests.
+ */
+public final class TestUtil {
+
+  private TestUtil() {
+    // static helpers only; not meant to be instantiated
+  }
+
+  /**
+   * Builds a dependency of type "TestDependencyCheck".
+   *
+   * @param name dependency name
+   * @param runtimeInSec value stored under TestDependencyCheck.RUN_TIME
+   * @param boundToFail value stored under TestDependencyCheck.FAILURE_FLAG
+   * @return a new dependency carrying exactly those two properties
+   */
+  public static FlowTriggerDependency createTestDependency(final String name,
+      final long runtimeInSec, final boolean boundToFail) {
+    final Map<String, String> props = new HashMap<>();
+    props.put(TestDependencyCheck.RUN_TIME, String.valueOf(runtimeInSec));
+    props.put(TestDependencyCheck.FAILURE_FLAG, String.valueOf(boundToFail));
+    return new FlowTriggerDependency(name, "TestDependencyCheck", props);
+  }
+
+  /**
+   * Builds a flow trigger over the given dependencies using the fixed cron expression
+   * "* * * * ? *".
+   *
+   * @param deps dependencies of the trigger
+   * @param maxWaitDuration maximum wait duration handed to the FlowTrigger constructor
+   * @return a new flow trigger
+   */
+  public static FlowTrigger createTestFlowTrigger(final List<FlowTriggerDependency> deps,
+      final Duration maxWaitDuration) {
+    return new FlowTrigger(
+        new CronSchedule("* * * * ? *"), deps, maxWaitDuration);
+  }
+}