azkaban-aplcache

AZNewDispatchingLogic - Send alerting emails from executor

1/2/2019 11:11:54 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 27981e7..9b12995 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -24,6 +24,7 @@ import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_ON_JOB_STATUS_PA
 import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_VARIABLE_REPLACEMENT_PATTERN;
 
 import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.Constants.JobProperties;
 import azkaban.ServiceProvider;
 import azkaban.event.Event;
@@ -35,6 +36,7 @@ import azkaban.execapp.event.JobCallbackManager;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.execapp.metric.NumFailedJobMetric;
 import azkaban.execapp.metric.NumRunningJobMetric;
+import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
@@ -43,6 +45,7 @@ import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
+import azkaban.executor.selector.ExecutionControllerUtils;
 import azkaban.flow.ConditionOnJobStatus;
 import azkaban.flow.FlowProps;
 import azkaban.flow.FlowUtils;
@@ -120,6 +123,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   // Thread safe swap queue for finishedExecutions.
   private final SwapQueue<ExecutableNode> finishedNodes;
   private final AzkabanEventReporter azkabanEventReporter;
+  private final AlerterHolder alerterHolder;
   private Logger logger;
   private Appender flowAppender;
   private File logFile;
@@ -148,10 +152,11 @@ public class FlowRunner extends EventHandler implements Runnable {
    */
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
-      final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter)
+      final Props azkabanProps, final AzkabanEventReporter azkabanEventReporter, final
+  AlerterHolder alerterHolder)
       throws ExecutorManagerException {
     this(flow, executorLoader, projectLoader, jobtypeManager, null, azkabanProps,
-        azkabanEventReporter);
+        azkabanEventReporter, alerterHolder);
   }
 
   /**
@@ -160,7 +165,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   public FlowRunner(final ExecutableFlow flow, final ExecutorLoader executorLoader,
       final ProjectLoader projectLoader, final JobTypeManager jobtypeManager,
       final ExecutorService executorService, final Props azkabanProps,
-      final AzkabanEventReporter azkabanEventReporter)
+      final AzkabanEventReporter azkabanEventReporter, final AlerterHolder alerterHolder)
       throws ExecutorManagerException {
     this.execId = flow.getExecutionId();
     this.flow = flow;
@@ -177,6 +182,8 @@ public class FlowRunner extends EventHandler implements Runnable {
     this.executorService = executorService;
     this.finishedNodes = new SwapQueue<>();
     this.azkabanProps = azkabanProps;
+    this.alerterHolder = alerterHolder;
+
     // Add the flow listener only if a non-null eventReporter is available.
     if (azkabanEventReporter != null) {
       this.addListener(this.flowListener);
@@ -258,6 +265,12 @@ public class FlowRunner extends EventHandler implements Runnable {
       } finally {
         this.fireEventListeners(
             Event.create(this, EventType.FLOW_FINISHED, new EventData(this.flow)));
+        // In polling model, executor will be responsible for sending alerting emails when a flow
+        // finishes.
+        if (this.azkabanProps.getBoolean(ConfigurationKeys.AZKABAN_POLL_MODEL, false)) {
+          ExecutionControllerUtils.alertUser(this.flow, this.alerterHolder,
+              ExecutionControllerUtils.getFinalizeFlowReasons("Flow finished", null));
+        }
       }
     }
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 7cfb598..2027837 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -26,6 +26,7 @@ import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
 import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.execapp.metric.NumFailedFlowMetric;
+import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.Executor;
@@ -123,6 +124,7 @@ public class FlowRunnerManager implements EventListener,
   private final JobTypeManager jobtypeManager;
   private final FlowPreparer flowPreparer;
   private final TriggerManager triggerManager;
+  private final AlerterHolder alerterHolder;
   private final AzkabanEventReporter azkabanEventReporter;
   private final Props azkabanProps;
   private final File executionDirectory;
@@ -150,6 +152,7 @@ public class FlowRunnerManager implements EventListener,
       final ProjectLoader projectLoader,
       final StorageManager storageManager,
       final TriggerManager triggerManager,
+      final AlerterHolder alerterHolder,
       @Nullable final AzkabanEventReporter azkabanEventReporter) throws IOException {
     this.azkabanProps = props;
 
@@ -176,6 +179,7 @@ public class FlowRunnerManager implements EventListener,
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
     this.triggerManager = triggerManager;
+    this.alerterHolder = alerterHolder;
 
     this.jobLogChunkSize = this.azkabanProps.getString("job.log.chunk.size", "5MB");
     this.jobLogNumFiles = this.azkabanProps.getInt("job.log.backup.index", 4);
@@ -366,7 +370,7 @@ public class FlowRunnerManager implements EventListener,
 
     final FlowRunner runner =
         new FlowRunner(flow, this.executorLoader, this.projectLoader, this.jobtypeManager,
-            this.azkabanProps, this.azkabanEventReporter);
+            this.azkabanProps, this.azkabanEventReporter, this.alerterHolder);
     runner.setFlowWatcher(watcher)
         .setJobLogSettings(this.jobLogChunkSize, this.jobLogNumFiles)
         .setValidateProxyUser(this.validateProxyUser)
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 64d421e..a37f18a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 import azkaban.event.Event;
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
+import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
@@ -252,7 +253,7 @@ public class FlowRunnerTestUtil {
     this.executorLoader.uploadExecutableFlow(exFlow);
     final FlowRunner runner =
         new FlowRunner(exFlow, this.executorLoader, this.projectLoader,
-            this.jobtypeManager, azkabanProps, null);
+            this.jobtypeManager, azkabanProps, null, mock(AlerterHolder.class));
     if (eventCollector != null) {
       runner.addListener(eventCollector);
     }