diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
index f901e34..88e5daa 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/FlowTriggerService.java
@@ -29,7 +29,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -63,15 +62,16 @@ import org.slf4j.LoggerFactory;
*
* FlowTriggerService will be leveraged by Quartz scheduler, our new AZ scheduler to schedule
* triggers.
+ *
+ * After construction, call {@link #start()} to start the service.
*/
@SuppressWarnings("FutureReturnValueIgnored")
@Singleton
public class FlowTriggerService {
- private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
- //private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(5);
private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
+ private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 20;
private static final String START_TIME = "starttime";
private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);
private final ExecutorService executorService;
@@ -117,9 +117,6 @@ public class FlowTriggerService {
private TriggerInstance createTriggerInstance(final FlowTrigger flowTrigger, final String flowId,
final int flowVersion, final String submitUser, final Project project) {
final String triggerInstId = generateId();
- logger.info(
- String.format("Starting the flow trigger %s[execId %s] by %s", flowTrigger, triggerInstId,
- submitUser));
final long startTime = System.currentTimeMillis();
// create a list of dependency instances
final List<DependencyInstance> depInstList = new ArrayList<>();
@@ -157,9 +154,8 @@ public class FlowTriggerService {
private void scheduleKill(final TriggerInstance triggerInst, final Duration duration, final
CancellationCause cause) {
logger
- .info(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
- .getSeconds
- ()));
+ .debug(String.format("Cancel trigger instance %s in %s secs", triggerInst.getId(), duration
+ .getSeconds()));
this.timeoutService.schedule(() -> {
cancel(triggerInst, cause);
}, duration.toMillis(), TimeUnit.MILLISECONDS);
@@ -255,7 +251,7 @@ public class FlowTriggerService {
if (triggerInstance.getFlowTrigger() != null) {
recover(triggerInstance);
} else {
- logger.info(String.format("cannot recover the trigger instance %s, flow trigger is null ",
+ logger.error(String.format("cannot recover the trigger instance %s, flow trigger is null ",
triggerInstance.getId()));
}
}
@@ -344,25 +340,27 @@ public class FlowTriggerService {
final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flowId, flowVersion,
submitUser, project);
- logger.info(String.format("Starting trigger instance[id: %s]", triggerInst.getId()));
+ logger.info(
+ String.format("Starting the flow trigger %s[trigger instance id: %s] by %s", flowTrigger,
+ triggerInst.getId(), submitUser));
this.triggerProcessor.processNewInstance(triggerInst);
if (triggerInst.getStatus() == Status.CANCELLED) {
// all dependency instances failed
logger.info(String.format("Trigger instance[id: %s] is cancelled since all dependency "
- + "instances failed to be created",
- triggerInst.getId()));
+ + "instances fail to be created", triggerInst.getId()));
this.triggerProcessor.processTermination(triggerInst);
} else if (triggerInst.getStatus() == Status.CANCELLING) {
// some of the dependency instances failed
logger.info(
- String.format("Trigger instance[id: %s] is being cancelled ", triggerInst.getId()));
+ String.format("Trigger instance[id: %s] is being cancelled since some dependency "
+ + "instances fail to be created", triggerInst.getId()));
addToRunningListAndCancel(triggerInst);
+ } else if (triggerInst.getStatus() == Status.SUCCEEDED) {
+ this.triggerProcessor.processSucceed(triggerInst);
} else {
// todo chengren311: it's possible web server restarts before the db update, then
// new instance will not be recoverable from db.
- logger.info(
- String.format("Trigger instance[id: %s] is successfully created", triggerInst.getId()));
addToRunningListAndScheduleKill(triggerInst, triggerInst.getFlowTrigger()
.getMaxWaitDuration(), CancellationCause.TIMEOUT);
}
@@ -390,14 +388,6 @@ public class FlowTriggerService {
}
}
- private void removeRunningTriggerInstById(final String triggerInstId) {
- for (final Iterator<TriggerInstance> it = this.runningTriggers.iterator(); it.hasNext(); ) {
- if (triggerInstId.equals(it.next().getId())) {
- it.remove();
- }
- }
- }
-
/**
* Cancel a trigger instance
*
@@ -442,20 +432,19 @@ public class FlowTriggerService {
this.executorService.submit(() -> {
final DependencyInstance depInst = findDependencyInstanceByContext(context);
if (depInst != null) {
- logger.info(
- String.format("setting dependency instance[id: %s, name: %s] status to success",
- depInst.getTriggerInstance().getId(), depInst.getDepName()));
-
if (Status.isDone(depInst.getStatus())) {
logger.warn(String.format("OnSuccess of dependency instance[id: %s, name: %s] is ignored",
depInst.getTriggerInstance().getId(), depInst.getDepName()));
return;
}
+ logger.info(
+ String.format("setting dependency instance[id: %s, name: %s] status to succeeded",
+ depInst.getTriggerInstance().getId(), depInst.getDepName()));
processStatusUpdate(depInst, Status.SUCCEEDED);
// if associated trigger instance becomes success, then remove it from running list
if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
- logger.info(String.format("trigger instance with execId %s succeeded",
+ logger.info(String.format("trigger instance[id: %s] succeeded",
depInst.getTriggerInstance().getId()));
this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
this.runningTriggers.remove(depInst.getTriggerInstance());
@@ -506,7 +495,7 @@ public class FlowTriggerService {
String.format("trigger instance with execId %s is cancelled",
depInst.getTriggerInstance().getId()));
this.triggerProcessor.processTermination(depInst.getTriggerInstance());
- removeRunningTriggerInstById(depInst.getTriggerInstance().getId());
+ this.runningTriggers.remove(depInst.getTriggerInstance());
}
} else {
logger.warn(String.format("unable to find trigger instance with context %s when marking "
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
index 99ff8f6..0f172ed 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerServiceTest.java
@@ -20,9 +20,13 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
@@ -51,6 +55,7 @@ public class FlowTriggerServiceTest {
MockFlowTriggerInstanceLoader();
private static TestDependencyCheck testDepCheck;
private static FlowTriggerService flowTriggerService;
+ private static ExecutorManager executorManager;
@BeforeClass
public static void setup() throws Exception {
@@ -60,7 +65,7 @@ public class FlowTriggerServiceTest {
when(pluginManager.getDependencyCheck(ArgumentMatchers.eq("TestDependencyCheck")))
.thenReturn(testDepCheck);
- final ExecutorManager executorManager = mock(ExecutorManager.class);
+ executorManager = mock(ExecutorManager.class);
when(executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
final Emailer emailer = mock(Emailer.class);
@@ -80,6 +85,7 @@ public class FlowTriggerServiceTest {
@Before
public void cleanup() {
((MockFlowTriggerInstanceLoader) flowTriggerInstanceLoader).clear();
+ reset(executorManager);
}
private Project createProject() {
@@ -104,6 +110,7 @@ public class FlowTriggerServiceTest {
flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
}
Thread.sleep(Duration.ofSeconds(6).toMillis());
+ assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
assertThat(triggerInstances).hasSize(30);
for (final TriggerInstance inst : triggerInstances) {
@@ -138,6 +145,7 @@ public class FlowTriggerServiceTest {
flowTriggerService.cancel(runningTrigger, CancellationCause.MANUAL);
}
Thread.sleep(Duration.ofMillis(500).toMillis());
+ assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
assertThat(triggerInstances).hasSize(30);
for (final TriggerInstance inst : triggerInstances) {
@@ -160,6 +168,7 @@ public class FlowTriggerServiceTest {
flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
}
Thread.sleep(Duration.ofSeconds(1).toMillis());
+ assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
assertThat(triggerInstances).hasSize(30);
for (final TriggerInstance inst : triggerInstances) {
@@ -187,6 +196,7 @@ public class FlowTriggerServiceTest {
flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
}
Thread.sleep(Duration.ofSeconds(5).toMillis());
+ assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
assertThat(triggerInstances).hasSize(30);
for (final TriggerInstance inst : triggerInstances) {
@@ -195,6 +205,21 @@ public class FlowTriggerServiceTest {
}
@Test
+ public void testStartZeroDependencyTrigger()
+ throws InterruptedException, ExecutorManagerException {
+ final List<FlowTriggerDependency> deps = new ArrayList<>();
+ final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
+ for (int i = 0; i < 30; i++) {
+ flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
+ }
+ Thread.sleep(Duration.ofSeconds(1).toMillis());
+ // zero dependency trigger will launch associated flow immediately
+ final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRunningTriggers();
+ assertThat(triggerInstances).isEmpty();
+ verify(executorManager, times(30)).submitExecutableFlow(any(), anyString());
+ }
+
+ @Test
public void testRecovery() throws Exception {
final List<FlowTriggerDependency> deps = new ArrayList<>();
deps.add(TestUtil.createTestDependency("2secs", 2, false));
@@ -207,8 +232,8 @@ public class FlowTriggerServiceTest {
Thread.sleep(Duration.ofSeconds(1).toMillis());
flowTriggerService.shutdown();
setup();
- flowTriggerService.start();
Thread.sleep(Duration.ofSeconds(5).toMillis());
+ assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
assertThat(triggerInstances).hasSize(30);
for (final TriggerInstance inst : triggerInstances) {