azkaban-memoizeit

Details

build.properties 2(+1 -1)

diff --git a/build.properties b/build.properties
index 1b1ced1..a72a058 100644
--- a/build.properties
+++ b/build.properties
@@ -1,3 +1,3 @@
 name=azkaban
-version=2.01
+version=2.1
 spec.file=azkaban.spec
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 9a46168..1c9a2ad 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -72,5 +72,9 @@ public abstract class FlowWatcher {
 		}
 	}
 	
+	public boolean isWatchCancelled() { // Reports the cancelWatch flag, e.g. so JobRunner can stop waiting on the pipeline.
+		return cancelWatch;
+	}
+	
 	public abstract void stopWatcher();
 }
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index ff2e5f0..9d81674 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -68,7 +68,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
-	private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
+	private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
 	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
@@ -298,7 +298,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 								JobRunner runner = createJobRunner(node, outputProps);
 								try {
 									executorService.submit(runner);
-									runningJob.put(node.getJobId(), runner);
+									jobRunners.put(node.getJobId(), runner);
 								} catch (RejectedExecutionException e) {
 									logger.error(e);
 								};
@@ -350,7 +350,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private List<ExecutableNode> findReadyJobsToRun() {
 		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
 		for (ExecutableNode node : flow.getExecutableNodes()) {
-			if(Status.isStatusFinished(node.getStatus())) {
+			if (node.getStatus() == Status.FAILED) {
+				
+			}
+			else if(Status.isStatusFinished(node.getStatus())) {
 				continue;
 			}
 			else {
@@ -451,6 +454,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			jobRunner.setValidatedProxyUsers(proxyUsers);
 		}
 		
+		jobRunner.setDelayStart(node.getDelayedExecution());
 		jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
 		jobRunner.addListener(listener);
 
@@ -510,11 +514,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 		synchronized(mainSyncObj) {
 			flowPaused = false;
 			flowCancelled = true;
+			
 			if (watcher != null) {
 				watcher.stopWatcher();
 			}
 			
-			for (JobRunner runner : runningJob.values()) {
+			for (JobRunner runner : jobRunners.values()) {
 				runner.cancel();
 			}
 			
@@ -669,16 +674,31 @@ public class FlowRunner extends EventHandler implements Runnable {
 					logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
 					
 					if (node.getStatus() == Status.FAILED) {
-						flowFailed = true;
-						
-						ExecutionOptions options = flow.getExecutionOptions();
-						// The KILLED status occurs when cancel is invoked. We want to keep this
-						// status even in failure conditions.
-						if (flow.getStatus() != Status.KILLED) {
-							flow.setStatus(Status.FAILED_FINISHING);
-							if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
-								logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
-								cancel();
+						// Retry failure if conditions are met.
+						if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
+							logger.info("Job " + node.getJobId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+							node.setDelayedExecution(runner.getRetryBackoff());
+							node.resetForRetry();
+						}
+						else {
+							if (!runner.isCancelled() && runner.getRetries() > 0) {
+					
+								logger.info("Job " + node.getJobId() + " has run out of retry attempts");
+								// Setting delayed execution to 0 in case this is manually re-tried.
+								node.setDelayedExecution(0);
+							}
+
+							flowFailed = true;
+							
+							ExecutionOptions options = flow.getExecutionOptions();
+							// The KILLED status occurs when cancel is invoked. We want to keep this
+							// status even in failure conditions.
+							if (flow.getStatus() != Status.KILLED) {
+								flow.setStatus(Status.FAILED_FINISHING);
+								if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+									logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
+									cancel();
+								}
 							}
 						}
 					}
@@ -743,6 +763,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public int getNumRunningJobs() {
-		return runningJob.size();
+		return jobRunners.size();
 	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 3d18fe2..cb58324 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -82,6 +82,9 @@ public class JobRunner extends EventHandler implements Runnable {
 
 	private String jobLogChunkSize;
 	private int jobLogBackupIndex;
+
+	private long delayStartMs = 0;
+	private boolean cancelled = false;
 	
 	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
 		this.props = props;
@@ -102,6 +105,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.jobLogBackupIndex = numLogBackup;
 	}
 	
+	public Props getProps() { // Exposes the Props instance that was passed to the constructor.
+		return props;
+	}
+	
 	public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
 		this.watcher = watcher;
 		this.pipelineLevel = pipelineLevel;
@@ -115,6 +122,14 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	public void setDelayStart(long delayMS) { // Delay, in milliseconds, that the runner waits before starting the job; values <= 0 mean no delay.
+		delayStartMs = delayMS;
+	}
+	
+	public long getDelayStart() { // Returns the start delay in milliseconds (0 unless setDelayStart was called).
+		return delayStartMs;
+	}
+	
 	public ExecutableNode getNode() {
 		return node;
 	}
@@ -206,8 +221,36 @@ public class JobRunner extends EventHandler implements Runnable {
 						logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
 					}
 				}
+				if (watcher.isWatchCancelled()) {
+					logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+					node.setStartTime(System.currentTimeMillis());
+					node.setEndTime(System.currentTimeMillis());
+					fireEvent(Event.create(this, Type.JOB_FINISHED));
+					return;
+				}
 			}
-
+			
+			long currentTime = System.currentTimeMillis();
+			if (delayStartMs > 0) {
+				logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
+				synchronized(this) {
+					try {
+						this.wait(delayStartMs);
+						logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
+					} catch (InterruptedException e) {
+						logger.error("Job " + node.getJobId() + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+					}
+				}
+				
+				if (cancelled) {
+					logger.info("Job was cancelled while in delay. Quiting.");
+					node.setStartTime(System.currentTimeMillis());
+					node.setEndTime(System.currentTimeMillis());
+					fireEvent(Event.create(this, Type.JOB_FINISHED));
+					return;
+				}
+			}
+			
 			node.setStartTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
@@ -323,6 +366,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			job.run();
 		} catch (Exception e) {
 			e.printStackTrace();
+
 			node.setStatus(Status.FAILED);
 			logError("Job run failed!");
 			logError(e.getMessage() + e.getCause());
@@ -335,15 +379,20 @@ public class JobRunner extends EventHandler implements Runnable {
 			node.setOutputProps(outputProps);
 		}
 	}
-
+	
 	public void cancel() {
 		synchronized (syncObject) {
 			logError("Cancel has been called.");
+			this.cancelled = true;
 			
 			// Cancel code here
 			if (job == null) {
 				node.setStatus(Status.FAILED);
 				logError("Job hasn't started yet.");
+				// Just in case we're waiting on the delay
+				synchronized(this) {
+					this.notify();
+				}
 				return;
 			}
 	
@@ -356,6 +405,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	public boolean isCancelled() { // True once cancel() has been called on this runner; false otherwise.
+		return cancelled;
+	}
+	
 	public Status getStatus() {
 		return node.getStatus();
 	}
@@ -380,6 +433,14 @@ public class JobRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 	
+	public int getRetries() { // Retry count read from the "retries" job property, with 0 passed as the fallback argument.
+		return props.getInt("retries", 0);
+	}
+	
+	public long getRetryBackoff() { // "retry.backoff" job property (fallback argument 0); FlowRunner uses it as the node's delayed-execution time, which JobRunner waits on in milliseconds.
+		return props.getLong("retry.backoff", 0);
+	}
+	
 	public static String createLogFileName(int executionId, String jobId, int attempt) {
 		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
 	}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 35483c8..32eac5d 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -43,6 +43,8 @@ public class ExecutableNode {
 	private int attempt = 0;
 	private boolean paused = false;
 	
+	private long delayExecution = 0;
+
 	private Set<String> inNodes = new HashSet<String>();
 	private Set<String> outNodes = new HashSet<String>();
 	
@@ -71,10 +73,10 @@ public class ExecutableNode {
 		synchronized (this) {
 			if (pastAttempts == null) {
 				pastAttempts = new ArrayList<Attempt>();
-				pastAttempts.add(pastAttempt);
 			}
+			
+			pastAttempts.add(pastAttempt);
 		}
-		
 		startTime = -1;
 		endTime = -1;
 		updateTime = System.currentTimeMillis();
@@ -124,7 +126,15 @@ public class ExecutableNode {
 	public void setStatus(Status status) {
 		this.status = status;
 	}
-
+	
+	public long getDelayedExecution() { // Delay (ms) for this node's next run; FlowRunner passes it to JobRunner.setDelayStart.
+		return delayExecution;
+	}
+	
+	public void setDelayedExecution(long delayMs) { // Sets the delay in milliseconds; FlowRunner stores the runner's retry backoff here when a retry is scheduled and resets it to 0 once retries run out.
+		delayExecution = delayMs;
+	}
+	
 	public Object toObject() {
 		HashMap<String, Object> objMap = new HashMap<String, Object>();
 		objMap.put("id", jobId);
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index db9e24d..8916e71 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -15,7 +15,9 @@ public class SlaMailer {
 	private static Logger logger = Logger.getLogger(SlaMailer.class);
 	
 	private boolean testMode = false;
+	@SuppressWarnings("unused")
 	private String clientHostname;
+	@SuppressWarnings("unused")
 	private String clientPortNumber;
 	
 	private String mailHost;
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 3c2a4fe..0313849 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -387,6 +387,7 @@ public class SLAManager {
 		mailer.sendSlaEmail(s, message);
 	}
 	
+	@SuppressWarnings("unused")
 	private void sendSlaSuccessEmail(SLA s, ExecutableFlow exflow) {
 		String message = null;
 		ExecutableNode exnode;
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index f9f62a5..5ce6985 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -31,14 +31,9 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.TimeZone;
 
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.IntrospectionException;
-import javax.management.MBeanException;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-import javax.management.ReflectionException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 89f8470..80a3865 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -87,8 +87,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		User user = session.getUser();
 		int execId = getIntParam(req, "execid");
 		String jobId = getParam(req, "job");
+		int attempt = getIntParam(req, "attempt", 0);
 		page.add("execid", execId);
 		page.add("jobid", jobId);
+		page.add("attempt", attempt);
 		
 		ExecutableFlow flow = null;
 		try {
diff --git a/src/java/azkaban/webapp/servlet/IndexServlet.java b/src/java/azkaban/webapp/servlet/IndexServlet.java
index cf94e93..289e49d 100644
--- a/src/java/azkaban/webapp/servlet/IndexServlet.java
+++ b/src/java/azkaban/webapp/servlet/IndexServlet.java
@@ -26,12 +26,8 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.log4j.Logger;
 
-//import org.apache.log4j.Logger;
-
-import azkaban.executor.ExecutorManager;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
-import azkaban.scheduler.ScheduleManager;
 import azkaban.user.Permission;
 import azkaban.user.Role;
 import azkaban.user.User;
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 27bbb3f..78aec2f 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -21,7 +21,6 @@ import azkaban.user.Permission;
 import azkaban.user.Role;
 import azkaban.user.User;
 import azkaban.user.UserManager;
-import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
 
diff --git a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
index d3e577a..2f9ea81 100644
--- a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
@@ -37,6 +37,7 @@
 			var flowName = "${flowid}";
 			var execId = "${execid}";
 			var jobId = "${jobid}";
+			var attempt = ${attempt};
 		</script>
 	</head>
 	<body>
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 0828ea3..a138e89 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -139,7 +139,7 @@ label.disabled {
 .section-ft,
 .section-hd {
   overflow: hidden;
-  padding: 25px 2.7272727%;
+  padding: 25px 2.7272727% 15px;
 }
      
 .section-hd h2 {
@@ -1995,6 +1995,11 @@ tr:hover td {
 	background-position: 16px 0px;
 }
 
+.list ul li.QUEUED .icon {
+	opacity: 0.5;
+	background-position: 32px 0px;
+}
+
 .list ul li.RUNNING .icon {
 	background-position: 32px 0px;
 }
@@ -2177,6 +2182,11 @@ svg .RUNNING circle {
 	fill: #009FC9;
 }
 
+svg .QUEUED circle {
+	opacity: 0.5;
+	fill: #009FC9;
+}
+
 svg .FAILED circle {
 	fill: #CC0000;
 }
@@ -2503,6 +2513,14 @@ tr:hover .outerProgress {
 	background-color: #CCC;
 }
 
+.progressBox.attempt:hover {
+	opacity: 1;
+}
+
+.progressBox.attempt {
+	opacity: 0.70;
+}
+
 .progressBox.SUCCEEDED {
 	background-color: #4e911e;
 	background: -moz-linear-gradient(top, #5bb41c 0, #598d1e 100%);
@@ -2527,6 +2545,15 @@ tr:hover .outerProgress {
     background: linear-gradient(top, #009FC9 0, #007b9b 100%);
 }
 
+.progressBox.QUEUED {
+	opacity: 0.5;
+	background-color: #009FC9;
+	background: -moz-linear-gradient(top, #009FC9 0, #007b9b 100%);
+    background: -o-linear-gradient(top, #009FC9 0, #007b9b 100%);
+    background: -webkit-gradient(linear, left top, left bottom, color-stop(0,#009FC9), color-stop(100%,#007b9b));
+    background: linear-gradient(top, #009FC9 0, #007b9b 100%);
+}
+
 h3.subhead {
 	margin: 15px 20px 8px 20px;
 	font-size: 14pt;
diff --git a/src/web/js/azkaban.date.utils.js b/src/web/js/azkaban.date.utils.js
index e436648..78dbd90 100644
--- a/src/web/js/azkaban.date.utils.js
+++ b/src/web/js/azkaban.date.utils.js
@@ -55,6 +55,15 @@ var getDateFormat = function(date) {
 	return datestring;
 }
 
+var getHourMinSec = function(date) {
+	// Formats the local time of day of `date` as "HH:MM SSs", passing each field through getTwoDigitStr.
+	var hh = getTwoDigitStr(date.getHours());
+	var mm = getTwoDigitStr(date.getMinutes());
+	var ss = getTwoDigitStr(date.getSeconds());
+	
+	return hh + ":" + mm + " " + ss + "s";
+}
+
 var getTwoDigitStr = function(value) {
 	if (value < 10) {
 		return "0" + value;
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 59dd6cb..b4d9764 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -271,6 +271,7 @@ var mainSvgGraphView;
 var executionListView;
 azkaban.ExecutionListView = Backbone.View.extend({
 	events: {
+//		"click .progressBox" : "handleProgressBoxClick"
 	},
 	initialize: function(settings) {
 		this.model.bind('change:graph', this.renderJobs, this);
@@ -282,6 +283,24 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		this.updateJobRow(data.nodes);
 		this.updateProgressBar(data);
 	},
+/*	handleProgressBoxClick: function(evt) {
+		var target = evt.currentTarget;
+		var job = target.job;
+		var attempt = target.attempt;
+		
+		var data = this.model.get("data");
+		var node = data.nodes[job];
+		
+		var jobId = event.currentTarget.jobid;
+		var requestURL = contextURL + "/manager?project=" + projectName + "&execid=" + execId + "&job=" + job + "&attempt=" + attempt;
+	
+		var menu = [	
+				{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+				{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}}
+		];
+	
+		contextMenuView.show(evt, menu);
+	},*/
 	updateJobs: function(evt) {
 		var data = this.model.get("update");
 		var lastTime = data.endTime == -1 ? (new Date()).getTime() : data.endTime;
@@ -320,12 +339,34 @@ azkaban.ExecutionListView = Backbone.View.extend({
 				}
 				
 				var progressBar = $("#" + nodeId + "-progressbar");
-				for (var j = 0; j < statusList.length; ++j) {
-					var status = statusList[j];
-					progressBar.removeClass(status);
+				if (!progressBar.hasClass(node.status)) {
+					for (var j = 0; j < statusList.length; ++j) {
+						var status = statusList[j];
+						progressBar.removeClass(status);
+					}
+					progressBar.addClass(node.status);
 				}
-				progressBar.addClass(node.status);
-
+				
+				// Create past attempts
+				if (node.pastAttempts) {
+					for (var a = 0; a < node.pastAttempts.length; ++a) {
+						var attemptBarId = nodeId + "-progressbar-" + a;
+						var attempt = node.pastAttempts[a];
+						if ($("#" + attemptBarId).length == 0) {
+							var attemptBox = document.createElement("div");
+							$(attemptBox).attr("id", attemptBarId);
+							$(attemptBox).addClass("progressBox");
+							$(attemptBox).addClass("attempt");
+							$(attemptBox).addClass(attempt.status);
+							$(attemptBox).css("float","left");
+							$(attemptBox).bind("contextmenu", attemptRightClick);
+							$(progressBar).before(attemptBox);
+							attemptBox.job = nodeId;
+							attemptBox.attempt = a;
+						}
+					}
+				}
+				
 				if (node.endTime == -1) {
 //					$("#" + node.id + "-elapse").text("0 sec");
 					$("#" + nodeId + "-elapse").text(getDuration(node.startTime, (new Date()).getTime()));					
@@ -353,60 +394,51 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		}
 		
 		var nodes = data.nodes;
-		
+		var diff = flowLastTime - flowStartTime;
+		var factor = outerWidth/diff;
 		for (var i = 0; i < nodes.length; ++i) {
 			var node = nodes[i];
+			var nodeId = node.id.replace(".", "\\\\.");
 			// calculate the progress
-			var diff = flowLastTime - flowStartTime;
-			
-			var factor = outerWidth/diff;
 
+			var elem = $("#" + node.id + "-progressbar");
 			var offsetLeft = 0;
 			var minOffset = 0;
+			elem.attempt = 0;
+			
 			// Add all the attempts
-			if (node.attempt > 0) {
-				var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" + node.attempt;
+			if (node.pastAttempts) {
+				var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" +  node.pastAttempts.length;
 				var aId = node.id + "-log-link";
 				$("#" + aId).attr("href", logURL);
+				elem.attempt = node.pastAttempts.length;
 				
-				/*
-				minOffset = 1;
-				var pastAttempts = node.pastAttempts;
-				for (var j = 0; j < pastAttempts.length; ++j) {
-					var past = pastAttempts[j];
-					var id = node.id + "-past-progress-" + j;
-
-					if ($("#" + id).length == 0) {
-						var attemptBox = document.createElement("div");
-						$(attemptBox).attr("id", id);
-						$(attemptBox).addClass("progressBox");
-						$("#" + node.id + "-outerprogressbar").append(attemptBox);
-						$(attemptBox).css("float","left");
-					}
-
-					var attemptBox = $("#" + id);
+				// Calculate the node attempt bars
+				for(var p = 0; p < node.pastAttempts.length; ++p) {
+					var pastAttempt = node.pastAttempts[p];
+					var pastAttemptBox = $("#" + nodeId + "-progressbar-" + p);
 					
-					var absoluteLeft = Math.max((past.startTime-flowStartTime)*factor, 3);
-					var left = absoluteLeft - offsetLeft;
-					var width = Math.max((past.endTime - past.startTime)*factor, 1);
+					var left = (pastAttempt.startTime - flowStartTime)*factor;
+					var width =  Math.max((pastAttempt.endTime - pastAttempt.startTime)*factor, 3);
 					
-					$(attemptBox).css("margin-left", left)
-					$(attemptBox).css("width", width);
-					$(attemptBox).addClass(past.status);
-					offsetLeft += left + width;
+					var margin = left - offsetLeft;
+					$(pastAttemptBox).css("margin-left", left - offsetLeft);
+					$(pastAttemptBox).css("width", width);
+					
+					$(pastAttemptBox).attr("title", "attempt:" + p + "  start:" + getHourMinSec(new Date(pastAttempt.startTime)) + "  end:" + getHourMinSec(new Date(pastAttempt.endTime)));
+					offsetLeft += width + margin;
 				}
-				*/
 			}
-
-			var absoluteLeft = Math.max((node.startTime-flowStartTime)*factor, minOffset);
-			var left = absoluteLeft - offsetLeft;
+			
 			var nodeLastTime = node.endTime == -1 ? (new Date()).getTime() : node.endTime;
+			var left = Math.max((node.startTime-flowStartTime)*factor, minOffset);
+			var margin = left - offsetLeft;
 			var width = Math.max((nodeLastTime - node.startTime)*factor, 3);
 			width = Math.min(width, outerWidth);
 			
-			var elem = $("#" + node.id + "-progressbar");
 			elem.css("margin-left", left)
 			elem.css("width", width);
+			elem.attr("title", "attempt:" + elem.attempt + "  start:" + getHourMinSec(new Date(node.startTime)) + "  end:" + getHourMinSec(new Date(node.endTime)));
 		}
 	},
 	addNodeRow: function(node) {
@@ -439,6 +471,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(outerProgressBar).addClass("outerProgress");
 
 		var progressBox = document.createElement("div");
+		progressBox.job = node.id;
 		$(progressBox).attr("id", node.id + "-progressbar");
 		$(progressBox).addClass("progressBox");
 		$(outerProgressBar).append(progressBox);
@@ -635,6 +668,23 @@ var exGraphClickCallback = function(event) {
 	contextMenuView.show(event, menu);
 }
 
+var attemptRightClick = function(event) {
+	// Context-menu handler for a past-attempt box; the box carries its job id and attempt index as properties.
+	var target = event.currentTarget;
+	var job = target.job;
+	var attempt = target.attempt;
+	
+	var requestURL = contextURL + "/executor?project=" + projectName + "&execid=" + execId + "&job=" + job + "&attempt=" + attempt;
+
+	var menu = [	
+			{title: "Open Attempt Log...", callback: function() {window.location.href=requestURL;}},
+			{title: "Open Attempt Log in New Window...", callback: function() {window.open(requestURL);}}
+	];
+	// Show the custom menu; returning false from the jQuery handler suppresses the browser's native context menu.
+	contextMenuView.show(event, menu);
+	return false;
+}
+
 $(function() {
 	var selected;
 
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index b23cd3d..9f3536b 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -9,7 +9,8 @@ var statusStringMap = {
 	"KILLED": "Killed",
 	"DISABLED": "Disabled",
 	"READY": "Ready",
-	"UNKNOWN": "Unknown"
+	"UNKNOWN": "Unknown",
+	"QUEUED": "Queued"
 };
 
 var handleJobMenuClick = function(action, el, pos) {
diff --git a/src/web/js/azkaban.joblog.view.js b/src/web/js/azkaban.joblog.view.js
index 97c5bdf..76f5602 100644
--- a/src/web/js/azkaban.joblog.view.js
+++ b/src/web/js/azkaban.joblog.view.js
@@ -27,7 +27,7 @@ azkaban.JobLogView = Backbone.View.extend({
 				type: "get",
 				async: false,
 				dataType: "json",
-				data: {"execid": execId, "jobId": jobId, "ajax":"fetchExecJobLogs", "offset": offset, "length": 50000},
+				data: {"execid": execId, "jobId": jobId, "ajax":"fetchExecJobLogs", "offset": offset, "length": 50000, "attempt": attempt},
 				error: function(data) {
 					console.log(data);
 					finished = true;
diff --git a/unit/executions/exectest1/exec4-retry.flow b/unit/executions/exectest1/exec4-retry.flow
new file mode 100644
index 0000000..f18a53c
--- /dev/null
+++ b/unit/executions/exectest1/exec4-retry.flow
@@ -0,0 +1,54 @@
+{
+  "project.id":1,
+  "version":2,
+  "id" : "derived-member-data",
+  "success.email" : [],
+  "edges" : [ {
+    "source" : "job-retry",
+    "target" : "job-pass"
+  },{
+    "source" : "job-pass",
+    "target" : "job-retry-fail"
+  }
+   ],
+  "failure.email" : [],
+  "nodes" : [ {
+    "propSource" : "prop2.properties",
+    "id" : "job-retry",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job-retry.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job-pass",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job-pass.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job-retry-fail",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job-retry-fail.job",
+    "expectedRuntime" : 1
+  }
+   ],
+  "layedout" : false,
+  "type" : "flow",
+  "props" : [ {
+    "inherits" : "prop1.properties",
+    "source" : "prop2.properties"
+  },{
+    "source" : "prop1.properties"
+  }]
+}
\ No newline at end of file
diff --git a/unit/executions/exectest1/job-pass.job b/unit/executions/exectest1/job-pass.job
new file mode 100644
index 0000000..0a60dc4
--- /dev/null
+++ b/unit/executions/exectest1/job-pass.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/exectest1/job-retry.job b/unit/executions/exectest1/job-retry.job
new file mode 100644
index 0000000..94cd0fa
--- /dev/null
+++ b/unit/executions/exectest1/job-retry.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=2
+retries=3
+retry.backoff=1000
+
diff --git a/unit/executions/exectest1/job-retry-fail.job b/unit/executions/exectest1/job-retry-fail.job
new file mode 100644
index 0000000..bd51b47
--- /dev/null
+++ b/unit/executions/exectest1/job-retry-fail.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=3
+retries=2
+retry.backoff=2000
+
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 91a5e6e..f1717b2 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -4,9 +4,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import javax.sql.DataSource;
 
@@ -15,13 +13,6 @@ import junit.framework.Assert;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-import org.joda.time.DateTimeZone;
-import org.joda.time.DurationFieldType;
-import org.joda.time.Hours;
-import org.joda.time.MutablePeriod;
-import org.joda.time.Period;
-import org.joda.time.PeriodType;
-import org.joda.time.ReadablePeriod;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9cdca2f..da1ed27 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -303,6 +303,26 @@ public class FlowRunnerTest {
 		}
 	}
 	
+	@Test
+	public void execRetries() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec4-retry");
+		// run() is invoked directly on this thread, so the assertions below execute only after it returns.
+		runner.run();
+		// NOTE(review): expected statuses/attempt counts presumably follow the retries/passRetry values in the exec4-retry .job files; confirm against SleepJavaJob.
+		ExecutableFlow exFlow = runner.getExecutableFlow();
+		testStatus(exFlow, "job-retry", Status.SUCCEEDED);
+		testStatus(exFlow, "job-pass", Status.SUCCEEDED);
+		testStatus(exFlow, "job-retry-fail", Status.FAILED);
+		testAttempts(exFlow,"job-retry", 3);
+		testAttempts(exFlow, "job-pass", 0);
+		testAttempts(exFlow, "job-retry-fail", 2);
+		
+		Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
+	}
+	
 	private void testStatus(ExecutableFlow flow, String name, Status status) {
 		ExecutableNode node = flow.getExecutableNode(name);
 		
@@ -311,6 +331,14 @@ public class FlowRunnerTest {
 		}
 	}
 	
+	private void testAttempts(ExecutableFlow flow, String name, int attempt) {
+		// Fails the test when the named node's attempt counter differs from the expected value.
+		int actualAttempt = flow.getExecutableNode(name).getAttempt();
+		if (actualAttempt != attempt) {
+			Assert.fail("Expected " + attempt + " got " + actualAttempt + " attempts " + name );
+		}
+	}
+	
 	private ExecutableFlow prepareExecDir(File execDir, String flowName, int execId) throws IOException {
 		FileUtils.copyDirectory(execDir, workingDir);
 		
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 1bafbbb..1f8edc2 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -109,7 +109,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		
+		Assert.assertTrue(!runner.isCancelled());
 		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
 		
 		try {
@@ -181,7 +181,7 @@ public class JobRunnerTest {
 		Props outputProps = runner.getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(runner.getLogFilePath() == null);
-		
+		Assert.assertTrue(!runner.isCancelled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
 		}
@@ -231,7 +231,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		
+		Assert.assertTrue(runner.isCancelled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
 		}
@@ -242,6 +242,98 @@ public class JobRunnerTest {
 		}
 	}
 	
+	@Test
+	public void testDelayedExecutionJob() {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+		runner.setDelayStart(5000);
+		long startTime = System.currentTimeMillis();
+		ExecutableNode node = runner.getNode();
+		// Scenario: 5000 ms start delay, run on this thread; status must not be terminal before run().
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+		Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED && runner.getStatus() != Status.FAILED);
+		// "&&" is required above: joined with "||" the check would hold for every possible status.
+		runner.run();
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+		// Expect SUCCEEDED, a run longer than 1000 ms, and a start at least 5000 ms after startTime.
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.SUCCEEDED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
+		Assert.assertTrue(node.getStartTime() - startTime >= 5000);
+		// Expect a log file, non-null output props, no cancel flag, and 3 node updates in the loader.
+		File logFile = new File(runner.getLogFilePath());
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps != null);
+		Assert.assertTrue(logFile.exists());
+		Assert.assertFalse(runner.isCancelled());
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+		// The collector's ordering check must pass and start, status-change and finish events must exist.
+		Assert.assertTrue(eventCollector.checkOrdering());
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDelayedExecutionCancelledJob() {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+		runner.setDelayStart(5000);
+		long startTime = System.currentTimeMillis();
+		ExecutableNode node = runner.getNode();
+		// Scenario: cancel the runner about 2000 ms into its 5000 ms start delay.
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+		Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED && runner.getStatus() != Status.FAILED);
+		// "&&" is required above ("||" holds for every status). Run on a second thread so this one can cancel.
+		Thread thread = new Thread(runner);
+		thread.start();
+		// Pause 2000 ms, cancel, then give the runner thread up to 500 ms to finish.
+		synchronized(this) {
+			try {
+				wait(2000);
+			} catch (InterruptedException e) {
+				// Restore the interrupt flag rather than swallowing the interruption.
+				Thread.currentThread().interrupt();
+			}
+			runner.cancel();
+			try {
+				thread.join(500);
+			} catch (InterruptedException e) {
+				// Restore the interrupt flag rather than swallowing the interruption.
+				Thread.currentThread().interrupt();
+			}
+		}
+		// Everything below inspects the state the cancelled runner left behind.
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+		// Expect FAILED, a start 2000-5000 ms after startTime, and an end under 1000 ms later.
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 1000);
+		Assert.assertTrue(node.getStartTime() - startTime >= 2000);
+		Assert.assertTrue(node.getStartTime() - startTime <= 5000);
+		Assert.assertTrue(runner.isCancelled());
+		// The log file must exist, while the output props are expected to be null after a cancel.
+		File logFile = new File(runner.getLogFilePath());
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps == null);
+		Assert.assertTrue(logFile.exists());
+		// The collector's ordering check must pass and a JOB_FINISHED event must exist.
+		Assert.assertTrue(eventCollector.checkOrdering());
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_FINISHED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+	
 	private Props createProps( int sleepSec, boolean fail) {
 		Props props = new Props();
 		props.put("type", "java");
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ad28bb3..903491a 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -376,6 +376,7 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
 	}
 	
+	@SuppressWarnings("static-access")
 	@Test
 	public void testRemoveExecutionLogsByTime() throws ExecutorManagerException, IOException, InterruptedException {
 		
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index b75f7c6..ac831fc 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -1,7 +1,5 @@
 package azkaban.test.executor;
 
-import java.io.File;
-import java.io.FileFilter;
 import java.util.Map;
 
 public class SleepJavaJob {
@@ -9,10 +7,8 @@ public class SleepJavaJob {
 	private String seconds;
 	private int attempts;
 	private int currentAttempt;
-	private String id;
 
 	public SleepJavaJob(String id, Map<String, String> parameters) {
-		this.id = id;
 		String failStr = parameters.get("fail");
 		
 		if (failStr == null || failStr.equals("false")) {