azkaban-uncached

Refactoring of the FlowRunner to remove the execution queue

4/4/2013 8:35:03 PM

Details
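
The gist of the change: FlowRunner.run() no longer drains a BlockingQueue<JobRunner> that job-finish callbacks feed. Instead the runner thread sleeps on a sync object and, each time it wakes (when a job finishes or after a timeout), rescans the flow for nodes whose dependencies are satisfied, submits them to the job thread pool, and exits once every node has reached a terminal status. Below is a minimal, self-contained sketch of that polling model; the types are simplified stand-ins rather than the real Azkaban classes, and it uses notifyAll() where the actual patch interrupts the runner thread.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified stand-ins for Azkaban's ExecutableNode and FlowRunner; illustrative only.
enum Status { READY, QUEUED, RUNNING, SUCCEEDED, FAILED }

class Node {
    final String id;
    final List<Node> dependencies = new ArrayList<>();
    volatile Status status = Status.READY;

    Node(String id) { this.id = id; }

    boolean isFinished() { return status == Status.SUCCEEDED || status == Status.FAILED; }
}

public class PollingFlowRunner implements Runnable {
    // Wake up at least every 5 minutes in case a notification is missed; normally we idle.
    private static final long CHECK_WAIT_MS = 5 * 60 * 1000;

    private final List<Node> nodes;
    private final ExecutorService jobPool = Executors.newFixedThreadPool(10);
    private final Object mainSyncObj = new Object();
    private volatile boolean flowFinished = false;

    public PollingFlowRunner(List<Node> nodes) { this.nodes = nodes; }

    @Override
    public void run() {
        while (!flowFinished) {
            synchronized (mainSyncObj) {
                List<Node> ready = findReadyJobs();
                if (!ready.isEmpty()) {
                    for (Node node : ready) {
                        node.status = Status.QUEUED;
                        jobPool.submit(() -> runJob(node));
                    }
                } else if (nodes.stream().allMatch(Node::isFinished)) {
                    flowFinished = true;                  // nothing left to run
                } else {
                    try {
                        mainSyncObj.wait(CHECK_WAIT_MS);  // idle until a job finishes or we time out
                    } catch (InterruptedException e) {
                        // interrupt doubles as a "recheck now" signal, as in the real patch
                    }
                }
            }
        }
        jobPool.shutdown();
    }

    // A node is ready when it hasn't started and every dependency has succeeded.
    private List<Node> findReadyJobs() {
        List<Node> ready = new ArrayList<>();
        for (Node node : nodes) {
            if (node.status == Status.READY
                    && node.dependencies.stream().allMatch(d -> d.status == Status.SUCCEEDED)) {
                ready.add(node);
            }
        }
        return ready;
    }

    private void runJob(Node node) {
        node.status = Status.RUNNING;
        // ... real job work would happen here ...
        node.status = Status.SUCCEEDED;
        synchronized (mainSyncObj) {
            mainSyncObj.notifyAll();                      // wake the runner so dependents get queued
        }
    }
}

Because all coordination now goes through node statuses plus a nudge of the runner thread, operations such as pause, cancel, retryFailures, and retryJobs only flip statuses and interrupt the thread. That is why the per-job pause/resume/enable/disable/cancel endpoints could be dropped in this commit.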

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 1475a9a..00f19a2 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -58,7 +58,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 	@Override
 	public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		HashMap<String,Object> respMap= new HashMap<String,Object>();
-		logger.info("ExecutorServer called by " + req.getRemoteAddr());
+		//logger.info("ExecutorServer called by " + req.getRemoteAddr());
 		try {
 			if (!hasParam(req, ACTION_PARAM)) {
 				logger.error("Parameter action not set");
@@ -67,7 +67,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			else {
 				String action = getParam(req, ACTION_PARAM);
 				if (action.equals(UPDATE_ACTION)) {
-					logger.info("Updated called");
+					//logger.info("Updated called");
 					handleAjaxUpdateRequest(req, respMap);
 				}
 				else if (action.equals(PING_ACTION)) {
@@ -122,27 +122,28 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			respMap.put(RESPONSE_ERROR, "Modification type not set.");
 		}
 		String modificationType = getParam(req, MODIFY_EXECUTION_ACTION_TYPE);
-		String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
-		String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+
 		
 		try {
-			if (MODIFY_RETRY_JOBS.equals(modificationType)) {
-				flowRunnerManager.retryJobs(execId, user, jobIds);
-			}
-			else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
-				
-			}
-			else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
-				
+			if (MODIFY_RETRY_FAILURES.equals(modificationType)) {
+				flowRunnerManager.retryFailures(execId, user);
 			}
-			else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
-				
-			}
-			else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
-				
-			}
-			else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
-				
+			else {
+//				String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
+//				String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+//				
+//				if (MODIFY_RETRY_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
+//				}
+//				else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
+//				}
 			}
 		} catch (ExecutorManagerException e) {
 			logger.error(e);
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 0fbe48d..b58ae29 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -3,18 +3,14 @@ package azkaban.execapp;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
@@ -38,29 +34,32 @@ import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
-import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 
 public class FlowRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-	private int execId;
-
-	private File execDir;
-
-	private ExecutorService executorService;
-	private ExecutorLoader executorLoader;
-	private ProjectLoader projectLoader;
-
-	private ExecutableFlow flow;
-	private Thread currentThread;
-	private int numThreads = 10;
+	// We check update every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
+	private static final long CHECK_WAIT_MS = 5*60*1000;
 	
 	private Logger logger;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
 	private Appender flowAppender;
 	private File logFile;
 	
+	private ExecutorService executorService;
+	private ExecutorLoader executorLoader;
+	private ProjectLoader projectLoader;
+	
+	private int execId;
+	private File execDir;
+	private ExecutableFlow flow;
+	private Thread flowRunnerThread;
+	private int numJobThreads = 10;
+	
+	// Sync object for queuing
+	private Object mainSyncObj = new Object();
+	
 	// Properties map
 	private Map<String, Props> sharedProps = new HashMap<String, Props>();
 	private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
@@ -69,21 +68,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
-	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
 	private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
 	
-	private Map<Pair<String, Integer>, JobRunner> allJobs = new ConcurrentHashMap<Pair<String, Integer>, JobRunner>();
-	private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
-	
-	// Used for individual job pausing
-	private Map<String, ExecutableNode> pausedNode = new ConcurrentHashMap<String, ExecutableNode>();
-	
-	private Object actionSyncObj = new Object();
-	private boolean flowPaused = false;
-	private boolean flowFailed = false;
-	private boolean flowFinished = false;
-	private boolean flowCancelled = false;
-	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
 	private Integer pipelineExecId = null;
@@ -97,12 +83,17 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private String jobLogFileSize = "5MB";
 	private int jobLogNumFiles = 4;
 	
+	private boolean flowPaused = false;
+	private boolean flowFailed = false;
+	private boolean flowFinished = false;
+	private boolean flowCancelled = false;
+	
 	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
 		this.executorLoader = executorLoader;
 		this.projectLoader = projectLoader;
-		this.executorService = Executors.newFixedThreadPool(numThreads);
+		this.executorService = Executors.newFixedThreadPool(numJobThreads);
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
 
@@ -139,40 +130,17 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return execDir;
 	}
 	
-	@Override
 	public void run() {
 		try {
-			int projectId = flow.getProjectId();
-			int version = flow.getVersion();
-			String flowId = flow.getFlowId();
-			
-			// Add a bunch of common azkaban properties
-			PropsUtils.produceParentProperties(flow);
-			
-			// Create execution dir
-			createLogger(flowId);
-			logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
-			if (pipelineExecId != null) {
-				logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
-			}
-			
-			// The current thread is used for interrupting blocks
-			currentThread = Thread.currentThread();
-			currentThread.setName("FlowRunner-exec-" + flow.getExecutionId());
-
+			setupFlowExecution();
 			flow.setStartTime(System.currentTimeMillis());
 			
-			logger.info("Creating active reference");
-			if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
-				throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
-			}
+			updateFlowReference();
+			
 			logger.info("Updating initial flow directory.");
 			updateFlow();
-
 			logger.info("Fetching job and shared properties.");
 			loadAllProperties();
-			logger.info("Queuing initial jobs.");
-			queueStartingJobs();
 
 			this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
 			runFlow();
@@ -194,6 +162,34 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	private void setupFlowExecution() {
+		int projectId = flow.getProjectId();
+		int version = flow.getVersion();
+		String flowId = flow.getFlowId();
+		
+		// Add a bunch of common azkaban properties
+		PropsUtils.addCommonFlowProperties(flow);
+		
+		// Create execution dir
+		createLogger(flowId);
+		logger.info("Running execid:" + execId + " flow:" + flowId + " project:" + projectId + " version:" + version);
+		if (pipelineExecId != null) {
+			logger.info("Running simulateously with " + pipelineExecId + ". Pipelining level " + pipelineLevel);
+		}
+		
+		// The current thread is used for interrupting blocks
+		flowRunnerThread = Thread.currentThread();
+		flowRunnerThread.setName("FlowRunner-exec-" + flow.getExecutionId());
+
+	}
+	
+	private void updateFlowReference() throws ExecutorManagerException {
+		logger.info("Update active reference");
+		if (!executorLoader.updateExecutableReference(execId, System.currentTimeMillis())) {
+			throw new ExecutorManagerException("The executor reference doesn't exist. May have been killed prematurely.");
+		}
+	}
+	
 	private void updateFlow() {
 		updateFlow(System.currentTimeMillis());
 	}
@@ -276,47 +272,69 @@ public class FlowRunner extends EventHandler implements Runnable {
 		logger.info("Starting flows");
 		flow.setStatus(Status.RUNNING);
 		updateFlow();
+		
 		while (!flowFinished) {
-			JobRunner runner = null;
-			try {
-				runner = jobsToRun.poll(5, TimeUnit.MINUTES);
-			} catch (InterruptedException e) {
-				logger.info("FlowRunner thread has been interrupted.");
-				continue;
-			}
-			
-			if(runner == null) continue;
-			
-			try {
-				synchronized(actionSyncObj) {
-					ExecutableNode node = runner.getNode();
-					if (flowPaused) {
-						logger.info("Job Paused " + node.getJobId());
-						node.setStatus(Status.PAUSED);
-						pausedJobsToRun.add(runner);
+			synchronized(mainSyncObj) {
+				if (flowPaused) {
+					try {
+						mainSyncObj.wait(CHECK_WAIT_MS);
+					} catch (InterruptedException e) {
+					}
+
+					continue;
+				}
+				else {
+					List<ExecutableNode> jobsReadyToRun = findReadyJobsToRun();
+					
+					if (!jobsReadyToRun.isEmpty()) {
+						for (ExecutableNode node : jobsReadyToRun) {
+							long currentTime = System.currentTimeMillis();
+							
+							// Queue a job only if it's ready to run.
+							if (node.getStatus() == Status.READY) {
+								// Collect output props from the job's dependencies.
+								Props outputProps = collectOutputProps(node);
+								node.setStatus(Status.QUEUED);
+								JobRunner runner = createJobRunner(node, outputProps);
+								try {
+									executorService.submit(runner);
+									runningJob.put(node.getJobId(), runner);
+								} catch (RejectedExecutionException e) {
+									logger.error(e);
+								};
+								
+							} // If killed, then auto complete and KILL
+							else if (node.getStatus() == Status.KILLED) {
+								node.setStartTime(currentTime);
+								node.setEndTime(currentTime);
+							} // If disabled, then we auto skip
+							else if (node.getStatus() == Status.DISABLED) {
+								node.setStartTime(currentTime);
+								node.setEndTime(currentTime);
+								node.setStatus(Status.SKIPPED);
+							}
+						}
+						
+						updateFlow();
 					}
 					else {
-						runningJob.put(node.getJobId(), runner);
-						allJobs.put(new Pair<String, Integer>(node.getJobId(), node.getAttempt()), runner);
-						executorService.submit(runner);
-						logger.info("Job Started " + node.getJobId());
+						if (isFlowFinished()) {
+							flowFinished = true;
+							break;
+						}
+					
+						try {
+							mainSyncObj.wait(CHECK_WAIT_MS);
+						} catch (InterruptedException e) {
+						}
 					}
 				}
-			} catch (RejectedExecutionException e) {
-				logger.error(e);
 			}
 		}
 		
 		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
 		
-		while (!executorService.isTerminated()) {
-			try {
-				executorService.awaitTermination(1, TimeUnit.SECONDS);
-			} catch (InterruptedException e) {
-			}
-		};
-		
 		switch(flow.getStatus()) {
 		case FAILED_FINISHING:
 			logger.info("Setting flow status to Failed.");
@@ -329,13 +347,50 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	private void queueStartingJobs() {
-		for (String startNode : flow.getStartNodes()) {
-			ExecutableNode node = flow.getExecutableNode(startNode);
-			JobRunner jobRunner = createJobRunner(node, null);
-			logger.info("Adding initial job " + startNode + " to run queue.");
-			jobsToRun.add(jobRunner);
+	private List<ExecutableNode> findReadyJobsToRun() {
+		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+		for (ExecutableNode node : flow.getExecutableNodes()) {
+			if(Status.isStatusFinished(node.getStatus())) {
+				continue;
+			}
+			else {
+				// Check the dependencies to see if execution conditions are met,
+				// and what the status should be set to.
+				Status impliedStatus = getImpliedStatus(node);
+				if (impliedStatus != null) {
+					node.setStatus(impliedStatus);
+					jobsToRun.add(node);
+				}
+			}
+		}
+		
+		return jobsToRun;
+	}
+
+	private boolean isFlowFinished() {
+		for (String end: flow.getEndNodes()) {
+			ExecutableNode node = flow.getExecutableNode(end);
+			if (!Status.isStatusFinished(node.getStatus())) {
+				return false;
+			}
+		}
+		
+		return true;
+	}
+	
+	private Props collectOutputProps(ExecutableNode node) {
+		Props previousOutput = null;
+		// Iterate the in nodes again and create the dependencies
+		for (String dependency : node.getInNodes()) {
+			Props output = jobOutputProps.get(dependency);
+			if (output != null) {
+				output = Props.clone(output);
+				output.setParent(previousOutput);
+				previousOutput = output;
+			}
 		}
+		
+		return previousOutput;
 	}
 	
 	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) {
@@ -403,64 +458,62 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public void pause(String user) {
-		synchronized(actionSyncObj) {
-			if (flow.getStatus() == Status.RUNNING || flow.getStatus() == Status.PREPARING) {
+		synchronized(mainSyncObj) {
+			if (!flowFinished) {
 				logger.info("Flow paused by " + user);
 				flowPaused = true;
 				flow.setStatus(Status.PAUSED);
 				
 				updateFlow();
 			}
+			else {
+				logger.info("Cannot pause finished flow. Called by user " + user);
+			}
 		}
+		
+		interrupt();
 	}
 	
 	public void resume(String user) {
-		synchronized(actionSyncObj) {
+		synchronized(mainSyncObj) {
 			if (!flowPaused) {
 				logger.info("Cannot resume flow that isn't paused");
 			}
 			else {
 				logger.info("Flow resumed by " + user);
 				flowPaused = false;
-				if (!flowCancelled) {
-					flow.setStatus(Status.RUNNING);
+				if (flowFailed) {
+					flow.setStatus(Status.FAILED_FINISHING);
 				}
-
-				for (JobRunner runner: pausedJobsToRun) {
-					ExecutableNode node = runner.getNode();
-					if (flowCancelled) {
-						logger.info("Resumed flow is cancelled. Job killed " + node.getJobId());
-						node.setStatus(Status.KILLED);
-					}
-					else {
-						node.setStatus(Status.QUEUED);
-					}
-					
-					jobsToRun.add(runner);
+				else if (flowCancelled) {
+					flow.setStatus(Status.KILLED);
 				}
+				else {
+					flow.setStatus(Status.RUNNING);
+				}
+				
 				updateFlow();
 			}
 		}
 	}
 	
 	public void cancel(String user) {
-		synchronized(actionSyncObj) {
+		synchronized(mainSyncObj) {
 			logger.info("Flow cancelled by " + user);
+			cancel();
+			updateFlow();
+		}
+		interrupt();
+	}
+	
+	private void cancel() {
+		synchronized(mainSyncObj) {
 			flowPaused = false;
 			flowCancelled = true;
-			
 			if (watcher != null) {
 				watcher.stopWatcher();
 			}
 			
-			for (JobRunner runner: pausedJobsToRun) {
-				ExecutableNode node = runner.getNode();
-				logger.info("Resumed flow is cancelled. Job killed " + node.getJobId() + " by " + user);
-				node.setStatus(Status.KILLED);
-
-				jobsToRun.add(runner);
-			}
-			
 			for (JobRunner runner : runningJob.values()) {
 				runner.cancel();
 			}
@@ -468,165 +521,47 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
 				flow.setStatus(Status.KILLED);
 			}
-
-			for (ExecutableNode node: pausedNode.values()) {
-				node.setStatus(Status.KILLED);
-				node.setPaused(false);
-				queueNextJob(node, "cancel-all-action");
-			}
-			
-			updateFlow();
-			interrupt();
 		}
 	}
 	
-	public void cancelJob(String jobId, String user)  throws ExecutorManagerException {
-		synchronized(actionSyncObj) {
-			logger.info("Cancel of job " + jobId + " called by user " + user);
-			JobRunner runner = runningJob.get(jobId);
-			ExecutableNode node = flow.getExecutableNode(jobId);
-			if (runner != null) {
-				runner.cancel();
-			}
-			else {
-				Status status = node.getStatus();
-				if(status == Status.FAILED || status == Status.SUCCEEDED || status == Status.SKIPPED) {
-					throw new ExecutorManagerException("Can't cancel finished job " + jobId + " with status " + status);
-				}
-				
-				node.setStatus(Status.KILLED);
-				if (node.isPaused()) {
-					node.setPaused(false);
-					queueNextJob(node, "cancel-action");
-				}
-			}
-		}
-	}
-	
-	public void resumeJob(String jobId, String user) throws ExecutorManagerException {
-		synchronized(actionSyncObj) {
-			if (runningJob.containsKey(jobId)) {
-				throw new ExecutorManagerException("Resume of job " + jobId + " failed since it's already running. User " + user);
-			}
-			else {
-				logger.info("Resume of job " + jobId + " requested by " + user);
-				ExecutableNode node = flow.getExecutableNode(jobId);
-				if (node == null) {
-					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
+	public void retryFailures(String user) {
+		synchronized(mainSyncObj) {
+			logger.info("Retrying failures invoked by " + user);
+			ArrayList<String> failures = new ArrayList<String>();
+			for (ExecutableNode node: flow.getExecutableNodes()) {
+				if (node.getStatus() == Status.FAILED) {
+					failures.add(node.getJobId());
 				}
-			
-				if (node.isPaused()) {
-					node.setPaused(false);
-					if (pausedNode.containsKey(jobId)) {
-						queueNextJob(node, "resume-action");
-					}
-					
-					updateFlow();
+				else if (node.getStatus() == Status.KILLED) {
+					node.setStartTime(-1);
+					node.setEndTime(-1);
+					node.setStatus(Status.READY);
 				}
 			}
-		}
-	}
-	
-	public void pauseJob(String jobId, String user) throws ExecutorManagerException {
-		synchronized(actionSyncObj) {
-			if (runningJob.containsKey(jobId)) {
-				throw new ExecutorManagerException("Pause of job " + jobId + " failed since it's already running. User " + user);
-			}
-			else {
-				logger.info("Pause of job " + jobId + " requested by " + user);
-				ExecutableNode node = flow.getExecutableNode(jobId);
-				if (node == null) {
-					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
-				}
 			
-				long startTime = node.getStartTime();
-				if (startTime < 0) {
-					node.setPaused(true);
-					updateFlow();
-				}
-				else {
-					throw new ExecutorManagerException("Cannot pause job " + jobId + " that's started.");	
-				}
-			}
+			retryJobs(failures, user);
 		}
 	}
 	
-	public void disableJob(String jobId, String user) throws ExecutorManagerException {
-		// Disable and then check to see if it's set.
-		synchronized(actionSyncObj) {
-			if (runningJob.containsKey(jobId)) {
-				throw new ExecutorManagerException("Disable of job " + jobId + " failed since it's already running. User " + user);
-			}
-			else {
-				logger.info("Disable of job " + jobId + " requested by " + user);
-				ExecutableNode node = flow.getExecutableNode(jobId);
-				if (node == null) {
-					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot disable.");
-				}
-			
-				Status status = node.getStatus();
-				if (status == Status.DISABLED || status == Status.READY) {
-					node.setStatus(Status.DISABLED);
-					updateFlow();
-				}
-				else {
-					throw new ExecutorManagerException("Cannot disable job " + jobId + " with status " + status.toString());	
-				}
-			}
-		}
-	}
-	
-	public void enableJob(String jobId, String user) throws ExecutorManagerException {
-		// Disable and then check to see if it's set.
-		synchronized(actionSyncObj) {
-			if (runningJob.containsKey(jobId)) {
-				throw new ExecutorManagerException("Enable of job " + jobId + " failed since it's already running. User " + user);
-			}
-			else {
-				logger.info("Enable of job " + jobId + " requested by " + user);
+	public void retryJobs(List<String> jobIds, String user) {
+		synchronized(mainSyncObj) {
+			for (String jobId: jobIds) {
 				ExecutableNode node = flow.getExecutableNode(jobId);
 				if (node == null) {
-					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot enable.");
+					logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot retry.");
+					continue;
 				}
-			
-				Status status = node.getStatus();
-				if (status == Status.DISABLED || status == Status.READY) {
-					node.setStatus(Status.READY);
-					updateFlow();
+				
+				if (Status.isStatusFinished(node.getStatus())) {
+					// Resets the status and increments the attempt number
+					node.resetForRetry();
+					reEnableDependents(node);
+					logger.info("Re-enabling job " + node.getJobId() + " attempt " + node.getAttempt());
 				}
 				else {
-					throw new ExecutorManagerException("Cannot enable job " + jobId + " with status " + status.toString());	
-				}
-			}
-		}
-	}
-	
-	public void retryJobs(String[] jobIds, String user) {
-		synchronized(actionSyncObj) {
-			ArrayList<ExecutableNode> jobsToBeQueued = new ArrayList<ExecutableNode>();
-			for (String jobId: jobIds) {
-				if (runningJob.containsKey(jobId)) {
-					logger.error("Cannot retry job " + jobId + " since it's already running. User " + user);
+					logger.error("Cannot retry job " + jobId + " since it hasn't run yet. User " + user);
 					continue;
 				}
-				else {
-					logger.info("Retry of job " + jobId + " requested by " + user);
-					ExecutableNode node = flow.getExecutableNode(jobId);
-					if (node == null) {
-						logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot disable.");
-					}
-				
-					Status status = node.getStatus();
-					if (status == Status.FAILED || status == Status.READY || status == Status.KILLED) {
-						node.resetForRetry();
-						reEnableDependents(node);
-					}
-					else {
-						logger.error("Cannot retry a job that hasn't finished. " + jobId);
-					}
-					
-					jobsToBeQueued.add(node);
-				}
 			}
 			
 			boolean isFailureFound = false;
@@ -644,11 +579,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 				flowFailed = false;
 			}
 			
-			for (ExecutableNode node: jobsToBeQueued) {
-				queueNextJob(node, "retry-action");
-			}
-			
 			updateFlow();
+			interrupt();
 		}
 	}
 	
@@ -661,11 +593,16 @@ public class FlowRunner extends EventHandler implements Runnable {
 				dependentNode.setUpdateTime(System.currentTimeMillis());
 				reEnableDependents(dependentNode);
 			}
+			else if (dependentNode.getStatus() == Status.SKIPPED) {
+				dependentNode.setStatus(Status.DISABLED);
+				dependentNode.setUpdateTime(System.currentTimeMillis());
+				reEnableDependents(dependentNode);
+			}
 		}
 	}
 	
 	private void interrupt() {
-		currentThread.interrupt();
+		flowRunnerThread.interrupt();
 	}
 	
 	private Status getImpliedStatus(ExecutableNode node) {
@@ -695,6 +632,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				continue;
 			case RUNNING:
 			case QUEUED:
+			case DISABLED:
 				return null;
 			default:
 				// Return null means it's not ready to run.
@@ -716,70 +654,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return Status.READY;
 	}
 	
-	/**
-	 * Iterates through the finished jobs dependents.
-	 * 
-	 * @param node
-	 */
-	private synchronized void queueNextJobs(ExecutableNode finishedNode) {
-		String trigger = finishedNode.getAttempt() > 0 ? finishedNode.getJobId() + "." + finishedNode.getAttempt() : finishedNode.getJobId();
-		for (String dependent : finishedNode.getOutNodes()) {
-			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-			queueNextJob(dependentNode, trigger);
-		}
-	}
-
-	/**
-	 * Queues node for running if it's ready to be run.
-	 * 
-	 * @param node
-	 */
-	private void queueNextJob(ExecutableNode node, String trigger) {
-		Status nextStatus = getImpliedStatus(node);
-		if (nextStatus == null) {
-			// Not yet ready or not applicable
-			return;
-		}
-
-		node.setStatus(nextStatus);
-		
-		Props previousOutput = null;
-		// Iterate the in nodes again and create the dependencies
-		for (String dependency : node.getInNodes()) {
-			Props output = jobOutputProps.get(dependency);
-			if (output != null) {
-				output = Props.clone(output);
-				output.setParent(previousOutput);
-				previousOutput = output;
-			}
-		}
-
-		synchronized(actionSyncObj) {
-			//pausedNode
-			if (node.isPaused()) {
-				pausedNode.put(node.getJobId(), node);
-				logger.info("Job Paused " + node.getJobId());
-				return;
-			}
-			
-			JobRunner runner = this.createJobRunner(node, previousOutput);
-			if (flowPaused) {
-				if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
-					node.setStatus(Status.PAUSED);
-				}
-				pausedJobsToRun.add(runner);
-				logger.info("Flow Paused. Pausing " + node.getJobId());
-			}
-			else {
-				if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
-					node.setStatus(Status.QUEUED);
-				}
-				logger.info("Adding " + node.getJobId() + " to run queue with status " + node.getStatus().toString() + " triggered by '" + trigger + "'.");
-				jobsToRun.add(runner);
-			}
-		}
-	}
-	
 	private class JobRunnerEventListener implements EventListener {
 		public JobRunnerEventListener() {
 		}
@@ -787,65 +661,34 @@ public class FlowRunner extends EventHandler implements Runnable {
 		@Override
 		public synchronized void handleEvent(Event event) {
 			JobRunner runner = (JobRunner)event.getRunner();
+			
 			if (event.getType() == Type.JOB_FINISHED) {
-				ExecutableNode node = runner.getNode();
-
-				logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
-				synchronized (actionSyncObj) {
+				synchronized(mainSyncObj) {
+					ExecutableNode node = runner.getNode();
+	
+					logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
+					
 					if (node.getStatus() == Status.FAILED) {
-						// Setting failure
 						flowFailed = true;
-						if (!isFailedStatus(flow.getStatus())) {
+						
+						ExecutionOptions options = flow.getExecutionOptions();
+						// The KILLED status occurs when cancel is invoked. We want to keep this
+						// status even in failure conditions.
+						if (flow.getStatus() != Status.KILLED) {
 							flow.setStatus(Status.FAILED_FINISHING);
-							ExecutionOptions options = flow.getExecutionOptions();
-							if (options.getFailureAction() == FailureAction.CANCEL_ALL) {
-								cancel("azkaban");
+							if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+								logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
+								cancel();
 							}
 						}
 					}
 					
-					jobOutputProps.put(node.getJobId(), runner.getOutputProps());
-					
-					runningJob.remove(node.getJobId());
-					
-					fireEventListeners(event);
-					queueNextJobs(node);
-				}
-				
-				if (isFlowFinished()) {
-					logger.info("Flow appears finished. Cleaning up.");
-					flowFinished = true;
 					interrupt();
-				}
-			}
-			
-			if (event.isShouldUpdate()) {
-				ExecutableNode node = runner.getNode();
-				updateFlow(node.getUpdateTime());
-			}
-		}
-	}
 	
-	private boolean isFailedStatus(Status status) {
-		switch (status) {
-		case FAILED_FINISHING:
-		case FAILED:
-		case KILLED:
-			return true;
-		default:
-			return false;
-		}
-	}
-	
-	private boolean isFlowFinished() {
-		for (String end: flow.getEndNodes()) {
-			ExecutableNode node = flow.getExecutableNode(end);
-			if (!Status.isStatusFinished(node.getStatus())) {
-				return false;
+					fireEventListeners(event);
+				}
 			}
 		}
-		
-		return true;
 	}
 	
 	public boolean isCancelled() {
@@ -861,17 +704,22 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public File getJobLogFile(String jobId, int attempt) {
-		JobRunner runner = allJobs.get(new Pair<String, Integer>(jobId, attempt));
-		if (runner == null) {
+		ExecutableNode node = flow.getExecutableNode(jobId);
+		File path = new File(execDir, node.getJobPropsSource());
+		
+		String logFileName = JobRunner.createLogFileName(execId, jobId, attempt);
+		File logFile = new File(path.getParentFile(), logFileName);
+		
+		if (!logFile.exists()) {
 			return null;
 		}
 		
-		return runner.getLogFile();
+		return logFile;
 	}
 	
 	public boolean isRunnerThreadAlive() {
-		if (currentThread != null) {
-			return currentThread.isAlive();
+		if (flowRunnerThread != null) {
+			return flowRunnerThread.isAlive();
 		}
 		return false;
 	}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 58beb8e..79192c0 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -468,27 +469,17 @@ public class FlowRunnerManager implements EventListener {
 		runner.resume(user);
 	}
 	
-	public void pauseJob(int execId, String jobId, String user) throws ExecutorManagerException {
+	public void retryFailures(int execId, String user) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
 		
 		if (runner == null) {
 			throw new ExecutorManagerException("Execution " + execId + " is not running.");
 		}
 		
-		runner.pauseJob(jobId, user);
+		runner.retryFailures(user);
 	}
 	
-	public void resumeJob(int execId, String jobId, String user) throws ExecutorManagerException {
-		FlowRunner runner = runningFlows.get(execId);
-		
-		if (runner == null) {
-			throw new ExecutorManagerException("Execution " + execId + " is not running.");
-		}
-		
-		runner.resumeJob(jobId, user);
-	}
-	
-	public void retryJobs(int execId, String user, String ... jobId) throws ExecutorManagerException {
+	public void retryJobs(int execId, String user, List<String> jobId) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
 		
 		if (runner == null) {
@@ -498,36 +489,6 @@ public class FlowRunnerManager implements EventListener {
 		runner.retryJobs(jobId, user);
 	}
 	
-	public void disableJob(int execId, String user, String jobId) throws ExecutorManagerException {
-		FlowRunner runner = runningFlows.get(execId);
-		
-		if (runner == null) {
-			throw new ExecutorManagerException("Execution " + execId + " is not running.");
-		}
-		
-		runner.disableJob(jobId, user);
-	}
-	
-	public void enableJob(int execId, String user, String jobId) throws ExecutorManagerException {
-		FlowRunner runner = runningFlows.get(execId);
-		
-		if (runner == null) {
-			throw new ExecutorManagerException("Execution " + execId + " is not running.");
-		}
-		
-		runner.enableJob(jobId, user);
-	}
-	
-	public void cancelJob(int execId, String user, String jobId) throws ExecutorManagerException {
-		FlowRunner runner = runningFlows.get(execId);
-		
-		if (runner == null) {
-			throw new ExecutorManagerException("Execution " + execId + " is not running.");
-		}
-		
-		runner.cancelJob(jobId, user);
-	}
-	
 	public ExecutableFlow getExecutableFlow(int execId) {
 		FlowRunner runner = runningFlows.get(execId);
 		if (runner == null) {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index e5e0d02..9ceb7c0 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -130,7 +130,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			logger = Logger.getLogger(loggerName);
 
 			// Create file appender
-			String logName = node.getAttempt() > 0 ? "_job." + executionId + "." + node.getAttempt() + "." + node.getJobId() + ".log" : "_job." + executionId + "." + node.getJobId() + ".log";
+			String logName = createLogFileName(node.getExecutionId(), node.getJobId(), node.getAttempt());
 			logFile = new File(workingDir, logName);
 			String absolutePath = logFile.getAbsolutePath();
 
@@ -378,4 +378,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	public File getLogFile() {
 		return logFile;
 	}
+	
+	public static String createLogFileName(int executionId, String jobId, int attempt) {
+		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
+	}
 }
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 6bda5cd..7bac5d1 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -33,6 +33,7 @@ public interface ConnectorParams {
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
 	public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
+	public static final String MODIFY_RETRY_FAILURES = "retryFailures";
 	public static final String MODIFY_RETRY_JOBS = "retryJobs";
 	public static final String MODIFY_CANCEL_JOBS = "cancelJobs";
 	public static final String MODIFY_DISABLE_JOBS = "skipJobs";
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 84781a4..76f76ec 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -261,6 +261,10 @@ public class ExecutorManager {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
 	}
 	
+	public void retryFailures(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_FAILURES, userId);
+	}
+	
 	public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
 		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
 	}
@@ -285,21 +289,31 @@ public class ExecutorManager {
 				throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
 			}
 			
-			for (String jobId: jobIds) {
-				if (!jobId.isEmpty()) {
-					ExecutableNode node = exFlow.getExecutableNode(jobId);
-					if (node == null) {
-						throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+			Map<String, Object> response = null;
+			if (jobIds != null && jobIds.length > 0) {
+				for (String jobId: jobIds) {
+					if (!jobId.isEmpty()) {
+						ExecutableNode node = exFlow.getExecutableNode(jobId);
+						if (node == null) {
+							throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+						}
 					}
 				}
+				String ids = StringUtils.join(jobIds, ',');
+				response = callExecutorServer(
+						pair.getFirst(), 
+						ConnectorParams.MODIFY_EXECUTION_ACTION, 
+						userId, 
+						new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command), 
+						new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+			}
+			else {
+				response = callExecutorServer(
+						pair.getFirst(), 
+						ConnectorParams.MODIFY_EXECUTION_ACTION, 
+						userId, 
+						new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command));
 			}
-			String ids = StringUtils.join(jobIds, ',');
-			Map<String, Object> response = callExecutorServer(
-					pair.getFirst(), 
-					ConnectorParams.MODIFY_EXECUTION_ACTION, 
-					userId, 
-					new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command), 
-					new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
 			
 			return response;
 		}
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 86109bf..e3c9c2d 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -187,7 +187,7 @@ public class PropsUtils {
 		return resolvedProps;
 	}
 
-	public static Props produceParentProperties(final ExecutableFlow flow) {
+	public static Props addCommonFlowProperties(final ExecutableFlow flow) {
 		Props parentProps = new Props();
 
 		parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f5f8c51..2536fbd 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -351,11 +351,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 		
-		String jobs = getParam(req, "jobIds");
-		String[] jobIds = jobs.split("\\s*,\\s*");
-		
 		try {
-			executorManager.retryExecutingJobs(exFlow, user.getUserId(), jobIds);
+			executorManager.retryFailures(exFlow, user.getUserId());
 		} catch (ExecutorManagerException e) {
 			ret.put("error", e.getMessage());
 		}
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 3b6ed15..59dd6cb 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -180,50 +180,11 @@ azkaban.FlowTabView= Backbone.View.extend({
   },
   handleRetryClick : function(evt) {
       var graphData = graphModel.get("data");
-  
-  	  var failedJobs = new Array();
-  	  var failedJobStr = "";
-  	  var nodes = graphData.nodes;
-	  
-  	  for (var i = 0; i < nodes.length; ++i) {
-		var node = nodes[i];
-		if(node.status=='FAILED') {
-			failedJobs.push(node.id);
-		}
-		else if (node.status=='KILLED') {
-			// Nodes can be in a killed state, even if the parents have succeeded due to failure option Finish running
-			// We want to re-enable those.
-			var shouldAdd = true;
-			if (node.in) {
-				var size = 0;
-				for(var key in node.in) {
-					size++;
-					var dependency = node.in[key];
-					if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
-						shouldAdd = false;
-						break;
-					}
-				}
-				
-				if (size == 0) {
-					shouldAdd = false;
-				}
-			}
-			else {
-				shouldAdd = false;
-			}
-			
-			if (shouldAdd) {
-				failedJobs.push(node.id);
-			}
-		}
-  	  }
-  	  failedJobStr = failedJobs.join();
-  
+
       var requestURL = contextURL + "/executor";
 	  ajaxCall(
 		requestURL,
-		{"execid": execId, "ajax":"retryFailedJobs", "jobIds":failedJobStr},
+		{"execid": execId, "ajax":"retryFailedJobs"},
 		function(data) {
           console.log("cancel clicked");
           if (data.error) {
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index 689dee1..b75f7c6 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -8,11 +8,13 @@ public class SleepJavaJob {
 	private boolean fail;
 	private String seconds;
 	private int attempts;
+	private int currentAttempt;
 	private String id;
 
 	public SleepJavaJob(String id, Map<String, String> parameters) {
 		this.id = id;
 		String failStr = parameters.get("fail");
+		
 		if (failStr == null || failStr.equals("false")) {
 			fail = false;
 		}
@@ -20,6 +22,7 @@ public class SleepJavaJob {
 			fail = true;
 		}
 	
+		currentAttempt = parameters.containsKey("azkaban.job.attempt") ? Integer.parseInt(parameters.get("azkaban.job.attempt")) : 0;
 		String attemptString = parameters.get("passRetry");
 		if (attemptString == null) {
 			attempts = -1;
@@ -28,7 +31,13 @@ public class SleepJavaJob {
 			attempts = Integer.valueOf(attemptString);
 		}
 		seconds = parameters.get("seconds");
-		System.out.println("Properly created");
+
+		if (fail) {
+			System.out.println("Planning to fail after " + seconds + " seconds. Attempts left " + currentAttempt + " of " + attempts);
+		}
+		else {
+			System.out.println("Planning to succeed after " + seconds + " seconds.");
+		}
 	}
 	
 	public void run() throws Exception {
@@ -45,19 +54,9 @@ public class SleepJavaJob {
 				System.out.println("Interrupted " + fail);
 			}
 		}
-		
-		File file = new File("");
-		File[] attemptFiles = file.listFiles(new FileFilter() {
-			@Override
-			public boolean accept(File pathname) {
-				return pathname.getName().startsWith(id);
-			}});
-		
-		if (fail) {
-			if (attempts <= 0 || attemptFiles == null || attemptFiles.length > attempts) {
-				File attemptFile = new File(file, id + "." + (attemptFiles == null ? 0 : attemptFiles.length));
 
-				attemptFile.mkdirs();
+		if (fail) {
+			if (attempts <= 0 || currentAttempt <= attempts) {
 				throw new Exception("I failed because I had to.");
 			}
 		}