azkaban-uncached
Changes
src/java/azkaban/utils/EmailMessage.java 18(+16 -2)
Details
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index a49b4fb..23d3494 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -25,12 +25,18 @@ public class ExecutorMailer {
private String mailSender;
private String azkabanName;
+ private int mailTimeout;
+ private int connectionTimeout;
+
public ExecutorMailer(Props props) {
this.azkabanName = props.getString("azkaban.name", "azkaban");
this.mailHost = props.getString("mail.host", "localhost");
this.mailUser = props.getString("mail.user", "");
this.mailPassword = props.getString("mail.password", "");
this.mailSender = props.getString("mail.sender", "");
+
+ this.mailTimeout = props.getInt("mail.timeout.millis", 10000);
+ this.connectionTimeout = props.getInt("mail.connection.timeout.millis", 10000);
this.clientHostname = props.getString("jetty.hostname", "localhost");
this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
@@ -48,6 +54,8 @@ public class ExecutorMailer {
message.setFromAddress(mailSender);
message.addAllToAddress(emailList);
message.setMimeType("text/html");
+ message.setTimeout(mailTimeout);
+ message.setConnectionTimeout(connectionTimeout);
message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
@@ -102,6 +110,8 @@ public class ExecutorMailer {
message.setFromAddress(mailSender);
message.addAllToAddress(emailList);
message.setMimeType("text/html");
+ message.setTimeout(mailTimeout);
+ message.setConnectionTimeout(connectionTimeout);
message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
@@ -150,6 +160,8 @@ public class ExecutorMailer {
message.setFromAddress(mailSender);
message.addAllToAddress(emailList);
message.setMimeType("text/html");
+ message.setTimeout(mailTimeout);
+ message.setConnectionTimeout(connectionTimeout);
message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 42b4b41..01c94f8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -696,7 +696,7 @@ public class ExecutorManager {
}
catch (Exception e) {
logger.error(e);
- }
+ }
}
}
}
@@ -704,6 +704,7 @@ public class ExecutorManager {
private void finalizeFlows(ExecutableFlow flow) {
int execId = flow.getExecutionId();
+ updaterStage = "finalizing flow " + execId;
// First we check if the execution in the datastore is complete
try {
ExecutableFlow dsFlow;
@@ -711,15 +712,18 @@ public class ExecutorManager {
dsFlow = flow;
}
else {
+ updaterStage = "finalizing flow " + execId + " loading from db";
dsFlow = executorLoader.fetchExecutableFlow(execId);
// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
if (!isFinished(dsFlow)) {
+ updaterStage = "finalizing flow " + execId + " failing the flow";
failEverything(dsFlow);
executorLoader.updateExecutableFlow(dsFlow);
}
}
+ updaterStage = "finalizing flow " + execId + " deleting active reference";
// Delete the executing reference.
if (flow.getEndTime() == -1) {
flow.setEndTime(System.currentTimeMillis());
@@ -727,6 +731,7 @@ public class ExecutorManager {
}
executorLoader.removeActiveExecutableReference(execId);
+ updaterStage = "finalizing flow " + execId + " cleaning from memory";
runningFlows.remove(execId);
recentlyFinished.put(execId, dsFlow);
} catch (ExecutorManagerException e) {
@@ -736,6 +741,7 @@ public class ExecutorManager {
// TODO append to the flow log that we forced killed this flow because the target no longer had
// the reference.
+ updaterStage = "finalizing flow " + execId + " alerting and emailing";
ExecutionOptions options = flow.getExecutionOptions();
// But we can definitely email them.
if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
src/java/azkaban/utils/EmailMessage.java 18(+16 -2)
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 06ca9bc..3bb0e04 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -32,7 +32,9 @@ public class EmailMessage {
private String _fromAddress;
private String _mimeType = "text/plain";
private StringBuffer _body = new StringBuffer();
-
+ private int _mailTimeout = 10000;
+ private int _connectionTimeout = 10000;
+
private ArrayList<BodyPart> _attachments = new ArrayList<BodyPart>();
public EmailMessage() {
@@ -44,7 +46,17 @@ public class EmailMessage {
_mailHost = host;
_mailPassword = password;
}
-
+
+ public EmailMessage setTimeout(int timeoutMillis) {
+ _mailTimeout = timeoutMillis;
+ return this;
+ }
+
+ public EmailMessage setConnectionTimeout(int timeoutMillis) {
+ _connectionTimeout = timeoutMillis;
+ return this;
+ }
+
public EmailMessage setMailHost(String host) {
_mailHost = host;
return this;
@@ -136,6 +148,8 @@ public class EmailMessage {
props.put("mail."+protocol+".auth", "true");
props.put("mail.user", _mailUser);
props.put("mail.password", _mailPassword);
+ props.put("mail."+protocol+".timeout", _mailTimeout);
+ props.put("mail."+protocol+".connectiontimeout", _connectionTimeout);
Session session = Session.getInstance(props, null);
Message message = new MimeMessage(session);