azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index 4ba5889..cbf6333 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -44,9 +44,9 @@ public class LocalFlowWatcher extends FlowWatcher {
 					if (data instanceof ExecutableNode) {
 						ExecutableNode node = (ExecutableNode)data;
 						
-						if (node.getId()) {
-							
-						}
+//						if (node.getId()) {
+//							
+//						}
 						
 						handleJobFinished(node.getId(), node.getStatus());
 					}
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index cecf1bc..7beb47a 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -1,6 +1,6 @@
 package azkaban.execapp.event;
 
-import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
@@ -11,7 +11,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
 	
 	private int execId;
 	private ExecutorLoader loader;
-	private ExecutableFlowBase flow;
+	private ExecutableFlow flow;
 	private RemoteUpdaterThread thread;
 	private boolean isShutdown = false;
 	
@@ -46,7 +46,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
 		@Override
 		public void run() {
 			do {
-				ExecutableFlowBase updateFlow = null;
+				ExecutableFlow updateFlow = null;
 				try {
 					updateFlow = loader.fetchExecutableFlow(execId);
 				} catch (ExecutorManagerException e) {
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 38ece4a..0259777 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -57,6 +57,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private ExecutableFlow flow;
 	private Thread flowRunnerThread;
 	private int numJobThreads = 10;
+	private ExecutionOptions.FailureAction failureAction;
 	
 	// Sync object for queuing
 	private Object mainSyncObj = new Object();
@@ -66,13 +67,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
 	
 	private Props globalProps;
-	private Props commonProps;
+//	private Props commonProps;
 	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
 	private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
-
+	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
 	private Integer pipelineExecId = null;
@@ -92,6 +93,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private boolean flowCancelled = false;
 	
 	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
+		this(flow, executorLoader, projectLoader, jobtypeManager, null);
+	}
+
+	public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, ProjectLoader projectLoader, JobTypeManager jobtypeManager, ExecutorService executorService) throws ExecutorManagerException {
 		this.execId = flow.getExecutionId();
 		this.flow = flow;
 		this.executorLoader = executorLoader;
@@ -102,8 +107,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 		ExecutionOptions options = flow.getExecutionOptions();
 		this.pipelineLevel = options.getPipelineLevel();
 		this.pipelineExecId = options.getPipelineExecutionId();
-
+		this.failureAction = options.getFailureAction();
 		this.proxyUsers = flow.getProxyUsers();
+		this.executorService = executorService;
 	}
 
 	public FlowRunner setFlowWatcher(FlowWatcher watcher) {
@@ -176,13 +182,28 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	@SuppressWarnings("unchecked")
 	private void setupFlowExecution() {
 		int projectId = flow.getProjectId();
 		int version = flow.getVersion();
 		String flowId = flow.getFlowId();
 		
 		// Add a bunch of common azkaban properties
-		commonProps = PropsUtils.addCommonFlowProperties(flow);
+		Props commonFlowProps = PropsUtils.addCommonFlowProperties(this.globalProps, flow);
+		
+		if (flow.getJobSource() != null) {
+			String source = flow.getJobSource();
+			Props flowProps = sharedProps.get(source);
+			flowProps.setParent(commonFlowProps);
+			commonFlowProps = flowProps;
+		}
+		
+		// If there are flow overrides, we apply them now.
+		Map<String,String> flowParam = flow.getExecutionOptions().getFlowParameters();
+		if (flowParam != null && !flowParam.isEmpty()) {
+			commonFlowProps = new Props(commonFlowProps, flowParam);
+		}
+		flow.setInputProps(commonFlowProps);
 		
 		// Create execution dir
 		createLogger(flowId);
@@ -199,7 +220,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// The current thread is used for interrupting blocks
 		flowRunnerThread = Thread.currentThread();
 		flowRunnerThread.setName("FlowRunner-exec-" + flow.getExecutionId());
-
 	}
 	
 	private void updateFlowReference() throws ExecutorManagerException {
@@ -274,11 +294,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 				props.setParent(inherits);
 			}
-			else {
-				String source = fprops.getSource();
-				Props props = sharedProps.get(source);
-				props.setParent(globalProps);
-			}
 		}
 	}
 	
@@ -303,47 +318,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 					continue;
 				}
 				else {
-					List<ExecutableNode> jobsReadyToRun = findReadyJobsToRun();
-					
-					if (!jobsReadyToRun.isEmpty() && !flowCancelled) {
-						for (ExecutableNode node : jobsReadyToRun) {
-							long currentTime = System.currentTimeMillis();
-							
-							// Queue a job only if it's ready to run.
-							if (node.getStatus() == Status.READY) {
-								// Collect output props from the job's dependencies.
-								Props outputProps = collectOutputProps(node);
-								node.setStatus(Status.QUEUED);
-								JobRunner runner = createJobRunner(node, outputProps);
-								logger.info("Submitting job " + node.getId() + " to run.");
-								try {
-									executorService.submit(runner);
-									jobRunners.put(node.getId(), runner);
-									activeJobRunners.put(node.getId(), runner);
-								} catch (RejectedExecutionException e) {
-									logger.error(e);
-								};
-								
-							} // If killed, then auto complete and KILL
-							else if (node.getStatus() == Status.KILLED) {
-								logger.info("Killing " + node.getId() + " due to prior errors.");
-								node.setStartTime(currentTime);
-								node.setEndTime(currentTime);
-								fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
-							} // If disabled, then we auto skip
-							else if (node.getStatus() == Status.DISABLED) {
-								logger.info("Skipping disabled job " + node.getId() + ".");
-								node.setStartTime(currentTime);
-								node.setEndTime(currentTime);
-								node.setStatus(Status.SKIPPED);
-								fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
-							}
-						}
-						
-						updateFlow();
-					}
-					else {
-						if (isFlowFinished() || flowCancelled ) {
+					if (!progressGraph(flow)) {
+						if (flow.isFlowFinished() || flowCancelled ) {
 							flowFinished = true;
 							break;
 						}
@@ -364,18 +340,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 					activeRunner.cancel();
 				}
 				
-				for (ExecutableNode node: flow.getExecutableNodes()) {
-					if (Status.isStatusFinished(node.getStatus())) {
-						continue;
-					}
-					else if (node.getStatus() == Status.DISABLED) {
-						node.setStatus(Status.SKIPPED);
-					}
-					else {
-						node.setStatus(Status.KILLED);
-					}
-					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
-				}
+				flow.killNode(System.currentTimeMillis());
 			} catch (Exception e) {
 				logger.error(e);
 			}
@@ -403,125 +368,197 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	private List<ExecutableNode> findReadyJobsToRun() {
-		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
-		for (ExecutableNode node : flow.getExecutableNodes()) {
-			if (Status.isStatusFinished(node.getStatus())) {
-				continue;
-			}
-			else {
-				// Check the dependencies to see if execution conditions are met,
-				// and what the status should be set to.
-				Status impliedStatus = getImpliedStatus(node);
-				if (getImpliedStatus(node) != null) {
-					node.setStatus(impliedStatus);
-					jobsToRun.add(node);
-				}
-			}
-		}
-		
-		return jobsToRun;
-	}
+	private boolean progressGraph(ExecutableFlowBase flow) throws IOException {
+		List<ExecutableNode> jobsReadyToRun = flow.findNextJobsToRun();
 
-	private List<ExecutableNode> findReadyJobsToRun(ExecutableFlowBase flow) {
-		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
-		for (ExecutableNode node : flow.getExecutableNodes()) {
-			if (Status.isStatusFinished(node.getStatus())) {
-				continue;
-			}
-			else {
-				// Check the dependencies to see if execution conditions are met,
-				// and what the status should be set to.
-				Status impliedStatus = getImpliedStatus(node);
-				if (getImpliedStatus(node) != null) {
-					node.setStatus(impliedStatus);
-					jobsToRun.add(node);
+		if (!jobsReadyToRun.isEmpty()) {
+			long currentTime = System.currentTimeMillis();
+			for (ExecutableNode node: jobsReadyToRun) {
+				Status nextStatus = getImpliedStatus(node);
+				
+				// If the flow has seen previous failures and the flow has been cancelled, than 
+				if (nextStatus == Status.KILLED) {
+					logger.info("Killing " + node.getId() + " due to prior errors.");
+					node.killNode(currentTime);
+					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+				}
+				else if (nextStatus == Status.DISABLED) {
+					logger.info("Skipping disabled job " + node.getId() + ".");
+					node.skipNode(currentTime);
+					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+				}
+				else {
+					runExecutableNode(node);
 				}
 			}
+			
+			updateFlow();
+			return true;
 		}
 		
-		return jobsToRun;
+		return false;
 	}
 	
-	private boolean isFlowFinished() {
-		if (!activeJobRunners.isEmpty()) {
-			return false;
+	private void prepareJobProperties(ExecutableNode node) throws IOException {
+		Props props = null;
+		// The following is the hiearchical ordering of dependency resolution
+		// 1. Parent Flow Properties
+		ExecutableFlowBase parentFlow = node.getParentFlow();
+		if (parentFlow != null) {
+			props = parentFlow.getInputProps();
 		}
 		
-		for (String end: flow.getEndNodes()) {
-			ExecutableNode node = flow.getExecutableNode(end);
-			if (!Status.isStatusFinished(node.getStatus()) ) {
-				return false;
+		// 2. Shared Properties
+		String sharedProps = node.getPropsSource();
+		if (sharedProps != null) {
+			Props shared = this.sharedProps.get(sharedProps);
+			if (shared != null) {
+				// Clone because we may clobber
+				shared = Props.clone(shared);
+				shared.setEarliestAncestor(props);
+				props = shared;
 			}
 		}
+
+		// 3. Output Properties
+		Props outputProps = collectOutputProps(node);
+		if (outputProps != null) {
+			outputProps.setEarliestAncestor(props);
+			props = outputProps;
+		}
 		
-		return true;
-	}
-	
-	private Props collectOutputProps(ExecutableNode node) {
-		Props previousOutput = null;
-		// Iterate the in nodes again and create the dependencies
-		for (String dependency : node.getInNodes()) {
-			Props output = jobOutputProps.get(dependency);
-			if (output != null) {
-				output = Props.clone(output);
-				output.setParent(previousOutput);
-				previousOutput = output;
-			}
+		// 4. The job source
+		Props jobSource = loadJobProps(node);
+		if (jobSource != null) {
+			jobSource.setParent(props);
+			props = jobSource;
 		}
 		
-		return previousOutput;
+		node.setInputProps(props);
 	}
 	
-	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) {
+	private Props loadJobProps(ExecutableNode node) throws IOException {
+		Props props = null;
 		String source = node.getJobSource();
-		String propsSource = node.getPropsSource();
-
-		// If no properties are set, we just set the global properties.
-		Props parentProps = propsSource == null ? globalProps : sharedProps.get(propsSource);
-
-		// Set up overrides
-		ExecutionOptions options = flow.getExecutionOptions();
-		@SuppressWarnings("unchecked")
-		Props flowProps = new Props(null, options.getFlowParameters()); 
-		flowProps.putAll(commonProps);
-		flowProps.setParent(parentProps);
-		parentProps = flowProps;
-
-		// We add the previous job output and put into this props.
-		if (previousOutput != null) {
-			Props earliestParent = previousOutput.getEarliestAncestor();
-			earliestParent.setParent(parentProps);
-
-			parentProps = previousOutput;
+		if (source == null) {
+			return null;
 		}
 		
-		// Load job file.
-		File path = new File(execDir, source);
-		Props prop = null;
-		
 		// load the override props if any
 		try {
-			prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
+			props = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
 		}
 		catch(ProjectManagerException e) {
 			e.printStackTrace();
 			logger.error("Error loading job override property for job " + node.getId());
 		}
-		if(prop == null) {
+		
+		File path = new File(execDir, source);
+		if(props == null) {
 			// if no override prop, load the original one on disk
 			try {
-				prop = new Props(null, path);				
+				props = new Props(null, path);				
 			} catch (IOException e) {
 				e.printStackTrace();
 				logger.error("Error loading job file " + source + " for job " + node.getId());
 			}
 		}
 		// setting this fake source as this will be used to determine the location of log files.
-		prop.setSource(path.getPath());
-		prop.setParent(parentProps);
+		props.setSource(path.getPath());
+		return props;
+	}
+	
+	private void runExecutableNode(ExecutableNode node) throws IOException {
+		// Collect output props from the job's dependencies.
+		prepareJobProperties(node);
+		
+		if (node instanceof ExecutableFlowBase) {
+			node.setStatus(Status.RUNNING);
+			node.setStartTime(System.currentTimeMillis());
+			
+			logger.info("Starting subflow " + node.getId() + ".");
+		}
+		else {
+			node.setStatus(Status.QUEUED);
+			JobRunner runner = createJobRunner(node);
+			logger.info("Submitting job " + node.getId() + " to run.");
+			try {
+				executorService.submit(runner);
+				jobRunners.put(node.getId(), runner);
+				activeJobRunners.put(node.getId(), runner);
+			} catch (RejectedExecutionException e) {
+				logger.error(e);
+			};
+		}
+	}
+	
+	/**
+	 * Determines what the state of the next node should be.
+	 * 
+	 * @param node
+	 * @return
+	 */
+	public Status getImpliedStatus(ExecutableNode node) {
+		if (flowFailed && failureAction == ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING) {
+			return Status.KILLED;
+		}
+		else if (node.getStatus() == Status.DISABLED) {
+			return Status.DISABLED;
+		}
 		
-		JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager);
+		ExecutableFlowBase flow = node.getParentFlow();
+		boolean shouldKill = false;
+		for (String dependency: node.getInNodes()) {
+			ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
+			Status depStatus = dependencyNode.getStatus();
+			
+			switch (depStatus) {
+			case FAILED:
+			case KILLED:
+				shouldKill = true;
+			case SKIPPED:
+			case SUCCEEDED:
+			case FAILED_SUCCEEDED:
+				continue;
+			default:
+				// Should never come here.
+				return null;
+			}
+		}
+
+		if (shouldKill) {
+			return Status.KILLED;
+		}
+		
+		// If it's disabled but ready to run, we want to make sure it continues being disabled.
+		if (node.getStatus() == Status.DISABLED) {
+			return Status.DISABLED;
+		}
+		
+		// All good to go, ready to run.
+		return Status.READY;
+	}
+	
+	private Props collectOutputProps(ExecutableNode node) {
+		Props previousOutput = null;
+		// Iterate the in nodes again and create the dependencies
+		for (String dependency : node.getInNodes()) {
+			Props output = jobOutputProps.get(dependency);
+			if (output != null) {
+				output = Props.clone(output);
+				output.setParent(previousOutput);
+				previousOutput = output;
+			}
+		}
+		
+		return previousOutput;
+	}
+	
+	private JobRunner createJobRunner(ExecutableNode node) {
+		// Load job file.
+		File path = new File(execDir, node.getJobSource());
+		
+		JobRunner jobRunner = new JobRunner(node, path.getParentFile(), executorLoader, jobtypeManager);
 		if (watcher != null) {
 			jobRunner.setPipeline(watcher, pipelineLevel);
 		}
@@ -689,62 +726,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private void interrupt() {
 		flowRunnerThread.interrupt();
 	}
-	
-	private Status getImpliedStatus(ExecutableNode node) {
-		switch(node.getStatus()) {
-		case FAILED:
-		case KILLED:
-		case SKIPPED:
-		case SUCCEEDED:
-		case FAILED_SUCCEEDED:
-		case QUEUED:
-		case RUNNING:
-			return null;
-		default:
-			break;
-		}
-		
-		boolean shouldKill = false;
-		for (String dependency : node.getInNodes()) {
-			ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
-			
-			Status depStatus = dependencyNode.getStatus();
-			switch (depStatus) {
-			case FAILED:
-			case KILLED:
-				shouldKill = true;
-			case SKIPPED:
-			case SUCCEEDED:
-			case FAILED_SUCCEEDED:
-				continue;
-			case RUNNING:
-			case QUEUED:
-			case DISABLED:
-				return null;
-			default:
-				// Return null means it's not ready to run.
-				return null;
-			}
-		}
-		
-		ExecutionOptions options = flow.getExecutionOptions();
-		if (shouldKill || flowCancelled || (flowFailed && options.getFailureAction() != FailureAction.FINISH_ALL_POSSIBLE)) {
-			return Status.KILLED;
-		}
-		
-		// If it's disabled but ready to run, we want to make sure it continues being disabled.
-		if (node.getStatus() == Status.DISABLED) {
-			return Status.DISABLED;
-		}
-		
-		// All good to go, ready to run.
-		return Status.READY;
-	}
-	
+
 	private class JobRunnerEventListener implements EventListener {
 		public JobRunnerEventListener() {
 		}
-
+		// TODO: HANDLE subflow execution
 		@Override
 		public synchronized void handleEvent(Event event) {
 			JobRunner runner = (JobRunner)event.getRunner();
@@ -774,7 +760,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 						}
 						else {
 							if (!runner.isCancelled() && runner.getRetries() > 0) {
-					
 								logger.info("Job " + node.getId() + " has run out of retry attempts");
 								// Setting delayed execution to 0 in case this is manually re-tried.
 								node.setDelayedExecution(0);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index c4ea32f..f77f1b2 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -88,8 +88,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	private boolean cancelled = false;
 	private BlockingStatus currentBlockStatus = null;
 	
-	public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
-		this.props = props;
+	public JobRunner(ExecutableNode node, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
+		this.props = node.getInputProps();
 		this.node = node;
 		this.workingDir = workingDir;
 		
@@ -149,8 +149,14 @@ public class JobRunner extends EventHandler implements Runnable {
 			logger = Logger.getLogger(loggerName);
 
 			// Create file appender
-			String logName = createLogFileName(this.executionId, this.jobId, node.getAttempt());
+			String id = this.jobId;
+			if (node.getExecutableFlow() != node.getParentFlow()) {
+				id = node.getParentFlow().getNestedId("._.");
+			}
+			
+			String logName = createLogFileName(this.executionId, id, node.getAttempt());
 			logFile = new File(workingDir, logName);
+			
 			String absolutePath = logFile.getAbsolutePath();
 
 			jobAppender = null;
@@ -306,7 +312,6 @@ public class JobRunner extends EventHandler implements Runnable {
 					);
 					Arrays.sort(files, Collections.reverseOrder());
 					
-					
 					loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
 				} catch (ExecutorManagerException e) {
 					flowLogger.error("Error writing out logs for job " + this.jobId, e);
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 6141959..e9f81f7 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -34,7 +34,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
 	public static final String PROJECTID_PARAM ="projectId";
 	public static final String SCHEDULEID_PARAM ="scheduleId";
 	public static final String SUBMITUSER_PARAM = "submitUser";
-	public static final String SUBMITTIME_PARAM = "submitUser";
+	public static final String SUBMITTIME_PARAM = "submitTime";
 	public static final String VERSION_PARAM = "version";
 	public static final String PROXYUSERS_PARAM = "proxyUsers";
 	
@@ -95,8 +95,8 @@ public class ExecutableFlow extends ExecutableFlowBase {
 		return executionOptions;
 	}
 	
-	private void setFlow(Project project, Flow flow) {
-		super.setFlow(project, flow, null);
+	protected void setFlow(Project project, Flow flow) {
+		super.setFlow(project, flow);
 		executionOptions = new ExecutionOptions();
 
 		if (flow.getSuccessEmails() != null) {
@@ -195,34 +195,46 @@ public class ExecutableFlow extends ExecutableFlowBase {
 	public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
 		ExecutableFlow exFlow = new ExecutableFlow();
 		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
-		
 		exFlow.fillExecutableFromMapObject(flowObj);
-		exFlow.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
-		exFlow.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
+		
+		return exFlow;
+	}
+	
+	@Override
+	@SuppressWarnings("unchecked")
+	public void fillExecutableFromMapObject(Map<String, Object> flowObj) {
+		super.fillExecutableFromMapObject(flowObj);
+		
+		this.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
+		this.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
 
-		exFlow.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
+		this.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
 		if (flowObj.containsKey(SCHEDULEID_PARAM)) {
-			exFlow.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
+			this.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
 		}
-		exFlow.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
-		exFlow.version = (Integer)flowObj.get(VERSION_PARAM);
 		
-		exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
+		if (flowObj.containsKey(SUBMITUSER_PARAM)) {
+			this.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
+		}
+		else {
+			this.submitUser = null;
+		}
+		this.version = (Integer)flowObj.get(VERSION_PARAM);
+		
+		this.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
 		
 		if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
-			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
+			this.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
 		}
 		else {
 			// for backwards compatibility should remove in a few versions.
-			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
+			this.executionOptions = ExecutionOptions.createFromObject(flowObj);
 		}
 		
 		if(flowObj.containsKey(PROXYUSERS_PARAM)) {
 			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get(PROXYUSERS_PARAM);
-			exFlow.addAllProxyUsers(proxyUserList);
+			this.addAllProxyUsers(proxyUserList);
 		}
-		
-		return exFlow;
 	}
 	
 	public Map<String, Object> toUpdateObject(long lastUpdateTime) {
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 311c33b..10db20f 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -44,7 +44,7 @@ public class ExecutableFlowBase extends ExecutableNode {
 	public ExecutableFlowBase(Project project, Node node, Flow flow, ExecutableFlowBase parent) {
 		super(node, parent);
 
-		setFlow(project, flow, parent);
+		setFlow(project, flow);
 	}
 	
 	public ExecutableFlowBase() {
@@ -78,15 +78,15 @@ public class ExecutableFlowBase extends ExecutableNode {
 		return flowId;
 	}
 	
-	public String getNestedId() {
+	public String getNestedId(String delimiter) {
 		if (this.getParentFlow() != null) {
-			return this.getParentFlow().getNestedId() + ":" + getId();
+			return this.getParentFlow().getNestedId(delimiter) + delimiter + getId();
 		}
 		
 		return getId();
 	}
 	
-	protected void setFlow(Project project, Flow flow, ExecutableFlowBase parent) {
+	protected void setFlow(Project project, Flow flow) {
 		this.flowId = flow.getId();
 		
 		for (Node node: flow.getNodes()) {
@@ -95,11 +95,11 @@ public class ExecutableFlowBase extends ExecutableNode {
 				String embeddedFlowId = node.getEmbeddedFlowId();
 				Flow subFlow = project.getFlow(embeddedFlowId);
 				
-				ExecutableFlowBase embeddedFlow = new ExecutableFlowBase(project, node, subFlow, parent);
+				ExecutableFlowBase embeddedFlow = new ExecutableFlowBase(project, node, subFlow, this);
 				executableNodes.put(id, embeddedFlow);
 			}
 			else {
-				ExecutableNode exNode = new ExecutableNode(node, parent);
+				ExecutableNode exNode = new ExecutableNode(node, this);
 				executableNodes.put(id, exNode);
 			}
 		}
@@ -270,4 +270,78 @@ public class ExecutableFlowBase extends ExecutableNode {
 			exNode.applyUpdateObject(node);
 		}
 	}
+	
+	public void reEnableDependents(ExecutableNode ... nodes) {
+		for(ExecutableNode node: nodes) {
+			for(String dependent: node.getOutNodes()) {
+				ExecutableNode dependentNode = getExecutableNode(dependent);
+				
+				if (dependentNode.getStatus() == Status.KILLED) {
+					dependentNode.setStatus(Status.READY);
+					dependentNode.setUpdateTime(System.currentTimeMillis());
+					reEnableDependents(dependentNode);
+	
+					if (dependentNode instanceof ExecutableFlowBase) {
+						
+						((ExecutableFlowBase)dependentNode).reEnableDependents();
+					}
+				}
+				else if (dependentNode.getStatus() == Status.SKIPPED) {
+					dependentNode.setStatus(Status.DISABLED);
+					dependentNode.setUpdateTime(System.currentTimeMillis());
+					reEnableDependents(dependentNode);
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Only returns true if the status of all finished nodes is true.
+	 * @return
+	 */
+	public boolean isFlowFinished() {
+		for (String end: getEndNodes()) {
+			ExecutableNode node = getExecutableNode(end);
+			if (!Status.isStatusFinished(node.getStatus()) ) {
+				return false;
+			}
+		}
+		
+		return true;
+	}
+	
+	/**
+	 * Finds all jobs which are ready to run. This occurs when all of its 
+	 * dependency nodes are finished running.
+	 * 
+	 * @param flow
+	 * @return
+	 */
+	public List<ExecutableNode> findNextJobsToRun() {
+		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+		
+		nodeloop:
+		for (ExecutableNode node: executableNodes.values()) {
+			if(Status.isStatusFinished(node.getStatus())) {
+				continue;
+			}
+
+			if ((node instanceof ExecutableFlowBase) && Status.isStatusRunning(node.getStatus())) {
+				// If the flow is still running, we traverse into the flow
+				jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
+			}
+			else {
+				for (String dependency: node.getInNodes()) {
+					// We find that the outer-loop is unfinished.
+					if (!Status.isStatusFinished(getExecutableNode(dependency).getStatus())) {
+						continue nodeloop;
+					}
+				}
+
+				jobsToRun.add(node);
+			}
+		}
+		
+		return jobsToRun;
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 64a0a35..6f41c56 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -41,6 +41,7 @@ public class ExecutableNode {
 	private Set<String> inNodes = null;
 	private Set<String> outNodes = null;
 	
+	private Props inputProps;
 	private Props outputProps;
 	
 	public static final String ATTEMPT_PARAM = "attempt";
@@ -60,14 +61,14 @@ public class ExecutableNode {
 	}
 	
 	public ExecutableNode(Node node, ExecutableFlowBase parent) {
-		this(node.getId(), node.getJobSource(), node.getPropsSource(), parent);
+		this(node.getId(), node.getType(), node.getJobSource(), node.getPropsSource(), parent);
 	}
 
-	public ExecutableNode(String id, String jobSource, String propsSource, ExecutableFlowBase parent) {
+	public ExecutableNode(String id, String type, String jobSource, String propsSource, ExecutableFlowBase parent) {
 		this.id = id;
 		this.jobSource = jobSource;
 		this.propsSource = propsSource;
-		
+		this.type = type;
 		setParentFlow(parent);
 	}
 	
@@ -176,10 +177,18 @@ public class ExecutableNode {
 		return propsSource;
 	}
 	
+	public void setInputProps(Props input) {
+		this.inputProps = input;
+	}
+	
 	public void setOutputProps(Props output) {
 		this.outputProps = output;
 	}
 
+	public Props getInputProps() {
+		return this.inputProps;
+	}
+	
 	public Props getOutputProps() {
 		return outputProps;
 	}
@@ -222,7 +231,6 @@ public class ExecutableNode {
 		this.setStatus(Status.READY);
 	}
 	
-	
 	public List<Object> getAttemptObjects() {
 		ArrayList<Object> array = new ArrayList<Object>();
 		
@@ -247,6 +255,7 @@ public class ExecutableNode {
 		objMap.put(ENDTIME_PARAM, endTime);
 		objMap.put(UPDATETIME_PARAM, updateTime);
 		objMap.put(TYPE_PARAM, type);
+		objMap.put(ATTEMPT_PARAM, attempt);
 		
 		if (inNodes != null) {
 			objMap.put(INNODES_PARAM, inNodes);
@@ -283,6 +292,7 @@ public class ExecutableNode {
 		this.endTime = JSONUtils.getLongFromObject(objMap.get(ENDTIME_PARAM));
 		this.updateTime = JSONUtils.getLongFromObject(objMap.get(UPDATETIME_PARAM));
 		this.type = (String)objMap.get(TYPE_PARAM);
+		this.attempt = (Integer)objMap.get(ATTEMPT_PARAM);
 		
 		if (objMap.containsKey(INNODES_PARAM)) {
 			this.inNodes = new HashSet<String>();
@@ -362,6 +372,23 @@ public class ExecutableNode {
 		}
 	}
 	
+	public void killNode(long killTime) {
+		if (this.status == Status.DISABLED) {
+			skipNode(killTime);
+		}
+		else {
+			this.setStatus(Status.KILLED);
+			this.setStartTime(killTime);
+			this.setEndTime(killTime);
+		}
+	}
+	
+	public void skipNode(long skipTime) {
+		this.setStatus(Status.SKIPPED);
+		this.setStartTime(skipTime);
+		this.setEndTime(skipTime);
+	}
+	
 	private void updatePastAttempts(List<Object> pastAttemptsList) {
 		if (pastAttemptsList == null) {
 			return;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index a231c41..9ef6722 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -379,7 +379,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 		
 		// if the main flow is not the parent, then we'll create a composite key for flowID
 		if (flow != node.getParentFlow()) {
-			flowId = node.getParentFlow().getNestedId();
+			flowId = node.getParentFlow().getNestedId("+");
 		}
 		
 		QueryRunner runner = createQueryRunner();
@@ -404,7 +404,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 	
 	@Override
 	public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
-		final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=? AND attempt=?";
+		final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
 		
 		byte[] outputParam = null;
 		Props outputProps = node.getOutputProps();
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
index d8215df..f796378 100644
--- a/src/java/azkaban/executor/Status.java
+++ b/src/java/azkaban/executor/Status.java
@@ -56,4 +56,14 @@ public enum Status {
 			return false;
 		}
 	}
+	
+	public static boolean isStatusRunning(Status status) {
+		switch (status) {
+		case RUNNING:
+		case FAILED_FINISHING:
+			return true;
+		default:
+			return false;
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/utils/Props.java b/src/java/azkaban/utils/Props.java
index 12da4a7..4aeefa1 100644
--- a/src/java/azkaban/utils/Props.java
+++ b/src/java/azkaban/utils/Props.java
@@ -156,7 +156,12 @@ public class Props {
 			putAll(props);
 		}
 	}
-
+	
+	public void setEarliestAncestor(Props parent) {
+		Props props = getEarliestAncestor();
+		props.setParent(parent);
+	}
+	
 	public Props getEarliestAncestor() {
 		if (_parent == null) {
 			return this;
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 14da916..bdaac5d 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -208,27 +208,27 @@ public class PropsUtils {
 		return buffer.toString();
 	}
 	
-	public static Props addCommonFlowProperties(final ExecutableFlowBase flow) {
-		Props parentProps = new Props();
+	public static Props addCommonFlowProperties(Props parentProps, final ExecutableFlowBase flow) {
+		Props props = new Props(parentProps);
 
-		parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
-		parentProps.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
-		parentProps.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
-		parentProps.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
-		parentProps.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
+		props.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
+		props.put(CommonJobProperties.EXEC_ID, flow.getExecutionId());
+		props.put(CommonJobProperties.PROJECT_ID, flow.getProjectId());
+		props.put(CommonJobProperties.PROJECT_VERSION, flow.getVersion());
+		props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
 
 		DateTime loadTime = new DateTime();
 
-		parentProps.put(CommonJobProperties.FLOW_START_TIMESTAMP, loadTime.toString());
-		parentProps.put(CommonJobProperties.FLOW_START_YEAR, loadTime.toString("yyyy"));
-		parentProps.put(CommonJobProperties.FLOW_START_MONTH, loadTime.toString("MM"));
-		parentProps.put(CommonJobProperties.FLOW_START_DAY, loadTime.toString("dd"));
-		parentProps.put(CommonJobProperties.FLOW_START_HOUR, loadTime.toString("HH"));
-		parentProps.put(CommonJobProperties.FLOW_START_MINUTE, loadTime.toString("mm"));
-		parentProps.put(CommonJobProperties.FLOW_START_SECOND, loadTime.toString("ss"));
-		parentProps.put(CommonJobProperties.FLOW_START_MILLISSECOND, loadTime.toString("SSS"));
-		parentProps.put(CommonJobProperties.FLOW_START_TIMEZONE, loadTime.toString("ZZZZ"));
-		return parentProps;
+		props.put(CommonJobProperties.FLOW_START_TIMESTAMP, loadTime.toString());
+		props.put(CommonJobProperties.FLOW_START_YEAR, loadTime.toString("yyyy"));
+		props.put(CommonJobProperties.FLOW_START_MONTH, loadTime.toString("MM"));
+		props.put(CommonJobProperties.FLOW_START_DAY, loadTime.toString("dd"));
+		props.put(CommonJobProperties.FLOW_START_HOUR, loadTime.toString("HH"));
+		props.put(CommonJobProperties.FLOW_START_MINUTE, loadTime.toString("mm"));
+		props.put(CommonJobProperties.FLOW_START_SECOND, loadTime.toString("ss"));
+		props.put(CommonJobProperties.FLOW_START_MILLISSECOND, loadTime.toString("SSS"));
+		props.put(CommonJobProperties.FLOW_START_TIMEZONE, loadTime.toString("ZZZZ"));
+		return props;
 	}
 
 	public static String toJSONString(Props props, boolean localOnly) {
diff --git a/unit/executions/embeddedBad/innerFlow.job b/unit/executions/embeddedBad/innerFlow.job
new file mode 100644
index 0000000..da71d64
--- /dev/null
+++ b/unit/executions/embeddedBad/innerFlow.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embeddedBad/innerJobA.job b/unit/executions/embeddedBad/innerJobA.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embeddedBad/innerJobA.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/embeddedBad/innerJobB.job b/unit/executions/embeddedBad/innerJobB.job
new file mode 100644
index 0000000..dc29b4a
--- /dev/null
+++ b/unit/executions/embeddedBad/innerJobB.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=jobe
+dependencies=innerJobA
diff --git a/unit/executions/embeddedBad/innerJobC.job b/unit/executions/embeddedBad/innerJobC.job
new file mode 100644
index 0000000..178bbef
--- /dev/null
+++ b/unit/executions/embeddedBad/innerJobC.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=innerJobA
diff --git a/unit/executions/embeddedBad/joba.job b/unit/executions/embeddedBad/joba.job
new file mode 100644
index 0000000..665b38d
--- /dev/null
+++ b/unit/executions/embeddedBad/joba.job
@@ -0,0 +1,4 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/embeddedBad/jobb.job b/unit/executions/embeddedBad/jobb.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embeddedBad/jobb.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embeddedBad/jobc.job b/unit/executions/embeddedBad/jobc.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embeddedBad/jobc.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embeddedBad/jobd.job b/unit/executions/embeddedBad/jobd.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embeddedBad/jobd.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embeddedBad/jobe.job b/unit/executions/embeddedBad/jobe.job
new file mode 100644
index 0000000..fe986d5
--- /dev/null
+++ b/unit/executions/embeddedBad/jobe.job
@@ -0,0 +1,5 @@
+type=javaprocess
+java.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
+dependencies=jobb,jobc,jobd
diff --git a/unit/executions/embeddedBad/selfreference.job b/unit/executions/embeddedBad/selfreference.job
new file mode 100644
index 0000000..708f351
--- /dev/null
+++ b/unit/executions/embeddedBad/selfreference.job
@@ -0,0 +1,2 @@
+type=flow
+flow.name=selfreference
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
new file mode 100644
index 0000000..7a2f55d
--- /dev/null
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -0,0 +1,355 @@
+package azkaban.test.executor;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionAttempt;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.project.Project;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+public class ExecutableFlowTest {
+	private Project project;
+	
+    @Before
+    public void setUp() throws Exception {
+		Logger logger = Logger.getLogger(this.getClass());
+    	DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+    	loader.loadProjectFlow(new File("unit/executions/embedded"));
+    	Assert.assertEquals(0, loader.getErrors().size());
+    	
+    	project = new Project(11, "myTestProject");
+    	project.setFlows(loader.getFlowMap());
+    	project.setVersion(123);
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+    }
+	
+	@Test
+	public void testExecutorFlowCreation() throws Exception {
+		Flow flow = project.getFlow("jobe");
+		Assert.assertNotNull(flow);
+		
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		Assert.assertNotNull(exFlow.getExecutableNode("joba"));
+		Assert.assertNotNull(exFlow.getExecutableNode("jobb"));
+		Assert.assertNotNull(exFlow.getExecutableNode("jobc"));
+		Assert.assertNotNull(exFlow.getExecutableNode("jobd"));
+		Assert.assertNotNull(exFlow.getExecutableNode("jobe"));
+		
+		Assert.assertFalse(exFlow.getExecutableNode("joba") instanceof ExecutableFlowBase);
+		Assert.assertTrue(exFlow.getExecutableNode("jobb") instanceof ExecutableFlowBase);
+		Assert.assertTrue(exFlow.getExecutableNode("jobc") instanceof ExecutableFlowBase);
+		Assert.assertTrue(exFlow.getExecutableNode("jobd") instanceof ExecutableFlowBase);
+		Assert.assertFalse(exFlow.getExecutableNode("jobe") instanceof ExecutableFlowBase);
+		
+		ExecutableFlowBase jobbFlow = (ExecutableFlowBase)exFlow.getExecutableNode("jobb");
+		ExecutableFlowBase jobcFlow = (ExecutableFlowBase)exFlow.getExecutableNode("jobc");
+		ExecutableFlowBase jobdFlow = (ExecutableFlowBase)exFlow.getExecutableNode("jobd");
+
+		Assert.assertEquals("innerFlow", jobbFlow.getFlowId());
+		Assert.assertEquals("jobb", jobbFlow.getId());
+		Assert.assertEquals(4, jobbFlow.getExecutableNodes().size());
+		
+		Assert.assertEquals("innerFlow", jobcFlow.getFlowId());
+		Assert.assertEquals("jobc", jobcFlow.getId());
+		Assert.assertEquals(4, jobcFlow.getExecutableNodes().size());
+		
+		Assert.assertEquals("innerFlow", jobdFlow.getFlowId());
+		Assert.assertEquals("jobd", jobdFlow.getId());
+		Assert.assertEquals(4, jobdFlow.getExecutableNodes().size());
+	}
+	
+	@Test
+	public void testExecutorFlowJson() throws Exception {
+		Flow flow = project.getFlow("jobe");
+		Assert.assertNotNull(flow);
+		
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		
+		Object obj = exFlow.toObject();
+		String exFlowJSON = JSONUtils.toJSON(obj);
+		@SuppressWarnings("unchecked")
+		Map<String,Object> flowObjMap = (Map<String,Object>)JSONUtils.parseJSONFromString(exFlowJSON);
+		
+		ExecutableFlow parsedExFlow = ExecutableFlow.createExecutableFlowFromObject(flowObjMap);
+		testEquals(exFlow, parsedExFlow);
+	}
+	
+	@Test
+	public void testExecutorFlowJson2() throws Exception {
+		Flow flow = project.getFlow("jobe");
+		Assert.assertNotNull(flow);
+		
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		exFlow.setExecutionId(101);
+		exFlow.setAttempt(2);
+		exFlow.setDelayedExecution(1000);
+		
+		ExecutionOptions options = new ExecutionOptions();
+		options.setConcurrentOption("blah");
+		options.setDisabledJobs(Arrays.asList(new String[] {"bee", null, "boo"}));
+		options.setFailureAction(FailureAction.CANCEL_ALL);
+		options.setFailureEmails(Arrays.asList(new String[] {"doo", null, "daa"}));
+		options.setSuccessEmails(Arrays.asList(new String[] {"dee", null, "dae"}));
+		options.setPipelineLevel(2);
+		options.setPipelineExecutionId(3);
+		options.setNotifyOnFirstFailure(true);
+		options.setNotifyOnLastFailure(true);
+		
+		HashMap<String, String> flowProps = new HashMap<String,String>();
+		flowProps.put("la", "fa");
+		options.setFlowParameters(flowProps);
+		exFlow.setExecutionOptions(options);
+		
+		Object obj = exFlow.toObject();
+		String exFlowJSON = JSONUtils.toJSON(obj);
+		@SuppressWarnings("unchecked")
+		Map<String,Object> flowObjMap = (Map<String,Object>)JSONUtils.parseJSONFromString(exFlowJSON);
+		
+		ExecutableFlow parsedExFlow = ExecutableFlow.createExecutableFlowFromObject(flowObjMap);
+		testEquals(exFlow, parsedExFlow);
+	}
+	
+	@Test
+	public void testExecutorFlowUpdates() throws Exception {
+		Flow flow = project.getFlow("jobe");
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		exFlow.setExecutionId(101);
+		
+		// Create copy of flow
+		Object obj = exFlow.toObject();
+		String exFlowJSON = JSONUtils.toJSON(obj);
+		@SuppressWarnings("unchecked")
+		Map<String,Object> flowObjMap = (Map<String,Object>)JSONUtils.parseJSONFromString(exFlowJSON);
+		ExecutableFlow copyFlow = ExecutableFlow.createExecutableFlowFromObject(flowObjMap);
+		
+		testEquals(exFlow, copyFlow);
+		
+		ExecutableNode joba = exFlow.getExecutableNode("joba");
+		ExecutableFlowBase jobb = (ExecutableFlowBase)(exFlow.getExecutableNode("jobb"));
+		ExecutableFlowBase jobc = (ExecutableFlowBase)(exFlow.getExecutableNode("jobc"));
+		ExecutableFlowBase jobd = (ExecutableFlowBase)(exFlow.getExecutableNode("jobd"));
+		ExecutableNode jobe = exFlow.getExecutableNode("jobe");
+		assertNotNull(joba, jobb, jobc, jobd, jobe);
+		
+		ExecutableNode jobbInnerFlowA = jobb.getExecutableNode("innerJobA");
+		ExecutableNode jobbInnerFlowB = jobb.getExecutableNode("innerJobB");
+		ExecutableNode jobbInnerFlowC = jobb.getExecutableNode("innerJobC");
+		ExecutableNode jobbInnerFlow = jobb.getExecutableNode("innerFlow");
+		assertNotNull(jobbInnerFlowA, jobbInnerFlowB, jobbInnerFlowC, jobbInnerFlow);
+		
+		ExecutableNode jobcInnerFlowA = jobc.getExecutableNode("innerJobA");
+		ExecutableNode jobcInnerFlowB = jobc.getExecutableNode("innerJobB");
+		ExecutableNode jobcInnerFlowC = jobc.getExecutableNode("innerJobC");
+		ExecutableNode jobcInnerFlow = jobc.getExecutableNode("innerFlow");
+		assertNotNull(jobcInnerFlowA, jobcInnerFlowB, jobcInnerFlowC, jobcInnerFlow);
+		
+		ExecutableNode jobdInnerFlowA = jobd.getExecutableNode("innerJobA");
+		ExecutableNode jobdInnerFlowB = jobd.getExecutableNode("innerJobB");
+		ExecutableNode jobdInnerFlowC = jobd.getExecutableNode("innerJobC");
+		ExecutableNode jobdInnerFlow = jobd.getExecutableNode("innerFlow");
+		assertNotNull(jobdInnerFlowA, jobdInnerFlowB, jobdInnerFlowC, jobdInnerFlow);
+		
+		exFlow.setEndTime(1000);
+		exFlow.setStartTime(500);
+		exFlow.setStatus(Status.RUNNING);
+		exFlow.setUpdateTime(133);
+		
+		// Change one job and see if it updates
+		jobe.setEndTime(System.currentTimeMillis());
+		jobe.setUpdateTime(System.currentTimeMillis());
+		jobe.setStatus(Status.DISABLED);
+		jobe.setStartTime(System.currentTimeMillis() - 1000);
+		// Should be one node that was changed
+		Map<String,Object> updateObject = exFlow.toUpdateObject(0);
+		Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
+		// Reapplying should give equal results.
+		copyFlow.applyUpdateObject(updateObject);
+		testEquals(exFlow, copyFlow);
+		
+		// This update shouldn't provide any results
+		updateObject = exFlow.toUpdateObject(System.currentTimeMillis());
+		Assert.assertNull(updateObject.get("nodes"));
+		
+		// Change inner flow
+		jobbInnerFlowA.setEndTime(System.currentTimeMillis());
+		jobbInnerFlowA.setUpdateTime(System.currentTimeMillis());
+		jobbInnerFlowA.setStatus(Status.DISABLED);
+		jobbInnerFlowA.setStartTime(System.currentTimeMillis() - 1000);
+		// We should get 2 updates if we do a toUpdateObject using 0 as the start time
+		updateObject = exFlow.toUpdateObject(0);
+		Assert.assertEquals(2, ((List)(updateObject.get("nodes"))).size());
+
+		// This should provide 1 update. That we can apply
+		updateObject = exFlow.toUpdateObject(jobe.getUpdateTime());
+		Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
+		copyFlow.applyUpdateObject(updateObject);
+		testEquals(exFlow, copyFlow);
+		
+		// This shouldn't give any results anymore
+		updateObject = exFlow.toUpdateObject(jobbInnerFlowA.getUpdateTime());
+		Assert.assertNull(updateObject.get("nodes"));
+	}
+	
+	private void assertNotNull(ExecutableNode ... nodes) {
+		for (ExecutableNode node: nodes) {
+			Assert.assertNotNull(node);
+		}
+	}
+	
+	public static void testEquals(ExecutableNode a, ExecutableNode b) {
+		if (a instanceof ExecutableFlow) {
+			if (b instanceof ExecutableFlow) {
+				ExecutableFlow exA = (ExecutableFlow)a;
+				ExecutableFlow exB = (ExecutableFlow)b;
+				
+				Assert.assertEquals(exA.getScheduleId(), exB.getScheduleId());
+				Assert.assertEquals(exA.getProjectId(), exB.getProjectId());
+				Assert.assertEquals(exA.getVersion(), exB.getVersion());
+				Assert.assertEquals(exA.getSubmitTime(), exB.getSubmitTime());
+				Assert.assertEquals(exA.getSubmitUser(), exB.getSubmitUser());
+				Assert.assertEquals(exA.getExecutionPath(), exB.getExecutionPath());
+				
+				testEquals(exA.getExecutionOptions(), exB.getExecutionOptions());
+			}
+			else {
+				Assert.fail("A is ExecutableFlow, but B is not");
+			}
+		}
+
+		if (a instanceof ExecutableFlowBase) {
+			if (b instanceof ExecutableFlowBase) {
+				ExecutableFlowBase exA = (ExecutableFlowBase)a;
+				ExecutableFlowBase exB = (ExecutableFlowBase)b;
+				
+				Assert.assertEquals(exA.getFlowId(), exB.getFlowId());
+				Assert.assertEquals(exA.getExecutableNodes().size(), exB.getExecutableNodes().size());
+				
+				for(ExecutableNode nodeA : exA.getExecutableNodes()) {
+					ExecutableNode nodeB = exB.getExecutableNode(nodeA.getId());
+					Assert.assertNotNull(nodeB);
+					Assert.assertEquals(a, nodeA.getParentFlow());
+					Assert.assertEquals(b, nodeB.getParentFlow());
+					
+					testEquals(nodeA, nodeB);
+				}
+			}
+			else {
+				Assert.fail("A is ExecutableFlowBase, but B is not");
+			}
+		}
+		
+		Assert.assertEquals(a.getId(), b.getId());
+		Assert.assertEquals(a.getStatus(), b.getStatus());
+		Assert.assertEquals(a.getStartTime(), b.getStartTime());
+		Assert.assertEquals(a.getEndTime(), b.getEndTime());
+		Assert.assertEquals(a.getUpdateTime(), b.getUpdateTime());
+		Assert.assertEquals(a.getAttempt(), b.getAttempt());
+
+		Assert.assertEquals(a.getJobSource(), b.getJobSource());
+		Assert.assertEquals(a.getPropsSource(), b.getPropsSource());
+		Assert.assertEquals(a.getInNodes(), a.getInNodes());
+		Assert.assertEquals(a.getOutNodes(), a.getOutNodes());
+	}
+	
+	
+	
+	public static void testEquals(ExecutionOptions optionsA, ExecutionOptions optionsB) {
+		Assert.assertEquals(optionsA.getConcurrentOption(), optionsB.getConcurrentOption());
+		Assert.assertEquals(optionsA.getNotifyOnFirstFailure(), optionsB.getNotifyOnFirstFailure());
+		Assert.assertEquals(optionsA.getNotifyOnLastFailure(), optionsB.getNotifyOnLastFailure());
+		Assert.assertEquals(optionsA.getFailureAction(), optionsB.getFailureAction());
+		Assert.assertEquals(optionsA.getPipelineExecutionId(), optionsB.getPipelineExecutionId());
+		Assert.assertEquals(optionsA.getPipelineLevel(), optionsB.getPipelineLevel());
+		Assert.assertEquals(optionsA.isFailureEmailsOverridden(), optionsB.isFailureEmailsOverridden());
+		Assert.assertEquals(optionsA.isSuccessEmailsOverridden(), optionsB.isSuccessEmailsOverridden());
+		
+		testEquals(optionsA.getDisabledJobs(), optionsB.getDisabledJobs());
+		testEquals(optionsA.getSuccessEmails(), optionsB.getSuccessEmails());
+		testEquals(optionsA.getFailureEmails(), optionsB.getFailureEmails());
+		testEquals(optionsA.getFlowParameters(), optionsB.getFlowParameters());
+	}
+	
+	public static void testEquals(Set<String> a, Set<String> b) {
+		if (a == b) {
+			return;
+		}
+		
+		if (a == null || b == null) {
+			Assert.fail();
+		}
+		
+		Assert.assertEquals(a.size(), b.size());
+		
+		Iterator<String> iterA = a.iterator();
+		
+		while(iterA.hasNext()) {
+			String aStr = iterA.next();
+			Assert.assertTrue(b.contains(aStr));
+		}
+	}
+	
+	public static void testEquals(List<String> a, List<String> b) {
+		if (a == b) {
+			return;
+		}
+		
+		if (a == null || b == null) {
+			Assert.fail();
+		}
+		
+		Assert.assertEquals(a.size(), b.size());
+		
+		Iterator<String> iterA = a.iterator();
+		Iterator<String> iterB = b.iterator();
+		
+		while(iterA.hasNext()) {
+			String aStr = iterA.next();
+			String bStr = iterB.next();
+			
+			Assert.assertEquals(aStr, bStr);
+		}
+	}
+	
+	public static void testEquals(Map<String, String> a, Map<String, String> b) {
+		if (a == b) {
+			return;
+		}
+		
+		if (a == null || b == null) {
+			Assert.fail();
+		}
+		
+		Assert.assertEquals(a.size(), b.size());
+		
+		for (String key: a.keySet()) {
+			Assert.assertEquals(a.get(key), b.get(key));
+		}
+	}
+}
diff --git a/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java b/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java
index 4316c5c..d671e6f 100644
--- a/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java
+++ b/unit/java/azkaban/test/utils/DirectoryFlowLoaderTest.java
@@ -2,6 +2,8 @@ package azkaban.test.utils;
 
 import java.io.File;
 
+import junit.framework.Assert;
+
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
@@ -18,4 +20,26 @@ public class DirectoryFlowLoaderTest {
 		logger.info(loader.getFlowMap().size());
 	}
 	
+	@Test
+	public void testLoadEmbeddedFlow() {
+		Logger logger = Logger.getLogger(this.getClass());
+		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+		
+		loader.loadProjectFlow(new File("unit/executions/embedded"));
+		Assert.assertEquals(0, loader.getErrors().size());
+	}
+	
+	@Test
+	public void testRecursiveLoadEmbeddedFlow() {
+		Logger logger = Logger.getLogger(this.getClass());
+		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+		
+		loader.loadProjectFlow(new File("unit/executions/embeddedBad"));
+		for (String error: loader.getErrors()) {
+			System.out.println(error);
+		}
+		
+		// Should be 3 errors: jobe->innerFlow, innerFlow->jobe, innerFlow
+		Assert.assertEquals(3, loader.getErrors().size());
+	}
 }