diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 2b43657..a1cc031 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -71,14 +71,17 @@ import org.slf4j.LoggerFactory;
@Singleton
public class FlowTriggerService {
+ private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
+
private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 50;
private static final int CANCEL_EXECUTOR_POOL_SIZE = 32;
- private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
- private final ExecutorService singleThreadExecutorService;
- private final ExecutorService multiThreadsExecutorService;
- private final List<TriggerInstance> runningTriggers;
+ private static final int TIMEOUT_EXECUTOR_POOL_SIZE = 8;
+
+ private final ExecutorService flowTriggerExecutorService;
+ private final ExecutorService cancelExecutorService;
private final ScheduledExecutorService timeoutService;
+ private final List<TriggerInstance> runningTriggers;
private final FlowTriggerDependencyPluginManager triggerPluginManager;
private final TriggerInstanceProcessor triggerProcessor;
private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
@@ -91,12 +94,14 @@ public class FlowTriggerService {
dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader,
final FlowTriggerExecutionCleaner cleaner) {
// Give the thread a name to make debugging easier.
- final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("FlowTrigger-service").build();
- this.singleThreadExecutorService = Executors.newSingleThreadExecutor(namedThreadFactory);
- this.multiThreadsExecutorService = Executors
+ this.flowTriggerExecutorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+ namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("FlowTrigger-cancel").build();
+ this.cancelExecutorService = Executors
.newFixedThreadPool(CANCEL_EXECUTOR_POOL_SIZE, namedThreadFactory);
- this.timeoutService = Executors.newScheduledThreadPool(8);
+ this.timeoutService = Executors.newScheduledThreadPool(TIMEOUT_EXECUTOR_POOL_SIZE);
this.runningTriggers = new ArrayList<>();
this.triggerPluginManager = pluginManager;
this.triggerProcessor = triggerProcessor;
@@ -171,7 +176,7 @@ public class FlowTriggerService {
.debug(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
.getSeconds()));
this.timeoutService.schedule(() -> {
- cancel(triggerInst, cause);
+ cancelTriggerInstance(triggerInst, cause);
}, duration.toMillis(), TimeUnit.MILLISECONDS);
}
@@ -236,18 +241,20 @@ public class FlowTriggerService {
}
}
+ private void recoverTriggerInstance(final TriggerInstance triggerInstance) {
+ this.flowTriggerExecutorService.submit(() -> recover(triggerInstance));
+ }
+
private void recover(final TriggerInstance triggerInstance) {
- this.singleThreadExecutorService.submit(() -> {
- logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
- ()));
- if (isDoneButFlowNotExecuted(triggerInstance)) {
- // if trigger instance succeeds but the associated flow hasn't been started yet, then start
- // the flow
- this.triggerProcessor.processSucceed(triggerInstance);
- } else {
- recoverRunningOrCancelling(triggerInstance);
- }
- });
+ logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
+ ()));
+ if (isDoneButFlowNotExecuted(triggerInstance)) {
+ // if trigger instance succeeds but the associated flow hasn't been started yet, then start
+ // the flow
+ this.triggerProcessor.processSucceed(triggerInstance);
+ } else {
+ recoverRunningOrCancelling(triggerInstance);
+ }
}
/**
@@ -258,7 +265,7 @@ public class FlowTriggerService {
.getIncompleteTriggerInstances();
for (final TriggerInstance triggerInstance : unfinishedTriggerInstances) {
if (triggerInstance.getFlowTrigger() != null) {
- recover(triggerInstance);
+ recoverTriggerInstance(triggerInstance);
} else {
logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null,"
+ " cancelling it ", triggerInstance.getId()));
@@ -368,49 +375,42 @@ public class FlowTriggerService {
*/
public void startTrigger(final FlowTrigger flowTrigger, final String flowId,
final int flowVersion, final String submitUser, final Project project) {
- this.singleThreadExecutorService.submit(() -> {
- final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
- submitUser, project);
-
+ final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
+ submitUser, project);
+ this.flowTriggerExecutorService.submit(() -> {
logger.info(
String.format("Starting the flow trigger %s[trigger instance id: %s] by %s", flowTrigger,
triggerInst.getId(), submitUser));
-
- this.triggerProcessor.processNewInstance(triggerInst);
- if (triggerInst.getStatus() == Status.CANCELLED) {
- // all dependency instances failed
- logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
- + "instances fail to be created", triggerInst.getId()));
- this.triggerProcessor.processTermination(triggerInst);
- } else if (triggerInst.getStatus() == Status.CANCELLING) {
- // some of the dependency instances failed
- logger.info(
- String.format("Trigger instance[id: %s] is being cancelled since some dependency "
- + "instances fail to be created", triggerInst.getId()));
- addToRunningListAndCancel(triggerInst);
- } else if (triggerInst.getStatus() == Status.SUCCEEDED) {
- this.triggerProcessor.processSucceed(triggerInst);
- } else {
- // todo chengren311: it's possible web server restarts before the db update, then
- // new instance will not be recoverable from db.
- addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
- .getMaxWaitDuration(), CancellationCause.TIMEOUT);
- }
+ start(triggerInst);
});
}
- private FlowTriggerDependency getFlowTriggerDepByName(final FlowTrigger flowTrigger,
- final String depName) {
- return flowTrigger.getDependencies().stream().filter(ftd -> ftd.getName().equals(depName))
- .findFirst().orElse(null);
+ private void start(final TriggerInstance triggerInst) {
+ this.triggerProcessor.processNewInstance(triggerInst);
+ if (triggerInst.getStatus() == Status.CANCELLED) {
+ // all dependency instances failed
+ logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
+ + "instances fail to be created", triggerInst.getId()));
+ this.triggerProcessor.processTermination(triggerInst);
+ } else if (triggerInst.getStatus() == Status.CANCELLING) {
+ // some of the dependency instances failed
+ logger.info(
+ String.format("Trigger instance[id: %s] is being cancelled since some dependency "
+ + "instances fail to be created", triggerInst.getId()));
+ addToRunningListAndCancel(triggerInst);
+ } else if (triggerInst.getStatus() == Status.SUCCEEDED) {
+ this.triggerProcessor.processSucceed(triggerInst);
+ } else {
+ // todo chengren311: it's possible web server restarts before the db update, then
+ // new instance will not be recoverable from db.
+ addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
+ .getMaxWaitDuration(), CancellationCause.TIMEOUT);
+ }
}
public TriggerInstance findRunningTriggerInstById(final String triggerInstId) {
- //todo chengren311: make the method single threaded
- final Future<TriggerInstance> future = this.singleThreadExecutorService.submit(
- () -> this.runningTriggers.stream()
- .filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
- .orElse(null)
+ final Future<TriggerInstance> future = this.flowTriggerExecutorService.submit(
+ () -> getTriggerInstanceById(triggerInstId)
);
try {
return future.get();
@@ -420,8 +420,14 @@ public class FlowTriggerService {
}
}
+ private TriggerInstance getTriggerInstanceById(final String triggerInstId) {
+ return this.runningTriggers.stream()
+ .filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
+ .orElse(null);
+ }
+
private void cancelContextAsync(final DependencyInstanceContext context) {
- this.multiThreadsExecutorService.submit(() -> context.cancel());
+ this.cancelExecutorService.submit(() -> context.cancel());
}
/**
@@ -430,27 +436,29 @@ public class FlowTriggerService {
* @param triggerInst trigger instance to be cancelled
* @param cause cause of cancelling
*/
- public void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
- this.singleThreadExecutorService.submit(
- () -> {
- logger.info(
- String.format("cancelling trigger instance with id %s", triggerInst.getId()));
- if (triggerInst != null) {
- for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
- // cancel only running dependencies, no need to cancel a killed/successful dependency
- // instance
- if (depInst.getStatus() == Status.RUNNING) {
- this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
- cancelContextAsync(depInst.getContext());
- }
- }
- } else {
- logger.debug(String
- .format("unable to cancel a trigger instance in non-running state with id %s",
- triggerInst.getId()));
- }
+ public void cancelTriggerInstance(final TriggerInstance triggerInst,
+ final CancellationCause cause) {
+ this.flowTriggerExecutorService.submit(() -> cancel(triggerInst, cause));
+ }
+
+ private void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
+ logger.info(
+ String.format("cancelling trigger instance with id %s", triggerInst.getId()));
+ if (triggerInst != null) {
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ // cancel running dependencies only, no need to cancel a killed/successful dependency
+ // instance
+ if (depInst.getStatus() == Status.RUNNING) {
+ this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
+ cancelContextAsync(depInst.getContext());
}
- );
+ }
+ } else {
+ logger.debug(String
+ .format(
+ "unable to cancel a trigger instance in non-running state with id %s",
+ triggerInst.getId()));
+ }
}
private DependencyInstance findDependencyInstanceByContext(
@@ -465,34 +473,36 @@ public class FlowTriggerService {
* Mark the dependency instance context as success
*/
public void markDependencySuccess(final DependencyInstanceContext context) {
- this.singleThreadExecutorService.submit(() -> {
- final DependencyInstance depInst = findDependencyInstanceByContext(context);
- if (depInst != null) {
- if (Status.isDone(depInst.getStatus())) {
- logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
- depInst.getTriggerInstance().getId(), depInst.getDepName()));
- return;
- }
+ this.flowTriggerExecutorService.submit(() -> markSuccess(context));
+ }
- logger.info(
- String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
- depInst.getTriggerInstance().getId(), depInst.getDepName()));
- // if the status transits from cancelling to succeeded, then cancellation cause was set,
- // we need to unset cancellation cause.
- this.processStatusAndCancelCauseUpdate(depInst, Status.SUCCEEDED, CancellationCause.NONE);
- // if associated trigger instance becomes success, then remove it from running list
- if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
- logger.info(String.format("trigger instance[id: %s] succeeded",
- depInst.getTriggerInstance().getId()));
- this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
- this.runningTriggers.remove(depInst.getTriggerInstance());
- }
- } else {
- logger.debug(String.format("unable to find trigger instance with context %s when marking "
- + "it success",
- context));
+ private void markSuccess(final DependencyInstanceContext context) {
+ final DependencyInstance depInst = findDependencyInstanceByContext(context);
+ if (depInst != null) {
+ if (Status.isDone(depInst.getStatus())) {
+ logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ return;
}
- });
+
+ logger.info(
+ String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ // if the status transits from cancelling to succeeded, then cancellation cause was set,
+ // we need to unset cancellation cause.
+ this.processStatusAndCancelCauseUpdate(depInst, Status.SUCCEEDED, CancellationCause.NONE);
+ // if associated trigger instance becomes success, then remove it from running list
+ if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
+ logger.info(String.format("trigger instance[id: %s] succeeded",
+ depInst.getTriggerInstance().getId()));
+ this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
+ this.runningTriggers.remove(depInst.getTriggerInstance());
+ }
+ } else {
+ logger.debug(String.format("unable to find trigger instance with context %s when marking "
+ + "it success",
+ context));
+ }
}
private boolean cancelledByAzkaban(final DependencyInstance depInst) {
@@ -507,44 +517,51 @@ public class FlowTriggerService {
}
public void markDependencyCancelled(final DependencyInstanceContext context) {
- this.singleThreadExecutorService.submit(() -> {
- final DependencyInstance depInst = findDependencyInstanceByContext(context);
- if (depInst != null) {
- logger.info(String.format("set dependency instance[id: %s, name: %s] status to "
- + "cancelled", depInst.getTriggerInstance().getId(), depInst.getDepName()));
- if (cancelledByDependencyPlugin(depInst)) {
- processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
- cancelTriggerInstance(depInst.getTriggerInstance());
- } else if (cancelledByAzkaban(depInst)) {
- processStatusUpdate(depInst, Status.CANCELLED);
- } else {
- logger.warn(String.format("OnCancel of dependency instance[id: %s, name: %s] is ignored",
- depInst.getTriggerInstance().getId(), depInst.getDepName()));
- return;
- }
+ this.flowTriggerExecutorService.submit(() -> {
+ markCancelled(context);
+ });
+ }
- if (depInst.getTriggerInstance().getStatus() == Status.CANCELLED) {
- logger.info(
- String.format("trigger instance with execId %s is cancelled",
- depInst.getTriggerInstance().getId()));
- this.triggerProcessor.processTermination(depInst.getTriggerInstance());
- this.runningTriggers.remove(depInst.getTriggerInstance());
- }
+ private void markCancelled(final DependencyInstanceContext context) {
+ final DependencyInstance depInst = findDependencyInstanceByContext(context);
+ if (depInst != null) {
+ logger.info(String.format("set dependency instance[id: %s, name: %s] status to "
+ + "cancelled", depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ if (cancelledByDependencyPlugin(depInst)) {
+ processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
+ cancelTriggerInstance(depInst.getTriggerInstance());
+ } else if (cancelledByAzkaban(depInst)) {
+ processStatusUpdate(depInst, Status.CANCELLED);
} else {
- logger.warn(String.format("unable to find trigger instance with context %s when marking "
- + "it cancelled", context));
+ logger.warn(String.format("OnCancel of dependency instance[id: %s, name: %s] is ignored",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ return;
}
- });
+
+ if (depInst.getTriggerInstance().getStatus() == Status.CANCELLED) {
+ logger.info(
+ String.format("trigger instance with execId %s is cancelled",
+ depInst.getTriggerInstance().getId()));
+ this.triggerProcessor.processTermination(depInst.getTriggerInstance());
+ this.runningTriggers.remove(depInst.getTriggerInstance());
+ }
+ } else {
+ logger.warn(String.format("unable to find trigger instance with context %s when marking "
+ + "it cancelled", context));
+ }
}
/**
* Shuts down the service immediately.
*/
public void shutdown() {
- this.singleThreadExecutorService.shutdown(); // Disable new tasks from being submitted
- this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
- this.multiThreadsExecutorService.shutdown();
- this.multiThreadsExecutorService.shutdownNow();
+ this.flowTriggerExecutorService.shutdown();
+ this.cancelExecutorService.shutdown();
+ this.timeoutService.shutdown();
+
+ this.flowTriggerExecutorService.shutdownNow();
+ this.cancelExecutorService.shutdownNow();
+ this.timeoutService.shutdownNow();
this.triggerProcessor.shutdown();
this.triggerPluginManager.shutdown();
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
index 22adff9..9d0133a 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
@@ -118,11 +118,11 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
final Project project = this.projectManager.getProject(projectName);
if (project == null) {
ret.put("error", "please specify a valid project name");
- }
- else if (!hasPermission(project, session.getUser(), Type.READ)) {
+ } else if (!hasPermission(project, session.getUser(), Type.READ)) {
ret.put("error", "Permission denied. Need READ access.");
+ } else {
+ ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
}
- else ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
} else {
ret.put("error", "please specify project id and flow id");
}
@@ -230,7 +230,7 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
.findRunningTriggerInstById(triggerInstanceId);
if (triggerInst != null) {
if (hasPermission(triggerInst.getProject(), session.getUser(), Type.EXECUTE)) {
- this.triggerService.cancel(triggerInst, CancellationCause.MANUAL);
+ this.triggerService.cancelTriggerInstance(triggerInst, CancellationCause.MANUAL);
} else {
ret.put("error", "no permission to kill the trigger");
}