Details
diff --git a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
index 6fecc05..bbbec14 100644
--- a/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/AbstractMailer.java
@@ -19,6 +19,7 @@ package azkaban.utils;
import java.util.Collection;
public class AbstractMailer {
+ private static int MB_IN_BYTES = 1048576;
private String clientHostname;
private int clientPort;
private boolean usesSSL;
@@ -32,18 +33,25 @@ public class AbstractMailer {
private String referenceURL;
+ private long attachmentMazSizeInByte;
+
public AbstractMailer(Props props) {
this.azkabanName = props.getString("azkaban.name", "azkaban");
this.mailHost = props.getString("mail.host", "localhost");
this.mailUser = props.getString("mail.user", "");
this.mailPassword = props.getString("mail.password", "");
+ long maxAttachmentSizeInMB =
+ props.getInt("mail.max.attachment.size.mb", 100);
+
+ attachmentMazSizeInByte = maxAttachmentSizeInMB * MB_IN_BYTES;
+
this.mailSender = props.getString("mail.sender", "");
this.usesAuth = props.getBoolean("mail.useAuth", true);
-
+
this.clientHostname = props.get("server.hostname");
this.clientPort = props.getInt("server.port");
this.usesSSL = props.getBoolean("server.useSSL");
-
+
if (usesSSL) {
referenceURL =
"https://" + clientHostname
@@ -95,8 +103,17 @@ public class AbstractMailer {
public String getMailSender() {
return mailSender;
}
-
- public boolean hasMailAuth(){
- return usesAuth;
+
+ /**
+ * Attachment maximum size in bytes
+ *
+ * @return
+ */
+ public long getAttachmentMaxSize() {
+ return attachmentMazSizeInByte;
+ }
+
+ public boolean hasMailAuth() {
+ return usesAuth;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Emailer.java b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
index ab09312..a031e77 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Emailer.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Emailer.java
@@ -16,7 +16,6 @@
package azkaban.utils;
-import java.lang.String;
import java.util.ArrayList;
import java.util.List;
@@ -32,9 +31,6 @@ import azkaban.executor.Status;
import azkaban.executor.mail.DefaultMailCreator;
import azkaban.executor.mail.MailCreator;
import azkaban.sla.SlaOption;
-import azkaban.utils.AbstractMailer;
-import azkaban.utils.EmailMessage;
-import azkaban.utils.Props;
public class Emailer extends AbstractMailer implements Alerter {
private static Logger logger = Logger.getLogger(Emailer.class);
@@ -64,13 +60,15 @@ public class Emailer extends AbstractMailer implements Alerter {
this.mailPassword = props.getString("mail.password", "");
this.mailSender = props.getString("mail.sender", "");
this.tls = props.getString("mail.tls", "false");
-
+
int mailTimeout = props.getInt("mail.timeout.millis", 10000);
EmailMessage.setTimeout(mailTimeout);
int connectionTimeout =
props.getInt("mail.connection.timeout.millis", 10000);
EmailMessage.setConnectionTimeout(connectionTimeout);
+ EmailMessage.setTotalAttachmentMaxSize(getAttachmentMaxSize());
+
this.clientHostname = props.getString("jetty.hostname", "localhost");
if (props.getBoolean("jetty.use.ssl", true)) {
@@ -138,7 +136,7 @@ public class Emailer extends AbstractMailer implements Alerter {
message.setFromAddress(mailSender);
message.setTLS(tls);
message.setAuth(super.hasMailAuth());
-
+
ExecutionOptions option = flow.getExecutionOptions();
MailCreator mailCreator =
@@ -164,7 +162,7 @@ public class Emailer extends AbstractMailer implements Alerter {
message.setFromAddress(mailSender);
message.setTLS(tls);
message.setAuth(super.hasMailAuth());
-
+
ExecutionOptions option = flow.getExecutionOptions();
MailCreator mailCreator =
diff --git a/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java b/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
index 8e54527..aca007c 100644
--- a/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
+++ b/azkaban-common/src/main/java/azkaban/utils/EmailMessage.java
@@ -18,6 +18,7 @@ package azkaban.utils;
import java.io.File;
import java.io.InputStream;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -36,9 +37,13 @@ import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
+import org.apache.log4j.Logger;
+
import com.sun.mail.smtp.SMTPTransport;
public class EmailMessage {
+ private final Logger logger = Logger.getLogger(EmailMessage.class);
+
private static String protocol = "smtp";
private List<String> _toAddress = new ArrayList<String>();
private String _mailHost;
@@ -48,10 +53,13 @@ public class EmailMessage {
private String _fromAddress;
private String _mimeType = "text/plain";
private String _tls;
+ private long _totalAttachmentSizeSoFar;
private boolean _usesAuth = true;
private StringBuffer _body = new StringBuffer();
private static int _mailTimeout = 10000;
private static int _connectionTimeout = 10000;
+ private static long _totalAttachmentMaxSizeInByte = 1024 * 1024 * 1024; // 1
+ // GB
private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
@@ -73,6 +81,14 @@ public class EmailMessage {
_connectionTimeout = timeoutMillis;
}
+ public static void setTotalAttachmentMaxSize(long sizeInBytes) {
+ if (sizeInBytes < 1) {
+ throw new IllegalArgumentException(
+ "attachment max size can't be 0 or negative");
+ }
+ _totalAttachmentMaxSizeInByte = sizeInBytes;
+ }
+
public EmailMessage setMailHost(String host) {
_mailHost = host;
return this;
@@ -112,10 +128,10 @@ public class EmailMessage {
_tls = tls;
return this;
}
-
+
public EmailMessage setAuth(boolean auth) {
- _usesAuth = auth;
- return this;
+ _usesAuth = auth;
+ return this;
}
public EmailMessage addAttachment(File file) throws MessagingException {
@@ -124,6 +140,16 @@ public class EmailMessage {
public EmailMessage addAttachment(String attachmentName, File file)
throws MessagingException {
+
+ _totalAttachmentSizeSoFar += file.length();
+
+ if (_totalAttachmentSizeSoFar > _totalAttachmentMaxSizeInByte) {
+ throw new MessageAttachmentExceededMaximumSizeException(
+ "Adding attachment '" + attachmentName
+ + "' will exceed the allowed maximum size of "
+ + _totalAttachmentMaxSizeInByte);
+ }
+
BodyPart attachmentPart = new MimeBodyPart();
DataSource fileDataSource = new FileDataSource(file);
attachmentPart.setDataHandler(new DataHandler(fileDataSource));
@@ -162,11 +188,11 @@ public class EmailMessage {
checkSettings();
Properties props = new Properties();
if (_usesAuth) {
- props.put("mail." + protocol + ".auth", "true");
- props.put("mail.user", _mailUser);
- props.put("mail.password", _mailPassword);
+ props.put("mail." + protocol + ".auth", "true");
+ props.put("mail.user", _mailUser);
+ props.put("mail.password", _mailPassword);
} else {
- props.put("mail." + protocol + ".auth", "false");
+ props.put("mail." + protocol + ".auth", "false");
}
props.put("mail." + protocol + ".host", _mailHost);
props.put("mail." + protocol + ".timeout", _mailTimeout);
@@ -204,15 +230,36 @@ public class EmailMessage {
// Transport transport = session.getTransport();
SMTPTransport t = (SMTPTransport) session.getTransport(protocol);
- if (_usesAuth) {
- t.connect(_mailHost, _mailUser, _mailPassword);
- } else {
- t.connect();
+
+ try {
+ connectToSMTPServer(t);
+ } catch (MessagingException ste) {
+ if (ste.getCause() instanceof SocketTimeoutException) {
+ try {
+ // retry on SocketTimeoutException
+ connectToSMTPServer(t);
+ logger.info("Email retry on SocketTimeoutException succeeded");
+ } catch (MessagingException me) {
+ logger.error("Email retry on SocketTimeoutException failed", me);
+ throw me;
+ }
+ } else {
+ logger.error("Encountered issue while connecting to email server", ste);
+ throw ste;
+ }
}
t.sendMessage(message, message.getRecipients(Message.RecipientType.TO));
t.close();
}
+ private void connectToSMTPServer(SMTPTransport t) throws MessagingException {
+ if (_usesAuth) {
+ t.connect(_mailHost, _mailUser, _mailPassword);
+ } else {
+ t.connect();
+ }
+ }
+
public void setBody(String body) {
setBody(body, _mimeType);
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/MessageAttachmentExceededMaximumSizeException.java b/azkaban-common/src/main/java/azkaban/utils/MessageAttachmentExceededMaximumSizeException.java
new file mode 100644
index 0000000..344bc86
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/MessageAttachmentExceededMaximumSizeException.java
@@ -0,0 +1,26 @@
+package azkaban.utils;
+
+import javax.mail.MessagingException;
+
+/**
+ * To indicate the attachment size is larger than allowed size
+ *
+ * @author hluu
+ *
+ */
+public class MessageAttachmentExceededMaximumSizeException extends
+ MessagingException {
+
+ public MessageAttachmentExceededMaximumSizeException() {
+ super();
+ }
+
+ public MessageAttachmentExceededMaximumSizeException(String s) {
+ super(s);
+ }
+
+ public MessageAttachmentExceededMaximumSizeException(String s, Exception e) {
+ super(s, e);
+ }
+
+}