azkaban-aplcache

flow trigger service enhancement (#1686) This PR includes

3/22/2018 7:43:04 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 3ee3af4..eb5d492 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -84,6 +84,10 @@ public class Constants {
   // The flow exec id for a flow trigger instance which hasn't started a flow yet
   public static final int UNASSIGNED_EXEC_ID = -1;
 
+  // The flow exec id for a flow trigger instance which failed to trigger a flow
+  public static final int FAILED_EXEC_ID = -2;
+
+
   public static class ConfigurationKeys {
 
     // Configures Azkaban Flow Version in project YAML file
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 342e61e..64d728e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -95,9 +95,12 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
               Constants.UNASSIGNED_EXEC_ID);
 
   private static final String SELECT_ALL_RUNNING_EXECUTIONS =
-      String.format("SELECT %s FROM %s WHERE dep_status = %s or dep_status = %s",
+      String.format(
+          "SELECT %s FROM %s WHERE trigger_instance_id in (SELECT trigger_instance_id FROM %s "
+              + "WHERE dep_status = %s or dep_status = %s)",
           StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
           DEPENDENCY_EXECUTION_TABLE,
+          DEPENDENCY_EXECUTION_TABLE,
           Status.RUNNING.ordinal(), Status.CANCELLING.ordinal());
 
   private static final String SELECT_RECENTLY_FINISHED = String.format(
@@ -105,13 +108,14 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
           + "cancelleation_cause,project_id,"
           + "project_version,flow_id,flow_version, flow_exec_id \n"
           + "FROM execution_dependencies JOIN (\n"
-          + "SELECT distinct(trigger_instance_id), max(endtime) FROM execution_dependencies "
-          + "WHERE dep_status = %s or dep_status = %s\n"
+          + "SELECT trigger_instance_id FROM execution_dependencies where "
+          + "trigger_instance_id not in (SELECT distinct(trigger_instance_id) FROM "
+          + "execution_dependencies WHERE dep_status = %s or dep_status = %s)\n"
           + "GROUP BY trigger_instance_id ORDER BY max(endtime) DESC \n"
           + " limit %%s ) temp on execution_dependencies"
           + ".trigger_instance_id in (temp.trigger_instance_id);",
-      Status.SUCCEEDED.ordinal(),
-      Status.CANCELLED.ordinal());
+      Status.RUNNING.ordinal(),
+      Status.CANCELLING.ordinal());
 
 
   private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index f7f17f6..de98efb 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -73,8 +73,10 @@ public class FlowTriggerService {
 
   private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
   private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
+  private static final int CANCEL_EXECUTOR_POOL_SIZE = 32;
   private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
-  private final ExecutorService executorService;
+  private final ExecutorService singleThreadExecutorService;
+  private final ExecutorService multiThreadsExecutorService;
   private final List<TriggerInstance> runningTriggers;
   private final ScheduledExecutorService timeoutService;
   private final FlowTriggerDependencyPluginManager triggerPluginManager;
@@ -89,7 +91,9 @@ public class FlowTriggerService {
     // Give the thread a name to make debugging easier.
     final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
         .setNameFormat("FlowTrigger-service").build();
-    this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+    this.singleThreadExecutorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+    this.multiThreadsExecutorService = Executors
+        .newFixedThreadPool(CANCEL_EXECUTOR_POOL_SIZE, namedThreadFactory);
     this.timeoutService = Executors.newScheduledThreadPool(8);
     this.runningTriggers = new ArrayList<>();
     this.triggerPluginManager = pluginManager;
@@ -225,11 +229,11 @@ public class FlowTriggerService {
   }
 
   private void recover(final TriggerInstance triggerInstance) {
-    this.executorService.submit(() -> {
+    this.singleThreadExecutorService.submit(() -> {
       logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
           ()));
       if (isDoneButFlowNotExecuted(triggerInstance)) {
-        // if trigger instance succeeds but the associated flow hasn't been started, then start
+        // if trigger instance succeeds but the associated flow hasn't been started yet, then start
         // the flow
         this.triggerProcessor.processSucceed(triggerInstance);
       } else {
@@ -286,12 +290,12 @@ public class FlowTriggerService {
     final CancellationCause cause = getCancelleationCause(triggerInst);
     for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
       if (depInst.getStatus() == Status.CANCELLING) {
-        depInst.getContext().cancel();
+        cancelContextAsync(depInst.getContext());
       } else if (depInst.getStatus() == Status.RUNNING) {
         // sometimes dependency instances of trigger instance in cancelling status can be running.
         // e.x. dep inst1: failure, dep inst2: running -> trigger inst is in killing
         this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
-        depInst.getContext().cancel();
+        cancelContextAsync(depInst.getContext());
       }
     }
   }
@@ -335,7 +339,7 @@ public class FlowTriggerService {
    */
   public void startTrigger(final FlowTrigger flowTrigger, final String flowId,
       final int flowVersion, final String submitUser, final Project project) {
-    this.executorService.submit(() -> {
+    this.singleThreadExecutorService.submit(() -> {
       final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
           submitUser, project);
 
@@ -374,7 +378,7 @@ public class FlowTriggerService {
 
   public TriggerInstance findRunningTriggerInstById(final String triggerInstId) {
     //todo chengren311: make the method single threaded
-    final Future<TriggerInstance> future = this.executorService.submit(
+    final Future<TriggerInstance> future = this.singleThreadExecutorService.submit(
         () -> this.runningTriggers.stream()
             .filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
             .orElse(null)
@@ -387,6 +391,10 @@ public class FlowTriggerService {
     }
   }
 
+  private void cancelContextAsync(final DependencyInstanceContext context) {
+    this.multiThreadsExecutorService.submit(() -> context.cancel());
+  }
+
   /**
    * Cancel a trigger instance
    *
@@ -394,7 +402,7 @@ public class FlowTriggerService {
    * @param cause cause of cancelling
    */
   public void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
-    this.executorService.submit(
+    this.singleThreadExecutorService.submit(
         () -> {
           logger.info(
               String.format("cancelling trigger instance with id %s", triggerInst.getId()));
@@ -404,7 +412,7 @@ public class FlowTriggerService {
               // instance
               if (depInst.getStatus() == Status.RUNNING) {
                 this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
-                depInst.getContext().cancel();
+                cancelContextAsync(depInst.getContext());
               }
             }
           } else {
@@ -428,7 +436,7 @@ public class FlowTriggerService {
    * Mark the dependency instance context as success
    */
   public void markDependencySuccess(final DependencyInstanceContext context) {
-    this.executorService.submit(() -> {
+    this.singleThreadExecutorService.submit(() -> {
       final DependencyInstance depInst = findDependencyInstanceByContext(context);
       if (depInst != null) {
         if (Status.isDone(depInst.getStatus())) {
@@ -440,7 +448,9 @@ public class FlowTriggerService {
         logger.info(
             String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
                 depInst.getTriggerInstance().getId(), depInst.getDepName()));
-        processStatusUpdate(depInst, Status.SUCCEEDED);
+        // if the status transitions from cancelling to succeeded, a cancellation cause was
+        // already set, so we need to reset it to NONE.
+        this.processStatusAndCancelCauseUpdate(depInst, Status.SUCCEEDED, CancellationCause.NONE);
         // if associated trigger instance becomes success, then remove it from running list
         if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
           logger.info(String.format("trigger instance[id: %s] succeeded",
@@ -457,29 +467,24 @@ public class FlowTriggerService {
   }
 
   private boolean cancelledByAzkaban(final DependencyInstance depInst) {
-    return depInst.getStatus() == Status.CANCELLING && (
-        depInst.getCancellationCause() == CancellationCause
-            .MANUAL || depInst.getCancellationCause() == CancellationCause.TIMEOUT || depInst
-            .getCancellationCause() == CancellationCause.CASCADING);
+    return depInst.getStatus() == Status.CANCELLING;
   }
 
   private boolean cancelledByDependencyPlugin(final DependencyInstance depInst) {
     // When onKill is called by the dependency plugin not through flowTriggerService, we treat it
     // as cancelled by dependency due to failure on dependency side. In this case, cancel cause
     // remains unset.
-    return depInst.getStatus() == Status.CANCELLED && (depInst.getCancellationCause()
-        == CancellationCause.NONE);
+    return depInst.getStatus() == Status.RUNNING;
   }
 
   public void markDependencyCancelled(final DependencyInstanceContext context) {
-    this.executorService.submit(() -> {
+    this.singleThreadExecutorService.submit(() -> {
       final DependencyInstance depInst = findDependencyInstanceByContext(context);
       if (depInst != null) {
-        logger.info(
-            String.format("setting dependency instance[id: %s, name: %s] status to cancelled",
-                depInst.getTriggerInstance().getId(), depInst.getDepName()));
+        logger.info(String.format("set dependency instance[id: %s, name: %s] status to "
+            + "cancelled", depInst.getTriggerInstance().getId(), depInst.getDepName()));
         if (cancelledByDependencyPlugin(depInst)) {
-          processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, CancellationCause.FAILURE);
+          processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
           cancelTriggerInstance(depInst.getTriggerInstance());
         } else if (cancelledByAzkaban(depInst)) {
           processStatusUpdate(depInst, Status.CANCELLED);
@@ -507,8 +512,10 @@ public class FlowTriggerService {
    * Shuts down the service immediately.
    */
   public void shutdown() {
-    this.executorService.shutdown(); // Disable new tasks from being submitted
-    this.executorService.shutdownNow(); // Cancel currently executing tasks
+    this.singleThreadExecutorService.shutdown(); // Disable new tasks from being submitted
+    this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
+    this.multiThreadsExecutorService.shutdown();
+    this.multiThreadsExecutorService.shutdownNow();
     this.triggerPluginManager.shutdown();
   }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index b4edfd8..c584dc7 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -16,6 +16,7 @@
 
 package azkaban.flowtrigger;
 
+import azkaban.Constants;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
 import azkaban.flow.Flow;
@@ -25,11 +26,14 @@ import azkaban.project.Project;
 import azkaban.utils.Emailer;
 import com.google.common.base.Preconditions;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@SuppressWarnings("FutureReturnValueIgnored")
 @Singleton
 public class TriggerInstanceProcessor {
 
@@ -37,10 +41,11 @@ public class TriggerInstanceProcessor {
   private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for %s "
       + "cancelled from %s";
   private static final String FAILURE_EMAIL_BODY = "Your flow trigger cancelled [id: %s]";
-
+  private final static int THREAD_POOL_SIZE = 32;
   private final ExecutorManager executorManager;
   private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
   private final Emailer emailer;
+  private final ExecutorService executorService;
 
   @Inject
   public TriggerInstanceProcessor(final ExecutorManager executorManager,
@@ -52,6 +57,7 @@ public class TriggerInstanceProcessor {
     this.emailer = emailer;
     this.executorManager = executorManager;
     this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
+    this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
   }
 
   private void executeFlowAndUpdateExecID(final TriggerInstance triggerInst) {
@@ -63,11 +69,17 @@ public class TriggerInstanceProcessor {
       // currently running")
       this.executorManager.submitExecutableFlow(executableFlow, triggerInst.getSubmitUser());
       triggerInst.setFlowExecId(executableFlow.getExecutionId());
-      this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInst);
     } catch (final Exception ex) {
-      logger.error("exception when executing the associate flow and updating flow exec id", ex);
-      //todo chengren311: should we swallow the exception or notify user
+      logger.error(String.format(
+          "exception when executing the associated flow and updating flow exec id for trigger instance[id: %s]",
+          triggerInst.getId()), ex);
+      // if the flow fails to be executed (e.g. running executions exceed the allowed concurrent
+      // run limit), set the associated flow exec id to Constants.FAILED_EXEC_ID. Upon web server
+      // restart, the recovery process will skip those flows.
+      triggerInst.setFlowExecId(Constants.FAILED_EXEC_ID);
     }
+
+    this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInst);
   }
 
   private String generateFailureEmailSubject(final TriggerInstance triggerInstance) {
@@ -104,7 +116,8 @@ public class TriggerInstanceProcessor {
    */
   public void processTermination(final TriggerInstance triggerInst) {
     logger.debug("process termination for " + triggerInst);
-    sendFailureEmailIfConfigured(triggerInst);
+    // sendFailureEmailIfConfigured is slow (takes 1/3 secs), so submit it to the executor
+    this.executorService.submit(() -> sendFailureEmailIfConfigured(triggerInst));
   }
 
   /**
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
index 77ba11e..c7edd51 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/executingflowtriggerspage.vm
@@ -166,12 +166,15 @@
                 <td>$utils.formatDuration(${trigger.getStartTime()}, ${utils.currentTimestamp()})
                 </td>
               #end
-              #if (${trigger.getFlowExecId()} != "-1")
+
+              #if (${trigger.getFlowExecId()} == "-1")
+                <td>Flow not triggered yet</td>
+              #elseif (${trigger.getFlowExecId()} == "-2")
+                <td>Flow failed to be triggered</td>
+              #else
                 <td><a
                     href="${context}/executor?execid=${trigger.getFlowExecId()}">${trigger.getFlowExecId()}</a>
                 </td>
-              #else
-                <td>-</td>
               #end
 
               <td>
@@ -311,12 +314,14 @@
                 <td>$utils.formatDuration(${trigger.getStartTime()}, ${utils.currentTimestamp()})
                 </td>
               #end
-              #if (${trigger.getFlowExecId()} != "-1")
+              #if (${trigger.getFlowExecId()} == "-1")
+                <td>Flow not triggered yet</td>
+              #elseif (${trigger.getFlowExecId()} == "-2")
+                <td>Flow failed to be triggered</td>
+              #else
                 <td><a
                     href="${context}/executor?execid=${trigger.getFlowExecId()}">${trigger.getFlowExecId()}</a>
                 </td>
-              #else
-                <td>-</td>
               #end
 
               <td>
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 2a1f288..cb95603 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -15,6 +15,7 @@
  */
 package azkaban.flowtrigger;
 
+import static java.lang.Thread.sleep;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
@@ -81,9 +82,10 @@ public class TriggerInstanceProcessorTest {
   }
 
   @Test
-  public void testProcessTermination() throws ExecutorManagerException {
+  public void testProcessTermination() throws ExecutorManagerException, InterruptedException {
     final TriggerInstance triggerInstance = createTriggerInstance();
     processor.processTermination(triggerInstance);
+    sleep(1000);
     verify(emailer).sendEmail(any(), any(), any());
   }