azkaban-aplcache

code refactor on flow trigger (#1782) 1. change retention

6/1/2018 5:47:11 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java
index b39af1f..27f9db2 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerExecutionCleaner.java
@@ -27,13 +27,13 @@ import javax.inject.Inject;
  * This is to purge old flow trigger execution records from the db table.
  * Otherwise the table will keep growing indefinitely as triggers are executed, leading to
  * excessive query time on the table.
- * The cleanup policy is removing trigger instances finishing older than 20 days back.
+ * The cleanup policy is to remove trigger instances that finished more than 30 days ago.
  */
 @SuppressWarnings("FutureReturnValueIgnored")
 public class FlowTriggerExecutionCleaner {
 
   private static final Duration CLEAN_INTERVAL = Duration.ofMinutes(10);
-  private static final Duration RETENTION_PERIOD = Duration.ofDays(10);
+  private static final Duration RETENTION_PERIOD = Duration.ofDays(30);
   private final ScheduledExecutorService scheduler;
   private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
 
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index 2b43657..a1cc031 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -71,14 +71,17 @@ import org.slf4j.LoggerFactory;
 @Singleton
 public class FlowTriggerService {
 
+  private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
+
   private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
   private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 50;
   private static final int CANCEL_EXECUTOR_POOL_SIZE = 32;
-  private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
-  private final ExecutorService singleThreadExecutorService;
-  private final ExecutorService multiThreadsExecutorService;
-  private final List<TriggerInstance> runningTriggers;
+  private static final int TIMEOUT_EXECUTOR_POOL_SIZE = 8;
+
+  private final ExecutorService flowTriggerExecutorService;
+  private final ExecutorService cancelExecutorService;
   private final ScheduledExecutorService timeoutService;
+  private final List<TriggerInstance> runningTriggers;
   private final FlowTriggerDependencyPluginManager triggerPluginManager;
   private final TriggerInstanceProcessor triggerProcessor;
   private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
@@ -91,12 +94,14 @@ public class FlowTriggerService {
       dependencyProcessor, final FlowTriggerInstanceLoader flowTriggerInstanceLoader,
       final FlowTriggerExecutionCleaner cleaner) {
     // Give the thread a name to make debugging easier.
-    final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
         .setNameFormat("FlowTrigger-service").build();
-    this.singleThreadExecutorService = Executors.newSingleThreadExecutor(namedThreadFactory);
-    this.multiThreadsExecutorService = Executors
+    this.flowTriggerExecutorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+    namedThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("FlowTrigger-cancel").build();
+    this.cancelExecutorService = Executors
         .newFixedThreadPool(CANCEL_EXECUTOR_POOL_SIZE, namedThreadFactory);
-    this.timeoutService = Executors.newScheduledThreadPool(8);
+    this.timeoutService = Executors.newScheduledThreadPool(TIMEOUT_EXECUTOR_POOL_SIZE);
     this.runningTriggers = new ArrayList<>();
     this.triggerPluginManager = pluginManager;
     this.triggerProcessor = triggerProcessor;
@@ -171,7 +176,7 @@ public class FlowTriggerService {
         .debug(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
             .getSeconds()));
     this.timeoutService.schedule(() -> {
-      cancel(triggerInst, cause);
+      cancelTriggerInstance(triggerInst, cause);
     }, duration.toMillis(), TimeUnit.MILLISECONDS);
   }
 
@@ -236,18 +241,20 @@ public class FlowTriggerService {
     }
   }
 
+  private void recoverTriggerInstance(final TriggerInstance triggerInstance) {
+    this.flowTriggerExecutorService.submit(() -> recover(triggerInstance));
+  }
+
   private void recover(final TriggerInstance triggerInstance) {
-    this.singleThreadExecutorService.submit(() -> {
-      logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
-          ()));
-      if (isDoneButFlowNotExecuted(triggerInstance)) {
-        // if trigger instance succeeds but the associated flow hasn't been started yet, then start
-        // the flow
-        this.triggerProcessor.processSucceed(triggerInstance);
-      } else {
-        recoverRunningOrCancelling(triggerInstance);
-      }
-    });
+    logger.info(String.format("recovering pending trigger instance %s", triggerInstance.getId
+        ()));
+    if (isDoneButFlowNotExecuted(triggerInstance)) {
+      // if trigger instance succeeds but the associated flow hasn't been started yet, then start
+      // the flow
+      this.triggerProcessor.processSucceed(triggerInstance);
+    } else {
+      recoverRunningOrCancelling(triggerInstance);
+    }
   }
 
   /**
@@ -258,7 +265,7 @@ public class FlowTriggerService {
         .getIncompleteTriggerInstances();
     for (final TriggerInstance triggerInstance : unfinishedTriggerInstances) {
       if (triggerInstance.getFlowTrigger() != null) {
-        recover(triggerInstance);
+        recoverTriggerInstance(triggerInstance);
       } else {
         logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null,"
             + " cancelling it ", triggerInstance.getId()));
@@ -368,49 +375,42 @@ public class FlowTriggerService {
    */
   public void startTrigger(final FlowTrigger flowTrigger, final String flowId,
       final int flowVersion, final String submitUser, final Project project) {
-    this.singleThreadExecutorService.submit(() -> {
-      final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
-          submitUser, project);
-
+    final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
+        submitUser, project);
+    this.flowTriggerExecutorService.submit(() -> {
       logger.info(
           String.format("Starting the flow trigger %s[trigger instance id: %s] by %s", flowTrigger,
               triggerInst.getId(), submitUser));
-
-      this.triggerProcessor.processNewInstance(triggerInst);
-      if (triggerInst.getStatus() == Status.CANCELLED) {
-        // all dependency instances failed
-        logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
-            + "instances fail to be created", triggerInst.getId()));
-        this.triggerProcessor.processTermination(triggerInst);
-      } else if (triggerInst.getStatus() == Status.CANCELLING) {
-        // some of the dependency instances failed
-        logger.info(
-            String.format("Trigger instance[id: %s] is being cancelled since some dependency "
-                + "instances fail to be created", triggerInst.getId()));
-        addToRunningListAndCancel(triggerInst);
-      } else if (triggerInst.getStatus() == Status.SUCCEEDED) {
-        this.triggerProcessor.processSucceed(triggerInst);
-      } else {
-        // todo chengren311: it's possible web server restarts before the db update, then
-        // new instance will not be recoverable from db.
-        addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
-            .getMaxWaitDuration(), CancellationCause.TIMEOUT);
-      }
+      start(triggerInst);
     });
   }
 
-  private FlowTriggerDependency getFlowTriggerDepByName(final FlowTrigger flowTrigger,
-      final String depName) {
-    return flowTrigger.getDependencies().stream().filter(ftd -> ftd.getName().equals(depName))
-        .findFirst().orElse(null);
+  private void start(final TriggerInstance triggerInst) {
+    this.triggerProcessor.processNewInstance(triggerInst);
+    if (triggerInst.getStatus() == Status.CANCELLED) {
+      // all dependency instances failed
+      logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
+          + "instances fail to be created", triggerInst.getId()));
+      this.triggerProcessor.processTermination(triggerInst);
+    } else if (triggerInst.getStatus() == Status.CANCELLING) {
+      // some of the dependency instances failed
+      logger.info(
+          String.format("Trigger instance[id: %s] is being cancelled since some dependency "
+              + "instances fail to be created", triggerInst.getId()));
+      addToRunningListAndCancel(triggerInst);
+    } else if (triggerInst.getStatus() == Status.SUCCEEDED) {
+      this.triggerProcessor.processSucceed(triggerInst);
+    } else {
+      // todo chengren311: if the web server restarts before the db update, the new instance
+      // will not be recoverable from the db.
+      addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
+          .getMaxWaitDuration(), CancellationCause.TIMEOUT);
+    }
   }
 
   public TriggerInstance findRunningTriggerInstById(final String triggerInstId) {
-    //todo chengren311: make the method single threaded
-    final Future<TriggerInstance> future = this.singleThreadExecutorService.submit(
-        () -> this.runningTriggers.stream()
-            .filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
-            .orElse(null)
+    final Future<TriggerInstance> future = this.flowTriggerExecutorService.submit(
+        () -> getTriggerInstanceById(triggerInstId)
     );
     try {
       return future.get();
@@ -420,8 +420,14 @@ public class FlowTriggerService {
     }
   }
 
+  private TriggerInstance getTriggerInstanceById(final String triggerInstId) {
+    return this.runningTriggers.stream()
+        .filter(triggerInst -> triggerInst.getId().equals(triggerInstId)).findFirst()
+        .orElse(null);
+  }
+
   private void cancelContextAsync(final DependencyInstanceContext context) {
-    this.multiThreadsExecutorService.submit(() -> context.cancel());
+    this.cancelExecutorService.submit(() -> context.cancel());
   }
 
   /**
@@ -430,27 +436,29 @@ public class FlowTriggerService {
    * @param triggerInst trigger instance to be cancelled
    * @param cause cause of cancelling
    */
-  public void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
-    this.singleThreadExecutorService.submit(
-        () -> {
-          logger.info(
-              String.format("cancelling trigger instance with id %s", triggerInst.getId()));
-          if (triggerInst != null) {
-            for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
-              // cancel only running dependencies, no need to cancel a killed/successful dependency
-              // instance
-              if (depInst.getStatus() == Status.RUNNING) {
-                this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
-                cancelContextAsync(depInst.getContext());
-              }
-            }
-          } else {
-            logger.debug(String
-                .format("unable to cancel a trigger instance in non-running state with id %s",
-                    triggerInst.getId()));
-          }
+  public void cancelTriggerInstance(final TriggerInstance triggerInst,
+      final CancellationCause cause) {
+    this.flowTriggerExecutorService.submit(() -> cancel(triggerInst, cause));
+  }
+
+  private void cancel(final TriggerInstance triggerInst, final CancellationCause cause) {
+    logger.info(
+        String.format("cancelling trigger instance with id %s", triggerInst.getId()));
+    if (triggerInst != null) {
+      for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+        // cancel running dependencies only, no need to cancel a killed/successful dependency
+        // instance
+        if (depInst.getStatus() == Status.RUNNING) {
+          this.processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
+          cancelContextAsync(depInst.getContext());
         }
-    );
+      }
+    } else {
+      logger.debug(String
+          .format(
+              "unable to cancel a trigger instance in non-running state with id %s",
+              triggerInst.getId()));
+    }
   }
 
   private DependencyInstance findDependencyInstanceByContext(
@@ -465,34 +473,36 @@ public class FlowTriggerService {
    * Mark the dependency instance context as success
    */
   public void markDependencySuccess(final DependencyInstanceContext context) {
-    this.singleThreadExecutorService.submit(() -> {
-      final DependencyInstance depInst = findDependencyInstanceByContext(context);
-      if (depInst != null) {
-        if (Status.isDone(depInst.getStatus())) {
-          logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
-              depInst.getTriggerInstance().getId(), depInst.getDepName()));
-          return;
-        }
+    this.flowTriggerExecutorService.submit(() -> markSuccess(context));
+  }
 
-        logger.info(
-            String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
-                depInst.getTriggerInstance().getId(), depInst.getDepName()));
-        // if the status transits from cancelling to succeeded, then cancellation cause was set,
-        // we need to unset cancellation cause.
-        this.processStatusAndCancelCauseUpdate(depInst, Status.SUCCEEDED, CancellationCause.NONE);
-        // if associated trigger instance becomes success, then remove it from running list
-        if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
-          logger.info(String.format("trigger instance[id: %s] succeeded",
-              depInst.getTriggerInstance().getId()));
-          this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
-          this.runningTriggers.remove(depInst.getTriggerInstance());
-        }
-      } else {
-        logger.debug(String.format("unable to find trigger instance with context %s when marking "
-                + "it success",
-            context));
+  private void markSuccess(final DependencyInstanceContext context) {
+    final DependencyInstance depInst = findDependencyInstanceByContext(context);
+    if (depInst != null) {
+      if (Status.isDone(depInst.getStatus())) {
+        logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
+            depInst.getTriggerInstance().getId(), depInst.getDepName()));
+        return;
       }
-    });
+
+      logger.info(
+          String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
+              depInst.getTriggerInstance().getId(), depInst.getDepName()));
+      // if the status transitions from cancelling to succeeded, the cancellation cause was
+      // already set, so we need to unset it.
+      this.processStatusAndCancelCauseUpdate(depInst, Status.SUCCEEDED, CancellationCause.NONE);
+      // if associated trigger instance becomes success, then remove it from running list
+      if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
+        logger.info(String.format("trigger instance[id: %s] succeeded",
+            depInst.getTriggerInstance().getId()));
+        this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
+        this.runningTriggers.remove(depInst.getTriggerInstance());
+      }
+    } else {
+      logger.debug(String.format("unable to find trigger instance with context %s when marking "
+              + "it success",
+          context));
+    }
   }
 
   private boolean cancelledByAzkaban(final DependencyInstance depInst) {
@@ -507,44 +517,51 @@ public class FlowTriggerService {
   }
 
   public void markDependencyCancelled(final DependencyInstanceContext context) {
-    this.singleThreadExecutorService.submit(() -> {
-      final DependencyInstance depInst = findDependencyInstanceByContext(context);
-      if (depInst != null) {
-        logger.info(String.format("set dependency instance[id: %s, name: %s] status to "
-            + "cancelled", depInst.getTriggerInstance().getId(), depInst.getDepName()));
-        if (cancelledByDependencyPlugin(depInst)) {
-          processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
-          cancelTriggerInstance(depInst.getTriggerInstance());
-        } else if (cancelledByAzkaban(depInst)) {
-          processStatusUpdate(depInst, Status.CANCELLED);
-        } else {
-          logger.warn(String.format("OnCancel of dependency instance[id: %s, name: %s] is ignored",
-              depInst.getTriggerInstance().getId(), depInst.getDepName()));
-          return;
-        }
+    this.flowTriggerExecutorService.submit(() -> {
+      markCancelled(context);
+    });
+  }
 
-        if (depInst.getTriggerInstance().getStatus() == Status.CANCELLED) {
-          logger.info(
-              String.format("trigger instance with execId %s is cancelled",
-                  depInst.getTriggerInstance().getId()));
-          this.triggerProcessor.processTermination(depInst.getTriggerInstance());
-          this.runningTriggers.remove(depInst.getTriggerInstance());
-        }
+  private void markCancelled(final DependencyInstanceContext context) {
+    final DependencyInstance depInst = findDependencyInstanceByContext(context);
+    if (depInst != null) {
+      logger.info(String.format("set dependency instance[id: %s, name: %s] status to "
+          + "cancelled", depInst.getTriggerInstance().getId(), depInst.getDepName()));
+      if (cancelledByDependencyPlugin(depInst)) {
+        processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
+        cancelTriggerInstance(depInst.getTriggerInstance());
+      } else if (cancelledByAzkaban(depInst)) {
+        processStatusUpdate(depInst, Status.CANCELLED);
       } else {
-        logger.warn(String.format("unable to find trigger instance with context %s when marking "
-            + "it cancelled", context));
+        logger.warn(String.format("OnCancel of dependency instance[id: %s, name: %s] is ignored",
+            depInst.getTriggerInstance().getId(), depInst.getDepName()));
+        return;
       }
-    });
+
+      if (depInst.getTriggerInstance().getStatus() == Status.CANCELLED) {
+        logger.info(
+            String.format("trigger instance with execId %s is cancelled",
+                depInst.getTriggerInstance().getId()));
+        this.triggerProcessor.processTermination(depInst.getTriggerInstance());
+        this.runningTriggers.remove(depInst.getTriggerInstance());
+      }
+    } else {
+      logger.warn(String.format("unable to find trigger instance with context %s when marking "
+          + "it cancelled", context));
+    }
   }
 
   /**
    * Shuts down the service immediately.
    */
   public void shutdown() {
-    this.singleThreadExecutorService.shutdown(); // Disable new tasks from being submitted
-    this.singleThreadExecutorService.shutdownNow(); // Cancel currently executing tasks
-    this.multiThreadsExecutorService.shutdown();
-    this.multiThreadsExecutorService.shutdownNow();
+    this.flowTriggerExecutorService.shutdown();
+    this.cancelExecutorService.shutdown();
+    this.timeoutService.shutdown();
+
+    this.flowTriggerExecutorService.shutdownNow();
+    this.cancelExecutorService.shutdownNow();
+    this.timeoutService.shutdownNow();
 
     this.triggerProcessor.shutdown();
     this.triggerPluginManager.shutdown();
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index f17c499..bf3562b 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -60,11 +60,6 @@ public class TriggerInstanceProcessor {
     this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
   }
 
-  public void shutdown() {
-    this.executorService.shutdown();
-    this.executorService.shutdownNow();
-  }
-
   private void executeFlowAndUpdateExecID(final TriggerInstance triggerInst) {
     try {
       final Project project = triggerInst.getProject();
@@ -132,4 +127,9 @@ public class TriggerInstanceProcessor {
     logger.debug("process new instance for " + triggerInst);
     this.flowTriggerInstanceLoader.uploadTriggerInstance(triggerInst);
   }
+
+  public void shutdown() {
+    this.executorService.shutdown();
+    this.executorService.shutdownNow();
+  }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
index 22adff9..9d0133a 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
@@ -118,11 +118,11 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
         final Project project = this.projectManager.getProject(projectName);
         if (project == null) {
           ret.put("error", "please specify a valid project name");
-        }
-        else if (!hasPermission(project, session.getUser(), Type.READ)) {
+        } else if (!hasPermission(project, session.getUser(), Type.READ)) {
           ret.put("error", "Permission denied. Need READ access.");
+        } else {
+          ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
         }
-        else ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
       } else {
         ret.put("error", "please specify project id and flow id");
       }
@@ -230,7 +230,7 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
         .findRunningTriggerInstById(triggerInstanceId);
     if (triggerInst != null) {
       if (hasPermission(triggerInst.getProject(), session.getUser(), Type.EXECUTE)) {
-        this.triggerService.cancel(triggerInst, CancellationCause.MANUAL);
+        this.triggerService.cancelTriggerInstance(triggerInst, CancellationCause.MANUAL);
       } else {
         ret.put("error", "no permission to kill the trigger");
       }
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
index 2b7d588..ff81559 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -145,7 +145,7 @@ public class FlowTriggerServiceTest {
 
     Thread.sleep(Duration.ofMillis(500).toMillis());
     for (final TriggerInstance runningTrigger : flowTriggerService.getRunningTriggers()) {
-      flowTriggerService.cancel(runningTrigger, CancellationCause.MANUAL);
+      flowTriggerService.cancelTriggerInstance(runningTrigger, CancellationCause.MANUAL);
     }
     Thread.sleep(Duration.ofMillis(500).toMillis());
     assertThat(flowTriggerService.getRunningTriggers()).isEmpty();