azkaban-aplcache
Details
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index b4879cd..58e507b 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -51,6 +51,7 @@ public class FlowRunnerManager {
flow.setExecutionPath(path);
FlowRunner runner = new FlowRunner(flow);
+ runningFlows.put(id, runner);
runner.addListener(eventListener);
executorService.submit(runner);
}
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index bafed41..64bea74 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -85,6 +85,15 @@ public class JobRunner extends EventHandler implements Runnable {
node.setStatus(Status.RUNNING);
this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+ //Just for testing 5 sec each round.
+ synchronized(this) {
+ try {
+ wait(5000);
+ }
+ catch (InterruptedException e) {
+
+ }
+ }
// Run Job
boolean succeeded = true;
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 4703822..4ce24fe 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -105,6 +105,9 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
if (ajaxName.equals("fetchexecflow")) {
ajaxFetchExecutableFlow(req, resp, ret, session.getUser());
}
+ else if (ajaxName.equals("fetchexecflowupdate")) {
+ ajaxFetchExecutableFlowUpdate(req, resp, ret, session.getUser());
+ }
}
else {
String projectName = getParam(req, "project");
@@ -117,6 +120,56 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
this.writeJSON(resp, ret);
}
+ private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException{
+ String execid = getParam(req, "execid");
+ Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
+
+ System.out.println("Fetching " + execid);
+
+ ExecutableFlow exFlow = null;
+ try {
+ exFlow = executorManager.getExecutableFlow(execid);
+ } catch (ExecutorManagerException e) {
+ ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
+ }
+ if (exFlow == null) {
+ ret.put("error", "Cannot find execution '" + execid + "'");
+ return;
+ }
+
+ Project project = null;
+ try {
+ project = projectManager.getProject(exFlow.getProjectId(), user);
+ }
+ catch (AccessControlException e) {
+ ret.put("error", "Permission denied. User " + user.getUserId() + " doesn't have permissions to view project " + project.getName());
+ return;
+ }
+
+
+ // Just update the nodes and flow states
+ ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
+ for (ExecutableNode node : exFlow.getExecutableNodes()) {
+ if (node.getStartTime() < lastUpdateTime && node.getEndTime() < lastUpdateTime) {
+ continue;
+ }
+
+ HashMap<String, Object> nodeObj = new HashMap<String,Object>();
+ nodeObj.put("id", node.getId());
+ nodeObj.put("status", node.getStatus());
+ nodeObj.put("startTime", node.getStartTime());
+ nodeObj.put("endTime", node.getEndTime());
+
+ nodeList.add(nodeObj);
+ }
+
+ ret.put("nodes", nodeList);
+ ret.put("status", exFlow.getStatus().toString());
+ ret.put("startTime", exFlow.getStartTime());
+ ret.put("endTime", exFlow.getEndTime());
+ ret.put("submitTime", exFlow.getSubmitTime());
+ }
+
private void ajaxFetchExecutableFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
String execid = getParam(req, "execid");
System.out.println("Fetching " + execid);
@@ -163,6 +216,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
ret.put("nodes", nodeList);
ret.put("edges", edgeList);
+ ret.put("status", exFlow.getStatus().toString());
ret.put("startTime", exFlow.getStartTime());
ret.put("endTime", exFlow.getEndTime());
ret.put("submitTime", exFlow.getSubmitTime());
src/web/css/azkaban.css 16(+14 -2)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 8145cf1..02558c5 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1255,8 +1255,20 @@ svg .node:hover text {
}
svg .selected circle {
- stroke: #777;
- fill: #444;
+ stroke: #009FC9;
+ stroke-width: 4;
+}
+
+svg .READY circle {
+ fill: #CCC;
+}
+
+svg .FAILED circle {
+ fill: #CC0000;
+}
+
+svg .SUCCEEDED circle {
+ fill: #00CC33;
}
span.sublabel {
src/web/js/azkaban.exflow.view.js 111(+108 -3)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 3b3e1e1..d05976e 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -1,5 +1,7 @@
$.namespace('azkaban');
+var statusList = ["FAILED", "FAILED_FINISHING", "SUCCEEDED", "RUNNING", "WAITING", "KILLED", "DISABLED", "READY", "UNKNOWN"];
+
var handleJobMenuClick = function(action, el, pos) {
var jobid = el[0].jobid;
var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
@@ -67,8 +69,8 @@ azkaban.FlowTabView= Backbone.View.extend({
$("#graphViewLink").removeClass("selected");
$("#jobslistViewLink").addClass("selected");
- $("#graphView").hide();
- $("#jobListView").show();
+ $("#graphView").hide();
+ $("#jobListView").show();
}
});
@@ -82,6 +84,7 @@ azkaban.JobListView = Backbone.View.extend({
initialize: function(settings) {
this.model.bind('change:selected', this.handleSelectionChange, this);
this.model.bind('change:graph', this.render, this);
+ this.model.bind('change:update', this.handleStatusUpdate, this);
},
filterJobs: function(self) {
var filter = $("#filter").val();
@@ -181,6 +184,7 @@ azkaban.JobListView = Backbone.View.extend({
}
$("#list").append(ul);
+ this.assignInitialStatus(self);
},
handleJobClick : function(evt) {
var jobid = evt.currentTarget.jobid;
@@ -201,6 +205,21 @@ azkaban.JobListView = Backbone.View.extend({
this.model.set({"selected": jobid});
}
},
+ handleStatusUpdate: function(evt) {
+ var updateData = this.model.get("update");
+ for (var i = 0; i < updateData.nodes.length; ++i) {
+ var updateNode = updateData.nodes[i];
+ $(this.listNodes[updateNode.id]).addClass(updateNode.status);
+ }
+ },
+ assignInitialStatus: function(evt) {
+ var data = this.model.get("data");
+ for (var i = 0; i < data.nodes.length; ++i) {
+ var updateNode = data.nodes[i];
+
+ $(this.listNodes[updateNode.id]).addClass(updateNode.status);
+ }
+ },
handleSelectionChange: function(evt) {
if (!this.model.hasChanged("selected")) {
return;
@@ -231,6 +250,7 @@ azkaban.SvgGraphView = Backbone.View.extend({
this.model.bind('change:selected', this.changeSelected, this);
this.model.bind('change:graph', this.render, this);
this.model.bind('resetPanZoom', this.resetPanZoom, this);
+ this.model.bind('change:update', this.handleStatusUpdate, this);
this.svgns = "http://www.w3.org/2000/svg";
this.xlinksn = "http://www.w3.org/1999/xlink";
@@ -301,9 +321,18 @@ azkaban.SvgGraphView = Backbone.View.extend({
bounds.maxX = bounds.maxX ? bounds.maxX + 200 : 200;
bounds.maxY = bounds.maxY ? bounds.maxY + 200 : 200;
+ this.assignInitialStatus(self);
this.graphBounds = bounds;
this.resetPanZoom();
},
+ assignInitialStatus: function(evt) {
+ var data = this.model.get("data");
+ for (var i = 0; i < data.nodes.length; ++i) {
+ var updateNode = data.nodes[i];
+ var g = document.getElementById(updateNode.id);
+ addClass(g, updateNode.status);
+ }
+ },
changeSelected: function(self) {
console.log("change selected");
var selected = this.model.get("selected");
@@ -326,10 +355,23 @@ azkaban.SvgGraphView = Backbone.View.extend({
var x = node.x - offset;
var y = node.y - offset;
-
$("#svgGraph").svgNavigate("transformToBox", {x: x, y: y, width: widthHeight, height: widthHeight});
}
},
+ handleStatusUpdate: function(evt) {
+ var updateData = this.model.get("update");
+ for (var i = 0; i < updateData.nodes.length; ++i) {
+ var updateNode = updateData.nodes[i];
+ var g = document.getElementById(updateNode.id);
+
+ for (var i = 0; i < statusList.length; ++i) {
+ var status = statusList[i];
+ removeClass(g, status);
+ }
+
+ addClass(g, updateNode.status);
+ }
+ },
clickGraph: function(self) {
console.log("click");
if (self.currentTarget.jobid) {
@@ -446,6 +488,54 @@ azkaban.SvgGraphView = Backbone.View.extend({
var graphModel;
azkaban.GraphModel = Backbone.Model.extend({});
+var updateTime = -1;
+var updaterFunction = function() {
+ var requestURL = contextURL + "/executor";
+ var oldData = graphModel.get("data");
+ var nodeMap = graphModel.get("nodeMap");
+ var keepRunning = oldData.status != "SUCCEEDED" && oldData.status != "FAILED";
+
+ if (keepRunning) {
+ $.get(
+ requestURL,
+ {"execid": execId, "ajax":"fetchexecflowupdate", "lastUpdateTime": updateTime},
+ function(data) {
+ console.log("data updated");
+ updateTime = Math.max(updateTime, data.submitTime);
+ updateTime = Math.max(updateTime, data.startTime);
+ updateTime = Math.max(updateTime, data.endTime);
+ oldData.submitTime = data.submitTime;
+ oldData.startTime = data.startTime;
+ oldData.endtime = data.endTime;
+
+ for (var i = 0; i < data.nodes.length; ++i) {
+ var node = data.nodes[i];
+ updateTime = Math.max(updateTime, node.startTime);
+ updateTime = Math.max(updateTime, node.endTime);
+ var oldNode = nodeMap[node.id];
+ oldNode.startTime = node.startTime;
+ oldNode.endTime = node.endTime;
+ oldNode.status = node.status;
+ }
+
+ graphModel.set({"update": data});
+ },
+ "json"
+ );
+
+ var data = graphModel.get("data");
+ if (data.status != "SUCCEEDED" && data.status != "FAILED" ) {
+ setTimeout(function() {updaterFunction();}, 30000);
+ }
+ else {
+ console.log("Flow finished, so no more updates");
+ }
+ }
+ else {
+ console.log("Flow finished, so no more updates");
+ }
+}
+
$(function() {
var selected;
@@ -477,8 +567,23 @@ $(function() {
console.log("data fetched");
graphModel.set({data: data});
graphModel.trigger("change:graph");
+
+ updateTime = Math.max(updateTime, data.submitTime);
+ updateTime = Math.max(updateTime, data.startTime);
+ updateTime = Math.max(updateTime, data.endTime);
+
+ var nodeMap = {};
+ for (var i = 0; i < data.nodes.length; ++i) {
+ var node = data.nodes[i];
+ nodeMap[node.id] = node;
+ updateTime = Math.max(updateTime, node.startTime);
+ updateTime = Math.max(updateTime, node.endTime);
+ }
+
+ graphModel.set({nodeMap: nodeMap});
},
"json"
);
+ setTimeout(function() {updaterFunction()}, 1000);
});