azkaban-aplcache

Don't log an ERROR for skipped scheduled executions (#2000) If

11/2/2018 6:03:38 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 1f750fc..6d00500 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -19,7 +19,6 @@ package azkaban.trigger.builtin;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowUtils;
 import azkaban.project.Project;
@@ -224,13 +223,9 @@ public class ExecuteFlowAction implements TriggerAction {
       exflow.setSlaOptions(this.slaOptions);
     }
 
-    try {
-      logger.info("Invoking flow " + project.getName() + "." + this.flowName);
-      executorManager.submitExecutableFlow(exflow, this.submitUser);
-      logger.info("Invoked flow " + project.getName() + "." + this.flowName);
-    } catch (final ExecutorManagerException e) {
-      throw new RuntimeException(e);
-    }
+    logger.info("Invoking flow " + project.getName() + "." + this.flowName);
+    executorManager.submitExecutableFlow(exflow, this.submitUser);
+    logger.info("Invoked flow " + project.getName() + "." + this.flowName);
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index ec70a5f..69438b8 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -20,6 +20,7 @@ import static java.util.Objects.requireNonNull;
 
 import azkaban.event.EventHandler;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.utils.Props;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -361,10 +362,16 @@ public class TriggerManager extends EventHandler implements
         try {
           logger.info("Doing trigger actions " + action.getDescription() + " for " + t);
           action.doAction();
-        } catch (final Exception e) {
-          logger.error("Failed to do action " + action.getDescription() + " for " + t, e);
+        } catch (final ExecutorManagerException e) {
+          if (e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
+            logger.info("Skipped action [" + action.getDescription() + "] for [" + t +
+                "] because: " + e.getMessage());
+          } else {
+            logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]",
+                e);
+          }
         } catch (final Throwable th) {
-          logger.error("Failed to do action " + action.getDescription() + " for " + t, th);
+          logger.error("Failed to do action [" + action.getDescription() + "] for [" + t + "]", th);
         }
       }
 
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index de874f6..2f98175 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -17,15 +17,25 @@
 package azkaban.trigger;
 
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.anyObject;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
 import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,17 +52,28 @@ public class TriggerManagerTest {
 
   private static TriggerLoader triggerLoader;
   private static ExecutorManager executorManager;
+  private static ProjectManager projectManager;
   private TriggerManager triggerManager;
 
   @BeforeClass
   public static void prepare() {
     triggerLoader = new MockTriggerLoader();
     executorManager = mock(ExecutorManager.class);
+    projectManager = mock(ProjectManager.class);
     doNothing().when(executorManager).addListener(anyObject());
   }
 
   @Before
-  public void setup() throws TriggerException, TriggerManagerException {
+  public void setup() throws Exception {
+    final Project project = new Project(1, "test-project");
+    project.setFlows(ImmutableMap.of("test-flow", new Flow("test-flow")));
+    when(projectManager.getProject(1)).thenReturn(project);
+    when(executorManager.submitExecutableFlow(any(), any()))
+        .thenThrow(new ExecutorManagerException("Flow is already running. Skipping execution.",
+            ExecutorManagerException.Reason.SkippedExecution));
+    ExecuteFlowAction.setExecutorManager(this.executorManager);
+    ExecuteFlowAction.setProjectManager(this.projectManager);
+    ExecuteFlowAction.setTriggerManager(this.triggerManager);
     final Props props = new Props();
     props.put("trigger.scan.interval", 300);
     this.triggerManager = new TriggerManager(props, triggerLoader, executorManager);
@@ -193,7 +214,8 @@ public class TriggerManagerTest {
 
   private List<TriggerAction> getTriggerActions() {
     final List<TriggerAction> actions = new ArrayList<>();
-    final TriggerAction act = new DummyTriggerAction("");
+    final TriggerAction act = new ExecuteFlowAction("fuu", 1, "test-project", "test-flow",
+        "test-user", new ExecutionOptions(), Collections.emptyList());
     actions.add(act);
     return actions;
   }