azkaban-uncached

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 9be4158..23379a9 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -521,6 +521,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	public void retryJobs(String[] jobIds, String user) {
 		synchronized(actionSyncObj) {
+			ArrayList<ExecutableNode> jobsToBeQueued = new ArrayList<ExecutableNode>();
 			for (String jobId: jobIds) {
 				if (runningJob.containsKey(jobId)) {
 					logger.error("Cannot retry job " + jobId + " since it's already running. User " + user);
@@ -542,7 +543,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 						logger.error("Cannot retry a job that hasn't finished. " + jobId);
 					}
 					
-					queueNextJob(node);
+					jobsToBeQueued.add(node);
 				}
 			}
 			
@@ -558,6 +559,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (!isFailureFound) {
 				flow.setStatus(Status.RUNNING);
 				flow.setUpdateTime(System.currentTimeMillis());
+				flowFailed = false;
+			}
+			
+			for (ExecutableNode node: jobsToBeQueued) {
+				queueNextJob(node);
 			}
 			
 			updateFlow();
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index de2843a..ce88cb8 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import azkaban.executor.ExecutableNode.Attempt;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
@@ -380,6 +381,15 @@ public class ExecutableFlow {
 				updatedNodeMap.put("startTime", node.getStartTime());
 				updatedNodeMap.put("endTime", node.getEndTime());
 				updatedNodeMap.put("updateTime", node.getUpdateTime());
+				updatedNodeMap.put("attempt", node.getAttempt());
+				
+				if (node.getAttempt() > 0) {
+					ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+					for (Attempt attempt: node.getPastAttemptList()) {
+						pastAttempts.add(attempt.toObject());
+					}
+					updatedNodeMap.put("pastAttempts", pastAttempts);
+				}
 				
 				updatedNodes.add(updatedNodeMap);
 			}
@@ -389,8 +399,8 @@ public class ExecutableFlow {
 		return updateData;
 	}
 	
+	@SuppressWarnings("unchecked")
 	public void applyUpdateObject(Map<String, Object> updateData) {
-		@SuppressWarnings("unchecked")
 		List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
 		for (Map<String,Object> node: updatedNodes) {
 			String jobId = (String)node.get("jobId");
@@ -398,12 +408,22 @@ public class ExecutableFlow {
 			long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
 			long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
 			long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-
+			
 			ExecutableNode exNode = executableNodes.get(jobId);
 			exNode.setEndTime(endTime);
 			exNode.setStartTime(startTime);
 			exNode.setUpdateTime(updateTime);
 			exNode.setStatus(status);
+			
+			int attempt = 0;
+			if (node.containsKey("attempt")) {
+				attempt = (Integer)node.get("attempt");
+				if (attempt > 0) {
+					exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
+				}
+			}
+			
+			exNode.setAttempt(attempt);
 		}
 		
 		this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 66526af..aaa6992 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -49,7 +49,7 @@ public class ExecutableNode {
 	
 	// Used if proxy node
 	private Integer externalExecutionId;
-	private List<Attempt> pastAttempts = null;
+	private ArrayList<Attempt> pastAttempts = null;
 	
 	public ExecutableNode(Node node, ExecutableFlow flow) {
 		jobId = node.getId();
@@ -68,9 +68,12 @@ public class ExecutableNode {
 	public void resetForRetry() {
 		Attempt pastAttempt = new Attempt(attempt, startTime, endTime, status);
 		attempt++;
-		if (pastAttempts == null) {
-			pastAttempts = new ArrayList<Attempt>();
-			pastAttempts.add(pastAttempt);
+		
+		synchronized (this) {
+			if (pastAttempts == null) {
+				pastAttempts = new ArrayList<Attempt>();
+				pastAttempts.add(pastAttempt);
+			}
 		}
 		
 		startTime = -1;
@@ -277,6 +280,41 @@ public class ExecutableNode {
 		this.paused = paused;
 	}
 	
+	public List<Object> getAttemptObjects() {
+		ArrayList<Object> array = new ArrayList<Object>();
+		
+		for (Attempt attempt: pastAttempts) {
+			array.add(attempt.toObject());
+		}
+		
+		return array;
+	}
+	
+	
+	public void updatePastAttempts(List<Object> pastAttemptsList) {
+		if (pastAttemptsList == null) {
+			return;
+		}
+		
+		synchronized (this) {
+			if (this.pastAttempts == null) {
+				this.pastAttempts = new ArrayList<Attempt>();
+			}
+
+			// We just check size because past attempts don't change
+			if (pastAttemptsList.size() <= this.pastAttempts.size()) {
+				return;
+			}
+
+			Object[] pastAttemptArray = pastAttemptsList.toArray();
+			for (int i = this.pastAttempts.size(); i < pastAttemptArray.length; ++i) {
+				Attempt attempt = Attempt.fromObject(pastAttemptArray[i]);
+				this.pastAttempts.add(attempt);
+			}
+		}
+
+	}
+
 	public static class Attempt {
 		private int attempt = 0;
 		private long startTime = -1;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 8bfc19f..c70d9f5 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -391,7 +391,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 
 	@Override
 	public void uploadExecutableNode(ExecutableNode node, Props inputProps) throws ExecutorManagerException {
-		final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempts) VALUES (?,?,?,?,?,?,?,?,?,?)";
+		final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
 		
 		byte[] inputParam = null;
 		if (inputProps != null) {
@@ -426,7 +426,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	
 	@Override
 	public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
-		final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=?";
+		final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=? AND attempt=?";
 		
 		byte[] outputParam = null;
 		Props outputProps = node.getOutputProps();
@@ -448,7 +448,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 					node.getStatus().getNumVal(), 
 					outputParam,
 					node.getFlow().getExecutionId(),
-					node.getJobId());
+					node.getJobId(),
+					node.getAttempt());
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error updating job " + node.getJobId(), e);
 		}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index df9dff9..f275119 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -553,6 +553,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			nodeObj.put("status", node.getStatus());
 			nodeObj.put("startTime", node.getStartTime());
 			nodeObj.put("endTime", node.getEndTime());
+			nodeObj.put("attempt", node.getAttempt());
+			
+			if (node.getAttempt() > 0) {
+				nodeObj.put("pastAttempts", node.getAttemptObjects());
+			}
 			
 			nodeList.add(nodeObj);
 		}
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
index 60c39a1..5ee0f51 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
@@ -104,10 +104,14 @@
 	#foreach($job in $history)
 						<tr>
 							<td class="first">
+								#if ($job.attempt > 0)
+								<a href="${context}/executor?execid=${job.execId}">${job.execId}.${job.attempt}</a>
+								#else
 								<a href="${context}/executor?execid=${job.execId}">${job.execId}</a>
+								#end
 							</td>
 							<td>
-								<a href="${context}/manager?project=${projectName}&flow=${job.flowId}&job=${jobid}">$jobid</a>
+								<a href="${context}/manager?project=${projectName}&flow=${job.flowId}&job=${jobid}">${jobid}</a>
 							</td>
 							<td>
 								<a href="${context}/manager?project=${projectName}&flow=${job.flowId}">${job.flowId}</a>
@@ -117,7 +121,7 @@
 							<td>$utils.formatDuration(${job.startTime}, ${job.endTime})</td>
 							<td><div class="status ${job.status}">$utils.formatStatus(${job.status})</div></td>
 							<td class="logLink">
-								<a href="${context}/executor?execid=${job.execId}&job=${jobid}">Logs</a>
+								<a href="${context}/executor?execid=${job.execId}&job=${jobid}&attempt=${job.attempt}">Logs</a>
 							</td>
 						</tr>
 	#end
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index dc9547a..57ff096 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -233,7 +233,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 	  	var successEmails = $('#successEmails').val();
 	  	var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
 	  	var notifyFailureLast = $('#notifyFailureLast').is(':checked');
-	  	var executingJobOption = $('input:radio[name=gender]:checked').val();
+	  	//var executingJobOption = $("input[@name=concurrent]:checked").val();
 	  	
 	  	var flowOverride = {};
 	  	var editRows = $(".editRow");
@@ -266,7 +266,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 	  		successEmails: successEmails,
 	  		notifyFailureFirst: notifyFailureFirst,
 	  		notifyFailureLast: notifyFailureLast,
-	  		executingJobOption: executingJobOption,
+//	  		executingJobOption: executingJobOption,
 	  		flowOverride: flowOverride
 	  	};
 	  	
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index a32419b..cd6ff66 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -183,11 +183,28 @@ azkaban.FlowTabView= Backbone.View.extend({
   	  var failedJobs = new Array();
   	  var failedJobStr = "";
   	  var nodes = graphData.nodes;
+	  
   	  for (var i = 0; i < nodes.length; ++i) {
 		var node = nodes[i];
 		if(node.status=='FAILED') {
 			failedJobs.push(node.id);
-		} 
+		}
+		else if (node.status=='KILLED') {
+			// Nodes can be in a killed state, even if the parents have succeeded due to failure option Finish running
+			// We want to re-enable those.
+			var shouldAdd = true;
+			for(var key in node.in) {
+				var dependency = node.in[key];
+				if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
+					shouldAdd = false;
+					break;
+				}
+			}
+			
+			if (shouldAdd) {
+				failedJobs.push(node.id);
+			}
+		}
   	  }
   	  failedJobStr = failedJobs.join();
   
@@ -343,7 +360,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		
 		var flowLastTime = data.endTime == -1 ? (new Date()).getTime() : data.endTime;
 		var flowStartTime = data.startTime;
-		
+
 		var outerWidth = $(".outerProgress").css("width");
 		if (outerWidth) {
 			if (outerWidth.substring(outerWidth.length - 2, outerWidth.length) == "px") {
@@ -353,20 +370,60 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		}
 		
 		var nodes = data.nodes;
+		
 		for (var i = 0; i < nodes.length; ++i) {
 			var node = nodes[i];
-		
 			// calculate the progress
 			var diff = flowLastTime - flowStartTime;
 			
 			var factor = outerWidth/diff;
-			var left = Math.max((node.startTime-flowStartTime)*factor, 0);
+
+			var offsetLeft = 0;
+			var minOffset = 0;
+			// Add all the attempts
+			if (node.attempt > 0) {
+				var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" + node.attempt;
+				var aId = node.id + "-log-link";
+				$("#" + aId).attr("href", logURL);
+				
+				/*
+				minOffset = 1;
+				var pastAttempts = node.pastAttempts;
+				for (var j = 0; j < pastAttempts.length; ++j) {
+					var past = pastAttempts[j];
+					var id = node.id + "-past-progress-" + j;
+
+					if ($("#" + id).length == 0) {
+						var attemptBox = document.createElement("div");
+						$(attemptBox).attr("id", id);
+						$(attemptBox).addClass("progressBox");
+						$("#" + node.id + "-outerprogressbar").append(attemptBox);
+						$(attemptBox).css("float","left");
+					}
+
+					var attemptBox = $("#" + id);
+					
+					var absoluteLeft = Math.max((past.startTime-flowStartTime)*factor, 3);
+					var left = absoluteLeft - offsetLeft;
+					var width = Math.max((past.endTime - past.startTime)*factor, 1);
+					
+					$(attemptBox).css("margin-left", left)
+					$(attemptBox).css("width", width);
+					$(attemptBox).addClass(past.status);
+					offsetLeft += left + width;
+				}
+				*/
+			}
+
+			var absoluteLeft = Math.max((node.startTime-flowStartTime)*factor, minOffset);
+			var left = absoluteLeft - offsetLeft;
 			var nodeLastTime = node.endTime == -1 ? (new Date()).getTime() : node.endTime;
 			var width = Math.max((nodeLastTime - node.startTime)*factor, 3);
 			width = Math.min(width, outerWidth);
 			
-			$("#" + node.id + "-progressbar").css("margin-left", left)
-			$("#" + node.id + "-progressbar").css("width", width);
+			var elem = $("#" + node.id + "-progressbar");
+			elem.css("margin-left", left)
+			elem.css("width", width);
 		}
 	},
 	addNodeRow: function(node) {
@@ -395,6 +452,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(tdStatus).attr("id", node.id + "-status");
 
 		var outerProgressBar = document.createElement("div");
+		$(outerProgressBar).attr("id", node.id + "-outerprogressbar");
 		$(outerProgressBar).addClass("outerProgress");
 
 		var progressBox = document.createElement("div");
@@ -416,8 +474,13 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		tdStatus.appendChild(status);
 
 		var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id;
+		if (node.attempt) {
+			logURL += "&attempt=" + node.attempt;
+		}
+
 		var a = document.createElement("a");
 		$(a).attr("href", logURL);
+		$(a).attr("id", node.id + "-log-link");
 		$(a).text("Log");
 		$(tdLog).addClass("logLink");
 		$(tdLog).append(a);
@@ -496,6 +559,10 @@ var updateStatus = function() {
 	          	oldNode.updateTime = node.updateTime;
 	          	oldNode.endTime = node.endTime;
 	          	oldNode.status = node.status;
+	          	oldNode.attempt = node.attempt;
+	          	if (oldNode.attempt > 0) {
+	          		oldNode.pastAttempts = node.pastAttempts;
+	          	}
 	          }
 
 	          graphModel.set({"update": data});
@@ -575,6 +642,21 @@ $(function() {
 	             updateTime = Math.max(updateTime, node.startTime);
 	             updateTime = Math.max(updateTime, node.endTime);
 	          }
+	          for (var i = 0; i < data.edges.length; ++i) {
+	          	 var edge = data.edges[i];
+	          	 
+	          	 if (!nodeMap[edge.target].in) {
+	          	 	nodeMap[edge.target].in = {};
+	          	 }
+	          	 var targetInMap = nodeMap[edge.target].in;
+	          	 targetInMap[edge.from] = nodeMap[edge.from];
+	          	 
+	          	 if (!nodeMap[edge.from].out) {
+	          	 	nodeMap[edge.from].out = {};
+	          	 }
+	          	 var sourceOutMap = nodeMap[edge.from].out;
+	          	 sourceOutMap[edge.target] = nodeMap[edge.target];
+	          }
 	          
 	          graphModel.set({nodeMap: nodeMap});
 	          
diff --git a/src/web/js/azkaban.flow.job.view.js b/src/web/js/azkaban.flow.job.view.js
index 006fcf4..8071c60 100644
--- a/src/web/js/azkaban.flow.job.view.js
+++ b/src/web/js/azkaban.flow.job.view.js
@@ -76,6 +76,7 @@ azkaban.JobListView = Backbone.View.extend({
 		if (updateData.nodes) {
 			for (var i = 0; i < updateData.nodes.length; ++i) {
 				var updateNode = updateData.nodes[i];
+				$(this.listNodes[updateNode.id]).removeClass();
 				$(this.listNodes[updateNode.id]).addClass(updateNode.status);
 			}
 		}
@@ -84,7 +85,6 @@ azkaban.JobListView = Backbone.View.extend({
 		var data = this.model.get("data");
 		for (var i = 0; i < data.nodes.length; ++i) {
 			var updateNode = data.nodes[i];
-			
 			$(this.listNodes[updateNode.id]).addClass(updateNode.status);
 		}
 	},