azkaban-memoizeit

Job retries. Use 'retries' to specify the number of retries. Use 'retry.backoff' to specify the delay, in milliseconds, before a failed job is retried.

4/5/2013 8:57:18 PM

Details

diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 9a46168..1c9a2ad 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -72,5 +72,9 @@ public abstract class FlowWatcher {
 		}
 	}
 	
+	public boolean isWatchCancelled() {
+		return cancelWatch;
+	}
+	
 	public abstract void stopWatcher();
 }
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index b58ae29..2afed2a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -68,7 +68,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
-	private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
+	private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
 	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
@@ -298,7 +298,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 								JobRunner runner = createJobRunner(node, outputProps);
 								try {
 									executorService.submit(runner);
-									runningJob.put(node.getJobId(), runner);
+									jobRunners.put(node.getJobId(), runner);
 								} catch (RejectedExecutionException e) {
 									logger.error(e);
 								};
@@ -350,7 +350,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private List<ExecutableNode> findReadyJobsToRun() {
 		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
 		for (ExecutableNode node : flow.getExecutableNodes()) {
-			if(Status.isStatusFinished(node.getStatus())) {
+			if (node.getStatus() == Status.FAILED) {
+				
+			}
+			else if(Status.isStatusFinished(node.getStatus())) {
 				continue;
 			}
 			else {
@@ -451,6 +454,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			jobRunner.setValidatedProxyUsers(proxyUsers);
 		}
 		
+		jobRunner.setDelayStart(node.getDelayedExecution());
 		jobRunner.setLogSettings(logger, jobLogFileSize, jobLogNumFiles);
 		jobRunner.addListener(listener);
 
@@ -510,11 +514,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 		synchronized(mainSyncObj) {
 			flowPaused = false;
 			flowCancelled = true;
+			
 			if (watcher != null) {
 				watcher.stopWatcher();
 			}
 			
-			for (JobRunner runner : runningJob.values()) {
+			for (JobRunner runner : jobRunners.values()) {
 				runner.cancel();
 			}
 			
@@ -669,16 +674,31 @@ public class FlowRunner extends EventHandler implements Runnable {
 					logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
 					
 					if (node.getStatus() == Status.FAILED) {
-						flowFailed = true;
-						
-						ExecutionOptions options = flow.getExecutionOptions();
-						// The KILLED status occurs when cancel is invoked. We want to keep this
-						// status even in failure conditions.
-						if (flow.getStatus() != Status.KILLED) {
-							flow.setStatus(Status.FAILED_FINISHING);
-							if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
-								logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
-								cancel();
+						// Retry failure if conditions are met.
+						if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
+							logger.info("Job " + node.getJobId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+							node.setDelayedExecution(runner.getRetryBackoff());
+							node.resetForRetry();
+						}
+						else {
+							if (!runner.isCancelled() && runner.getRetries() > 0) {
+					
+								logger.info("Job " + node.getJobId() + " has run out of retry attempts");
+								// Setting delayed execution to 0 in case this is manually re-tried.
+								node.setDelayedExecution(0);
+							}
+
+							flowFailed = true;
+							
+							ExecutionOptions options = flow.getExecutionOptions();
+							// The KILLED status occurs when cancel is invoked. We want to keep this
+							// status even in failure conditions.
+							if (flow.getStatus() != Status.KILLED) {
+								flow.setStatus(Status.FAILED_FINISHING);
+								if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+									logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
+									cancel();
+								}
 							}
 						}
 					}
@@ -729,6 +749,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public int getNumRunningJobs() {
-		return runningJob.size();
+		return jobRunners.size();
 	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9ceb7c0..2057c97 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -82,6 +82,9 @@ public class JobRunner extends EventHandler implements Runnable {
 
 	private String jobLogChunkSize;
 	private int jobLogBackupIndex;
+
+	private long delayStartMs = 0;
+	private boolean cancelled = false;
 	
 	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
 		this.props = props;
@@ -102,6 +105,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.jobLogBackupIndex = numLogBackup;
 	}
 	
+	public Props getProps() {
+		return props;
+	}
+	
 	public void setPipeline(FlowWatcher watcher, int pipelineLevel) {
 		this.watcher = watcher;
 		this.pipelineLevel = pipelineLevel;
@@ -115,6 +122,14 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	public void setDelayStart(long delayMS) {
+		delayStartMs = delayMS;
+	}
+	
+	public long getDelayStart() {
+		return delayStartMs;
+	}
+	
 	public ExecutableNode getNode() {
 		return node;
 	}
@@ -206,8 +221,36 @@ public class JobRunner extends EventHandler implements Runnable {
 						logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
 					}
 				}
+				if (watcher.isWatchCancelled()) {
+					logger.info("Job was cancelled while waiting on pipeline. Quiting.");
+					node.setStartTime(System.currentTimeMillis());
+					node.setEndTime(System.currentTimeMillis());
+					fireEvent(Event.create(this, Type.JOB_FINISHED));
+					return;
+				}
 			}
-
+			
+			long currentTime = System.currentTimeMillis();
+			if (delayStartMs > 0) {
+				logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
+				synchronized(this) {
+					try {
+						this.wait(delayStartMs);
+						logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
+					} catch (InterruptedException e) {
+						logger.error("Job " + node.getJobId() + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+					}
+				}
+				
+				if (cancelled) {
+					logger.info("Job was cancelled while in delay. Quiting.");
+					node.setStartTime(System.currentTimeMillis());
+					node.setEndTime(System.currentTimeMillis());
+					fireEvent(Event.create(this, Type.JOB_FINISHED));
+					return;
+				}
+			}
+			
 			node.setStartTime(System.currentTimeMillis());
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
@@ -322,6 +365,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			job.run();
 		} catch (Exception e) {
 			e.printStackTrace();
+
 			node.setStatus(Status.FAILED);
 			logError("Job run failed!");
 			logError(e.getMessage() + e.getCause());
@@ -334,15 +378,20 @@ public class JobRunner extends EventHandler implements Runnable {
 			node.setOutputProps(outputProps);
 		}
 	}
-
+	
 	public void cancel() {
 		synchronized (syncObject) {
 			logError("Cancel has been called.");
+			this.cancelled = true;
 			
 			// Cancel code here
 			if (job == null) {
 				node.setStatus(Status.FAILED);
 				logError("Job hasn't started yet.");
+				// Just in case we're waiting on the delay
+				synchronized(this) {
+					this.notify();
+				}
 				return;
 			}
 	
@@ -355,6 +404,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	public boolean isCancelled() {
+		return cancelled;
+	}
+	
 	public Status getStatus() {
 		return node.getStatus();
 	}
@@ -379,6 +432,14 @@ public class JobRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 	
+	public int getRetries() {
+		return props.getInt("retries", 0);
+	}
+	
+	public long getRetryBackoff() {
+		return props.getLong("retry.backoff", 0);
+	}
+	
 	public static String createLogFileName(int executionId, String jobId, int attempt) {
 		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
 	}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 35483c8..d15a963 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -40,9 +40,12 @@ public class ExecutableNode {
 	private int level = 0;
 	private ExecutableFlow flow;
 	private Props outputProps;
+	private Props resolvedProps;
 	private int attempt = 0;
 	private boolean paused = false;
 	
+	private long delayExecution = 0;
+
 	private Set<String> inNodes = new HashSet<String>();
 	private Set<String> outNodes = new HashSet<String>();
 	
@@ -124,7 +127,15 @@ public class ExecutableNode {
 	public void setStatus(Status status) {
 		this.status = status;
 	}
-
+	
+	public long getDelayedExecution() {
+		return delayExecution;
+	}
+	
+	public void setDelayedExecution(long delayMs) {
+		delayExecution = delayMs;
+	}
+	
 	public Object toObject() {
 		HashMap<String, Object> objMap = new HashMap<String, Object>();
 		objMap.put("id", jobId);
diff --git a/unit/executions/exectest1/exec4-retry.flow b/unit/executions/exectest1/exec4-retry.flow
new file mode 100644
index 0000000..f18a53c
--- /dev/null
+++ b/unit/executions/exectest1/exec4-retry.flow
@@ -0,0 +1,54 @@
+{
+  "project.id":1,
+  "version":2,
+  "id" : "derived-member-data",
+  "success.email" : [],
+  "edges" : [ {
+    "source" : "job-retry",
+    "target" : "job-pass"
+  },{
+    "source" : "job-pass",
+    "target" : "job-retry-fail"
+  }
+   ],
+  "failure.email" : [],
+  "nodes" : [ {
+    "propSource" : "prop2.properties",
+    "id" : "job-retry",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job-retry.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job-pass",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job-pass.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job-retry-fail",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job-retry-fail.job",
+    "expectedRuntime" : 1
+  }
+   ],
+  "layedout" : false,
+  "type" : "flow",
+  "props" : [ {
+    "inherits" : "prop1.properties",
+    "source" : "prop2.properties"
+  },{
+    "source" : "prop1.properties"
+  }]
+}
\ No newline at end of file
diff --git a/unit/executions/exectest1/job-pass.job b/unit/executions/exectest1/job-pass.job
new file mode 100644
index 0000000..0a60dc4
--- /dev/null
+++ b/unit/executions/exectest1/job-pass.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/exectest1/job-retry.job b/unit/executions/exectest1/job-retry.job
new file mode 100644
index 0000000..94cd0fa
--- /dev/null
+++ b/unit/executions/exectest1/job-retry.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=2
+retries=3
+retry.backoff=1000
+
diff --git a/unit/executions/exectest1/job-retry-fail.job b/unit/executions/exectest1/job-retry-fail.job
new file mode 100644
index 0000000..bd51b47
--- /dev/null
+++ b/unit/executions/exectest1/job-retry-fail.job
@@ -0,0 +1,8 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=true
+passRetry=3
+retries=2
+retry.backoff=2000
+
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 9cdca2f..da1ed27 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -303,6 +303,26 @@ public class FlowRunnerTest {
 		}
 	}
 	
+	@Test
+	public void execRetries() throws Exception {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
+		FlowRunner runner = createFlowRunner(loader, eventCollector, "exec4-retry");
+		
+		runner.run();
+		
+		ExecutableFlow exFlow = runner.getExecutableFlow();
+		testStatus(exFlow, "job-retry", Status.SUCCEEDED);
+		testStatus(exFlow, "job-pass", Status.SUCCEEDED);
+		testStatus(exFlow, "job-retry-fail", Status.FAILED);
+		testAttempts(exFlow,"job-retry", 3);
+		testAttempts(exFlow, "job-pass", 0);
+		testAttempts(exFlow, "job-retry-fail", 2);
+		
+		Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
+	}
+	
 	private void testStatus(ExecutableFlow flow, String name, Status status) {
 		ExecutableNode node = flow.getExecutableNode(name);
 		
@@ -311,6 +331,14 @@ public class FlowRunnerTest {
 		}
 	}
 	
+	private void testAttempts(ExecutableFlow flow, String name, int attempt) {
+		ExecutableNode node = flow.getExecutableNode(name);
+		
+		if (node.getAttempt() != attempt) {
+			Assert.fail("Expected " + attempt + " got " + node.getAttempt() + " attempts " + name );
+		}
+	}
+	
 	private ExecutableFlow prepareExecDir(File execDir, String flowName, int execId) throws IOException {
 		FileUtils.copyDirectory(execDir, workingDir);
 		
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 1bafbbb..1f8edc2 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -109,7 +109,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		
+		Assert.assertTrue(!runner.isCancelled());
 		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
 		
 		try {
@@ -181,7 +181,7 @@ public class JobRunnerTest {
 		Props outputProps = runner.getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(runner.getLogFilePath() == null);
-		
+		Assert.assertTrue(!runner.isCancelled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
 		}
@@ -231,7 +231,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
-		
+		Assert.assertTrue(runner.isCancelled());
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
 		}
@@ -242,6 +242,98 @@ public class JobRunnerTest {
 		}
 	}
 	
+	@Test
+	public void testDelayedExecutionJob() {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+		runner.setDelayStart(5000);
+		long startTime = System.currentTimeMillis();
+		ExecutableNode node = runner.getNode();
+		
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+		Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED || runner.getStatus() != Status.FAILED);
+		
+		runner.run();
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+		
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.SUCCEEDED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
+		Assert.assertTrue(node.getStartTime() - startTime >= 5000);
+		
+		File logFile = new File(runner.getLogFilePath());
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps != null);
+		Assert.assertTrue(logFile.exists());
+		Assert.assertFalse(runner.isCancelled());
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+		
+		Assert.assertTrue(eventCollector.checkOrdering());
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDelayedExecutionCancelledJob() {
+		MockExecutorLoader loader = new MockExecutorLoader();
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		JobRunner runner = createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+		runner.setDelayStart(5000);
+		long startTime = System.currentTimeMillis();
+		ExecutableNode node = runner.getNode();
+		
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+		Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED || runner.getStatus() != Status.FAILED);
+		
+		Thread thread = new Thread(runner);
+		thread.start();
+		
+		synchronized(this) {
+			try {
+				wait(2000);
+			} catch (InterruptedException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			runner.cancel();
+			try {
+				wait(500);
+			} catch (InterruptedException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED));
+		
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue("Node status is " + node.getStatus(), node.getStatus() == Status.FAILED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 1000);
+		Assert.assertTrue(node.getStartTime() - startTime >= 2000);
+		Assert.assertTrue(node.getStartTime() - startTime <= 5000);
+		Assert.assertTrue(runner.isCancelled());
+		
+		File logFile = new File(runner.getLogFilePath());
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps == null);
+		Assert.assertTrue(logFile.exists());
+		
+		Assert.assertTrue(eventCollector.checkOrdering());
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_FINISHED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+	
 	private Props createProps( int sleepSec, boolean fail) {
 		Props props = new Props();
 		props.put("type", "java");
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index b75f7c6..0c7d23c 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -1,7 +1,5 @@
 package azkaban.test.executor;
 
-import java.io.File;
-import java.io.FileFilter;
 import java.util.Map;
 
 public class SleepJavaJob {