Details
diff --git a/azkaban-common/src/main/java/azkaban/alert/Alerter.java b/azkaban-common/src/main/java/azkaban/alert/Alerter.java
index 07a6159..08cef95 100644
--- a/azkaban-common/src/main/java/azkaban/alert/Alerter.java
+++ b/azkaban-common/src/main/java/azkaban/alert/Alerter.java
@@ -17,7 +17,10 @@
package azkaban.alert;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
import azkaban.sla.SlaOption;
+import java.util.List;
public interface Alerter {
@@ -28,4 +31,7 @@ public interface Alerter {
void alertOnFirstError(ExecutableFlow exflow) throws Exception;
void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
+
+ void alertOnFailedUpdate(Executor executor, List<ExecutableFlow> executions,
+ ExecutorManagerException e);
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 0ad8c19..6ec5c8e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1535,9 +1535,11 @@ public class ExecutorManager extends EventHandler implements
private final int waitTimeIdleMs = 2000;
private final int waitTimeMs = 500;
- // When we have an http error, for that flow, we'll check every 10 secs, 6
- // times (1 mins) before we evict.
- private final int numErrors = 6;
+ // When we have an http error, for that flow, we'll check every 10 secs, 360
+ // times (3600 seconds = 1 hour) before we send an email about unresponsive executor.
+ private final int numErrorsBetweenUnresponsiveEmail = 360;
+ // First email is sent after 1 minute of unresponsiveness
+ private final int numErrorsBeforeUnresponsiveEmail = 6;
private final long errorThreshold = 10000;
private boolean shutdown = false;
@@ -1603,30 +1605,29 @@ public class ExecutorManager extends EventHandler implements
executor.getPort(), ConnectorParams.UPDATE_ACTION,
null, null, executionIds, updateTimes);
} catch (final ExecutorManagerException e) {
- logger.error(e);
+ logger.error("Failed to get update from executor " + executor.getHost(), e);
+ boolean sendUnresponsiveEmail = false;
for (final ExecutableFlow flow : entry.getValue()) {
final Pair<ExecutionReference, ExecutableFlow> pair =
ExecutorManager.this.runningFlows.get(flow.getExecutionId());
+ // TODO can runningFlows.get ever return null, causing NPE below?
ExecutorManager.this.updaterStage =
- "Failed to get update. Doing some clean up for flow "
- + pair.getSecond().getExecutionId();
-
- if (pair != null) {
- final ExecutionReference ref = pair.getFirst();
- int numErrors = ref.getNumErrors();
- if (ref.getNumErrors() < this.numErrors) {
- ref.setNextCheckTime(System.currentTimeMillis()
- + this.errorThreshold);
- ref.setNumErrors(++numErrors);
- } else {
- logger.warn("Evicting execution " + flow.getExecutionId()
- + ". The executor is unresponsive.");
- // TODO should send out an unresponsive email here.
- finalizeFlows.add(pair.getSecond());
- }
+ "Failed to get update for flow " + pair.getSecond().getExecutionId();
+
+ final ExecutionReference ref = pair.getFirst();
+ ref.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
+ ref.setNumErrors(ref.getNumErrors() + 1);
+ if (ref.getNumErrors() == this.numErrorsBeforeUnresponsiveEmail
+ || ref.getNumErrors() % this.numErrorsBetweenUnresponsiveEmail == 0) {
+ // if any of the executions has failed many enough updates, alert
+ sendUnresponsiveEmail = true;
}
}
+ if (sendUnresponsiveEmail) {
+ final Alerter mailAlerter = ExecutorManager.this.alerterHolder.get("email");
+ mailAlerter.alertOnFailedUpdate(executor, entry.getValue(), e);
+ }
}
// We gets results
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index 9ddc488..1f52b24 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -20,6 +20,8 @@ import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.utils.EmailMessage;
import azkaban.utils.Utils;
@@ -29,6 +31,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import org.apache.commons.lang.exception.ExceptionUtils;
public class DefaultMailCreator implements MailCreator {
@@ -253,4 +256,51 @@ public class DefaultMailCreator implements MailCreator {
}
return false;
}
+
+ @Override
+ public boolean createFailedUpdateMessage(final List<ExecutableFlow> flows,
+ final Executor executor, final ExecutorManagerException updateException,
+ final EmailMessage message, final String azkabanName,
+ final String scheme, final String clientHostname, final String clientPortNumber) {
+
+ final ExecutionOptions option = flows.get(0).getExecutionOptions();
+ final List<String> emailList = option.getFailureEmails();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject(
+ "Flow status could not be updated from " + executor.getHost() + " on " + azkabanName);
+
+ message.println(
+ "<h2 style=\"color:#FF0000\"> Flow status could not be updated from " + executor.getHost()
+ + " on " + azkabanName + "</h2>");
+
+ message.println("The actual status of these executions is unknown, "
+ + "because getting status update from azkaban executor is failing");
+
+ message.println("");
+ message.println("<h3>Error detail</h3>");
+ message.println("<pre>" + ExceptionUtils.getStackTrace(updateException) + "</pre>");
+
+ message.println("");
+ message.println("<h3>Affected executions</h3>");
+ message.println("<ul>");
+ for (final ExecutableFlow flow : flows) {
+ final int execId = flow.getExecutionId();
+ final String executionUrl =
+ scheme + "://" + clientHostname + ":" + clientPortNumber + "/"
+ + "executor?" + "execid=" + execId;
+
+ message.println("<li>Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId()
+ + "' of project '" + flow.getProjectName() + "' - " +
+ " <a href=\"" + executionUrl + "\">Execution Link</a></li>");
+ }
+
+ message.println("</ul>");
+ return true;
+ }
+
+ return false;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
index 934e2c5..991c64a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/MailCreator.java
@@ -17,6 +17,8 @@
package azkaban.executor.mail;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
import azkaban.utils.EmailMessage;
import java.util.List;
@@ -33,4 +35,9 @@ public interface MailCreator {
public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
String azkabanName, String scheme, String clientHostname,
String clientPortNumber, String... vars);
+
+ public boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
+ ExecutorManagerException updateException, EmailMessage message,
+ String azkabanName, String scheme, String clientHostname,
+ String clientPortNumber);
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index 41ac9fa..b271350 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -22,18 +22,25 @@ import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.mail.DefaultMailCreator;
import azkaban.executor.mail.MailCreator;
import azkaban.metrics.CommonMetrics;
import azkaban.sla.SlaOption;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.mail.internet.AddressException;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
@Singleton
@@ -146,6 +153,55 @@ public class Emailer extends AbstractMailer implements Alerter {
sendEmail(message, mailCreated, "success email message for execution" + flow.getExecutionId());
}
+ /**
+ * Sends as many emails as there are unique combinations of:
+ *
+ * [mail creator] x [failure email address list]
+ *
+ * Executions with the same combo are grouped into a single message.
+ */
+ @Override
+ public void alertOnFailedUpdate(final Executor executor, List<ExecutableFlow> flows,
+ final ExecutorManagerException updateException) {
+
+ flows = flows.stream()
+ .filter(flow -> flow.getExecutionOptions() != null)
+ .filter(flow -> CollectionUtils.isNotEmpty(flow.getExecutionOptions().getFailureEmails()))
+ .collect(Collectors.toList());
+
+ // group by mail creator in case some flows use different creators
+ final ImmutableListMultimap<String, ExecutableFlow> creatorsToFlows = Multimaps
+ .index(flows, flow -> flow.getExecutionOptions().getMailCreator());
+
+ for (final String mailCreatorName : creatorsToFlows.keySet()) {
+
+ final ImmutableList<ExecutableFlow> creatorFlows = creatorsToFlows.get(mailCreatorName);
+ final MailCreator mailCreator = getMailCreator(mailCreatorName);
+
+ // group by recipients in case some flows have different failure email addresses
+ final ImmutableListMultimap<List<String>, ExecutableFlow> emailsToFlows = Multimaps
+ .index(creatorFlows, flow -> flow.getExecutionOptions().getFailureEmails());
+
+ for (final List<String> emailList : emailsToFlows.keySet()) {
+ sendFailedUpdateEmail(executor, updateException, mailCreator, emailsToFlows.get(emailList));
+ }
+ }
+ }
+
+ /**
+ * Sends a single email about failed updates.
+ */
+ private void sendFailedUpdateEmail(final Executor executor,
+ final ExecutorManagerException exception, final MailCreator mailCreator,
+ final ImmutableList<ExecutableFlow> flows) {
+ final EmailMessage message = this.messageCreator.createMessage();
+ final boolean mailCreated = mailCreator
+ .createFailedUpdateMessage(flows, executor, exception, message,
+ this.azkabanName, this.scheme, this.clientHostname, this.clientPortNumber);
+ final List<Integer> executionIds = Lists.transform(flows, ExecutableFlow::getExecutionId);
+ sendEmail(message, mailCreated, "failed update email message for executions " + executionIds);
+ }
+
private MailCreator getMailCreator(final ExecutableFlow flow) {
final String name = flow.getExecutionOptions().getMailCreator();
return getMailCreator(name);
diff --git a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
index 80577d0..46105e1 100644
--- a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
@@ -7,6 +7,8 @@ import static org.junit.Assert.assertTrue;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.flow.Node;
@@ -16,6 +18,7 @@ import azkaban.utils.EmailMessageCreator;
import azkaban.utils.TestUtils;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
import org.joda.time.DateTimeUtils;
@@ -34,6 +37,7 @@ public class DefaultMailCreatorTest {
private DefaultMailCreator mailCreator;
+ private Executor executor;
private ExecutableFlow executableFlow;
private Flow flow;
private Project project;
@@ -45,6 +49,15 @@ public class DefaultMailCreatorTest {
private String clientPortNumber;
private TimeZone defaultTz;
+ public static ExecutorManagerException createTestStracktrace() {
+ final ExecutorManagerException exception = new ExecutorManagerException("mocked failure");
+ // set custom stacktrace to have deterministic string for comparison
+ exception.setStackTrace(new StackTraceElement[]{new StackTraceElement(
+ DefaultMailCreatorTest.class.getCanonicalName(), "createFailedUpdateMessage",
+ "DefaultMailCreatorTest.java", 135)});
+ return exception;
+ }
+
@Before
public void setUp() throws Exception {
this.defaultTz = TimeZone.getDefault();
@@ -55,6 +68,7 @@ public class DefaultMailCreatorTest {
this.mailCreator = new DefaultMailCreator();
+ this.executor = new Executor(1, "executor1-host", 1234, true);
this.flow = new Flow("mail-creator-test");
this.project = new Project(1, "test-project");
this.options = new ExecutionOptions();
@@ -145,4 +159,17 @@ public class DefaultMailCreatorTest {
.isEqualToIgnoringWhitespace(this.message.getBody());
}
+ @Test
+ public void createFailedUpdateMessage() throws Exception {
+ final ExecutorManagerException exception = createTestStracktrace();
+ assertTrue(this.mailCreator
+ .createFailedUpdateMessage(Arrays.asList(this.executableFlow, this.executableFlow),
+ this.executor, exception, this.message, this.azkabanName, this.scheme,
+ this.clientHostname, this.clientPortNumber));
+ assertEquals("Flow status could not be updated from executor1-host on unit-tests",
+ this.message.getSubject());
+ assertThat(TestUtils.readResource("failedUpdateMessage.html", this))
+ .isEqualToIgnoringWhitespace(this.message.getBody());
+ }
+
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index c165eb3..97c7db3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -24,8 +24,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManagerException;
import azkaban.executor.MockExecutorLoader;
+import azkaban.executor.mail.DefaultMailCreatorTest;
import azkaban.flow.Flow;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
@@ -34,6 +37,7 @@ import azkaban.project.Project;
import azkaban.test.executions.ExecutionsTestUtil;
import com.codahale.metrics.MetricRegistry;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import javax.mail.internet.AddressException;
import org.junit.Assert;
@@ -116,6 +120,26 @@ public class EmailerTest {
}
@Test
+ public void alertOnFailedUpdate() throws Exception {
+ final Flow flow = this.project.getFlow("jobe");
+ flow.addFailureEmails(this.receiveAddrList);
+ Assert.assertNotNull(flow);
+ final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
+ final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+ final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
+ this.executorLoader);
+ final Executor executor = new Executor(1, "executor1-host", 1234, true);
+ final List<ExecutableFlow> executions = Arrays.asList(exFlow, exFlow);
+ final ExecutorManagerException exception = DefaultMailCreatorTest.createTestStracktrace();
+ emailer.alertOnFailedUpdate(executor, executions, exception);
+ verify(this.message).addAllToAddress(this.receiveAddrList);
+ verify(this.message)
+ .setSubject("Flow status could not be updated from executor1-host on azkaban");
+ assertThat(TestUtils.readResource("failedUpdateMessage2.html", this))
+ .isEqualToIgnoringWhitespace(this.message.getBody());
+ }
+
+ @Test
public void testGetAzkabanURL() {
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator,
diff --git a/azkaban-common/src/test/resources/azkaban/executor/mail/failedUpdateMessage.html b/azkaban-common/src/test/resources/azkaban/executor/mail/failedUpdateMessage.html
new file mode 100644
index 0000000..e3e21b5
--- /dev/null
+++ b/azkaban-common/src/test/resources/azkaban/executor/mail/failedUpdateMessage.html
@@ -0,0 +1,13 @@
+<h2 style="color:#FF0000"> Flow status could not be updated from executor1-host on unit-tests</h2>
+The actual status of these executions is unknown, because getting status update from azkaban executor is failing
+<h3>Error detail</h3>
+<pre>azkaban.executor.ExecutorManagerException: mocked failure
+ at azkaban.executor.mail.DefaultMailCreatorTest.createFailedUpdateMessage(DefaultMailCreatorTest.java:135)
+</pre>
+<h3>Affected executions</h3>
+<ul>
+ <li>Execution '-1' of flow 'mail-creator-test' of project 'test-project' - <a
+ href="http://localhost:8081/executor?execid=-1"> Execution Link</a></li>
+ <li>Execution '-1' of flow 'mail-creator-test' of project 'test-project' - <a
+ href="http://localhost:8081/executor?execid=-1"> Execution Link</a></li>
+</ul>
diff --git a/azkaban-common/src/test/resources/azkaban/utils/failedUpdateMessage2.html b/azkaban-common/src/test/resources/azkaban/utils/failedUpdateMessage2.html
new file mode 100644
index 0000000..78abc81
--- /dev/null
+++ b/azkaban-common/src/test/resources/azkaban/utils/failedUpdateMessage2.html
@@ -0,0 +1,12 @@
+<h2 style="color:#FF0000"> Flow status could not be updated from executor1-host on
+ azkaban</h2>The actual status of these executions is unknown, because getting status update from azkaban executor is failing
+<h3>Error detail</h3>
+<pre>azkaban.executor.ExecutorManagerException: mocked failure
+ at azkaban.executor.mail.DefaultMailCreatorTest.createFailedUpdateMessage(DefaultMailCreatorTest.java:135)
+</pre><h3>Affected executions</h3>
+<ul>
+ <li>Execution '-1' of flow 'jobe' of project 'myTestProject' - <a
+ href="http://localhost:8786/executor?execid=-1">Execution Link</a></li>
+ <li>Execution '-1' of flow 'jobe' of project 'myTestProject' - <a
+ href="http://localhost:8786/executor?execid=-1">Execution Link</a></li>
+</ul>
diff --git a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
index 72e6708..3175b07 100644
--- a/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
+++ b/az-reportal/src/main/java/azkaban/viewer/reportal/ReportalMailCreator.java
@@ -16,6 +16,8 @@
package azkaban.viewer.reportal;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorManagerException;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -103,6 +105,20 @@ public class ReportalMailCreator implements MailCreator {
scheme, clientHostname, clientPortNumber, true);
}
+ @Override
+ public boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
+ ExecutorManagerException updateException, EmailMessage message, String azkabanName,
+ String scheme, String clientHostname, String clientPortNumber) {
+
+ ExecutableFlow flow = flows.get(0);
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ Set<String> emailList = new HashSet<String>(option.getFailureEmails());
+
+ return createEmail(flow, emailList, message, "FailedUpdate", azkabanName,
+ scheme, clientHostname, clientPortNumber, false);
+ }
+
private boolean createEmail(ExecutableFlow flow, Set<String> emailList,
EmailMessage message, String status, String azkabanName, String scheme,
String clientHostname, String clientPortNumber, boolean printData) {