azkaban-memoizeit
Changes
build.properties 2(+1 -1)
src/java/azkaban/execapp/FlowRunner.java 50(+35 -15)
src/java/azkaban/execapp/JobRunner.java 65(+63 -2)
src/java/azkaban/sla/SlaMailer.java 2(+2 -0)
src/java/azkaban/sla/SLAManager.java 1(+1 -0)
src/web/css/azkaban.css 29(+28 -1)
src/web/js/azkaban.date.utils.js 9(+9 -0)
src/web/js/azkaban.exflow.view.js 128(+89 -39)
src/web/js/azkaban.flow.view.js 3(+2 -1)
src/web/js/azkaban.joblog.view.js 2(+1 -1)
unit/executions/exectest1/exec4-retry.flow 54(+54 -0)
Details
build.properties 2(+1 -1)
diff --git a/build.properties b/build.properties
index 1b1ced1..a72a058 100644
--- a/build.properties
+++ b/build.properties
@@ -1,3 +1,3 @@
name=azkaban
-version=2.01
+version=2.1
spec.file=azkaban.spec
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 9a46168..1c9a2ad 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -72,5 +72,9 @@ public abstract class FlowWatcher {
}
}
+ public boolean isWatchCancelled() { // Exposes the current value of this watcher's cancelWatch flag.
+ return cancelWatch;
+ }
+
public abstract void stopWatcher();
}
src/java/azkaban/execapp/FlowRunner.java 50(+35 -15)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index ff2e5f0..9d81674 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -68,7 +68,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
- private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
+ private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
// Used for pipelining
private Integer pipelineLevel = null;
@@ -298,7 +298,7 @@ public class FlowRunner extends EventHandler implements Runnable {
JobRunner runner = createJobRunner(node, outputProps);
try {
executorService.submit(runner);
- runningJob.put(node.getJobId(), runner);
+ jobRunners.put(node.getJobId(), runner);
} catch (RejectedExecutionException e) {
logger.error(e);
};
@@ -350,7 +350,10 @@ public class FlowRunner extends EventHandler implements Runnable {
private List<ExecutableNode> findReadyJobsToRun() {
ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
for (ExecutableNode node : flow.getExecutableNodes()) {
- if(Status.isStatusFinished(node.getStatus())) {
+ if (node.getStatus() == Status.FAILED) {
+
+ }
+ else if(Status.isStatusFinished(node.getStatus())) {
continue;
}
else {
@@ -451,6 +454,7 @@ public class FlowRunner extends EventHandler implements Runnable {
jobRunner.setValidatedProxyUsers(proxyUsers);
}
+ jobRunner.setDelayStart(node.getDelayedExecution());
jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
jobRunner.addListener(listener);
@@ -510,11 +514,12 @@ public class FlowRunner extends EventHandler implements Runnable {
synchronized(mainSyncObj) {
flowPaused = false;
flowCancelled = true;
+
if (watcher != null) {
watcher.stopWatcher();
}
- for (JobRunner runner : runningJob.values()) {
+ for (JobRunner runner : jobRunners.values()) {
runner.cancel();
}
@@ -669,16 +674,31 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
if (node.getStatus() == Status.FAILED) {
- flowFailed = true;
-
- ExecutionOptions options = flow.getExecutionOptions();
- // The KILLED status occurs when cancel is invoked. We want to keep this
- // status even in failure conditions.
- if (flow.getStatus() != Status.KILLED) {
- flow.setStatus(Status.FAILED_FINISHING);
- if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
- logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
- cancel();
+ // Retry failure if conditions are met.
+ if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
+ logger.info("Job " + node.getJobId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+ node.setDelayedExecution(runner.getRetryBackoff());
+ node.resetForRetry();
+ }
+ else {
+ if (!runner.isCancelled() && runner.getRetries() > 0) {
+
+ logger.info("Job " + node.getJobId() + " has run out of retry attempts");
+ // Setting delayed execution to 0 in case this is manually re-tried.
+ node.setDelayedExecution(0);
+ }
+
+ flowFailed = true;
+
+ ExecutionOptions options = flow.getExecutionOptions();
+ // The KILLED status occurs when cancel is invoked. We want to keep this
+ // status even in failure conditions.
+ if (flow.getStatus() != Status.KILLED) {
+ flow.setStatus(Status.FAILED_FINISHING);
+ if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+ logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
+ cancel();
+ }
}
}
}
@@ -743,6 +763,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public int getNumRunningJobs() {
- return runningJob.size();
+ return jobRunners.size();
}
}
\ No newline at end of file
src/java/azkaban/execapp/JobRunner.java 65(+63 -2)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 3d18fe2..cb58324 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -82,6 +82,9 @@ public class JobRunner extends EventHandler implements Runnable {
private String jobLogChunkSize;
private int jobLogBackupIndex;
+
+ private long delayStartMs = 0;
+ private boolean cancelled = false;
public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
this.props = props;
@@ -102,6 +105,10 @@ public class JobRunner extends EventHandler implements Runnable {
this.jobLogBackupIndex = numLogBackup;
}
+ public Props getProps() { // Returns this runner's props field, which the constructor assigns from its Props argument.
+ return props;
+ }
+
public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
this.watcher = watcher;
this.pipelineLevel = pipelineLevel;
@@ -115,6 +122,14 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ public void setDelayStart(long delayMS) { // Sets the start delay in milliseconds; the runner only waits when the value is greater than 0.
+ delayStartMs = delayMS;
+ }
+
+ public long getDelayStart() { // Returns the configured start delay in milliseconds (field is initialised to 0).
+ return delayStartMs;
+ }
+
public ExecutableNode getNode() {
return node;
}
@@ -206,8 +221,36 @@ public class JobRunner extends EventHandler implements Runnable {
logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
}
}
+ if (watcher.isWatchCancelled()) {
+ logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+ node.setStartTime(System.currentTimeMillis());
+ node.setEndTime(System.currentTimeMillis());
+ fireEvent(Event.create(this, Type.JOB_FINISHED));
+ return;
+ }
}
-
+
+ long currentTime = System.currentTimeMillis();
+ if (delayStartMs > 0) {
+ logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
+ synchronized(this) {
+ try {
+ this.wait(delayStartMs);
+ logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
+ } catch (InterruptedException e) {
+ logger.error("Job " + node.getJobId() + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+ }
+ }
+
+ if (cancelled) {
+ logger.info("Job was cancelled while in delay. Quiting.");
+ node.setStartTime(System.currentTimeMillis());
+ node.setEndTime(System.currentTimeMillis());
+ fireEvent(Event.create(this, Type.JOB_FINISHED));
+ return;
+ }
+ }
+
node.setStartTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
@@ -323,6 +366,7 @@ public class JobRunner extends EventHandler implements Runnable {
job.run();
} catch (Exception e) {
e.printStackTrace();
+
node.setStatus(Status.FAILED);
logError("Job run failed!");
logError(e.getMessage() + e.getCause());
@@ -335,15 +379,20 @@ public class JobRunner extends EventHandler implements Runnable {
node.setOutputProps(outputProps);
}
}
-
+
public void cancel() {
synchronized (syncObject) {
logError("Cancel has been called.");
+ this.cancelled = true;
// Cancel code here
if (job == null) {
node.setStatus(Status.FAILED);
logError("Job hasn't started yet.");
+ // Just in case we're waiting on the delay
+ synchronized(this) {
+ this.notify();
+ }
return;
}
@@ -356,6 +405,10 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ public boolean isCancelled() { // Returns the cancelled flag: false initially, set to true by cancel().
+ return cancelled;
+ }
+
public Status getStatus() {
return node.getStatus();
}
@@ -380,6 +433,14 @@ public class JobRunner extends EventHandler implements Runnable {
return logFile;
}
+ public int getRetries() { // Reads the "retries" job property, passing 0 as the fallback argument.
+ return props.getInt("retries", 0);
+ }
+
+ public long getRetryBackoff() { // Reads the "retry.backoff" job property, passing 0 as the fallback argument; FlowRunner hands the value to the node's delayed execution.
+ return props.getLong("retry.backoff", 0);
+ }
+
public static String createLogFileName(int executionId, String jobId, int attempt) {
return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 35483c8..32eac5d 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -43,6 +43,8 @@ public class ExecutableNode {
private int attempt = 0;
private boolean paused = false;
+ private long delayExecution = 0;
+
private Set<String> inNodes = new HashSet<String>();
private Set<String> outNodes = new HashSet<String>();
@@ -71,10 +73,10 @@ public class ExecutableNode {
synchronized (this) {
if (pastAttempts == null) {
pastAttempts = new ArrayList<Attempt>();
- pastAttempts.add(pastAttempt);
}
+
+ pastAttempts.add(pastAttempt);
}
-
startTime = -1;
endTime = -1;
updateTime = System.currentTimeMillis();
@@ -124,7 +126,15 @@ public class ExecutableNode {
public void setStatus(Status status) {
this.status = status;
}
-
+
+ public long getDelayedExecution() { // Returns the stored delayExecution value (field is initialised to 0).
+ return delayExecution;
+ }
+
+ public void setDelayedExecution(long delayMs) { // Stores the delay (named as milliseconds) that getDelayedExecution() reads back.
+ delayExecution = delayMs;
+ }
+
public Object toObject() {
HashMap<String, Object> objMap = new HashMap<String, Object>();
objMap.put("id", jobId);
src/java/azkaban/sla/SlaMailer.java 2(+2 -0)
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index db9e24d..8916e71 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -15,7 +15,9 @@ public class SlaMailer {
private static Logger logger = Logger.getLogger(SlaMailer.class);
private boolean testMode = false;
+ @SuppressWarnings("unused")
private String clientHostname;
+ @SuppressWarnings("unused")
private String clientPortNumber;
private String mailHost;
src/java/azkaban/sla/SLAManager.java 1(+1 -0)
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index 3c2a4fe..0313849 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -387,6 +387,7 @@ public class SLAManager {
mailer.sendSlaEmail(s, message);
}
+ @SuppressWarnings("unused")
private void sendSlaSuccessEmail(SLA s, ExecutableFlow exflow) {
String message = null;
ExecutableNode exnode;
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index f9f62a5..5ce6985 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -31,14 +31,9 @@ import java.util.Comparator;
import java.util.List;
import java.util.TimeZone;
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.IntrospectionException;
-import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import javax.management.ReflectionException;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 89f8470..80a3865 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -87,8 +87,10 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
User user = session.getUser();
int execId = getIntParam(req, "execid");
String jobId = getParam(req, "job");
+ int attempt = getIntParam(req, "attempt", 0);
page.add("execid", execId);
page.add("jobid", jobId);
+ page.add("attempt", attempt);
ExecutableFlow flow = null;
try {
diff --git a/src/java/azkaban/webapp/servlet/IndexServlet.java b/src/java/azkaban/webapp/servlet/IndexServlet.java
index cf94e93..289e49d 100644
--- a/src/java/azkaban/webapp/servlet/IndexServlet.java
+++ b/src/java/azkaban/webapp/servlet/IndexServlet.java
@@ -26,12 +26,8 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
-//import org.apache.log4j.Logger;
-
-import azkaban.executor.ExecutorManager;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
-import azkaban.scheduler.ScheduleManager;
import azkaban.user.Permission;
import azkaban.user.Role;
import azkaban.user.User;
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 27bbb3f..78aec2f 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -21,7 +21,6 @@ import azkaban.user.Permission;
import azkaban.user.Role;
import azkaban.user.User;
import azkaban.user.UserManager;
-import azkaban.utils.Pair;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
diff --git a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
index d3e577a..2f9ea81 100644
--- a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
@@ -37,6 +37,7 @@
var flowName = "${flowid}";
var execId = "${execid}";
var jobId = "${jobid}";
+ var attempt = ${attempt};
</script>
</head>
<body>
src/web/css/azkaban.css 29(+28 -1)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 0828ea3..a138e89 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -139,7 +139,7 @@ label.disabled {
.section-ft,
.section-hd {
overflow: hidden;
- padding: 25px 2.7272727%;
+ padding: 25px 2.7272727% 15px;
}
.section-hd h2 {
@@ -1995,6 +1995,11 @@ tr:hover td {
background-position: 16px 0px;
}
+.list ul li.QUEUED .icon {
+ opacity: 0.5;
+ background-position: 32px 0px;
+}
+
.list ul li.RUNNING .icon {
background-position: 32px 0px;
}
@@ -2177,6 +2182,11 @@ svg .RUNNING circle {
fill: #009FC9;
}
+svg .QUEUED circle {
+ opacity: 0.5;
+ fill: #009FC9;
+}
+
svg .FAILED circle {
fill: #CC0000;
}
@@ -2503,6 +2513,14 @@ tr:hover .outerProgress {
background-color: #CCC;
}
+.progressBox.attempt:hover {
+ opacity: 1;
+}
+
+.progressBox.attempt {
+ opacity: 0.70;
+}
+
.progressBox.SUCCEEDED {
background-color: #4e911e;
background: -moz-linear-gradient(top, #5bb41c 0, #598d1e 100%);
@@ -2527,6 +2545,15 @@ tr:hover .outerProgress {
background: linear-gradient(top, #009FC9 0, #007b9b 100%);
}
+.progressBox.QUEUED {
+ opacity: 0.5;
+ background-color: #009FC9;
+ background: -moz-linear-gradient(top, #009FC9 0, #007b9b 100%);
+ background: -o-linear-gradient(top, #009FC9 0, #007b9b 100%);
+ background: -webkit-gradient(linear, left top, left bottom, color-stop(0,#009FC9), color-stop(100%,#007b9b));
+ background: linear-gradient(top, #009FC9 0, #007b9b 100%);
+}
+
h3.subhead {
margin: 15px 20px 8px 20px;
font-size: 14pt;
src/web/js/azkaban.date.utils.js 9(+9 -0)
diff --git a/src/web/js/azkaban.date.utils.js b/src/web/js/azkaban.date.utils.js
index e436648..78dbd90 100644
--- a/src/web/js/azkaban.date.utils.js
+++ b/src/web/js/azkaban.date.utils.js
@@ -55,6 +55,15 @@ var getDateFormat = function(date) {
return datestring;
}
+var getHourMinSec = function(date) {
+ var hours = getTwoDigitStr(date.getHours());
+ var minutes = getTwoDigitStr(date.getMinutes());
+ var second = getTwoDigitStr(date.getSeconds());
+
+ var timestring = hours + ":" + minutes + " " + second + "s";
+ return timestring;
+}
+
var getTwoDigitStr = function(value) {
if (value < 10) {
return "0" + value;
src/web/js/azkaban.exflow.view.js 128(+89 -39)
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 59dd6cb..b4d9764 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -271,6 +271,7 @@ var mainSvgGraphView;
var executionListView;
azkaban.ExecutionListView = Backbone.View.extend({
events: {
+// "click .progressBox" : "handleProgressBoxClick"
},
initialize: function(settings) {
this.model.bind('change:graph', this.renderJobs, this);
@@ -282,6 +283,24 @@ azkaban.ExecutionListView = Backbone.View.extend({
this.updateJobRow(data.nodes);
this.updateProgressBar(data);
},
+/* handleProgressBoxClick: function(evt) {
+ var target = evt.currentTarget;
+ var job = target.job;
+ var attempt = target.attempt;
+
+ var data = this.model.get("data");
+ var node = data.nodes[job];
+
+ var jobId = event.currentTarget.jobid;
+ var requestURL = contextURL + "/manager?project=" + projectName + "&execid=" + execId + "&job=" + job + "&attempt=" + attempt;
+
+ var menu = [
+ {title: "Open Job...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Job in New Window...", callback: function() {window.open(requestURL);}}
+ ];
+
+ contextMenuView.show(evt, menu);
+ },*/
updateJobs: function(evt) {
var data = this.model.get("update");
var lastTime = data.endTime == -1 ? (new Date()).getTime() : data.endTime;
@@ -320,12 +339,34 @@ azkaban.ExecutionListView = Backbone.View.extend({
}
var progressBar = $("#" + nodeId + "-progressbar");
- for (var j = 0; j < statusList.length; ++j) {
- var status = statusList[j];
- progressBar.removeClass(status);
+ if (!progressBar.hasClass(node.status)) {
+ for (var j = 0; j < statusList.length; ++j) {
+ var status = statusList[j];
+ progressBar.removeClass(status);
+ }
+ progressBar.addClass(node.status);
}
- progressBar.addClass(node.status);
-
+
+ // Create past attempts
+ if (node.pastAttempts) {
+ for (var a = 0; a < node.pastAttempts.length; ++a) {
+ var attemptBarId = nodeId + "-progressbar-" + a;
+ var attempt = node.pastAttempts[a];
+ if ($("#" + attemptBarId).length == 0) {
+ var attemptBox = document.createElement("div");
+ $(attemptBox).attr("id", attemptBarId);
+ $(attemptBox).addClass("progressBox");
+ $(attemptBox).addClass("attempt");
+ $(attemptBox).addClass(attempt.status);
+ $(attemptBox).css("float","left");
+ $(attemptBox).bind("contextmenu", attemptRightClick);
+ $(progressBar).before(attemptBox);
+ attemptBox.job = nodeId;
+ attemptBox.attempt = a;
+ }
+ }
+ }
+
if (node.endTime == -1) {
// $("#" + node.id + "-elapse").text("0 sec");
$("#" + nodeId + "-elapse").text(getDuration(node.startTime, (new Date()).getTime()));
@@ -353,60 +394,51 @@ azkaban.ExecutionListView = Backbone.View.extend({
}
var nodes = data.nodes;
-
+ var diff = flowLastTime - flowStartTime;
+ var factor = outerWidth/diff;
for (var i = 0; i < nodes.length; ++i) {
var node = nodes[i];
+ var nodeId = node.id.replace(".", "\\\\.");
// calculate the progress
- var diff = flowLastTime - flowStartTime;
-
- var factor = outerWidth/diff;
+ var elem = $("#" + node.id + "-progressbar");
var offsetLeft = 0;
var minOffset = 0;
+ elem.attempt = 0;
+
// Add all the attempts
- if (node.attempt > 0) {
- var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" + node.attempt;
+ if (node.pastAttempts) {
+ var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" + node.pastAttempts.length;
var aId = node.id + "-log-link";
$("#" + aId).attr("href", logURL);
+ elem.attempt = node.pastAttempts.length;
- /*
- minOffset = 1;
- var pastAttempts = node.pastAttempts;
- for (var j = 0; j < pastAttempts.length; ++j) {
- var past = pastAttempts[j];
- var id = node.id + "-past-progress-" + j;
-
- if ($("#" + id).length == 0) {
- var attemptBox = document.createElement("div");
- $(attemptBox).attr("id", id);
- $(attemptBox).addClass("progressBox");
- $("#" + node.id + "-outerprogressbar").append(attemptBox);
- $(attemptBox).css("float","left");
- }
-
- var attemptBox = $("#" + id);
+ // Calculate the node attempt bars
+ for(var p = 0; p < node.pastAttempts.length; ++p) {
+ var pastAttempt = node.pastAttempts[p];
+ var pastAttemptBox = $("#" + nodeId + "-progressbar-" + p);
- var absoluteLeft = Math.max((past.startTime-flowStartTime)*factor, 3);
- var left = absoluteLeft - offsetLeft;
- var width = Math.max((past.endTime - past.startTime)*factor, 1);
+ var left = (pastAttempt.startTime - flowStartTime)*factor;
+ var width = Math.max((pastAttempt.endTime - pastAttempt.startTime)*factor, 3);
- $(attemptBox).css("margin-left", left)
- $(attemptBox).css("width", width);
- $(attemptBox).addClass(past.status);
- offsetLeft += left + width;
+ var margin = left - offsetLeft;
+ $(pastAttemptBox).css("margin-left", left - offsetLeft);
+ $(pastAttemptBox).css("width", width);
+
+ $(pastAttemptBox).attr("title", "attempt:" + p + " start:" + getHourMinSec(new Date(pastAttempt.startTime)) + " end:" + getHourMinSec(new Date(pastAttempt.endTime)));
+ offsetLeft += width + margin;
}
- */
}
-
- var absoluteLeft = Math.max((node.startTime-flowStartTime)*factor, minOffset);
- var left = absoluteLeft - offsetLeft;
+
var nodeLastTime = node.endTime == -1 ? (new Date()).getTime() : node.endTime;
+ var left = Math.max((node.startTime-flowStartTime)*factor, minOffset);
+ var margin = left - offsetLeft;
var width = Math.max((nodeLastTime - node.startTime)*factor, 3);
width = Math.min(width, outerWidth);
- var elem = $("#" + node.id + "-progressbar");
elem.css("margin-left", left)
elem.css("width", width);
+ elem.attr("title", "attempt:" + elem.attempt + " start:" + getHourMinSec(new Date(node.startTime)) + " end:" + getHourMinSec(new Date(node.endTime)));
}
},
addNodeRow: function(node) {
@@ -439,6 +471,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(outerProgressBar).addClass("outerProgress");
var progressBox = document.createElement("div");
+ progressBox.job = node.id;
$(progressBox).attr("id", node.id + "-progressbar");
$(progressBox).addClass("progressBox");
$(outerProgressBar).append(progressBox);
@@ -635,6 +668,23 @@ var exGraphClickCallback = function(event) {
contextMenuView.show(event, menu);
}
+var attemptRightClick = function(event) {
+ var target = event.currentTarget;
+ var job = target.job;
+ var attempt = target.attempt;
+
+ var jobId = event.currentTarget.jobid;
+ var requestURL = contextURL + "/executor?project=" + projectName + "&execid=" + execId + "&job=" + job + "&attempt=" + attempt;
+
+ var menu = [
+ {title: "Open Attempt Log...", callback: function() {window.location.href=requestURL;}},
+ {title: "Open Attempt Log in New Window...", callback: function() {window.open(requestURL);}}
+ ];
+
+ contextMenuView.show(event, menu);
+ return false;
+}
+
$(function() {
var selected;
src/web/js/azkaban.flow.view.js 3(+2 -1)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index b23cd3d..9f3536b 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -9,7 +9,8 @@ var statusStringMap = {
"KILLED": "Killed",
"DISABLED": "Disabled",
"READY": "Ready",
- "UNKNOWN": "Unknown"
+ "UNKNOWN": "Unknown",
+ "QUEUED": "Queued"
};
var handleJobMenuClick = function(action, el, pos) {
src/web/js/azkaban.joblog.view.js 2(+1 -1)
diff --git a/src/web/js/azkaban.joblog.view.js b/src/web/js/azkaban.joblog.view.js
index 97c5bdf..76f5602 100644
--- a/src/web/js/azkaban.joblog.view.js
+++ b/src/web/js/azkaban.joblog.view.js
@@ -27,7 +27,7 @@ azkaban.JobLogView = Backbone.View.extend({
type: "get",
async: false,
dataType: "json",
- data: {"execid": execId, "jobId": jobId, "ajax":"fetchExecJobLogs", "offset": offset, "length": 50000},
+ data: {"execid": execId, "jobId": jobId, "ajax":"fetchExecJobLogs", "offset": offset, "length": 50000, "attempt": attempt},
error: function(data) {
console.log(data);
finished = true;
unit/executions/exectest1/exec4-retry.flow 54(+54 -0)
diff --git a/unit/executions/exectest1/exec4-retry.flow b/unit/executions/exectest1/exec4-retry.flow
new file mode 100644
index 0000000..f18a53c
--- /dev/null
+++ b/unit/executions/exectest1/exec4-retry.flow
@@ -0,0 +1,54 @@
+{
+ "project.id":1,
+ "version":2,
+ "id" : "derived-member-data",
+ "success.email" : [],
+ "edges" : [ {
+ "source" : "job-retry",
+ "target" : "job-pass"
+ },{
+ "source" : "job-pass",
+ "target" : "job-retry-fail"
+ }
+ ],
+ "failure.email" : [],
+ "nodes" : [ {
+ "propSource" : "prop2.properties",
+ "id" : "job-retry",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job-retry.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job-pass",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job-pass.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job-retry-fail",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job-retry-fail.job",
+ "expectedRuntime" : 1
+ }
+ ],
+ "layedout" : false,
+ "type" : "flow",
+ "props" : [ {
+ "inherits" : "prop1.properties",
+ "source" : "prop2.properties"
+ },{
+ "source" : "prop1.properties"
+ }]
+}
\ No newline at end of file
diff --git a/unit/executions/exectest1/job-pass.job b/unit/executions/exectest1/job-pass.job
new file mode 100644
index 0000000..0a60dc4
--- /dev/null
+++ b/unit/executions/exectest1/job-pass.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/exectest1/job-retry.job b/unit/executions/exectest1/job-retry.job
new file mode 100644
index 0000000..94cd0fa
--- /dev/null
+++ b/unit/executions/exectest1/job-retry.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=2
+retries=3
+retry.backoff=1000
+
diff --git a/unit/executions/exectest1/job-retry-fail.job b/unit/executions/exectest1/job-retry-fail.job
new file mode 100644
index 0000000..bd51b47
--- /dev/null
+++ b/unit/executions/exectest1/job-retry-fail.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=3
+retries=2
+retry.backoff=2000
+
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 91a5e6e..f1717b2 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -4,9 +4,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import javax.sql.DataSource;
@@ -15,13 +13,6 @@ import junit.framework.Assert;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
-import org.joda.time.DateTimeZone;
-import org.joda.time.DurationFieldType;
-import org.joda.time.Hours;
-import org.joda.time.MutablePeriod;
-import org.joda.time.Period;
-import org.joda.time.PeriodType;
-import org.joda.time.ReadablePeriod;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9cdca2f..da1ed27 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -303,6 +303,26 @@ public class FlowRunnerTest {
}
}
+ @Test
+ public void execRetries() throws Exception { // Runs the "exec4-retry" flow to completion and checks per-job status and attempt counters.
+ MockExecutorLoader loader = new MockExecutorLoader();
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED); // presumably drops per-job events from the collector; confirm against EventCollectorListener
+ FlowRunner runner = createFlowRunner(loader, eventCollector, "exec4-retry");
+
+ runner.run(); // executed synchronously on the test thread
+
+ ExecutableFlow exFlow = runner.getExecutableFlow();
+ testStatus(exFlow, "job-retry", Status.SUCCEEDED); // expected final status of each job
+ testStatus(exFlow, "job-pass", Status.SUCCEEDED);
+ testStatus(exFlow, "job-retry-fail", Status.FAILED);
+ testAttempts(exFlow,"job-retry", 3); // expected attempt counter of each job
+ testAttempts(exFlow, "job-pass", 0);
+ testAttempts(exFlow, "job-retry-fail", 2);
+
+ Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED); // the flow as a whole must end FAILED
+ }
+
private void testStatus(ExecutableFlow flow, String name, Status status) {
ExecutableNode node = flow.getExecutableNode(name);
@@ -311,6 +331,14 @@ public class FlowRunnerTest {
}
}
+ /**
+  * Fails the current test unless the named node's attempt counter matches.
+  *
+  * @param flow    flow containing the node
+  * @param name    id of the node to look up
+  * @param attempt expected value of the node's attempt counter
+  */
+ private void testAttempts(ExecutableFlow flow, String name, int attempt) {
+   int actual = flow.getExecutableNode(name).getAttempt();
+   if (actual == attempt) {
+     return;
+   }
+
+   Assert.fail("Expected " + attempt + " got " + actual + " attempts " + name );
+ }
+
private ExecutableFlow prepareExecDir(File execDir, String flowName, int execId) throws IOException {
FileUtils.copyDirectory(execDir, workingDir);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 1bafbbb..1f8edc2 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -109,7 +109,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
-
+ Assert.assertTrue(!runner.isCancelled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
try {
@@ -181,7 +181,7 @@ public class JobRunnerTest {
Props outputProps = runner.getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
-
+ Assert.assertTrue(!runner.isCancelled());
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
}
@@ -231,7 +231,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
-
+ Assert.assertTrue(runner.isCancelled());
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
}
@@ -242,6 +242,98 @@ public class JobRunnerTest {
}
}
+ /**
+  * Runs a job configured with a 5000 ms start delay synchronously and
+  * verifies that it still succeeds, that the node's start time reflects the
+  * delay, and that the runner is not reported as cancelled.
+  */
+ @Test
+ public void testDelayedExecutionJob() {
+   MockExecutorLoader loader = new MockExecutorLoader();
+   EventCollectorListener eventCollector = new EventCollectorListener();
+   JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+   runner.setDelayStart(5000);
+   long startTime = System.currentTimeMillis();
+   ExecutableNode node = runner.getNode();
+
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+   // Before run() the job must be in neither terminal state. The previous
+   // "||" form was true for every status and therefore checked nothing.
+   Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED && runner.getStatus() != Status.FAILED);
+
+   runner.run();
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+
+   Assert.assertTrue(runner.getStatus() == node.getStatus());
+   Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.SUCCEEDED);
+   Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+   Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
+   // The start time is stamped only after the delay has elapsed.
+   Assert.assertTrue(node.getStartTime() - startTime >= 5000);
+
+   File logFile = new File(runner.getLogFilePath());
+   Props outputProps = runner.getOutputProps();
+   Assert.assertTrue(outputProps != null);
+   Assert.assertTrue(logFile.exists());
+   Assert.assertFalse(runner.isCancelled());
+   Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+
+   Assert.assertTrue(eventCollector.checkOrdering());
+   try {
+     eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
+   }
+   catch (Exception e) {
+     Assert.fail(e.getMessage());
+   }
+ }
+
+ /**
+  * Starts a job with a 5000 ms start delay on a separate thread, cancels it
+  * roughly 2000 ms into the delay, and verifies that the runner reports the
+  * cancellation, ends FAILED with no output props, and stamps start/end
+  * times at the moment of cancellation rather than after the full delay.
+  */
+ @Test
+ public void testDelayedExecutionCancelledJob() {
+   MockExecutorLoader loader = new MockExecutorLoader();
+   EventCollectorListener eventCollector = new EventCollectorListener();
+   JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+   runner.setDelayStart(5000);
+   long startTime = System.currentTimeMillis();
+   ExecutableNode node = runner.getNode();
+
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+   // Before the runner starts, the job must be in neither terminal state.
+   // The previous "||" form was true for every status and checked nothing.
+   Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED && runner.getStatus() != Status.FAILED);
+
+   Thread thread = new Thread(runner);
+   thread.start();
+
+   try {
+     // Let the runner sit in its start delay, then cancel part-way through.
+     // Thread.sleep is used instead of wait() on the test's own monitor,
+     // which nothing ever notifies and which may wake spuriously.
+     Thread.sleep(2000);
+     runner.cancel();
+     // Give the runner thread up to 500 ms to observe the cancel and finish.
+     thread.join(500);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag and fail instead of continuing silently.
+     Thread.currentThread().interrupt();
+     Assert.fail("Test thread was interrupted: " + e.getMessage());
+   }
+
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+
+   Assert.assertTrue(runner.getStatus() == node.getStatus());
+   Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+   Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+   Assert.assertTrue( node.getEndTime() - node.getStartTime() < 1000);
+   // Cancelled after about 2000 ms, well before the 5000 ms delay elapsed.
+   Assert.assertTrue(node.getStartTime() - startTime >= 2000);
+   Assert.assertTrue(node.getStartTime() - startTime <= 5000);
+   Assert.assertTrue(runner.isCancelled());
+
+   File logFile = new File(runner.getLogFilePath());
+   Props outputProps = runner.getOutputProps();
+   Assert.assertTrue(outputProps == null);
+   Assert.assertTrue(logFile.exists());
+
+   Assert.assertTrue(eventCollector.checkOrdering());
+   try {
+     eventCollector.checkEventExists(new Type[] {Type.JOB_FINISHED});
+   }
+   catch (Exception e) {
+     Assert.fail(e.getMessage());
+   }
+ }
+
private Props createProps( int sleepSec, boolean fail) {
Props props = new Props();
props.put("type", "java");
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ad28bb3..903491a 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -376,6 +376,7 @@ public class JdbcExecutorLoaderTest {
Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
}
+ @SuppressWarnings("static-access")
@Test
public void testRemoveExecutionLogsByTime() throws ExecutorManagerException, IOException, InterruptedException {
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index b75f7c6..ac831fc 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -1,7 +1,5 @@
package azkaban.test.executor;
-import java.io.File;
-import java.io.FileFilter;
import java.util.Map;
public class SleepJavaJob {
@@ -9,10 +7,8 @@ public class SleepJavaJob {
private String seconds;
private int attempts;
private int currentAttempt;
- private String id;
public SleepJavaJob(String id, Map<String, String> parameters) {
- this.id = id;
String failStr = parameters.get("fail");
if (failStr == null || failStr.equals("false")) {