azkaban-memoizeit

Run inner (nested) flows and support failure recovery within them; simplified sketches of the key recursive patterns appear below, before and after the diff.

9/17/2013 6:50:26 PM

Details
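
The gist of the change: flow finalization, status propagation, and failure retry now recurse through nested (inner) flows instead of touching only the top-level flow, and job output properties are read from the node tree rather than from a FlowRunner-level map. The stand-alone sketch below uses simplified stand-in types (not Azkaban's ExecutableFlowBase/ExecutableNode) to illustrate the recursive shape of finalizeFlow, retryFailures(ExecutableFlowBase), and getPrintableId as they appear in the diff; details such as resetForRetry() and attempt counting are omitted.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the executable-node tree; not Azkaban's real classes.
public class NestedFlowSketch {
    enum Status { READY, FAILED_FINISHING, FAILED, KILLED, SUCCEEDED }

    static class Node {
        final String id;
        Status status = Status.READY;
        Node parent;                                        // enclosing flow; null for the root flow
        final List<Node> children = new ArrayList<Node>();  // empty for plain jobs

        Node(String id) { this.id = id; }
        boolean isFlow() { return !children.isEmpty(); }

        // Mirrors ExecutableNode.getPrintableId(delimiter): prefix the id with every
        // enclosing subflow id, but stop short of the root flow itself.
        String printableId(String delimiter) {
            if (parent == null || parent.parent == null) {
                return id;
            }
            return parent.printableId(delimiter) + delimiter + id;
        }
    }

    // Mirrors FlowRunner.finalizeFlow: FAILED_FINISHING collapses to FAILED,
    // other terminal failure states are kept, everything else becomes SUCCEEDED.
    static void finalizeFlow(Node flow) {
        switch (flow.status) {
        case FAILED_FINISHING:
            flow.status = Status.FAILED;
            break;
        case FAILED:
        case KILLED:
            break;
        default:
            flow.status = Status.SUCCEEDED;
        }
    }

    // Mirrors the recursive retryFailures(ExecutableFlowBase): descend into failed
    // subflows first, then reset failed/killed nodes so they can run again
    // (the real code calls resetForRetry() on FAILED nodes to bump the attempt count).
    static void retryFailures(Node flow) {
        for (Node node : flow.children) {
            boolean failed = node.status == Status.FAILED
                    || node.status == Status.KILLED
                    || node.status == Status.FAILED_FINISHING;
            if (node.isFlow() && failed) {
                retryFailures(node);
            }
            if (failed) {
                node.status = Status.READY;
            }
        }
    }

    public static void main(String[] args) {
        Node root = new Node("rootFlow");
        Node inner = new Node("innerFlow");
        Node job = new Node("jobA");
        inner.parent = root; root.children.add(inner);
        job.parent = inner;  inner.children.add(job);

        job.status = Status.FAILED;
        inner.status = Status.FAILED_FINISHING;

        System.out.println(job.printableId(":"));   // innerFlow:jobA
        finalizeFlow(inner);                        // FAILED_FINISHING -> FAILED
        retryFailures(root);                        // innerFlow and jobA go back to READY
        System.out.println(inner.status + " " + job.status);
    }
}

(The diff also collapses the jobRunners and activeJobRunners maps into a single concurrent Set<JobRunner>, presumably because the per-job-id lookup they supported disappears along with the removed retryJobs path.)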

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 0259777..648acef 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -1,8 +1,24 @@
+/*
+ * Copyright 2013 LinkedIn Corp
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package azkaban.execapp;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,7 +70,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private int execId;
 	private File execDir;
-	private ExecutableFlow flow;
+	private final ExecutableFlow flow;
 	private Thread flowRunnerThread;
 	private int numJobThreads = 10;
 	private ExecutionOptions.FailureAction failureAction;
@@ -64,15 +80,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	// Properties map
 	private Map<String, Props> sharedProps = new HashMap<String, Props>();
-	private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
+//	private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
 	
 	private Props globalProps;
 //	private Props commonProps;
 	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
-	private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
-	private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
+	private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
 	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
@@ -336,7 +351,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		if (flowCancelled) {
 			try {
 				logger.info("Flow was force cancelled cleaning up.");
-				for(JobRunner activeRunner : activeJobRunners.values()) {
+				for(JobRunner activeRunner : activeJobRunners) {
 					activeRunner.cancel();
 				}
 				
@@ -352,19 +367,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		executorService.shutdown();
 		
 		synchronized(mainSyncObj) {
-			switch(flow.getStatus()) {
-			case FAILED_FINISHING:
-				logger.info("Setting flow status to Failed.");
-				flow.setStatus(Status.FAILED);
-			case FAILED:
-			case KILLED:
-			case FAILED_SUCCEEDED:
-				logger.info("Flow is set to " + flow.getStatus().toString());
-				break;
-			default:
-				flow.setStatus(Status.SUCCEEDED);
-				logger.info("Flow is set to " + flow.getStatus().toString());
-			}
+			finalizeFlow(flow);
 		}
 	}
 	
@@ -399,6 +402,24 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return false;
 	}
 	
+	private void finalizeFlow(ExecutableFlowBase flow) {
+		String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
+		
+		switch(flow.getStatus()) {
+		case FAILED_FINISHING:
+			logger.info("Setting flow " + id + "status to Failed.");
+			flow.setStatus(Status.FAILED);
+		case FAILED:
+		case KILLED:
+		case FAILED_SUCCEEDED:
+			logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+			break;
+		default:
+			flow.setStatus(Status.SUCCEEDED);
+			logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+		}
+	}
+	
 	private void prepareJobProperties(ExecutableNode node) throws IOException {
 		Props props = null;
 		// The following is the hiearchical ordering of dependency resolution
@@ -476,16 +497,15 @@ public class FlowRunner extends EventHandler implements Runnable {
 			node.setStatus(Status.RUNNING);
 			node.setStartTime(System.currentTimeMillis());
 			
-			logger.info("Starting subflow " + node.getId() + ".");
+			logger.info("Starting subflow " + node.getPrintableId() + ".");
 		}
 		else {
 			node.setStatus(Status.QUEUED);
 			JobRunner runner = createJobRunner(node);
-			logger.info("Submitting job " + node.getId() + " to run.");
+			logger.info("Submitting job " + node.getPrintableId() + " to run.");
 			try {
 				executorService.submit(runner);
-				jobRunners.put(node.getId(), runner);
-				activeJobRunners.put(node.getId(), runner);
+				activeJobRunners.add(runner);
 			} catch (RejectedExecutionException e) {
 				logger.error(e);
 			};
@@ -543,7 +563,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		Props previousOutput = null;
 		// Iterate the in nodes again and create the dependencies
 		for (String dependency : node.getInNodes()) {
-			Props output = jobOutputProps.get(dependency);
+			Props output = node.getParentFlow().getExecutableNode(dependency).getOutputProps();
 			if (output != null) {
 				output = Props.clone(output);
 				output.setParent(previousOutput);
@@ -635,7 +655,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 			
 			logger.info("Cancelling " + activeJobRunners.size() + " jobs.");
-			for (JobRunner runner : activeJobRunners.values()) {
+			for (JobRunner runner : activeJobRunners) {
 				runner.cancel();
 			}
 			
@@ -649,60 +669,40 @@ public class FlowRunner extends EventHandler implements Runnable {
 	public void retryFailures(String user) {
 		synchronized(mainSyncObj) {
 			logger.info("Retrying failures invoked by " + user);
-			ArrayList<String> failures = new ArrayList<String>();
-			for (ExecutableNode node: flow.getExecutableNodes()) {
-				if (node.getStatus() == Status.FAILED) {
-					failures.add(node.getId());
-				}
-				else if (node.getStatus() == Status.KILLED) {
-					node.setStartTime(-1);
-					node.setEndTime(-1);
-					node.setStatus(Status.READY);
-				}
-			}
+			retryFailures(flow);
+			
+			flow.setStatus(Status.RUNNING);
+			flow.setUpdateTime(System.currentTimeMillis());
+			flowFailed = false;
 			
-			retryJobs(failures, user);
+			updateFlow();
+			interrupt();
 		}
 	}
 	
-	public void retryJobs(List<String> jobIds, String user) {
-		synchronized(mainSyncObj) {
-			for (String jobId: jobIds) {
-				ExecutableNode node = flow.getExecutableNode(jobId);
-				if (node == null) {
-					logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot retry.");
-					continue;
-				}
-				
-				if (Status.isStatusFinished(node.getStatus())) {
-					// Resets the status and increments the attempt number
-					node.resetForRetry();
-					reEnableDependents(node);
-					logger.info("Re-enabling job " + node.getId() + " attempt " + node.getAttempt());
-				}
-				else {
-					logger.error("Cannot retry job " + jobId + " since it hasn't run yet. User " + user);
-					continue;
+	private void retryFailures(ExecutableFlowBase flow) {
+		for (ExecutableNode node: flow.getExecutableNodes()) {
+			if (node instanceof ExecutableFlowBase) {
+				if (node.getStatus() == Status.FAILED || node.getStatus() == Status.FAILED_FINISHING || node.getStatus() == Status.KILLED) {
+					retryFailures((ExecutableFlowBase)node);
 				}
 			}
 			
-			boolean isFailureFound = false;
-			for (ExecutableNode node: flow.getExecutableNodes()) {
-				Status nodeStatus = node.getStatus();
-				if (nodeStatus == Status.FAILED || nodeStatus == Status.KILLED) {
-					isFailureFound = true;
-					break;
-				}
+			if (node.getStatus() == Status.FAILED) {
+				node.resetForRetry();
+				logger.info("Re-enabling job " + node.getPrintableId() + " attempt " + node.getAttempt());
+				reEnableDependents(node);
 			}
-			
-			if (!isFailureFound) {
-				flow.setStatus(Status.RUNNING);
-				flow.setUpdateTime(System.currentTimeMillis());
-				flowFailed = false;
+			else if (node.getStatus() == Status.KILLED) {
+				node.setStartTime(-1);
+				node.setEndTime(-1);
+				node.setStatus(Status.READY);
+			}
+			else if (node.getStatus() == Status.FAILED_FINISHING) {
+				node.setStartTime(-1);
+				node.setEndTime(-1);
+				node.setStatus(Status.READY);
 			}
-			
-			updateFlow();
-			interrupt();
 		}
 	}
 	
@@ -743,24 +743,22 @@ public class FlowRunner extends EventHandler implements Runnable {
 					ExecutableNode node = runner.getNode();
 					activeJobRunners.remove(node.getId());
 					
-					logger.info("Job Finished " + node.getId() + " with status " + node.getStatus());
-					if (runner.getOutputProps() != null) {
-						logger.info("Job " + node.getId() + " had output props.");
-						jobOutputProps.put(node.getId(), runner.getOutputProps());
+					String id = node.getPrintableId(":");
+					logger.info("Job Finished " + id + " with status " + node.getStatus());
+					if (node.getOutputProps() != null) {
+						logger.info("Job " + id + " had output props.");
 					}
-					
-					updateFlow();
-					
+
 					if (node.getStatus() == Status.FAILED) {
 						// Retry failure if conditions are met.
 						if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
-							logger.info("Job " + node.getId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+							logger.info("Job " + id + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
 							node.setDelayedExecution(runner.getRetryBackoff());
 							node.resetForRetry();
 						}
 						else {
 							if (!runner.isCancelled() && runner.getRetries() > 0) {
-								logger.info("Job " + node.getId() + " has run out of retry attempts");
+								logger.info("Job " + id + " has run out of retry attempts");
 								// Setting delayed execution to 0 in case this is manually re-tried.
 								node.setDelayedExecution(0);
 							}
@@ -771,7 +769,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 							// The KILLED status occurs when cancel is invoked. We want to keep this
 							// status even in failure conditions.
 							if (flow.getStatus() != Status.KILLED && flow.getStatus() != Status.FAILED) {
-								flow.setStatus(Status.FAILED_FINISHING);
+								propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
+
 								if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
 									logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
 									cancel();
@@ -779,15 +778,50 @@ public class FlowRunner extends EventHandler implements Runnable {
 							}
 						}
 					}
-					
+					finalizeFlowIfFinished(node.getParentFlow());
+					updateFlow();
 					interrupt();
 	
 					fireEventListeners(event);
 				}
 			}
 		}
+		
+		private void propagateStatus(ExecutableFlowBase base, Status status) {
+			base.setStatus(status);
+			if (base.getParentFlow() != null) {
+				propagateStatus(base.getParentFlow(), status);
+			}
+		}
+
+		private void finalizeFlowIfFinished(ExecutableFlowBase base) {
+			// We let main thread finalize the main flow. 
+			if (base == flow) {
+				return;
+			}
+			
+			if (base.isFlowFinished()) {
+				Props previousOutput = null;
+				for(String end: base.getEndNodes()) {
+					ExecutableNode node = base.getExecutableNode(end);
+		
+					Props output = node.getOutputProps();
+					if (output != null) {
+						output = Props.clone(output);
+						output.setParent(previousOutput);
+						previousOutput = output;
+					}
+				}
+				base.setOutputProps(previousOutput);
+				finalizeFlow(base);
+				
+				if (base.getParentFlow() != null) {
+					finalizeFlowIfFinished(base.getParentFlow());
+				}
+			}
+		}
 	}
-	
+
 	public boolean isCancelled() {
 		return flowCancelled;
 	}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index d707c1f..e56be56 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -484,16 +483,6 @@ public class FlowRunnerManager implements EventListener {
 		runner.retryFailures(user);
 	}
 	
-	public void retryJobs(int execId, String user, List<String> jobId) throws ExecutorManagerException {
-		FlowRunner runner = runningFlows.get(execId);
-		
-		if (runner == null) {
-			throw new ExecutorManagerException("Execution " + execId + " is not running.");
-		}
-		
-		runner.retryJobs(jobId, user);
-	}
-	
 	public ExecutableFlow getExecutableFlow(int execId) {
 		FlowRunner runner = runningFlows.get(execId);
 		if (runner == null) {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index f77f1b2..bc24043 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -54,7 +54,6 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private ExecutorLoader loader;
 	private Props props;
-	private Props outputProps;
 	private ExecutableNode node;
 	private File workingDir;
 
@@ -151,7 +150,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			// Create file appender
 			String id = this.jobId;
 			if (node.getExecutableFlow() != node.getParentFlow()) {
-				id = node.getParentFlow().getNestedId("._.");
+				id = node.getPrintableId("._.");
 			}
 			
 			String logName = createLogFileName(this.executionId, id, node.getAttempt());
@@ -403,8 +402,7 @@ public class JobRunner extends EventHandler implements Runnable {
 
 		node.setStatus(Status.SUCCEEDED);
 		if (job != null) {
-			outputProps = job.getJobGeneratedProperties();
-			node.setOutputProps(outputProps);
+			node.setOutputProps(job.getJobGeneratedProperties());
 		}
 	}
 	
@@ -445,10 +443,6 @@ public class JobRunner extends EventHandler implements Runnable {
 		return node.getStatus();
 	}
 
-	public Props getOutputProps() {
-		return outputProps;
-	}
-
 	private void logError(String message) {
 		if (logger != null) {
 			logger.error(message);
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 10db20f..34bff5d 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -78,14 +78,6 @@ public class ExecutableFlowBase extends ExecutableNode {
 		return flowId;
 	}
 	
-	public String getNestedId(String delimiter) {
-		if (this.getParentFlow() != null) {
-			return this.getParentFlow().getNestedId(delimiter) + delimiter + getId();
-		}
-		
-		return getId();
-	}
-	
 	protected void setFlow(Project project, Flow flow) {
 		this.flowId = flow.getId();
 		
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 6f41c56..e0f6522 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -241,6 +241,17 @@ public class ExecutableNode {
 		return array;
 	}
 	
+	public String getPrintableId() {
+		return getPrintableId(":");
+	}
+	
+	public String getPrintableId(String delimiter) {
+		if (this.getParentFlow() instanceof ExecutableFlow) {
+			return getId();
+		}
+		return getParentFlow().getPrintableId(delimiter) + delimiter + getId();
+	}
+	
 	public Map<String,Object> toObject() {
 		Map<String,Object> mapObj = new HashMap<String,Object>();
 		fillMapFromExecutable(mapObj);
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 9ef6722..d354e65 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -379,7 +379,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 		
 		// if the main flow is not the parent, then we'll create a composite key for flowID
 		if (flow != node.getParentFlow()) {
-			flowId = node.getParentFlow().getNestedId("+");
+			flowId = flow.getId() + "+" + node.getParentFlow().getPrintableId("+");
 		}
 		
 		QueryRunner runner = createQueryRunner();
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index e0d5e85..4cf171a 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -486,6 +486,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	 * @param exFlow
 	 * @throws ServletException
 	 */
+	@SuppressWarnings("unused")
 	private void ajaxFetchJobMetaData(HttpServletRequest req,
 			HttpServletResponse resp, HashMap<String, Object> ret, User user,
 			ExecutableFlow exFlow) throws ServletException {
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 0698483..8ee41e7 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -74,7 +74,7 @@ public class JobRunnerTest {
 		Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
 		
 		File logFile = new File(runner.getLogFilePath());
-		Props outputProps = runner.getOutputProps();
+		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps != null);
 		Assert.assertTrue(logFile.exists());
 		
@@ -105,7 +105,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
 		
 		File logFile = new File(runner.getLogFilePath());
-		Props outputProps = runner.getOutputProps();
+		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
@@ -140,7 +140,7 @@ public class JobRunnerTest {
 		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
 		
 		// Log file and output files should not exist.
-		Props outputProps = runner.getOutputProps();
+		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(runner.getLogFilePath() == null);
 		Assert.assertTrue(eventCollector.checkOrdering());
@@ -178,7 +178,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
 		
 		// Log file and output files should not exist.
-		Props outputProps = runner.getOutputProps();
+		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(runner.getLogFilePath() == null);
 		Assert.assertTrue(!runner.isCancelled());
@@ -227,7 +227,7 @@ public class JobRunnerTest {
 		
 		// Log file and output files should not exist.
 		File logFile = new File(runner.getLogFilePath());
-		Props outputProps = runner.getOutputProps();
+		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
@@ -264,7 +264,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(node.getStartTime() - startTime >= 5000);
 		
 		File logFile = new File(runner.getLogFilePath());
-		Props outputProps = runner.getOutputProps();
+		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps != null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertFalse(runner.isCancelled());
@@ -321,7 +321,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(runner.isCancelled());
 		
 		File logFile = new File(runner.getLogFilePath());
-		Props outputProps = runner.getOutputProps();
+		Props outputProps = runner.getNode().getOutputProps();
 		Assert.assertTrue(outputProps == null);
 		Assert.assertTrue(logFile.exists());
 		
@@ -354,9 +354,10 @@ public class JobRunnerTest {
 		node.setParentFlow(flow);
 		
 		Props props = createProps(time, fail);
+		node.setInputProps(props);
 		HashSet<String> proxyUsers = new HashSet<String>();
 		proxyUsers.add(flow.getSubmitUser());
-		JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager);
+		JobRunner runner = new JobRunner(node, workingDir, loader, jobtypeManager);
 		runner.setLogSettings(logger, "5MB", 4);
 
 		runner.addListener(listener);
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 7a2f55d..6b6ed94 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -1,10 +1,8 @@
 package azkaban.test.executor;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -20,16 +18,13 @@ import org.junit.Test;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionAttempt;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
-import azkaban.flow.FlowProps;
 import azkaban.project.Project;
 import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.JSONUtils;
-import azkaban.utils.Props;
 
 public class ExecutableFlowTest {
 	private Project project;
@@ -136,6 +131,7 @@ public class ExecutableFlowTest {
 		testEquals(exFlow, parsedExFlow);
 	}
 	
+	@SuppressWarnings("rawtypes")
 	@Test
 	public void testExecutorFlowUpdates() throws Exception {
 		Flow flow = project.getFlow("jobe");