azkaban-memoizeit

Merge branch 'release-2.1'

4/12/2013 3:57:36 PM

Details

diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index 495ce3c..3e9460f 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -18,6 +18,8 @@ public class ExecutionOptions {
 	
 	private boolean notifyOnFirstFailure = true;
 	private boolean notifyOnLastFailure = false;
+	private boolean failureEmailsOverride = false;
+	private boolean successEmailsOverride = false;
 	private ArrayList<String> failureEmails = new ArrayList<String>();
 	private ArrayList<String> successEmails = new ArrayList<String>();
 	
@@ -49,6 +51,22 @@ public class ExecutionOptions {
 		failureEmails.addAll(emails);
 	}
 	
+	public boolean isFailureEmailsOverridden() {
+		return this.failureEmailsOverride;
+	}
+	
+	public boolean isSuccessEmailsOverridden() {
+		return this.successEmailsOverride;
+	}
+
+	public void setSuccessEmailsOverridden(boolean override) {
+		this.successEmailsOverride = override;
+	}
+	
+	public void setFailureEmailsOverridden(boolean override) {
+		this.failureEmailsOverride = override;
+	}
+	
 	public List<String> getFailureEmails() {
 		return failureEmails;
 	}
@@ -135,7 +153,8 @@ public class ExecutionOptions {
 		flowOptionObj.put("queueLevel", queueLevel);
 		flowOptionObj.put("concurrentOption", concurrentOption);
 		flowOptionObj.put("disabled", initiallyDisabledJobs);
-		
+		flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
+		flowOptionObj.put("successEmailsOverride", successEmailsOverride);
 		return flowOptionObj;
 	}
 	
@@ -145,7 +164,7 @@ public class ExecutionOptions {
 			return null;
 		}
 		
-		Map<String,Object> optionsMap = new HashMap<String,Object>();
+		Map<String,Object> optionsMap = (Map<String,Object>)obj;
 		
 		ExecutionOptions options = new ExecutionOptions();
 		if (optionsMap.containsKey("flowParameters")) {
@@ -182,6 +201,14 @@ public class ExecutionOptions {
 			options.setFailureEmails((List<String>)optionsMap.get("failureEmails"));
 		}
 		
+		if (optionsMap.containsKey("successEmailsOverride")) {
+			options.setSuccessEmailsOverridden((Boolean)optionsMap.get("successEmailsOverride"));
+		}
+		
+		if (optionsMap.containsKey("failureEmailsOverride")) {
+			options.setFailureEmailsOverridden((Boolean)optionsMap.get("failureEmailsOverride"));
+		}
+		
 		return options;
 	}
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ff94590..b58c167 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -392,7 +392,7 @@ public class ExecutorManager {
 					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
 				}
 				else if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
-					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
 				}
 				else {
 					// The settings is to run anyways.
diff --git a/src/java/azkaban/executor/ExecutorManagerException.java b/src/java/azkaban/executor/ExecutorManagerException.java
index 772fc4b..a5c264e 100644
--- a/src/java/azkaban/executor/ExecutorManagerException.java
+++ b/src/java/azkaban/executor/ExecutorManagerException.java
@@ -17,8 +17,13 @@
 package azkaban.executor;
 
 public class ExecutorManagerException extends Exception {
+	public enum Reason {
+		SkippedExecution
+	}
+	
 	private static final long serialVersionUID = 1L;
 	private ExecutableFlow flow = null;
+	private Reason reason = null;
 	
 	public ExecutorManagerException(Exception e) {
 		super(e);
@@ -33,6 +38,11 @@ public class ExecutorManagerException extends Exception {
 		this.flow = flow;
 	}
 	
+	public ExecutorManagerException(String message, Reason reason) {
+		super(message);
+		this.reason = reason;
+	}
+	
 	public ExecutorManagerException(String message, Throwable cause) {
 		super(message, cause);
 	}
@@ -40,4 +50,8 @@ public class ExecutorManagerException extends Exception {
 	public ExecutableFlow getExecutableFlow() {
 		return flow;
 	}
+	
+	public Reason getReason() {
+		return reason;
+	}
 }
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 2733b3e..fab237a 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -35,6 +35,7 @@ import org.joda.time.format.DateTimeFormatter;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
 
 import azkaban.flow.Flow;
 import azkaban.project.Project;
@@ -379,10 +380,21 @@ public class ScheduleManager {
 									}
 									exflow.setExecutionOptions(flowOptions);
 									
+									if (!flowOptions.isFailureEmailsOverridden()) {
+										flowOptions.setFailureEmails(flow.getFailureEmails());
+									}
+									if (!flowOptions.isSuccessEmailsOverridden()) {
+										flowOptions.setSuccessEmails(flow.getSuccessEmails());
+									}
+									
 									try {
 										executorManager.submitExecutableFlow(exflow);
 										logger.info("Scheduler has invoked " + exflow.getExecutionId());
-									} catch (Exception e) {	
+									} 
+									catch (ExecutorManagerException e) {
+										throw e;
+									}
+									catch (Exception e) {	
 										e.printStackTrace();
 										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
 									}
@@ -406,7 +418,16 @@ public class ScheduleManager {
 										}
 									}
 									
-								} catch (Exception e) {
+								} 
+								catch (ExecutorManagerException e) {
+									if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
+										logger.info(e.getMessage());
+									}
+									else {
+										e.printStackTrace();
+									}
+								}
+								catch (Exception e) {
 									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 								}
 
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 80a3865..a9d5d17 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -552,6 +552,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("notifyFailureFirst", options.getNotifyOnFirstFailure());
 		ret.put("notifyFailureLast", options.getNotifyOnLastFailure());
 		
+		ret.put("failureEmailsOverride", options.isFailureEmailsOverridden());
+		ret.put("successEmailsOverride", options.isSuccessEmailsOverridden());
+		
 		ret.put("concurrentOptions", options.getConcurrentOption());
 		ret.put("pipelineLevel", options.getPipelineLevel());
 		ret.put("pipelineExecution", options.getPipelineExecutionId());
@@ -764,6 +767,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
 		ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
 		exflow.setExecutionOptions(options);
+		if (!options.isFailureEmailsOverridden()) {
+			options.setFailureEmails(flow.getFailureEmails());
+		}
+		if (!options.isSuccessEmailsOverridden()) {
+			options.setSuccessEmails(flow.getSuccessEmails());
+		}
 		
 		try {
 			String message = executorManager.submitExecutableFlow(exflow);
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index fd29b4c..4e71930 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -28,6 +28,15 @@ public class HttpRequestUtils {
 			}
 		}
 
+		if (hasParam(req, "failureEmailsOverride")) {
+			boolean override = getBooleanParam(req, "failureEmailsOverride", false);
+			execOptions.setFailureEmailsOverridden(override);
+		}
+		if (hasParam(req, "successEmailsOverride")) {
+			boolean override = getBooleanParam(req, "successEmailsOverride", false);
+			execOptions.setSuccessEmailsOverridden(override);
+		}
+		
 		if (hasParam(req, "failureEmails")) {
 			String emails = getParam(req, "failureEmails");
 			if (!emails.isEmpty()) {
@@ -151,6 +160,23 @@ public class HttpRequestUtils {
 		return defaultVal;
 	}
 
+	public static boolean getBooleanParam(HttpServletRequest request, String name) throws ServletException  {
+		String p = getParam(request, name);
+		return Boolean.parseBoolean(p);
+	}
+	
+	public static boolean getBooleanParam(HttpServletRequest request, String name, boolean defaultVal) {
+		if (hasParam(request, name)) {
+			try {
+				return getBooleanParam(request, name);
+			} catch (Exception e) {
+				return defaultVal;
+			}
+		}
+		
+		return defaultVal;
+	}
+	
 	public static long getLongParam(HttpServletRequest request, String name) throws ServletException {
 		String p = getParam(request, name);
 		return Long.valueOf(p);
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
index 806465f..6d4ddcb 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
@@ -48,13 +48,22 @@
 					<input id="notifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked /> <label for="notify">First Failure</label>
 					<input id="notifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last"></input> <label for="notify">Flow Finished</label>
 					
+
 					<h4>Failure Emails</h4>
+					<div>
+						<input id="overrideFailureEmails" type="checkbox" name="overrideFailureEmails" value="overrideFailureEmails" />
+						<label for="overrideFailureEmails">Override flow email settings</label>
+					</div>
 					<p>Notify these addresses on failure. Comma, space or semi-colon delimited list.</p>
 					<textarea id="failureEmails"></textarea>
 				</div>
 			
 				<div>
 					<h4>Success Emails</h4>
+					<div>
+						<input id="overrideSuccessEmails" type="checkbox" name="overrideSuccessEmails" value="overrideSuccessEmails" />
+						<label for="overrideSuccessEmails">Override flow email settings</label>
+					</div>
 					<p>Notify when the flow finishes successfully. Comma, space or semi-colon delimited list.</p>
 					<textarea id="successEmails"></textarea>
 				</div>
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 8fca496..b187638 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1237,7 +1237,7 @@ tr:hover td {
 /* Graph SVG */
 #graphView {
 	position: absolute;
-	top: 210px;
+	top: 202px;
 	bottom: 5px;
 	left: 50px;
 	right: 50px;
diff --git a/src/web/js/azkaban.flow.execute.view.js b/src/web/js/azkaban.flow.execute.view.js
index 0258cbe..97609ae 100644
--- a/src/web/js/azkaban.flow.execute.view.js
+++ b/src/web/js/azkaban.flow.execute.view.js
@@ -31,6 +31,24 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
   },
   initialize : function(settings) {
   	  this.model.bind('change:flowinfo', this.changeFlowInfo, this);
+  	  
+  	  $("#overrideSuccessEmails").click(function(evt) {
+		 if($(this).is(':checked')){
+		 	$('#successEmails').attr('disabled',null);
+		 }
+		 else {
+		 	$('#successEmails').attr('disabled',"disabled");
+		 }
+  	  });
+  	  
+  	   $("#overrideFailureEmails").click(function(evt) {
+		 if($(this).is(':checked')){
+		 	$('#failureEmails').attr('disabled',null);
+		 }
+		 else {
+		 	$('#failureEmails').attr('disabled',"disabled");
+		 }
+  	  });
   },
   render: function() {
   },
@@ -40,7 +58,9 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
 	var successEmails = $('#successEmails').val();
 	var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
 	var notifyFailureLast = $('#notifyFailureLast').is(':checked');
-
+	var failureEmailsOverride = $("#overrideFailureEmails").is(':checked');
+	var successEmailsOverride = $("#overrideSuccessEmails").is(':checked');
+	
 	var flowOverride = {};
 	var editRows = $(".editRow");
 	for (var i = 0; i < editRows.length; ++i) {
@@ -68,6 +88,8 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
 		ajax: "executeFlow",
 		flow: this.flowId,
 		disabled: disabled,
+		failureEmailOverride:failureEmailsOverride,
+		successEmailOverride:successEmailsOverride,
 		failureAction: failureAction,
 		failureEmails: failureEmails,
 		successEmails: successEmails,
@@ -102,6 +124,21 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
   	var pipelineExecutionId = this.model.get("pipelineExecution");
   	var queueLevel = this.model.get("queueLevel");
   	var nodeStatus = this.model.get("nodeStatus");
+  	var overrideSuccessEmails = this.model.get("overrideSuccessEmails");
+  	var overrideFailureEmails = this.model.get("overrideFailureEmails");
+  	
+	if (overrideSuccessEmails) {
+		$('#overrideSuccessEmails').attr('checked', true);
+	}
+	else {
+		$('#successEmails').attr('disabled','disabled');
+	}
+	if (overrideFailureEmails) {
+		$('#overrideFailureEmails').attr('checked', true);
+	}
+	else {
+		$('#failureEmails').attr('disabled','disabled');
+	}
   	
   	if (successEmails) {
   		$('#successEmails').val(successEmails.join());