azkaban-aplcache

Details

diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 2dbf05f..9ae7ed0 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -69,8 +69,7 @@ public abstract class FlowWatcher {
 			return null;
 		}
 		
-		String[] split = jobId.split(":");
-		ExecutableNode node = flow.getExecutableNode(split);
+		ExecutableNode node = flow.getExecutableNodePath(jobId);
 		if (node == null) {
 			return null;
 		}
@@ -85,8 +84,7 @@ public abstract class FlowWatcher {
 	}
 	
 	public Status peekStatus(String jobId) {
-		String[] split = jobId.split(":");
-		ExecutableNode node = flow.getExecutableNode(split);
+		ExecutableNode node = flow.getExecutableNodePath(jobId);
 		if (node != null) {
 			return node.getStatus();
 		}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index db33b46..cab5f58 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -18,8 +18,11 @@ package azkaban.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,7 +56,12 @@ import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
+import azkaban.utils.SwapQueue;
 
+/**
+ * Class that handles the running of an ExecutableFlow DAG
+ * 
+ */
 public class FlowRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 	// We check update every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
@@ -87,6 +95,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private Set<JobRunner> activeJobRunners = Collections.newSetFromMap(new ConcurrentHashMap<JobRunner, Boolean>());
 	
+	// Thread safe swap queue for finishedExecutions.
+	private SwapQueue<ExecutableNode> finishedNodes;
+	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
 	private Integer pipelineExecId = null;
@@ -105,10 +116,35 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowFinished = false;
 	private boolean flowCancelled = false;
 	
+	// The following is state that will trigger a retry of all failed jobs
+	private boolean retryFailedJobs = false;
+	
+	
+	/**
+	 * Constructor. 
+	 * This will create its own ExecutorService for thread pools
+	 * 
+	 * @param flow
+	 * @param executorLoader
+	 * @param projectLoader
+	 * @param jobtypeManager
+	 * @throws ExecutorManagerException
+	 */
 	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
 		this(flow, executorLoader, projectLoader, jobtypeManager, null);
 	}
 
+	/**
+	 * Constructor.
+	 * If executorService is null, then it will create its own for thread pools.
+	 * 
+	 * @param flow
+	 * @param executorLoader
+	 * @param projectLoader
+	 * @param jobtypeManager
+	 * @param executorService
+	 * @throws ExecutorManagerException
+	 */
 	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
@@ -123,6 +159,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.failureAction = options.getFailureAction();
 		this.proxyUsers = flow.getProxyUsers();
 		this.executorService = executorService;
+		this.finishedNodes = new SwapQueue<ExecutableNode>();
 	}
 
 	public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -317,7 +354,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	 */
 	private void runFlow() throws Exception {
 		logger.info("Starting flows");
-		flow.setStatus(Status.RUNNING);
+		runReadyJob(this.flow);
 		updateFlow();
 		
 		while (!flowFinished) {
@@ -331,7 +368,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 					continue;
 				}
 				else {
-					if (!progressGraph()) {
+					if (retryFailedJobs) {
+						retryAllFailures();
+					}
+					else if (!progressGraph()) {
 						try {
 							mainSyncObj.wait(CHECK_WAIT_MS);
 						} catch (InterruptedException e) {
@@ -344,53 +384,186 @@ public class FlowRunner extends EventHandler implements Runnable {
 		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
 		
-		finalizeFlow(flow);
 		updateFlow();
 		logger.info("Finished Flow");
 	}
 	
-	private boolean progressGraph() throws IOException {
-		List<ExecutableNode> jobsReadyToRun = flow.findNextJobsToRun();
-
-		// If its the current flow
-		if (jobsReadyToRun.size() == 1 && jobsReadyToRun.get(0) == flow) {
-			flowFinished = true;
-			return true;
+	private void retryAllFailures() throws IOException {
+		logger.info("Restarting all failed jobs");
+		
+		this.retryFailedJobs = false;
+		this.flowCancelled = false;
+		this.flowFailed = false;
+		this.flow.setStatus(Status.RUNNING);
+		
+		ArrayList<ExecutableNode> retryJobs = new ArrayList<ExecutableNode>();
+		resetFailedState(this.flow, retryJobs);
+		
+		for (ExecutableNode node: retryJobs) {
+			if(node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+				runReadyJob(node);
+			}
+			else if (node.getStatus() == Status.SUCCEEDED){
+				for (String outNodeId: node.getOutNodes()) {
+					ExecutableFlowBase base = node.getParentFlow();
+					runReadyJob(base.getExecutableNode(outNodeId));
+				}
+			}
+			
+			runReadyJob(node);
 		}
+		
+		updateFlow();
+	}
+	
+	private boolean progressGraph() throws IOException {
+		finishedNodes.swap();
 
-		for (ExecutableNode node: jobsReadyToRun) {
-			Status nextStatus = getImpliedStatus(node);
+		// The following nodes are finished, so we'll collect a list of outnodes
+		// that are candidates for running next.
+		HashSet<ExecutableNode> nodesToCheck = new HashSet<ExecutableNode>();
+		for (ExecutableNode node: finishedNodes) {
+			Set<String> outNodeIds = node.getOutNodes();
+			ExecutableFlowBase parentFlow = node.getParentFlow();
 			
-			if (node instanceof ExecutableFlowBase && ((ExecutableFlowBase)node).isFlowFinished()) {
-				finalizeFlow((ExecutableFlowBase)node);
-				fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+			// If a job is seen as failed, then we set the parent flow to FAILED_FINISHING
+			if (node.getStatus() == Status.FAILED) {
+				// The job cannot be retried or has run out of retry attempts. We will 
+				// fail the job and its flow now.
+				if (!retryJobIfPossible(node)) {
+					propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
+					if (failureAction == FailureAction.CANCEL_ALL) {
+						this.cancel();
+					}
+					this.flowFailed = true;
+				}
+				else {
+					nodesToCheck.add(node);
+					continue;
+				}
 			}
-			else if (nextStatus == Status.KILLED || isCancelled()) {
-				logger.info("Killing " + node.getId() + " due to prior errors.");
-				node.killNode(System.currentTimeMillis());
-				fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+
+			if (outNodeIds.isEmpty()) {
+				// Having no out nodes means it's the end of a flow, so we finalize
+				// and fire an event.
+				finalizeFlow(parentFlow);
+				finishExecutableNode(parentFlow);
+				
+				// If the parent has a parent, then we process
+				if (!(parentFlow instanceof ExecutableFlow)) {
+					outNodeIds = parentFlow.getOutNodes();
+					parentFlow = parentFlow.getParentFlow();
+				}
 			}
-			else if (nextStatus == Status.DISABLED) {
-				logger.info("Skipping disabled job " + node.getId() + ".");
-				node.skipNode(System.currentTimeMillis());
-				fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+
+			// Add all out nodes from the finished job. We'll check against this set to
+			// see if any are candidates for running.
+			for (String nodeId: outNodeIds) {
+				ExecutableNode outNode = parentFlow.getExecutableNode(nodeId);
+				nodesToCheck.add(outNode);
+			}
+		}
+
+		// Runs candidate jobs. runReadyJob checks whether each one is ready to run,
+		// and instantly kills or skips it if necessary.
+		boolean jobsRun = false;
+		for (ExecutableNode node: nodesToCheck) {
+			if (Status.isStatusFinished(node.getStatus()) || 
+				Status.isStatusRunning(node.getStatus())) {
+				// Really shouldn't get in here.
+				continue;
+			}
+			
+			jobsRun |= runReadyJob(node);
+		}
+		
+		if (jobsRun || finishedNodes.getSize() > 0 ) {
+			updateFlow();
+			return true;
+		}
+		
+		return false;
+	}
+	private boolean runReadyJob(ExecutableNode node) throws IOException {
+		if (Status.isStatusFinished(node.getStatus()) || 
+			Status.isStatusRunning(node.getStatus())) {
+			return false;
+		}
+		
+		Status nextNodeStatus = getImpliedStatus(node);
+		if (nextNodeStatus == null) {
+			return false;
+		}
+		
+		if (nextNodeStatus == Status.KILLED) {
+			logger.info("Killing '" + node.getNestedId() + "' due to prior errors.");
+			node.killNode(System.currentTimeMillis());
+			finishExecutableNode(node);
+		}
+		else if (nextNodeStatus == Status.SKIPPED) {
+			logger.info("Skipping disabled job '" + node.getId() + "'.");
+			node.skipNode(System.currentTimeMillis());
+			finishExecutableNode(node);
+		}
+		else if (nextNodeStatus == Status.READY) {
+			if (node instanceof ExecutableFlowBase) {
+				ExecutableFlowBase flow = ((ExecutableFlowBase) node);
+				logger.info("Running flow '" + flow.getNestedId() + "'.");
+				flow.setStatus(Status.RUNNING);
+				flow.setStartTime(System.currentTimeMillis());
+				prepareJobProperties(flow);
+				
+				for (String startNodeId: ((ExecutableFlowBase) node).getStartNodes()) {
+					ExecutableNode startNode = flow.getExecutableNode(startNodeId);
+					runReadyJob(startNode);
+				}
 			}
 			else {
 				runExecutableNode(node);
 			}
 		}
-			
-		if (!jobsReadyToRun.isEmpty()) {
-			updateFlow();
+		return true;
+	}
+	
+	private boolean retryJobIfPossible(ExecutableNode node) {
+		if (node instanceof ExecutableFlowBase) {
+			return false;
+		}
+	
+		if (node.getRetries() > node.getAttempt()) {
+			logger.info("Job '" + node.getId() + "' will be retried. Attempt " + node.getAttempt() + " of " + node.getRetries());
+			node.setDelayedExecution(node.getRetryBackoff());
+			node.resetForRetry();
 			return true;
 		}
 		else {
+			if (node.getRetries() > 0) {
+				logger.info("Job '" + node.getId() + "' has run out of retry attempts");
+				// Setting delayed execution to 0 in case this is manually re-tried.
+				node.setDelayedExecution(0);
+			}
+			
 			return false;
 		}
 	}
 	
+	private void propagateStatus(ExecutableFlowBase base, Status status) {
+		if (!Status.isStatusFinished(base.getStatus())) {
+			logger.info("Setting " + base.getNestedId() + " to " + status);
+			base.setStatus(status);
+			if (base.getParentFlow() != null) {
+				propagateStatus(base.getParentFlow(), status);
+			}
+		}
+	}
+	
+	private void finishExecutableNode(ExecutableNode node) {
+		finishedNodes.add(node);
+		fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+	}
+	
 	private void finalizeFlow(ExecutableFlowBase flow) {
-		String id = flow == this.flow ? "" : flow.getNestedId() + " ";
+		String id = flow == this.flow ? "" : flow.getNestedId();
 
 		// If it's not the starting flow, we'll create set of output props
 		// for the finished flow.
@@ -418,21 +591,31 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 		
 		flow.setEndTime(System.currentTimeMillis());
+		flow.setUpdateTime(System.currentTimeMillis());
+		long durationSec = (flow.getEndTime() - flow.getStartTime()) / 1000;
 		switch(flow.getStatus()) {
 		case FAILED_FINISHING:
-			logger.info("Setting flow " + id + "status to Failed.");
+			logger.info("Setting flow '" + id + "' status to FAILED in " + durationSec + " seconds");
 			flow.setStatus(Status.FAILED);
+			break;
 		case FAILED:
 		case KILLED:
 		case FAILED_SUCCEEDED:
-			logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+			logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
 			break;
 		default:
 			flow.setStatus(Status.SUCCEEDED);
-			logger.info("Flow " + id + "is set to " + flow.getStatus().toString());
+			logger.info("Flow '" + id + "' is set to " + flow.getStatus().toString() + " in " + durationSec + " seconds");
+		}
+		
+		// If the finalized flow is actually the top level flow, then we finish
+		// the main loop.
+		if (flow instanceof ExecutableFlow) {
+			flowFinished = true;
 		}
 	}
 	
+	
 	@SuppressWarnings("unchecked")
 	private void prepareJobProperties(ExecutableNode node) throws IOException {
 		Props props = null;
@@ -515,66 +698,63 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// Collect output props from the job's dependencies.
 		prepareJobProperties(node);
 		
-		if (node instanceof ExecutableFlowBase) {
-			node.setStatus(Status.RUNNING);
-			node.setStartTime(System.currentTimeMillis());
-			
-			logger.info("Starting subflow " + node.getNestedId() + ".");
-		}
-		else {
-			node.setStatus(Status.QUEUED);
-			JobRunner runner = createJobRunner(node);
-			logger.info("Submitting job " + node.getNestedId() + " to run.");
-			try {
-				executorService.submit(runner);
-				activeJobRunners.add(runner);
-			} catch (RejectedExecutionException e) {
-				logger.error(e);
-			};
-		}
+		node.setStatus(Status.QUEUED);
+		JobRunner runner = createJobRunner(node);
+		logger.info("Submitting job '" + node.getNestedId() + "' to run.");
+		try {
+			executorService.submit(runner);
+			activeJobRunners.add(runner);
+		} catch (RejectedExecutionException e) {
+			logger.error(e);
+		};
+
 	}
 	
 	/**
-	 * Determines what the state of the next node should be.
+	 * Determines what the state of the next node should be. Returns null if 
+	 * the node should not be run.
 	 * 
 	 * @param node
 	 * @return
 	 */
 	public Status getImpliedStatus(ExecutableNode node) {
-		if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
-			return Status.KILLED;
-		}
-		else if (node.getStatus() == Status.DISABLED) {
-			return Status.DISABLED;
+		// If it's running or finished with 'SUCCEEDED', then don't even
+		// bother starting this job.
+		if (Status.isStatusRunning(node.getStatus()) || 
+			node.getStatus() == Status.SUCCEEDED) {
+			return null;
 		}
 		
+		// Go through the node's dependencies. If all of the previous jobs'
+		// statuses are finished and not FAILED or KILLED, then we can safely
+		// run this job.
 		ExecutableFlowBase flow = node.getParentFlow();
 		boolean shouldKill = false;
 		for (String dependency: node.getInNodes()) {
 			ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
 			Status depStatus = dependencyNode.getStatus();
 			
-			switch (depStatus) {
-			case FAILED:
-			case KILLED:
-				shouldKill = true;
-			case SKIPPED:
-			case SUCCEEDED:
-			case FAILED_SUCCEEDED:
-				continue;
-			default:
-				// Should never come here.
+			if (!Status.isStatusFinished(depStatus)) {
 				return null;
 			}
-		}
-
-		if (shouldKill) {
-			return Status.KILLED;
+			else if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
+				// We propagate failures as KILLED states.
+				shouldKill = true;
+			}
 		}
 		
 		// If it's disabled but ready to run, we want to make sure it continues being disabled.
-		if (node.getStatus() == Status.DISABLED) {
-			return Status.DISABLED;
+		if (node.getStatus() == Status.DISABLED || node.getStatus() == Status.SKIPPED) {
+			return Status.SKIPPED;
+		}
+		
+		// If the flow has failed, and we want to finish only the currently running jobs, we just
+		// kill everything else. We also kill, if the flow has been cancelled.
+		if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+			return Status.KILLED;
+		}
+		else if (shouldKill || isCancelled()) {
+			return Status.KILLED;
 		}
 		
 		// All good to go, ready to run.
@@ -686,58 +866,97 @@ public class FlowRunner extends EventHandler implements Runnable {
 	public void retryFailures(String user) {
 		synchronized(mainSyncObj) {
 			logger.info("Retrying failures invoked by " + user);
-			retryFailures(flow);
-			
-			flow.setStatus(Status.RUNNING);
-			flow.setUpdateTime(System.currentTimeMillis());
-			flowFailed = false;
-			
-			updateFlow();
+			retryFailedJobs = true;
 			interrupt();
 		}
 	}
 	
-	private void retryFailures(ExecutableFlowBase flow) {
-		for (ExecutableNode node: flow.getExecutableNodes()) {
-			if (node instanceof ExecutableFlowBase) {
-				if (node.getStatus() == Status.FAILED || node.getStatus() == Status.FAILED_FINISHING || node.getStatus() == Status.KILLED) {
-					retryFailures((ExecutableFlowBase)node);
-				}
-			}
+	private void resetFailedState(ExecutableFlowBase flow, List<ExecutableNode> nodesToRetry) {
+		//bottom up
+		LinkedList<ExecutableNode> queue = new LinkedList<ExecutableNode>();
+		for (String id : flow.getEndNodes()) {
+			ExecutableNode node = flow.getExecutableNode(id);
+			queue.add(node);
+		}
+		
+		long maxStartTime = -1;
+		while (!queue.isEmpty()) {
+			ExecutableNode node = queue.poll();
+			Status oldStatus = node.getStatus();
+			maxStartTime = Math.max(node.getStartTime(), maxStartTime);
 			
-			if (node.getStatus() == Status.FAILED) {
-				node.resetForRetry();
-				logger.info("Re-enabling job " + node.getNestedId() + " attempt " + node.getAttempt());
-				reEnableDependents(node);
+			long currentTime = System.currentTimeMillis();
+			if (node.getStatus() == Status.SUCCEEDED) {
+				// This is a candidate parent for restart
+				nodesToRetry.add(node);
+				continue;
 			}
-			else if (node.getStatus() == Status.KILLED) {
-				node.setStartTime(-1);
+			else if (node.getStatus() == Status.RUNNING) {
+				continue;
+			}
+			else if (node.getStatus() == Status.SKIPPED) {
+				node.setStatus(Status.DISABLED);
 				node.setEndTime(-1);
-				node.setStatus(Status.READY);
+				node.setStartTime(-1);
+				node.setUpdateTime(currentTime);
+			}
+			else if (node instanceof ExecutableFlowBase) {
+				ExecutableFlowBase base = (ExecutableFlowBase)node;
+				switch (base.getStatus()) {
+				case KILLED:
+				case FAILED:
+				case FAILED_FINISHING:
+					resetFailedState(base, nodesToRetry);	
+				default:
+				}
+				
+				if (base.getStatus() != Status.KILLED) {
+					continue;
+				}
 			}
-			else if (node.getStatus() == Status.FAILED_FINISHING) {
+			else if (node.getStatus() == Status.KILLED) {
+				// Not a flow, but killed
+				node.setStatus(Status.READY);
 				node.setStartTime(-1);
 				node.setEndTime(-1);
-				node.setStatus(Status.READY);
+				node.setUpdateTime(currentTime);
+			}
+			else if(node.getStatus() == Status.FAILED) {
+				node.resetForRetry();
+				nodesToRetry.add(node);
+			}
+
+			if (!(node instanceof ExecutableFlowBase) && node.getStatus() != oldStatus) {
+				logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
 			}
-		}
-	}
-	
-	private void reEnableDependents(ExecutableNode node) {
-		for(String dependent: node.getOutNodes()) {
-			ExecutableNode dependentNode = node.getParentFlow().getExecutableNode(dependent);
 			
-			if (dependentNode.getStatus() == Status.KILLED) {
-				dependentNode.setStatus(Status.READY);
-				dependentNode.setUpdateTime(System.currentTimeMillis());
-				reEnableDependents(dependentNode);
+			for(String inId: node.getInNodes()) {
+				ExecutableNode nodeUp = flow.getExecutableNode(inId);
+				queue.add(nodeUp);
 			}
-			else if (dependentNode.getStatus() == Status.SKIPPED) {
-				dependentNode.setStatus(Status.DISABLED);
-				dependentNode.setUpdateTime(System.currentTimeMillis());
-				reEnableDependents(dependentNode);
+		}
+		
+		Status oldFlowState = flow.getStatus();
+		if (maxStartTime == -1) {
+			// Nothing has run inside the flow, so we assume the flow hasn't even started running yet.
+			flow.setStatus(Status.READY);
+		}
+		else {
+			flow.setStatus(Status.RUNNING);
+			
+			// Add any READY start nodes. Usually it means the flow started, but the start node has not.
+			for (String id: flow.getStartNodes()) {
+				ExecutableNode node = flow.getExecutableNode(id);
+				if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+					nodesToRetry.add(node);
+				}
 			}
 		}
+		flow.setUpdateTime(System.currentTimeMillis());
+		flow.setEndTime(-1);
+		flow.setStartTime(maxStartTime);
+		
+		logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
 	}
 	
 	private void interrupt() {
@@ -756,61 +975,17 @@ public class FlowRunner extends EventHandler implements Runnable {
 				updateFlow();
 			}
 			else if (event.getType() == Type.JOB_FINISHED) {
+				ExecutableNode node = runner.getNode();
+				long seconds = (node.getEndTime() - node.getStartTime())/1000;
 				synchronized(mainSyncObj) {
-					ExecutableNode node = runner.getNode();
-					activeJobRunners.remove(node.getId());
-					
-					String id = node.getNestedId();
-					logger.info("Job Finished " + id + " with status " + node.getStatus());
-					if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
-						logger.info("Job " + id + " had output props.");
-					}
-					
-					if (node.getStatus() == Status.FAILED) {
-						if (runner.getRetries() > node.getAttempt()) {
-							logger.info("Job " + id + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
-							node.setDelayedExecution(runner.getRetryBackoff());
-							node.resetForRetry();
-						}
-						else {
-							if (runner.getRetries() > 0) {
-								logger.info("Job " + id + " has run out of retry attempts");
-								// Setting delayed execution to 0 in case this is manually re-tried.
-								node.setDelayedExecution(0);
-							}
-							
-							flowFailed = true;
-							
-							// The KILLED status occurs when cancel is invoked. We want to keep this
-							// status even in failure conditions.
-							if (!flowCancelled) {
-								// During a failure, we propagate the failure to parent flows
-								propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
-	
-								if (failureAction == FailureAction.CANCEL_ALL) {
-									logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
-									cancel();
-								}
-							}
-						}
-						
-					}
-					updateFlow();
-
+					logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds + " seconds");
+					finishedNodes.add(node);
+					node.getParentFlow().setUpdateTime(System.currentTimeMillis());
 					interrupt();
 					fireEventListeners(event);
 				}
 			}
 		}
-		
-		private void propagateStatus(ExecutableFlowBase base, Status status) {
-			if (!Status.isStatusFinished(base.getStatus())) {
-				base.setStatus(status);
-				if (base.getParentFlow() != null) {
-					propagateStatus(base.getParentFlow(), status);
-				}
-			}
-		}
 	}
 
 	public boolean isCancelled() {
@@ -826,10 +1001,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public File getJobLogFile(String jobId, int attempt) {
-		ExecutableNode node = flow.getExecutableNode(jobId);
+		ExecutableNode node = flow.getExecutableNodePath(jobId);
 		File path = new File(execDir, node.getJobSource());
 		
-		String logFileName = JobRunner.createLogFileName(execId, jobId, attempt);
+		String logFileName = JobRunner.createLogFileName(node, attempt);
 		File logFile = new File(path.getParentFile(), logFileName);
 		
 		if (!logFile.exists()) {
@@ -840,23 +1015,22 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 
 	public File getJobAttachmentFile(String jobId, int attempt) {
-		ExecutableNode node = flow.getExecutableNode(jobId);
-    File path = new File(execDir, node.getJobSource());
+		ExecutableNode node = flow.getExecutableNodePath(jobId);
+		File path = new File(execDir, node.getJobSource());
 
-    String attachmentFileName =
-        JobRunner.createAttachmentFileName(execId, jobId, attempt);
-    File attachmentFile = new File(path.getParentFile(), attachmentFileName);
-    if (!attachmentFile.exists()) {
-      return null;
-    }
-    return attachmentFile;
+		String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
+		File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+		if (!attachmentFile.exists()) {
+			return null;
+		}
+		return attachmentFile;
 	}
 	
 	public File getJobMetaDataFile(String jobId, int attempt) {
-		ExecutableNode node = flow.getExecutableNode(jobId);
+		ExecutableNode node = flow.getExecutableNodePath(jobId);
 		File path = new File(execDir, node.getJobSource());
 		
-		String metaDataFileName = JobRunner.createMetaDataFileName(execId, jobId, attempt);
+		String metaDataFileName = JobRunner.createMetaDataFileName(node, attempt);
 		File metaDataFile = new File(path.getParentFile(), metaDataFileName);
 		
 		if (!metaDataFile.exists()) {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 211507f..3de3d23 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -205,12 +205,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			logger = Logger.getLogger(loggerName);
 
 			// Create file appender
-			String id = this.jobId;
-			if (node.getExecutableFlow() != node.getParentFlow()) {
-				id = node.getPrintableId("._.");
-			}
-			
-			String logName = createLogFileName(this.executionId, id, node.getAttempt());
+			String logName = createLogFileName(node);
 			logFile = new File(workingDir, logName);
 			
 			String absolutePath = logFile.getAbsolutePath();
@@ -231,8 +226,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	}
 
 	private void createAttachmentFile() {
-		String fileName = createAttachmentFileName(
-				this.executionId, this.jobId, node.getAttempt());
+		String fileName = createAttachmentFileName(node);
 		File file = new File(workingDir, fileName);
 		attachmentFileName = file.getAbsolutePath();
 	}
@@ -485,8 +479,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 			
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
-			props.put(CommonJobProperties.JOB_METADATA_FILE,
-					createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(node));
 			props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
 			changeStatus(Status.RUNNING);
 			
@@ -618,32 +611,49 @@ public class JobRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 	
-	public int getRetries() {
-		return props.getInt("retries", 0);
+	public static String createLogFileName(ExecutableNode node, int attempt) {
+		int executionId = node.getExecutableFlow().getExecutionId();
+		String jobId = node.getId();
+		if (node.getExecutableFlow() != node.getParentFlow()) {
+			// Posix safe file delimiter
+			jobId = node.getPrintableId("._.");
+		}
+		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
 	}
 	
-	public long getRetryBackoff() {
-		return props.getLong("retry.backoff", 0);
+	public static String createLogFileName(ExecutableNode node) {
+		return JobRunner.createLogFileName(node, node.getAttempt());
+	}
+	
+	public static String createMetaDataFileName(ExecutableNode node, int attempt) {
+		int executionId = node.getExecutableFlow().getExecutionId();
+		String jobId = node.getId();
+		if (node.getExecutableFlow() != node.getParentFlow()) {
+			// Posix safe file delimiter
+			jobId = node.getPrintableId("._.");
+		}
+		
+		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
 	}
 	
-	public static String createAttachmentFileName(
-			int executionId, String jobId, int attempt) {
-		return attempt > 0 
-				? "_job." + executionId + "." + attempt + "." + jobId + ".attach" 
-				: "_job." + executionId + "." + jobId + ".attach";
+	public static String createMetaDataFileName(ExecutableNode node) {
+		return JobRunner.createMetaDataFileName(node, node.getAttempt());
+	}
+
+	public static String createAttachmentFileName(ExecutableNode node) {
+		
+		return JobRunner.createAttachmentFileName(node, node.getAttempt());
 	}
+	
+	public static String createAttachmentFileName(ExecutableNode node, int attempt) {
+		int executionId = node.getExecutableFlow().getExecutionId();
+		String jobId = node.getId();
+		if (node.getExecutableFlow() != node.getParentFlow()) {
+			// Posix safe file delimiter
+			jobId = node.getPrintableId("._.");
+		}
+
+		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".attach" : "_job." + executionId + "." + jobId + ".attach";
 
-	public static String createLogFileName(
-			int executionId, String jobId, int attempt) {
-		return attempt > 0 
-				? "_job." + executionId + "." + attempt + "." + jobId + ".log" 
-				: "_job." + executionId + "." + jobId + ".log";
-	}
-	
-	public static String createMetaDataFileName(
-			int executionId, String jobId, int attempt) {
-		return attempt > 0 
-				? "_job." + executionId + "." + attempt + "." + jobId + ".meta" 
-				: "_job." + executionId + "." + jobId + ".meta";
 	}
 }
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 7c721cd..bc33c0e 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -225,4 +225,10 @@ public class ExecutableFlow extends ExecutableFlowBase {
 		updateData.put(EXECUTIONID_PARAM, this.executionId);
 		return updateData;
 	}
+	
+	public void resetForRetry() {
+		super.resetForRetry();
+		this.setStatus(Status.RUNNING);
+	}
+	
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 040888b..a760204 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -123,11 +123,16 @@ public class ExecutableFlowBase extends ExecutableNode {
 		return executableNodes.get(id);
 	}
 	
-	public ExecutableNode getExecutableNode(String ... ids) {
-		return getExecutableNode(this, ids, 0);
+	public ExecutableNode getExecutableNodePath(String ids) {
+		String[] split = ids.split(":");
+		return getExecutableNodePath(split);
 	}
 	
-	private ExecutableNode getExecutableNode(ExecutableFlowBase flow, String[] ids, int currentIdIdx) {
+	public ExecutableNode getExecutableNodePath(String ... ids) {
+		return getExecutableNodePath(this, ids, 0);
+	}
+	
+	private ExecutableNode getExecutableNodePath(ExecutableFlowBase flow, String[] ids, int currentIdIdx) {
 		ExecutableNode node = flow.getExecutableNode(ids[currentIdIdx]);
 		currentIdIdx++;
 		
@@ -139,7 +144,7 @@ public class ExecutableFlowBase extends ExecutableNode {
 			return node;
 		}
 		else if (node instanceof ExecutableFlowBase) {
-			return getExecutableNode((ExecutableFlowBase)node, ids, currentIdIdx);
+			return getExecutableNodePath((ExecutableFlowBase)node, ids, currentIdIdx);
 		}
 		else {
 			return null;
@@ -406,4 +411,13 @@ public class ExecutableFlowBase extends ExecutableNode {
 		
 		return jobsToRun;
 	}
+	
+	public String getFlowPath() { // Builds "<flowId>[,<id>:<flowId>]*" by walking up the parent-flow chain.
+		if (this.getParentFlow() == null) { // no parent: this flow is the start of the path
+			return this.getFlowId();
+		}
+		else { // nested flow: parent's path first, then this flow appended as ",<id>:<flowId>"
+			return this.getParentFlow().getFlowPath() + "," + this.getId() + ":"+ this.getFlowId();
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutableJobInfo.java b/src/java/azkaban/executor/ExecutableJobInfo.java
index 53eca5c..f993111 100644
--- a/src/java/azkaban/executor/ExecutableJobInfo.java
+++ b/src/java/azkaban/executor/ExecutableJobInfo.java
@@ -16,9 +16,13 @@
 
 package azkaban.executor;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import azkaban.utils.Pair;
+
 public class ExecutableJobInfo {
 	private final int execId;
 	private final int projectId;
@@ -29,6 +33,8 @@ public class ExecutableJobInfo {
 	private final long endTime;
 	private final Status status;
 	private final int attempt;
+
+	private ArrayList<Pair<String, String>> jobPath;
 	
 	public ExecutableJobInfo(int execId, int projectId, int version, String flowId, String jobId, long startTime, long endTime, Status status, int attempt) {
 		this.execId = execId;
@@ -40,6 +46,8 @@ public class ExecutableJobInfo {
 		this.flowId = flowId;
 		this.jobId = jobId;
 		this.attempt = attempt;
+		
+		parseFlowId();
 	}
 
 	public int getProjectId() {
@@ -58,6 +66,20 @@ public class ExecutableJobInfo {
 		return flowId;
 	}
 
+	public String getImmediateFlowId() { // Returns the second half of the last parsed path entry, or flowId itself for a one-entry path.
+		if (jobPath.size() == 1) { // single entry: the raw flowId is returned unchanged
+			return flowId;
+		}
+		Pair<String, String> pair = jobPath.get(jobPath.size() - 1);		// last entry of the parsed path
+		return pair.getSecond(); // the text after ":" in that entry
+	}
+	
+	public String getHeadFlowId() { // Returns the first half of the first parsed path entry.
+		Pair<String, String> pair = jobPath.get(0);	// first entry produced by parseFlowId()
+		
+		return pair.getFirst(); // for an entry without ":" both halves hold the same text
+	}
+	
 	public String getJobId() {
 		return jobId;
 	}
@@ -78,6 +100,40 @@ public class ExecutableJobInfo {
 		return attempt;
 	}
 	
+	public List<Pair<String,String>> getParsedFlowId() { // Returns the (first, second) pairs parsed from flowId.
+		return jobPath; // the internal list itself, not a copy: caller changes are visible to this object
+	}
+	
+	private void parseFlowId() { // Fills jobPath from flowId: entries are split on ",", each entry on ":".
+		jobPath = new ArrayList<Pair<String,String>>(); // replaces any previous list
+		String[] flowPairs = flowId.split(","); // one element per path entry
+		
+		for (String flowPair: flowPairs) {
+			String[] pairSplit = flowPair.split(":"); // [first] or [first, second, ...]
+			Pair<String, String> pair;
+			if (pairSplit.length == 1) { // entry without ":": same text is used for both halves
+				pair = new Pair<String, String>(pairSplit[0], pairSplit[0]);
+			}
+			else { // NOTE(review): segments after the second are ignored; an entry of just ":" splits to length 0 and would throw here
+				pair = new Pair<String, String>(pairSplit[0], pairSplit[1]);
+			}
+			
+			jobPath.add(pair); // entries keep the order they had in flowId
+		}
+	}
+	
+	public String getJobIdPath() { // Builds "<first>:<first>:...:<jobId>" from every path entry after the first.
+		// Skip the first one because it's always just the root.
+		String path = ""; // stays empty when jobPath has a single entry
+		for (int i=1; i < jobPath.size(); ++i) {
+			Pair<String,String> pair = jobPath.get(i);
+			path += pair.getFirst() + ":"; // the text before ":" in the entry, followed by the separator
+		}
+		
+		path += jobId; // the job's own id is always the final segment
+		return path;
+	}
+	
 	public Map<String, Object> toObject() {
 		HashMap<String, Object> map = new HashMap<String, Object>();
 		map.put("execId", execId);
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 5fbeed2..bacad6c 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -397,6 +397,7 @@ public class ExecutableNode {
 			this.setStatus(Status.KILLED);
 			this.setStartTime(killTime);
 			this.setEndTime(killTime);
+			this.setUpdateTime(killTime);
 		}
 	}
 	
@@ -404,6 +405,7 @@ public class ExecutableNode {
 		this.setStatus(Status.SKIPPED);
 		this.setStartTime(skipTime);
 		this.setEndTime(skipTime);
+		this.setUpdateTime(skipTime);
 	}
 	
 	private void updatePastAttempts(List<Object> pastAttemptsList) {
@@ -428,4 +430,13 @@ public class ExecutableNode {
 			}
 		}
 	}
+	
+	public int getRetries() { // Reads the "retries" setting from this node's input props.
+		return inputProps.getInt("retries", 0); // presumably 0 is the fallback when the key is absent; confirm in Props
+	}
+	
+	public long getRetryBackoff() { // Reads the "retry.backoff" setting from this node's input props.
+		return inputProps.getLong("retry.backoff", 0); // presumably 0 is the fallback; the unit is not stated here - TODO confirm
+	}
 }
+
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 8b71e29..f2e979b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -80,7 +80,6 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	File cacheDir;
 	
 	public ExecutorManager(Props props, ExecutorLoader loader, Map<String, Alerter> alters) throws ExecutorManagerException {
-		
 		this.executorLoader = loader;
 		this.loadRunningFlows();
 		executorHost = props.getString("executor.host", "localhost");
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index e756cfb..8c7a8ed 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -498,14 +498,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		}
 		
 		ExecutableFlow flow = node.getExecutableFlow();
-		String flowId = flow.getFlowId();
-		
-		// If the main flow is not the parent, then we'll create a composite key 
-		// for flowID.
-		if (flow != node.getParentFlow()) {
-			flowId = node.getParentFlow().getNestedId();
-		}
-		
+		String flowId = node.getParentFlow().getFlowPath();
+		System.out.println("Uploading flowId " + flowId);
 		QueryRunner runner = createQueryRunner();
 		try {
 			runner.update(
@@ -556,7 +550,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 					node.getStatus().getNumVal(), 
 					outputParam,
 					node.getExecutableFlow().getExecutionId(),
-					node.getParentFlow().getNestedId(),
+					node.getParentFlow().getFlowPath(),
 					node.getId(),
 					node.getAttempt());
 		} catch (SQLException e) {
@@ -596,7 +590,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 					FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE, 
 					new FetchExecutableJobHandler(), 
 					execId, 
-					jobId);
+					jobId,
+					attempts);
 			if (info == null || info.isEmpty()) {
 				return null;
 			}
diff --git a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
index 6d1bd39..630d54d 100644
--- a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
+++ b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
@@ -1,6 +1,7 @@
 package azkaban.migration.schedule2trigger;
 
 import java.io.File;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.Utils;
 
+@SuppressWarnings("deprecation")
 public class Schedule2Trigger {
 	
 	private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
@@ -162,6 +164,7 @@ public class Schedule2Trigger {
 		props.storeLocal(outputFile);
 	}
 
+	@SuppressWarnings("unchecked")
 	private static void file2ScheduleTrigger() throws Exception {
 		
 		TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
diff --git a/src/java/azkaban/utils/SwapQueue.java b/src/java/azkaban/utils/SwapQueue.java
new file mode 100644
index 0000000..9999306
--- /dev/null
+++ b/src/java/azkaban/utils/SwapQueue.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Two-list queue: add()/addAll() append to a secondary list while iterator() walks the primary list.
+ * Call swap() before each read pass; it turns the pending secondary list into the primary list.
+ */
+public class SwapQueue<T> implements Iterable<T> {
+	ArrayList<T> primaryQueue; // read side: the list iterator() walks
+	ArrayList<T> secondaryQueue; // write side: the list add() and addAll() append to
+	
+	public SwapQueue() { // starts with both lists empty
+		primaryQueue = new ArrayList<T>();
+		secondaryQueue = new ArrayList<T>();
+	}
+	
+	/**
+	 * Makes the secondary queue the new primary queue and starts a fresh, empty secondary queue; the old primary is dropped.
+	 */
+	public synchronized void swap() {
+		primaryQueue = secondaryQueue; // pending elements become readable
+		secondaryQueue = new ArrayList<T>(); // later adds go to a new list, not the one now being read
+	}
+	
+	/**
+	 * Returns the number of elements waiting in the secondary queue.
+	 * @return size of the secondary queue, i.e. elements added since the last swap()
+	 */
+	public synchronized int getSwapQueueSize() {
+		return secondaryQueue.size();
+	}
+	
+	public synchronized int getPrimarySize() { // number of elements in the list iterator() currently exposes
+		return primaryQueue.size();
+	}
+	
+	public synchronized void addAll(Collection<T> col) { // appends every element of col to the secondary queue
+		secondaryQueue.addAll(col);
+	}
+	
+	/**
+	 * Returns the combined number of elements in the secondary and primary queues.
+	 * @return secondary queue size plus primary queue size
+	 */
+	public synchronized int getSize() {
+		return secondaryQueue.size() + primaryQueue.size();
+	}
+	
+	public synchronized void add(T element) { // appends one element to the secondary queue
+		secondaryQueue.add(element);
+	}
+
+	/**
+	 * Returns an iterator over the primary queue; only obtaining it is synchronized, stepping through it is not.
+	 */
+	@Override
+	public synchronized Iterator<T> iterator() {
+		return primaryQueue.iterator(); // elements added after the last swap() are not included
+	}
+}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index bcab22d..6f78ac8 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -17,7 +17,6 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
-import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,7 +30,6 @@ import javax.servlet.http.HttpServletResponse;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionAttempt;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutionOptions.FailureAction;
@@ -47,7 +45,6 @@ import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
 import azkaban.webapp.plugin.PluginRegistry;
@@ -189,6 +186,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		page.add("attempt", attempt);
 		
 		ExecutableFlow flow = null;
+		ExecutableNode node = null;
 		try {
 			flow = executorManager.getExecutableFlow(execId);
 			if (flow == null) {
@@ -197,7 +195,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				return;
 			}
 
-			ExecutableNode node = flow.getExecutableNode(jobId);
+			node = flow.getExecutableNodePath(jobId);
 			if (node == null) {
 				page.add("errorMsg", "Job " + jobId + " doesn't exist in " + flow.getExecutionId());
 				return;
@@ -219,9 +217,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			page.render();
 			return;
 		}
-
+		
 		page.add("projectName", project.getName());
-		page.add("flowid", flow.getFlowId());
+		page.add("flowid", flow.getId());
+		page.add("parentflowid", node.getParentFlow().getFlowId());
+		page.add("jobname", node.getId());
 		
 		page.render();
 	}
@@ -441,7 +441,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		resp.setCharacterEncoding("utf-8");
 
 		try {
-			ExecutableNode node = exFlow.getExecutableNode(jobId);
+			ExecutableNode node = exFlow.getExecutableNodePath(jobId);
 			if (node == null) {
 				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
 				return;
@@ -475,11 +475,12 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		if (project == null) {
 			return;
 		}
-		
+
 		String jobId = this.getParam(req, "jobid");
 		resp.setCharacterEncoding("utf-8");
+
 		try {
-			ExecutableNode node = exFlow.getExecutableNode(jobId);
+			ExecutableNode node = exFlow.getExecutableNodePath(jobId);
 			if (node == null) {
 				ret.put("error", "Job " + jobId + " doesn't exist in " + 
 						exFlow.getExecutionId());
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
index 4bf06c3..5b172c8 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
@@ -24,13 +24,14 @@
 					</div>
 					<div class="col-xs-6">
 						<div class="pull-right az-page-header-form">
-							<a href="${context}/manager?project=${projectName}&flow=${flowid}&job=$jobid" class="btn btn-info">Job Properties</a>
+							<a href="${context}/manager?project=${projectName}&flow=${parentflowid}&job=$jobname" class="btn btn-info">Job Properties</a>
 						</div>
 					</div>
 				</div>
 			</div>
 		</div>
 	
+	
     <div class="container-full">
 
   #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
@@ -41,7 +42,7 @@
 				<li><a href="${context}/manager?project=${projectName}"><strong>Project</strong> $projectName</a></li>
 				<li><a href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong> $flowid</a></li>
 				<li><a href="${context}/executor?execid=${execid}#jobslist"><strong>Execution</strong> $execid</a></li>
-        <li class="active"><strong>Job</strong> $jobid</li>
+				<li class="active"><strong>Job</strong> $jobid</li>
 			</ol>
 
   ## Tabs
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index f551245..27f5d40 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -47,7 +47,6 @@
   #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
 #else
 
-
   #parse ("azkaban/webapp/servlet/velocity/jobdetailsheader.vm")
 
 	## Log content.
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
index bdeca48..63856ec 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
@@ -100,10 +100,10 @@
 			#end
                 </td>
                 <td>
-                  <a href="${context}/manager?project=${projectName}&flow=${job.flowId}&job=${jobid}">${jobid}</a>
+                  <a href="${context}/manager?project=${projectName}&flow=${job.immediateFlowId}&job=${jobid}">${jobid}</a>
                 </td>
                 <td>
-                  <a href="${context}/manager?project=${projectName}&flow=${job.flowId}">${job.flowId}</a>
+                  <a href="${context}/manager?project=${projectName}&flow=${job.headFlowId}">${job.flowId}</a>
                 </td>
                 <td>$utils.formatDate(${job.startTime})</td>
                 <td>$utils.formatDate(${job.endTime})</td>
@@ -114,7 +114,7 @@
                   </div>
                 </td>
                 <td class="logLink">
-                  <a href="${context}/executor?execid=${job.execId}&job=${jobid}&attempt=${job.attempt}">Logs</a>
+                  <a href="${context}/executor?execid=${job.execId}&job=${job.jobIdPath}&attempt=${job.attempt}">Logs</a>
                 </td>
               </tr>
 		#end
diff --git a/src/less/azkaban-graph.less b/src/less/azkaban-graph.less
index 5cd548c..5a5d213 100644
--- a/src/less/azkaban-graph.less
+++ b/src/less/azkaban-graph.less
@@ -124,6 +124,19 @@
 	opacity: 0.25;
 }
 
+.QUEUED > g > rect { /* rect inside a .QUEUED element: same light blue for fill and outline */
+	fill: #39b3d7;
+	stroke: #39b3d7;
+}
+
+.QUEUED > g > text { /* label text inside a .QUEUED element is white */
+	fill: #FFF;
+}
+
+.QUEUED { /* the whole .QUEUED element is drawn at half opacity */
+	opacity: 0.5;
+}
+
 /* Edges */
 .edge {
 	stroke: #CCC;
diff --git a/src/less/flow.less b/src/less/flow.less
index 99f6be5..5e60909 100644
--- a/src/less/flow.less
+++ b/src/less/flow.less
@@ -169,7 +169,7 @@ li.tree-list-item {
   }
 
   &.subFilter > a > .expandarrow {
-    color : #3398cc;
+    color : #f19153;
   }
 
   > a {
diff --git a/src/less/tables.less b/src/less/tables.less
index ba34cb4..5d5aeea 100644
--- a/src/less/tables.less
+++ b/src/less/tables.less
@@ -129,7 +129,7 @@ table.table-properties {
     
      &.timeline {
       width: 280px;
-      padding: 0px;
+      padding: 0px 0px 0px 4px;
       height: 100%;
       vertical-align: bottom;
       margin: 0px;
diff --git a/src/web/js/azkaban/util/flow-loader.js b/src/web/js/azkaban/util/flow-loader.js
index ba193f0..97e447c 100644
--- a/src/web/js/azkaban/util/flow-loader.js
+++ b/src/web/js/azkaban/util/flow-loader.js
@@ -133,7 +133,7 @@ var nodeClickCallback = function(event, model, node) {
 		}
 
 		$.merge(menu, [
-				{title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+		//		{title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
 				{break: 1},
 				{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
 				{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
@@ -146,8 +146,8 @@ var nodeClickCallback = function(event, model, node) {
 	}
 	else {
 		menu = [
-				{title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
-				{break: 1},
+		//		{title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+		//		{break: 1},
 				{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
 				{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
 				{break: 1},
@@ -170,8 +170,8 @@ var jobClickCallback = function(event, model, node) {
 	if (type == "flow") {
 		var flowRequestURL = contextURL + "/manager?project=" + projectName + "&flow=" + node.flowId;
 		menu = [
-				{title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
-				{break: 1},
+		//		{title: "View Properties...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+		//		{break: 1},
 				{title: "Open Flow...", callback: function() {window.location.href=flowRequestURL;}},
 				{title: "Open Flow in New Window...", callback: function() {window.open(flowRequestURL);}},
 				{break: 1},
@@ -183,8 +183,8 @@ var jobClickCallback = function(event, model, node) {
 	}
 	else {
 		menu = [
-				{title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
-				{break: 1},
+		//		{title: "View Job...", callback: function() {openJobDisplayCallback(jobId, flowId, event)}},
+		//		{break: 1},
 				{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
 				{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
 				{break: 1},
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 2a0ce22..0ec15a9 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -171,20 +171,29 @@ azkaban.ExecutionListView = Backbone.View.extend({
 			outerWidth = parseInt(outerWidth);
 		}
 		
+		var parentLastTime = data.endTime == -1 ? (new Date()).getTime() : data.endTime;
+		var parentStartTime = data.startTime;
+		
+		var factor = outerWidth / (flowLastTime - flowStartTime);
+		var outerProgressBarWidth = factor * (parentLastTime - parentStartTime);
+		var outerLeftMargin = factor * (parentStartTime - flowStartTime);
+			
 		var nodes = data.nodes;
-		var diff = flowLastTime - flowStartTime;
-		var factor = outerWidth/diff;
 		for (var i = 0; i < nodes.length; ++i) {
 			var node = nodes[i];
 			
 			// calculate the progress
 			var tr = node.joblistrow;
-			
+			var outerProgressBar = $(tr).find("> td.timeline > .flow-progress");
 			var progressBar = $(tr).find("> td.timeline > .flow-progress > .main-progress");
 			var offsetLeft = 0;
 			var minOffset = 0;
 			progressBar.attempt = 0;
 			
+			// Shift the outer progress
+			$(outerProgressBar).css("width", outerProgressBarWidth)
+			$(outerProgressBar).css("margin-left", outerLeftMargin);
+			
 			// Add all the attempts
 			if (node.pastAttempts) {
 				var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id + "&attempt=" +	node.pastAttempts.length;
@@ -212,7 +221,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 			}
 			
 			var nodeLastTime = node.endTime == -1 ? (new Date()).getTime() : node.endTime;
-			var left = Math.max((node.startTime-flowStartTime)*factor, minOffset);
+			var left = Math.max((node.startTime-parentStartTime)*factor, minOffset);
 			var margin = left - offsetLeft;
 			var width = Math.max((nodeLastTime - node.startTime)*factor, 3);
 			width = Math.min(width, outerWidth);
@@ -247,12 +256,6 @@ azkaban.ExecutionListView = Backbone.View.extend({
 			$(subFlowRow).show();
 		}
 	},
-	expandFlow: function(flow) {
-		for (var i = 0; i < flow.nodes.length; ++i) {
-			var node = flow.nodes[i];
-			///@TODO Expand.
-		}
-	},
 	addNodeRow: function(node, body) {
 		var self = this;
 		var tr = document.createElement("tr");
diff --git a/unit/executions/embedded/innerFlow.job b/unit/executions/embedded/innerFlow.job
index e9b3b89..da71d64 100644
--- a/unit/executions/embedded/innerFlow.job
+++ b/unit/executions/embedded/innerFlow.job
@@ -1,4 +1,5 @@
 type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
 seconds=1
 fail=false
 dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerFlow.job b/unit/executions/embedded3/innerFlow.job
index e9b3b89..da71d64 100644
--- a/unit/executions/embedded3/innerFlow.job
+++ b/unit/executions/embedded3/innerFlow.job
@@ -1,4 +1,5 @@
 type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
 seconds=1
 fail=false
 dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded3/innerFlow2.job b/unit/executions/embedded3/innerFlow2.job
index 2346982..e8430ee 100644
--- a/unit/executions/embedded3/innerFlow2.job
+++ b/unit/executions/embedded3/innerFlow2.job
@@ -1,4 +1,5 @@
 type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
 seconds=1
 fail=false
 dependencies=innerJobA
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 42026fc..c0f8025 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -315,7 +315,7 @@ public class FlowRunnerTest {
 		testStatus(exFlow, "job4", Status.FAILED);
 		testStatus(exFlow, "job6", Status.FAILED);
 		
-		Assert.assertTrue("Expected KILLED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.KILLED);
+		Assert.assertTrue("Expected FAILED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.FAILED);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 0ac1fb6..ada74b4 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -124,7 +124,9 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
 		compareStates(expectedStateMap, nodeMap);
 		
-		Props jobb = nodeMap.get("jobb").getInputProps();
+		ExecutableNode node = nodeMap.get("jobb");
+		Assert.assertEquals(Status.RUNNING, node.getStatus());
+		Props jobb = node.getInputProps();
 		Assert.assertEquals("test1.1", jobb.get("param1"));
 		Assert.assertEquals("test1.1", jobb.get("param1"));
 		Assert.assertEquals("test1.2", jobb.get("param2"));
@@ -584,7 +586,7 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
 		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
 		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd", Status.FAILED);
 		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
 		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
 		expectedStateMap.put("jobe", Status.KILLED);
@@ -644,6 +646,13 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
 		
+		ExecutableNode node = nodeMap.get("jobd:innerFlow2");
+		ExecutableFlowBase base = node.getParentFlow();
+		for (String nodeId : node.getInNodes()) {
+			ExecutableNode inNode = base.getExecutableNode(nodeId);
+			System.out.println(inNode.getId() + " > " + inNode.getStatus());
+		}
+		
 		runner.retryFailures("me");
 		pause(500);
 		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
@@ -656,6 +665,7 @@ public class FlowRunnerTest2 {
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertTrue(thread.isAlive());
 		
+		
 		InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
 		InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
 		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
@@ -732,18 +742,18 @@ public class FlowRunnerTest2 {
 		runner.cancel("me");
 		pause(250);
 		
-		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobb", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
 		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
 		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd", Status.FAILED);
 		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
 		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
 		expectedStateMap.put("jobe", Status.KILLED);
 		expectedStateMap.put("jobf", Status.KILLED);
 		
-		Assert.assertEquals(Status.KILLED, flow.getStatus());
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertFalse(thread.isAlive());
 	}
@@ -801,7 +811,7 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
 		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
 		expectedStateMap.put("jobc", Status.FAILED);
-		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd", Status.FAILED);
 		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
 		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
 		expectedStateMap.put("jobe", Status.KILLED);
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 05641ec..6c1246d 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -202,5 +202,19 @@ public class MockExecutorLoader implements ExecutorLoader {
 		return null;
 	}
 
+	@Override
+	public List<Object> fetchAttachments(int execId, String name, int attempt)
+			throws ExecutorManagerException {
+		// TODO Auto-generated method stub: this mock ignores its arguments and always returns null.
+		return null;
+	}
+
+	@Override
+	public void uploadAttachmentFile(ExecutableNode node, File file)
+			throws ExecutorManagerException {
+		// TODO Auto-generated method stub: currently a no-op, the node and file are ignored.
+		
+	}
+
 
 }
\ No newline at end of file