diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 1f750fc..6d00500 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -19,7 +19,6 @@ package azkaban.trigger.builtin;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.flow.FlowUtils;
import azkaban.project.Project;
@@ -224,13 +223,9 @@ public class ExecuteFlowAction implements TriggerAction {
exflow.setSlaOptions(this.slaOptions);
}
- try {
- logger.info("Invoking flow " + project.getName() + "." + this.flowName);
- executorManager.submitExecutableFlow(exflow, this.submitUser);
- logger.info("Invoked flow " + project.getName() + "." + this.flowName);
- } catch (final ExecutorManagerException e) {
- throw new RuntimeException(e);
- }
+ logger.info("Invoking flow " + project.getName() + "." + this.flowName);
+ executorManager.submitExecutableFlow(exflow, this.submitUser);
+ logger.info("Invoked flow " + project.getName() + "." + this.flowName);
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index ec70a5f..69438b8 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -20,6 +20,7 @@ import static java.util.Objects.requireNonNull;
import azkaban.event.EventHandler;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
import azkaban.utils.Props;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -361,10 +362,16 @@ public class TriggerManager extends EventHandler implements
try {
logger.info("Doing trigger actions " + action.getDescription() + " for " + t);
action.doAction();
- } catch (final Exception e) {
- logger.error("Failed to do action " + action.getDescription() + " for " + t, e);
+ } catch (final ExecutorManagerException e) {
+ if (e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
+ logger.info("Skipped action [" + action.getDescription() + "] for [" + t +
+ "] because: " + e.getMessage());
+ } else {
+ logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]",
+ e);
+ }
} catch (final Throwable th) {
- logger.error("Failed to do action " + action.getDescription() + " for " + t, th);
+ logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]", th);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index de874f6..2f98175 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -17,15 +17,25 @@
package azkaban.trigger;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Props;
import azkaban.utils.Utils;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,17 +52,28 @@ public class TriggerManagerTest {
private static TriggerLoader triggerLoader;
private static ExecutorManager executorManager;
+ private static ProjectManager projectManager;
private TriggerManager triggerManager;
@BeforeClass
public static void prepare() {
triggerLoader = new MockTriggerLoader();
executorManager = mock(ExecutorManager.class);
+ projectManager = mock(ProjectManager.class);
doNothing().when(executorManager).addListener(anyObject());
}
@Before
- public void setup() throws TriggerException, TriggerManagerException {
+ public void setup() throws Exception {
+ final Project project = new Project(1, "test-project");
+ project.setFlows(ImmutableMap.of("test-flow", new Flow("test-flow")));
+ when(projectManager.getProject(1)).thenReturn(project);
+ when(executorManager.submitExecutableFlow(any(), any()))
+ .thenThrow(new ExecutorManagerException("Flow is already running. Skipping execution.",
+ ExecutorManagerException.Reason.SkippedExecution));
+ ExecuteFlowAction.setExecutorManager(this.executorManager);
+ ExecuteFlowAction.setProjectManager(this.projectManager);
+ ExecuteFlowAction.setTriggerManager(this.triggerManager);
final Props props = new Props();
props.put("trigger.scan.interval", 300);
this.triggerManager = new TriggerManager(props, triggerLoader, executorManager);
@@ -193,7 +214,8 @@ public class TriggerManagerTest {
private List<TriggerAction> getTriggerActions() {
final List<TriggerAction> actions = new ArrayList<>();
- final TriggerAction act = new DummyTriggerAction("");
+ final TriggerAction act = new ExecuteFlowAction("fuu", 1, "test-project", "test-flow",
+ "test-user", new ExecutionOptions(), Collections.emptyList());
actions.add(act);
return actions;
}