azkaban-aplcache

Implement unit tests for Emailer & EmailMessage (#1691) These

4/30/2018 9:45:47 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
index bd73eac..f9e8eae 100644
--- a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
@@ -20,47 +20,26 @@ import java.util.Collection;
 
 public class AbstractMailer {
 
-  public static final int DEFAULT_SMTP_PORT = 25;
   private static final int MB_IN_BYTES = 1048576;
-  private final boolean usesAuth;
-
-  private final String mailHost;
-  private final int mailPort;
-  private final String mailUser;
-  private final String mailPassword;
-  private final String mailSender;
+  protected final EmailMessageCreator messageCreator;
   private final String azkabanName;
-  private final String tls;
 
   private final long attachmentMazSizeInByte;
 
-  public AbstractMailer(final Props props) {
+  public AbstractMailer(final Props props, final EmailMessageCreator messageCreator) {
     this.azkabanName = props.getString("azkaban.name", "azkaban");
-    this.mailHost = props.getString("mail.host", "localhost");
-    this.mailPort = props.getInt("mail.port", DEFAULT_SMTP_PORT);
-    this.mailUser = props.getString("mail.user", "");
-    this.mailPassword = props.getString("mail.password", "");
-    this.tls = props.getString("mail.tls", "false");
+    this.messageCreator = messageCreator;
     final long maxAttachmentSizeInMB =
         props.getInt("mail.max.attachment.size.mb", 100);
-
     this.attachmentMazSizeInByte = maxAttachmentSizeInMB * MB_IN_BYTES;
-
-    this.mailSender = props.getString("mail.sender", "");
-    this.usesAuth = props.getBoolean("mail.useAuth", true);
   }
 
   protected EmailMessage createEmailMessage(final String subject, final String mimetype,
       final Collection<String> emailList) {
-    final EmailMessage message = new EmailMessage(this.mailHost, this.mailPort, this.mailUser,
-        this.mailPassword);
-    message.setFromAddress(this.mailSender);
+    final EmailMessage message = this.messageCreator.createMessage();
     message.addAllToAddress(emailList);
     message.setMimeType(mimetype);
     message.setSubject(subject);
-    message.setAuth(this.usesAuth);
-    message.setTLS(this.tls);
-
     return message;
   }
 
@@ -71,11 +50,7 @@ public class AbstractMailer {
   /**
    * Attachment maximum size in bytes
    */
-  public long getAttachmentMaxSize() {
+  long getAttachmentMaxSize() {
     return this.attachmentMazSizeInByte;
   }
-
-  public boolean hasMailAuth() {
-    return this.usesAuth;
-  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index bd7d52c..917a550 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -45,26 +45,15 @@ public class Emailer extends AbstractMailer implements Alerter {
   private final String scheme;
   private final String clientHostname;
   private final String clientPortNumber;
-  private final String mailHost;
-  private final int mailPort;
-  private final String mailUser;
-  private final String mailPassword;
-  private final String mailSender;
   private final String azkabanName;
-  private final String tls;
   private final boolean testMode;
 
   @Inject
-  public Emailer(final Props props, final CommonMetrics commonMetrics) {
-    super(props);
+  public Emailer(final Props props, final CommonMetrics commonMetrics,
+      final EmailMessageCreator messageCreator) {
+    super(props, messageCreator);
     this.commonMetrics = requireNonNull(commonMetrics, "commonMetrics is null.");
     this.azkabanName = props.getString("azkaban.name", "azkaban");
-    this.mailHost = props.getString("mail.host", "localhost");
-    this.mailPort = props.getInt("mail.port", DEFAULT_SMTP_PORT);
-    this.mailUser = props.getString("mail.user", "");
-    this.mailPassword = props.getString("mail.password", "");
-    this.mailSender = props.getString("mail.sender", "");
-    this.tls = props.getString("mail.tls", "false");
 
     final int mailTimeout = props.getInt("mail.timeout.millis", 30000);
     EmailMessage.setTimeout(mailTimeout);
@@ -146,11 +135,7 @@ public class Emailer extends AbstractMailer implements Alerter {
   }
 
   public void sendFirstErrorMessage(final ExecutableFlow flow) {
-    final EmailMessage message = new EmailMessage(this.mailHost, this.mailPort, this.mailUser,
-        this.mailPassword);
-    message.setFromAddress(this.mailSender);
-    message.setTLS(this.tls);
-    message.setAuth(super.hasMailAuth());
+    final EmailMessage message = this.messageCreator.createMessage();
 
     final ExecutionOptions option = flow.getExecutionOptions();
 
@@ -178,11 +163,7 @@ public class Emailer extends AbstractMailer implements Alerter {
   }
 
   public void sendErrorEmail(final ExecutableFlow flow, final String... extraReasons) {
-    final EmailMessage message = new EmailMessage(this.mailHost, this.mailPort, this.mailUser,
-        this.mailPassword);
-    message.setFromAddress(this.mailSender);
-    message.setTLS(this.tls);
-    message.setAuth(super.hasMailAuth());
+    final EmailMessage message = this.messageCreator.createMessage();
 
     final ExecutionOptions option = flow.getExecutionOptions();
 
@@ -209,11 +190,7 @@ public class Emailer extends AbstractMailer implements Alerter {
   }
 
   public void sendSuccessEmail(final ExecutableFlow flow) {
-    final EmailMessage message = new EmailMessage(this.mailHost, this.mailPort, this.mailUser,
-        this.mailPassword);
-    message.setFromAddress(this.mailSender);
-    message.setTLS(this.tls);
-    message.setAuth(super.hasMailAuth());
+    final EmailMessage message = this.messageCreator.createMessage();
 
     final ExecutionOptions option = flow.getExecutionOptions();
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java b/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
index ca10a29..e5f81cc 100644
--- a/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
+++ b/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
@@ -16,7 +16,6 @@
 
 package azkaban.utils;
 
-import com.sun.mail.smtp.SMTPTransport;
 import java.io.File;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -30,16 +29,13 @@ import javax.activation.FileDataSource;
 import javax.mail.BodyPart;
 import javax.mail.Message;
 import javax.mail.MessagingException;
-import javax.mail.Session;
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeBodyPart;
-import javax.mail.internet.MimeMessage;
 import javax.mail.internet.MimeMultipart;
 import org.apache.log4j.Logger;
 
 public class EmailMessage {
 
-  private static final String protocol = "smtp";
   private static final int MAX_EMAIL_RETRY_COUNT = 5;
   private static int _mailTimeout = 10000;
   private static int _connectionTimeout = 10000;
@@ -51,6 +47,7 @@ public class EmailMessage {
   private final String _mailHost;
   private final String _mailUser;
   private final String _mailPassword;
+  private final EmailMessageCreator creator;
   private String _subject;
   private String _fromAddress;
   private String _mimeType = "text/plain";
@@ -60,15 +57,13 @@ public class EmailMessage {
   private boolean _enableAttachementEmbedment = true;
   private StringBuffer _body = new StringBuffer();
 
-  public EmailMessage() {
-    this("localhost", AbstractMailer.DEFAULT_SMTP_PORT, "", "");
-  }
-
-  public EmailMessage(final String host, final int port, final String user, final String password) {
+  public EmailMessage(final String host, final int port, final String user, final String password,
+      final EmailMessageCreator creator) {
     this._mailUser = user;
     this._mailHost = host;
     this._mailPort = port;
     this._mailPassword = password;
+    this.creator = creator;
   }
 
   public static void setTimeout(final int timeoutMillis) {
@@ -171,21 +166,22 @@ public class EmailMessage {
     checkSettings();
     final Properties props = new Properties();
     if (this._usesAuth) {
-      props.put("mail." + protocol + ".auth", "true");
+      props.put("mail.smtp.auth", "true");
       props.put("mail.user", this._mailUser);
       props.put("mail.password", this._mailPassword);
     } else {
-      props.put("mail." + protocol + ".auth", "false");
+      props.put("mail.smtp.auth", "false");
     }
-    props.put("mail." + protocol + ".host", this._mailHost);
-    props.put("mail." + protocol + ".port", this._mailPort);
-    props.put("mail." + protocol + ".timeout", _mailTimeout);
-    props.put("mail." + protocol + ".connectiontimeout", _connectionTimeout);
+    props.put("mail.smtp.host", this._mailHost);
+    props.put("mail.smtp.port", this._mailPort);
+    props.put("mail.smtp.timeout", _mailTimeout);
+    props.put("mail.smtp.connectiontimeout", _connectionTimeout);
     props.put("mail.smtp.starttls.enable", this._tls);
     props.put("mail.smtp.ssl.trust", this._mailHost);
 
-    final Session session = Session.getInstance(props, null);
-    final Message message = new MimeMessage(session);
+    final JavaxMailSender sender = this.creator.createSender(props);
+    final Message message = sender.createMessage();
+
     final InternetAddress from = new InternetAddress(this._fromAddress, false);
     message.setFrom(from);
     for (final String toAddr : this._toAddress) {
@@ -214,48 +210,46 @@ public class EmailMessage {
       message.setContent(this._body.toString(), this._mimeType);
     }
 
-    final SMTPTransport t = (SMTPTransport) session.getTransport(protocol);
-
-    retryConnectToSMTPServer(t);
-    retrySendMessage(t, message);
-    t.close();
+    retryConnectToSMTPServer(sender);
+    retrySendMessage(sender, message);
+    sender.close();
   }
 
-  private void connectToSMTPServer(final SMTPTransport t) throws MessagingException {
+  private void connectToSMTPServer(final JavaxMailSender s) throws MessagingException {
     if (this._usesAuth) {
-      t.connect(this._mailHost, this._mailPort, this._mailUser, this._mailPassword);
+      s.connect(this._mailHost, this._mailPort, this._mailUser, this._mailPassword);
     } else {
-      t.connect();
+      s.connect();
     }
   }
 
-  private void retryConnectToSMTPServer(final SMTPTransport t) throws MessagingException {
+  private void retryConnectToSMTPServer(final JavaxMailSender s) throws MessagingException {
     int attempt;
     for (attempt = 0; attempt < MAX_EMAIL_RETRY_COUNT; attempt++) {
       try {
-        connectToSMTPServer(t);
+        connectToSMTPServer(s);
         return;
       } catch (final Exception e) {
         this.logger.error("Connecting to SMTP server failed, attempt: " + attempt, e);
       }
     }
-    t.close();
+    s.close();
     throw new MessagingException("Failed to connect to SMTP server after "
         + attempt + " attempts.");
   }
 
-  private void retrySendMessage(final SMTPTransport t, final Message message)
+  private void retrySendMessage(final JavaxMailSender s, final Message message)
       throws MessagingException {
     int attempt;
     for (attempt = 0; attempt < MAX_EMAIL_RETRY_COUNT; attempt++) {
       try {
-        t.sendMessage(message, message.getRecipients(Message.RecipientType.TO));
+        s.sendMessage(message, message.getRecipients(Message.RecipientType.TO));
         return;
       } catch (final Exception e) {
         this.logger.error("Sending email messages failed, attempt: " + attempt, e);
       }
     }
-    t.close();
+    s.close();
     throw new MessagingException("Failed to send email messages after "
         + attempt + " attempts.");
   }
diff --git a/azkaban-common/src/main/java/azkaban/utils/EmailMessageCreator.java b/azkaban-common/src/main/java/azkaban/utils/EmailMessageCreator.java
new file mode 100644
index 0000000..ac739ed
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/EmailMessageCreator.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.util.Properties;
+import javax.inject.Inject;
+import javax.mail.NoSuchProviderException;
+
+
+public class EmailMessageCreator {
+
+  public static final int DEFAULT_SMTP_PORT = 25;
+
+  private final String mailHost;
+  private final int mailPort;
+  private final String mailUser;
+  private final String mailPassword;
+  private final String mailSender;
+  private final String tls;
+  private final boolean usesAuth;
+
+  @Inject
+  public EmailMessageCreator(final Props props) {
+    this.mailHost = props.getString("mail.host", "localhost");
+    this.mailPort = props.getInt("mail.port", DEFAULT_SMTP_PORT);
+    this.mailUser = props.getString("mail.user", "");
+    this.mailPassword = props.getString("mail.password", "");
+    this.mailSender = props.getString("mail.sender", "");
+    this.tls = props.getString("mail.tls", "false");
+    this.usesAuth = props.getBoolean("mail.useAuth", true);
+  }
+
+  public EmailMessage createMessage() {
+    final EmailMessage message = new EmailMessage(
+        this.mailHost, this.mailPort, this.mailUser, this.mailPassword, this);
+    message.setFromAddress(this.mailSender);
+    message.setTLS(this.tls);
+    message.setAuth(this.usesAuth);
+    return message;
+  }
+
+  public JavaxMailSender createSender(final Properties props) throws NoSuchProviderException {
+    return new JavaxMailSender(props);
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/JavaxMailSender.java b/azkaban-common/src/main/java/azkaban/utils/JavaxMailSender.java
new file mode 100644
index 0000000..edf313c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/JavaxMailSender.java
@@ -0,0 +1,50 @@
+package azkaban.utils;
+
+import com.sun.mail.smtp.SMTPTransport;
+import java.util.Properties;
+import javax.mail.Address;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.NoSuchProviderException;
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
+
+/**
+ * Wraps javax.mail features, mostly because Session is a final class and can't be mocked.
+ */
+public class JavaxMailSender {
+
+  public static final String PROTOCOL = "smtp";
+
+  private final Session session;
+  private final SMTPTransport t;
+
+  public JavaxMailSender(final Properties props)
+      throws NoSuchProviderException {
+    this.session = Session.getInstance(props, null);
+    this.t = (SMTPTransport) this.session.getTransport(PROTOCOL);
+  }
+
+  public Message createMessage() {
+    return new MimeMessage(this.session);
+  }
+
+  public void connect(final String mailHost, final int mailPort, final String mailUser,
+      final String mailPassword) throws MessagingException {
+    this.t.connect(mailHost, mailPort, mailUser, mailPassword);
+  }
+
+  public void connect() throws MessagingException {
+    this.t.connect();
+  }
+
+  public void sendMessage(final Message message, final Address[] recipients)
+      throws MessagingException {
+    this.t.sendMessage(message, recipients);
+  }
+
+  public void close() throws MessagingException {
+    this.t.close();
+  }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
index 3406ec9..37fed51 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutorManagerTest.java
@@ -29,8 +29,6 @@ import azkaban.Constants;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.user.User;
-import azkaban.utils.AbstractMailerTest;
-import azkaban.utils.Emailer;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
@@ -70,8 +68,8 @@ public class ExecutorManagerTest {
 
   @Before
   public void setup() {
-    this.props = AbstractMailerTest.createMailProperties();
-    this.alertHolder = new AlerterHolder(this.props, new Emailer(this.props, this.commonMetrics));
+    this.props = new Props();
+    this.alertHolder = mock(AlerterHolder.class);
     this.loader = new MockExecutorLoader();
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
index d3ed6e7..3fbb68f 100644
--- a/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/mail/DefaultMailCreatorTest.java
@@ -12,6 +12,7 @@ import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.Project;
 import azkaban.utils.EmailMessage;
+import azkaban.utils.EmailMessageCreator;
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
 import java.io.InputStream;
@@ -44,6 +45,11 @@ public class DefaultMailCreatorTest {
   private String clientPortNumber;
   private TimeZone defaultTz;
 
+  public static String read(final String file) throws Exception {
+    final InputStream is = DefaultMailCreatorTest.class.getResourceAsStream(file);
+    return IOUtils.toString(is, Charsets.UTF_8).trim();
+  }
+
   @Before
   public void setUp() throws Exception {
     this.defaultTz = TimeZone.getDefault();
@@ -57,7 +63,8 @@ public class DefaultMailCreatorTest {
     this.flow = new Flow("mail-creator-test");
     this.project = new Project(1, "test-project");
     this.options = new ExecutionOptions();
-    this.message = new EmailMessage();
+    this.message = new EmailMessage("localhost", EmailMessageCreator.DEFAULT_SMTP_PORT, "", "",
+        null);
 
     this.azkabanName = "unit-tests";
     this.scheme = "http";
@@ -124,9 +131,4 @@ public class DefaultMailCreatorTest {
     assertThat(read("successEmail.html")).isEqualToIgnoringWhitespace(this.message.getBody());
   }
 
-  private String read(final String file) throws Exception {
-    final InputStream is = DefaultMailCreatorTest.class.getResourceAsStream(file);
-    return IOUtils.toString(is, Charsets.UTF_8).trim();
-  }
-
 }
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
index 61e5208..d6a072e 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerDeadlockTest.java
@@ -27,8 +27,6 @@ import azkaban.executor.MockExecutorLoader;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
 import azkaban.trigger.builtin.CreateTriggerAction;
-import azkaban.utils.AbstractMailerTest;
-import azkaban.utils.Emailer;
 import azkaban.utils.Props;
 import com.codahale.metrics.MetricRegistry;
 import java.util.ArrayList;
@@ -50,15 +48,14 @@ public class TriggerManagerDeadlockTest {
   @Before
   public void setup() throws ExecutorManagerException, TriggerManagerException {
     this.loader = new MockTriggerLoader();
-    final Props props = AbstractMailerTest.createMailProperties();
+    final Props props = new Props();
     props.put("trigger.scan.interval", 1000);
     props.put("executor.port", 12321);
     this.execLoader = new MockExecutorLoader();
     this.apiGateway = mock(ExecutorApiGateway.class);
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
     final ExecutorManager executorManager = new ExecutorManager(props, this.execLoader,
-        new AlerterHolder(props, new Emailer(props, commonMetrics)),
-        commonMetrics, this.apiGateway);
+        mock(AlerterHolder.class), commonMetrics, this.apiGateway);
     this.triggerManager = new TriggerManager(props, this.loader, executorManager);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/utils/AbstractMailerTest.java b/azkaban-common/src/test/java/azkaban/utils/AbstractMailerTest.java
index c1ef290..7a80ca9 100644
--- a/azkaban-common/src/test/java/azkaban/utils/AbstractMailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/AbstractMailerTest.java
@@ -15,6 +15,9 @@
  */
 package azkaban.utils;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.verify;
+
 import java.util.ArrayList;
 import java.util.List;
 import org.junit.Before;
@@ -23,50 +26,29 @@ import org.junit.Test;
 public class AbstractMailerTest {
 
   List<String> senderList = new ArrayList<>();
-
-  public static Props createMailProperties() {
-    final Props props = new Props();
-    props.put("mail.user", "somebody");
-    props.put("mail.password", "pwd");
-    props.put("mail.sender", "somebody@xxx.com");
-    props.put("server.port", "114");
-    props.put("jetty.use.ssl", "false");
-    props.put("server.useSSL", "false");
-    props.put("jetty.port", "8786");
-    return props;
-
-  }
+  private EmailMessage message;
+  private EmailMessageCreator messageCreator;
+  private Props props;
 
   @Before
   public void setUp() throws Exception {
+    this.message = EmailerTest.mockEmailMessage();
+    this.messageCreator = EmailerTest.mockMessageCreator(this.message);
     this.senderList.add("sender@domain.com");
+    this.props = new Props();
+    this.props.put("server.port", "114");
+    this.props.put("jetty.use.ssl", "false");
+    this.props.put("server.useSSL", "false");
+    this.props.put("jetty.port", "8786");
   }
 
-  /**
-   * test emailMessage properties
-   */
   @Test
   public void testCreateEmailMessage() {
-
-    final Props props = createMailProperties();
-    props.put("mail.port", "445");
-    final AbstractMailer mailer = new AbstractMailer(props);
-    final EmailMessage emailMessage = mailer.createEmailMessage("subject", "text/html",
-        this.senderList);
-
-    assert emailMessage.getMailPort() == 445;
-
-
-  }
-
-  @Test
-  public void testCreateDefaultEmailMessage() {
-    final Props defaultProps = createMailProperties();
-    final AbstractMailer mailer = new AbstractMailer(defaultProps);
-    final EmailMessage emailMessage = mailer.createEmailMessage("subject", "text/html",
-        this.senderList);
-    assert emailMessage.getMailPort() == 25;
-
+    final AbstractMailer mailer = new AbstractMailer(this.props, this.messageCreator);
+    final EmailMessage em = mailer.createEmailMessage("subject", "text/html", this.senderList);
+    verify(this.messageCreator).createMessage();
+    assertThat(this.message).isEqualTo(em);
+    verify(this.message).setSubject("subject");
   }
 
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index ac2b049..74ef788 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -15,7 +15,14 @@
  */
 package azkaban.utils;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.mail.DefaultMailCreatorTest;
 import azkaban.flow.Flow;
 import azkaban.metrics.CommonMetrics;
 import azkaban.metrics.MetricsManager;
@@ -27,78 +34,85 @@ import java.util.ArrayList;
 import java.util.List;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
-
+import org.mockito.stubbing.Answer;
 
 public class EmailerTest {
 
-  String host = "smtp.domain.com";//smtp server address
-  int mailPort = 25;//smtp server port
-  String sender = "somebody@domain.com";//sender email address
-  String user = "somebody@domain.com";// the sender username
-  String password = "pwd"; //the sender password
-
-  String receiveAddr = "receive@domain.com";//receiver email address
-  List<String> receiveAddrList = new ArrayList<>();
+  private final String receiveAddr = "receive@domain.com";//receiver email address
+  private final List<String> receiveAddrList = new ArrayList<>();
 
   private Project project;
   private Props props;
+  private EmailMessageCreator messageCreator;
+  private EmailMessage message;
+
+  public static EmailMessageCreator mockMessageCreator(final EmailMessage message) {
+    final EmailMessageCreator mock = mock(EmailMessageCreator.class);
+    when(mock.createMessage()).thenReturn(message);
+    return mock;
+  }
+
+  public static EmailMessage mockEmailMessage() {
+    final EmailMessage message = mock(EmailMessage.class);
+    final StringBuffer body = new StringBuffer();
+
+    when(message.println(any())).thenAnswer((Answer<EmailMessage>) invocation -> {
+      body.append(invocation.<Object>getArgument(0));
+      return message;
+    });
+
+    when(message.getBody()).thenAnswer(invocation -> body.toString());
 
+    return message;
+  }
 
   @Before
   public void setUp() throws Exception {
+    this.message = mockEmailMessage();
+    this.messageCreator = mockMessageCreator(this.message);
     this.receiveAddrList.add(this.receiveAddr);
     this.project = new Project(11, "myTestProject");
 
     this.props = createMailProperties();
     final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
-    Assert.assertEquals(0, loader.getErrors().size());
+    assertThat(loader.getErrors()).isEmpty();
     this.project.setFlows(loader.getFlowMap());
     this.project.setVersion(123);
   }
 
-
-  /**
-   * this is an integration test for Emailer sending  email. if you want to run this case and send
-   * email successfully, please remove @Ignore and make sure these variable{host,mailPort,password,receiveAddr}
-   * are set to real values. the test will currently succeed because email sending errors are
-   * caught, you need to manually verify that a real email is sent and received.
-   */
-  @Ignore
   @Test
-  public void testSendEmail() throws Exception {
-
+  public void testSendErrorEmail() throws Exception {
     final Flow flow = this.project.getFlow("jobe");
     flow.addFailureEmails(this.receiveAddrList);
     Assert.assertNotNull(flow);
 
     final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
-    final Emailer emailer = new Emailer(this.props, commonMetrics);
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
     emailer.sendErrorEmail(exFlow);
-
+    verify(this.message).addAllToAddress(this.receiveAddrList);
+    verify(this.message).setSubject("Flow 'jobe' has failed on azkaban");
+    assertThat(DefaultMailCreatorTest.read("errorEmail2.html"))
+        .isEqualToIgnoringWhitespace(this.message.getBody());
   }
 
   @Test
   public void testCreateEmailMessage() {
     final CommonMetrics commonMetrics = new CommonMetrics(new MetricsManager(new MetricRegistry()));
-    final Emailer emailer = new Emailer(this.props, commonMetrics);
+    final Emailer emailer = new Emailer(this.props, commonMetrics, this.messageCreator);
     final EmailMessage em = emailer
         .createEmailMessage("subject", "text/html", this.receiveAddrList);
-    assert em.getMailPort() == this.mailPort;
-
+    verify(this.messageCreator).createMessage();
+    assertThat(this.messageCreator.createMessage()).isEqualTo(em);
+    verify(this.message).addAllToAddress(this.receiveAddrList);
+    verify(this.message).setSubject("subject");
+    verify(this.message).setMimeType("text/html");
   }
 
-
-  public Props createMailProperties() {
+  private Props createMailProperties() {
     final Props props = new Props();
-    props.put("mail.user", this.user);
-    props.put("mail.password", this.password);
-    props.put("mail.sender", this.sender);
-    props.put("mail.host", this.host);
-    props.put("mail.port", this.mailPort);
     props.put("job.failure.email", this.receiveAddr);
     props.put("server.port", "114");
     props.put("jetty.use.ssl", "false");
@@ -106,6 +120,4 @@ public class EmailerTest {
     props.put("jetty.port", "8786");
     return props;
   }
-
-
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailMessageCreatorTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailMessageCreatorTest.java
new file mode 100644
index 0000000..a7c0019
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailMessageCreatorTest.java
@@ -0,0 +1,31 @@
+package azkaban.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class EmailMessageCreatorTest {
+
+  private static final String HOST = "smtp.domain.com";
+  private static final int MAIL_PORT = 25;
+  private static final String SENDER = "somebody@domain.com";
+  private static final String USER = "somebody@domain.com";
+  private static final String PASSWORD = "pwd";
+
+  @Test
+  public void createMessage() {
+    final Props props = new Props();
+    props.put("mail.user", USER);
+    props.put("mail.password", PASSWORD);
+    props.put("mail.sender", SENDER);
+    props.put("mail.host", HOST);
+    props.put("mail.port", MAIL_PORT);
+    final EmailMessageCreator creator = new EmailMessageCreator(props);
+    final EmailMessage message = creator.createMessage();
+
+    assertThat(message.getMailPort()).isEqualTo(MAIL_PORT);
+    assertThat(message.getBody()).isEmpty();
+    assertThat(message.getSubject()).isNull();
+  }
+
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailMessageTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailMessageTest.java
index 927045a..679b65e 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailMessageTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailMessageTest.java
@@ -16,48 +16,55 @@
 
 package azkaban.utils;
 
-import java.io.IOException;
-import javax.mail.MessagingException;
-import org.junit.After;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.mail.Address;
+import javax.mail.Message;
+import javax.mail.Message.RecipientType;
+import javax.mail.internet.InternetAddress;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class EmailMessageTest {
 
-  String host = "";
-  int port = 25;
-  String sender = "";
-  String user = "";
-  String password = "";
-
-  String toAddr = "";
+  private final String host = "example.com";
+  private final int port = 25;
+  private final String sender = "from@example.com";
+  private final String user = "user";
+  private final String password = "pass";
+  private final String toAddr = "to@example.com";
 
   private EmailMessage em;
+  private JavaxMailSender mailSender;
+  private Message mimeMessage;
+  private Address[] addresses;
+  private EmailMessageCreator creator;
 
   @Before
   public void setUp() throws Exception {
-    this.em = new EmailMessage(this.host, this.port, this.user, this.password);
-    this.em.setFromAddress(this.sender);
+    this.creator = mock(EmailMessageCreator.class);
+    this.mailSender = mock(JavaxMailSender.class);
+    this.mimeMessage = mock(Message.class);
+    this.addresses = new Address[]{new InternetAddress(this.toAddr, false)};
+    when(this.creator.createSender(any())).thenReturn(this.mailSender);
+    when(this.mailSender.createMessage()).thenReturn(this.mimeMessage);
+    when(this.mimeMessage.getRecipients(Message.RecipientType.TO)).thenReturn(this.addresses);
+    this.em = new EmailMessage(this.host, this.port, this.user, this.password, this.creator);
   }
 
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Ignore
   @Test
-  public void testSendEmail() throws IOException {
+  public void testSendEmail() throws Exception {
+    this.em.setTLS("true");
     this.em.addToAddress(this.toAddr);
-    // em.addToAddress("cyu@linkedin.com");
+    this.em.setFromAddress(this.sender);
     this.em.setSubject("azkaban test email");
     this.em.setBody("azkaban test email");
-    try {
-      this.em.sendEmail();
-    } catch (final MessagingException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    this.em.sendEmail();
+    verify(this.mimeMessage).addRecipient(RecipientType.TO, this.addresses[0]);
+    verify(this.mailSender).sendMessage(this.mimeMessage, this.addresses);
   }
 
 }
diff --git a/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail2.html b/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail2.html
new file mode 100644
index 0000000..7378aa6
--- /dev/null
+++ b/azkaban-common/src/test/resources/azkaban/executor/mail/errorEmail2.html
@@ -0,0 +1,21 @@
+<h2 style="color:#FF0000"> Execution '-1' of flow 'jobe' of project 'myTestProject' has failed on
+  azkaban</h2>
+<table>
+  <tr>
+    <td>Start Time</td>
+    <td>N/A</td>
+  </tr>
+  <tr>
+    <td>End Time</td>
+    <td>N/A</td>
+  </tr>
+  <tr>
+    <td>Duration</td>
+    <td>-</td>
+  </tr>
+  <tr>
+    <td>Status</td>
+    <td>READY</td>
+  </tr>
+</table><a href="http://localhost:8786/executor?execid=-1">jobe Execution Link</a><h3>Reason</h3>
+<ul></ul>