azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 50(+35 -15)
src/java/azkaban/execapp/JobRunner.java 65(+63 -2)
unit/executions/exectest1/exec4-retry.flow 54(+54 -0)
Details
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 9a46168..1c9a2ad 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -72,5 +72,9 @@ public abstract class FlowWatcher {
}
}
+ /**
+ * Reports whether this watcher's cancel flag is currently set.
+ *
+ * @return the current value of {@code cancelWatch}
+ */
+ public boolean isWatchCancelled() {
+ return cancelWatch;
+ }
+
public abstract void stopWatcher();
}
src/java/azkaban/execapp/FlowRunner.java 50(+35 -15)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index b58ae29..2afed2a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -68,7 +68,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
- private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
+ private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
// Used for pipelining
private Integer pipelineLevel = null;
@@ -298,7 +298,7 @@ public class FlowRunner extends EventHandler implements Runnable {
JobRunner runner = createJobRunner(node, outputProps);
try {
executorService.submit(runner);
- runningJob.put(node.getJobId(), runner);
+ jobRunners.put(node.getJobId(), runner);
} catch (RejectedExecutionException e) {
logger.error(e);
};
@@ -350,7 +350,10 @@ public class FlowRunner extends EventHandler implements Runnable {
private List<ExecutableNode> findReadyJobsToRun() {
ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
for (ExecutableNode node : flow.getExecutableNodes()) {
- if(Status.isStatusFinished(node.getStatus())) {
+ if (node.getStatus() == Status.FAILED) {
+
+ }
+ else if(Status.isStatusFinished(node.getStatus())) {
continue;
}
else {
@@ -451,6 +454,7 @@ public class FlowRunner extends EventHandler implements Runnable {
jobRunner.setValidatedProxyUsers(proxyUsers);
}
+ jobRunner.setDelayStart(node.getDelayedExecution());
jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
jobRunner.addListener(listener);
@@ -510,11 +514,12 @@ public class FlowRunner extends EventHandler implements Runnable {
synchronized(mainSyncObj) {
flowPaused = false;
flowCancelled = true;
+
if (watcher != null) {
watcher.stopWatcher();
}
- for (JobRunner runner : runningJob.values()) {
+ for (JobRunner runner : jobRunners.values()) {
runner.cancel();
}
@@ -669,16 +674,31 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
if (node.getStatus() == Status.FAILED) {
- flowFailed = true;
-
- ExecutionOptions options = flow.getExecutionOptions();
- // The KILLED status occurs when cancel is invoked. We want to keep this
- // status even in failure conditions.
- if (flow.getStatus() != Status.KILLED) {
- flow.setStatus(Status.FAILED_FINISHING);
- if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
- logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
- cancel();
+ // Retry failure if conditions are met.
+ if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
+ logger.info("Job " + node.getJobId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+ node.setDelayedExecution(runner.getRetryBackoff());
+ node.resetForRetry();
+ }
+ else {
+ if (!runner.isCancelled() && runner.getRetries() > 0) {
+
+ logger.info("Job " + node.getJobId() + " has run out of retry attempts");
+ // Setting delayed execution to 0 in case this is manually re-tried.
+ node.setDelayedExecution(0);
+ }
+
+ flowFailed = true;
+
+ ExecutionOptions options = flow.getExecutionOptions();
+ // The KILLED status occurs when cancel is invoked. We want to keep this
+ // status even in failure conditions.
+ if (flow.getStatus() != Status.KILLED) {
+ flow.setStatus(Status.FAILED_FINISHING);
+ if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+ logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
+ cancel();
+ }
}
}
}
@@ -729,6 +749,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
public int getNumRunningJobs() {
- return runningJob.size();
+ return jobRunners.size();
}
}
\ No newline at end of file
src/java/azkaban/execapp/JobRunner.java 65(+63 -2)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9ceb7c0..2057c97 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -82,6 +82,9 @@ public class JobRunner extends EventHandler implements Runnable {
private String jobLogChunkSize;
private int jobLogBackupIndex;
+
+ private long delayStartMs = 0;
+ private boolean cancelled = false;
public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
this.props = props;
@@ -102,6 +105,10 @@ public class JobRunner extends EventHandler implements Runnable {
this.jobLogBackupIndex = numLogBackup;
}
+ /**
+ * Returns this runner's {@code props} field, which the constructor
+ * assigns from its {@code props} argument.
+ *
+ * @return the job properties held by this runner
+ */
+ public Props getProps() {
+ return props;
+ }
+
public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
this.watcher = watcher;
this.pipelineLevel = pipelineLevel;
@@ -115,6 +122,14 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ /**
+ * Sets how long this runner waits before starting the job.
+ * The runner only waits when the stored value is greater than zero.
+ *
+ * @param delayMS delay in milliseconds (the value is passed to {@code wait(long)})
+ */
+ public void setDelayStart(long delayMS) {
+ delayStartMs = delayMS;
+ }
+
+ /**
+ * Returns the configured start delay.
+ *
+ * @return the value last given to {@code setDelayStart}, or 0 if it was never set
+ */
+ public long getDelayStart() {
+ return delayStartMs;
+ }
+
public ExecutableNode getNode() {
return node;
}
@@ -206,8 +221,36 @@ public class JobRunner extends EventHandler implements Runnable {
logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
}
}
+ if (watcher.isWatchCancelled()) {
+ logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+ node.setStartTime(System.currentTimeMillis());
+ node.setEndTime(System.currentTimeMillis());
+ fireEvent(Event.create(this, Type.JOB_FINISHED));
+ return;
+ }
}
-
+
+ long currentTime = System.currentTimeMillis();
+ if (delayStartMs > 0) {
+ logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
+ synchronized(this) {
+ try {
+ this.wait(delayStartMs);
+ logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
+ } catch (InterruptedException e) {
+ logger.error("Job " + node.getJobId() + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+ }
+ }
+
+ if (cancelled) {
+ logger.info("Job was cancelled while in delay. Quiting.");
+ node.setStartTime(System.currentTimeMillis());
+ node.setEndTime(System.currentTimeMillis());
+ fireEvent(Event.create(this, Type.JOB_FINISHED));
+ return;
+ }
+ }
+
node.setStartTime(System.currentTimeMillis());
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
@@ -322,6 +365,7 @@ public class JobRunner extends EventHandler implements Runnable {
job.run();
} catch (Exception e) {
e.printStackTrace();
+
node.setStatus(Status.FAILED);
logError("Job run failed!");
logError(e.getMessage() + e.getCause());
@@ -334,15 +378,20 @@ public class JobRunner extends EventHandler implements Runnable {
node.setOutputProps(outputProps);
}
}
-
+
public void cancel() {
synchronized (syncObject) {
logError("Cancel has been called.");
+ this.cancelled = true;
// Cancel code here
if (job == null) {
node.setStatus(Status.FAILED);
logError("Job hasn't started yet.");
+ // Just in case we're waiting on the delay
+ synchronized(this) {
+ this.notify();
+ }
return;
}
@@ -355,6 +404,10 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ /**
+ * Returns the {@code cancelled} flag, which starts out {@code false}
+ * and is set to {@code true} by {@code cancel()}.
+ *
+ * NOTE(review): the field is a plain (non-volatile) boolean and this read is
+ * unsynchronized - confirm visibility for callers on other threads.
+ *
+ * @return {@code true} once {@code cancel()} has been called
+ */
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
public Status getStatus() {
return node.getStatus();
}
@@ -379,6 +432,14 @@ public class JobRunner extends EventHandler implements Runnable {
return logFile;
}
+ /**
+ * Reads the {@code retries} job property, passing 0 as the default.
+ *
+ * @return the configured number of retries
+ */
+ public int getRetries() {
+ return props.getInt("retries", 0);
+ }
+
+ /**
+ * Reads the {@code retry.backoff} job property, passing 0 as the default.
+ *
+ * @return the configured retry backoff; presumably milliseconds, since the
+ * start delay is ultimately handed to {@code wait(long)} - confirm
+ */
+ public long getRetryBackoff() {
+ return props.getLong("retry.backoff", 0);
+ }
+
public static String createLogFileName(int executionId, String jobId, int attempt) {
return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 35483c8..d15a963 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -40,9 +40,12 @@ public class ExecutableNode {
private int level = 0;
private ExecutableFlow flow;
private Props outputProps;
+ private Props resolvedProps;
private int attempt = 0;
private boolean paused = false;
+ private long delayExecution = 0;
+
private Set<String> inNodes = new HashSet<String>();
private Set<String> outNodes = new HashSet<String>();
@@ -124,7 +127,15 @@ public class ExecutableNode {
public void setStatus(Status status) {
this.status = status;
}
-
+
+ /**
+ * Returns the execution delay stored on this node.
+ *
+ * @return the value last given to {@code setDelayedExecution}, or 0 if it was never set
+ */
+ public long getDelayedExecution() {
+ return delayExecution;
+ }
+
+ /**
+ * Stores the delay to apply before this node is executed.
+ *
+ * @param delayMs the delay value to store; presumably milliseconds, going by the parameter name - confirm
+ */
+ public void setDelayedExecution(long delayMs) {
+ delayExecution = delayMs;
+ }
+
public Object toObject() {
HashMap<String, Object> objMap = new HashMap<String, Object>();
objMap.put("id", jobId);
unit/executions/exectest1/exec4-retry.flow 54(+54 -0)
diff --git a/unit/executions/exectest1/exec4-retry.flow b/unit/executions/exectest1/exec4-retry.flow
new file mode 100644
index 0000000..f18a53c
--- /dev/null
+++ b/unit/executions/exectest1/exec4-retry.flow
@@ -0,0 +1,54 @@
+{
+ "project.id":1,
+ "version":2,
+ "id" : "derived-member-data",
+ "success.email" : [],
+ "edges" : [ {
+ "source" : "job-retry",
+ "target" : "job-pass"
+ },{
+ "source" : "job-pass",
+ "target" : "job-retry-fail"
+ }
+ ],
+ "failure.email" : [],
+ "nodes" : [ {
+ "propSource" : "prop2.properties",
+ "id" : "job-retry",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job-retry.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job-pass",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job-pass.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job-retry-fail",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job-retry-fail.job",
+ "expectedRuntime" : 1
+ }
+ ],
+ "layedout" : false,
+ "type" : "flow",
+ "props" : [ {
+ "inherits" : "prop1.properties",
+ "source" : "prop2.properties"
+ },{
+ "source" : "prop1.properties"
+ }]
+}
\ No newline at end of file
diff --git a/unit/executions/exectest1/job-pass.job b/unit/executions/exectest1/job-pass.job
new file mode 100644
index 0000000..0a60dc4
--- /dev/null
+++ b/unit/executions/exectest1/job-pass.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/exectest1/job-retry.job b/unit/executions/exectest1/job-retry.job
new file mode 100644
index 0000000..94cd0fa
--- /dev/null
+++ b/unit/executions/exectest1/job-retry.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=2
+retries=3
+retry.backoff=1000
+
diff --git a/unit/executions/exectest1/job-retry-fail.job b/unit/executions/exectest1/job-retry-fail.job
new file mode 100644
index 0000000..bd51b47
--- /dev/null
+++ b/unit/executions/exectest1/job-retry-fail.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=3
+retries=2
+retry.backoff=2000
+
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9cdca2f..da1ed27 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -303,6 +303,26 @@ public class FlowRunnerTest {
}
}
+ /**
+ * Runs a flow runner created for "exec4-retry" to completion and checks the
+ * final status and attempt count of each of its three jobs, plus the overall
+ * flow status.
+ */
+ @Test
+ public void execRetries() throws Exception {
+ MockExecutorLoader loader = new MockExecutorLoader();
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+ FlowRunner runner = createFlowRunner(loader, eventCollector, "exec4-retry");
+
+ // Executed synchronously on the test thread; returns once the flow is done.
+ runner.run();
+
+ ExecutableFlow exFlow = runner.getExecutableFlow();
+ // Expected end state of every job in the flow.
+ testStatus(exFlow, "job-retry", Status.SUCCEEDED);
+ testStatus(exFlow, "job-pass", Status.SUCCEEDED);
+ testStatus(exFlow, "job-retry-fail", Status.FAILED);
+ // Expected value of each node's attempt counter after the run.
+ testAttempts(exFlow,"job-retry", 3);
+ testAttempts(exFlow, "job-pass", 0);
+ testAttempts(exFlow, "job-retry-fail", 2);
+
+ // The flow as a whole must end up FAILED.
+ Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
+ }
+
private void testStatus(ExecutableFlow flow, String name, Status status) {
ExecutableNode node = flow.getExecutableNode(name);
@@ -311,6 +331,14 @@ public class FlowRunnerTest {
}
}
+ /**
+  * Fails the current test unless the named node's attempt counter equals
+  * the expected value.
+  *
+  * @param flow    flow that contains the node
+  * @param name    id of the node to look up
+  * @param attempt expected attempt count
+  */
+ private void testAttempts(ExecutableFlow flow, String name, int attempt) {
+   int actualAttempt = flow.getExecutableNode(name).getAttempt();
+   if (actualAttempt == attempt) {
+     return;
+   }
+
+   Assert.fail("Expected " + attempt + " got " + actualAttempt + " attempts " + name );
+ }
+
private ExecutableFlow prepareExecDir(File execDir, String flowName, int execId) throws IOException {
FileUtils.copyDirectory(execDir, workingDir);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 1bafbbb..1f8edc2 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -109,7 +109,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
-
+ Assert.assertTrue(!runner.isCancelled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
try {
@@ -181,7 +181,7 @@ public class JobRunnerTest {
Props outputProps = runner.getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
-
+ Assert.assertTrue(!runner.isCancelled());
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
}
@@ -231,7 +231,7 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
-
+ Assert.assertTrue(runner.isCancelled());
try {
eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
}
@@ -242,6 +242,98 @@ public class JobRunnerTest {
}
}
+ /**
+  * Runs a job synchronously with a 5000 ms start delay and verifies that it
+  * still succeeds, that the recorded start time is at least 5000 ms after the
+  * test began, and that the runner does not report itself as cancelled.
+  */
+ @Test
+ public void testDelayedExecutionJob() {
+   MockExecutorLoader loader = new MockExecutorLoader();
+   EventCollectorListener eventCollector = new EventCollectorListener();
+   JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+   runner.setDelayStart(5000);
+   long startTime = System.currentTimeMillis();
+   ExecutableNode node = runner.getNode();
+
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+   // The job must not be in a terminal state before it has run. The previous
+   // "!= SUCCEEDED || != FAILED" form was always true and checked nothing.
+   Status initialStatus = runner.getStatus();
+   Assert.assertTrue(initialStatus != Status.SUCCEEDED && initialStatus != Status.FAILED);
+
+   runner.run();
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+
+   Assert.assertTrue(runner.getStatus() == node.getStatus());
+   Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.SUCCEEDED);
+   Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+   // The job itself is expected to take more than a second to run.
+   Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
+   // The start time is recorded only after the configured delay has elapsed.
+   Assert.assertTrue(node.getStartTime() - startTime >= 5000);
+
+   File logFile = new File(runner.getLogFilePath());
+   Props outputProps = runner.getOutputProps();
+   Assert.assertTrue(outputProps != null);
+   Assert.assertTrue(logFile.exists());
+   Assert.assertFalse(runner.isCancelled());
+   Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+
+   Assert.assertTrue(eventCollector.checkOrdering());
+   try {
+     eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
+   }
+   catch (Exception e) {
+     Assert.fail(e.getMessage());
+   }
+ }
+
+ /**
+  * Starts a job with a 5000 ms start delay on a separate thread, cancels it
+  * after roughly 2000 ms, and verifies that the runner leaves its delay early:
+  * the node ends up FAILED, the runner reports itself cancelled, no output
+  * props are produced, and a JOB_FINISHED event is emitted.
+  */
+ @Test
+ public void testDelayedExecutionCancelledJob() {
+   MockExecutorLoader loader = new MockExecutorLoader();
+   EventCollectorListener eventCollector = new EventCollectorListener();
+   JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+   runner.setDelayStart(5000);
+   long startTime = System.currentTimeMillis();
+   ExecutableNode node = runner.getNode();
+
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+   // The job must not be in a terminal state before it has run. The previous
+   // "!= SUCCEEDED || != FAILED" form was always true and checked nothing.
+   Status initialStatus = runner.getStatus();
+   Assert.assertTrue(initialStatus != Status.SUCCEEDED && initialStatus != Status.FAILED);
+
+   Thread thread = new Thread(runner);
+   thread.start();
+
+   try {
+     // Let the runner sit in its start delay for a while before cancelling.
+     Thread.sleep(2000);
+     runner.cancel();
+     // Wait for the runner thread to actually finish rather than pausing for a
+     // fixed interval, so the assertions below cannot race with it.
+     thread.join(3000);
+   } catch (InterruptedException e) {
+     // Preserve the interrupt for the test framework and fail loudly instead
+     // of printing a stack trace and carrying on.
+     Thread.currentThread().interrupt();
+     Assert.fail("Test thread was interrupted: " + e.getMessage());
+   }
+   Assert.assertFalse("Runner thread is still alive after cancel", thread.isAlive());
+
+   eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+
+   Assert.assertTrue(runner.getStatus() == node.getStatus());
+   Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+   Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+   // A cancelled job records start and end back to back.
+   Assert.assertTrue( node.getEndTime() - node.getStartTime() < 1000);
+   // Cancelled after about 2 s, well before the 5 s delay would have expired.
+   Assert.assertTrue(node.getStartTime() - startTime >= 2000);
+   Assert.assertTrue(node.getStartTime() - startTime <= 5000);
+   Assert.assertTrue(runner.isCancelled());
+
+   File logFile = new File(runner.getLogFilePath());
+   Props outputProps = runner.getOutputProps();
+   Assert.assertTrue(outputProps == null);
+   Assert.assertTrue(logFile.exists());
+
+   Assert.assertTrue(eventCollector.checkOrdering());
+   try {
+     eventCollector.checkEventExists(new Type[] {Type.JOB_FINISHED});
+   }
+   catch (Exception e) {
+     Assert.fail(e.getMessage());
+   }
+ }
+
private Props createProps( int sleepSec, boolean fail) {
Props props = new Props();
props.put("type", "java");
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index b75f7c6..0c7d23c 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -1,7 +1,5 @@
package azkaban.test.executor;
-import java.io.File;
-import java.io.FileFilter;
import java.util.Map;
public class SleepJavaJob {