azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java 14(+9 -5)
Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 3ee3af4..eb5d492 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -84,6 +84,10 @@ public class Constants {
// The flow exec id for a flow trigger instance which hasn't started a flow yet
public static final int UNASSIGNED_EXEC_ID = -1;
+ // The flow exec id for a flow trigger instance unable to trigger a flow yet
+ public static final int FAILED_EXEC_ID = -2;
+
+
public static class ConfigurationKeys {
// Configures Azkaban Flow Version in project YAML file
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 342e61e..64d728e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -95,9 +95,12 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
Constants.UNASSIGNED_EXEC_ID);
private static final String SELECT_ALL_RUNNING_EXECUTIONS =
- String.format("SELECT %s FROM %s WHERE dep_status = %s or dep_status = %s",
+ String.format(
+ "SELECT %s FROM %s WHERE trigger_instance_id in (SELECT trigger_instance_id FROM %s "
+ + "WHERE dep_status = %s or dep_status = %s)",
StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
DEPENDENCY_EXECUTION_TABLE,
+ DEPENDENCY_EXECUTION_TABLE,
Status.RUNNING.ordinal(), Status.CANCELLING.ordinal());
private static final String SELECT_RECENTLY_FINISHED = String.format(
@@ -105,13 +108,14 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
+ "cancelleation_cause,project_id,"
+ "project_version,flow_id,flow_version, flow_exec_id \n"
+ "FROM execution_dependencies JOIN (\n"
- + "SELECT distinct(trigger_instance_id), max(endtime) FROM execution_dependencies "
- + "WHERE dep_status = %s or dep_status = %s\n"
+ + "SELECT trigger_instance_id FROM execution_dependencies where "
+ + "trigger_instance_id not in (SELECT distinct(trigger_instance_id) FROM "
+ + "execution_dependencies WHERE dep_status = %s or dep_status = %s)\n"
+ "GROUP BY trigger_instance_id ORDER BY max(endtime) DESC \n"
+ " limit %%s ) temp on execution_dependencies"
+ ".trigger_instance_id in (temp.trigger_instance_id);",
- Status.SUCCEEDED.ordinal(),
- Status.CANCELLED.ordinal());
+ Status.RUNNING.ordinal(),
+ Status.CANCELLING.ordinal());
private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index f7f17f6..de98efb 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -73,8 +73,10 @@ public class FlowTriggerService {
private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
+ private static final int CANCEL_EXECUTOR_POOL_SIZE = 32;
private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
- private final ExecutorService executorService;
+ private final ExecutorService singleThreadExecutorService;
+ private final ExecutorService multiThreadsExecutorService;
private final List<TriggerInstance> runningTriggers;
private final ScheduledExecutorService timeoutService;
private final FlowTriggerDependencyPluginManager triggerPluginManager;
@@ -89,7 +91,9 @@ public class FlowTriggerService {
// Give the thread a name to make debugging easier.
final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("FlowTrigger-service").build();
- this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+ this.singleThreadExecutorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+ this.multiThreadsExecutorService = Executors
+ .newFixedThreadPool(CANCEL_EXECUTOR_POOL_SIZE, namedThreadFactory);
this.timeoutService = Executors.newScheduledThreadPool(8);
this.runningTriggers = new ArrayList<>();
this.triggerPluginManager = pluginManager;
@@ -225,11 +229,11 @@ public class FlowTriggerService {
}
private void recover(final TriggerInstance triggerInstance) {
- this.executorService.submit(() -> {
+ this.singleThreadExecutorService.submit(() -> {
logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
()));
if (isDoneButFlowNotExecuted(triggerInstance)) {
- // if trigger instance succeeds but the associated flow hasn't been started, then start
+ // if trigger instance succeeds but the associated flow hasn't been started yet, then start
// the flow
this.triggerProcessor.processSucceed(triggerInstance);
} else {
@@ -286,12 +290,12 @@ public class FlowTriggerService {
final CancellationCause cause = getCancelleationCause(triggerInst);
for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
if (depInst.getStatus() == Status.CANCELLING) {
- depInst.getContext().cancel();
+ cancelContextAsync(depInst.getContext());
} else if (depInst.getStatus() == Status.RUNNING) {
// sometimes dependency instances of trigger instance in cancelling status can be running.
// e.x. dep inst1: failure, dep inst2: running -> trigger inst is in killing
this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
- depInst.getContext().cancel();
+ cancelContextAsync(depInst.getContext());
}
}
}
@@ -335,7 +339,7 @@ public class FlowTriggerService {
*/
public void startTrigger(final FlowTrigger flowTrigger, final String flowId,
final int flowVersion, final String submitUser, final Project project) {
- this.executorService.submit(() -> {
+ this.singleThreadExecutorService.submit(() -> {
final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
submitUser, project);
@@ -374,7 +378,7 @@ public class FlowTriggerService {
public TriggerInstance findRunningTriggerInstById(final String triggerInstId) {
//todo chengren311: make the method single threaded
- final Future<TriggerInstance> future = this.executorService.submit(
+ final Future<TriggerInstance> future = this.singleThreadExecutorService.submit(
() -> this.runningTriggers.stream()
.filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
.orElse(null)
@@ -387,6 +391,10 @@ public class FlowTriggerService {
}
}
+ private void cancelContextAsync(final DependencyInstanceContext context) {
+ this.multiThreadsExecutorService.submit(() -> context.cancel());
+ }
+
/**
* Cancel a trigger instance
*
@@ -394,7 +402,7 @@ public class FlowTriggerService {
* @param cause cause of cancelling
*/
public void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
- this.executorService.submit(
+ this.singleThreadExecutorService.submit(
() -> {
logger.info(
String.format("cancelling trigger instance with id %s", triggerInst.getId()));
@@ -404,7 +412,7 @@ public class FlowTriggerService {
// instance
if (depInst.getStatus() == Status.RUNNING) {
this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
- depInst.getContext().cancel();
+ cancelContextAsync(depInst.getContext());
}
}
} else {
@@ -428,7 +436,7 @@ public class FlowTriggerService {
* Mark the dependency instance context as success
*/
public void markDependencySuccess(final DependencyInstanceContext context) {
- this.executorService.submit(() -> {
+ this.singleThreadExecutorService.submit(() -> {
final DependencyInstance depInst = findDependencyInstanceByContext(context);
if (depInst != null) {
if (Status.isDone(depInst.getStatus())) {
@@ -440,7 +448,9 @@ public class FlowTriggerService {
logger.info(
String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
depInst.getTriggerInstance().getId(), depInst.getDepName()));
- processStatusUpdate(depInst, Status.SUCCEEDED);
+ // if the status transits from cancelling to succeeded, then cancellation cause was set,
+ // we need to unset cancellation cause.
+ this.processStatusAndCancelCauseUpdate(depInst, Status.SUCCEEDED, CancellationCause.NONE);
// if associated trigger instance becomes success, then remove it from running list
if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
logger.info(String.format("trigger instance[id: %s] succeeded",
@@ -457,29 +467,24 @@ public class FlowTriggerService {
}
private boolean cancelledByAzkaban(final DependencyInstance depInst) {
- return depInst.getStatus() == Status.CANCELLING && (
- depInst.getCancellationCause() == CancellationCause
- .MANUAL || depInst.getCancellationCause() == CancellationCause.TIMEOUT || depInst
- .getCancellationCause() == CancellationCause.CASCADING);
+ return depInst.getStatus() == Status.CANCELLING;
}
private boolean cancelledByDependencyPlugin(final DependencyInstance depInst) {
// When onKill is called by the dependency plugin not through flowTriggerService, we treat it
// as cancelled by dependency due to failure on dependency side. In this case, cancel cause
// remains unset.
- return depInst.getStatus() == Status.CANCELLED && (depInst.getCancellationCause()
- == CancellationCause.NONE);
+ return depInst.getStatus() == Status.RUNNING;
}
public void markDependencyCancelled(final DependencyInstanceContext context) {
- this.executorService.submit(() -> {
+ this.singleThreadExecutorService.submit(() -> {
final DependencyInstance depInst = findDependencyInstanceByContext(context);
if (depInst != null) {
- logger.info(
- String.format("setting dependency instance[id: %s, name: %s] status to cancelled",
- depInst.getTriggerInstance().getId(), depInst.getDepName()));
+ logger.info(String.format("set dependency instance[id: %s, name: %s] status to "
+ + "cancelled", depInst.getTriggerInstance().getId(), depInst.getDepName()));
if (cancelledByDependencyPlugin(depInst)) {
- processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, CancellationCause.FAILURE);
+ processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
cancelTriggerInstance(depInst.getTriggerInstance());
} else if (cancelledByAzkaban(depInst)) {
processStatusUpdate(depInst, Status.CANCELLED);
@@ -507,8 +512,10 @@ public class FlowTriggerService {
* Shuts down the service immediately.
*/
public void shutdown() {
- this.executorService.shutdown(); // Disable new tasks from being submitted
- this.executorService.shutdownNow(); // Cancel currently executing tasks
+ this.singleThreadExecutorService.shutdown(); // Disable new tasks from being submitted
+ this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
+ this.multiThreadsExecutorService.shutdown();
+ this.multiThreadsExecutorService.shutdownNow();
this.triggerPluginManager.shutdown();
}
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index b4edfd8..c584dc7 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -16,6 +16,7 @@
package azkaban.flowtrigger;
+import azkaban.Constants;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.flow.Flow;
@@ -25,11 +26,14 @@ import azkaban.project.Project;
import azkaban.utils.Emailer;
import com.google.common.base.Preconditions;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings("FutureReturnValueIgnored")
@Singleton
public class TriggerInstanceProcessor {
@@ -37,10 +41,11 @@ public class TriggerInstanceProcessor {
private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for %s "
+ "cancelled from %s";
private static final String FAILURE_EMAIL_BODY = "Your flow trigger cancelled [id: %s]";
-
+ private final static int THREAD_POOL_SIZE = 32;
private final ExecutorManager executorManager;
private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
private final Emailer emailer;
+ private final ExecutorService executorService;
@Inject
public TriggerInstanceProcessor(final ExecutorManager executorManager,
@@ -52,6 +57,7 @@ public class TriggerInstanceProcessor {
this.emailer = emailer;
this.executorManager = executorManager;
this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
+ this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
}
private void executeFlowAndUpdateExecID(final TriggerInstance triggerInst) {
@@ -63,11 +69,17 @@ public class TriggerInstanceProcessor {
// currently running")
this.executorManager.submitExecutableFlow(executableFlow, triggerInst.getSubmitUser());
triggerInst.setFlowExecId(executableFlow.getExecutionId());
- this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInst);
} catch (final Exception ex) {
- logger.error("exception when executing the associate flow and updating flow exec id", ex);
- //todo chengren311: should we swallow the exception or notify user
+ logger.error(String.format(
+ "exception when executing the associated flow and updating flow exec id for trigger instance[id: %s]",
+ triggerInst.getId()), ex);
+ // if flow fails to be executed(e.g. running execution exceeds the allowed concurrent run
+ // limit), set associated flow exec id to Constants.FAILED_EXEC_ID. Upon web server
+ // restart, recovery process will skip those flows.
+ triggerInst.setFlowExecId(Constants.FAILED_EXEC_ID);
}
+
+ this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInst);
}
private String generateFailureEmailSubject(final TriggerInstance triggerInstance) {
@@ -104,7 +116,8 @@ public class TriggerInstanceProcessor {
*/
public void processTermination(final TriggerInstance triggerInst) {
logger.debug("process termination for " + triggerInst);
- sendFailureEmailIfConfigured(triggerInst);
+ //sendFailureEmailIfConfigured takes 1/3 secs
+ this.executorService.submit(() -> sendFailureEmailIfConfigured(triggerInst));
}
/**
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
index 77ba11e..c7edd51 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
@@ -166,12 +166,15 @@
<td>$utils.formatDuration(${trigger.getStartTime()}, ${utils.currentTimestamp()})
</td>
#end
- #if (${trigger.getFlowExecId()} != "-1")
+
+ #if (${trigger.getFlowExecId()} == "-1")
+ <td>Flow not triggered yet</td>
+ #elseif (${trigger.getFlowExecId()} == "-2")
+ <td>Flow failed to be triggered</td>
+ #else
<td><a
href="${context}/executor?execid=${trigger.getFlowExecId()}">${trigger.getFlowExecId()}</a>
</td>
- #else
- <td>-</td>
#end
<td>
@@ -311,12 +314,14 @@
<td>$utils.formatDuration(${trigger.getStartTime()}, ${utils.currentTimestamp()})
</td>
#end
- #if (${trigger.getFlowExecId()} != "-1")
+ #if (${trigger.getFlowExecId()} == "-1")
+ <td>Flow not triggered yet</td>
+ #elseif (${trigger.getFlowExecId()} == "-2")
+ <td>Flow failed to be triggered</td>
+ #else
<td><a
href="${context}/executor?execid=${trigger.getFlowExecId()}">${trigger.getFlowExecId()}</a>
</td>
- #else
- <td>-</td>
#end
<td>
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 2a1f288..cb95603 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -15,6 +15,7 @@
*/
package azkaban.flowtrigger;
+import static java.lang.Thread.sleep;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
@@ -81,9 +82,10 @@ public class TriggerInstanceProcessorTest {
}
@Test
- public void testProcessTermination() throws ExecutorManagerException {
+ public void testProcessTermination() throws ExecutorManagerException, InterruptedException {
final TriggerInstance triggerInstance = createTriggerInstance();
processor.processTermination(triggerInstance);
+ sleep(1000);
verify(emailer).sendEmail(any(), any(), any());
}