azkaban-aplcache

AZNewDispatchingLogic - Enable alertOnFirstError (#2105) *

1/24/2019 4:14:04 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java
index 5246171..8ae84ad 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java
@@ -74,7 +74,7 @@ public class ExecutionControllerUtils {
     }
 
     if (alertUser) {
-      alertUser(flow, alerterHolder, getFinalizeFlowReasons(reason, originalError));
+      alertUserOnFlowFinished(flow, alerterHolder, getFinalizeFlowReasons(reason, originalError));
     }
   }
 
@@ -85,8 +85,8 @@ public class ExecutionControllerUtils {
    * @param alerterHolder the alerter holder
    * @param extraReasons the extra reasons for alerting
    */
-  public static void alertUser(final ExecutableFlow flow, final AlerterHolder alerterHolder,
-      final String[] extraReasons) {
+  public static void alertUserOnFlowFinished(final ExecutableFlow flow, final AlerterHolder
+      alerterHolder, final String[] extraReasons) {
     final ExecutionOptions options = flow.getExecutionOptions();
     final Alerter mailAlerter = alerterHolder.get("email");
     if (flow.getStatus() != Status.SUCCEEDED) {
@@ -137,6 +137,40 @@ public class ExecutionControllerUtils {
   }
 
   /**
+   * Alert the user when the flow has encountered the first error.
+   *
+   * @param flow the execution
+   * @param alerterHolder the alerter holder
+   */
+  public static void alertUserOnFirstError(final ExecutableFlow flow,
+      final AlerterHolder alerterHolder) {
+    final ExecutionOptions options = flow.getExecutionOptions();
+    if (options.getNotifyOnFirstFailure()) {
+      logger.info("Alert on first error of execution " + flow.getExecutionId());
+      final Alerter mailAlerter = alerterHolder.get("email");
+      try {
+        mailAlerter.alertOnFirstError(flow);
+      } catch (final Exception e) {
+        logger.error("Failed to send first error email." + e.getMessage(), e);
+      }
+
+      if (options.getFlowParameters().containsKey("alert.type")) {
+        final String alertType = options.getFlowParameters().get("alert.type");
+        final Alerter alerter = alerterHolder.get(alertType);
+        if (alerter != null) {
+          try {
+            alerter.alertOnFirstError(flow);
+          } catch (final Exception e) {
+            logger.error("Failed to alert by " + alertType, e);
+          }
+        } else {
+          logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+        }
+      }
+    }
+  }
+
+  /**
    * Get the reasons to finalize the flow.
    *
    * @param reason the reason
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
index b986417..5a24808 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
@@ -94,7 +94,7 @@ public class ExecutionFinalizer {
 
     this.updaterStage.set("finalizing flow " + execId + " alerting and emailing");
     if (alertUser) {
-      ExecutionControllerUtils.alertUser(flow, this.alerterHolder,
+      ExecutionControllerUtils.alertUserOnFlowFinished(flow, this.alerterHolder,
           ExecutionControllerUtils.getFinalizeFlowReasons(reason,
               originalError));
     }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
index ac774e0..84861f2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
@@ -176,5 +176,4 @@ public class ExecutorHealthChecker {
       this.alerterHolder.get("email").alertOnFailedUpdate(executor, entry.getValue(), e);
     }
   }
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index 1a38f32..1ac8d39 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -245,31 +245,8 @@ public class RunningExecutionsUpdater {
       this.commonMetrics.markFlowFail();
     }
 
-    final ExecutionOptions options = flow.getExecutionOptions();
     if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
-      // We want to see if we should give an email status on first failure.
-      if (options.getNotifyOnFirstFailure()) {
-        final Alerter mailAlerter = this.alerterHolder.get("email");
-        try {
-          mailAlerter.alertOnFirstError(flow);
-        } catch (final Exception e) {
-          logger.error("Failed to send first error email." + e.getMessage(), e);
-        }
-      }
-      if (options.getFlowParameters().containsKey("alert.type")) {
-        final String alertType = options.getFlowParameters().get("alert.type");
-        final Alerter alerter = this.alerterHolder.get(alertType);
-        if (alerter != null) {
-          try {
-            alerter.alertOnFirstError(flow);
-          } catch (final Exception e) {
-            logger.error("Failed to alert by " + alertType, e);
-          }
-        } else {
-          logger.error("Alerter type " + alertType
-              + " doesn't exist. Failed to alert.");
-        }
-      }
+      ExecutionControllerUtils.alertUserOnFirstError(flow, this.alerterHolder);
     }
 
     return flow;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 0e7c64c..b7d091b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -60,6 +60,7 @@ import azkaban.spi.AzkabanEventReporter;
 import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.SwapQueue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Files;
@@ -221,6 +222,11 @@ public class FlowRunner extends EventHandler implements Runnable {
     return this.execDir;
   }
 
+  @VisibleForTesting
+  AlerterHolder getAlerterHolder() {
+    return this.alerterHolder;
+  }
+
   @Override
   public void run() {
     try {
@@ -267,8 +273,9 @@ public class FlowRunner extends EventHandler implements Runnable {
             Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
         // In polling model, executor will be responsible for sending alerting emails when a flow
         // finishes.
+        // Todo jamiesjc: switch to event driven model and alert on FLOW_FINISHED event.
         if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
-          ExecutionControllerUtils.alertUser(this.flow, this.alerterHolder,
+          ExecutionControllerUtils.alertUserOnFlowFinished(this.flow, this.alerterHolder,
               ExecutionControllerUtils.getFinalizeFlowReasons("Flow finished", null));
         }
       }
@@ -547,7 +554,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     if (shouldFail) {
-      propagateStatus(node.getParentFlow(),
+      propagateStatusAndAlert(node.getParentFlow(),
           node.getStatus() == Status.KILLED ? Status.KILLED : Status.FAILED_FINISHING);
       if (this.failureAction == FailureAction.CANCEL_ALL) {
         this.kill();
@@ -623,12 +630,29 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
-  private void propagateStatus(final ExecutableFlowBase base, final Status status) {
+  /**
+   * Recursively propagate status to parent flow. Alert on first error of the flow in new AZ
+   * dispatching design.
+   *
+   * @param base the base flow
+   * @param status the status to be propagated
+   */
+  private void propagateStatusAndAlert(final ExecutableFlowBase base, final Status status) {
     if (!Status.isStatusFinished(base.getStatus()) && base.getStatus() != Status.KILLING) {
       this.logger.info("Setting " + base.getNestedId() + " to " + status);
-      base.setStatus(status);
+      boolean shouldAlert = false;
+      if (base.getStatus() != status) {
+        base.setStatus(status);
+        shouldAlert = true;
+      }
       if (base.getParentFlow() != null) {
-        propagateStatus(base.getParentFlow(), status);
+        propagateStatusAndAlert(base.getParentFlow(), status);
+      } else if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+        // Alert on the root flow if the first error is encountered.
+        // Todo jamiesjc: Add a new FLOW_STATUS_CHANGED event type and alert on that event.
+        if (shouldAlert && base.getStatus() == Status.FAILED_FINISHING) {
+          ExecutionControllerUtils.alertUserOnFirstError((ExecutableFlow) base, this.alerterHolder);
+        }
       }
     }
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java
index c12b1cb..0f14e12 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java
@@ -17,14 +17,22 @@ package azkaban.execapp;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.Status;
 import azkaban.project.Project;
 import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
 import java.io.File;
+import java.util.Arrays;
 import java.util.HashMap;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -34,12 +42,15 @@ public class FlowRunnerYamlTest extends FlowRunnerTestBase {
   private static final String BASIC_FLOW_YAML_DIR = "basicflowwithoutendnode";
   private static final String FAIL_BASIC_FLOW_YAML_DIR = "failbasicflowwithoutendnode";
   private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowwithoutendnode";
+  private static final String ALERT_FLOW_YAML_DIR = "alertflow";
   private static final String BASIC_FLOW_NAME = "basic_flow";
   private static final String BASIC_FLOW_YAML_FILE = BASIC_FLOW_NAME + ".flow";
   private static final String FAIL_BASIC_FLOW_NAME = "fail_basic_flow";
   private static final String FAIL_BASIC_FLOW_YAML_FILE = FAIL_BASIC_FLOW_NAME + ".flow";
   private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
   private static final String EMBEDDED_FLOW_YAML_FILE = EMBEDDED_FLOW_NAME + ".flow";
+  private static final String ALERT_FLOW_NAME = "alert_flow";
+  private static final String ALERT_FLOW_YAML_FILE = ALERT_FLOW_NAME + ".flow";
   private FlowRunnerTestUtil testUtil;
 
   @Test
@@ -111,6 +122,48 @@ public class FlowRunnerYamlTest extends FlowRunnerTestBase {
     assertFlowStatus(flow, Status.SUCCEEDED);
   }
 
+  @Test
+  public void testAlertOnFlowFinished() throws Exception {
+    setUp(ALERT_FLOW_YAML_DIR, ALERT_FLOW_YAML_FILE);
+    final Alerter mailAlerter = mock(Alerter.class);
+    final ExecutionOptions executionOptions = new ExecutionOptions();
+    executionOptions.setFailureEmails(Arrays.asList("test@example.com"));
+    final Props azkabanProps = new Props();
+    azkabanProps.put(ConfigurationKeys.AZKABAN_POLL_MODEL, "true");
+    this.runner = this.testUtil
+        .createFromFlowMap(ALERT_FLOW_NAME, executionOptions, new HashMap<>(), azkabanProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    when(this.runner.getAlerterHolder().get("email")).thenReturn(mailAlerter);
+    FlowRunnerTestUtil.startThread(this.runner);
+    InteractiveTestJob.getTestJob("jobA").failJob();
+    InteractiveTestJob.getTestJob("jobB").failJob();
+    InteractiveTestJob.getTestJob("jobC").succeedJob();
+    assertFlowStatus(flow, Status.FAILED);
+    verify(mailAlerter).alertOnError(flow, "Flow finished");
+  }
+
+  @Test
+  public void testAlertOnFirstError() throws Exception {
+    setUp(ALERT_FLOW_YAML_DIR, ALERT_FLOW_YAML_FILE);
+    final Alerter mailAlerter = mock(Alerter.class);
+    final ExecutionOptions executionOptions = new ExecutionOptions();
+    executionOptions.setNotifyOnFirstFailure(true);
+    final Props azkabanProps = new Props();
+    azkabanProps.put(ConfigurationKeys.AZKABAN_POLL_MODEL, "true");
+    this.runner = this.testUtil
+        .createFromFlowMap(ALERT_FLOW_NAME, executionOptions, new HashMap<>(), azkabanProps);
+    final ExecutableFlow flow = this.runner.getExecutableFlow();
+    when(this.runner.getAlerterHolder().get("email")).thenReturn(mailAlerter);
+    FlowRunnerTestUtil.startThread(this.runner);
+    InteractiveTestJob.getTestJob("jobA").failJob();
+    assertFlowStatus(flow, Status.FAILED_FINISHING);
+    InteractiveTestJob.getTestJob("jobB").failJob();
+    assertFlowStatus(flow, Status.FAILED_FINISHING);
+    InteractiveTestJob.getTestJob("jobC").succeedJob();
+    assertFlowStatus(flow, Status.FAILED);
+    verify(mailAlerter, times(1)).alertOnFirstError(flow);
+  }
+
   private void setUp(final String projectDir, final String flowYamlFile) throws Exception {
     this.testUtil = new FlowRunnerTestUtil(projectDir, this.temporaryFolder);
     final Project project = this.testUtil.getProject();
diff --git a/test/execution-test-data/alertflow/alert_flow.flow b/test/execution-test-data/alertflow/alert_flow.flow
new file mode 100644
index 0000000..f5dbd51
--- /dev/null
+++ b/test/execution-test-data/alertflow/alert_flow.flow
@@ -0,0 +1,20 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobD
+    type: test
+    dependsOn:
+      - jobA
+      - jobB
+      - jobC
+
+  - name: jobA
+    type: test
+
+  - name: jobB
+    type: test
+
+  - name: jobC
+    type: test
diff --git a/test/execution-test-data/alertflow/alert_flow.project b/test/execution-test-data/alertflow/alert_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/alertflow/alert_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0