azkaban-aplcache

Details

diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index e696a83..48984f1 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -43,7 +43,7 @@ executor.shared.token=abcdefg
 executor.flow.threads=30
 
 # mail settings
-mail.sender=azkaban-noreply@linkedin.com
-mail.host=mail.corp.linkedin.com
-job.failure.email=cyu@linkedin.com
-job.success.email=cyu@linkedin.com
+mail.sender=
+mail.host=
+job.failure.email=
+job.success.email=
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 53dcf34..102213b 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -25,6 +25,9 @@ public class ExecutableFlow {
 	private ArrayList<String> startNodes;
 	private ArrayList<String> endNodes;
 	
+	private ArrayList<String> failureEmails;
+	private ArrayList<String> successEmails;
+	
 	private long submitTime = -1;
 	private long startTime = -1;
 	private long endTime = -1;
@@ -33,9 +36,22 @@ public class ExecutableFlow {
 	private Status flowStatus = Status.UNKNOWN;
 	private String submitUser;
 	private boolean submitted = false;
+	private boolean notifyOnFirstFailure = true;
+	private boolean notifyOnLastFailure = false;
+	
+	private Integer pipelineLevel = null;
+	private Map<String, String> flowParameters = new HashMap<String, String>();
+	
+	public enum FailureAction {
+		FINISH_CURRENTLY_RUNNING,
+		CANCEL_ALL,
+		FINISH_ALL_POSSIBLE
+	}
+	
+	private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
 	
 	public enum Status {
-		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN, PAUSED
+		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN, PAUSED, SKIPPED
 	}
 	
 	public ExecutableFlow(String id, Flow flow) {
@@ -77,6 +93,14 @@ public class ExecutableFlow {
 		updateNumber = number;
 	}
 	
+	public void addFlowParameters(Map<String, String> param) {
+		flowParameters.putAll(param);
+	}
+	
+	public Map<String, String> getFlowParameters() {
+		return flowParameters;
+	}
+	
 	private void setFlow(Flow flow) {
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
@@ -92,6 +116,8 @@ public class ExecutableFlow {
 			targetNode.addInNode(edge.getSourceId());
 		}
 		
+		successEmails = new ArrayList<String>(flow.getSuccessEmails());
+		failureEmails = new ArrayList<String>(flow.getFailureEmails());
 		flowProps.putAll(flow.getAllFlowProps());
 	}
 
@@ -190,6 +216,14 @@ public class ExecutableFlow {
 		this.flowStatus = flowStatus;
 	}
 	
+	public void setFailureEmails(List<String> emails) {
+		this.failureEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
+	}
+	
+	public void setSuccessEmails(List<String> emails) {
+		this.successEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
+	}
+	
 	public Map<String,Object> toObject() {
 		HashMap<String, Object> flowObj = new HashMap<String, Object>();
 		flowObj.put("type", "executableflow");
@@ -202,6 +236,13 @@ public class ExecutableFlow {
 		flowObj.put("endTime", endTime);
 		flowObj.put("status", flowStatus.toString());
 		flowObj.put("submitUser", submitUser);
+		flowObj.put("flowParameters", this.flowParameters);
+		flowObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
+		flowObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
+		flowObj.put("successEmails", successEmails);
+		flowObj.put("failureEmails", failureEmails);
+		flowObj.put("failureAction", failureAction.toString());
+		flowObj.put("pipelineLevel", pipelineLevel);
 		
 		ArrayList<Object> props = new ArrayList<Object>();
 		for (FlowProps fprop: flowProps.values()) {
@@ -226,6 +267,10 @@ public class ExecutableFlow {
 		return flowObj;
 	}
 
+	public void setFailureAction(FailureAction action) {
+		failureAction = action;
+	}
+	
 	@SuppressWarnings("unchecked")
 	public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
 		ExecutableFlow exFlow = new ExecutableFlow();
@@ -240,7 +285,23 @@ public class ExecutableFlow {
 		exFlow.endTime = getLongFromObject(flowObj.get("endTime"));
 		exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
 		exFlow.submitUser = (String)flowObj.get("submitUser");
-				
+		exFlow.flowParameters = new HashMap<String, String>((Map<String,String>)flowObj.get("flowParameters"));
+		
+		// Failure notification
+		if (flowObj.containsKey("notifyOnFirstFailure")) {
+			exFlow.notifyOnFirstFailure = (Boolean)flowObj.get("notifyOnFirstFailure");
+		}
+		if (flowObj.containsKey("notifyOnLastFailure")) {
+			exFlow.notifyOnLastFailure = (Boolean)flowObj.get("notifyOnLastFailure");
+		}
+		
+		// Failure action
+		if (flowObj.containsKey("failureAction")) {
+			exFlow.failureAction = FailureAction.valueOf((String)flowObj.get("failureAction"));
+		}
+		exFlow.pipelineLevel = (Integer)flowObj.get("pipelineLevel");
+		
+		// Copy nodes
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
 		for (Object nodeObj: nodes) {
 			ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj, exFlow);
@@ -257,6 +318,11 @@ public class ExecutableFlow {
 			exFlow.flowProps.put(source, flowProps);
 		}
 		
+		// Success emails
+		exFlow.setSuccessEmails((List<String>)flowObj.get("successEmails"));
+		// Failure emails
+		exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
+		
 		return exFlow;
 	}
 	
@@ -318,6 +384,18 @@ public class ExecutableFlow {
 		this.submitted = submitted;
 	}
 
+	public void setPipelineLevel(int level) {
+		pipelineLevel = level;
+	}
+	
+	public void setNotifyOnFirstFailure(boolean notify) {
+		this.notifyOnFirstFailure = notify;
+	}
+	
+	public void setNotifyOnLastFailure(boolean notify) {
+		this.notifyOnLastFailure = notify;
+	}
+	
 	public static class ExecutableNode {
 		private String id;
 
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index e0865a3..10a9e94 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -43,8 +43,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private ExecutableFlow flow;
 	private ExecutorService executorService;
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
-	private List<JobRunner> pausedJobsToRun = Collections
-			.synchronizedList(new ArrayList<JobRunner>());
+	private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
 	private int numThreads = NUM_CONCURRENT_THREADS;
 	private boolean cancelled = false;
 	private boolean paused = false;
diff --git a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
index 733ee77..5bc9a1a 100644
--- a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
+++ b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
@@ -72,8 +72,6 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
 	private File backupScheduleFile;
 	
 	public LocalFileScheduleLoader(Props props) throws IOException {
-			
-			
 		basePath = new File(props.getString("schedule.directory"));
 		if (!basePath.exists()) {
 			logger.info("Schedule directory " + basePath + " not found.");
diff --git a/src/java/azkaban/utils/Mailman.java b/src/java/azkaban/utils/Mailman.java
index 2553a21..1770009 100644
--- a/src/java/azkaban/utils/Mailman.java
+++ b/src/java/azkaban/utils/Mailman.java
@@ -63,29 +63,29 @@ public class Mailman {
 		Session session = Session.getInstance(props, null);
 		session.setDebug(true);
 		
-		//email message
-		Message msg = new MimeMessage(session);
-		msg.setFrom(new InternetAddress(fromAddress));
-		for(String str : toAddress) {
-			msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
-		}
-		
-		msg.setSubject(subject);
-		msg.setText(body);
-		
-		//transport
-		SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
-		
-		try {
-			t.connect(_mailHost, _mailUser, _mailPassword);
-			t.sendMessage(msg, msg.getAllRecipients());
-		}
-		catch (Exception e) {
-			logger.error(e);
-		}
-		finally {
-			t.close();
-		}
+//		//email message
+//		Message msg = new MimeMessage(session);
+//		msg.setFrom(new InternetAddress(fromAddress));
+//		for(String str : toAddress) {
+//			msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(str));
+//		}
+//		
+//		msg.setSubject(subject);
+//		msg.setText(body);
+//		
+//		//transport
+//		SMTPTransport t = (SMTPTransport)session.getTransport(protocol);
+//		
+//		try {
+//			t.connect(_mailHost, _mailUser, _mailPassword);
+//			t.sendMessage(msg, msg.getAllRecipients());
+//		}
+//		catch (Exception e) {
+//			logger.error(e);
+//		}
+//		finally {
+//			t.close();
+//		}
 	}
 
 	public void sendEmailIfPossible(String fromAddress, List<String> toAddress,
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 29745ef..68de060 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -4,6 +4,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -15,6 +16,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorManagerException;
@@ -35,7 +37,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
 	private ScheduleManager scheduleManager;
-	private static final int STRING_BUFFER_SIZE = 1024*5;
 
 	@Override
 	public void init(ServletConfig config) throws ServletException {
@@ -500,7 +501,45 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		// Create ExecutableFlow
 		ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
 		exflow.setSubmitUser(user.getUserId());
-		Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
+		if (hasParam(req, "failureAction")) {
+			String option = getParam(req, "failureAction");
+			if (option.equals("finishCurrent") ) {
+				exflow.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+			}
+			else if (option.equals("cancelImmediately")) {
+				exflow.setFailureAction(FailureAction.CANCEL_ALL);
+			}
+			else if (option.equals("finishPossible")) {
+				exflow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+			}
+		}
+		if (hasParam(req, "failureEmails")) {
+			String emails = getParam(req, "failureEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			exflow.setFailureEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "successEmails")) {
+			String emails = getParam(req, "successEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			exflow.setSuccessEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "notifyFailureFirst")) {
+			exflow.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+		}
+		if (hasParam(req, "notifyFailureLast")) {
+			exflow.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+		}
+		if (hasParam(req, "executingJobOption")) {
+			String option = getParam(req, "jobOption");
+			// Not set yet
+		}
+		if (hasParam(req, "flowOverride")) {
+			Map<String, String> paramGroup = this.getParamGroup(req, "flowOverride");
+			exflow.addFlowParameters(paramGroup);
+		}
+		
+		// Setup disabled
+		Map<String, String> paramGroup = this.getParamGroup(req, "disable");
 		for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
 			boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
 			exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 68c06e0..ac2879d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -206,7 +206,7 @@
 		                				<dl>
 		                					<dt>Failure Action</dt>
 		                					<dd>
-		                						<select name="failureAction">
+		                						<select id="failureAction" name="failureAction">
 		                							<option value="finishCurrent">Finish Current Running</option>
 		                							<option value="cancelImmediately">Cancel All</option>
 		                							<option value="finishPossible">Finish All Possible</option>
@@ -218,14 +218,14 @@
 		                					</dd>
 		                					<dt>Notify on Failure</dt>
 		                					<dd>
-		                						<input class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
-		                						<input class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
+		                						<input id="notifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
+		                						<input id="notifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
 		                					</dd>
 		                					<dt>Success Email</dt>
 		                					<dd>
 		                						<textarea id="successEmails"></textarea>
 		                					</dd>
-		                					<dt>Executing Job</dt>
+		                					<dt>Concurrent Execution</dt>
 		                					<dd id="executingJob">
 		                						<input id="ignore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
 		                						<input id="pipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
@@ -279,10 +279,10 @@
 		</ul>
 
 		<ul id="disableJobMenu" class="contextMenu flowSubmenu">  
-			<li class="open"><a href="#open">Open...</a></li>
 			<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
 			<li id="disable" class="disable separator"><a href="#disable">Disable</a><div id="disableArrow" class="context-sub-icon"></div></li>
 			<ul id="disableSub" class="subMenu">
+				<li class="disableAll"><a href="#disableAll">All</a></li>
 				<li class="parents"><a href="#disableParents">Parents</a></li>
 				<li class="ancestors"><a href="#disableAncestors">All Ancestors</a></li>
 				<li class="children"><a href="#disableChildren">Children</a></li>
@@ -290,6 +290,7 @@
 			</ul>
 			<li id="enable" class="enable"><a href="#enable">Enable</a> <div id="enableArrow" class="context-sub-icon"></div></li>
 			<ul id="enableSub" class="subMenu">
+				<li class="enableAll"><a href="#enableAll">All</a></li>
 				<li class="parents"><a href="#enableParents">Parents</a></li>
 				<li class="ancestors"><a href="#enableAncestors">All Ancestors</a></li>
 				<li class="children"><a href="#enableChildren">Children</a></li>
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index a9a5034..6120c2c 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -52,7 +52,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
   	  events : {
   	  	"click" : "closeEditingTarget",
 	    "click #execute-btn": "handleExecuteFlow",
-	    "click #execute-custom-btn": "handleCustomFlow",
 	    "click #cancel-btn": "handleCancelExecution",
 	    "click .modal-close": "handleCancelExecution",
 	    "click #generalOptions": "handleGeneralOptionsSelect",
@@ -124,9 +123,44 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 	  },
 	  handleExecuteFlow: function(evt) {
 	  	var executeURL = contextURL + "/executor";
+	  	var disabled = this.cloneModel.get("disabled");
+	  	var failureAction = $('#failureAction').val();
+	  	var failureEmails = $('#failureEmails').val();
+	  	var successEmails = $('#successEmails').val();
+	  	var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
+	  	var notifyFailureLast = $('#notifyFailureLast').is(':checked');
+	  	var executingJobOption = $('input:radio[name=gender]:checked').val();
+	  	
+	  	var flowOverride = {};
+	  	var editRows = $(".editRow");
+		for (var i = 0; i < editRows.length; ++i) {
+			var row = editRows[i];
+			var td = $(row).find('td');
+			var key = $(td[0]).text();
+			var val = $(td[1]).text();
+			
+			if (key && key.length > 0) {
+				flowOverride[key] = val;
+			}
+		}
+	  	
+	  	var executingData = {
+	  		project: projectName,
+	  		ajax: "executeFlow",
+	  		flow: flowName,
+	  		disable: this.cloneModel.get('disabled'),
+	  		failureAction: failureAction,
+	  		failureEmails: failureEmails,
+	  		successEmails: successEmails,
+	  		notifyFailureFirst: notifyFailureFirst,
+	  		notifyFailureLast: notifyFailureLast,
+	  		executingJobOption: executingJobOption,
+	  		flowOverride: flowOverride
+	  	};
+	  	
 		$.get(
 			executeURL,
-			{"project": projectName, "ajax":"executeFlow", "flow":flowName, "disabled":this.cloneModel.get("disabled")},
+			executingData,
 			function(data) {
 				if (data.error) {
 					alert(data.error);
@@ -139,9 +173,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 			"json"
 		);
 	  },
-	  handleCustomFlow: function(evt) {
-	  	
-	  },
 	  handleAddRow: function(evt) {
 	  	var tr = document.createElement("tr");
 	  	var tdName = document.createElement("td");
@@ -162,6 +193,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 		$(tdValue).append(valueData);
 	    $(tdValue).addClass("editable");
 		
+		$(tr).addClass("editRow");
 	  	$(tr).append(tdName);
 	  	$(tr).append(tdValue);
 	   
@@ -192,8 +224,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 	  	var row = curTarget.parentElement.parentElement;
 		$(row).remove();
 	  },
-	  handleResetData : function(evt) {
-	  },
 	  closeEditingTarget: function(evt) {
 	  	if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
 	  		var input = $(this.editingTarget).children("input")[0];
@@ -215,9 +245,6 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 		    this.editingTarget = null;
 	  	}
 	  },
-	  addRowData : function(evt) {
-
-	  },
 	  handleDisableMenuClick : function(action, el, pos) {
 			var jobid = el[0].jobid;
 			var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
@@ -234,6 +261,17 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 				cloneModel.set({disabled: disabled});
 				cloneModel.trigger("change:disabled");
 			}
+			else if(action == "disableAll") {
+				var disabled = cloneModel.get("disabled");
+		
+				var nodes = cloneModel.get("nodes");
+				for (var key in nodes) {
+					disabled[key] = true;
+				}
+
+				cloneModel.set({disabled: disabled});
+				cloneModel.trigger("change:disabled");
+			}
 			else if (action == "disableParents") {
 				var disabled = cloneModel.get("disabled");
 				var nodes = cloneModel.get("nodes");
@@ -287,6 +325,11 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 				cloneModel.set({disabled: disabled});
 				cloneModel.trigger("change:disabled");
 			}
+			else if(action == "enableAll") {
+				disabled = {};
+				cloneModel.set({disabled: disabled});
+				cloneModel.trigger("change:disabled");
+			}
 			else if (action == "enableParents") {
 				var disabled = cloneModel.get("disabled");
 				var nodes = cloneModel.get("nodes");