azkaban-uncached

Quick execute links done. Also allows concurrent running of

2/5/2013 4:35:38 AM

Details

diff --git a/src/java/azkaban/executor/ExecutionReference.java b/src/java/azkaban/executor/ExecutionReference.java
index 5b5eeb1..89c1d38 100644
--- a/src/java/azkaban/executor/ExecutionReference.java
+++ b/src/java/azkaban/executor/ExecutionReference.java
@@ -1,5 +1,7 @@
 package azkaban.executor;
 
+import java.util.HashMap;
+
 public class ExecutionReference {
 	private final int execId;
 	private final String host;
@@ -49,4 +51,4 @@ public class ExecutionReference {
 	public void setNumErrors(int numErrors) {
 		this.numErrors = numErrors;
 	}
-}
+ }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index d9b9685..fe807d3 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -71,6 +71,17 @@ public class ExecutorManager {
 		runningFlows.putAll(executorLoader.fetchActiveFlows());
 	}
 	
+	public List<Integer> getRunningFlows(int projectId, String flowId) {
+		ArrayList<Integer> executionIds = new ArrayList<Integer>();
+		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
+			if (ref.getSecond().getFlowId().equals(flowId)) {
+				executionIds.add(ref.getFirst().getExecId());
+			}
+		}
+		
+		return executionIds;
+	}
+	
 	public boolean isFlowRunning(int projectId, String flowId) {
 		for (Pair<ExecutionReference, ExecutableFlow> ref : runningFlows.values()) {
 			if (ref.getSecond().getFlowId().equals(flowId)) {
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a8ddd59..581f5c1 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -31,6 +32,7 @@ import javax.servlet.http.HttpServletResponse;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionReference;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorManagerException;
@@ -270,10 +272,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				}
 			}
 		}
-		else if (ajaxName.equals("isRunning")) {
+		else if (ajaxName.equals("getRunning")) {
 			String projectName = getParam(req, "project");
 			String flowName = getParam(req, "flow");
-			ajaxIsFlowRunning(req, resp, ret, session.getUser(), projectName, flowName);
+			ajaxGetFlowRunning(req, resp, ret, session.getUser(), projectName, flowName);
 		}
 		else if (ajaxName.equals("flowInfo")) {
 			String projectName = getParam(req, "project");
@@ -454,13 +456,16 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void ajaxIsFlowRunning(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectId, String flowId) throws ServletException{
+	private void ajaxGetFlowRunning(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectId, String flowId) throws ServletException{
 		Project project = getProjectAjaxByPermission(ret, projectId, user, Type.EXECUTE);
 		if (project == null) {
 			return;
 		}
 		
-		ret.put("running", executorManager.isFlowRunning(project.getId(), flowId));
+		List<Integer> refs = executorManager.getRunningFlows(project.getId(), flowId);
+		if (!refs.isEmpty()) {
+			ret.put("execIds", refs);
+		}
 	}
 	
 	private void ajaxRestartFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -624,12 +629,36 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
 		exflow.addFlowParameters(flowParamGroup);
 		
-		// Setup disabled
-		Map<String, String> paramGroup = this.getParamGroup(req, "disable");
-		for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
-			boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-
-			exflow.setNodeStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+		if (hasParam(req, "job")) {
+			// Disable everything.
+			for(ExecutableNode node : exflow.getExecutableNodes()) {
+				node.setStatus(Status.DISABLED);
+			}
+			
+			String jobId = getParam(req, "job");
+			ExecutableNode job = exflow.getExecutableNode(jobId);
+			if (job == null) {
+				ret.put("error", "Job " + jobId + " doesn't exist in flow.");
+				return;
+			}
+			
+			job.setStatus(Status.READY);
+			
+			if (hasParam(req, "withDep")) {
+				boolean withDep = "true".equals(getParam(req, "withDep"));
+				if (withDep) {
+					enableAllAncestors(job, exflow);
+				}
+			}
+		}
+		else {
+			// Setup disabled
+			Map<String, String> paramGroup = this.getParamGroup(req, "disable");
+			for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
+				boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
+	
+				exflow.setNodeStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+			}
 		}
 		
 		try {
@@ -643,6 +672,19 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("execid", exflow.getExecutionId());
 	}
 	
+	private void enableAllAncestors(ExecutableNode node, ExecutableFlow flow) {
+		Set<String> inNodes = node.getInNodes();
+		if (inNodes != null) {
+			for (String inNode: inNodes) {
+				ExecutableNode job = flow.getExecutableNode(inNode);
+				if (job != null) {
+					job.setStatus(Status.READY);
+					enableAllAncestors(job, flow);
+				}
+			}
+		}
+	}
+	
 	public class ExecutorVMHelper {
 		public String getProjectName(int id) {
 			Project project = projectManager.getProject(id);
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 78c6763..f592314 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -24,6 +24,7 @@
 		<script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
 		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
 		<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.project.view.js"></script>
 		<script type="text/javascript">
@@ -131,7 +132,7 @@
 					<fieldset>
 						<dl>
 							<dt>Job Archive</dt>
-							<dd><input id="file" name="file" class="file" type="file" onChange="changeFile()" /></dd>
+							<dd><input id="file" name="file" class="file" type="file" /></dd>
 							<input type="hidden" name="project" value="$project.name" />
 							<input type="hidden" name="action" value="upload" />
 						</dl>
@@ -168,11 +169,11 @@
 		</div>
 		<div id="flow-execute" class="modal">
 			<h3>Execute Flow</h3>
+			<div id="executeErrorMsg" class="box-error-message"></div>
 			<div id="execute-message" class="message">
 			</div>
 			<div class="actions">
 				<a class="yes btn1" id="execute-btn">Execute</a>
-				<a class="yes btn2" id="customize-btn">Customize Execution</a>
 				<a class="no simplemodal-close btn3">Cancel</a>
 			</div>
 		</div>
diff --git a/src/web/js/azkaban.ajax.utils.js b/src/web/js/azkaban.ajax.utils.js
index c087086..6290978 100644
--- a/src/web/js/azkaban.ajax.utils.js
+++ b/src/web/js/azkaban.ajax.utils.js
@@ -28,3 +28,62 @@ function ajaxCall(requestURL, data, callback) {
 		"json"
 	);
 }
+
+function executeFlow(executingData) {
+	executeURL = contextURL + "/executor";
+	$.get(
+		executeURL,
+		executingData,
+		function(data) {
+			if (data.error) {
+				alert(data.error);
+			}
+			else {
+				var redirectURL = contextURL + "/executor?execid=" + data.execid;
+				window.location.href = redirectURL;
+			}
+		},
+		"json"
+	);
+}
+
+/**
+* Checks to see if a flow is running.
+*
+*/
+function flowExecutingStatus(projectId, flowId) {
+	var requestURL = contextURL + "/executor";
+	
+	var executionIds;
+	$.ajax( {
+		url: requestURL,
+		async: false,
+		data: {"ajax":"getRunning", "project":projectId, "flow":flowId},
+		error: function(data) {},
+		success: function(data) {
+			if (data.error == "session") {
+				// We need to relogin.
+				var errorDialog = document.getElementById("invalid-session");
+				if (errorDialog) {
+					  $(errorDialog).modal({
+					      closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+					      position: ["20%",],
+					      containerId: 'confirm-container',
+					      containerCss: {
+					        'height': '220px',
+					        'width': '565px'
+					      },
+					      onClose: function (dialog) {
+					      	window.location.reload();
+					      }
+					    });
+				}
+			}
+			else {
+				executionIds = data.execIds;
+			}
+		}
+	});
+	
+	return executionIds;
+}
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index e6fdefb..4388010 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -262,20 +262,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 	  		flowOverride: flowOverride
 	  	};
 	  	
-		$.get(
-			executeURL,
-			executingData,
-			function(data) {
-				if (data.error) {
-					alert(data.error);
-				}
-				else {
-					var redirectURL = contextURL + "/executor?execid=" + data.execid;
-					window.location.href = redirectURL;
-				}
-			},
-			"json"
-		);
+		executeFlow(executingData);
 	  },
 	  handleAddRow: function(evt) {
 	  	var tr = document.createElement("tr");
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index a233824..8c849c1 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -259,24 +259,57 @@ azkaban.FlowTableView= Backbone.View.extend({
   	var jobId = evt.currentTarget.jobName;
   	var flowId = evt.currentTarget.flowId;
   	
-  	$("#execute-message").text("Run only job '" + jobId + "' in flow '" + flowId + "'.");
-  	this.executeFlowDialog();
+  	$("#execute-message").text("Execute only job '" + jobId + "' in flow '" + flowId + "'.");
+  	
+  	var executingData = {
+  		project: projectId,
+  		ajax: "executeFlow",
+  		flow: flowId,
+  		job: jobId
+	};
+  	
+  	this.executeFlowDialog(executingData);
   },
   runWithDep: function(evt) {
-   var jobId = evt.currentTarget.jobName;
+    var jobId = evt.currentTarget.jobName;
   	var flowId = evt.currentTarget.flowId;
     console.log("Run With Dep");
-    var jobId = evt.currentTarget.jobId;
-    $("#execute-message").text("Run job '" + jobId + "' and all of its ancestors in '" + flowId + "'.");
-    this.executeFlowDialog();
+    $("#execute-message").text("Execute job '" + jobId + "' and all of its ancestors in '" + flowId + "'.");
+    
+    var executingData = {
+  		project: projectId,
+  		ajax: "executeFlow",
+  		flow: flowId,
+  		job: jobId,
+  		withDep: true
+	};
+    
+    this.executeFlowDialog(executingData);
   },
   executeFlow: function(evt) {
     console.log("Execute Flow");
     var flowId = evt.currentTarget.flowId;
-    $("#execute-message").text("Executing the complete flow '" + flowId + "'.");
-    this.executeFlowDialog();
+    $("#execute-message").text("Execute the complete flow '" + flowId + "'.");
+    
+    var executingData = {
+  		project: projectId,
+  		ajax: "executeFlow",
+  		flow: flowId
+	};
+    
+    this.executeFlowDialog(executingData);
   },
-  executeFlowDialog: function(message) {
+  executeFlowDialog: function(executingData) {
+  	var flowId = executingData.flow;
+  	var executionIds = flowExecutingStatus(projectId, flowId);
+  	
+    if (executionIds && executionIds.length > 0) {
+    	$("#executeErrorMsg").text("Flow '" + flowId + "' is already running. Click on Execute to proceed anyways.");
+    }
+    else {
+    	$("#executeErrorMsg").hide();
+    }
+
  	$('#flow-execute').modal({
       closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
       position: ["20%",],
@@ -287,7 +320,9 @@ azkaban.FlowTableView= Backbone.View.extend({
       },
       onShow: function (dialog) {
         var modal = this;
-        $("#errorMsg").hide();
+        $('#execute-btn').click(function() {
+        	executeFlow(executingData);
+        });
       }
     });
   },