azkaban-uncached

set timeout on sending emails

9/25/2013 10:24:04 PM

Details

diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index a49b4fb..23d3494 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -25,12 +25,18 @@ public class ExecutorMailer {
 	private String mailSender;
 	private String azkabanName;
 	
+	private int mailTimeout;
+	private int connectionTimeout;
+	
 	public ExecutorMailer(Props props) {
 		this.azkabanName = props.getString("azkaban.name", "azkaban");
 		this.mailHost = props.getString("mail.host", "localhost");
 		this.mailUser = props.getString("mail.user", "");
 		this.mailPassword = props.getString("mail.password", "");
 		this.mailSender = props.getString("mail.sender", "");
+
+		this.mailTimeout = props.getInt("mail.timeout.millis", 10000);
+		this.connectionTimeout = props.getInt("mail.connection.timeout.millis", 10000);
 		
 		this.clientHostname = props.getString("jetty.hostname", "localhost");
 		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
@@ -48,6 +54,8 @@ public class ExecutorMailer {
 			message.setFromAddress(mailSender);
 			message.addAllToAddress(emailList);
 			message.setMimeType("text/html");
+			message.setTimeout(mailTimeout);
+			message.setConnectionTimeout(connectionTimeout);
 			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
 			
 			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
@@ -102,6 +110,8 @@ public class ExecutorMailer {
 			message.setFromAddress(mailSender);
 			message.addAllToAddress(emailList);
 			message.setMimeType("text/html");
+			message.setTimeout(mailTimeout);
+			message.setConnectionTimeout(connectionTimeout);
 			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
 			
 			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
@@ -150,6 +160,8 @@ public class ExecutorMailer {
 			message.setFromAddress(mailSender);
 			message.addAllToAddress(emailList);
 			message.setMimeType("text/html");
+			message.setTimeout(mailTimeout);
+			message.setConnectionTimeout(connectionTimeout);
 			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
 			
 			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 42b4b41..01c94f8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -696,7 +696,7 @@ public class ExecutorManager {
 				}
 				catch (Exception e) {
 					logger.error(e);
-				}
+				} 
 			}
 		}
 	}
@@ -704,6 +704,7 @@ public class ExecutorManager {
 	private void finalizeFlows(ExecutableFlow flow) {
 		int execId = flow.getExecutionId();
 		
+		updaterStage = "finalizing flow " + execId;
 		// First we check if the execution in the datastore is complete
 		try {
 			ExecutableFlow dsFlow;
@@ -711,15 +712,18 @@ public class ExecutorManager {
 				dsFlow = flow;
 			}
 			else {
+				updaterStage = "finalizing flow " + execId + " loading from db";
 				dsFlow = executorLoader.fetchExecutableFlow(execId);
 			
 				// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
 				if (!isFinished(dsFlow)) {
+					updaterStage = "finalizing flow " + execId + " failing the flow";
 					failEverything(dsFlow);
 					executorLoader.updateExecutableFlow(dsFlow);
 				}
 			}
 
+			updaterStage = "finalizing flow " + execId + " deleting active reference";
 			// Delete the executing reference.
 			if (flow.getEndTime() == -1) {
 				flow.setEndTime(System.currentTimeMillis());
@@ -727,6 +731,7 @@ public class ExecutorManager {
 			}
 			executorLoader.removeActiveExecutableReference(execId);
 			
+			updaterStage = "finalizing flow " + execId + " cleaning from memory";
 			runningFlows.remove(execId);
 			recentlyFinished.put(execId, dsFlow);
 		} catch (ExecutorManagerException e) {
@@ -736,6 +741,7 @@ public class ExecutorManager {
 		// TODO append to the flow log that we forced killed this flow because the target no longer had
 		// the reference.
 		
+		updaterStage = "finalizing flow " + execId + " alerting and emailing";
 		ExecutionOptions options = flow.getExecutionOptions();
 		// But we can definitely email them.
 		if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 06ca9bc..3bb0e04 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -32,7 +32,9 @@ public class EmailMessage {
 	private String _fromAddress;
 	private String _mimeType = "text/plain";
 	private StringBuffer _body = new StringBuffer();
-
+	private int _mailTimeout = 10000;
+	private int _connectionTimeout = 10000;
+	
 	private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
 
 	public EmailMessage() {
@@ -44,7 +46,17 @@ public class EmailMessage {
 		_mailHost = host;
 		_mailPassword = password;
 	}
-
+	
+	public EmailMessage setTimeout(int timeoutMillis) {
+		_mailTimeout = timeoutMillis;
+		return this;
+	}
+	
+	public EmailMessage setConnectionTimeout(int timeoutMillis) {
+		_connectionTimeout = timeoutMillis;
+		return this;
+	}
+	
 	public EmailMessage setMailHost(String host) {
 		_mailHost = host;
 		return this;
@@ -136,6 +148,8 @@ public class EmailMessage {
 		props.put("mail."+protocol+".auth", "true");
 		props.put("mail.user", _mailUser);
 		props.put("mail.password", _mailPassword);
+		props.put("mail."+protocol+".timeout", _mailTimeout);
+		props.put("mail."+protocol+".connectiontimeout", _connectionTimeout);
 
 		Session session = Session.getInstance(props, null);
 		Message message = new MimeMessage(session);