Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java
index 5246171..8ae84ad 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionControllerUtils.java
@@ -74,7 +74,7 @@ public class ExecutionControllerUtils {
}
if (alertUser) {
- alertUser(flow, alerterHolder, getFinalizeFlowReasons(reason, originalError));
+ alertUserOnFlowFinished(flow, alerterHolder, getFinalizeFlowReasons(reason, originalError));
}
}
@@ -85,8 +85,8 @@ public class ExecutionControllerUtils {
* @param alerterHolder the alerter holder
* @param extraReasons the extra reasons for alerting
*/
- public static void alertUser(final ExecutableFlow flow, final AlerterHolder alerterHolder,
- final String[] extraReasons) {
+ public static void alertUserOnFlowFinished(final ExecutableFlow flow, final AlerterHolder
+ alerterHolder, final String[] extraReasons) {
final ExecutionOptions options = flow.getExecutionOptions();
final Alerter mailAlerter = alerterHolder.get("email");
if (flow.getStatus() != Status.SUCCEEDED) {
@@ -137,6 +137,40 @@ public class ExecutionControllerUtils {
}
/**
+ * Alert the user on the first error of the flow, if its options enable notify-on-first-failure.
+ *
+ * @param flow the execution
+ * @param alerterHolder the alerter holder
+ */
+ public static void alertUserOnFirstError(final ExecutableFlow flow,
+ final AlerterHolder alerterHolder) {
+ final ExecutionOptions options = flow.getExecutionOptions();
+ if (options.getNotifyOnFirstFailure()) {
+ logger.info("Alert on first error of execution " + flow.getExecutionId());
+ final Alerter mailAlerter = alerterHolder.get("email");
+ try {
+ mailAlerter.alertOnFirstError(flow);
+ } catch (final Exception e) { // log and continue so the "alert.type" alerter below still runs
+ logger.error("Failed to send first error email." + e.getMessage(), e);
+ }
+ // NOTE(review): "alert.type" alerting is nested under notify-on-first-failure - confirm intended.
+ if (options.getFlowParameters().containsKey("alert.type")) {
+ final String alertType = options.getFlowParameters().get("alert.type");
+ final Alerter alerter = alerterHolder.get(alertType);
+ if (alerter != null) {
+ try {
+ alerter.alertOnFirstError(flow);
+ } catch (final Exception e) {
+ logger.error("Failed to alert by " + alertType, e);
+ }
+ } else {
+ logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+ }
+ }
+ }
+ }
+
+ /**
* Get the reasons to finalize the flow.
*
* @param reason the reason
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
index b986417..5a24808 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFinalizer.java
@@ -94,7 +94,7 @@ public class ExecutionFinalizer {
this.updaterStage.set("finalizing flow " + execId + " alerting and emailing");
if (alertUser) {
- ExecutionControllerUtils.alertUser(flow, this.alerterHolder,
+ ExecutionControllerUtils.alertUserOnFlowFinished(flow, this.alerterHolder,
ExecutionControllerUtils.getFinalizeFlowReasons(reason,
originalError));
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
index ac774e0..84861f2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorHealthChecker.java
@@ -176,5 +176,4 @@ public class ExecutorHealthChecker {
this.alerterHolder.get("email").alertOnFailedUpdate(executor, entry.getValue(), e);
}
}
-
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
index 1a38f32..1ac8d39 100644
--- a/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
+++ b/azkaban-common/src/main/java/azkaban/executor/RunningExecutionsUpdater.java
@@ -245,31 +245,8 @@ public class RunningExecutionsUpdater {
this.commonMetrics.markFlowFail();
}
- final ExecutionOptions options = flow.getExecutionOptions();
if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
- // We want to see if we should give an email status on first failure.
- if (options.getNotifyOnFirstFailure()) {
- final Alerter mailAlerter = this.alerterHolder.get("email");
- try {
- mailAlerter.alertOnFirstError(flow);
- } catch (final Exception e) {
- logger.error("Failed to send first error email." + e.getMessage(), e);
- }
- }
- if (options.getFlowParameters().containsKey("alert.type")) {
- final String alertType = options.getFlowParameters().get("alert.type");
- final Alerter alerter = this.alerterHolder.get(alertType);
- if (alerter != null) {
- try {
- alerter.alertOnFirstError(flow);
- } catch (final Exception e) {
- logger.error("Failed to alert by " + alertType, e);
- }
- } else {
- logger.error("Alerter type " + alertType
- + " doesn't exist. Failed to alert.");
- }
- }
+ ExecutionControllerUtils.alertUserOnFirstError(flow, this.alerterHolder);
}
return flow;
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 0e7c64c..b7d091b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -60,6 +60,7 @@ import azkaban.spi.AzkabanEventReporter;
import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.SwapQueue;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
@@ -221,6 +222,11 @@ public class FlowRunner extends EventHandler implements Runnable {
return this.execDir;
}
+ @VisibleForTesting
+ AlerterHolder getAlerterHolder() { // package-private accessor, exposed for tests only
+ return this.alerterHolder;
+ }
+
@Override
public void run() {
try {
@@ -267,8 +273,9 @@ public class FlowRunner extends EventHandler implements Runnable {
Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
// In polling model, executor will be responsible for sending alerting emails when a flow
// finishes.
+ // Todo jamiesjc: switch to event driven model and alert on FLOW_FINISHED event.
if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
- ExecutionControllerUtils.alertUser(this.flow, this.alerterHolder,
+ ExecutionControllerUtils.alertUserOnFlowFinished(this.flow, this.alerterHolder,
ExecutionControllerUtils.getFinalizeFlowReasons("Flow finished", null));
}
}
@@ -547,7 +554,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
if (shouldFail) {
- propagateStatus(node.getParentFlow(),
+ propagateStatusAndAlert(node.getParentFlow(),
node.getStatus() == Status.KILLED ? Status.KILLED : Status.FAILED_FINISHING);
if (this.failureAction == FailureAction.CANCEL_ALL) {
this.kill();
@@ -623,12 +630,29 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- private void propagateStatus(final ExecutableFlowBase base, final Status status) {
+ /**
+ * Recursively propagate the status up to the parent flows. In the poll model, also alert on the
+ * first error of the root flow, i.e. when this call changes its status to FAILED_FINISHING.
+ *
+ * @param base the base flow
+ * @param status the status to be propagated
+ */
+ private void propagateStatusAndAlert(final ExecutableFlowBase base, final Status status) {
if (!Status.isStatusFinished(base.getStatus()) && base.getStatus() != Status.KILLING) {
this.logger.info("Setting " + base.getNestedId() + " to " + status);
- base.setStatus(status);
+ boolean shouldAlert = false; // becomes true only if this call actually changes the status
+ if (base.getStatus() != status) {
+ base.setStatus(status);
+ shouldAlert = true;
+ }
if (base.getParentFlow() != null) {
- propagateStatus(base.getParentFlow(), status);
+ propagateStatusAndAlert(base.getParentFlow(), status);
+ } else if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+ // No parent, so base is the root flow: alert if this change is its first error.
+ // Todo jamiesjc: Add a new FLOW_STATUS_CHANGED event type and alert on that event.
+ if (shouldAlert && base.getStatus() == Status.FAILED_FINISHING) {
+ ExecutionControllerUtils.alertUserOnFirstError((ExecutableFlow) base, this.alerterHolder);
+ }
}
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java
index c12b1cb..0f14e12 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerYamlTest.java
@@ -17,14 +17,22 @@ package azkaban.execapp;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
import azkaban.project.Project;
import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
import java.io.File;
+import java.util.Arrays;
import java.util.HashMap;
import org.junit.Ignore;
import org.junit.Test;
@@ -34,12 +42,15 @@ public class FlowRunnerYamlTest extends FlowRunnerTestBase {
private static final String BASIC_FLOW_YAML_DIR = "basicflowwithoutendnode";
private static final String FAIL_BASIC_FLOW_YAML_DIR = "failbasicflowwithoutendnode";
private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowwithoutendnode";
+ private static final String ALERT_FLOW_YAML_DIR = "alertflow";
private static final String BASIC_FLOW_NAME = "basic_flow";
private static final String BASIC_FLOW_YAML_FILE = BASIC_FLOW_NAME + ".flow";
private static final String FAIL_BASIC_FLOW_NAME = "fail_basic_flow";
private static final String FAIL_BASIC_FLOW_YAML_FILE = FAIL_BASIC_FLOW_NAME + ".flow";
private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
private static final String EMBEDDED_FLOW_YAML_FILE = EMBEDDED_FLOW_NAME + ".flow";
+ private static final String ALERT_FLOW_NAME = "alert_flow";
+ private static final String ALERT_FLOW_YAML_FILE = ALERT_FLOW_NAME + ".flow";
private FlowRunnerTestUtil testUtil;
@Test
@@ -111,6 +122,48 @@ public class FlowRunnerYamlTest extends FlowRunnerTestBase {
assertFlowStatus(flow, Status.SUCCEEDED);
}
+ @Test
+ public void testAlertOnFlowFinished() throws Exception {
+ setUp(ALERT_FLOW_YAML_DIR, ALERT_FLOW_YAML_FILE);
+ final Alerter mailAlerter = mock(Alerter.class);
+ final ExecutionOptions executionOptions = new ExecutionOptions();
+ executionOptions.setFailureEmails(Arrays.asList("test@example.com"));
+ final Props azkabanProps = new Props();
+ azkabanProps.put(ConfigurationKeys.AZKABAN_POLL_MODEL, "true"); // FlowRunner alerts only in the poll model
+ this.runner = this.testUtil
+ .createFromFlowMap(ALERT_FLOW_NAME, executionOptions, new HashMap<>(), azkabanProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
+ when(this.runner.getAlerterHolder().get("email")).thenReturn(mailAlerter); // route "email" alerts to the mock
+ FlowRunnerTestUtil.startThread(this.runner);
+ InteractiveTestJob.getTestJob("jobA").failJob();
+ InteractiveTestJob.getTestJob("jobB").failJob();
+ InteractiveTestJob.getTestJob("jobC").succeedJob();
+ assertFlowStatus(flow, Status.FAILED);
+ verify(mailAlerter).alertOnError(flow, "Flow finished"); // reason FlowRunner passes when the flow finishes
+ }
+
+ @Test
+ public void testAlertOnFirstError() throws Exception {
+ setUp(ALERT_FLOW_YAML_DIR, ALERT_FLOW_YAML_FILE);
+ final Alerter mailAlerter = mock(Alerter.class);
+ final ExecutionOptions executionOptions = new ExecutionOptions();
+ executionOptions.setNotifyOnFirstFailure(true); // alertUserOnFirstError does nothing unless this is set
+ final Props azkabanProps = new Props();
+ azkabanProps.put(ConfigurationKeys.AZKABAN_POLL_MODEL, "true"); // FlowRunner alerts only in the poll model
+ this.runner = this.testUtil
+ .createFromFlowMap(ALERT_FLOW_NAME, executionOptions, new HashMap<>(), azkabanProps);
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
+ when(this.runner.getAlerterHolder().get("email")).thenReturn(mailAlerter); // route "email" alerts to the mock
+ FlowRunnerTestUtil.startThread(this.runner);
+ InteractiveTestJob.getTestJob("jobA").failJob();
+ assertFlowStatus(flow, Status.FAILED_FINISHING);
+ InteractiveTestJob.getTestJob("jobB").failJob(); // second failure; status is already FAILED_FINISHING
+ assertFlowStatus(flow, Status.FAILED_FINISHING);
+ InteractiveTestJob.getTestJob("jobC").succeedJob();
+ assertFlowStatus(flow, Status.FAILED);
+ verify(mailAlerter, times(1)).alertOnFirstError(flow); // exactly one alert despite two failed jobs
+ }
+
private void setUp(final String projectDir, final String flowYamlFile) throws Exception {
this.testUtil = new FlowRunnerTestUtil(projectDir, this.temporaryFolder);
final Project project = this.testUtil.getProject();
diff --git a/test/execution-test-data/alertflow/alert_flow.flow b/test/execution-test-data/alertflow/alert_flow.flow
new file mode 100644
index 0000000..f5dbd51
--- /dev/null
+++ b/test/execution-test-data/alertflow/alert_flow.flow
@@ -0,0 +1,20 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: jobD
+ type: test
+ dependsOn:
+ - jobA
+ - jobB
+ - jobC
+
+ - name: jobA
+ type: test
+
+ - name: jobB
+ type: test
+
+ - name: jobC
+ type: test
diff --git a/test/execution-test-data/alertflow/alert_flow.project b/test/execution-test-data/alertflow/alert_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/alertflow/alert_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0