azkaban-aplcache

flow trigger service enhancement (#1644) This PR includes

2/16/2018 8:49:32 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index f901e34..88e5daa 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -29,7 +29,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -63,15 +62,16 @@ import org.slf4j.LoggerFactory;
  *
  * FlowTriggerService will be leveraged by Quartz scheduler, our new AZ scheduler to schedule
  * triggers.
+ *
+ * After construction, call {@link #start()} to start the service.
  */
 
 @SuppressWarnings("FutureReturnValueIgnored")
 @Singleton
 public class FlowTriggerService {
 
-  private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
-  //private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(5);
   private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
+  private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
   private static final String START_TIME = "starttime";
   private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
   private final ExecutorService executorService;
@@ -117,9 +117,6 @@ public class FlowTriggerService {
   private TriggerInstance createTriggerInstance(final FlowTrigger flowTrigger, final String flowId,
       final int flowVersion, final String submitUser, final Project project) {
     final String triggerInstId = generateId();
-    logger.info(
-        String.format("Starting the flow trigger %s[execId %s] by %s", flowTrigger, triggerInstId,
-            submitUser));
     final long startTime = System.currentTimeMillis();
     // create a list of dependency instances
     final List<DependencyInstance> depInstList = new ArrayList<>();
@@ -157,9 +154,8 @@ public class FlowTriggerService {
   private void scheduleKill(final TriggerInstance triggerInst, final Duration duration, final
   CancellationCause cause) {
     logger
-        .info(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
-            .getSeconds
-                ()));
+        .debug(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
+            .getSeconds()));
     this.timeoutService.schedule(() -> {
       cancel(triggerInst, cause);
     }, duration.toMillis(), TimeUnit.MILLISECONDS);
@@ -255,7 +251,7 @@ public class FlowTriggerService {
       if (triggerInstance.getFlowTrigger() != null) {
         recover(triggerInstance);
       } else {
-        logger.info(String.format("cannot recover the trigger instance %s, flow trigger is null ",
+        logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null ",
             triggerInstance.getId()));
       }
     }
@@ -344,25 +340,27 @@ public class FlowTriggerService {
       final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
           submitUser, project);
 
-      logger.info(String.format("Starting trigger instance[id: %s]", triggerInst.getId()));
+      logger.info(
+          String.format("Starting the flow trigger %s[trigger instance id: %s] by %s", flowTrigger,
+              triggerInst.getId(), submitUser));
 
       this.triggerProcessor.processNewInstance(triggerInst);
       if (triggerInst.getStatus() == Status.CANCELLED) {
         // all dependency instances failed
         logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
-                + "instances failed to be created",
-            triggerInst.getId()));
+            + "instances fail to be created", triggerInst.getId()));
         this.triggerProcessor.processTermination(triggerInst);
       } else if (triggerInst.getStatus() == Status.CANCELLING) {
         // some of the dependency instances failed
         logger.info(
-            String.format("Trigger instance[id: %s] is being cancelled ", triggerInst.getId()));
+            String.format("Trigger instance[id: %s] is being cancelled since some dependency "
+                + "instances fail to be created", triggerInst.getId()));
         addToRunningListAndCancel(triggerInst);
+      } else if (triggerInst.getStatus() == Status.SUCCEEDED) {
+        this.triggerProcessor.processSucceed(triggerInst);
       } else {
         // todo chengren311: it's possible web server restarts before the db update, then
         // new instance will not be recoverable from db.
-        logger.info(
-            String.format("Trigger instance[id: %s] is successfully created", triggerInst.getId()));
         addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
             .getMaxWaitDuration(), CancellationCause.TIMEOUT);
       }
@@ -390,14 +388,6 @@ public class FlowTriggerService {
     }
   }
 
-  private void removeRunningTriggerInstById(final String triggerInstId) {
-    for (final Iterator<TriggerInstance> it = this.runningTriggers.iterator(); it.hasNext(); ) {
-      if (triggerInstId.equals(it.next().getId())) {
-        it.remove();
-      }
-    }
-  }
-
   /**
    * Cancel a trigger instance
    *
@@ -442,20 +432,19 @@ public class FlowTriggerService {
     this.executorService.submit(() -> {
       final DependencyInstance depInst = findDependencyInstanceByContext(context);
       if (depInst != null) {
-        logger.info(
-            String.format("setting dependency instance[id: %s, name: %s] status to success",
-                depInst.getTriggerInstance().getId(), depInst.getDepName()));
-
         if (Status.isDone(depInst.getStatus())) {
           logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
               depInst.getTriggerInstance().getId(), depInst.getDepName()));
           return;
         }
 
+        logger.info(
+            String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
+                depInst.getTriggerInstance().getId(), depInst.getDepName()));
         processStatusUpdate(depInst, Status.SUCCEEDED);
         // if associated trigger instance becomes success, then remove it from running list
         if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
-          logger.info(String.format("trigger instance with execId %s succeeded",
+          logger.info(String.format("trigger instance[id: %s] succeeded",
               depInst.getTriggerInstance().getId()));
           this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
           this.runningTriggers.remove(depInst.getTriggerInstance());
@@ -506,7 +495,7 @@ public class FlowTriggerService {
               String.format("trigger instance with execId %s is cancelled",
                   depInst.getTriggerInstance().getId()));
           this.triggerProcessor.processTermination(depInst.getTriggerInstance());
-          removeRunningTriggerInstById(depInst.getTriggerInstance().getId());
+          this.runningTriggers.remove(depInst.getTriggerInstance());
         }
       } else {
         logger.warn(String.format("unable to find trigger instance with context %s when marking "
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
index 99ff8f6..0f172ed 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -20,9 +20,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
@@ -51,6 +55,7 @@ public class FlowTriggerServiceTest {
       MockFlowTriggerInstanceLoader();
   private static TestDependencyCheck testDepCheck;
   private static FlowTriggerService flowTriggerService;
+  private static ExecutorManager executorManager;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -60,7 +65,7 @@ public class FlowTriggerServiceTest {
     when(pluginManager.getDependencyCheck(ArgumentMatchers.eq("TestDependencyCheck")))
         .thenReturn(testDepCheck);
 
-    final ExecutorManager executorManager = mock(ExecutorManager.class);
+    executorManager = mock(ExecutorManager.class);
     when(executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
 
     final Emailer emailer = mock(Emailer.class);
@@ -80,6 +85,7 @@ public class FlowTriggerServiceTest {
   @Before
   public void cleanup() {
     ((MockFlowTriggerInstanceLoader) flowTriggerInstanceLoader).clear();
+    reset(executorManager);
   }
 
   private Project createProject() {
@@ -104,6 +110,7 @@ public class FlowTriggerServiceTest {
       flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
     }
     Thread.sleep(Duration.ofSeconds(6).toMillis());
+    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
     final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
     assertThat(triggerInstances).hasSize(30);
     for (final TriggerInstance inst : triggerInstances) {
@@ -138,6 +145,7 @@ public class FlowTriggerServiceTest {
       flowTriggerService.cancel(runningTrigger, CancellationCause.MANUAL);
     }
     Thread.sleep(Duration.ofMillis(500).toMillis());
+    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
     final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
     assertThat(triggerInstances).hasSize(30);
     for (final TriggerInstance inst : triggerInstances) {
@@ -160,6 +168,7 @@ public class FlowTriggerServiceTest {
       flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
     }
     Thread.sleep(Duration.ofSeconds(1).toMillis());
+    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
     final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
     assertThat(triggerInstances).hasSize(30);
     for (final TriggerInstance inst : triggerInstances) {
@@ -187,6 +196,7 @@ public class FlowTriggerServiceTest {
       flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
     }
     Thread.sleep(Duration.ofSeconds(5).toMillis());
+    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
     final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
     assertThat(triggerInstances).hasSize(30);
     for (final TriggerInstance inst : triggerInstances) {
@@ -195,6 +205,21 @@ public class FlowTriggerServiceTest {
   }
 
   @Test
+  public void testStartZeroDependencyTrigger()
+      throws InterruptedException, ExecutorManagerException {
+    final List<FlowTriggerDependency> deps = new ArrayList<>();
+    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+    for (int i = 0; i < 30; i++) {
+      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+    }
+    Thread.sleep(Duration.ofSeconds(1).toMillis());
+    // zero dependency trigger will launch associated flow immediately
+    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRunningTriggers();
+    assertThat(triggerInstances).isEmpty();
+    verify(executorManager, times(30)).submitExecutableFlow(any(), anyString());
+  }
+
+  @Test
   public void testRecovery() throws Exception {
     final List<FlowTriggerDependency> deps = new ArrayList<>();
     deps.add(TestUtil.createTestDependency("2secs", 2, false));
@@ -207,8 +232,8 @@ public class FlowTriggerServiceTest {
     Thread.sleep(Duration.ofSeconds(1).toMillis());
     flowTriggerService.shutdown();
     setup();
-    flowTriggerService.start();
     Thread.sleep(Duration.ofSeconds(5).toMillis());
+    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
     final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
     assertThat(triggerInstances).hasSize(30);
     for (final TriggerInstance inst : triggerInstances) {