azkaban-memoizeit
Changes
src/tl/flowsummary.tl 15(+9 -6)
src/tl/flowsummary-last-run.tl 23(+20 -3)
src/web/js/azkaban.flow.view.js 97(+85 -12)
Details
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 75845a5..06af3b7 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -1091,15 +1091,24 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
}
@Override
- public int getExecutableFlows(int projectId, String flowId, int from, int length, List<ExecutableFlow> outputList) throws ExecutorManagerException {
- List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(projectId, flowId, from, length);
+ public int getExecutableFlows(
+ int projectId,
+ String flowId,
+ int from,
+ int length,
+ List<ExecutableFlow> outputList) throws ExecutorManagerException {
+ List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
+ projectId, flowId, from, length);
outputList.addAll(flows);
return executorLoader.fetchNumExecutableFlows(projectId, flowId);
}
@Override
- public List<ExecutableFlow> getExecutableFlows(int projectId, String flowId, int from, int length, Status status) throws ExecutorManagerException {
- return executorLoader.fetchFlowHistory(projectId, flowId, from, length, status);
+ public List<ExecutableFlow> getExecutableFlows(
+ int projectId, String flowId, int from, int length, Status status)
+ throws ExecutorManagerException {
+ return executorLoader.fetchFlowHistory(
+ projectId, flowId, from, length, status);
}
/*
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 6cc091b..f5e4b4f 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -131,6 +131,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
else if (ajaxName.equals("fetchExecJobSummary")) {
ajaxFetchJobSummary(req, resp, ret, session.getUser(), exFlow);
}
+ else if (ajaxName.equals("fetchExecJobStats")) {
+ ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
+ }
else if (ajaxName.equals("retryFailedJobs")) {
ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
}
@@ -497,6 +500,38 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
throw new ServletException(e);
}
}
+
+ private void ajaxFetchJobStats(
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ HashMap<String, Object> ret,
+ User user,
+ ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(
+ ret, exFlow.getProjectId(), user, Type.READ);
+ if (project == null) {
+ return;
+ }
+
+ String jobId = this.getParam(req, "jobId");
+ resp.setCharacterEncoding("utf-8");
+ try {
+ ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ ret.put("error", "Job " + jobId + " doesn't exist in " +
+ exFlow.getExecutionId());
+ return;
+ }
+
+ // XXX
+ outputDir = props.getString("azkaban.stats.dir",
+ System.getProperty("java.io.tmpdir"));
+
+ }
+ catch (ExecutorManagerException e) {
+ throw new ServletException(e);
+ }
+ }
private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 662bad0..4fdcb5c 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -47,6 +47,7 @@ import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableJobInfo;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
@@ -248,6 +249,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ajaxFetchFlowExecutions(project, ret, req);
}
}
+ else if (ajaxName.equals("fetchLastSuccessfulFlowExecution")) {
+ if (handleAjaxPermission(project, user, Type.READ, ret)) {
+ ajaxFetchLastSuccessfulFlowExecution(project, ret, req);
+ }
+ }
else if (ajaxName.equals("fetchJobInfo")) {
if (handleAjaxPermission(project, user, Type.READ, ret)) {
ajaxFetchJobInfo(project, ret, req);
@@ -339,7 +345,34 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxFetchFlowExecutions(Project project, HashMap<String, Object> ret, HttpServletRequest req) throws ServletException {
+ private void ajaxFetchLastSuccessfulFlowExecution(Project project,
+ HashMap<String, Object> ret, HttpServletRequest req)
+ throws ServletException {
+ String flowId = getParam(req, "flow");
+ List<ExecutableFlow> exFlows = null;
+ try {
+ exFlows = executorManager.getExecutableFlows(
+ project.getId(), flowId, 0, 1, Status.SUCCEEDED);
+ }
+ catch (ExecutorManagerException e) {
+ ret.put("error", "Error retrieving executable flows");
+ return;
+ }
+
+ if (exFlows.size() == 0) {
+ ret.put("success", "false");
+ ret.put("message", "This flow has no successful run.");
+ return;
+ }
+
+ ret.put("success", "true");
+ ret.put("message", "");
+ ret.put("execId", exFlows.get(0).getExecutionId());
+ }
+
+ private void ajaxFetchFlowExecutions(Project project,
+ HashMap<String, Object> ret, HttpServletRequest req)
+ throws ServletException {
String flowId = getParam(req, "flow");
int from = Integer.valueOf(getParam(req, "start"));
int length = Integer.valueOf(getParam(req, "length"));
@@ -347,8 +380,10 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ArrayList<ExecutableFlow> exFlows = new ArrayList<ExecutableFlow>();
int total = 0;
try {
- total = executorManager.getExecutableFlows(project.getId(), flowId, from, length, exFlows);
- } catch (ExecutorManagerException e) {
+ total = executorManager.getExecutableFlows(
+ project.getId(), flowId, from, length, exFlows);
+ }
+ catch (ExecutorManagerException e) {
ret.put("error", "Error retrieving executable flows");
}
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index b54b0f5..3f9beda 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,6 +8,8 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
+azkaban.stats.dir=
+
database.type=mysql
mysql.port=3306
mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 7524a14..d600ed9 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,6 +20,9 @@ database.type=h2
h2.path=data/azkaban
h2.create.tables=true
+# Stats
+azkaban.stats.dir=
+
# Velocity dev mode
velocity.dev.mode=false
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 3ccb2f3..3fe43a0 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,6 +22,8 @@ mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
+azkaban.stats.dir=
+
# Velocity dev mode
velocity.dev.mode=false
src/tl/flowsummary.tl 15(+9 -6)
diff --git a/src/tl/flowsummary.tl b/src/tl/flowsummary.tl
index 492f0ba..132e660 100644
--- a/src/tl/flowsummary.tl
+++ b/src/tl/flowsummary.tl
@@ -15,15 +15,18 @@
</tr>
</tbody>
</table>
-
+
+ <h3>
+ Scheduling
+ {?schedule}
+ <div class="pull-right">
+ <button type="button" id="removeSchedBtn" class="btn btn-xs btn-danger" onclick="removeSched({schedule.scheduleId})" >Remove Schedule</button>
+ </div>
+ {/schedule}
+ </h3>
<div class="panel panel-default">
<div class="panel-heading">
Scheduling
- {?schedule}
- <div class="pull-right">
- <button type="button" id="removeSchedBtn" class="btn btn-xs btn-danger" onclick="removeSched({schedule.scheduleId})" >Remove Schedule</button>
- </div>
- {/schedule}
</div>
{?schedule}
<table class="table table-condensed table-bordered">
src/tl/flowsummary-last-run.tl 23(+20 -3)
diff --git a/src/tl/flowsummary-last-run.tl b/src/tl/flowsummary-last-run.tl
index fc3ba7f..5a38710 100644
--- a/src/tl/flowsummary-last-run.tl
+++ b/src/tl/flowsummary-last-run.tl
@@ -1,4 +1,21 @@
- <div class="panel panel-default">
- <div class="panel-heading">Last Run</div>
- <div class="panel-body">Last run blerb</div>
+ <h4>Last Run Stats</h4>
+ <hr>
+ <div class="panel panel-default">
+ <div class="panel-heading">Resources</div>
+ <table class="table table-striped table-bordered table-condensed">
+ <tbody>
+ <tr>
+ <td class="property-key">Max Map Slots</td>
+ <td class="property-value-half">{stats.maxMapSlots}</td>
+ <td class="property-key">Total Map Slots</td>
+ <td class="property-value-half">{stats.totalMapSlots}</td>
+ </tr>
+ <tr>
+ <td class="property-key">Max Reduce Slots</td>
+ <td class="property-value-half">{stats.maxReduceSlots}</td>
+ <td class="property-key">Total Reduce Slots</td>
+ <td class="property-value-half">{stats.totalReduceSlots}</td>
+ </tr>
+ </tbody>
+ </table>
</div>
src/web/js/azkaban.flow.view.js 97(+85 -12)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 462e903..41b677f 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -295,7 +295,7 @@ azkaban.ExecutionsView = Backbone.View.extend({
var summaryView;
azkaban.SummaryView = Backbone.View.extend({
events: {
- 'click #analyze-btn': 'analyzeLastRun'
+ 'click #analyze-btn': 'fetchLastRun'
},
initialize: function(settings) {
@@ -337,29 +337,102 @@ azkaban.SummaryView = Backbone.View.extend({
$.get(requestURL, requestData, successHandler, 'json');
},
- analyzeLastRun: function() {
- var requestURL = contextURL + "/executor";
+ fetchLastRun: function() {
+ var requestURL = contextURL + "/manager";
var requestData = {
- 'ajax': 'fetchLastRunStats',
+ 'ajax': 'fetchLastSuccessfulFlowExecution',
'project': projectName,
'flow': flowId
};
var view = this;
var successHandler = function(data) {
- data = {
- success: false,
- message: "No last run data available. This flow has not been run yet.",
- warnings: {},
- jobs: {}
- };
- view.renderLastRun(data);
+ if (data.success == false || data.execId == null) {
+ view.renderLastRun(data);
+ return;
+ }
+ view.analyzeLastRun(data.execId);
};
$.get(requestURL, requestData, successHandler, 'json');
},
+ fetchJobs: function(execId) {
+ var requestURL = contextURL + "/executor";
+ var requestData = {"execid": execId, "ajax":"fetchexecflow"};
+ var jobs = [];
+ var successHandler = function(data) {
+ for (var i = 0; i < data.nodes.length; ++i) {
+ var node = data.nodes[i];
+ jobs.push(node.id);
+ }
+ };
+ return jobs;
+ },
+
+ fetchJobStats: function(jobId) {
+ var requestURL = contextURL + "/executor";
+ var requestData = {
+ "execid": execId,
+ "flowid": flowId,
+ "jobid": jobId,
+ "ajax": "fetchExecJobStats"
+ };
+ var stats = null;
+ var successHandler = function(data) {
+ stats = data;
+ };
+ return stats;
+ },
+
+ updateStats: function(jobStats, data) {
+ var aggregateStats = data.stats;
+ var state = jobStats.state;
+ var conf = jobStats.conf;
+ if (state.numMaps > aggregateStats.maxMapSlots) {
+ aggregateStats.maxMapSlots = state.numMaps;
+ }
+ if (state.numReduces > aggregateStats.maxReduceSlots) {
+ aggregateStats.maxReduceSlots = state.numReduces;
+ }
+ aggregateStats.totalMapSlots += state.numMaps;
+ aggregateStats.totalReduceSlots += state.numReduces;
+ },
+
+ analyzeLastRun: function(execId) {
+ var jobs = this.fetchJobs(execId);
+ if (jobs == null) {
+ this.renderLastRun(null);
+ return;
+ }
+
+ var data = {
+ success: false,
+ message: null,
+ warnings: [],
+ stats: {
+ maxMapSlots: 0,
+ maxReduceSlots: 0,
+ totalMapSlots: 0,
+ totalReduceSlots: 0,
+ numJobs: jobs.length,
+ longestTaskTime: 0
+ }
+ };
+
+ for (var i = 0; i < jobs.length; ++i) {
+ var job = jobs[i];
+ var jobStats = this.fetchJobStats(job.id);
+ if (jobStats == null) {
+ data.warnings.push("No job stats available for job " + job.id);
+ }
+ this.updateStats(jobStats, data);
+ }
+ data.success = true;
+ this.renderLastRun(data);
+ },
+
renderLastRun: function(data) {
var view = this;
- if (data == null || data.success == null || data.message == null) {
+ if (data == null) {
var msg = { message: "Error retrieving last run data."};
dust.render("flowsummary-no-data", msg, function(err, out) {
view.displayLastRun(out);