Details
diff --git a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
index f9e8eae..647cc1b 100644
--- a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
@@ -34,7 +34,7 @@ public class AbstractMailer {
this.attachmentMazSizeInByte = maxAttachmentSizeInMB * MB_IN_BYTES;
}
- protected EmailMessage createEmailMessage(final String subject, final String mimetype,
+ public EmailMessage createEmailMessage(final String subject, final String mimetype,
final Collection<String> emailList) {
final EmailMessage message = this.messageCreator.createMessage();
message.addAllToAddress(emailList);
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index af477ee..6247b59 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -52,8 +52,7 @@ public class Emailer extends AbstractMailer implements Alerter {
final int mailTimeout = props.getInt("mail.timeout.millis", 30000);
EmailMessage.setTimeout(mailTimeout);
- final int connectionTimeout =
- props.getInt("mail.connection.timeout.millis", 30000);
+ final int connectionTimeout = props.getInt("mail.connection.timeout.millis", 30000);
EmailMessage.setConnectionTimeout(connectionTimeout);
EmailMessage.setTotalAttachmentMaxSize(getAttachmentMaxSize());
@@ -75,6 +74,10 @@ public class Emailer extends AbstractMailer implements Alerter {
}
}
+ public String getAzkabanURL() {
+ return this.scheme + "://" + this.clientHostname + ":" + this.clientPortNumber;
+ }
+
/**
* Send an email to the specified email list
*/
@@ -135,7 +138,7 @@ public class Emailer extends AbstractMailer implements Alerter {
return mailCreator;
}
- private void sendEmail(final EmailMessage message, final boolean mailCreated,
+ public void sendEmail(final EmailMessage message, final boolean mailCreated,
final String operation) {
if (mailCreated) {
try {
@@ -158,5 +161,4 @@ public class Emailer extends AbstractMailer implements Alerter {
return flowName;
}
}
-
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index e5c263e..5302156 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -38,7 +38,7 @@ import org.mockito.stubbing.Answer;
public class EmailerTest {
- private final String receiveAddr = "receive@domain.com";//receiver email address
+ private static final String receiveAddr = "receive@domain.com";//receiver email address
private final List<String> receiveAddrList = new ArrayList<>();
private Project project;
@@ -66,6 +66,16 @@ public class EmailerTest {
return message;
}
+ public static Props createMailProperties() {
+ final Props props = new Props();
+ props.put("job.failure.email", receiveAddr);
+ props.put("server.port", "114");
+ props.put("jetty.use.ssl", "false");
+ props.put("server.useSSL", "false");
+ props.put("jetty.port", "8786");
+ return props;
+ }
+
@Before
public void setUp() throws Exception {
this.message = mockEmailMessage();
@@ -98,6 +108,13 @@ public class EmailerTest {
}
@Test
+ public void testGetAzkabanURL() {
+ final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+ final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
+ assertThat(emailer.getAzkabanURL()).isEqualTo("http://localhost:8786");
+ }
+
+ @Test
public void testCreateEmailMessage() {
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
@@ -109,14 +126,4 @@ public class EmailerTest {
verify(this.message).setSubject("subject");
verify(this.message).setMimeType("text/html");
}
-
- private Props createMailProperties() {
- final Props props = new Props();
- props.put("job.failure.email", this.receiveAddr);
- props.put("server.port", "114");
- props.put("jetty.use.ssl", "false");
- props.put("server.useSSL", "false");
- props.put("jetty.port", "8786");
- return props;
- }
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
index bf3562b..69eebf1 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstanceProcessor.java
@@ -23,8 +23,12 @@ import azkaban.flow.Flow;
import azkaban.flow.FlowUtils;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.project.Project;
+import azkaban.utils.EmailMessage;
import azkaban.utils.Emailer;
+import azkaban.utils.Utils;
import com.google.common.base.Preconditions;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -33,14 +37,13 @@ import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@SuppressWarnings("FutureReturnValueIgnored")
@Singleton
+@SuppressWarnings("FutureReturnValueIgnored")
public class TriggerInstanceProcessor {
private static final Logger logger = LoggerFactory.getLogger(TriggerInstanceProcessor.class);
- private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for %s "
- + "cancelled from %s";
- private static final String FAILURE_EMAIL_BODY = "Your flow trigger cancelled [id: %s]";
+ private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for flow '%s', project '%s' "
+ + "has been cancelled on %s";
private final static int THREAD_POOL_SIZE = 32;
private final ExecutorManager executorManager;
private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
@@ -83,22 +86,57 @@ public class TriggerInstanceProcessor {
}
private String generateFailureEmailSubject(final TriggerInstance triggerInstance) {
- final String flowFullName =
- triggerInstance.getProjectName() + "." + triggerInstance.getFlowId();
- return String.format(FAILURE_EMAIL_SUBJECT, flowFullName, this.emailer.getAzkabanName());
+ return String.format(FAILURE_EMAIL_SUBJECT, triggerInstance.getFlowId(), triggerInstance
+ .getProjectName(), this.emailer.getAzkabanName());
}
- private String generateFailureEmailBody(final TriggerInstance triggerInstance) {
- final String triggerInstFullName =
- triggerInstance.getProjectName() + "." + triggerInstance.getFlowId();
- return String.format(FAILURE_EMAIL_BODY, triggerInstFullName);
+ private EmailMessage createFlowTriggerFailureEmailMessage(final TriggerInstance triggerInst) {
+ final EmailMessage message = this.emailer.createEmailMessage(generateFailureEmailSubject
+ (triggerInst), "text/html", triggerInst.getFailureEmails());
+ final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ message.addAllToAddress(triggerInst.getFailureEmails());
+ message.setMimeType("text/html");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>");
+ message.println("<tr><td>" + sdf.format(new Date(triggerInst.getStartTime())) + "</td><td>");
+
+ message.println("<tr><td>End Time</td><td>");
+ message.println("<tr><td>" + sdf.format(new Date(triggerInst.getEndTime())) + "</td><td>");
+ message.println("<tr><td>Duration</td><td>"
+ + Utils.formatDuration(triggerInst.getStartTime(), triggerInst.getEndTime())
+ + "</td></tr>");
+ message.println("<tr><td>Status</td><td>" + triggerInst.getStatus() + "</td></tr>");
+ message.println("</table>");
+ message.println("");
+ final String executionUrl = this.emailer.getAzkabanURL() + "/executor?triggerinstanceid="
+ + triggerInst.getId();
+
+ message.println("<a href=\"" + executionUrl + "\">" + triggerInst.getFlowId()
+ + " Flow Trigger Instance Link</a>");
+
+ message.println("");
+ message.println("<h3>Cancelled Dependencies</h3>");
+
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ if (depInst.getStatus() == Status.CANCELLED) {
+ message.println("<table>");
+ message.println("<tr><td>Dependency Name: " + depInst.getDepName() + "</td><td>");
+ message
+ .println("<tr><td>Cancellation Cause: " + depInst.getCancellationCause() + "</td><td>");
+ message.println("</table>");
+ }
+ }
+
+ return message;
}
private void sendFailureEmailIfConfigured(final TriggerInstance triggerInstance) {
final List<String> failureEmails = triggerInstance.getFailureEmails();
if (!failureEmails.isEmpty()) {
- this.emailer.sendEmail(failureEmails, generateFailureEmailSubject(triggerInstance),
- generateFailureEmailBody(triggerInstance));
+ final EmailMessage message = this.createFlowTriggerFailureEmailMessage(triggerInstance);
+ this.emailer.sendEmail(message, true, "email message failure email for flow trigger "
+ + triggerInstance.getId());
}
}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
index 9c61e84..39f56e8 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TriggerInstanceProcessorTest.java
@@ -15,6 +15,7 @@
*/
package azkaban.flowtrigger;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
@@ -26,19 +27,32 @@ import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
+import azkaban.metrics.CommonMetrics;
+import azkaban.metrics.MetricsManager;
import azkaban.project.CronSchedule;
import azkaban.project.FlowTrigger;
import azkaban.project.Project;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.EmailMessageCreator;
import azkaban.utils.Emailer;
+import azkaban.utils.EmailerTest;
+import azkaban.utils.TestUtils;
+import com.codahale.metrics.MetricRegistry;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
import java.util.List;
+import java.util.TimeZone;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.assertj.core.util.Lists;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
public class TriggerInstanceProcessorTest {
@@ -47,30 +61,52 @@ public class TriggerInstanceProcessorTest {
private FlowTriggerInstanceLoader triggerInstLoader;
private ExecutorManager executorManager;
private Emailer emailer;
+ private EmailMessage message;
+ private EmailMessageCreator messageCreator;
private TriggerInstanceProcessor processor;
private CountDownLatch sendEmailLatch;
- private static TriggerInstance createTriggerInstance() {
+ private static TriggerInstance createTriggerInstance() throws ParseException {
final FlowTrigger flowTrigger = new FlowTrigger(
new CronSchedule("* * * * ? *"),
new ArrayList<>(),
Duration.ofMinutes(1)
);
final Project proj = new Project(1, "proj");
- final Flow flow = new Flow("flowId");
+ final Flow flow = new Flow("123");
flow.addFailureEmails(Lists.newArrayList(EMAIL));
proj.setFlows(Maps.newHashMap("flowId", flow));
- final List<DependencyInstance> depInstList = new ArrayList<>();
+ final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getDefault());
+ final Date startDate = sdf.parse("2000-01-11 16:00:00");
+ final Date endDate = sdf.parse("2000-01-11 16:00:00");
+
+ final List<DependencyInstance> depInstList = Arrays.asList(
+ new DependencyInstance("dep1", startDate.getTime(), endDate.getTime(), null,
+ Status.CANCELLED, CancellationCause.MANUAL),
+ new DependencyInstance("dep2", startDate.getTime(), endDate.getTime(), null,
+ Status.SUCCEEDED, CancellationCause.NONE),
+ new DependencyInstance("dep3", startDate.getTime(), endDate.getTime(), null,
+ Status.CANCELLED, CancellationCause.TIMEOUT),
+ new DependencyInstance("dep4", startDate.getTime(), endDate.getTime(), null,
+ Status.CANCELLED, CancellationCause.CASCADING)
+ );
+
return new TriggerInstance("instanceId", flowTrigger, "flowId", 1,
"test", depInstList, -1, proj);
}
@Before
public void setUp() throws Exception {
+ this.message = EmailerTest.mockEmailMessage();
+ this.messageCreator = EmailerTest.mockMessageCreator(this.message);
+
this.triggerInstLoader = mock(FlowTriggerInstanceLoader.class);
this.executorManager = mock(ExecutorManager.class);
when(this.executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");
- this.emailer = mock(Emailer.class);
+ final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
+ this.emailer = Mockito.spy(new Emailer(EmailerTest.createMailProperties(), commonMetrics,
+ this.messageCreator));
this.sendEmailLatch = new CountDownLatch(1);
doAnswer(invocation -> {
this.sendEmailLatch.countDown();
@@ -81,7 +117,7 @@ public class TriggerInstanceProcessorTest {
}
@Test
- public void testProcessSucceed() throws ExecutorManagerException {
+ public void testProcessSucceed() throws ExecutorManagerException, ParseException {
final TriggerInstance triggerInstance = createTriggerInstance();
this.processor.processSucceed(triggerInstance);
verify(this.executorManager).submitExecutableFlow(any(), anyString());
@@ -93,13 +129,17 @@ public class TriggerInstanceProcessorTest {
final TriggerInstance triggerInstance = createTriggerInstance();
this.processor.processTermination(triggerInstance);
this.sendEmailLatch.await(10L, TimeUnit.SECONDS);
- verify(this.emailer).sendEmail(any(), any(), any());
+ verify(this.message).setSubject(
+ "flow trigger for flow 'flowId', project 'proj' has been cancelled on azkaban");
+ assertThat(TestUtils.readResource("/emailTemplate/flowtriggerfailureemail.html", this))
+ .isEqualToIgnoringWhitespace(this.message.getBody());
}
@Test
- public void testNewInstance() {
+ public void testNewInstance() throws ParseException {
final TriggerInstance triggerInstance = createTriggerInstance();
this.processor.processNewInstance(triggerInstance);
verify(this.triggerInstLoader).uploadTriggerInstance(triggerInstance);
}
}
+
diff --git a/azkaban-web-server/src/test/resources/emailTemplate/flowtriggerfailureemail.html b/azkaban-web-server/src/test/resources/emailTemplate/flowtriggerfailureemail.html
new file mode 100644
index 0000000..21a272d
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/emailTemplate/flowtriggerfailureemail.html
@@ -0,0 +1,47 @@
+<table>
+ <tr>
+ <td>Start Time</td>
+ <td>
+ <tr>
+ <td>2000-01-11 16:00:00</td>
+ <td>
+ <tr>
+ <td>End Time</td>
+ <td>
+ <tr>
+ <td>2000-01-11 16:00:00</td>
+ <td>
+ <tr>
+ <td>Duration</td>
+ <td>0 sec</td>
+ </tr>
+ <tr>
+ <td>Status</td>
+ <td>CANCELLED</td>
+ </tr>
+</table><a href="http://localhost:8786/executor?triggerinstanceid=instanceId">flowId Flow Trigger
+ Instance Link</a><h3>Cancelled Dependencies</h3>
+<table>
+ <tr>
+ <td>Dependency Name: dep1</td>
+ <td>
+ <tr>
+ <td>Cancellation Cause: MANUAL</td>
+ <td>
+</table>
+<table>
+ <tr>
+ <td>Dependency Name: dep3</td>
+ <td>
+ <tr>
+ <td>Cancellation Cause: TIMEOUT</td>
+ <td>
+</table>
+<table>
+ <tr>
+ <td>Dependency Name: dep4</td>
+ <td>
+ <tr>
+ <td>Cancellation Cause: CASCADING</td>
+ <td>
+</table>
\ No newline at end of file