diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index 4673e98..1338c64 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -145,7 +145,7 @@ public class TriggerInstanceProcessor {
*/
public void processSucceed(final TriggerInstance triggerInst) {
//todo chengren311: publishing Trigger events to Azkaban Project Events page
- executeFlowAndUpdateExecID(triggerInst);
+ this.executorService.submit(() -> executeFlowAndUpdateExecID(triggerInst));
}
/**
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 39f56e8..e8c1e4d 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.metrics.CommonMetrics;
@@ -65,6 +64,7 @@ public class TriggerInstanceProcessorTest {
private EmailMessageCreator messageCreator;
private TriggerInstanceProcessor processor;
private CountDownLatch sendEmailLatch;
+ private CountDownLatch submitFlowLatch;
private static TriggerInstance createTriggerInstance() throws ParseException {
final FlowTrigger flowTrigger = new FlowTrigger(
@@ -112,14 +112,22 @@ public class TriggerInstanceProcessorTest {
this.sendEmailLatch.countDown();
return null;
}).when(this.emailer).sendEmail(any(), any(), any());
+
+ this.submitFlowLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ this.submitFlowLatch.countDown();
+ return null;
+ }).when(this.executorManager).submitExecutableFlow(any(), anyString());
+
this.processor = new TriggerInstanceProcessor(this.executorManager, this.triggerInstLoader,
this.emailer);
}
@Test
- public void testProcessSucceed() throws ExecutorManagerException, ParseException {
+ public void testProcessSucceed() throws Exception {
final TriggerInstance triggerInstance = createTriggerInstance();
this.processor.processSucceed(triggerInstance);
+ this.submitFlowLatch.await(10L, TimeUnit.SECONDS);
verify(this.executorManager).submitExecutableFlow(any(), anyString());
verify(this.triggerInstLoader).updateAssociatedFlowExecId(triggerInstance);
}