azkaban-uncached

Added MailCreator interface for creating custom emails. Added

7/29/2013 4:52:03 PM

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 152a546..e2301bf 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -39,7 +39,7 @@ public class ExecutableFlow {
 	private int version;
 
 	private String executionPath;
-	
+
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
 	private ArrayList<String> startNodes;
@@ -52,10 +52,10 @@ public class ExecutableFlow {
 
 	private Status flowStatus = Status.READY;
 	private String submitUser;
-	
+
 	private HashSet<String> proxyUsers = new HashSet<String>();
 	private ExecutionOptions executionOptions;
-	
+
 	public ExecutableFlow(Flow flow) {
 		this.projectId = flow.getProjectId();
 		this.scheduleId = -1;
@@ -63,79 +63,81 @@ public class ExecutableFlow {
 		this.version = flow.getVersion();
 		this.setFlow(flow);
 	}
-	
+
 	public ExecutableFlow(int executionId, Flow flow) {
 		this.projectId = flow.getProjectId();
 		this.scheduleId = -1;
 		this.flowId = flow.getId();
 		this.version = flow.getVersion();
 		this.executionId = executionId;
-		
+
 		this.setFlow(flow);
 	}
-	
+
 	public ExecutableFlow() {
 	}
-	
+
 	public long getUpdateTime() {
 		return updateTime;
 	}
-	
+
 	public void setUpdateTime(long updateTime) {
 		this.updateTime = updateTime;
 	}
-	
+
 	public List<ExecutableNode> getExecutableNodes() {
 		return new ArrayList<ExecutableNode>(executableNodes.values());
 	}
-	
+
 	public ExecutableNode getExecutableNode(String id) {
 		return executableNodes.get(id);
 	}
-	
+
 	public Collection<FlowProps> getFlowProps() {
 		return flowProps.values();
 	}
-	
+
 	public void addAllProxyUsers(Collection<String> proxyUsers) {
 		this.proxyUsers.addAll(proxyUsers);
 	}
-	
+
 	public Set<String> getProxyUsers() {
 		return new HashSet<String>(this.proxyUsers);
 	}
-	
+
 	public void setExecutionOptions(ExecutionOptions options) {
 		executionOptions = options;
 	}
-	
+
 	public ExecutionOptions getExecutionOptions() {
 		return executionOptions;
 	}
-	
+
 	private void setFlow(Flow flow) {
 		executionOptions = new ExecutionOptions();
-		
+
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
 			ExecutableNode exNode = new ExecutableNode(node, this);
 			executableNodes.put(id, exNode);
 		}
-		
+
 		for (Edge edge: flow.getEdges()) {
 			ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
 			ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
-			
+
 			sourceNode.addOutNode(edge.getTargetId());
 			targetNode.addInNode(edge.getSourceId());
 		}
-		
+
 		if (flow.getSuccessEmails() != null) {
 			executionOptions.setSuccessEmails(flow.getSuccessEmails());
 		}
 		if (flow.getFailureEmails() != null) {
 			executionOptions.setFailureEmails(flow.getFailureEmails());
 		}
+		executionOptions.setMailCreator(flow.getMailCreator());
+
 		flowProps.putAll(flow.getAllFlowProps());
 	}
 
@@ -148,10 +150,10 @@ public class ExecutableFlow {
 				}
 			}
 		}
-		
+
 		return startNodes;
 	}
-	
+
 	public List<String> getEndNodes() {
 		if (endNodes == null) {
 			endNodes = new ArrayList<String>();
@@ -161,10 +163,10 @@ public class ExecutableFlow {
 				}
 			}
 		}
-		
+
 		return endNodes;
 	}
-	
+
 	public boolean setNodeStatus(String nodeId, Status status) {
 		ExecutableNode exNode = executableNodes.get(nodeId);
 		if (exNode == null) {
@@ -179,18 +181,18 @@ public class ExecutableFlow {
 		if (exNode == null) {
 			return;
 		}
-		
+
 		exNode.setExternalExecutionId(externalExecutionId);
 	}
-	
+
 	public int getExecutionId() {
 		return executionId;
 	}
 
 	public void setExecutionId(int executionId) {
 		this.executionId = executionId;
-		
-		for(ExecutableNode node: executableNodes.values()) {
+
+		for (ExecutableNode node: executableNodes.values()) {
 			node.setExecutionId(executionId);
 		}
 	}
@@ -226,31 +228,31 @@ public class ExecutableFlow {
 	public void setExecutionPath(String executionPath) {
 		this.executionPath = executionPath;
 	}
-	
+
 	public long getStartTime() {
 		return startTime;
 	}
-	
+
 	public void setStartTime(long time) {
 		this.startTime = time;
 	}
-	
+
 	public long getEndTime() {
 		return endTime;
 	}
-	
+
 	public void setEndTime(long time) {
 		this.endTime = time;
 	}
-	
+
 	public long getSubmitTime() {
 		return submitTime;
 	}
-	
+
 	public void setSubmitTime(long time) {
 		this.submitTime = time;
 	}
-	
+
 	public Status getStatus() {
 		return flowStatus;
 	}
@@ -258,16 +260,16 @@ public class ExecutableFlow {
 	public void setStatus(Status flowStatus) {
 		this.flowStatus = flowStatus;
 	}
-	
-	public Map<String,Object> toObject() {
+
+	public Map<String, Object> toObject() {
 		HashMap<String, Object> flowObj = new HashMap<String, Object>();
 		flowObj.put("type", "executableflow");
 		flowObj.put("executionId", executionId);
 		flowObj.put("executionPath", executionPath);
 		flowObj.put("flowId", flowId);
 		flowObj.put("projectId", projectId);
-		
-		if(scheduleId >= 0) {
+
+		if (scheduleId >= 0) {
 			flowObj.put("scheduleId", scheduleId);
 		}
 		flowObj.put("submitTime", submitTime);
@@ -276,16 +278,16 @@ public class ExecutableFlow {
 		flowObj.put("status", flowStatus.toString());
 		flowObj.put("submitUser", submitUser);
 		flowObj.put("version", version);
-		
+
 		flowObj.put("executionOptions", this.executionOptions.toObject());
 		flowObj.put("version", version);
-		
+
 		ArrayList<Object> props = new ArrayList<Object>();
 		for (FlowProps fprop: flowProps.values()) {
 			HashMap<String, Object> propObj = new HashMap<String, Object>();
 			String source = fprop.getSource();
 			String inheritedSource = fprop.getInheritedSource();
-			
+
 			propObj.put("source", source);
 			if (inheritedSource != null) {
 				propObj.put("inherited", inheritedSource);
@@ -293,13 +295,13 @@ public class ExecutableFlow {
 			props.add(propObj);
 		}
 		flowObj.put("properties", props);
-		
+
 		ArrayList<Object> nodes = new ArrayList<Object>();
 		for (ExecutableNode node: executableNodes.values()) {
 			nodes.add(node.toObject());
 		}
 		flowObj.put("nodes", nodes);
-		
+
 		ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
 		flowObj.put("proxyUsers", proxyUserList);
 
@@ -307,57 +309,57 @@ public class ExecutableFlow {
 	}
 
 	public Object toUpdateObject(long lastUpdateTime) {
-		Map<String, Object> updateData = new HashMap<String,Object>();
+		Map<String, Object> updateData = new HashMap<String, Object>();
 		updateData.put("execId", this.executionId);
 		updateData.put("status", this.flowStatus.getNumVal());
 		updateData.put("startTime", this.startTime);
 		updateData.put("endTime", this.endTime);
 		updateData.put("updateTime", this.updateTime);
-		
-		List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
+
+		List<Map<String, Object>> updatedNodes = new ArrayList<Map<String, Object>>();
 		for (ExecutableNode node: executableNodes.values()) {
-			
+
 			if (node.getUpdateTime() > lastUpdateTime) {
-				Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
+				Map<String, Object> updatedNodeMap = new HashMap<String, Object>();
 				updatedNodeMap.put("jobId", node.getJobId());
 				updatedNodeMap.put("status", node.getStatus().getNumVal());
 				updatedNodeMap.put("startTime", node.getStartTime());
 				updatedNodeMap.put("endTime", node.getEndTime());
 				updatedNodeMap.put("updateTime", node.getUpdateTime());
 				updatedNodeMap.put("attempt", node.getAttempt());
-				
+
 				if (node.getAttempt() > 0) {
-					ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+					ArrayList<Map<String, Object>> pastAttempts = new ArrayList<Map<String, Object>>();
 					for (Attempt attempt: node.getPastAttemptList()) {
 						pastAttempts.add(attempt.toObject());
 					}
 					updatedNodeMap.put("pastAttempts", pastAttempts);
 				}
-				
+
 				updatedNodes.add(updatedNodeMap);
 			}
 		}
-		
+
 		updateData.put("nodes", updatedNodes);
 		return updateData;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public void applyUpdateObject(Map<String, Object> updateData) {
-		List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
-		for (Map<String,Object> node: updatedNodes) {
+		List<Map<String, Object>> updatedNodes = (List<Map<String, Object>>)updateData.get("nodes");
+		for (Map<String, Object> node: updatedNodes) {
 			String jobId = (String)node.get("jobId");
 			Status status = Status.fromInteger((Integer)node.get("status"));
 			long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
 			long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
 			long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-			
+
 			ExecutableNode exNode = executableNodes.get(jobId);
 			exNode.setEndTime(endTime);
 			exNode.setStartTime(startTime);
 			exNode.setUpdateTime(updateTime);
 			exNode.setStatus(status);
-			
+
 			int attempt = 0;
 			if (node.containsKey("attempt")) {
 				attempt = (Integer)node.get("attempt");
@@ -365,22 +367,22 @@ public class ExecutableFlow {
 					exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
 				}
 			}
-			
+
 			exNode.setAttempt(attempt);
 		}
-		
+
 		this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
-		
+
 		this.startTime = JSONUtils.getLongFromObject(updateData.get("startTime"));
 		this.endTime = JSONUtils.getLongFromObject(updateData.get("endTime"));
 		this.updateTime = JSONUtils.getLongFromObject(updateData.get("updateTime"));
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
 		ExecutableFlow exFlow = new ExecutableFlow();
-		
-		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+
+		HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
 		exFlow.executionId = (Integer)flowObj.get("executionId");
 		exFlow.executionPath = (String)flowObj.get("executionPath");
 		exFlow.flowId = (String)flowObj.get("flowId");
@@ -394,7 +396,7 @@ public class ExecutableFlow {
 		exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
 		exFlow.submitUser = (String)flowObj.get("submitUser");
 		exFlow.version = (Integer)flowObj.get("version");
-		
+
 		if (flowObj.containsKey("executionOptions")) {
 			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
 		}
@@ -402,7 +404,7 @@ public class ExecutableFlow {
 			// for backawards compatibility should remove in a few versions.
 			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
 		}
-		
+
 		// Copy nodes
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
 		for (Object nodeObj: nodes) {
@@ -411,57 +413,57 @@ public class ExecutableFlow {
 		}
 
 		List<Object> properties = (List<Object>)flowObj.get("properties");
-		for (Object propNode : properties) {
+		for (Object propNode: properties) {
 			HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
 			String source = (String)fprop.get("source");
 			String inheritedSource = (String)fprop.get("inherited");
-			
+
 			FlowProps flowProps = new FlowProps(inheritedSource, source);
 			exFlow.flowProps.put(source, flowProps);
 		}
-		
-		if(flowObj.containsKey("proxyUsers")) {
-			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+
+		if (flowObj.containsKey("proxyUsers")) {
+			ArrayList<String> proxyUserList = (ArrayList<String>)flowObj.get("proxyUsers");
 			exFlow.addAllProxyUsers(proxyUserList);
 		}
-		
+
 		return exFlow;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public void updateExecutableFlowFromObject(Object obj) {
-		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
+		HashMap<String, Object> flowObj = (HashMap<String, Object>)obj;
 
 		submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
 		startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
 		endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
 		flowStatus = Status.valueOf((String)flowObj.get("status"));
-		
+
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
 		for (Object nodeObj: nodes) {
-			HashMap<String, Object> nodeHash= (HashMap<String, Object>)nodeObj;
+			HashMap<String, Object> nodeHash = (HashMap<String, Object>)nodeObj;
 			String nodeId = (String)nodeHash.get("id");
 			ExecutableNode node = executableNodes.get(nodeId);
 			if (nodeId == null) {
 				throw new RuntimeException("Node " + nodeId + " doesn't exist in flow.");
 			}
-			
+
 			node.updateNodeFromObject(nodeObj);
 		}
 	}
-	
+
 	public Set<String> getSources() {
 		HashSet<String> set = new HashSet<String>();
 		for (ExecutableNode exNode: executableNodes.values()) {
 			set.add(exNode.getJobPropsSource());
 		}
-		
+
 		for (FlowProps props: flowProps.values()) {
 			set.add(props.getSource());
 		}
 		return set;
 	}
-	
+
 	public String getSubmitUser() {
 		return submitUser;
 	}
@@ -469,7 +471,7 @@ public class ExecutableFlow {
 	public void setSubmitUser(String submitUser) {
 		this.submitUser = submitUser;
 	}
-	
+
 	public int getVersion() {
 		return version;
 	}
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index a91a6d0..2710f0b 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -8,6 +8,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import azkaban.executor.mail.DefaultMailCreator;
+
 /**
  * Execution options for submitted flows and scheduled flows
  */
@@ -27,6 +29,7 @@ public class ExecutionOptions {
 	private Integer pipelineExecId = null;
 	private Integer queueLevel = 0;
 	private String concurrentOption = CONCURRENT_OPTION_IGNORE;
+	private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
 	
 	public enum FailureAction {
@@ -107,10 +110,18 @@ public class ExecutionOptions {
 		this.concurrentOption = concurrentOption;
 	}
 	
+	public void setMailCreator(String mailCreator) {
+		this.mailCreator = mailCreator;
+	}
+	
 	public String getConcurrentOption() {
 		return concurrentOption;
 	}
 	
+	public String getMailCreator() {
+		return mailCreator;
+	}
+	
 	public Integer getPipelineLevel() {
 		return pipelineLevel;
 	}
@@ -152,6 +163,7 @@ public class ExecutionOptions {
 		flowOptionObj.put("pipelineExecId", pipelineExecId);
 		flowOptionObj.put("queueLevel", queueLevel);
 		flowOptionObj.put("concurrentOption", concurrentOption);
+		flowOptionObj.put("mailCreator", mailCreator);
 		flowOptionObj.put("disabled", initiallyDisabledJobs);
 		flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
 		flowOptionObj.put("successEmailsOverride", successEmailsOverride);
@@ -180,6 +192,9 @@ public class ExecutionOptions {
 		if (optionsMap.containsKey("concurrentOption")) {
 			options.concurrentOption = (String)optionsMap.get("concurrentOption");
 		}
+		if (optionsMap.containsKey("mailCreator")) {
+			options.mailCreator = (String)optionsMap.get("mailCreator");
+		}
 		if (optionsMap.containsKey("disabled")) {
 			options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
 		}
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index a49b4fb..4efcb38 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -7,179 +7,104 @@ import javax.mail.MessagingException;
 
 import org.apache.log4j.Logger;
 
-import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.executor.mail.MailCreator;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
 public class ExecutorMailer {
 	private static Logger logger = Logger.getLogger(ExecutorMailer.class);
-	
+
 	private boolean testMode = false;
 	private String clientHostname;
 	private String clientPortNumber;
-	
+
 	private String mailHost;
 	private String mailUser;
 	private String mailPassword;
 	private String mailSender;
 	private String azkabanName;
-	
+
 	public ExecutorMailer(Props props) {
 		this.azkabanName = props.getString("azkaban.name", "azkaban");
 		this.mailHost = props.getString("mail.host", "localhost");
 		this.mailUser = props.getString("mail.user", "");
 		this.mailPassword = props.getString("mail.password", "");
 		this.mailSender = props.getString("mail.sender", "");
-		
+
 		this.clientHostname = props.getString("jetty.hostname", "localhost");
 		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-		
+
 		testMode = props.getBoolean("test.mode", false);
 	}
-	
+
 	public void sendFirstErrorMessage(ExecutableFlow flow) {
+		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+		message.setFromAddress(mailSender);
+
 		ExecutionOptions option = flow.getExecutionOptions();
-		List<String> emailList = option.getDisabledJobs();
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
-			
-			if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
-				message.println("This flow is set to cancel all currently running jobs.");
-			}
-			else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
-				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
-			}
-			else {
-				message.println("This flow is set to complete all currently running jobs before stopping.");
-			}
-			
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
-			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			message.println("");
-			message.println("<h3>Reason</h3>");
-			List<String> failedJobs = findFailedJobs(flow);
-			message.println("<ul>");
-			for (String jobId : failedJobs) {
-				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
-			}
-			
-			message.println("</ul>");
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
+		MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+
+		logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+		boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+		if (mailCreated && !testMode) {
+			try {
+				message.sendEmail();
+			} catch (MessagingException e) {
+				logger.error("Email message send failed", e);
 			}
 		}
 	}
-	
-	public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+
+	public void sendErrorEmail(ExecutableFlow flow, String... extraReasons) {
+		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+		message.setFromAddress(mailSender);
+
 		ExecutionOptions option = flow.getExecutionOptions();
-		
-		List<String> emailList = option.getFailureEmails();
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
-			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			message.println("");
-			message.println("<h3>Reason</h3>");
-			List<String> failedJobs = findFailedJobs(flow);
-			message.println("<ul>");
-			for (String jobId : failedJobs) {
-				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
-			}
-			for (String reasons: extraReasons) {
-				message.println("<li>" + reasons + "</li>");
-			}
-			
-			message.println("</ul>");
-			
-			
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
+		MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+		logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+		boolean mailCreated = mailCreator.createErrorEmail(flow, message, azkabanName, clientHostname, clientPortNumber, extraReasons);
+
+		if (mailCreated && !testMode) {
+			try {
+				message.sendEmail();
+			} catch (MessagingException e) {
+				logger.error("Email message send failed", e);
 			}
 		}
 	}
 
 	public void sendSuccessEmail(ExecutableFlow flow) {
+		EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+		message.setFromAddress(mailSender);
+
 		ExecutionOptions option = flow.getExecutionOptions();
-		List<String> emailList = option.getSuccessEmails();
-
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
-			
-			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
-			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
+		MailCreator mailCreator = DefaultMailCreator.getCreator(option.getMailCreator());
+		logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
+
+		boolean mailCreated = mailCreator.createSuccessEmail(flow, message, azkabanName, clientHostname, clientPortNumber);
+
+		if (mailCreated && !testMode) {
+			try {
+				message.sendEmail();
+			} catch (MessagingException e) {
+				logger.error("Email message send failed", e);
 			}
 		}
 	}
-	
-	private List<String> findFailedJobs(ExecutableFlow flow) {
+
+	public static List<String> findFailedJobs(ExecutableFlow flow) {
 		ArrayList<String> failedJobs = new ArrayList<String>();
-		for (ExecutableNode node: flow.getExecutableNodes()) {
+		for (ExecutableNode node : flow.getExecutableNodes()) {
 			if (node.getStatus() == Status.FAILED) {
 				failedJobs.add(node.getJobId());
 			}
 		}
-		
+
 		return failedJobs;
 	}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/mail/DefaultMailCreator.java b/src/java/azkaban/executor/mail/DefaultMailCreator.java
new file mode 100644
index 0000000..eb4ba3f
--- /dev/null
+++ b/src/java/azkaban/executor/mail/DefaultMailCreator.java
@@ -0,0 +1,149 @@
+package azkaban.executor.mail;
+
+import java.util.HashMap;
+import java.util.List;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorMailer;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Utils;
+
+public class DefaultMailCreator implements MailCreator {
+	public static final String DEFAULT_MAIL_CREATOR = "default";
+	private static HashMap<String, MailCreator> registeredCreators = new HashMap<String, MailCreator>();
+	private static MailCreator defaultCreator;
+
+	public static void registerCreator(String name, MailCreator creator) {
+		registeredCreators.put(name, creator);
+	}
+
+	public static MailCreator getCreator(String name) {
+		MailCreator creator = registeredCreators.get(name);
+		if (creator == null) {
+			creator = defaultCreator;
+		}
+		return creator;
+	}
+
+	static {
+		defaultCreator = new DefaultMailCreator();
+		registerCreator(DEFAULT_MAIL_CREATOR, defaultCreator);
+	}
+
+	@Override
+	public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+		ExecutionOptions option = flow.getExecutionOptions();
+		List<String> emailList = option.getDisabledJobs();
+		int execId = flow.getExecutionId();
+
+		if (emailList != null && !emailList.isEmpty()) {
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+
+			if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
+				message.println("This flow is set to cancel all currently running jobs.");
+			}
+			else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE) {
+				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+			}
+			else {
+				message.println("This flow is set to complete all currently running jobs before stopping.");
+			}
+
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = ExecutorMailer.findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+			}
+
+			message.println("</ul>");
+			return true;
+		}
+
+		return false;
+	}
+
+	@Override
+	public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+		ExecutionOptions option = flow.getExecutionOptions();
+
+		List<String> emailList = option.getFailureEmails();
+		int execId = flow.getExecutionId();
+
+		if (emailList != null && !emailList.isEmpty()) {
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = ExecutorMailer.findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>");
+			}
+			for (String reasons : vars) {
+				message.println("<li>" + reasons + "</li>");
+			}
+
+			message.println("</ul>");
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars) {
+
+		ExecutionOptions option = flow.getExecutionOptions();
+		List<String> emailList = option.getSuccessEmails();
+
+		int execId = flow.getExecutionId();
+
+		if (emailList != null && !emailList.isEmpty()) {
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+
+			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() + "</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() + "</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) + "</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			return true;
+		}
+		return false;
+	}
+}
diff --git a/src/java/azkaban/executor/mail/MailCreator.java b/src/java/azkaban/executor/mail/MailCreator.java
new file mode 100644
index 0000000..d80afd4
--- /dev/null
+++ b/src/java/azkaban/executor/mail/MailCreator.java
@@ -0,0 +1,10 @@
+package azkaban.executor.mail;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.utils.EmailMessage;
+
+public interface MailCreator {
+	public boolean createFirstErrorMessage(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+	public boolean createErrorEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+	public boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message, String azkabanName, String clientHostname, String clientPortNumber, String... vars);
+}
diff --git a/src/java/azkaban/flow/Flow.java b/src/java/azkaban/flow/Flow.java
index 6c01df8..3846394 100644
--- a/src/java/azkaban/flow/Flow.java
+++ b/src/java/azkaban/flow/Flow.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import azkaban.executor.mail.DefaultMailCreator;
+
 public class Flow {
 	private final String id;
 	private int projectId;
@@ -40,8 +42,10 @@ public class Flow {
 
 	private List<String> failureEmail = new ArrayList<String>();
 	private List<String> successEmail = new ArrayList<String>();
+	private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
 	private ArrayList<String> errors;
 	private int version = -1;
+	private Map<String, Object> metadata = new HashMap<String, Object>();
 	
 	private boolean isLayedOut = false;
 	
@@ -106,10 +110,18 @@ public class Flow {
 		return successEmail;
 	}
 	
+	public String getMailCreator() {
+		return mailCreator;
+	}
+	
 	public List<String> getFailureEmails() {
 		return failureEmail;
 	}
 	
+	public void setMailCreator(String mailCreator) {
+		this.mailCreator = mailCreator;
+	}
+	
 	public void addSuccessEmails(Collection<String> emails) {
 		successEmail.addAll(emails);
 	}
@@ -226,10 +238,15 @@ public class Flow {
 		flowObj.put("edges", objectizeEdges());
 		flowObj.put("failure.email", failureEmail);
 		flowObj.put("success.email", successEmail);
+		flowObj.put("mailCreator", mailCreator);
 		flowObj.put("layedout", isLayedOut);
 		if (errors != null) {
 			flowObj.put("errors", errors);
 		}
+
+		if (metadata != null) {
+			flowObj.put("metadata", metadata);
+		}
 		
 		return flowObj;
 	}
@@ -294,9 +311,18 @@ public class Flow {
 		List<Object> edgeList = (List<Object>)flowObject.get("edges");
 		List<Edge> edges = loadEdgeFromObjects(edgeList, nodes);
 		flow.addAllEdges(edges);
+
+		Map<String, Object> metadata = (Map<String, Object>)flowObject.get("metadata");
+		
+		if (metadata != null) {
+			flow.setMetadata(metadata);
+		}
 		
 		flow.failureEmail = (List<String>)flowObject.get("failure.email");
 		flow.successEmail = (List<String>)flowObject.get("success.email");
+		if (flowObject.containsKey("mailCreator")) {
+			flow.mailCreator = flowObject.get("mailCreator").toString();
+		}
 		return flow;
 	}
 
@@ -337,6 +363,17 @@ public class Flow {
 		return isLayedOut;
 	}
 
+	public Map<String, Object> getMetadata() {
+		if(metadata == null){
+			metadata = new HashMap<String, Object>();
+		}
+		return metadata;
+	}
+
+	public void setMetadata(Map<String, Object> metadata) {
+		this.metadata = metadata;
+	}
+
 	public void setLayedOut(boolean layedOut) {
 		this.isLayedOut = layedOut;
 	}
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 972b531..1adb0ad 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -681,6 +681,41 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		}
 	}
 
+	@Override
+	public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+		logger.info("Uploading flows");
+		Connection connection = getConnection();
+
+		try {
+			QueryRunner runner = new QueryRunner();
+			String json = JSONUtils.toJSON(flow.toObject());
+			byte[] stringData = json.getBytes("UTF-8");
+			byte[] data = stringData;
+
+			logger.info("UTF-8 size:" + data.length);
+			if (defaultEncodingType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+
+			logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+			final String UPDATE_FLOW = "UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
+			try {
+				runner.update(connection, UPDATE_FLOW, defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+			} catch (SQLException e) {
+				e.printStackTrace();
+				throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+			}
+			connection.commit();
+		} catch (IOException e) {
+			throw new ProjectManagerException("Flow Upload failed.", e);
+		} catch (SQLException e) {
+			throw new ProjectManagerException("Flow Upload failed commit.", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+
 	public EncodingType getDefaultEncodingType() {
 		return defaultEncodingType;
 	}
diff --git a/src/java/azkaban/project/Project.java b/src/java/azkaban/project/Project.java
index d2e1f52..47ebc4b 100644
--- a/src/java/azkaban/project/Project.java
+++ b/src/java/azkaban/project/Project.java
@@ -45,6 +45,7 @@ public class Project {
 	private LinkedHashMap<String, Permission> groupPermissionMap = new LinkedHashMap<String, Permission>();
 	private Map<String, Flow> flows = null;
 	private HashSet<String> proxyUsers = new HashSet<String>();
+	private Map<String, Object> metadata = new HashMap<String, Object>();
 	
 	public Project(int id, String name) {
 		this.id = id;
@@ -67,6 +68,10 @@ public class Project {
 		return flows.get(flowId);
 	}
 	
+	public Map<String, Flow> getFlowMap() {
+		return flows;
+	}
+	
 	public List<Flow> getFlows() {
 		List<Flow> retFlow = null;
 		if (flows != null) {
@@ -218,6 +223,10 @@ public class Project {
 		userPermissionMap.remove(userId);
 	}
 	
+	public void clearUserPermission() {
+		userPermissionMap.clear();
+	}
+	
 	public long getCreateTimestamp() {
 		return createTimestamp;
 	}
@@ -251,6 +260,10 @@ public class Project {
 		if (source != null) {
 			projectObject.put("source", source);
 		}
+		
+		if (metadata != null) {
+			projectObject.put("metadata", metadata);
+		}
 
 		ArrayList<Map<String, Object>> users = new ArrayList<Map<String, Object>>();
 		for (Map.Entry<String, Permission> entry : userPermissionMap.entrySet()) {
@@ -282,6 +295,7 @@ public class Project {
 		Boolean active = (Boolean)projectObject.get("active");
 		active = active == null ? true : active;
 		int version = (Integer)projectObject.get("version");
+		Map<String, Object> metadata = (Map<String, Object>)projectObject.get("metadata");
 		
 		Project project = new Project(id, name);
 		project.setVersion(version);
@@ -294,6 +308,9 @@ public class Project {
 		if (source != null) {
 			project.setSource(source);
 		}
+		if (metadata != null) {
+			project.setMetadata(metadata);
+		}
 		
 		List<Map<String, Object>> users = (List<Map<String, Object>>) projectObject
 				.get("users");
@@ -403,6 +420,17 @@ public class Project {
 		this.source = source;
 	}
 
+	public Map<String, Object> getMetadata() {
+		if(metadata == null){
+			metadata = new HashMap<String, Object>();
+		}
+		return metadata;
+	}
+
+	protected void setMetadata(Map<String, Object> metadata) {
+		this.metadata = metadata;
+	}
+
 	public int getId() {
 		return id;
 	}
diff --git a/src/java/azkaban/project/ProjectLoader.java b/src/java/azkaban/project/ProjectLoader.java
index 7e99298..ea6e5df 100644
--- a/src/java/azkaban/project/ProjectLoader.java
+++ b/src/java/azkaban/project/ProjectLoader.java
@@ -122,6 +122,9 @@ public interface ProjectLoader {
 	 * @throws ProjectManagerException
 	 */
 	public void changeProjectVersion(Project project, int version, String user) throws ProjectManagerException;
+
+
+	public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException;
 	
 	/**
 	 * Uploads all computed flows
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index f610e40..a6e8177 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -352,6 +352,10 @@ public class ProjectManager {
 		projectLoader.cleanOlderProjectVersion(project.getId(), project.getVersion() - projectVersionRetention);
 	}
 	
+	public void updateFlow(Project project, Flow flow) throws ProjectManagerException {
+		projectLoader.updateFlow(project, flow.getVersion(), flow);
+	}
+	
 	private File unzipFile(File archiveFile) throws IOException {
 		ZipFile zipfile = new ZipFile(archiveFile);
 		File unzipped = Utils.createTempDir(tempDir);
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..e964a8f 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -159,7 +159,6 @@ public class ScheduleManager {
 	 * @param id
 	 */
 	public synchronized void removeSchedule(Schedule sched) {
-
 		Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
 		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
 		if(schedules != null) {
@@ -428,6 +427,8 @@ public class ScheduleManager {
 										flowOptions.setSuccessEmails(flow.getSuccessEmails());
 									}
 									
+									flowOptions.setMailCreator(flow.getMailCreator());
+									
 									try {
 										executorManager.submitExecutableFlow(exflow);
 										logger.info("Scheduler has invoked " + exflow.getExecutionId());
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 06ca9bc..10f540c 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -149,15 +149,16 @@ public class EmailMessage {
 
 		if (_attachments.size() > 0) {
 			MimeMultipart multipart = new MimeMultipart("related");
+			
+			BodyPart messageBodyPart = new MimeBodyPart();
+			messageBodyPart.setContent(_body.toString(), _mimeType);
+			multipart.addBodyPart(messageBodyPart);
+			
 			// Add attachments
 			for (BodyPart part : _attachments) {
 				multipart.addBodyPart(part);
 			}
 
-			BodyPart messageBodyPart = new MimeBodyPart();
-			messageBodyPart.setContent(_body.toString(), _mimeType);
-			multipart.addBodyPart(messageBodyPart);
-
 			message.setContent(multipart);
 		} else {
 			message.setContent(_body.toString(), _mimeType);
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index fb4419f..ba3eeb0 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -116,6 +116,18 @@ public class Utils {
 		zOut.close();
 	}
 
+	public static void zipFolderContent(File folder, File output) throws IOException {
+		FileOutputStream out = new FileOutputStream(output);
+		ZipOutputStream zOut = new ZipOutputStream(out);
+		File[] files = folder.listFiles();
+		if (files != null) {
+			for (File f : files) {
+				zipFile("", f, zOut);
+			}
+		}
+		zOut.close();
+	}
+
 	private static void zipFile(String path, File input, ZipOutputStream zOut) throws IOException {
 		if (input.isDirectory()) {
 			File[] files = input.listFiles();
diff --git a/src/java/azkaban/utils/WebUtils.java b/src/java/azkaban/utils/WebUtils.java
index ef63fb6..c337e41 100644
--- a/src/java/azkaban/utils/WebUtils.java
+++ b/src/java/azkaban/utils/WebUtils.java
@@ -1,6 +1,7 @@
 package azkaban.utils;
 
 import java.text.NumberFormat;
+import java.util.Collection;
 
 import org.joda.time.DateTime;
 import org.joda.time.DurationFieldType;
@@ -154,6 +155,4 @@ public class WebUtils {
         else
             return sizeBytes + " B";
     }
-    
-    
 }
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 8810352..8e87919 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -63,7 +63,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	private String color;
 
 	private List<ViewerPlugin> viewerPlugins;
-	
+
 	/**
 	 * To retrieve the application for the servlet
 	 * 
@@ -92,7 +92,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 			viewerPlugins = server.getViewerPlugins();
 		}
 	}
-
+	
 	/**
 	 * Checks for the existance of the parameter in the request
 	 * 
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a542a82..eb14851 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -760,6 +760,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		if (!options.isSuccessEmailsOverridden()) {
 			options.setSuccessEmails(flow.getSuccessEmails());
 		}
+		options.setMailCreator(flow.getMailCreator());
 		
 		try {
 			String message = executorManager.submitExecutableFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index 4e71930..7b80139 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -10,6 +10,7 @@ import javax.servlet.http.HttpServletRequest;
 
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.mail.DefaultMailCreator;
 
 public class HttpRequestUtils {
 	public static ExecutionOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
@@ -36,7 +37,7 @@ public class HttpRequestUtils {
 			boolean override = getBooleanParam(req, "successEmailsOverride", false);
 			execOptions.setSuccessEmailsOverridden(override);
 		}
-		
+
 		if (hasParam(req, "failureEmails")) {
 			String emails = getParam(req, "failureEmails");
 			if (!emails.isEmpty()) {
@@ -72,6 +73,11 @@ public class HttpRequestUtils {
 				execOptions.setPipelineLevel(queueLevel);
 			}
 		}
+		String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
+		if (hasParam(req, "mailCreator")) {
+			mailCreator = getParam(req, "mailCreator");
+			execOptions.setMailCreator(mailCreator);
+		}
 		
 		Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
 		execOptions.setFlowParameters(flowParamGroup);
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 891e896..8ce912b 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -16,7 +16,11 @@
 
 package azkaban.webapp.servlet;
 
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Writer;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +33,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.fileupload.servlet.ServletFileUpload;
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
 import azkaban.project.Project;
@@ -51,6 +56,18 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 	private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
 	private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
 	
+	private static HashMap<String, String> contextType = new HashMap<String,String>();
+	static {
+		contextType.put(".js", "application/javascript");
+		contextType.put(".css", "text/css");
+		contextType.put(".png", "image/png");
+		contextType.put(".jpeg", "image/jpeg");
+		contextType.put(".jpg", "image/jpeg");
+	}
+	
+	
+	private File webResourceDirectory = null;
+	
 	private MultipartParser multipartParser;
 	
 	@Override
@@ -60,6 +77,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 		multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
 	}
 	
+	public void setResourceDirectory(File file) {
+		this.webResourceDirectory = file;
+	}
+	
 	@Override
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		// Set session id
@@ -74,6 +95,10 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 
 		if (session != null) {
 			logger.info("Found session " + session.getUser());
+			if (handleFileGet(req, resp)) {
+				return;
+			}
+			
 			handleGet(req, resp, session);
 		} else {
 			if (hasParam(req, "ajax")) {
@@ -87,6 +112,46 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
 		}
 	}
 	
+	private boolean handleFileGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+		if (webResourceDirectory == null) {
+			return false;
+		}
+		
+		// Check if it's a resource
+		String prefix = req.getContextPath() + req.getServletPath();
+		String path = req.getRequestURI().substring(prefix.length());
+		int index = path.lastIndexOf('.');
+		if (index == -1 ) {
+			return false;
+		}
+		
+		String extension = path.substring(index);
+		if (contextType.containsKey(extension)) {
+			File file = new File(webResourceDirectory, path);
+			if (!file.exists() || !file.isFile()) {
+				return false;
+			}
+			
+			resp.setContentType(contextType.get(extension));
+			
+			OutputStream output = resp.getOutputStream();
+			BufferedInputStream input = null;
+			try {
+				input = new BufferedInputStream(new FileInputStream(file));
+				IOUtils.copy(input, output);
+			}
+			finally {
+				if (input != null) {
+					input.close();
+				}
+			}
+			output.flush();
+			return true;
+		}
+
+		return false;
+	}
+	
 	private Session getSessionFromRequest(HttpServletRequest req) throws ServletException {
 		String remoteIp = req.getRemoteAddr();
 		Cookie cookie = getCookieByName(req, SESSION_ID_NAME);
diff --git a/src/sql/create_schedule_table.sql b/src/sql/create_schedule_table.sql
index 32b7d86..061de26 100644
--- a/src/sql/create_schedule_table.sql
+++ b/src/sql/create_schedule_table.sql
@@ -13,6 +13,6 @@ CREATE TABLE schedules (
 	submit_user VARCHAR(128),
 	enc_type TINYINT,
 	schedule_options LONGBLOB,
-	PRIMARY KEY (schedule_id)
-	INDEX project_id (project_id, flow_name),
+	PRIMARY KEY (schedule_id),
+	INDEX project_id (project_id, flow_name)
 );
diff --git a/unit/java/azkaban/test/execapp/MockProjectLoader.java b/unit/java/azkaban/test/execapp/MockProjectLoader.java
index b10ea29..ef76de7 100644
--- a/unit/java/azkaban/test/execapp/MockProjectLoader.java
+++ b/unit/java/azkaban/test/execapp/MockProjectLoader.java
@@ -224,4 +224,10 @@ public class MockProjectLoader implements ProjectLoader {
 		// TODO Auto-generated method stub
 		
 	}
+
+	@Override
+	public void updateFlow(Project project, int version, Flow flow) throws ProjectManagerException {
+		// TODO Auto-generated method stub
+		
+	}
 }
\ No newline at end of file