azkaban-uncached
Changes
src/java/azkaban/executor/ExecutableFlow.java 164(+83 -81)
src/java/azkaban/executor/ExecutorMailer.java 185(+55 -130)
src/java/azkaban/flow/Flow.java 37(+37 -0)
src/java/azkaban/project/Project.java 28(+28 -0)
src/java/azkaban/utils/Utils.java 12(+12 -0)
src/java/azkaban/utils/WebUtils.java 3(+1 -2)
src/sql/create_schedule_table.sql 4(+2 -2)
Details
src/java/azkaban/executor/ExecutableFlow.java 164(+83 -81)
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 152a546..e2301bf 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -39,7 +39,7 @@ public class ExecutableFlow {
private int version;
private String executionPath;
-
+
private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
private ArrayList<String> startNodes;
@@ -52,10 +52,10 @@ public class ExecutableFlow {
private Status flowStatus = Status.READY;
private String submitUser;
-
+
private HashSet<String> proxyUsers = new HashSet<String>();
private ExecutionOptions executionOptions;
-
+
public ExecutableFlow(Flow flow) {
this.projectId = flow.getProjectId();
this.scheduleId = -1;
@@ -63,79 +63,81 @@ public class ExecutableFlow {
this.version = flow.getVersion();
this.setFlow(flow);
}
-
+
public ExecutableFlow(int executionId, Flow flow) {
this.projectId = flow.getProjectId();
this.scheduleId = -1;
this.flowId = flow.getId();
this.version = flow.getVersion();
this.executionId = executionId;
-
+
this.setFlow(flow);
}
-
+
public ExecutableFlow() {
}
-
+
public long getUpdateTime() {
return updateTime;
}
-
+
public void setUpdateTime(long updateTime) {
this.updateTime = updateTime;
}
-
+
public List<ExecutableNode> getExecutableNodes() {
return new ArrayList<ExecutableNode>(executableNodes.values());
}
-
+
public ExecutableNode getExecutableNode(String id) {
return executableNodes.get(id);
}
-
+
public Collection<FlowProps> getFlowProps() {
return flowProps.values();
}
-
+
public void addAllProxyUsers(Collection<String> proxyUsers) {
this.proxyUsers.addAll(proxyUsers);
}
-
+
public Set<String> getProxyUsers() {
return new HashSet<String>(this.proxyUsers);
}
-
+
public void setExecutionOptions(ExecutionOptions options) {
executionOptions = options;
}
-
+
public ExecutionOptions getExecutionOptions() {
return executionOptions;
}
-
+
private void setFlow(Flow flow) {
executionOptions = new ExecutionOptions();
-
+
for (Node node: flow.getNodes()) {
String id = node.getId();
ExecutableNode exNode = new ExecutableNode(node, this);
executableNodes.put(id, exNode);
}
-
+
for (Edge edge: flow.getEdges()) {
ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
-
+
sourceNode.addOutNode(edge.getTargetId());
targetNode.addInNode(edge.getSourceId());
}
-
+
if (flow.getSuccessEmails() != null) {
executionOptions.setSuccessEmails(flow.getSuccessEmails());
}
if (flow.getFailureEmails() != null) {
executionOptions.setFailureEmails(flow.getFailureEmails());
}
+ executionOptions.setMailCreator(flow.getMailCreator());
+
flowProps.putAll(flow.getAllFlowProps());
}
@@ -148,10 +150,10 @@ public class ExecutableFlow {
}
}
}
-
+
return startNodes;
}
-
+
public List<String> getEndNodes() {
if (endNodes == null) {
endNodes = new ArrayList<String>();
@@ -161,10 +163,10 @@ public class ExecutableFlow {
}
}
}
-
+
return endNodes;
}
-
+
public boolean setNodeStatus(String nodeId, Status status) {
ExecutableNode exNode = executableNodes.get(nodeId);
if (exNode == null) {
@@ -179,18 +181,18 @@ public class ExecutableFlow {
if (exNode == null) {
return;
}
-
+
exNode.setExternalExecutionId(externalExecutionId);
}
-
+
public int getExecutionId() {
return executionId;
}
public void setExecutionId(int executionId) {
this.executionId = executionId;
-
- for(ExecutableNode node: executableNodes.values()) {
+
+ for (ExecutableNode node: executableNodes.values()) {
node.setExecutionId(executionId);
}
}
@@ -226,31 +228,31 @@ public class ExecutableFlow {
public void setExecutionPath(String executionPath) {
this.executionPath = executionPath;
}
-
+
public long getStartTime() {
return startTime;
}
-
+
public void setStartTime(long time) {
this.startTime = time;
}
-
+
public long getEndTime() {
return endTime;
}
-
+
public void setEndTime(long time) {
this.endTime = time;
}
-
+
public long getSubmitTime() {
return submitTime;
}
-
+
public void setSubmitTime(long time) {
this.submitTime = time;
}
-
+
public Status getStatus() {
return flowStatus;
}
@@ -258,16 +260,16 @@ public class ExecutableFlow {
public void setStatus(Status flowStatus) {
this.flowStatus = flowStatus;
}
-
- public Map<String,Object> toObject() {
+
+ public Map<String, Object> toObject() {
HashMap<String, Object> flowObj = new HashMap<String, Object>();
flowObj.put("type", "executableflow");
flowObj.put("executionId", executionId);
flowObj.put("executionPath", executionPath);
flowObj.put("flowId", flowId);
flowObj.put("projectId", projectId);
-
- if(scheduleId >= 0) {
+
+ if (scheduleId >= 0) {
flowObj.put("scheduleId", scheduleId);
}
flowObj.put("submitTime", submitTime);
@@ -276,16 +278,16 @@ public class ExecutableFlow {
flowObj.put("status", flowStatus.toString());
flowObj.put("submitUser", submitUser);
flowObj.put("version", version);
-
+
flowObj.put("executionOptions", this.executionOptions.toObject());
flowObj.put("version", version);
-
+
ArrayList<Object> props = new ArrayList<Object>();
for (FlowProps fprop: flowProps.values()) {
HashMap<String, Object> propObj = new HashMap<String, Object>();
String source = fprop.getSource();
String inheritedSource = fprop.getInheritedSource();
-
+
propObj.put("source", source);
if (inheritedSource != null) {
propObj.put("inherited", inheritedSource);
@@ -293,13 +295,13 @@ public class ExecutableFlow {
props.add(propObj);
}
flowObj.put("properties", props);
-
+
ArrayList<Object> nodes = new ArrayList<Object>();
for (ExecutableNode node: executableNodes.values()) {
nodes.add(node.toObject());
}
flowObj.put("nodes", nodes);
-
+
ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
flowObj.put("proxyUsers", proxyUserList);
@@ -307,57 +309,57 @@ public class ExecutableFlow {
}
public Object toUpdateObject(long lastUpdateTime) {
- Map<String, Object> updateData = new HashMap<String,Object>();
+ Map<String, Object> updateData = new HashMap<String, Object>();
updateData.put("execId", this.executionId);
updateData.put("status", this.flowStatus.getNumVal());
updateData.put("startTime", this.startTime);
updateData.put("endTime", this.endTime);
updateData.put("updateTime", this.updateTime);
-
- List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
+
+ List<Map<String, Object>> updatedNodes = new ArrayList<Map<String, Object>>();
for (ExecutableNode node: executableNodes.values()) {
-
+
if (node.getUpdateTime() > lastUpdateTime) {
- Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
+ Map<String, Object> updatedNodeMap = new HashMap<String, Object>();
updatedNodeMap.put("jobId", node.getJobId());
updatedNodeMap.put("status", node.getStatus().getNumVal());
updatedNodeMap.put("startTime", node.getStartTime());
updatedNodeMap.put("endTime", node.getEndTime());
updatedNodeMap.put("updateTime", node.getUpdateTime());
updatedNodeMap.put("attempt", node.getAttempt());
-
+
if (node.getAttempt() > 0) {
- ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+ ArrayList<Map<String, Object>> pastAttempts = new ArrayList<Map<String, Object>>();
for (Attempt attempt: node.getPastAttemptList()) {
pastAttempts.add(attempt.toObject());
}
updatedNodeMap.put("pastAttempts", pastAttempts);
}
-
+
updatedNodes.add(updatedNodeMap);
}
}
-
+
updateData.put("nodes", updatedNodes);
return updateData;
}
-
+
@SuppressWarnings("unchecked")
public void applyUpdateObject(Map<String, Object> updateData) {
- List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
- for (Map<String,Object> node: updatedNodes) {
+ List<Map<String, Object>> updatedNodes = (List<Map<String, Object>>)updateData.get("nodes");
+ for (Map<String, Object> node: updatedNodes) {
String jobId = (String)node.get("jobId");
Status status = Status.fromInteger((Integer)node.get("status"));
long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-
+
ExecutableNode exNode = executableNodes.get(jobId);
exNode.setEndTime(endTime);
exNode.setStartTime(startTime);
exNode.setUpdateTime(updateTime);
exNode.setStatus(status);
-
+
int attempt = 0;
if (node.containsKey("attempt")) {
attempt = (Integer)node.get("attempt");
@@ -365,22 +367,22 @@ public class ExecutableFlow {
exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
}
}
-
+
exNode.setAttempt(attempt);
}
-
+
this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
-
+
this.startTime = JSONUtils.getLongFromObject(updateData.get("startTime"));
this.endTime = JSONUtils.getLongFromObject(updateData.get("endTime"));
this.updateTime = JSONUtils.getLongFromObject(updateData.get("updateTime"));
}
-
+
@SuppressWarnings("unchecked")
public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
ExecutableFlow exFlow = new ExecutableFlow();
-
- HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+
+ HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
exFlow.executionId = (Integer)flowObj.get("executionId");
exFlow.executionPath = (String)flowObj.get("executionPath");
exFlow.flowId = (String)flowObj.get("flowId");
@@ -394,7 +396,7 @@ public class ExecutableFlow {
exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
exFlow.submitUser = (String)flowObj.get("submitUser");
exFlow.version = (Integer)flowObj.get("version");
-
+
if (flowObj.containsKey("executionOptions")) {
exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
}
@@ -402,7 +404,7 @@ public class ExecutableFlow {
// for backawards compatibility should remove in a few versions.
exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
}
-
+
// Copy nodes
List<Object> nodes = (List<Object>)flowObj.get("nodes");
for (Object nodeObj: nodes) {
@@ -411,57 +413,57 @@ public class ExecutableFlow {
}
List<Object> properties = (List<Object>)flowObj.get("properties");
- for (Object propNode : properties) {
+ for (Object propNode: properties) {
HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
String source = (String)fprop.get("source");
String inheritedSource = (String)fprop.get("inherited");
-
+
FlowProps flowProps = new FlowProps(inheritedSource, source);
exFlow.flowProps.put(source, flowProps);
}
-
- if(flowObj.containsKey("proxyUsers")) {
- ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+
+ if (flowObj.containsKey("proxyUsers")) {
+ ArrayList<String> proxyUserList = (ArrayList<String>)flowObj.get("proxyUsers");
exFlow.addAllProxyUsers(proxyUserList);
}
-
+
return exFlow;
}
-
+
@SuppressWarnings("unchecked")
public void updateExecutableFlowFromObject(Object obj) {
- HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+ HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
flowStatus = Status.valueOf((String)flowObj.get("status"));
-
+
List<Object> nodes = (List<Object>)flowObj.get("nodes");
for (Object nodeObj: nodes) {
- HashMap<String, Object> nodeHash= (HashMap<String, Object>)nodeObj;
+ HashMap<String, Object> nodeHash = (HashMap<String, Object>)nodeObj;
String nodeId = (String)nodeHash.get("id");
ExecutableNode node = executableNodes.get(nodeId);
if (nodeId == null) {
throw new RuntimeException("Node " + nodeId + " doesn't exist in flow.");
}
-
+
node.updateNodeFromObject(nodeObj);
}
}
-
+
public Set<String> getSources() {
HashSet<String> set = new HashSet<String>();
for (ExecutableNode exNode: executableNodes.values()) {
set.add(exNode.getJobPropsSource());
}
-
+
for (FlowProps props: flowProps.values()) {
set.add(props.getSource());
}
return set;
}
-
+
public String getSubmitUser() {
return submitUser;
}
@@ -469,7 +471,7 @@ public class ExecutableFlow {
public void setSubmitUser(String submitUser) {
this.submitUser = submitUser;
}
-
+
public int getVersion() {
return version;
}
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index a91a6d0..2710f0b 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -8,6 +8,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import azkaban.executor.mail.DefaultMailCreator;
+
/**
* Execution options for submitted flows and scheduled flows
*/
@@ -27,6 +29,7 @@ public class ExecutionOptions {
private Integer pipelineExecId = null;
private Integer queueLevel = 0;
private String concurrentOption = CONCURRENT_OPTION_IGNORE;
+ private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
private Map<String, String> flowParameters = new HashMap<String, String>();
public enum FailureAction {
@@ -107,10 +110,18 @@ public class ExecutionOptions {
this.concurrentOption = concurrentOption;
}
+ public void setMailCreator(String mailCreator) {
+ this.mailCreator = mailCreator;
+ }
+
public String getConcurrentOption() {
return concurrentOption;
}
+ public String getMailCreator() {
+ return mailCreator;
+ }
+
public Integer getPipelineLevel() {
return pipelineLevel;
}
@@ -152,6 +163,7 @@ public class ExecutionOptions {
flowOptionObj.put("pipelineExecId", pipelineExecId);
flowOptionObj.put("queueLevel", queueLevel);
flowOptionObj.put("concurrentOption", concurrentOption);
+ flowOptionObj.put("mailCreator", mailCreator);
flowOptionObj.put("disabled", initiallyDisabledJobs);
flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
flowOptionObj.put("successEmailsOverride", successEmailsOverride);
@@ -180,6 +192,9 @@ public class ExecutionOptions {
if (optionsMap.containsKey("concurrentOption")) {
options.concurrentOption = (String)optionsMap.get("concurrentOption");
}
+ if (optionsMap.containsKey("mailCreator")) {
+ options.mailCreator = (String)optionsMap.get("mailCreator");
+ }
if (optionsMap.containsKey("disabled")) {
options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
}
src/java/azkaban/executor/ExecutorMailer.java 185(+55 -130)
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index a49b4fb..4efcb38 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -7,179 +7,104 @@ import javax.mail.MessagingException;
import org.apache.log4j.Logger;
-import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
import azkaban.utils.EmailMessage;
import azkaban.utils.Props;
import azkaban.utils.Utils;
public class ExecutorMailer {
private static Logger logger = Logger.getLogger(ExecutorMailer.class);
-
+
private boolean testMode = false;
private String clientHostname;
private String clientPortNumber;
-
+
private String mailHost;
private String mailUser;
private String mailPassword;
private String mailSender;
private String azkabanName;
-
+
public ExecutorMailer(Props props) {
this.azkabanName = props.getString("azkaban.name", "azkaban");
this.mailHost = props.getString("mail.host", "localhost");
this.mailUser = props.getString("mail.user", "");
this.mailPassword = props.getString("mail.password", "");
this.mailSender = props.getString("mail.sender", "");
-
+
this.clientHostname = props.getString("jetty.hostname", "localhost");
this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-
+
testMode = props.getBoolean("test.mode", false);
}
-
+
public void sendFirstErrorMessage(ExecutableFlow flow) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+
ExecutionOptions option = flow.getExecutionOptions();
- List<String> emailList = option.getDisabledJobs();
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
-
- if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
- message.println("This flow is set to cancel all currently running jobs.");
- }
- else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
- message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
- }
- else {
- message.println("This flow is set to complete all currently running jobs before stopping.");
- }
-
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
- message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- message.println("");
- message.println("<h3>Reason</h3>");
- List<String> failedJobs = findFailedJobs(flow);
- message.println("<ul>");
- for (String jobId : failedJobs) {
- message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
- }
-
- message.println("</ul>");
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
+ MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+
+ logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+ boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+ if (mailCreated && !testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed", e);
}
}
}
-
- public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+
+ public void sendErrorEmail(ExecutableFlow flow, String... extraReasons) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+
ExecutionOptions option = flow.getExecutionOptions();
-
- List<String> emailList = option.getFailureEmails();
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-
- message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
- message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- message.println("");
- message.println("<h3>Reason</h3>");
- List<String> failedJobs = findFailedJobs(flow);
- message.println("<ul>");
- for (String jobId : failedJobs) {
- message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
- }
- for (String reasons: extraReasons) {
- message.println("<li>" + reasons + "</li>");
- }
-
- message.println("</ul>");
-
-
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
+ MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+ logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+ boolean mailCreated = mailCreator.createErrorEmail(flow, message, azkabanName, clientHostname, clientPortNumber, extraReasons);
+
+ if (mailCreated && !testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed", e);
}
}
}
public void sendSuccessEmail(ExecutableFlow flow) {
+ EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+ message.setFromAddress(mailSender);
+
ExecutionOptions option = flow.getExecutionOptions();
- List<String> emailList = option.getSuccessEmails();
-
- int execId = flow.getExecutionId();
-
- if (emailList != null && !emailList.isEmpty()) {
- EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
- message.setFromAddress(mailSender);
- message.addAllToAddress(emailList);
- message.setMimeType("text/html");
- message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
-
- message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
- message.println("<table>");
- message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
- message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
- message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
- message.println("</table>");
- message.println("");
- String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
- message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-
- if (!testMode) {
- try {
- message.sendEmail();
- } catch (MessagingException e) {
- logger.error("Email message send failed" , e);
- }
+ MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+ logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+ boolean mailCreated = mailCreator.createSuccessEmail(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+ if (mailCreated && !testMode) {
+ try {
+ message.sendEmail();
+ } catch (MessagingException e) {
+ logger.error("Email message send failed", e);
}
}
}
-
- private List<String> findFailedJobs(ExecutableFlow flow) {
+
+ public static List<String> findFailedJobs(ExecutableFlow flow) {
ArrayList<String> failedJobs = new ArrayList<String>();
- for (ExecutableNode node: flow.getExecutableNodes()) {
+ for (ExecutableNode node : flow.getExecutableNodes()) {
if (node.getStatus() == Status.FAILED) {
failedJobs.add(node.getJobId());
}
}
-
+
return failedJobs;
}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/mail/DefaultMailCreator.java b/src/java/azkaban/executor/mail/DefaultMailCreator.java
new file mode 100644
index 0000000..eb4ba3f
--- /dev/null
+++ b/src/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -0,0 +1,149 @@
+package azkaban.executor.mail;
+
+import java.util.HashMap;
+import java.util.List;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorMailer;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Utils;
+
+public class DefaultMailCreator implements MailCreator {
+ public static final String DEFAULT_MAIL_CREATOR = "default";
+ private static HashMap<String, MailCreator> registeredCreators = new HashMap<String, MailCreator>();
+ private static MailCreator defaultCreator;
+
+ public static void registerCreator(String name, MailCreator creator) {
+ registeredCreators.put(name, creator);
+ }
+
+ public static MailCreator getCreator(String name) {
+ MailCreator creator = registeredCreators.get(name);
+ if (creator == null) {
+ creator = defaultCreator;
+ }
+ return creator;
+ }
+
+ static {
+ defaultCreator = new DefaultMailCreator();
+ registerCreator(DEFAULT_MAIL_CREATOR, defaultCreator);
+ }
+
+ @Override
+ public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ List<String> emailList = option.getDisabledJobs();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+
+ if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
+ message.println("This flow is set to cancel all currently running jobs.");
+ }
+ else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE) {
+ message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+ }
+ else {
+ message.println("This flow is set to complete all currently running jobs before stopping.");
+ }
+
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = ExecutorMailer.findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+ }
+
+ message.println("</ul>");
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+
+ List<String> emailList = option.getFailureEmails();
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+ message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+ message.println("");
+ message.println("<h3>Reason</h3>");
+ List<String> failedJobs = ExecutorMailer.findFailedJobs(flow);
+ message.println("<ul>");
+ for (String jobId : failedJobs) {
+ message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+ }
+ for (String reasons : vars) {
+ message.println("<li>" + reasons + "</li>");
+ }
+
+ message.println("</ul>");
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+ ExecutionOptions option = flow.getExecutionOptions();
+ List<String> emailList = option.getSuccessEmails();
+
+ int execId = flow.getExecutionId();
+
+ if (emailList != null && !emailList.isEmpty()) {
+ message.addAllToAddress(emailList);
+ message.setMimeType("text/html");
+ message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+
+ message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+ message.println("<table>");
+ message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+ message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+ message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+ message.println("</table>");
+ message.println("");
+ String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+ message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/src/java/azkaban/executor/mail/MailCreator.java b/src/java/azkaban/executor/mail/MailCreator.java
new file mode 100644
index 0000000..d80afd4
--- /dev/null
+++ b/src/java/azkaban/executor/mail/MailCreator.java
@@ -0,0 +1,10 @@
+package azkaban.executor.mail;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.utils.EmailMessage;
+
+public interface MailCreator {
+ public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+ public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+ public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+}
src/java/azkaban/flow/Flow.java 37(+37 -0)
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 6c01df8..3846394 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import azkaban.executor.mail.DefaultMailCreator;
+
public class Flow {
private final String id;
private int projectId;
@@ -40,8 +42,10 @@ public class Flow {
private List<String> failureEmail = new ArrayList<String>();
private List<String> successEmail = new ArrayList<String>();
+ private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
private ArrayList<String> errors;
private int version = -1;
+ private Map<String, Object> metadata = new HashMap<String, Object>();
private boolean isLayedOut = false;
@@ -106,10 +110,18 @@ public class Flow {
return successEmail;
}
+ public String getMailCreator() {
+ return mailCreator;
+ }
+
public List<String> getFailureEmails() {
return failureEmail;
}
+ public void setMailCreator(String mailCreator) {
+ this.mailCreator = mailCreator;
+ }
+
public void addSuccessEmails(Collection<String> emails) {
successEmail.addAll(emails);
}
@@ -226,10 +238,15 @@ public class Flow {
flowObj.put("edges", objectizeEdges());
flowObj.put("failure.email", failureEmail);
flowObj.put("success.email", successEmail);
+ flowObj.put("mailCreator", mailCreator);
flowObj.put("layedout", isLayedOut);
if (errors != null) {
flowObj.put("errors", errors);
}
+
+ if (metadata != null) {
+ flowObj.put("metadata", metadata);
+ }
return flowObj;
}
@@ -294,9 +311,18 @@ public class Flow {
List<Object> edgeList = (List<Object>)flowObject.get("edges");
List<Edge> edges = loadEdgeFromObjects(edgeList, nodes);
flow.addAllEdges(edges);
+
+ Map<String, Object> metadata = (Map<String, Object>)flowObject.get("metadata");
+
+ if (metadata != null) {
+ flow.setMetadata(metadata);
+ }
flow.failureEmail = (List<String>)flowObject.get("failure.email");
flow.successEmail = (List<String>)flowObject.get("success.email");
+ if (flowObject.containsKey("mailCreator")) {
+ flow.mailCreator = flowObject.get("mailCreator").toString();
+ }
return flow;
}
@@ -337,6 +363,17 @@ public class Flow {
return isLayedOut;
}
+ public Map<String, Object> getMetadata() {
+ if(metadata == null){
+ metadata = new HashMap<String, Object>();
+ }
+ return metadata;
+ }
+
+ public void setMetadata(Map<String, Object> metadata) {
+ this.metadata = metadata;
+ }
+
public void setLayedOut(boolean layedOut) {
this.isLayedOut = layedOut;
}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 972b531..1adb0ad 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -681,6 +681,41 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
}
}
+ @Override
+ public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+ logger.info("Uploading flows");
+ Connection connection = getConnection();
+
+ try {
+ QueryRunner runner = new QueryRunner();
+ String json = JSONUtils.toJSON(flow.toObject());
+ byte[] stringData = json.getBytes("UTF-8");
+ byte[] data = stringData;
+
+ logger.info("UTF-8 size:" + data.length);
+ if (defaultEncodingType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+
+ logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+ final String UPDATE_FLOW = "UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
+ try {
+ runner.update(connection, UPDATE_FLOW, defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+ }
+ connection.commit();
+ } catch (IOException e) {
+ throw new ProjectManagerException("Flow Upload failed.", e);
+ } catch (SQLException e) {
+ throw new ProjectManagerException("Flow Upload failed commit.", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
public EncodingType getDefaultEncodingType() {
return defaultEncodingType;
}
src/java/azkaban/project/Project.java 28(+28 -0)
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index d2e1f52..47ebc4b 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -45,6 +45,7 @@ public class Project {
private LinkedHashMap<String, Permission> groupPermissionMap = new LinkedHashMap<String, Permission>();
private Map<String, Flow> flows = null;
private HashSet<String> proxyUsers = new HashSet<String>();
+ private Map<String, Object> metadata = new HashMap<String, Object>();
public Project(int id, String name) {
this.id = id;
@@ -67,6 +68,10 @@ public class Project {
return flows.get(flowId);
}
+ public Map<String, Flow> getFlowMap() {
+ return flows;
+ }
+
public List<Flow> getFlows() {
List<Flow> retFlow = null;
if (flows != null) {
@@ -218,6 +223,10 @@ public class Project {
userPermissionMap.remove(userId);
}
+ public void clearUserPermission() {
+ userPermissionMap.clear();
+ }
+
public long getCreateTimestamp() {
return createTimestamp;
}
@@ -251,6 +260,10 @@ public class Project {
if (source != null) {
projectObject.put("source", source);
}
+
+ if (metadata != null) {
+ projectObject.put("metadata", metadata);
+ }
ArrayList<Map<String, Object>> users = new ArrayList<Map<String, Object>>();
for (Map.Entry<String, Permission> entry : userPermissionMap.entrySet()) {
@@ -282,6 +295,7 @@ public class Project {
Boolean active = (Boolean)projectObject.get("active");
active = active == null ? true : active;
int version = (Integer)projectObject.get("version");
+ Map<String, Object> metadata = (Map<String, Object>)projectObject.get("metadata");
Project project = new Project(id, name);
project.setVersion(version);
@@ -294,6 +308,9 @@ public class Project {
if (source != null) {
project.setSource(source);
}
+ if (metadata != null) {
+ project.setMetadata(metadata);
+ }
List<Map<String, Object>> users = (List<Map<String, Object>>) projectObject
.get("users");
@@ -403,6 +420,17 @@ public class Project {
this.source = source;
}
+ public Map<String, Object> getMetadata() {
+ if(metadata == null){
+ metadata = new HashMap<String, Object>();
+ }
+ return metadata;
+ }
+
+ protected void setMetadata(Map<String, Object> metadata) {
+ this.metadata = metadata;
+ }
+
public int getId() {
return id;
}
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 7e99298..ea6e5df 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -122,6 +122,9 @@ public interface ProjectLoader {
* @throws ProjectManagerException
*/
public void changeProjectVersion(Project project, int version, String user) throws ProjectManagerException;
+
+
+ public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException;
/**
* Uploads all computed flows
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index f610e40..a6e8177 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -352,6 +352,10 @@ public class ProjectManager {
projectLoader.cleanOlderProjectVersion(project.getId(), project.getVersion() - projectVersionRetention);
}
+ public void updateFlow(Project project, Flow flow) throws ProjectManagerException {
+ projectLoader.updateFlow(project, flow.getVersion(), flow);
+ }
+
private File unzipFile(File archiveFile) throws IOException {
ZipFile zipfile = new ZipFile(archiveFile);
File unzipped = Utils.createTempDir(tempDir);
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..e964a8f 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -159,7 +159,6 @@ public class ScheduleManager {
* @param id
*/
public synchronized void removeSchedule(Schedule sched) {
-
Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
if(schedules != null) {
@@ -428,6 +427,8 @@ public class ScheduleManager {
flowOptions.setSuccessEmails(flow.getSuccessEmails());
}
+ flowOptions.setMailCreator(flow.getMailCreator());
+
try {
executorManager.submitExecutableFlow(exflow);
logger.info("Scheduler has invoked " + exflow.getExecutionId());
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 06ca9bc..10f540c 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -149,15 +149,16 @@ public class EmailMessage {
if (_attachments.size() > 0) {
MimeMultipart multipart = new MimeMultipart("related");
+
+ BodyPart messageBodyPart = new MimeBodyPart();
+ messageBodyPart.setContent(_body.toString(), _mimeType);
+ multipart.addBodyPart(messageBodyPart);
+
// Add attachments
for (BodyPart part : _attachments) {
multipart.addBodyPart(part);
}
- BodyPart messageBodyPart = new MimeBodyPart();
- messageBodyPart.setContent(_body.toString(), _mimeType);
- multipart.addBodyPart(messageBodyPart);
-
message.setContent(multipart);
} else {
message.setContent(_body.toString(), _mimeType);
src/java/azkaban/utils/Utils.java 12(+12 -0)
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index fb4419f..ba3eeb0 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -116,6 +116,18 @@ public class Utils {
zOut.close();
}
+ public static void zipFolderContent(File folder, File output) throws IOException {
+ FileOutputStream out = new FileOutputStream(output);
+ ZipOutputStream zOut = new ZipOutputStream(out);
+ File[] files = folder.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ zipFile("", f, zOut);
+ }
+ }
+ zOut.close();
+ }
+
private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
if (input.isDirectory()) {
File[] files = input.listFiles();
src/java/azkaban/utils/WebUtils.java 3(+1 -2)
diff --git a/src/java/azkaban/utils/WebUtils.java b/src/java/azkaban/utils/WebUtils.java
index ef63fb6..c337e41 100644
--- a/src/java/azkaban/utils/WebUtils.java
+++ b/src/java/azkaban/utils/WebUtils.java
@@ -1,6 +1,7 @@
package azkaban.utils;
import java.text.NumberFormat;
+import java.util.Collection;
import org.joda.time.DateTime;
import org.joda.time.DurationFieldType;
@@ -154,6 +155,4 @@ public class WebUtils {
else
return sizeBytes + " B";
}
-
-
}
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 8810352..8e87919 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -63,7 +63,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
private String color;
private List<ViewerPlugin> viewerPlugins;
-
+
/**
* To retrieve the application for the servlet
*
@@ -92,7 +92,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
viewerPlugins = server.getViewerPlugins();
}
}
-
+
/**
* Checks for the existance of the parameter in the request
*
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a542a82..eb14851 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -760,6 +760,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
if (!options.isSuccessEmailsOverridden()) {
options.setSuccessEmails(flow.getSuccessEmails());
}
+ options.setMailCreator(flow.getMailCreator());
try {
String message = executorManager.submitExecutableFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index 4e71930..7b80139 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -10,6 +10,7 @@ import javax.servlet.http.HttpServletRequest;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.mail.DefaultMailCreator;
public class HttpRequestUtils {
public static ExecutionOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
@@ -36,7 +37,7 @@ public class HttpRequestUtils {
boolean override = getBooleanParam(req, "successEmailsOverride", false);
execOptions.setSuccessEmailsOverridden(override);
}
-
+
if (hasParam(req, "failureEmails")) {
String emails = getParam(req, "failureEmails");
if (!emails.isEmpty()) {
@@ -72,6 +73,11 @@ public class HttpRequestUtils {
execOptions.setPipelineLevel(queueLevel);
}
}
+ String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
+ if (hasParam(req, "mailCreator")) {
+ mailCreator = getParam(req, "mailCreator");
+ execOptions.setMailCreator(mailCreator);
+ }
Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
execOptions.setFlowParameters(flowParamGroup);
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 891e896..8ce912b 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -16,7 +16,11 @@
package azkaban.webapp.servlet;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +33,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import azkaban.project.Project;
@@ -51,6 +56,18 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
+ private static HashMap<String, String> contextType = new HashMap<String,String>();
+ static {
+ contextType.put(".js", "application/javascript");
+ contextType.put(".css", "text/css");
+ contextType.put(".png", "image/png");
+ contextType.put(".jpeg", "image/jpeg");
+ contextType.put(".jpg", "image/jpeg");
+ }
+
+
+ private File webResourceDirectory = null;
+
private MultipartParser multipartParser;
@Override
@@ -60,6 +77,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
}
+ public void setResourceDirectory(File file) {
+ this.webResourceDirectory = file;
+ }
+
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// Set session id
@@ -74,6 +95,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
if (session != null) {
logger.info("Found session " + session.getUser());
+ if (handleFileGet(req, resp)) {
+ return;
+ }
+
handleGet(req, resp, session);
} else {
if (hasParam(req, "ajax")) {
@@ -87,6 +112,46 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
}
}
+ private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ if (webResourceDirectory == null) {
+ return false;
+ }
+
+ // Check if it's a resource
+ String prefix = req.getContextPath() + req.getServletPath();
+ String path = req.getRequestURI().substring(prefix.length());
+ int index = path.lastIndexOf('.');
+ if (index == -1 ) {
+ return false;
+ }
+
+ String extension = path.substring(index);
+ if (contextType.containsKey(extension)) {
+ File file = new File(webResourceDirectory, path);
+ if (!file.exists() || !file.isFile()) {
+ return false;
+ }
+
+ resp.setContentType(contextType.get(extension));
+
+ OutputStream output = resp.getOutputStream();
+ BufferedInputStream input = null;
+ try {
+ input = new BufferedInputStream(new FileInputStream(file));
+ IOUtils.copy(input, output);
+ }
+ finally {
+ if (input != null) {
+ input.close();
+ }
+ }
+ output.flush();
+ return true;
+ }
+
+ return false;
+ }
+
private Session getSessionFromRequest(HttpServletRequest req) throws ServletException {
String remoteIp = req.getRemoteAddr();
Cookie cookie = getCookieByName(req, SESSION_ID_NAME);
src/sql/create_schedule_table.sql 4(+2 -2)
diff --git a/src/sql/create_schedule_table.sql b/src/sql/create_schedule_table.sql
index 32b7d86..061de26 100644
--- a/src/sql/create_schedule_table.sql
+++ b/src/sql/create_schedule_table.sql
@@ -13,6 +13,6 @@ CREATE TABLE schedules (
submit_user VARCHAR(128),
enc_type TINYINT,
schedule_options LONGBLOB,
- PRIMARY KEY (schedule_id)
- INDEX project_id (project_id, flow_name),
+ PRIMARY KEY (schedule_id),
+ INDEX project_id (project_id, flow_name)
);
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index b10ea29..ef76de7 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -224,4 +224,10 @@ public class MockProjectLoader implements ProjectLoader {
// TODO Auto-generated method stub
}
+
+ @Override
+ public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+ // TODO Auto-generated method stub
+
+ }
}
\ No newline at end of file