azkaban-aplcache
Changes
azkaban-common/src/main/java/azkaban/utils/Emailer.java 180(+52 -128)
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index 9080a39..afb90b9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -17,13 +17,15 @@
package azkaban.executor.mail;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Status;
import azkaban.utils.EmailMessage;
-import azkaban.utils.Emailer;
import azkaban.utils.Utils;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -61,6 +63,16 @@ public class DefaultMailCreator implements MailCreator {
}
}
+ private static List<String> findFailedJobs(final ExecutableFlow flow) {
+ final ArrayList<String> failedJobs = new ArrayList<>();
+ for (final ExecutableNode node : flow.getExecutableNodes()) {
+ if (node.getStatus() == Status.FAILED) {
+ failedJobs.add(node.getId());
+ }
+ }
+ return failedJobs;
+ }
+
@Override
public boolean createFirstErrorMessage(final ExecutableFlow flow,
final EmailMessage message, final String azkabanName, final String scheme,
@@ -110,7 +122,7 @@ public class DefaultMailCreator implements MailCreator {
message.println("");
message.println("<h3>Reason</h3>");
- final List<String> failedJobs = Emailer.findFailedJobs(flow);
+ final List<String> failedJobs = findFailedJobs(flow);
message.println("<ul>");
for (final String jobId : failedJobs) {
message.println("<li><a href=\"" + executionUrl + "&job=" + jobId
@@ -162,7 +174,7 @@ public class DefaultMailCreator implements MailCreator {
message.println("");
message.println("<h3>Reason</h3>");
- final List<String> failedJobs = Emailer.findFailedJobs(flow);
+ final List<String> failedJobs = findFailedJobs(flow);
message.println("<ul>");
for (final String jobId : failedJobs) {
message.println("<li><a href=\"" + executionUrl + "&job=" + jobId
azkaban-common/src/main/java/azkaban/utils/Emailer.java 180(+52 -128)
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index 917a550..af477ee 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -22,14 +22,10 @@ import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.Status;
import azkaban.executor.mail.DefaultMailCreator;
import azkaban.executor.mail.MailCreator;
import azkaban.metrics.CommonMetrics;
import azkaban.sla.SlaOption;
-import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -46,7 +42,6 @@ public class Emailer extends AbstractMailer implements Alerter {
private final String clientHostname;
private final String clientPortNumber;
private final String azkabanName;
- private final boolean testMode;
@Inject
public Emailer(final Props props, final CommonMetrics commonMetrics,
@@ -78,27 +73,6 @@ public class Emailer extends AbstractMailer implements Alerter {
props.getInt(ConfigurationKeys.AZKABAN_WEBSERVER_EXTERNAL_PORT, props.getInt("jetty.port",
Constants.DEFAULT_PORT_NUMBER)));
}
-
- this.testMode = props.getBoolean("test.mode", false);
- }
-
- public static List<String> findFailedJobs(final ExecutableFlow flow) {
- final ArrayList<String> failedJobs = new ArrayList<>();
- for (final ExecutableNode node : flow.getExecutableNodes()) {
- if (node.getStatus() == Status.FAILED) {
- failedJobs.add(node.getId());
- }
- }
- return failedJobs;
- }
-
- private void sendSlaAlertEmail(final SlaOption slaOption, final String slaMessage) {
- final String subject =
- "SLA violation for " + getJobOrFlowName(slaOption) + " on " + getAzkabanName();
- final List<String> emailList =
- (List<String>) slaOption.getInfo().get(SlaOption.INFO_EMAIL_LIST);
- logger.info("Sending SLA email " + slaMessage);
- sendEmail(emailList, subject, slaMessage);
}
/**
@@ -106,133 +80,83 @@ public class Emailer extends AbstractMailer implements Alerter {
*/
public void sendEmail(final List<String> emailList, final String subject, final String body) {
if (emailList != null && !emailList.isEmpty()) {
- final EmailMessage message =
- super.createEmailMessage(subject, "text/html", emailList);
-
+ final EmailMessage message = super.createEmailMessage(subject, "text/html", emailList);
message.setBody(body);
-
- if (!this.testMode) {
- try {
- message.sendEmail();
- logger.info("Sent email message " + body);
- this.commonMetrics.markSendEmailSuccess();
- } catch (final Exception e) {
- logger.error("Failed to send email message " + body, e);
- this.commonMetrics.markSendEmailFail();
- }
- }
+ sendEmail(message, true, "email message " + body);
}
}
- private String getJobOrFlowName(final SlaOption slaOption) {
- final String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
- final String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
- if (org.apache.commons.lang.StringUtils.isNotBlank(jobName)) {
- return flowName + ":" + jobName;
- } else {
- return flowName;
- }
+ @Override
+ public void alertOnSla(final SlaOption slaOption, final String slaMessage) {
+ final String subject =
+ "SLA violation for " + getJobOrFlowName(slaOption) + " on " + getAzkabanName();
+ final List<String> emailList =
+ (List<String>) slaOption.getInfo().get(SlaOption.INFO_EMAIL_LIST);
+ logger.info("Sending SLA email " + slaMessage);
+ sendEmail(emailList, subject, slaMessage);
}
- public void sendFirstErrorMessage(final ExecutableFlow flow) {
+ @Override
+ public void alertOnFirstError(final ExecutableFlow flow) {
final EmailMessage message = this.messageCreator.createMessage();
-
- final ExecutionOptions option = flow.getExecutionOptions();
-
- final MailCreator mailCreator =
- DefaultMailCreator.getCreator(option.getMailCreator());
-
- logger.debug("ExecutorMailer using mail creator:"
- + mailCreator.getClass().getCanonicalName());
-
- final boolean mailCreated =
- mailCreator.createFirstErrorMessage(flow, message, this.azkabanName, this.scheme,
- this.clientHostname, this.clientPortNumber);
-
- if (mailCreated && !this.testMode) {
- try {
- message.sendEmail();
- logger.info("Sent first error email message for execution " + flow.getExecutionId());
- this.commonMetrics.markSendEmailSuccess();
- } catch (final Exception e) {
- logger.error(
- "Failed to send first error email message for execution " + flow.getExecutionId(), e);
- this.commonMetrics.markSendEmailFail();
- }
- }
+ final MailCreator mailCreator = getMailCreator(flow);
+ final boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, this.azkabanName,
+ this.scheme, this.clientHostname, this.clientPortNumber);
+ sendEmail(message, mailCreated,
+ "first error email message for execution " + flow.getExecutionId());
}
- public void sendErrorEmail(final ExecutableFlow flow, final String... extraReasons) {
+ @Override
+ public void alertOnError(final ExecutableFlow flow, final String... extraReasons) {
final EmailMessage message = this.messageCreator.createMessage();
-
- final ExecutionOptions option = flow.getExecutionOptions();
-
- final MailCreator mailCreator =
- DefaultMailCreator.getCreator(option.getMailCreator());
- logger.debug("ExecutorMailer using mail creator:"
- + mailCreator.getClass().getCanonicalName());
-
- final boolean mailCreated =
- mailCreator.createErrorEmail(flow, message, this.azkabanName, this.scheme,
- this.clientHostname, this.clientPortNumber, extraReasons);
-
- if (mailCreated && !this.testMode) {
- try {
- message.sendEmail();
- logger.info("Sent error email message for execution " + flow.getExecutionId());
- this.commonMetrics.markSendEmailSuccess();
- } catch (final Exception e) {
- logger
- .error("Failed to send error email message for execution " + flow.getExecutionId(), e);
- this.commonMetrics.markSendEmailFail();
- }
- }
+ final MailCreator mailCreator = getMailCreator(flow);
+ final boolean mailCreated = mailCreator.createErrorEmail(flow, message, this.azkabanName,
+ this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
+ sendEmail(message, mailCreated, "error email message for execution " + flow.getExecutionId());
}
- public void sendSuccessEmail(final ExecutableFlow flow) {
+ @Override
+ public void alertOnSuccess(final ExecutableFlow flow) {
final EmailMessage message = this.messageCreator.createMessage();
+ final MailCreator mailCreator = getMailCreator(flow);
+ final boolean mailCreated = mailCreator.createSuccessEmail(flow, message, this.azkabanName,
+ this.scheme, this.clientHostname, this.clientPortNumber);
+ sendEmail(message, mailCreated, "success email message for execution" + flow.getExecutionId());
+ }
- final ExecutionOptions option = flow.getExecutionOptions();
-
- final MailCreator mailCreator =
- DefaultMailCreator.getCreator(option.getMailCreator());
- logger.debug("ExecutorMailer using mail creator:"
- + mailCreator.getClass().getCanonicalName());
+ private MailCreator getMailCreator(final ExecutableFlow flow) {
+ final String name = flow.getExecutionOptions().getMailCreator();
+ return getMailCreator(name);
+ }
- final boolean mailCreated =
- mailCreator.createSuccessEmail(flow, message, this.azkabanName, this.scheme,
- this.clientHostname, this.clientPortNumber);
+ private MailCreator getMailCreator(final String name) {
+ final MailCreator mailCreator = DefaultMailCreator.getCreator(name);
+ logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+ return mailCreator;
+ }
- if (mailCreated && !this.testMode) {
+ private void sendEmail(final EmailMessage message, final boolean mailCreated,
+ final String operation) {
+ if (mailCreated) {
try {
message.sendEmail();
- logger.info("Sent success email message for execution " + flow.getExecutionId());
+ logger.info("Sent " + operation);
this.commonMetrics.markSendEmailSuccess();
} catch (final Exception e) {
- logger.error("Failed to send success email message for execution " + flow.getExecutionId(),
- e);
+ logger.error("Failed to send " + operation, e);
this.commonMetrics.markSendEmailFail();
}
}
}
- @Override
- public void alertOnSuccess(final ExecutableFlow exflow) {
- sendSuccessEmail(exflow);
- }
-
- @Override
- public void alertOnError(final ExecutableFlow exflow, final String... extraReasons) {
- sendErrorEmail(exflow, extraReasons);
- }
-
- @Override
- public void alertOnFirstError(final ExecutableFlow exflow) {
- sendFirstErrorMessage(exflow);
+ private String getJobOrFlowName(final SlaOption slaOption) {
+ final String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+ final String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+ if (org.apache.commons.lang.StringUtils.isNotBlank(jobName)) {
+ return flowName + ":" + jobName;
+ } else {
+ return flowName;
+ }
}
- @Override
- public void alertOnSla(final SlaOption slaOption, final String slaMessage) {
- sendSlaAlertEmail(slaOption, slaMessage);
- }
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index 74ef788..11f72fa 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -91,7 +91,7 @@ public class EmailerTest {
final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
- emailer.sendErrorEmail(exFlow);
+ emailer.alertOnError(exFlow);
verify(this.message).addAllToAddress(this.receiveAddrList);
verify(this.message).setSubject("Flow 'jobe' has failed on azkaban");
assertThat(DefaultMailCreatorTest.read("errorEmail2.html"))