azkaban-aplcache

execute flow asynchronously when being triggered by flow trigger

6/4/2018 8:25:08 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index 4673e98..1338c64 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -145,7 +145,7 @@ public class TriggerInstanceProcessor {
    */
   public void processSucceed(final TriggerInstance triggerInst) {
     //todo chengren311: publishing Trigger events to Azkaban Project Events page
-    executeFlowAndUpdateExecID(triggerInst);
+    this.executorService.submit(() -> executeFlowAndUpdateExecID(triggerInst));
   }
 
   /**
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 39f56e8..e8c1e4d 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import azkaban.metrics.CommonMetrics;
@@ -65,6 +64,7 @@ public class TriggerInstanceProcessorTest {
   private EmailMessageCreator messageCreator;
   private TriggerInstanceProcessor processor;
   private CountDownLatch sendEmailLatch;
+  private CountDownLatch submitFlowLatch;
 
   private static TriggerInstance createTriggerInstance() throws ParseException {
     final FlowTrigger flowTrigger = new FlowTrigger(
@@ -112,14 +112,22 @@ public class TriggerInstanceProcessorTest {
       this.sendEmailLatch.countDown();
       return null;
     }).when(this.emailer).sendEmail(any(), any(), any());
+
+    this.submitFlowLatch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      this.submitFlowLatch.countDown();
+      return null;
+    }).when(this.executorManager).submitExecutableFlow(any(), anyString());
+
     this.processor = new TriggerInstanceProcessor(this.executorManager, this.triggerInstLoader,
         this.emailer);
   }
 
   @Test
-  public void testProcessSucceed() throws ExecutorManagerException, ParseException {
+  public void testProcessSucceed() throws Exception {
     final TriggerInstance triggerInstance = createTriggerInstance();
     this.processor.processSucceed(triggerInstance);
+    this.submitFlowLatch.await(10L, TimeUnit.SECONDS);
     verify(this.executorManager).submitExecutableFlow(any(), anyString());
     verify(this.triggerInstLoader).updateAssociatedFlowExecId(triggerInstance);
   }