azkaban-aplcache

Auto flow updater available.

8/20/2012 8:49:59 PM

Details

diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index b4879cd..58e507b 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -51,6 +51,7 @@ public class FlowRunnerManager {
 		flow.setExecutionPath(path);
 
 		FlowRunner runner = new FlowRunner(flow);
+		runningFlows.put(id, runner);
 		runner.addListener(eventListener);
 		executorService.submit(runner);
 	}
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index bafed41..64bea74 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -85,6 +85,15 @@ public class JobRunner extends EventHandler implements Runnable {
 		node.setStatus(Status.RUNNING);
 		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
 
+		//Just for testing 5 sec each round.
+		synchronized(this) {
+			try {
+				wait(5000);
+			}
+			catch (InterruptedException e) {
+				
+			}
+		}
 		// Run Job
 		boolean succeeded = true;
 
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 4703822..4ce24fe 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -105,6 +105,9 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 			if (ajaxName.equals("fetchexecflow")) {
 				ajaxFetchExecutableFlow(req, resp, ret, session.getUser());
 			}
+			else if (ajaxName.equals("fetchexecflowupdate")) {
+				ajaxFetchExecutableFlowUpdate(req, resp, ret, session.getUser());
+			}
 		}
 		else {
 			String projectName = getParam(req, "project");
@@ -117,6 +120,56 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 		this.writeJSON(resp, ret);
 	}
 	
+	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException{
+		String execid = getParam(req, "execid");
+		Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
+		
+		System.out.println("Fetching " + execid);
+		
+		ExecutableFlow exFlow = null;
+		try {
+			exFlow = executorManager.getExecutableFlow(execid);
+		} catch (ExecutorManagerException e) {
+			ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
+		}
+		if (exFlow == null) {
+			ret.put("error", "Cannot find execution '" + execid + "'");
+			return;
+		}
+		
+		Project project = null;
+		try {
+			project = projectManager.getProject(exFlow.getProjectId(), user);
+		}
+		catch (AccessControlException e) {
+			ret.put("error", "Permission denied. User " + user.getUserId() + " doesn't have permissions to view project " + project.getName());
+			return;
+		}
+		
+		
+		// Just update the nodes and flow states
+		ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
+		for (ExecutableNode node : exFlow.getExecutableNodes()) {
+			if (node.getStartTime() < lastUpdateTime && node.getEndTime() < lastUpdateTime) {
+				continue;
+			}
+			
+			HashMap<String, Object> nodeObj = new HashMap<String,Object>();
+			nodeObj.put("id", node.getId());
+			nodeObj.put("status", node.getStatus());
+			nodeObj.put("startTime", node.getStartTime());
+			nodeObj.put("endTime", node.getEndTime());
+			
+			nodeList.add(nodeObj);
+		}
+
+		ret.put("nodes", nodeList);
+		ret.put("status", exFlow.getStatus().toString());
+		ret.put("startTime", exFlow.getStartTime());
+		ret.put("endTime", exFlow.getEndTime());
+		ret.put("submitTime", exFlow.getSubmitTime());
+	}
+	
 	private void ajaxFetchExecutableFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
 		String execid = getParam(req, "execid");
 		System.out.println("Fetching " + execid);
@@ -163,6 +216,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 
 		ret.put("nodes", nodeList);
 		ret.put("edges", edgeList);
+		ret.put("status", exFlow.getStatus().toString());
 		ret.put("startTime", exFlow.getStartTime());
 		ret.put("endTime", exFlow.getEndTime());
 		ret.put("submitTime", exFlow.getSubmitTime());
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 8145cf1..02558c5 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1255,8 +1255,20 @@ svg .node:hover text {
 }
 
 svg .selected circle {
-	stroke: #777;
-	fill: #444;
+	stroke: #009FC9;
+	stroke-width: 4;
+}
+
+svg .READY circle {
+	fill: #CCC;
+}
+
+svg .FAILED circle {
+	fill: #CC0000;
+}
+
+svg .SUCCEEDED circle {
+	fill: #00CC33;
 }
 
 span.sublabel {
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 3b3e1e1..d05976e 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -1,5 +1,7 @@
 $.namespace('azkaban');
 
+var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "UNKNOWN"];
+
 var handleJobMenuClick = function(action, el, pos) {
 	var jobid = el[0].jobid;
 	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
@@ -67,8 +69,8 @@ azkaban.FlowTabView= Backbone.View.extend({
   	$("#graphViewLink").removeClass("selected");
   	$("#jobslistViewLink").addClass("selected");
   	
-  	 $("#graphView").hide();
-  	 $("#jobListView").show();
+  	$("#graphView").hide();
+  	$("#jobListView").show();
   }
 });
 
@@ -82,6 +84,7 @@ azkaban.JobListView = Backbone.View.extend({
 	initialize: function(settings) {
 		this.model.bind('change:selected', this.handleSelectionChange, this);
 		this.model.bind('change:graph', this.render, this);
+		this.model.bind('change:update', this.handleStatusUpdate, this);
 	},
 	filterJobs: function(self) {
 		var filter = $("#filter").val();
@@ -181,6 +184,7 @@ azkaban.JobListView = Backbone.View.extend({
 		}
 		
 		$("#list").append(ul);
+		this.assignInitialStatus(self);
 	},
 	handleJobClick : function(evt) {
 		var jobid = evt.currentTarget.jobid;
@@ -201,6 +205,21 @@ azkaban.JobListView = Backbone.View.extend({
 			this.model.set({"selected": jobid});
 		}
 	},
+	handleStatusUpdate: function(evt) {
+		var updateData = this.model.get("update");
+		for (var i = 0; i < updateData.nodes.length; ++i) {
+			var updateNode = updateData.nodes[i];
+			$(this.listNodes[updateNode.id]).addClass(updateNode.status);
+		}
+	},
+	assignInitialStatus: function(evt) {
+		var data = this.model.get("data");
+		for (var i = 0; i < data.nodes.length; ++i) {
+			var updateNode = data.nodes[i];
+			
+			$(this.listNodes[updateNode.id]).addClass(updateNode.status);
+		}
+	},
 	handleSelectionChange: function(evt) {
 		if (!this.model.hasChanged("selected")) {
 			return;
@@ -231,6 +250,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		this.model.bind('change:selected', this.changeSelected, this);
 		this.model.bind('change:graph', this.render, this);
 		this.model.bind('resetPanZoom', this.resetPanZoom, this);
+		this.model.bind('change:update', this.handleStatusUpdate, this);
 		
 		this.svgns = "http://www.w3.org/2000/svg";
 		this.xlinksn = "http://www.w3.org/1999/xlink";
@@ -301,9 +321,18 @@ azkaban.SvgGraphView = Backbone.View.extend({
 		bounds.maxX = bounds.maxX ? bounds.maxX + 200 : 200;
 		bounds.maxY = bounds.maxY ? bounds.maxY + 200 : 200;
 		
+		this.assignInitialStatus(self);
 		this.graphBounds = bounds;
 		this.resetPanZoom();
 	},
+	assignInitialStatus: function(evt) {
+		var data = this.model.get("data");
+		for (var i = 0; i < data.nodes.length; ++i) {
+			var updateNode = data.nodes[i];
+			var g = document.getElementById(updateNode.id);
+			addClass(g, updateNode.status);
+		}
+	},
 	changeSelected: function(self) {
 		console.log("change selected");
 		var selected = this.model.get("selected");
@@ -326,10 +355,23 @@ azkaban.SvgGraphView = Backbone.View.extend({
 			var x = node.x - offset;
 			var y = node.y - offset;
 			
-			
 			$("#svgGraph").svgNavigate("transformToBox", {x: x, y: y, width: widthHeight, height: widthHeight});
 		}
 	},
+	handleStatusUpdate: function(evt) {
+		var updateData = this.model.get("update");
+		for (var i = 0; i < updateData.nodes.length; ++i) {
+			var updateNode = updateData.nodes[i];
+			var g = document.getElementById(updateNode.id);
+			
+			for (var i = 0; i < statusList.length; ++i) {
+				var status = statusList[i];
+				removeClass(g, status);
+			}
+			
+			addClass(g, updateNode.status);
+		}
+	},
 	clickGraph: function(self) {
 		console.log("click");
 		if (self.currentTarget.jobid) {
@@ -446,6 +488,54 @@ azkaban.SvgGraphView = Backbone.View.extend({
 var graphModel;
 azkaban.GraphModel = Backbone.Model.extend({});
 
+var updateTime = -1;
+var updaterFunction = function() {
+	var requestURL = contextURL + "/executor";
+	var oldData = graphModel.get("data");
+	var nodeMap = graphModel.get("nodeMap");
+	var keepRunning = oldData.status != "SUCCEEDED" && oldData.status != "FAILED";
+	
+	if (keepRunning) {
+		$.get(
+	      requestURL,
+	      {"execid": execId, "ajax":"fetchexecflowupdate", "lastUpdateTime": updateTime},
+	      function(data) {
+	          console.log("data updated");
+	          updateTime = Math.max(updateTime, data.submitTime);
+	          updateTime = Math.max(updateTime, data.startTime);
+	          updateTime = Math.max(updateTime, data.endTime);
+	          oldData.submitTime = data.submitTime;
+	          oldData.startTime = data.startTime;
+	          oldData.endtime = data.endTime;
+	          
+	          for (var i = 0; i < data.nodes.length; ++i) {
+	          	var node = data.nodes[i];
+	          	updateTime = Math.max(updateTime, node.startTime);
+	          	updateTime = Math.max(updateTime, node.endTime);
+	          	var oldNode = nodeMap[node.id];
+	          	oldNode.startTime = node.startTime;
+	          	oldNode.endTime = node.endTime;
+	          	oldNode.status = node.status;
+	          }
+
+	          graphModel.set({"update": data});
+	      },
+	      "json"
+	    );
+		
+		var data = graphModel.get("data");
+		if (data.status != "SUCCEEDED" && data.status != "FAILED" ) {
+			setTimeout(function() {updaterFunction();}, 30000);
+		}
+		else {
+			console.log("Flow finished, so no more updates");
+		}
+	}
+	else {
+		console.log("Flow finished, so no more updates");
+	}
+}
+
 $(function() {
 	var selected;
 	
@@ -477,8 +567,23 @@ $(function() {
 	          console.log("data fetched");
 	          graphModel.set({data: data});
 	          graphModel.trigger("change:graph");
+	          
+	          updateTime = Math.max(updateTime, data.submitTime);
+	          updateTime = Math.max(updateTime, data.startTime);
+	          updateTime = Math.max(updateTime, data.endTime);
+	          
+	          var nodeMap = {};
+	          for (var i = 0; i < data.nodes.length; ++i) {
+	             var node = data.nodes[i];
+	             nodeMap[node.id] = node;
+	             updateTime = Math.max(updateTime, node.startTime);
+	             updateTime = Math.max(updateTime, node.endTime);
+	          }
+	          
+	          graphModel.set({nodeMap: nodeMap});
 	      },
 	      "json"
 	    );
 
+	setTimeout(function() {updaterFunction()}, 1000);
 });