azkaban-memoizeit

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 75845a5..06af3b7 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1091,15 +1091,24 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	}
 	
 	@Override
-	public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
-		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+	public int getExecutableFlows(
+      int projectId, 
+      String flowId, 
+      int from, 
+      int length, 
+      List<ExecutableFlow> outputList) throws ExecutorManagerException {
+		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+        projectId, flowId, from, length);
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
 
 	@Override
-	public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
-		return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+	public List<ExecutableFlow> getExecutableFlows(
+      int projectId, String flowId, int from, int length, Status status) 
+      throws ExecutorManagerException {
+		return executorLoader.fetchFlowHistory(
+        projectId, flowId, from, length, status);
 	}
 
 	/* 
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 6cc091b..f5e4b4f 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -131,6 +131,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				else if (ajaxName.equals("fetchExecJobSummary")) {
 					ajaxFetchJobSummary(req, resp, ret, session.getUser(), exFlow);
 				}
+        else if (ajaxName.equals("fetchExecJobStats")) {
+          ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
+        }
 				else if (ajaxName.equals("retryFailedJobs")) {
 					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
 				}
@@ -497,6 +500,38 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			throw new ServletException(e);
 		}
 	}
+	
+  private void ajaxFetchJobStats(
+      HttpServletRequest req, 
+      HttpServletResponse resp, 
+      HashMap<String, Object> ret, 
+      User user, 
+      ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(
+        ret, exFlow.getProjectId(), user, Type.READ);
+		if (project == null) {
+			return;
+		}
+		
+		String jobId = this.getParam(req, "jobId");
+		resp.setCharacterEncoding("utf-8");
+		try {
+			ExecutableNode node = exFlow.getExecutableNode(jobId);
+			if (node == null) {
+				ret.put("error", "Job " + jobId + " doesn't exist in " + 
+            exFlow.getExecutionId());
+				return;
+			}
+	
+      // XXX
+      outputDir = props.getString("azkaban.stats.dir",
+				System.getProperty("java.io.tmpdir"));
+      
+		}
+    catch (ExecutorManagerException e) {
+			throw new ServletException(e);
+		}
+  }
 
 	private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
 		Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 662bad0..4fdcb5c 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -47,6 +47,7 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableJobInfo;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
@@ -248,6 +249,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 				ajaxFetchFlowExecutions(project, ret, req);
 			}
 		}
+		else if (ajaxName.equals("fetchLastSuccessfulFlowExecution")) {
+			if (handleAjaxPermission(project, user, Type.READ, ret)) {
+				ajaxFetchLastSuccessfulFlowExecution(project, ret, req);
+			}
+		}
 		else if (ajaxName.equals("fetchJobInfo")) {
 			if (handleAjaxPermission(project, user, Type.READ, ret)) {
 				ajaxFetchJobInfo(project, ret, req);
@@ -339,7 +345,34 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		}
   }
 
-	private void ajaxFetchFlowExecutions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+  private void ajaxFetchLastSuccessfulFlowExecution(Project project,
+      HashMap<String, Object> ret, HttpServletRequest req)
+      throws ServletException {
+    String flowId = getParam(req, "flow");
+    List<ExecutableFlow> exFlows = null;
+    try {
+			exFlows = executorManager.getExecutableFlows(
+					project.getId(), flowId, 0, 1, Status.SUCCEEDED);
+		}
+		catch (ExecutorManagerException e) {
+			ret.put("error", "Error retrieving executable flows");
+			return;
+		}
+
+		if (exFlows.size() == 0) {
+			ret.put("success", "false");
+			ret.put("message", "This flow has no successful run.");
+			return;
+		}
+
+		ret.put("success", "true");
+		ret.put("message", "");
+		ret.put("execId", exFlows.get(0).getExecutionId());
+  }
+
+	private void ajaxFetchFlowExecutions(Project project, 
+      HashMap<String, Object> ret, HttpServletRequest req) 
+      throws ServletException {
 		String flowId = getParam(req, "flow");
 		int from = Integer.valueOf(getParam(req, "start"));
 		int length = Integer.valueOf(getParam(req, "length"));
@@ -347,8 +380,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		ArrayList<ExecutableFlow> exFlows = new ArrayList<ExecutableFlow>();
 		int total = 0;
 		try {
-			total = executorManager.getExecutableFlows(project.getId(), flowId, from, length, exFlows);
-		} catch (ExecutorManagerException e) {
+			total = executorManager.getExecutableFlows(
+					project.getId(), flowId, from, length, exFlows);
+		}
+    catch (ExecutorManagerException e) {
 			ret.put("error", "Error retrieving executable flows");
 		}
 		
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index b54b0f5..3f9beda 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,6 +8,8 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
 executor.global.properties=conf/global.properties
 azkaban.project.dir=projects
 
+azkaban.stats.dir=
+
 database.type=mysql
 mysql.port=3306
 mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 7524a14..d600ed9 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,6 +20,9 @@ database.type=h2
 h2.path=data/azkaban
 h2.create.tables=true
 
+# Stats
+azkaban.stats.dir=
+
 # Velocity dev mode
 velocity.dev.mode=false
 
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 3ccb2f3..3fe43a0 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,6 +22,8 @@ mysql.user=azkaban
 mysql.password=azkaban
 mysql.numconnections=100
 
+azkaban.stats.dir=
+
 # Velocity dev mode
 velocity.dev.mode=false
 
diff --git a/src/tl/flowsummary.tl b/src/tl/flowsummary.tl
index 492f0ba..132e660 100644
--- a/src/tl/flowsummary.tl
+++ b/src/tl/flowsummary.tl
@@ -15,15 +15,18 @@
               </tr>
             </tbody>
           </table>
-					
+				
+          <h3>
+            Scheduling
+            {?schedule}
+            <div class="pull-right">
+              <button type="button" id="removeSchedBtn" class="btn btn-xs btn-danger" onclick="removeSched({schedule.scheduleId})" >Remove Schedule</button>
+            </div>
+            {/schedule}
+          </h3>
 					<div class="panel panel-default">
 						<div class="panel-heading">
 							Scheduling
-							{?schedule}
-							<div class="pull-right">
-								<button type="button" id="removeSchedBtn" class="btn btn-xs btn-danger" onclick="removeSched({schedule.scheduleId})" >Remove Schedule</button>
-							</div>
-							{/schedule}
 						</div>
 						{?schedule}
 						<table class="table table-condensed table-bordered">
diff --git a/src/tl/flowsummary-last-run.tl b/src/tl/flowsummary-last-run.tl
index fc3ba7f..5a38710 100644
--- a/src/tl/flowsummary-last-run.tl
+++ b/src/tl/flowsummary-last-run.tl
@@ -1,4 +1,21 @@
-					<div class="panel panel-default">
-						<div class="panel-heading">Last Run</div>
-            <div class="panel-body">Last run blerb</div>
+					<h4>Last Run Stats</h4>
+          <hr>
+          <div class="panel panel-default">
+						<div class="panel-heading">Resources</div>
+            <table class="table table-striped table-bordered table-condensed">
+              <tbody>
+                <tr>
+                  <td class="property-key">Max Map Slots</td>
+                  <td class="property-value-half">{stats.maxMapSlots}</td>
+                  <td class="property-key">Total Map Slots</td>
+                  <td class="property-value-half">{stats.totalMapSlots}</td>
+                </tr>
+                <tr>
+                  <td class="property-key">Max Reduce Slots</td>
+                  <td class="property-value-half">{stats.maxReduceSlots}</td>
+                  <td class="property-key">Total Reduce Slots</td>
+                  <td class="property-value-half">{stats.totalReduceSlots}</td>
+                </tr>
+              </tbody>
+            </table>
           </div>
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 462e903..41b677f 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -295,7 +295,7 @@ azkaban.ExecutionsView = Backbone.View.extend({
 var summaryView;
 azkaban.SummaryView = Backbone.View.extend({
 	events: {
-    'click #analyze-btn': 'analyzeLastRun'
+    'click #analyze-btn': 'fetchLastRun'
 	},
 	
 	initialize: function(settings) {
@@ -337,29 +337,102 @@ azkaban.SummaryView = Backbone.View.extend({
 		$.get(requestURL, requestData, successHandler, 'json');
 	},
 
-	analyzeLastRun: function() {
-		var requestURL = contextURL + "/executor";
+	fetchLastRun: function() {
+		var requestURL = contextURL + "/manager";
     var requestData = {
-      'ajax': 'fetchLastRunStats',
+      'ajax': 'fetchLastSuccessfulFlowExecution',
       'project': projectName,
       'flow': flowId
     };
     var view = this;
     var successHandler = function(data) {
-      data = {
-        success: false,
-        message: "No last run data available. This flow has not been run yet.",
-        warnings: {},
-        jobs: {}
-      };
-      view.renderLastRun(data);
+      if (data.success == false || data.execId == null) {
+        view.renderLastRun(data);
+        return;
+      }
+      view.analyzeLastRun(data.execId);
     };
     $.get(requestURL, requestData, successHandler, 'json');
 	},
 
+  fetchJobs: function(execId) {
+    var requestURL = contextURL + "/executor";
+    var requestData = {"execid": execId, "ajax":"fetchexecflow"};
+    var jobs = [];
+    var successHandler = function(data) {
+      for (var i = 0; i < data.nodes.length; ++i) {
+        var node = data.nodes[i];
+        jobs.push(node.id);
+      }
+    };
+    return jobs;
+  },
+
+  fetchJobStats: function(jobId) {
+    var requestURL = contextURL + "/executor";
+    var requestData = {
+      "execid": execId,
+      "flowid": flowId,
+      "jobid": jobId,
+      "ajax": "fetchExecJobStats"
+    };
+    var stats = null;
+    var successHandler = function(data) {
+      stats = data;
+    };
+    return stats;
+  },
+
+  updateStats: function(jobStats, data) {
+    var aggregateStats = data.stats;
+    var state = jobStats.state;
+    var conf = jobStats.conf;
+    if (state.numMaps > aggregateStats.maxMapSlots) {
+      aggregateStats.maxMapSlots = state.numMaps;
+    }
+    if (state.numReduces > aggregateStats.maxReduceSlots) {
+      aggregateStats.maxReduceSlots = state.numReduces;
+    }
+    aggregateStats.totalMapSlots += state.numMaps;
+    aggregateStats.totalReduceSlots += state.numReduces;
+  },
+
+  analyzeLastRun: function(execId) {
+    var jobs = this.fetchJobs(execId);
+    if (jobs == null) {
+      this.renderLastRun(null);
+      return;
+    }
+
+    var data = {
+      success: false,
+      message: null,
+      warnings: [],
+      stats: {
+        maxMapSlots: 0,
+        maxReduceSlots: 0,
+        totalMapSlots: 0,
+        totalReduceSlots: 0,
+        numJobs: jobs.length,
+        longestTaskTime: 0
+      }
+    };
+
+    for (var i = 0; i < jobs.length; ++i) {
+      var job = jobs[i];
+      var jobStats = this.fetchJobStats(job.id);
+      if (jobStats == null) {
+        data.warnings.push("No job stats available for job " + job.id);
+      }
+      this.updateStats(jobStats, data);
+    }
+    data.success = true;
+    this.renderLastRun(data);
+  },
+
   renderLastRun: function(data) {
     var view = this;
-    if (data == null || data.success == null || data.message == null) {
+    if (data == null) {
       var msg = { message: "Error retrieving last run data."};
       dust.render("flowsummary-no-data", msg, function(err, out) {
         view.displayLastRun(out);