azkaban-aplcache

No eviction, alert email instead (#1661) * No eviction, alert

9/6/2018 6:29:08 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/alert/Alerter.java b/azkaban-common/src/main/java/azkaban/alert/Alerter.java
index 07a6159..08cef95 100644
--- a/azkaban-common/src/main/java/azkaban/alert/Alerter.java
+++ b/azkaban-common/src/main/java/azkaban/alert/Alerter.java
@@ -17,7 +17,10 @@
 package azkaban.alert;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.sla.SlaOption;
+import java.util.List;
 
 public interface Alerter {
 
@@ -28,4 +31,7 @@ public interface Alerter {
   void alertOnFirstError(ExecutableFlow exflow) throws Exception;
 
   void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
+
+  void alertOnFailedUpdate(Executor executor, List<ExecutableFlow> executions,
+      ExecutorManagerException e);
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 0ad8c19..6ec5c8e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1535,9 +1535,11 @@ public class ExecutorManager extends EventHandler implements
 
     private final int waitTimeIdleMs = 2000;
     private final int waitTimeMs = 500;
-    // When we have an http error, for that flow, we'll check every 10 secs, 6
-    // times (1 mins) before we evict.
-    private final int numErrors = 6;
+    // When we have an http error, for that flow, we'll check every 10 secs, 360
+    // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
+    private final int numErrorsBetweenUnresponsiveEmail = 360;
+    // First email is sent after 1 minute of unresponsiveness
+    private final int numErrorsBeforeUnresponsiveEmail = 6;
     private final long errorThreshold = 10000;
     private boolean shutdown = false;
 
@@ -1603,30 +1605,29 @@ public class ExecutorManager extends EventHandler implements
                         executor.getPort(), ConnectorParams.UPDATE_ACTION,
                         null, null, executionIds, updateTimes);
               } catch (final ExecutorManagerException e) {
-                logger.error(e);
+                logger.error("Failed to get update from executor " + executor.getHost(), e);
+                boolean sendUnresponsiveEmail = false;
                 for (final ExecutableFlow flow : entry.getValue()) {
                   final Pair<ExecutionReference, ExecutableFlow> pair =
                       ExecutorManager.this.runningFlows.get(flow.getExecutionId());
+                  // TODO can runningFlows.get ever return null, causing NPE below?
 
                   ExecutorManager.this.updaterStage =
-                      "Failed to get update. Doing some clean up for flow "
-                          + pair.getSecond().getExecutionId();
-
-                  if (pair != null) {
-                    final ExecutionReference ref = pair.getFirst();
-                    int numErrors = ref.getNumErrors();
-                    if (ref.getNumErrors() < this.numErrors) {
-                      ref.setNextCheckTime(System.currentTimeMillis()
-                          + this.errorThreshold);
-                      ref.setNumErrors(++numErrors);
-                    } else {
-                      logger.warn("Evicting execution " + flow.getExecutionId()
-                          + ". The executor is unresponsive.");
-                      // TODO should send out an unresponsive email here.
-                      finalizeFlows.add(pair.getSecond());
-                    }
+                      "Failed to get update for flow " + pair.getSecond().getExecutionId();
+
+                  final ExecutionReference ref = pair.getFirst();
+                  ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
+                  ref.setNumErrors(ref.getNumErrors() + 1);
+                  if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
+                      || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
+                    // if any of the executions has failed many enough updates, alert
+                    sendUnresponsiveEmail = true;
                   }
                 }
+                if (sendUnresponsiveEmail) {
+                  final Alerter mailAlerter = ExecutorManager.this.alerterHolder.get("email");
+                  mailAlerter.alertOnFailedUpdate(executor, entry.getValue(), e);
+                }
               }
 
               // We gets results
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index 9ddc488..1f52b24 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -20,6 +20,8 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Utils;
@@ -29,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import org.apache.commons.lang.exception.ExceptionUtils;
 
 public class DefaultMailCreator implements MailCreator {
 
@@ -253,4 +256,51 @@ public class DefaultMailCreator implements MailCreator {
     }
     return false;
   }
+
+  @Override
+  public boolean createFailedUpdateMessage(final List<ExecutableFlow> flows,
+      final Executor executor, final ExecutorManagerException updateException,
+      final EmailMessage message, final String azkabanName,
+      final String scheme, final String clientHostname, final String clientPortNumber) {
+
+    final ExecutionOptions option = flows.get(0).getExecutionOptions();
+    final List<String> emailList = option.getFailureEmails();
+
+    if (emailList != null && !emailList.isEmpty()) {
+      message.addAllToAddress(emailList);
+      message.setMimeType("text/html");
+      message.setSubject(
+          "Flow status could not be updated from " + executor.getHost() + " on " + azkabanName);
+
+      message.println(
+          "<h2 style=\"color:#FF0000\"> Flow status could not be updated from " + executor.getHost()
+              + " on " + azkabanName + "</h2>");
+
+      message.println("The actual status of these executions is unknown, "
+          + "because getting status update from azkaban executor is failing");
+
+      message.println("");
+      message.println("<h3>Error detail</h3>");
+      message.println("<pre>" + ExceptionUtils.getStackTrace(updateException) + "</pre>");
+
+      message.println("");
+      message.println("<h3>Affected executions</h3>");
+      message.println("<ul>");
+      for (final ExecutableFlow flow : flows) {
+        final int execId = flow.getExecutionId();
+        final String executionUrl =
+            scheme + "://" + clientHostname + ":" + clientPortNumber + "/"
+                + "executor?" + "execid=" + execId;
+
+        message.println("<li>Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId()
+            + "' of project '" + flow.getProjectName() + "' - " +
+            " <a href=\"" + executionUrl + "\">Execution Link</a></li>");
+      }
+
+      message.println("</ul>");
+      return true;
+    }
+
+    return false;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
index 934e2c5..991c64a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
@@ -17,6 +17,8 @@
 package azkaban.executor.mail;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.utils.EmailMessage;
 import java.util.List;
 
@@ -33,4 +35,9 @@ public interface MailCreator {
   public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
       String azkabanName, String scheme, String clientHostname,
       String clientPortNumber, String... vars);
+
+  public boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
+      ExecutorManagerException updateException, EmailMessage message,
+      String azkabanName, String scheme, String clientHostname,
+      String clientPortNumber);
 }
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index 41ac9fa..b271350 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -22,18 +22,25 @@ import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.mail.DefaultMailCreator;
 import azkaban.executor.mail.MailCreator;
 import azkaban.metrics.CommonMetrics;
 import azkaban.sla.SlaOption;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.mail.internet.AddressException;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.log4j.Logger;
 
 @Singleton
@@ -146,6 +153,55 @@ public class Emailer extends AbstractMailer implements Alerter {
     sendEmail(message, mailCreated, "success email message for execution" + flow.getExecutionId());
   }
 
+  /**
+   * Sends as many emails as there are unique combinations of:
+   *
+   * [mail creator] x [failure email address list]
+   *
+   * Executions with the same combo are grouped into a single message.
+   */
+  @Override
+  public void alertOnFailedUpdate(final Executor executor, List<ExecutableFlow> flows,
+      final ExecutorManagerException updateException) {
+
+    flows = flows.stream()
+        .filter(flow -> flow.getExecutionOptions() != null)
+        .filter(flow -> CollectionUtils.isNotEmpty(flow.getExecutionOptions().getFailureEmails()))
+        .collect(Collectors.toList());
+
+    // group by mail creator in case some flows use different creators
+    final ImmutableListMultimap<String, ExecutableFlow> creatorsToFlows = Multimaps
+        .index(flows, flow -> flow.getExecutionOptions().getMailCreator());
+
+    for (final String mailCreatorName : creatorsToFlows.keySet()) {
+
+      final ImmutableList<ExecutableFlow> creatorFlows = creatorsToFlows.get(mailCreatorName);
+      final MailCreator mailCreator = getMailCreator(mailCreatorName);
+
+      // group by recipients in case some flows have different failure email addresses
+      final ImmutableListMultimap<List<String>, ExecutableFlow> emailsToFlows = Multimaps
+          .index(creatorFlows, flow -> flow.getExecutionOptions().getFailureEmails());
+
+      for (final List<String> emailList : emailsToFlows.keySet()) {
+        sendFailedUpdateEmail(executor, updateException, mailCreator, emailsToFlows.get(emailList));
+      }
+    }
+  }
+
+  /**
+   * Sends a single email about failed updates.
+   */
+  private void sendFailedUpdateEmail(final Executor executor,
+      final ExecutorManagerException exception, final MailCreator mailCreator,
+      final ImmutableList<ExecutableFlow> flows) {
+    final EmailMessage message = this.messageCreator.createMessage();
+    final boolean mailCreated = mailCreator
+        .createFailedUpdateMessage(flows, executor, exception, message,
+            this.azkabanName, this.scheme, this.clientHostname, this.clientPortNumber);
+    final List<Integer> executionIds = Lists.transform(flows, ExecutableFlow::getExecutionId);
+    sendEmail(message, mailCreated, "failed update email message for executions " + executionIds);
+  }
+
   private MailCreator getMailCreator(final ExecutableFlow flow) {
     final String name = flow.getExecutionOptions().getMailCreator();
     return getMailCreator(name);
diff --git a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
index 80577d0..46105e1 100644
--- a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
@@ -7,6 +7,8 @@ import static org.junit.Assert.assertTrue;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
@@ -16,6 +18,7 @@ import azkaban.utils.EmailMessageCreator;
 import azkaban.utils.TestUtils;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
 import org.joda.time.DateTimeUtils;
@@ -34,6 +37,7 @@ public class DefaultMailCreatorTest {
 
   private DefaultMailCreator mailCreator;
 
+  private Executor executor;
   private ExecutableFlow executableFlow;
   private Flow flow;
   private Project project;
@@ -45,6 +49,15 @@ public class DefaultMailCreatorTest {
   private String clientPortNumber;
   private TimeZone defaultTz;
 
+  public static ExecutorManagerException createTestStracktrace() {
+    final ExecutorManagerException exception = new ExecutorManagerException("mocked failure");
+    // set custom stacktrace to have deterministic string for comparison
+    exception.setStackTrace(new StackTraceElement[]{new StackTraceElement(
+        DefaultMailCreatorTest.class.getCanonicalName(), "createFailedUpdateMessage",
+        "DefaultMailCreatorTest.java", 135)});
+    return exception;
+  }
+
   @Before
   public void setUp() throws Exception {
     this.defaultTz = TimeZone.getDefault();
@@ -55,6 +68,7 @@ public class DefaultMailCreatorTest {
 
     this.mailCreator = new DefaultMailCreator();
 
+    this.executor = new Executor(1, "executor1-host", 1234, true);
     this.flow = new Flow("mail-creator-test");
     this.project = new Project(1, "test-project");
     this.options = new ExecutionOptions();
@@ -145,4 +159,17 @@ public class DefaultMailCreatorTest {
         .isEqualToIgnoringWhitespace(this.message.getBody());
   }
 
+  @Test
+  public void createFailedUpdateMessage() throws Exception {
+    final ExecutorManagerException exception = createTestStracktrace();
+    assertTrue(this.mailCreator
+        .createFailedUpdateMessage(Arrays.asList(this.executableFlow, this.executableFlow),
+            this.executor, exception, this.message, this.azkabanName, this.scheme,
+            this.clientHostname, this.clientPortNumber));
+    assertEquals("Flow status could not be updated from executor1-host on unit-tests",
+        this.message.getSubject());
+    assertThat(TestUtils.readResource("failedUpdateMessage.html", this))
+        .isEqualToIgnoringWhitespace(this.message.getBody());
+  }
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index c165eb3..97c7db3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -24,8 +24,11 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.MockExecutorLoader;
+import azkaban.executor.mail.DefaultMailCreatorTest;
 import azkaban.flow.Flow;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
@@ -34,6 +37,7 @@ import azkaban.project.Project;
 import azkaban.test.executions.ExecutionsTestUtil;
 import com.codahale.metrics.MetricRegistry;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import javax.mail.internet.AddressException;
 import org.junit.Assert;
@@ -116,6 +120,26 @@ public class EmailerTest {
   }
 
   @Test
+  public void alertOnFailedUpdate() throws Exception {
+    final Flow flow = this.project.getFlow("jobe");
+    flow.addFailureEmails(this.receiveAddrList);
+    Assert.assertNotNull(flow);
+    final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
+    final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+        this.executorLoader);
+    final Executor executor = new Executor(1, "executor1-host", 1234, true);
+    final List<ExecutableFlow> executions = Arrays.asList(exFlow, exFlow);
+    final ExecutorManagerException exception = DefaultMailCreatorTest.createTestStracktrace();
+    emailer.alertOnFailedUpdate(executor, executions, exception);
+    verify(this.message).addAllToAddress(this.receiveAddrList);
+    verify(this.message)
+        .setSubject("Flow status could not be updated from executor1-host on azkaban");
+    assertThat(TestUtils.readResource("failedUpdateMessage2.html", this))
+        .isEqualToIgnoringWhitespace(this.message.getBody());
+  }
+
+  @Test
   public void testGetAzkabanURL() {
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
diff --git a/azkaban-common/src/test/resources/azkaban/executor/mail/failedUpdateMessage.html b/azkaban-common/src/test/resources/azkaban/executor/mail/failedUpdateMessage.html
new file mode 100644
index 0000000..e3e21b5
--- /dev/null
+++ b/azkaban-common/src/test/resources/azkaban/executor/mail/failedUpdateMessage.html
@@ -0,0 +1,13 @@
+<h2 style="color:#FF0000"> Flow status could not be updated from executor1-host on unit-tests</h2>
+The actual status of these executions is unknown, because getting status update from azkaban executor is failing
+<h3>Error detail</h3>
+<pre>azkaban.executor.ExecutorManagerException: mocked failure
+  at azkaban.executor.mail.DefaultMailCreatorTest.createFailedUpdateMessage(DefaultMailCreatorTest.java:135)
+</pre>
+<h3>Affected executions</h3>
+<ul>
+  <li>Execution '-1' of flow 'mail-creator-test' of project 'test-project' - <a
+      href="http://localhost:8081/executor?execid=-1"> Execution Link</a></li>
+  <li>Execution '-1' of flow 'mail-creator-test' of project 'test-project' - <a
+      href="http://localhost:8081/executor?execid=-1"> Execution Link</a></li>
+</ul>
diff --git a/azkaban-common/src/test/resources/azkaban/utils/failedUpdateMessage2.html b/azkaban-common/src/test/resources/azkaban/utils/failedUpdateMessage2.html
new file mode 100644
index 0000000..78abc81
--- /dev/null
+++ b/azkaban-common/src/test/resources/azkaban/utils/failedUpdateMessage2.html
@@ -0,0 +1,12 @@
+<h2 style="color:#FF0000"> Flow status could not be updated from executor1-host on
+  azkaban</h2>The actual status of these executions is unknown, because getting status update from azkaban executor is failing
+<h3>Error detail</h3>
+<pre>azkaban.executor.ExecutorManagerException: mocked failure
+	at azkaban.executor.mail.DefaultMailCreatorTest.createFailedUpdateMessage(DefaultMailCreatorTest.java:135)
+</pre><h3>Affected executions</h3>
+<ul>
+  <li>Execution '-1' of flow 'jobe' of project 'myTestProject' - <a
+      href="http://localhost:8786/executor?execid=-1">Execution Link</a></li>
+  <li>Execution '-1' of flow 'jobe' of project 'myTestProject' - <a
+      href="http://localhost:8786/executor?execid=-1">Execution Link</a></li>
+</ul>
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
index 72e6708..3175b07 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -16,6 +16,8 @@
 
 package azkaban.viewer.reportal;
 
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -103,6 +105,20 @@ public class ReportalMailCreator implements MailCreator {
         scheme, clientHostname, clientPortNumber, true);
   }
 
+  @Override
+  public boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
+      ExecutorManagerException updateException, EmailMessage message, String azkabanName,
+      String scheme, String clientHostname, String clientPortNumber) {
+
+    ExecutableFlow flow = flows.get(0);
+
+    ExecutionOptions option = flow.getExecutionOptions();
+    Set<String> emailList = new HashSet<String>(option.getFailureEmails());
+
+    return createEmail(flow, emailList, message, "FailedUpdate", azkabanName,
+        scheme, clientHostname, clientPortNumber, false);
+  }
+
   private boolean createEmail(ExecutableFlow flow, Set<String> emailList,
       EmailMessage message, String status, String azkabanName, String scheme,
       String clientHostname, String clientPortNumber, boolean printData) {