azkaban-aplcache

Merge pull request #323 from hluu/master Email should retry

10/9/2014 9:36:42 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
index 6fecc05..bbbec14 100644
--- a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
@@ -19,6 +19,7 @@ package azkaban.utils;
 import java.util.Collection;
 
 public class AbstractMailer {
+  private static int MB_IN_BYTES = 1048576;
   private String clientHostname;
   private int clientPort;
   private boolean usesSSL;
@@ -32,18 +33,25 @@ public class AbstractMailer {
 
   private String referenceURL;
 
+  private long attachmentMazSizeInByte;
+
   public AbstractMailer(Props props) {
     this.azkabanName = props.getString("azkaban.name", "azkaban");
     this.mailHost = props.getString("mail.host", "localhost");
     this.mailUser = props.getString("mail.user", "");
     this.mailPassword = props.getString("mail.password", "");
+    long maxAttachmentSizeInMB =
+        props.getInt("mail.max.attachment.size.mb", 100);
+
+    attachmentMazSizeInByte = maxAttachmentSizeInMB * MB_IN_BYTES;
+
     this.mailSender = props.getString("mail.sender", "");
     this.usesAuth = props.getBoolean("mail.useAuth", true);
-    
+
     this.clientHostname = props.get("server.hostname");
     this.clientPort = props.getInt("server.port");
     this.usesSSL = props.getBoolean("server.useSSL");
-    
+
     if (usesSSL) {
       referenceURL =
           "https://" + clientHostname
@@ -95,8 +103,17 @@ public class AbstractMailer {
   public String getMailSender() {
     return mailSender;
   }
-  
-  public boolean hasMailAuth(){
-      return usesAuth;
+
+  /**
+   * Attachment maximum size in bytes
+   * 
+   * @return
+   */
+  public long getAttachmentMaxSize() {
+    return attachmentMazSizeInByte;
+  }
+
+  public boolean hasMailAuth() {
+    return usesAuth;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index ab09312..a031e77 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -16,7 +16,6 @@
 
 package azkaban.utils;
 
-import java.lang.String;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -32,9 +31,6 @@ import azkaban.executor.Status;
 import azkaban.executor.mail.DefaultMailCreator;
 import azkaban.executor.mail.MailCreator;
 import azkaban.sla.SlaOption;
-import azkaban.utils.AbstractMailer;
-import azkaban.utils.EmailMessage;
-import azkaban.utils.Props;
 
 public class Emailer extends AbstractMailer implements Alerter {
   private static Logger logger = Logger.getLogger(Emailer.class);
@@ -64,13 +60,15 @@ public class Emailer extends AbstractMailer implements Alerter {
     this.mailPassword = props.getString("mail.password", "");
     this.mailSender = props.getString("mail.sender", "");
     this.tls = props.getString("mail.tls", "false");
-    
+
     int mailTimeout = props.getInt("mail.timeout.millis", 10000);
     EmailMessage.setTimeout(mailTimeout);
     int connectionTimeout =
         props.getInt("mail.connection.timeout.millis", 10000);
     EmailMessage.setConnectionTimeout(connectionTimeout);
 
+    EmailMessage.setTotalAttachmentMaxSize(getAttachmentMaxSize());
+
     this.clientHostname = props.getString("jetty.hostname", "localhost");
 
     if (props.getBoolean("jetty.use.ssl", true)) {
@@ -138,7 +136,7 @@ public class Emailer extends AbstractMailer implements Alerter {
     message.setFromAddress(mailSender);
     message.setTLS(tls);
     message.setAuth(super.hasMailAuth());
-    
+
     ExecutionOptions option = flow.getExecutionOptions();
 
     MailCreator mailCreator =
@@ -164,7 +162,7 @@ public class Emailer extends AbstractMailer implements Alerter {
     message.setFromAddress(mailSender);
     message.setTLS(tls);
     message.setAuth(super.hasMailAuth());
-    
+
     ExecutionOptions option = flow.getExecutionOptions();
 
     MailCreator mailCreator =
diff --git a/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java b/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
index 8e54527..aca007c 100644
--- a/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
+++ b/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
@@ -18,6 +18,7 @@ package azkaban.utils;
 
 import java.io.File;
 import java.io.InputStream;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -36,9 +37,13 @@ import javax.mail.internet.MimeBodyPart;
 import javax.mail.internet.MimeMessage;
 import javax.mail.internet.MimeMultipart;
 
+import org.apache.log4j.Logger;
+
 import com.sun.mail.smtp.SMTPTransport;
 
 public class EmailMessage {
+  private final Logger logger = Logger.getLogger(EmailMessage.class);
+
   private static String protocol = "smtp";
   private List<String> _toAddress = new ArrayList<String>();
   private String _mailHost;
@@ -48,10 +53,13 @@ public class EmailMessage {
   private String _fromAddress;
   private String _mimeType = "text/plain";
   private String _tls;
+  private long _totalAttachmentSizeSoFar;
   private boolean _usesAuth = true;
   private StringBuffer _body = new StringBuffer();
   private static int _mailTimeout = 10000;
   private static int _connectionTimeout = 10000;
+  private static long _totalAttachmentMaxSizeInByte = 1024 * 1024 * 1024; // 1
+                                                                          // GB
 
   private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
 
@@ -73,6 +81,14 @@ public class EmailMessage {
     _connectionTimeout = timeoutMillis;
   }
 
+  public static void setTotalAttachmentMaxSize(long sizeInBytes) {
+    if (sizeInBytes < 1) {
+      throw new IllegalArgumentException(
+          "attachment max size can't be 0 or negative");
+    }
+    _totalAttachmentMaxSizeInByte = sizeInBytes;
+  }
+
   public EmailMessage setMailHost(String host) {
     _mailHost = host;
     return this;
@@ -112,10 +128,10 @@ public class EmailMessage {
     _tls = tls;
     return this;
   }
-  
+
   public EmailMessage setAuth(boolean auth) {
-      _usesAuth = auth;
-      return this;
+    _usesAuth = auth;
+    return this;
   }
 
   public EmailMessage addAttachment(File file) throws MessagingException {
@@ -124,6 +140,16 @@ public class EmailMessage {
 
   public EmailMessage addAttachment(String attachmentName, File file)
       throws MessagingException {
+
+    _totalAttachmentSizeSoFar += file.length();
+
+    if (_totalAttachmentSizeSoFar > _totalAttachmentMaxSizeInByte) {
+      throw new MessageAttachmentExceededMaximumSizeException(
+          "Adding attachment '" + attachmentName
+              + "' will exceed the allowed maximum size of "
+              + _totalAttachmentMaxSizeInByte);
+    }
+
     BodyPart attachmentPart = new MimeBodyPart();
     DataSource fileDataSource = new FileDataSource(file);
     attachmentPart.setDataHandler(new DataHandler(fileDataSource));
@@ -162,11 +188,11 @@ public class EmailMessage {
     checkSettings();
     Properties props = new Properties();
     if (_usesAuth) {
-        props.put("mail." + protocol + ".auth", "true");
-        props.put("mail.user", _mailUser);
-        props.put("mail.password", _mailPassword);
+      props.put("mail." + protocol + ".auth", "true");
+      props.put("mail.user", _mailUser);
+      props.put("mail.password", _mailPassword);
     } else {
-        props.put("mail." + protocol + ".auth", "false");
+      props.put("mail." + protocol + ".auth", "false");
     }
     props.put("mail." + protocol + ".host", _mailHost);
     props.put("mail." + protocol + ".timeout", _mailTimeout);
@@ -204,15 +230,36 @@ public class EmailMessage {
     // Transport transport = session.getTransport();
 
     SMTPTransport t = (SMTPTransport) session.getTransport(protocol);
-    if (_usesAuth) {
-        t.connect(_mailHost, _mailUser, _mailPassword);
-    } else {
-        t.connect();
+
+    try {
+      connectToSMTPServer(t);
+    } catch (MessagingException ste) {
+      if (ste.getCause() instanceof SocketTimeoutException) {
+        try {
+          // retry on SocketTimeoutException
+          connectToSMTPServer(t);
+          logger.info("Email retry on SocketTimeoutException succeeded");
+        } catch (MessagingException me) {
+          logger.error("Email retry on SocketTimeoutException failed", me);
+          throw me;
+        }
+      } else {
+        logger.error("Encountered issue while connecting to email server", ste);
+        throw ste;
+      }
     }
     t.sendMessage(message, message.getRecipients(Message.RecipientType.TO));
     t.close();
   }
 
+  private void connectToSMTPServer(SMTPTransport t) throws MessagingException {
+    if (_usesAuth) {
+      t.connect(_mailHost, _mailUser, _mailPassword);
+    } else {
+      t.connect();
+    }
+  }
+
   public void setBody(String body) {
     setBody(body, _mimeType);
   }
diff --git a/azkaban-common/src/main/java/azkaban/utils/MessageAttachmentExceededMaximumSizeException.java b/azkaban-common/src/main/java/azkaban/utils/MessageAttachmentExceededMaximumSizeException.java
new file mode 100644
index 0000000..344bc86
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/MessageAttachmentExceededMaximumSizeException.java
@@ -0,0 +1,26 @@
+package azkaban.utils;
+
+import javax.mail.MessagingException;
+
+/**
+ * To indicate the attachment size is larger than allowed size
+ * 
+ * @author hluu
+ *
+ */
+public class MessageAttachmentExceededMaximumSizeException extends
+    MessagingException {
+
+  public MessageAttachmentExceededMaximumSizeException() {
+    super();
+  }
+
+  public MessageAttachmentExceededMaximumSizeException(String s) {
+    super(s);
+  }
+
+  public MessageAttachmentExceededMaximumSizeException(String s, Exception e) {
+    super(s, e);
+  }
+
+}