azkaban-uncached
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 9be4158..23379a9 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -521,6 +521,7 @@ public class FlowRunner extends EventHandler implements Runnable {
public void retryJobs(String[] jobIds, String user) {
synchronized(actionSyncObj) {
+ ArrayList<ExecutableNode> jobsToBeQueued = new ArrayList<ExecutableNode>();
for (String jobId: jobIds) {
if (runningJob.containsKey(jobId)) {
logger.error("Cannot retry job " + jobId + " since it's already running. User " + user);
@@ -542,7 +543,7 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.error("Cannot retry a job that hasn't finished. " + jobId);
}
- queueNextJob(node);
+ jobsToBeQueued.add(node);
}
}
@@ -558,6 +559,11 @@ public class FlowRunner extends EventHandler implements Runnable {
if (!isFailureFound) {
flow.setStatus(Status.RUNNING);
flow.setUpdateTime(System.currentTimeMillis());
+ flowFailed = false;
+ }
+
+ for (ExecutableNode node: jobsToBeQueued) {
+ queueNextJob(node);
}
updateFlow();
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index de2843a..ce88cb8 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import azkaban.executor.ExecutableNode.Attempt;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
@@ -380,6 +381,15 @@ public class ExecutableFlow {
updatedNodeMap.put("startTime", node.getStartTime());
updatedNodeMap.put("endTime", node.getEndTime());
updatedNodeMap.put("updateTime", node.getUpdateTime());
+ updatedNodeMap.put("attempt", node.getAttempt());
+
+ if (node.getAttempt() > 0) {
+ ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+ for (Attempt attempt: node.getPastAttemptList()) {
+ pastAttempts.add(attempt.toObject());
+ }
+ updatedNodeMap.put("pastAttempts", pastAttempts);
+ }
updatedNodes.add(updatedNodeMap);
}
@@ -389,8 +399,8 @@ public class ExecutableFlow {
return updateData;
}
+ @SuppressWarnings("unchecked")
public void applyUpdateObject(Map<String, Object> updateData) {
- @SuppressWarnings("unchecked")
List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
for (Map<String,Object> node: updatedNodes) {
String jobId = (String)node.get("jobId");
@@ -398,12 +408,22 @@ public class ExecutableFlow {
long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-
+
ExecutableNode exNode = executableNodes.get(jobId);
exNode.setEndTime(endTime);
exNode.setStartTime(startTime);
exNode.setUpdateTime(updateTime);
exNode.setStatus(status);
+
+ int attempt = 0;
+ if (node.containsKey("attempt")) {
+ attempt = (Integer)node.get("attempt");
+ if (attempt > 0) {
+ exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
+ }
+ }
+
+ exNode.setAttempt(attempt);
}
this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 66526af..aaa6992 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -49,7 +49,7 @@ public class ExecutableNode {
// Used if proxy node
private Integer externalExecutionId;
- private List<Attempt> pastAttempts = null;
+ private ArrayList<Attempt> pastAttempts = null;
public ExecutableNode(Node node, ExecutableFlow flow) {
jobId = node.getId();
@@ -68,9 +68,12 @@ public class ExecutableNode {
public void resetForRetry() {
Attempt pastAttempt = new Attempt(attempt, startTime, endTime, status);
attempt++;
- if (pastAttempts == null) {
- pastAttempts = new ArrayList<Attempt>();
- pastAttempts.add(pastAttempt);
+
+ synchronized (this) {
+ if (pastAttempts == null) {
+ pastAttempts = new ArrayList<Attempt>();
+ }
+ pastAttempts.add(pastAttempt); // outside the null check so every retry is recorded, not only the first
}
startTime = -1;
@@ -277,6 +280,41 @@ public class ExecutableNode {
this.paused = paused;
}
+ /**
+  * Converts every recorded past attempt of this node into its object form
+  * (via {@code Attempt.toObject()}).
+  *
+  * @return a new list with one entry per past attempt; empty (never null)
+  *         when no past attempts have been recorded
+  */
+ public List<Object> getAttemptObjects() {
+   ArrayList<Object> array = new ArrayList<Object>();
+
+   // Same lock as resetForRetry/updatePastAttempts, which mutate the list.
+   synchronized (this) {
+     // pastAttempts is created lazily; it stays null until the first retry.
+     if (pastAttempts != null) {
+       for (Attempt attempt: pastAttempts) {
+         array.add(attempt.toObject());
+       }
+     }
+   }
+
+   return array;
+ }
+
+
+ /**
+  * Appends to this node's past attempts any entries of the given list that
+  * lie beyond the number of attempts already recorded here.
+  *
+  * @param pastAttemptsList object-form attempts, each converted with
+  *        {@code Attempt.fromObject}; a null list is ignored
+  */
+ public void updatePastAttempts(List<Object> pastAttemptsList) {
+   if (pastAttemptsList != null) {
+     synchronized (this) {
+       if (this.pastAttempts == null) {
+         this.pastAttempts = new ArrayList<Attempt>();
+       }
+
+       // Past attempts don't change, so comparing sizes is enough to
+       // find the entries that have not been recorded yet.
+       final int alreadyKnown = this.pastAttempts.size();
+       final int incoming = pastAttemptsList.size();
+       if (incoming > alreadyKnown) {
+         for (Object rawAttempt: pastAttemptsList.subList(alreadyKnown, incoming)) {
+           this.pastAttempts.add(Attempt.fromObject(rawAttempt));
+         }
+       }
+     }
+   }
+ }
+
public static class Attempt {
private int attempt = 0;
private long startTime = -1;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 8bfc19f..c70d9f5 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -391,7 +391,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public void uploadExecutableNode(ExecutableNode node, Props inputProps) throws ExecutorManagerException {
- final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempts) VALUES (?,?,?,?,?,?,?,?,?,?)";
+ final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
byte[] inputParam = null;
if (inputProps != null) {
@@ -426,7 +426,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
@Override
public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
- final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=?";
+ final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=? AND attempt=?";
byte[] outputParam = null;
Props outputProps = node.getOutputProps();
@@ -448,7 +448,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
node.getStatus().getNumVal(),
outputParam,
node.getFlow().getExecutionId(),
- node.getJobId());
+ node.getJobId(),
+ node.getAttempt());
} catch (SQLException e) {
throw new ExecutorManagerException("Error updating job " + node.getJobId(), e);
}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index df9dff9..f275119 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -553,6 +553,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
nodeObj.put("status", node.getStatus());
nodeObj.put("startTime", node.getStartTime());
nodeObj.put("endTime", node.getEndTime());
+ nodeObj.put("attempt", node.getAttempt());
+
+ if (node.getAttempt() > 0) {
+ nodeObj.put("pastAttempts", node.getAttemptObjects());
+ }
nodeList.add(nodeObj);
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
index 60c39a1..5ee0f51 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
@@ -104,10 +104,14 @@
#foreach($job in $history)
<tr>
<td class="first">
+ #if ($job.attempt > 0)
+ <a href="${context}/executor?execid=${job.execId}">${job.execId}.${job.attempt}</a>
+ #else
<a href="${context}/executor?execid=${job.execId}">${job.execId}</a>
+ #end
</td>
<td>
- <a href="${context}/manager?project=${projectName}&flow=${job.flowId}&job=${jobid}">$jobid</a>
+ <a href="${context}/manager?project=${projectName}&flow=${job.flowId}&job=${jobid}">${jobid}</a>
</td>
<td>
<a href="${context}/manager?project=${projectName}&flow=${job.flowId}">${job.flowId}</a>
@@ -117,7 +121,7 @@
<td>$utils.formatDuration(${job.startTime}, ${job.endTime})</td>
<td><div class="status ${job.status}">$utils.formatStatus(${job.status})</div></td>
<td class="logLink">
- <a href="${context}/executor?execid=${job.execId}&job=${jobid}">Logs</a>
+ <a href="${context}/executor?execid=${job.execId}&job=${jobid}&attempt=${job.attempt}">Logs</a>
</td>
</tr>
#end
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index dc9547a..57ff096 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -233,7 +233,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
var successEmails = $('#successEmails').val();
var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
var notifyFailureLast = $('#notifyFailureLast').is(':checked');
- var executingJobOption = $('input:radio[name=gender]:checked').val();
+ //var executingJobOption = $("input[@name=concurrent]:checked").val();
var flowOverride = {};
var editRows = $(".editRow");
@@ -266,7 +266,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
successEmails: successEmails,
notifyFailureFirst: notifyFailureFirst,
notifyFailureLast: notifyFailureLast,
- executingJobOption: executingJobOption,
+// executingJobOption: executingJobOption,
flowOverride: flowOverride
};
src/web/js/azkaban.exflow.view.js 94(+88 -6)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index a32419b..cd6ff66 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -183,11 +183,28 @@ azkaban.FlowTabView= Backbone.View.extend({
var failedJobs = new Array();
var failedJobStr = "";
var nodes = graphData.nodes;
+
for (var i = 0; i < nodes.length; ++i) {
var node = nodes[i];
if(node.status=='FAILED') {
failedJobs.push(node.id);
- }
+ }
+ else if (node.status=='KILLED') {
+ // Nodes can be in a killed state, even if the parents have succeeded due to failure option Finish running
+ // We want to re-enable those.
+ var shouldAdd = true;
+ for(var key in node.in) {
+ var dependency = node.in[key];
+ if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
+ shouldAdd = false;
+ break;
+ }
+ }
+
+ if (shouldAdd) {
+ failedJobs.push(node.id);
+ }
+ }
}
failedJobStr = failedJobs.join();
@@ -343,7 +360,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
var flowLastTime = data.endTime == -1 ? (new Date()).getTime() : data.endTime;
var flowStartTime = data.startTime;
-
+
var outerWidth = $(".outerProgress").css("width");
if (outerWidth) {
if (outerWidth.substring(outerWidth.length - 2, outerWidth.length) == "px") {
@@ -353,20 +370,60 @@ azkaban.ExecutionListView = Backbone.View.extend({
}
var nodes = data.nodes;
+
for (var i = 0; i < nodes.length; ++i) {
var node = nodes[i];
-
// calculate the progress
var diff = flowLastTime - flowStartTime;
var factor = outerWidth/diff;
- var left = Math.max((node.startTime-flowStartTime)*factor, 0);
+
+ var offsetLeft = 0;
+ var minOffset = 0;
+ // Add all the attempts
+ if (node.attempt > 0) {
+ var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" + node.attempt;
+ var aId = node.id + "-log-link";
+ $("#" + aId).attr("href", logURL);
+
+ /*
+ minOffset = 1;
+ var pastAttempts = node.pastAttempts;
+ for (var j = 0; j < pastAttempts.length; ++j) {
+ var past = pastAttempts[j];
+ var id = node.id + "-past-progress-" + j;
+
+ if ($("#" + id).length == 0) {
+ var attemptBox = document.createElement("div");
+ $(attemptBox).attr("id", id);
+ $(attemptBox).addClass("progressBox");
+ $("#" + node.id + "-outerprogressbar").append(attemptBox);
+ $(attemptBox).css("float","left");
+ }
+
+ var attemptBox = $("#" + id);
+
+ var absoluteLeft = Math.max((past.startTime-flowStartTime)*factor, 3);
+ var left = absoluteLeft - offsetLeft;
+ var width = Math.max((past.endTime - past.startTime)*factor, 1);
+
+ $(attemptBox).css("margin-left", left)
+ $(attemptBox).css("width", width);
+ $(attemptBox).addClass(past.status);
+ offsetLeft += left + width;
+ }
+ */
+ }
+
+ var absoluteLeft = Math.max((node.startTime-flowStartTime)*factor, minOffset);
+ var left = absoluteLeft - offsetLeft;
var nodeLastTime = node.endTime == -1 ? (new Date()).getTime() : node.endTime;
var width = Math.max((nodeLastTime - node.startTime)*factor, 3);
width = Math.min(width, outerWidth);
- $("#" + node.id + "-progressbar").css("margin-left", left)
- $("#" + node.id + "-progressbar").css("width", width);
+ var elem = $("#" + node.id + "-progressbar");
+ elem.css("margin-left", left)
+ elem.css("width", width);
}
},
addNodeRow: function(node) {
@@ -395,6 +452,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(tdStatus).attr("id", node.id + "-status");
var outerProgressBar = document.createElement("div");
+ $(outerProgressBar).attr("id", node.id + "-outerprogressbar");
$(outerProgressBar).addClass("outerProgress");
var progressBox = document.createElement("div");
@@ -416,8 +474,13 @@ azkaban.ExecutionListView = Backbone.View.extend({
tdStatus.appendChild(status);
var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id;
+ if (node.attempt) {
+ logURL += "&attempt=" + node.attempt;
+ }
+
var a = document.createElement("a");
$(a).attr("href", logURL);
+ $(a).attr("id", node.id + "-log-link");
$(a).text("Log");
$(tdLog).addClass("logLink");
$(tdLog).append(a);
@@ -496,6 +559,10 @@ var updateStatus = function() {
oldNode.updateTime = node.updateTime;
oldNode.endTime = node.endTime;
oldNode.status = node.status;
+ oldNode.attempt = node.attempt;
+ if (oldNode.attempt > 0) {
+ oldNode.pastAttempts = node.pastAttempts;
+ }
}
graphModel.set({"update": data});
@@ -575,6 +642,21 @@ $(function() {
updateTime = Math.max(updateTime, node.startTime);
updateTime = Math.max(updateTime, node.endTime);
}
+ for (var i = 0; i < data.edges.length; ++i) {
+ var edge = data.edges[i];
+
+ if (!nodeMap[edge.target].in) {
+ nodeMap[edge.target].in = {};
+ }
+ var targetInMap = nodeMap[edge.target].in;
+ targetInMap[edge.from] = nodeMap[edge.from];
+
+ if (!nodeMap[edge.from].out) {
+ nodeMap[edge.from].out = {};
+ }
+ var sourceOutMap = nodeMap[edge.from].out;
+ sourceOutMap[edge.target] = nodeMap[edge.target];
+ }
graphModel.set({nodeMap: nodeMap});
src/web/js/azkaban.flow.job.view.js 2(+1 -1)
diff --git a/src/web/js/azkaban.flow.job.view.js b/src/web/js/azkaban.flow.job.view.js
index 006fcf4..8071c60 100644
--- a/src/web/js/azkaban.flow.job.view.js
+++ b/src/web/js/azkaban.flow.job.view.js
@@ -76,6 +76,7 @@ azkaban.JobListView = Backbone.View.extend({
if (updateData.nodes) {
for (var i = 0; i < updateData.nodes.length; ++i) {
var updateNode = updateData.nodes[i];
+ $(this.listNodes[updateNode.id]).removeClass();
$(this.listNodes[updateNode.id]).addClass(updateNode.status);
}
}
@@ -84,7 +85,6 @@ azkaban.JobListView = Backbone.View.extend({
var data = this.model.get("data");
for (var i = 0; i < data.nodes.length; ++i) {
var updateNode = data.nodes[i];
-
$(this.listNodes[updateNode.id]).addClass(updateNode.status);
}
},