azkaban-aplcache

Improve alerting on executions finalized by azkaban-web (#1944) -

9/27/2018 6:25:12 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index a69297f..3641e5e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -52,6 +52,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -68,6 +69,7 @@ import java.util.regex.Pattern;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
@@ -921,7 +923,7 @@ public class ExecutorManager extends EventHandler implements
             userId);
       } else if (this.queuedFlows.hasExecution(exFlow.getExecutionId())) {
         this.queuedFlows.dequeue(exFlow.getExecutionId());
-        finalizeFlows(exFlow);
+        finalizeFlows(exFlow, "Cancelled before dispatching to executor", null);
       } else {
         throw new ExecutorManagerException("Execution "
             + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId()
@@ -1151,7 +1153,7 @@ public class ExecutorManager extends EventHandler implements
             // this logic is only implemented in multiExecutorMode but
             // missed in single executor case.
             this.commonMetrics.markDispatchFail();
-            finalizeFlows(exflow);
+            finalizeFlows(exflow, "Dispatching failed", e);
             throw e;
           }
         }
@@ -1227,10 +1229,12 @@ public class ExecutorManager extends EventHandler implements
     this.executingManager.shutdown();
   }
 
-  private void finalizeFlows(final ExecutableFlow flow) {
+  private void finalizeFlows(final ExecutableFlow flow, final String reason,
+      final Throwable originalError) {
 
     final int execId = flow.getExecutionId();
     boolean alertUser = true;
+    final String[] extraReasons = getFinalizeFlowReasons(reason, originalError);
     this.updaterStage = "finalizing flow " + execId;
     // First we check if the execution in the datastore is complete
     try {
@@ -1266,19 +1270,17 @@ public class ExecutorManager extends EventHandler implements
       logger.error(e);
     }
 
-    // TODO append to the flow log that we forced killed this flow because the
-    // target no longer had
-    // the reference.
+    // TODO append to the flow log that we marked this flow as failed + the extraReasons
 
     this.updaterStage = "finalizing flow " + execId + " alerting and emailing";
     if (alertUser) {
       final ExecutionOptions options = flow.getExecutionOptions();
       // But we can definitely email them.
       final Alerter mailAlerter = this.alerterHolder.get("email");
-      if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED) {
+      if (flow.getStatus() != Status.SUCCEEDED) {
         if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
           try {
-            mailAlerter.alertOnError(flow);
+            mailAlerter.alertOnError(flow, extraReasons);
           } catch (final Exception e) {
             logger.error(e);
           }
@@ -1288,11 +1290,9 @@ public class ExecutorManager extends EventHandler implements
           final Alerter alerter = this.alerterHolder.get(alertType);
           if (alerter != null) {
             try {
-              alerter.alertOnError(flow);
+              alerter.alertOnError(flow, extraReasons);
             } catch (final Exception e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-              logger.error("Failed to alert by " + alertType);
+              logger.error("Failed to alert by " + alertType, e);
             }
           } else {
             logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
@@ -1314,9 +1314,7 @@ public class ExecutorManager extends EventHandler implements
             try {
               alerter.alertOnSuccess(flow);
             } catch (final Exception e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-              logger.error("Failed to alert by " + alertType);
+              logger.error("Failed to alert by " + alertType, e);
             }
           } else {
             logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
@@ -1327,6 +1325,15 @@ public class ExecutorManager extends EventHandler implements
 
   }
 
+  private String[] getFinalizeFlowReasons(final String reason, final Throwable originalError) {
+    // The reason text always leads; the original error's stack trace follows when present.
+    final List<String> collected = new LinkedList<>(Collections.singletonList(reason));
+    if (originalError != null) {
+      collected.add(ExceptionUtils.getStackTrace(originalError));
+    }
+    return collected.toArray(new String[collected.size()]);
+  }
+
   private void failEverything(final ExecutableFlow exFlow) {
     final long time = System.currentTimeMillis();
     for (final ExecutableNode node : exFlow.getExecutableNodes()) {
@@ -1404,8 +1411,7 @@ public class ExecutorManager extends EventHandler implements
         try {
           mailAlerter.alertOnFirstError(flow);
         } catch (final Exception e) {
-          e.printStackTrace();
-          logger.error("Failed to send first error email." + e.getMessage());
+          logger.error("Failed to send first error email." + e.getMessage(), e);
         }
       }
       if (options.getFlowParameters().containsKey("alert.type")) {
@@ -1415,9 +1421,7 @@ public class ExecutorManager extends EventHandler implements
           try {
             alerter.alertOnFirstError(flow);
           } catch (final Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            logger.error("Failed to alert by " + alertType);
+            logger.error("Failed to alert by " + alertType, e);
           }
         } else {
           logger.error("Alerter type " + alertType
@@ -1662,7 +1666,7 @@ public class ExecutorManager extends EventHandler implements
 
             // Kill error flows
             for (final ExecutableFlow flow : finalizeFlows) {
-              finalizeFlows(flow);
+              finalizeFlows(flow, "Not running on the assigned executor (any more)", null);
             }
           }
 
@@ -1875,7 +1879,7 @@ public class ExecutorManager extends EventHandler implements
           if (giveUpReason != null) {
             logger.error("Failed to dispatch queued execution " + exflow.getId() + " because "
                 + giveUpReason);
-            finalizeFlows(exflow);
+            finalizeFlows(exflow, "Failed to dispatch because " + giveUpReason, null);
             // GIVE UP DISPATCHING - exit
             return;
           } else {
@@ -1925,7 +1929,7 @@ public class ExecutorManager extends EventHandler implements
           "Executor %s responded with exception for exec: %d",
           selectedExecutor, exflow.getExecutionId()), e);
       logger.info(String.format(
-          "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
+          "Failed dispatch attempt for exec %d with error count %d",
           exflow.getExecutionId(), reference.getNumErrors()));
     }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index 1f52b24..2cdf1be 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -79,7 +79,7 @@ public class DefaultMailCreator implements MailCreator {
   @Override
   public boolean createFirstErrorMessage(final ExecutableFlow flow,
       final EmailMessage message, final String azkabanName, final String scheme,
-      final String clientHostname, final String clientPortNumber, final String... vars) {
+      final String clientHostname, final String clientPortNumber) {
 
     final ExecutionOptions option = flow.getExecutionOptions();
     final List<String> emailList = option.getFailureEmails();
@@ -142,7 +142,7 @@ public class DefaultMailCreator implements MailCreator {
   @Override
   public boolean createErrorEmail(final ExecutableFlow flow, final List<ExecutableFlow>
       pastExecutions, final EmailMessage message, final String azkabanName, final String scheme,
-      final String clientHostname, final String clientPortNumber, final String... vars) {
+      final String clientHostname, final String clientPortNumber, final String... reasons) {
 
     final ExecutionOptions option = flow.getExecutionOptions();
 
@@ -183,8 +183,8 @@ public class DefaultMailCreator implements MailCreator {
         message.println("<li><a href=\"" + executionUrl + "&job=" + jobId
             + "\">Failed job '" + jobId + "' Link</a></li>");
       }
-      for (final String reasons : vars) {
-        message.println("<li>" + reasons + "</li>");
+      for (final String reason : reasons) {
+        message.println("<li>" + reason + "</li>");
       }
 
       message.println("</ul>");
@@ -220,7 +220,7 @@ public class DefaultMailCreator implements MailCreator {
   @Override
   public boolean createSuccessEmail(final ExecutableFlow flow, final EmailMessage message,
       final String azkabanName, final String scheme, final String clientHostname,
-      final String clientPortNumber, final String... vars) {
+      final String clientPortNumber) {
 
     final ExecutionOptions option = flow.getExecutionOptions();
     final List<String> emailList = option.getSuccessEmails();
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
index 991c64a..977d232 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
@@ -24,19 +24,19 @@ import java.util.List;
 
 public interface MailCreator {
 
-  public boolean createFirstErrorMessage(ExecutableFlow flow,
+  boolean createFirstErrorMessage(ExecutableFlow flow,
       EmailMessage message, String azkabanName, String scheme,
-      String clientHostname, String clientPortNumber, String... vars);
+      String clientHostname, String clientPortNumber);
 
-  public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
+  boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
       EmailMessage message, String azkabanName, String scheme, String clientHostname,
-      String clientPortNumber, String... vars);
+      String clientPortNumber, String... reasons);
 
-  public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
+  boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
       String azkabanName, String scheme, String clientHostname,
-      String clientPortNumber, String... vars);
+      String clientPortNumber);
 
-  public boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
+  boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
       ExecutorManagerException updateException, EmailMessage message,
       String azkabanName, String scheme, String clientHostname,
       String clientPortNumber);
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 302e3ed..7da9c95 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -331,7 +331,8 @@ public class ExecutorManagerTest {
     verify(this.apiGateway)
         .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
     verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
-    verify(this.mailAlerter).alertOnError(flow1);
+    verify(this.mailAlerter).alertOnError(eq(flow1),
+        eq("Failed to dispatch because reached azkaban.maxDispatchingErrors (tried 2 executors)"));
   }
 
   private void mockFlowDoesNotExist() throws Exception {
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
index 3175b07..8539182 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -16,8 +16,21 @@
 
 package azkaban.viewer.reportal;
 
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.Executor;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
+import azkaban.project.Project;
+import azkaban.reportal.util.IStreamProvider;
+import azkaban.reportal.util.ReportalHelper;
+import azkaban.reportal.util.ReportalUtil;
+import azkaban.reportal.util.StreamProviderHDFS;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.utils.EmailMessage;
+import azkaban.webapp.AzkabanWebServer;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -34,23 +47,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.Set;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringEscapeUtils;
-
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.mail.DefaultMailCreator;
-import azkaban.executor.mail.MailCreator;
-import azkaban.project.Project;
-import azkaban.reportal.util.IStreamProvider;
-import azkaban.reportal.util.ReportalHelper;
-import azkaban.reportal.util.ReportalUtil;
-import azkaban.reportal.util.StreamProviderHDFS;
-import azkaban.security.commons.HadoopSecurityManager;
-import azkaban.utils.EmailMessage;
-import azkaban.webapp.AzkabanWebServer;
+import org.apache.log4j.Logger;
 
 public class ReportalMailCreator implements MailCreator {
   public static final String REPORTAL_MAIL_CREATOR = "ReportalMailCreator";
@@ -64,6 +63,8 @@ public class ReportalMailCreator implements MailCreator {
   public static String reportalStorageUser = "";
   public static File reportalMailTempDirectory;
 
+  private static final Logger logger = Logger.getLogger(ReportalMailCreator.class);
+
   static {
     DefaultMailCreator.registerCreator(REPORTAL_MAIL_CREATOR,
         new ReportalMailCreator());
@@ -72,7 +73,7 @@ public class ReportalMailCreator implements MailCreator {
   @Override
   public boolean createFirstErrorMessage(ExecutableFlow flow,
       EmailMessage message, String azkabanName, String scheme,
-      String clientHostname, String clientPortNumber, String... vars) {
+      String clientHostname, String clientPortNumber) {
 
     ExecutionOptions option = flow.getExecutionOptions();
     Set<String> emailList = new HashSet<String>(option.getFailureEmails());
@@ -84,7 +85,7 @@ public class ReportalMailCreator implements MailCreator {
   @Override
   public boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow>
       pastExecutions, EmailMessage message, String azkabanName, String scheme, String clientHostname,
-      String clientPortNumber, String... vars) {
+      String clientPortNumber, String... reasons) {
 
     ExecutionOptions option = flow.getExecutionOptions();
     Set<String> emailList = new HashSet<String>(option.getFailureEmails());
@@ -96,7 +97,7 @@ public class ReportalMailCreator implements MailCreator {
   @Override
   public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
       String azkabanName, String scheme, String clientHostname,
-      String clientPortNumber, String... vars) {
+      String clientPortNumber) {
 
     ExecutionOptions option = flow.getExecutionOptions();
     Set<String> emailList = new HashSet<String>(option.getSuccessEmails());
@@ -137,7 +138,7 @@ public class ReportalMailCreator implements MailCreator {
       try {
         return createMessage(project, flow, message, urlPrefix, printData);
       } catch (Exception e) {
-        e.printStackTrace();
+        logger.error("Message creation failed for " + flow.getId(), e);
       }
     }
 
@@ -281,7 +282,7 @@ public class ReportalMailCreator implements MailCreator {
       try {
         streamProvider.cleanUp();
       } catch (IOException e) {
-        e.printStackTrace();
+        logger.error("Stream provider cleanup failed for " + flow.getId(), e);
       }
 
       boolean emptyResults = true;