diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 27981e7..9b12995 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -24,6 +24,7 @@ import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_ON_JOB_STATUS_PA
import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_VARIABLE_REPLACEMENT_PATTERN;
import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.Constants.JobProperties;
import azkaban.ServiceProvider;
import azkaban.event.Event;
@@ -35,6 +36,7 @@ import azkaban.execapp.event.JobCallbackManager;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.execapp.metric.NumFailedJobMetric;
import azkaban.execapp.metric.NumRunningJobMetric;
+import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
@@ -43,6 +45,7 @@ import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
+import azkaban.executor.selector.ExecutionControllerUtils;
import azkaban.flow.ConditionOnJobStatus;
import azkaban.flow.FlowProps;
import azkaban.flow.FlowUtils;
@@ -120,6 +123,7 @@ public class FlowRunner extends EventHandler implements Runnable {
// Thread safe swap queue for finishedExecutions.
private final SwapQueue<ExecutableNode> finishedNodes;
private final AzkabanEventReporter azkabanEventReporter;
+ private final AlerterHolder alerterHolder;
private Logger logger;
private Appender flowAppender;
private File logFile;
@@ -148,10 +152,11 @@ public class FlowRunner extends EventHandler implements Runnable {
*/
public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
- final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter)
+ final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter, final
+ AlerterHolder alerterHolder)
throws ExecutorManagerException {
this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps,
- azkabanEventReporter);
+ azkabanEventReporter, alerterHolder);
}
/**
@@ -160,7 +165,7 @@ public class FlowRunner extends EventHandler implements Runnable {
public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
final ExecutorService executorService, final Props azkabanProps,
- final AzkabanEventReporter azkabanEventReporter)
+ final AzkabanEventReporter azkabanEventReporter, final AlerterHolder alerterHolder)
throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
@@ -177,6 +182,8 @@ public class FlowRunner extends EventHandler implements Runnable {
this.executorService = executorService;
this.finishedNodes = new SwapQueue<>();
this.azkabanProps = azkabanProps;
+ this.alerterHolder = alerterHolder;
+
// Add the flow listener only if a non-null eventReporter is available.
if (azkabanEventReporter != null) {
this.addListener(this.flowListener);
@@ -258,6 +265,12 @@ public class FlowRunner extends EventHandler implements Runnable {
} finally {
this.fireEventListeners(
Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
+ // In polling model, executor will be responsible for sending alerting emails when a flow
+ // finishes.
+ if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+ ExecutionControllerUtils.alertUser(this.flow, this.alerterHolder,
+ ExecutionControllerUtils.getFinalizeFlowReasons("Flow finished", null));
+ }
}
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 7cfb598..2027837 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -26,6 +26,7 @@ import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
import azkaban.execapp.event.RemoteFlowWatcher;
import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.Executor;
@@ -123,6 +124,7 @@ public class FlowRunnerManager implements EventListener,
private final JobTypeManager jobtypeManager;
private final FlowPreparer flowPreparer;
private final TriggerManager triggerManager;
+ private final AlerterHolder alerterHolder;
private final AzkabanEventReporter azkabanEventReporter;
private final Props azkabanProps;
private final File executionDirectory;
@@ -150,6 +152,7 @@ public class FlowRunnerManager implements EventListener,
final ProjectLoader projectLoader,
final StorageManager storageManager,
final TriggerManager triggerManager,
+ final AlerterHolder alerterHolder,
@Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
this.azkabanProps = props;
@@ -176,6 +179,7 @@ public class FlowRunnerManager implements EventListener,
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
this.triggerManager = triggerManager;
+ this.alerterHolder = alerterHolder;
this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);
@@ -366,7 +370,7 @@ public class FlowRunnerManager implements EventListener,
final FlowRunner runner =
new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
- this.azkabanProps, this.azkabanEventReporter);
+ this.azkabanProps, this.azkabanEventReporter, this.alerterHolder);
runner.setFlowWatcher(watcher)
.setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
.setValidateProxyUser(this.validateProxyUser)
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 64d421e..a37f18a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import azkaban.event.Event;
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.jmx.JmxJobMBeanManager;
+import azkaban.executor.AlerterHolder;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
@@ -252,7 +253,7 @@ public class FlowRunnerTestUtil {
this.executorLoader.uploadExecutableFlow(exFlow);
final FlowRunner runner =
new FlowRunner(exFlow, this.executorLoader, this.projectLoader,
- this.jobtypeManager, azkabanProps, null);
+ this.jobtypeManager, azkabanProps, null, mock(AlerterHolder.class));
if (eventCollector != null) {
runner.addListener(eventCollector);
}