azkaban-aplcache

more informative email alerting on flow trigger cancellation

6/1/2018 5:48:35 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
index f9e8eae..647cc1b 100644
--- a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
@@ -34,7 +34,7 @@ public class AbstractMailer {
     this.attachmentMazSizeInByte = maxAttachmentSizeInMB * MB_IN_BYTES;
   }
 
-  protected EmailMessage createEmailMessage(final String subject, final String mimetype,
+  public EmailMessage createEmailMessage(final String subject, final String mimetype,
       final Collection<String> emailList) {
     final EmailMessage message = this.messageCreator.createMessage();
     message.addAllToAddress(emailList);
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index af477ee..6247b59 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -52,8 +52,7 @@ public class Emailer extends AbstractMailer implements Alerter {
 
     final int mailTimeout = props.getInt("mail.timeout.millis", 30000);
     EmailMessage.setTimeout(mailTimeout);
-    final int connectionTimeout =
-        props.getInt("mail.connection.timeout.millis", 30000);
+    final int connectionTimeout = props.getInt("mail.connection.timeout.millis", 30000);
     EmailMessage.setConnectionTimeout(connectionTimeout);
 
     EmailMessage.setTotalAttachmentMaxSize(getAttachmentMaxSize());
@@ -75,6 +74,10 @@ public class Emailer extends AbstractMailer implements Alerter {
     }
   }
 
+  public String getAzkabanURL() {
+    return this.scheme + "://" + this.clientHostname + ":" + this.clientPortNumber;
+  }
+
   /**
    * Send an email to the specified email list
    */
@@ -135,7 +138,7 @@ public class Emailer extends AbstractMailer implements Alerter {
     return mailCreator;
   }
 
-  private void sendEmail(final EmailMessage message, final boolean mailCreated,
+  public void sendEmail(final EmailMessage message, final boolean mailCreated,
       final String operation) {
     if (mailCreated) {
       try {
@@ -158,5 +161,4 @@ public class Emailer extends AbstractMailer implements Alerter {
       return flowName;
     }
   }
-
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index e5c263e..5302156 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -38,7 +38,7 @@ import org.mockito.stubbing.Answer;
 
 public class EmailerTest {
 
-  private final String receiveAddr = "receive@domain.com";//receiver email address
+  private static final String receiveAddr = "receive@domain.com";//receiver email address
   private final List<String> receiveAddrList = new ArrayList<>();
 
   private Project project;
@@ -66,6 +66,16 @@ public class EmailerTest {
     return message;
   }
 
+  public static Props createMailProperties() {
+    final Props props = new Props();
+    props.put("job.failure.email", receiveAddr);
+    props.put("server.port", "114");
+    props.put("jetty.use.ssl", "false");
+    props.put("server.useSSL", "false");
+    props.put("jetty.port", "8786");
+    return props;
+  }
+
   @Before
   public void setUp() throws Exception {
     this.message = mockEmailMessage();
@@ -98,6 +108,13 @@ public class EmailerTest {
   }
 
   @Test
+  public void testGetAzkabanURL() {
+    final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+    assertThat(emailer.getAzkabanURL()).isEqualTo("http://localhost:8786");
+  }
+
+  @Test
   public void testCreateEmailMessage() {
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
@@ -109,14 +126,4 @@ public class EmailerTest {
     verify(this.message).setSubject("subject");
     verify(this.message).setMimeType("text/html");
   }
-
-  private Props createMailProperties() {
-    final Props props = new Props();
-    props.put("job.failure.email", this.receiveAddr);
-    props.put("server.port", "114");
-    props.put("jetty.use.ssl", "false");
-    props.put("server.useSSL", "false");
-    props.put("jetty.port", "8786");
-    return props;
-  }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index bf3562b..69eebf1 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -23,8 +23,12 @@ import azkaban.flow.Flow;
 import azkaban.flow.FlowUtils;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
 import azkaban.project.Project;
+import azkaban.utils.EmailMessage;
 import azkaban.utils.Emailer;
+import azkaban.utils.Utils;
 import com.google.common.base.Preconditions;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -33,14 +37,13 @@ import javax.inject.Singleton;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("FutureReturnValueIgnored")
 @Singleton
+@SuppressWarnings("FutureReturnValueIgnored")
 public class TriggerInstanceProcessor {
 
   private static final Logger logger = LoggerFactory.getLogger(TriggerInstanceProcessor.class);
-  private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for %s "
-      + "cancelled from %s";
-  private static final String FAILURE_EMAIL_BODY = "Your flow trigger cancelled [id: %s]";
+  private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for flow '%s', project '%s' "
+      + "has been cancelled on %s";
   private final static int THREAD_POOL_SIZE = 32;
   private final ExecutorManager executorManager;
   private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
@@ -83,22 +86,57 @@ public class TriggerInstanceProcessor {
   }
 
   private String generateFailureEmailSubject(final TriggerInstance triggerInstance) {
-    final String flowFullName =
-        triggerInstance.getProjectName() + "." + triggerInstance.getFlowId();
-    return String.format(FAILURE_EMAIL_SUBJECT, flowFullName, this.emailer.getAzkabanName());
+    return String.format(FAILURE_EMAIL_SUBJECT, triggerInstance.getFlowId(), triggerInstance
+        .getProjectName(), this.emailer.getAzkabanName());
   }
 
-  private String generateFailureEmailBody(final TriggerInstance triggerInstance) {
-    final String triggerInstFullName =
-        triggerInstance.getProjectName() + "." + triggerInstance.getFlowId();
-    return String.format(FAILURE_EMAIL_BODY, triggerInstFullName);
+  private EmailMessage createFlowTriggerFailureEmailMessage(final TriggerInstance triggerInst) {
+    final EmailMessage message = this.emailer.createEmailMessage(generateFailureEmailSubject
+        (triggerInst), "text/html", triggerInst.getFailureEmails());
+    final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    message.addAllToAddress(triggerInst.getFailureEmails());
+    message.setMimeType("text/html");
+    message.println("<table>");
+    message.println("<tr><td>Start Time</td><td>");
+    message.println("<tr><td>" + sdf.format(new Date(triggerInst.getStartTime())) + "</td><td>");
+
+    message.println("<tr><td>End Time</td><td>");
+    message.println("<tr><td>" + sdf.format(new Date(triggerInst.getEndTime())) + "</td><td>");
+    message.println("<tr><td>Duration</td><td>"
+        + Utils.formatDuration(triggerInst.getStartTime(), triggerInst.getEndTime())
+        + "</td></tr>");
+    message.println("<tr><td>Status</td><td>" + triggerInst.getStatus() + "</td></tr>");
+    message.println("</table>");
+    message.println("");
+    final String executionUrl = this.emailer.getAzkabanURL() + "/executor?triggerinstanceid="
+        + triggerInst.getId();
+
+    message.println("<a href=\"" + executionUrl + "\">" + triggerInst.getFlowId()
+        + " Flow Trigger Instance Link</a>");
+
+    message.println("");
+    message.println("<h3>Cancelled Dependencies</h3>");
+
+    for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+      if (depInst.getStatus() == Status.CANCELLED) {
+        message.println("<table>");
+        message.println("<tr><td>Dependency Name: " + depInst.getDepName() + "</td><td>");
+        message
+            .println("<tr><td>Cancellation Cause: " + depInst.getCancellationCause() + "</td><td>");
+        message.println("</table>");
+      }
+    }
+
+    return message;
   }
 
   private void sendFailureEmailIfConfigured(final TriggerInstance triggerInstance) {
     final List<String> failureEmails = triggerInstance.getFailureEmails();
     if (!failureEmails.isEmpty()) {
-      this.emailer.sendEmail(failureEmails, generateFailureEmailSubject(triggerInstance),
-          generateFailureEmailBody(triggerInstance));
+      final EmailMessage message = this.createFlowTriggerFailureEmailMessage(triggerInstance);
+      this.emailer.sendEmail(message, true, "email message failure email for flow trigger "
+          + triggerInstance.getId());
     }
   }
 
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 9c61e84..39f56e8 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -15,6 +15,7 @@
  */
 package azkaban.flowtrigger;
 
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
@@ -26,19 +27,32 @@ import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
 import azkaban.project.CronSchedule;
 import azkaban.project.FlowTrigger;
 import azkaban.project.Project;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.EmailMessageCreator;
 import azkaban.utils.Emailer;
+import azkaban.utils.EmailerTest;
+import azkaban.utils.TestUtils;
+import com.codahale.metrics.MetricRegistry;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
+import java.util.TimeZone;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.assertj.core.util.Lists;
 import org.assertj.core.util.Maps;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 
 public class TriggerInstanceProcessorTest {
@@ -47,30 +61,52 @@ public class TriggerInstanceProcessorTest {
   private FlowTriggerInstanceLoader triggerInstLoader;
   private ExecutorManager executorManager;
   private Emailer emailer;
+  private EmailMessage message;
+  private EmailMessageCreator messageCreator;
   private TriggerInstanceProcessor processor;
   private CountDownLatch sendEmailLatch;
 
-  private static TriggerInstance createTriggerInstance() {
+  private static TriggerInstance createTriggerInstance() throws ParseException {
     final FlowTrigger flowTrigger = new FlowTrigger(
         new CronSchedule("* * * * ? *"),
         new ArrayList<>(),
         Duration.ofMinutes(1)
     );
     final Project proj = new Project(1, "proj");
-    final Flow flow = new Flow("flowId");
+    final Flow flow = new Flow("123");
     flow.addFailureEmails(Lists.newArrayList(EMAIL));
     proj.setFlows(Maps.newHashMap("flowId", flow));
-    final List<DependencyInstance> depInstList = new ArrayList<>();
+    final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    sdf.setTimeZone(TimeZone.getDefault());
+    final Date startDate = sdf.parse("2000-01-11 16:00:00");
+    final Date endDate = sdf.parse("2000-01-11 16:00:00");
+
+    final List<DependencyInstance> depInstList = Arrays.asList(
+        new DependencyInstance("dep1", startDate.getTime(), endDate.getTime(), null,
+            Status.CANCELLED, CancellationCause.MANUAL),
+        new DependencyInstance("dep2", startDate.getTime(), endDate.getTime(), null,
+            Status.SUCCEEDED, CancellationCause.NONE),
+        new DependencyInstance("dep3", startDate.getTime(), endDate.getTime(), null,
+            Status.CANCELLED, CancellationCause.TIMEOUT),
+        new DependencyInstance("dep4", startDate.getTime(), endDate.getTime(), null,
+            Status.CANCELLED, CancellationCause.CASCADING)
+    );
+
     return new TriggerInstance("instanceId", flowTrigger, "flowId", 1,
         "test", depInstList, -1, proj);
   }
 
   @Before
   public void setUp() throws Exception {
+    this.message = EmailerTest.mockEmailMessage();
+    this.messageCreator = EmailerTest.mockMessageCreator(this.message);
+
     this.triggerInstLoader = mock(FlowTriggerInstanceLoader.class);
     this.executorManager = mock(ExecutorManager.class);
     when(this.executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
-    this.emailer = mock(Emailer.class);
+    final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+    this.emailer = Mockito.spy(new Emailer(EmailerTest.createMailProperties(), commonMetrics,
+        this.messageCreator));
     this.sendEmailLatch = new CountDownLatch(1);
     doAnswer(invocation -> {
       this.sendEmailLatch.countDown();
@@ -81,7 +117,7 @@ public class TriggerInstanceProcessorTest {
   }
 
   @Test
-  public void testProcessSucceed() throws ExecutorManagerException {
+  public void testProcessSucceed() throws ExecutorManagerException, ParseException {
     final TriggerInstance triggerInstance = createTriggerInstance();
     this.processor.processSucceed(triggerInstance);
     verify(this.executorManager).submitExecutableFlow(any(), anyString());
@@ -93,13 +129,17 @@ public class TriggerInstanceProcessorTest {
     final TriggerInstance triggerInstance = createTriggerInstance();
     this.processor.processTermination(triggerInstance);
     this.sendEmailLatch.await(10L, TimeUnit.SECONDS);
-    verify(this.emailer).sendEmail(any(), any(), any());
+    verify(this.message).setSubject(
+        "flow trigger for flow 'flowId', project 'proj' has been cancelled on azkaban");
+    assertThat(TestUtils.readResource("/emailTemplate/flowtriggerfailureemail.html", this))
+        .isEqualToIgnoringWhitespace(this.message.getBody());
   }
 
   @Test
-  public void testNewInstance() {
+  public void testNewInstance() throws ParseException {
     final TriggerInstance triggerInstance = createTriggerInstance();
     this.processor.processNewInstance(triggerInstance);
     verify(this.triggerInstLoader).uploadTriggerInstance(triggerInstance);
   }
 }
+
diff --git a/azkaban-web-server/src/test/resources/emailTemplate/flowtriggerfailureemail.html b/azkaban-web-server/src/test/resources/emailTemplate/flowtriggerfailureemail.html
new file mode 100644
index 0000000..21a272d
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/emailTemplate/flowtriggerfailureemail.html
@@ -0,0 +1,47 @@
+<table>
+  <tr>
+    <td>Start Time</td>
+    <td>
+  <tr>
+    <td>2000-01-11 16:00:00</td>
+    <td>
+  <tr>
+    <td>End Time</td>
+    <td>
+  <tr>
+    <td>2000-01-11 16:00:00</td>
+    <td>
+  <tr>
+    <td>Duration</td>
+    <td>0 sec</td>
+  </tr>
+  <tr>
+    <td>Status</td>
+    <td>CANCELLED</td>
+  </tr>
+</table><a href="http://localhost:8786/executor?triggerinstanceid=instanceId">flowId Flow Trigger
+  Instance Link</a><h3>Cancelled Dependencies</h3>
+<table>
+  <tr>
+    <td>Dependency Name: dep1</td>
+    <td>
+  <tr>
+    <td>Cancellation Cause: MANUAL</td>
+    <td>
+</table>
+<table>
+  <tr>
+    <td>Dependency Name: dep3</td>
+    <td>
+  <tr>
+    <td>Cancellation Cause: TIMEOUT</td>
+    <td>
+</table>
+<table>
+  <tr>
+    <td>Dependency Name: dep4</td>
+    <td>
+  <tr>
+    <td>Cancellation Cause: CASCADING</td>
+    <td>
+</table>
\ No newline at end of file