azkaban-aplcache

Refactor Emailer class (#1665) Refactor Emailer class:

5/1/2018 7:23:09 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
index 9080a39..afb90b9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -17,13 +17,15 @@
 package azkaban.executor.mail;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Status;
 import azkaban.utils.EmailMessage;
-import azkaban.utils.Emailer;
 import azkaban.utils.Utils;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -61,6 +63,16 @@ public class DefaultMailCreator implements MailCreator {
     }
   }
 
+  private static List<String> findFailedJobs(final ExecutableFlow flow) {
+    final ArrayList<String> failedJobs = new ArrayList<>();
+    for (final ExecutableNode node : flow.getExecutableNodes()) {
+      if (node.getStatus() == Status.FAILED) {
+        failedJobs.add(node.getId());
+      }
+    }
+    return failedJobs;
+  }
+
   @Override
   public boolean createFirstErrorMessage(final ExecutableFlow flow,
       final EmailMessage message, final String azkabanName, final String scheme,
@@ -110,7 +122,7 @@ public class DefaultMailCreator implements MailCreator {
 
       message.println("");
       message.println("<h3>Reason</h3>");
-      final List<String> failedJobs = Emailer.findFailedJobs(flow);
+      final List<String> failedJobs = findFailedJobs(flow);
       message.println("<ul>");
       for (final String jobId : failedJobs) {
         message.println("<li><a href=\"" + executionUrl + "&job=" + jobId
@@ -162,7 +174,7 @@ public class DefaultMailCreator implements MailCreator {
 
       message.println("");
       message.println("<h3>Reason</h3>");
-      final List<String> failedJobs = Emailer.findFailedJobs(flow);
+      final List<String> failedJobs = findFailedJobs(flow);
       message.println("<ul>");
       for (final String jobId : failedJobs) {
         message.println("<li><a href=\"" + executionUrl + "&job=" + jobId
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index 917a550..af477ee 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -22,14 +22,10 @@ import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.Status;
 import azkaban.executor.mail.DefaultMailCreator;
 import azkaban.executor.mail.MailCreator;
 import azkaban.metrics.CommonMetrics;
 import azkaban.sla.SlaOption;
-import java.util.ArrayList;
 import java.util.List;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -46,7 +42,6 @@ public class Emailer extends AbstractMailer implements Alerter {
   private final String clientHostname;
   private final String clientPortNumber;
   private final String azkabanName;
-  private final boolean testMode;
 
   @Inject
   public Emailer(final Props props, final CommonMetrics commonMetrics,
@@ -78,27 +73,6 @@ public class Emailer extends AbstractMailer implements Alerter {
           props.getInt(ConfigurationKeys.AZKABAN_WEBSERVER_EXTERNAL_PORT, props.getInt("jetty.port",
               Constants.DEFAULT_PORT_NUMBER)));
     }
-
-    this.testMode = props.getBoolean("test.mode", false);
-  }
-
-  public static List<String> findFailedJobs(final ExecutableFlow flow) {
-    final ArrayList<String> failedJobs = new ArrayList<>();
-    for (final ExecutableNode node : flow.getExecutableNodes()) {
-      if (node.getStatus() == Status.FAILED) {
-        failedJobs.add(node.getId());
-      }
-    }
-    return failedJobs;
-  }
-
-  private void sendSlaAlertEmail(final SlaOption slaOption, final String slaMessage) {
-    final String subject =
-        "SLA violation for " + getJobOrFlowName(slaOption) + " on " + getAzkabanName();
-    final List<String> emailList =
-        (List<String>) slaOption.getInfo().get(SlaOption.INFO_EMAIL_LIST);
-    logger.info("Sending SLA email " + slaMessage);
-    sendEmail(emailList, subject, slaMessage);
   }
 
   /**
@@ -106,133 +80,83 @@ public class Emailer extends AbstractMailer implements Alerter {
    */
   public void sendEmail(final List<String> emailList, final String subject, final String body) {
     if (emailList != null && !emailList.isEmpty()) {
-      final EmailMessage message =
-          super.createEmailMessage(subject, "text/html", emailList);
-
+      final EmailMessage message = super.createEmailMessage(subject, "text/html", emailList);
       message.setBody(body);
-
-      if (!this.testMode) {
-        try {
-          message.sendEmail();
-          logger.info("Sent email message " + body);
-          this.commonMetrics.markSendEmailSuccess();
-        } catch (final Exception e) {
-          logger.error("Failed to send email message " + body, e);
-          this.commonMetrics.markSendEmailFail();
-        }
-      }
+      sendEmail(message, true, "email message " + body);
     }
   }
 
-  private String getJobOrFlowName(final SlaOption slaOption) {
-    final String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
-    final String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-    if (org.apache.commons.lang.StringUtils.isNotBlank(jobName)) {
-      return flowName + ":" + jobName;
-    } else {
-      return flowName;
-    }
+  @Override
+  public void alertOnSla(final SlaOption slaOption, final String slaMessage) {
+    final String subject =
+        "SLA violation for " + getJobOrFlowName(slaOption) + " on " + getAzkabanName();
+    final List<String> emailList =
+        (List<String>) slaOption.getInfo().get(SlaOption.INFO_EMAIL_LIST);
+    logger.info("Sending SLA email " + slaMessage);
+    sendEmail(emailList, subject, slaMessage);
   }
 
-  public void sendFirstErrorMessage(final ExecutableFlow flow) {
+  @Override
+  public void alertOnFirstError(final ExecutableFlow flow) {
     final EmailMessage message = this.messageCreator.createMessage();
-
-    final ExecutionOptions option = flow.getExecutionOptions();
-
-    final MailCreator mailCreator =
-        DefaultMailCreator.getCreator(option.getMailCreator());
-
-    logger.debug("ExecutorMailer using mail creator:"
-        + mailCreator.getClass().getCanonicalName());
-
-    final boolean mailCreated =
-        mailCreator.createFirstErrorMessage(flow, message, this.azkabanName, this.scheme,
-            this.clientHostname, this.clientPortNumber);
-
-    if (mailCreated && !this.testMode) {
-      try {
-        message.sendEmail();
-        logger.info("Sent first error email message for execution " + flow.getExecutionId());
-        this.commonMetrics.markSendEmailSuccess();
-      } catch (final Exception e) {
-        logger.error(
-            "Failed to send first error email message for execution " + flow.getExecutionId(), e);
-        this.commonMetrics.markSendEmailFail();
-      }
-    }
+    final MailCreator mailCreator = getMailCreator(flow);
+    final boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, this.azkabanName,
+        this.scheme, this.clientHostname, this.clientPortNumber);
+    sendEmail(message, mailCreated,
+        "first error email message for execution " + flow.getExecutionId());
   }
 
-  public void sendErrorEmail(final ExecutableFlow flow, final String... extraReasons) {
+  @Override
+  public void alertOnError(final ExecutableFlow flow, final String... extraReasons) {
     final EmailMessage message = this.messageCreator.createMessage();
-
-    final ExecutionOptions option = flow.getExecutionOptions();
-
-    final MailCreator mailCreator =
-        DefaultMailCreator.getCreator(option.getMailCreator());
-    logger.debug("ExecutorMailer using mail creator:"
-        + mailCreator.getClass().getCanonicalName());
-
-    final boolean mailCreated =
-        mailCreator.createErrorEmail(flow, message, this.azkabanName, this.scheme,
-            this.clientHostname, this.clientPortNumber, extraReasons);
-
-    if (mailCreated && !this.testMode) {
-      try {
-        message.sendEmail();
-        logger.info("Sent error email message for execution " + flow.getExecutionId());
-        this.commonMetrics.markSendEmailSuccess();
-      } catch (final Exception e) {
-        logger
-            .error("Failed to send error email message for execution " + flow.getExecutionId(), e);
-        this.commonMetrics.markSendEmailFail();
-      }
-    }
+    final MailCreator mailCreator = getMailCreator(flow);
+    final boolean mailCreated = mailCreator.createErrorEmail(flow, message, this.azkabanName,
+        this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
+    sendEmail(message, mailCreated, "error email message for execution " + flow.getExecutionId());
   }
 
-  public void sendSuccessEmail(final ExecutableFlow flow) {
+  @Override
+  public void alertOnSuccess(final ExecutableFlow flow) {
     final EmailMessage message = this.messageCreator.createMessage();
+    final MailCreator mailCreator = getMailCreator(flow);
+    final boolean mailCreated = mailCreator.createSuccessEmail(flow, message, this.azkabanName,
+        this.scheme, this.clientHostname, this.clientPortNumber);
+    sendEmail(message, mailCreated, "success email message for execution" + flow.getExecutionId());
+  }
 
-    final ExecutionOptions option = flow.getExecutionOptions();
-
-    final MailCreator mailCreator =
-        DefaultMailCreator.getCreator(option.getMailCreator());
-    logger.debug("ExecutorMailer using mail creator:"
-        + mailCreator.getClass().getCanonicalName());
+  private MailCreator getMailCreator(final ExecutableFlow flow) {
+    final String name = flow.getExecutionOptions().getMailCreator();
+    return getMailCreator(name);
+  }
 
-    final boolean mailCreated =
-        mailCreator.createSuccessEmail(flow, message, this.azkabanName, this.scheme,
-            this.clientHostname, this.clientPortNumber);
+  private MailCreator getMailCreator(final String name) {
+    final MailCreator mailCreator = DefaultMailCreator.getCreator(name);
+    logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+    return mailCreator;
+  }
 
-    if (mailCreated && !this.testMode) {
+  private void sendEmail(final EmailMessage message, final boolean mailCreated,
+      final String operation) {
+    if (mailCreated) {
       try {
         message.sendEmail();
-        logger.info("Sent success email message for execution " + flow.getExecutionId());
+        logger.info("Sent " + operation);
         this.commonMetrics.markSendEmailSuccess();
       } catch (final Exception e) {
-        logger.error("Failed to send success email message for execution " + flow.getExecutionId(),
-            e);
+        logger.error("Failed to send " + operation, e);
         this.commonMetrics.markSendEmailFail();
       }
     }
   }
 
-  @Override
-  public void alertOnSuccess(final ExecutableFlow exflow) {
-    sendSuccessEmail(exflow);
-  }
-
-  @Override
-  public void alertOnError(final ExecutableFlow exflow, final String... extraReasons) {
-    sendErrorEmail(exflow, extraReasons);
-  }
-
-  @Override
-  public void alertOnFirstError(final ExecutableFlow exflow) {
-    sendFirstErrorMessage(exflow);
+  private String getJobOrFlowName(final SlaOption slaOption) {
+    final String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+    final String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+    if (org.apache.commons.lang.StringUtils.isNotBlank(jobName)) {
+      return flowName + ":" + jobName;
+    } else {
+      return flowName;
+    }
   }
 
-  @Override
-  public void alertOnSla(final SlaOption slaOption, final String slaMessage) {
-    sendSlaAlertEmail(slaOption, slaMessage);
-  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index 74ef788..11f72fa 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -91,7 +91,7 @@ public class EmailerTest {
     final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
-    emailer.sendErrorEmail(exFlow);
+    emailer.alertOnError(exFlow);
     verify(this.message).addAllToAddress(this.receiveAddrList);
     verify(this.message).setSubject("Flow 'jobe' has failed on azkaban");
     assertThat(DefaultMailCreatorTest.read("errorEmail2.html"))