azkaban-uncached

This changeset moves flow-property handling from ExecutableFlow into ExecutableFlowBase so embedded (nested) flows can carry their own properties, propagates a new NESTED_FLOW_PATH job property to jobs running inside embedded flows, fixes a silently broken setFlowParameters() in ExecutionOptions, counts QUEUED as a running status, and adds an InteractiveTestJob plus the embedded2 fixtures and FlowRunnerTest2 to exercise embedded-flow execution.

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 648acef..9b3f199 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -354,7 +354,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				for(JobRunner activeRunner : activeJobRunners) {
 					activeRunner.cancel();
 				}
-				
+
 				flow.killNode(System.currentTimeMillis());
 			} catch (Exception e) {
 				logger.error(e);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index bc24043..a6d5b95 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -352,6 +352,13 @@ public class JobRunner extends EventHandler implements Runnable {
 			else {
 				logInfo("Starting job " + this.jobId + " at " + node.getStartTime());
 			}
+			
+			// If it's an embedded flow, we'll add the nested flow info to the job conf
+			if (node.getExecutableFlow() != node.getParentFlow()) {
+				String subFlow = node.getExecutableFlow().getId() + "," + node.getPrintableId(",");
+				props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
+			}
+			
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
 			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
 			node.setStatus(Status.RUNNING);
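
A note on the nested-flow bookkeeping added above: when a node's owning ExecutableFlow differs from its immediate parent flow, JobRunner stores the owning flow's id plus the node's printable path as a comma-separated string under CommonJobProperties.NESTED_FLOW_PATH (the constant is introduced further down in this diff). A consuming job can use it to build an identifier that stays unique across embedded copies of the same flow; a minimal sketch, mirroring what the new InteractiveTestJob does (the example value is illustrative only):

    String nestedFlowPath = jobProps.get(CommonJobProperties.NESTED_FLOW_PATH);
    // null for jobs in the top-level flow; something like "jobf,jobb:innerJobA" inside an embedded flow
    String uniqueId = (nestedFlowPath == null) ? jobId : nestedFlowPath + "," + jobId;
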
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index e9f81f7..9ee6aa2 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -45,7 +45,6 @@ public class ExecutableFlow extends ExecutableFlowBase {
 	private long submitTime = -1;
 	private String submitUser;
 	private String executionPath;
-	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	
 	private HashSet<String> proxyUsers = new HashSet<String>();
 	private ExecutionOptions executionOptions;
@@ -58,10 +57,6 @@ public class ExecutableFlow extends ExecutableFlowBase {
 		this.setFlow(project, flow);
 	}
 	
-	public ExecutableFlow(Flow flow) {
-		this.setFlow(null, flow);
-	}
-	
 	public ExecutableFlow() {
 	}
 
@@ -75,10 +70,6 @@ public class ExecutableFlow extends ExecutableFlowBase {
 		return this;
 	}
 	
-	public Collection<FlowProps> getFlowProps() {
-		return flowProps.values();
-	}
-	
 	public void addAllProxyUsers(Collection<String> proxyUsers) {
 		this.proxyUsers.addAll(proxyUsers);
 	}
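
Both removals here are deliberate: flowProps and getFlowProps() move down into ExecutableFlowBase (next file) so every level of a nested flow carries its own properties, and the Project-less constructor goes away because building an executable flow now requires the owning Project to resolve embedded flows. Callers migrate the way JdbcExecutorLoaderTest does at the bottom of this diff:

    Flow flow = Flow.flowFromObject(flowObj);
    Project project = new Project(1, "flow");
    HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
    flowMap.put(flow.getId(), flow);
    project.setFlows(flowMap);
    ExecutableFlow execFlow = new ExecutableFlow(project, flow); // was: new ExecutableFlow(flow)
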
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 34bff5d..d89a412 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -16,6 +16,7 @@
 package azkaban.executor;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -74,12 +75,17 @@ public class ExecutableFlowBase extends ExecutableNode {
 		return -1;
 	}
 	
+	public Collection<FlowProps> getFlowProps() {
+		return flowProps.values();
+	}
+	
 	public String getFlowId() {
 		return flowId;
 	}
 	
 	protected void setFlow(Project project, Flow flow) {
 		this.flowId = flow.getId();
+		flowProps.putAll(flow.getAllFlowProps());
 		
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
@@ -100,6 +106,9 @@ public class ExecutableFlowBase extends ExecutableNode {
 			ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
 			ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
 			
+			if (sourceNode == null) {
+				System.out.println("Source node " + edge.getSourceId() + " doesn't exist");
+			}
 			sourceNode.addOutNode(edge.getTargetId());
 			targetNode.addInNode(edge.getSourceId());
 		}
@@ -250,16 +259,18 @@ public class ExecutableFlowBase extends ExecutableNode {
 		super.applyUpdateObject(updateData);
 
 		List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
-		for (Map<String,Object> node: updatedNodes) {
-
-			String id = (String)node.get(ID_PARAM);
-			if (id == null) {
-				// Legacy case
-				id = (String)node.get("jobId");				
+		if (updatedNodes != null) {
+			for (Map<String,Object> node: updatedNodes) {
+	
+				String id = (String)node.get(ID_PARAM);
+				if (id == null) {
+					// Legacy case
+					id = (String)node.get("jobId");				
+				}
+	
+				ExecutableNode exNode = executableNodes.get(id);
+				exNode.applyUpdateObject(node);
 			}
-
-			ExecutableNode exNode = executableNodes.get(id);
-			exNode.applyUpdateObject(node);
 		}
 	}
 	
@@ -314,7 +325,7 @@ public class ExecutableFlowBase extends ExecutableNode {
 		
 		nodeloop:
 		for (ExecutableNode node: executableNodes.values()) {
-			if(Status.isStatusFinished(node.getStatus())) {
+			if(Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
 				continue;
 			}
 
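
Three behavioral changes ride along with the moved getFlowProps(): setFlow() now copies the flow's properties into flowProps, applyUpdateObject() tolerates an update payload that has no node list, and the node-selection loop skips running nodes as well as finished ones (which matters once QUEUED counts as running; see Status.java below). The null guard is the substantive fix; a sketch of the sparse payload the old code would hit a NullPointerException on (a payload carrying flow-level fields but no NODES_PARAM entry is assumed to be legal here):

    Map<String, Object> updateData = new HashMap<String, Object>();
    // ...flow-level status fields only, no NODES_PARAM entry...
    flow.applyUpdateObject(updateData); // previously iterated a null list and threw NPE
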
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index e0f6522..c0333d0 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -1,6 +1,7 @@
 package azkaban.executor;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -38,8 +39,8 @@ public class ExecutableNode {
 	private String jobSource; 
 	// Path to top level props file
 	private String propsSource;
-	private Set<String> inNodes = null;
-	private Set<String> outNodes = null;
+	private Set<String> inNodes = new HashSet<String>();
+	private Set<String> outNodes = new HashSet<String>();
 	
 	private Props inputProps;
 	private Props outputProps;
@@ -140,16 +141,10 @@ public class ExecutableNode {
 	}
 	
 	public void addOutNode(String exNode) {
-		if (outNodes == null) {
-			outNodes = new HashSet<String>();
-		}
 		outNodes.add(exNode);
 	}
 	
 	public void addInNode(String exNode) {
-		if (inNodes == null) {
-			inNodes = new HashSet<String>();
-		}
 		inNodes.add(exNode);
 	}
 
@@ -268,21 +263,21 @@ public class ExecutableNode {
 		objMap.put(TYPE_PARAM, type);
 		objMap.put(ATTEMPT_PARAM, attempt);
 		
-		if (inNodes != null) {
+		if (inNodes != null && !inNodes.isEmpty()) {
 			objMap.put(INNODES_PARAM, inNodes);
 		}
-		if (outNodes != null) {
+		if (outNodes != null && !outNodes.isEmpty()) {
 			objMap.put(OUTNODES_PARAM, outNodes);
 		}
 		
-//		if (hasPropsSource()) {
-//			objMap.put(PROPS_SOURCE_PARAM, this.propsSource);
-//		}
+		if (hasPropsSource()) {
+			objMap.put(PROPS_SOURCE_PARAM, this.propsSource);
+		}
 		if (hasJobSource()) {
 			objMap.put(JOB_SOURCE_PARAM, this.jobSource);
 		}
 		
-		if (outputProps != null) {
+		if (outputProps != null && outputProps.size() > 0) {
 			objMap.put(OUTPUT_PROPS_PARAM, PropsUtils.toStringMap(outputProps, true));
 		}
 		
@@ -307,17 +302,17 @@ public class ExecutableNode {
 		
 		if (objMap.containsKey(INNODES_PARAM)) {
 			this.inNodes = new HashSet<String>();
-			this.inNodes.addAll((List<String>)objMap.get(INNODES_PARAM));
+			this.inNodes.addAll((Collection<String>)objMap.get(INNODES_PARAM));
 		}
 		
 		if (objMap.containsKey(OUTNODES_PARAM)) {
 			this.outNodes = new HashSet<String>();
-			this.outNodes.addAll((List<String>)objMap.get(OUTNODES_PARAM));
+			this.outNodes.addAll((Collection<String>)objMap.get(OUTNODES_PARAM));
+		}
+	
+		if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
+			this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
 		}
-//	
-//		if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
-//			this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
-//		}
 		
 		if (objMap.containsKey(JOB_SOURCE_PARAM)) {
 			this.jobSource = (String)objMap.get(JOB_SOURCE_PARAM);
@@ -327,7 +322,7 @@ public class ExecutableNode {
 			this.outputProps = new Props(null, (Map<String,String>)objMap.get(OUTPUT_PROPS_PARAM));
 		}
 		
-		List<Object> pastAttempts = (List<Object>)objMap.get(PASTATTEMPTS_PARAM);
+		Collection<Object> pastAttempts = (Collection<Object>)objMap.get(PASTATTEMPTS_PARAM);
 		if (pastAttempts!=null) {
 			ArrayList<ExecutionAttempt> attempts = new ArrayList<ExecutionAttempt>();
 			for (Object attemptObj: pastAttempts) {
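
Two related serialization tightenings here: empty in/out-node sets and empty output props are no longer written into the serialized map, and the reader casts are loosened from List to Collection because the JSON layer is not guaranteed to hand back a List. Omitting empty keys is safe because, with the fields now eagerly initialized, an absent key simply leaves the empty set in place:

    if (objMap.containsKey(INNODES_PARAM)) {
        this.inNodes = new HashSet<String>();
        this.inNodes.addAll((Collection<String>) objMap.get(INNODES_PARAM));
    }
    // no INNODES_PARAM key: inNodes stays the empty set it was initialized to

Re-enabling the PROPS_SOURCE_PARAM block (previously commented out) also means propsSource now survives the round trip.
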
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
index a91a6d0..cedc3f0 100644
--- a/src/java/azkaban/executor/ExecutionOptions.java
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -16,6 +16,20 @@ public class ExecutionOptions {
 	public static final String CONCURRENT_OPTION_PIPELINE="pipeline";
 	public static final String CONCURRENT_OPTION_IGNORE="ignore";
 	
+	private static final String FLOW_PARAMETERS = "flowParameters";
+	private static final String NOTIFY_ON_FIRST_FAILURE = "notifyOnFirstFailure";
+	private static final String NOTIFY_ON_LAST_FAILURE = "notifyOnLastFailure";
+	private static final String SUCCESS_EMAILS = "successEmails";
+	private static final String FAILURE_EMAILS = "failureEmails";
+	private static final String FAILURE_ACTION = "failureAction";
+	private static final String PIPELINE_LEVEL = "pipelineLevel";
+	private static final String PIPELINE_EXECID = "pipelineExecId";
+	private static final String QUEUE_LEVEL = "queueLevel";
+	private static final String CONCURRENT_OPTION = "concurrentOption";
+	private static final String DISABLE = "disabled";
+	private static final String FAILURE_EMAILS_OVERRIDE = "failureEmailsOverride";
+	private static final String SUCCESS_EMAILS_OVERRIDE = "successEmailsOverride";
+	
 	private boolean notifyOnFirstFailure = true;
 	private boolean notifyOnLastFailure = false;
 	private boolean failureEmailsOverride = false;
@@ -39,8 +53,8 @@ public class ExecutionOptions {
 	
 	private Set<String> initiallyDisabledJobs = new HashSet<String>();
 	
-	public void setFlowParameters(Map<String,String> flowParam) {
-		flowParameters.get(flowParam);
+	public void addAllFlowParameters(Map<String,String> flowParam) {
+		flowParameters.putAll(flowParam);
 	}
 	
 	public Map<String,String> getFlowParameters() {
@@ -142,19 +156,19 @@ public class ExecutionOptions {
 	public Map<String,Object> toObject() {
 		HashMap<String,Object> flowOptionObj = new HashMap<String,Object>();
 		
-		flowOptionObj.put("flowParameters", this.flowParameters);
-		flowOptionObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
-		flowOptionObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
-		flowOptionObj.put("successEmails", successEmails);
-		flowOptionObj.put("failureEmails", failureEmails);
-		flowOptionObj.put("failureAction", failureAction.toString());
-		flowOptionObj.put("pipelineLevel", pipelineLevel);
-		flowOptionObj.put("pipelineExecId", pipelineExecId);
-		flowOptionObj.put("queueLevel", queueLevel);
-		flowOptionObj.put("concurrentOption", concurrentOption);
-		flowOptionObj.put("disabled", initiallyDisabledJobs);
-		flowOptionObj.put("failureEmailsOverride", failureEmailsOverride);
-		flowOptionObj.put("successEmailsOverride", successEmailsOverride);
+		flowOptionObj.put(FLOW_PARAMETERS, this.flowParameters);
+		flowOptionObj.put(NOTIFY_ON_FIRST_FAILURE, this.notifyOnFirstFailure);
+		flowOptionObj.put(NOTIFY_ON_LAST_FAILURE, this.notifyOnLastFailure);
+		flowOptionObj.put(SUCCESS_EMAILS, successEmails);
+		flowOptionObj.put(FAILURE_EMAILS, failureEmails);
+		flowOptionObj.put(FAILURE_ACTION, failureAction.toString());
+		flowOptionObj.put(PIPELINE_LEVEL, pipelineLevel);
+		flowOptionObj.put(PIPELINE_EXECID, pipelineExecId);
+		flowOptionObj.put(QUEUE_LEVEL, queueLevel);
+		flowOptionObj.put(CONCURRENT_OPTION, concurrentOption);
+		flowOptionObj.put(DISABLE, initiallyDisabledJobs);
+		flowOptionObj.put(FAILURE_EMAILS_OVERRIDE, failureEmailsOverride);
+		flowOptionObj.put(SUCCESS_EMAILS_OVERRIDE, successEmailsOverride);
 		return flowOptionObj;
 	}
 	
@@ -167,46 +181,47 @@ public class ExecutionOptions {
 		Map<String,Object> optionsMap = (Map<String,Object>)obj;
 		
 		ExecutionOptions options = new ExecutionOptions();
-		if (optionsMap.containsKey("flowParameters")) {
-			options.flowParameters = new HashMap<String, String>((Map<String,String>)optionsMap.get("flowParameters"));
+		if (optionsMap.containsKey(FLOW_PARAMETERS)) {
+			options.flowParameters = new HashMap<String, String>();
+			options.flowParameters.putAll((Map<String,String>)optionsMap.get(FLOW_PARAMETERS));
 		}
 		// Failure notification
-		if (optionsMap.containsKey("notifyOnFirstFailure")) {
-			options.notifyOnFirstFailure = (Boolean)optionsMap.get("notifyOnFirstFailure");
+		if (optionsMap.containsKey(NOTIFY_ON_FIRST_FAILURE)) {
+			options.notifyOnFirstFailure = (Boolean)optionsMap.get(NOTIFY_ON_FIRST_FAILURE);
 		}
-		if (optionsMap.containsKey("notifyOnLastFailure")) {
-			options.notifyOnLastFailure = (Boolean)optionsMap.get("notifyOnLastFailure");
+		if (optionsMap.containsKey(NOTIFY_ON_LAST_FAILURE)) {
+			options.notifyOnLastFailure = (Boolean)optionsMap.get(NOTIFY_ON_LAST_FAILURE);
 		}
-		if (optionsMap.containsKey("concurrentOption")) {
-			options.concurrentOption = (String)optionsMap.get("concurrentOption");
+		if (optionsMap.containsKey(CONCURRENT_OPTION)) {
+			options.concurrentOption = (String)optionsMap.get(CONCURRENT_OPTION);
 		}
-		if (optionsMap.containsKey("disabled")) {
-			options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
+		if (optionsMap.containsKey(DISABLE)) {
+			options.initiallyDisabledJobs = new HashSet<String>((Collection<String>)optionsMap.get(DISABLE));
 		}
 		
 		// Failure action
-		if (optionsMap.containsKey("failureAction")) {
-			options.failureAction = FailureAction.valueOf((String)optionsMap.get("failureAction"));
+		if (optionsMap.containsKey(FAILURE_ACTION)) {
+			options.failureAction = FailureAction.valueOf((String)optionsMap.get(FAILURE_ACTION));
 		}
-		options.pipelineLevel = (Integer)optionsMap.get("pipelineLevel");
-		options.pipelineExecId = (Integer)optionsMap.get("pipelineExecId");
-		options.queueLevel = (Integer)optionsMap.get("queueLevel");
+		options.pipelineLevel = (Integer)optionsMap.get(PIPELINE_LEVEL);
+		options.pipelineExecId = (Integer)optionsMap.get(PIPELINE_EXECID);
+		options.queueLevel = (Integer)optionsMap.get(QUEUE_LEVEL);
 		
 		// Success emails
-		if (optionsMap.containsKey("successEmails")) {
-			options.setSuccessEmails((List<String>)optionsMap.get("successEmails"));
+		if (optionsMap.containsKey(SUCCESS_EMAILS)) {
+			options.setSuccessEmails((List<String>)optionsMap.get(SUCCESS_EMAILS));
 		}
 		// Failure emails
-		if (optionsMap.containsKey("failureEmails")) {
-			options.setFailureEmails((List<String>)optionsMap.get("failureEmails"));
+		if (optionsMap.containsKey(FAILURE_EMAILS)) {
+			options.setFailureEmails((List<String>)optionsMap.get(FAILURE_EMAILS));
 		}
 		
-		if (optionsMap.containsKey("successEmailsOverride")) {
-			options.setSuccessEmailsOverridden((Boolean)optionsMap.get("successEmailsOverride"));
+		if (optionsMap.containsKey(SUCCESS_EMAILS_OVERRIDE)) {
+			options.setSuccessEmailsOverridden((Boolean)optionsMap.get(SUCCESS_EMAILS_OVERRIDE));
 		}
 		
-		if (optionsMap.containsKey("failureEmailsOverride")) {
-			options.setFailureEmailsOverridden((Boolean)optionsMap.get("failureEmailsOverride"));
+		if (optionsMap.containsKey(FAILURE_EMAILS_OVERRIDE)) {
+			options.setFailureEmailsOverridden((Boolean)optionsMap.get(FAILURE_EMAILS_OVERRIDE));
 		}
 		
 		return options;
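
Beyond hoisting the map keys into constants so toObject() and fromObject() cannot drift apart, this file fixes a real bug: the old setFlowParameters() body was flowParameters.get(flowParam), a lookup whose result was discarded, so caller-supplied parameters were silently dropped. The renamed addAllFlowParameters() merges them in with putAll(). Usage, in the same shape FlowRunnerTest2 uses below:

    ExecutionOptions options = new ExecutionOptions();
    Map<String, String> overrides = new HashMap<String, String>();
    overrides.put("param4", "override.4");
    options.addAllFlowParameters(overrides); // merges into, rather than replacing, existing parameters
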
diff --git a/src/java/azkaban/executor/Status.java b/src/java/azkaban/executor/Status.java
index f796378..4e82044 100644
--- a/src/java/azkaban/executor/Status.java
+++ b/src/java/azkaban/executor/Status.java
@@ -61,6 +61,7 @@ public enum Status {
 		switch (status) {
 		case RUNNING:
 		case FAILED_FINISHING:
+		case QUEUED:
 			return true;
 		default:
 			return false;
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7a8ffc4..1744af5 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -67,6 +67,11 @@ public class CommonJobProperties {
 	public static final String FLOW_ID = "azkaban.flow.flowid";
 	
 	/**
+	 * The nested flow id path
+	 */
+	public static final String NESTED_FLOW_PATH = "azkaban.flow.nested.path";
+	
+	/**
 	 * The execution id. This should be unique per flow, but may not be due to 
 	 * restarts.
 	 */
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 694965d..1a6b945 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -52,7 +52,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
 
 	protected String _cwd;
 
-	private volatile Props generatedPropeties;
+	private volatile Props generatedProperties;
 
 	protected AbstractProcessJob(String jobid, final Props sysProps, final Props jobProps, final Logger log) {
 		super(jobid, log);
@@ -83,7 +83,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
 
 	@Override
 	public Props getJobGeneratedProperties() {
-		return generatedPropeties;
+		return generatedProperties;
 	}
 
 	/**
@@ -187,7 +187,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
 	}
 
 	public void generateProperties(final File outputFile) {
-		generatedPropeties = loadOutputFileProps(outputFile);
+		generatedProperties = loadOutputFileProps(outputFile);
 	}
 
 }
diff --git a/src/java/azkaban/project/JdbcProjectLoader.java b/src/java/azkaban/project/JdbcProjectLoader.java
index 7825694..b6db057 100644
--- a/src/java/azkaban/project/JdbcProjectLoader.java
+++ b/src/java/azkaban/project/JdbcProjectLoader.java
@@ -77,6 +77,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 			
 			for (Project project: projects) {
 				List<Triple<String, Boolean, Permission>> permissions = fetchPermissionsForProject(connection, project);
+				
 				for (Triple<String, Boolean, Permission> entry: permissions) {
 					if(entry.getSecond()) {
 						project.setGroupPermission(entry.getFirst(), entry.getThird());
@@ -86,7 +87,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 					}
 				}
 			}
-		} catch (SQLException e) {
+		}
+		catch (SQLException e) {
 			throw new ProjectManagerException("Error retrieving all projects", e);
 		}
 		finally {
@@ -673,7 +675,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements ProjectLoad
 		}
 		catch (IOException e) {
 			throw new ProjectManagerException("Flow Upload failed.", e);
-		} catch (SQLException e) {
+		} 
+		catch (SQLException e) {
 			throw new ProjectManagerException("Flow Upload failed commit.", e);
 		}
 		finally {
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
index 4e71930..9ec3ea1 100644
--- a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -74,7 +74,7 @@ public class HttpRequestUtils {
 		}
 		
 		Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
-		execOptions.setFlowParameters(flowParamGroup);
+		execOptions.addAllFlowParameters(flowParamGroup);
 		
 		if (hasParam(req, "disabled")) {
 			String disabled = getParam(req, "disabled");
diff --git a/unit/executions/embedded/innerFlow.job b/unit/executions/embedded/innerFlow.job
index da71d64..e9b3b89 100644
--- a/unit/executions/embedded/innerFlow.job
+++ b/unit/executions/embedded/innerFlow.job
@@ -1,5 +1,4 @@
 type=javaprocess
-java.class=azkaban.test.executor.SleepJavaJob
 seconds=1
 fail=false
 dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded2/innerFlow.job b/unit/executions/embedded2/innerFlow.job
new file mode 100644
index 0000000..dfa0e9d
--- /dev/null
+++ b/unit/executions/embedded2/innerFlow.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobB,innerJobC
\ No newline at end of file
diff --git a/unit/executions/embedded2/innerJobA.job b/unit/executions/embedded2/innerJobA.job
new file mode 100644
index 0000000..35ebd72
--- /dev/null
+++ b/unit/executions/embedded2/innerJobA.job
@@ -0,0 +1,2 @@
+type=test
+
diff --git a/unit/executions/embedded2/innerJobB.job b/unit/executions/embedded2/innerJobB.job
new file mode 100644
index 0000000..dca1223
--- /dev/null
+++ b/unit/executions/embedded2/innerJobB.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobA
diff --git a/unit/executions/embedded2/innerJobC.job b/unit/executions/embedded2/innerJobC.job
new file mode 100644
index 0000000..dca1223
--- /dev/null
+++ b/unit/executions/embedded2/innerJobC.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobA
diff --git a/unit/executions/embedded2/joba.job b/unit/executions/embedded2/joba.job
new file mode 100644
index 0000000..80ad69e
--- /dev/null
+++ b/unit/executions/embedded2/joba.job
@@ -0,0 +1,2 @@
+type=test
+param1=joba.1
\ No newline at end of file
diff --git a/unit/executions/embedded2/joba1.job b/unit/executions/embedded2/joba1.job
new file mode 100644
index 0000000..98fd5f5
--- /dev/null
+++ b/unit/executions/embedded2/joba1.job
@@ -0,0 +1 @@
+type=test
diff --git a/unit/executions/embedded2/jobb.job b/unit/executions/embedded2/jobb.job
new file mode 100644
index 0000000..efbaa95
--- /dev/null
+++ b/unit/executions/embedded2/jobb.job
@@ -0,0 +1,4 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
+param
\ No newline at end of file
diff --git a/unit/executions/embedded2/jobc.job b/unit/executions/embedded2/jobc.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded2/jobc.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embedded2/jobd.job b/unit/executions/embedded2/jobd.job
new file mode 100644
index 0000000..8844d8c
--- /dev/null
+++ b/unit/executions/embedded2/jobd.job
@@ -0,0 +1,3 @@
+type=flow
+flow.name=innerFlow
+dependencies=joba
diff --git a/unit/executions/embedded2/jobe.job b/unit/executions/embedded2/jobe.job
new file mode 100644
index 0000000..331a81e
--- /dev/null
+++ b/unit/executions/embedded2/jobe.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=jobb,jobc,jobd
diff --git a/unit/executions/embedded2/jobf.job b/unit/executions/embedded2/jobf.job
new file mode 100644
index 0000000..b1b00ce
--- /dev/null
+++ b/unit/executions/embedded2/jobf.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=jobe,joba1
diff --git a/unit/executions/embedded2/test1.properties b/unit/executions/embedded2/test1.properties
new file mode 100644
index 0000000..120fc25
--- /dev/null
+++ b/unit/executions/embedded2/test1.properties
@@ -0,0 +1,4 @@
+param1=test1.1
+param2=test1.2
+param3=test1.3
+param4=test1.4
\ No newline at end of file
diff --git a/unit/executions/embedded2/test2.properties b/unit/executions/embedded2/test2.properties
new file mode 100644
index 0000000..7df7744
--- /dev/null
+++ b/unit/executions/embedded2/test2.properties
@@ -0,0 +1,4 @@
+param5=test2.5
+param6=test2.6
+param7=test2.7
+param8=test2.8
\ No newline at end of file
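
Taken together, the embedded2 fixtures describe an outer flow ending at jobf: joba feeds jobb, jobc and jobd, each a type=flow node embedding innerFlow; jobe depends on all three, and jobf depends on jobe plus the independent joba1. Within innerFlow, innerJobA feeds innerJobB and innerJobC, on which the innerFlow node itself depends. Derived from the dependencies= lines above:

    joba  --> jobb (flow: innerFlow) --+
    joba  --> jobc (flow: innerFlow) --+--> jobe --+
    joba  --> jobd (flow: innerFlow) --+           +--> jobf
    joba1 -----------------------------------------+

    innerFlow: innerJobA --> {innerJobB, innerJobC} --> innerFlow

test1.properties and test2.properties supply shared flow properties param1 through param8; joba.job overrides param1, and FlowRunnerTest2 overrides param4 via a flow parameter, which is exactly the precedence its assertions check.
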
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 18513a3..7634a56 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -24,6 +24,7 @@ import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.test.executor.InteractiveTestJob;
 import azkaban.test.executor.JavaJob;
 import azkaban.utils.JSONUtils;
 
@@ -45,7 +46,10 @@ public class FlowRunnerTest {
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
 		jobtypeManager.registerJobType("java", JavaJob.class);
+		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
+		
+		InteractiveTestJob.clearTestJobs();
 	}
 	
 	@After
@@ -190,7 +194,7 @@ public class FlowRunnerTest {
 		
 		Assert.assertTrue(runner.isCancelled());
 		
-		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+		Assert.assertTrue("Expected flow " + Status.KILLED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.KILLED);
 		
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
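
The cancel test now expects KILLED rather than FAILED, presumably reflecting kill-status propagation around the flow.killNode(...) call shown in the FlowRunner hunk above; the setUp() additions register InteractiveTestJob under the test job type and clear its registry between tests.
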
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
new file mode 100644
index 0000000..def90ca
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -0,0 +1,185 @@
+package azkaban.test.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.test.executor.InteractiveTestJob;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
+
+public class FlowRunnerTest2 {
+	private File workingDir;
+	private JobTypeManager jobtypeManager;
+	private ProjectLoader fakeProjectLoader;
+	private ExecutorLoader fakeExecutorLoader;
+	private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+	private Project project;
+	private Map<String, Flow> flowMap;
+	private static int id=101;
+	
+	public FlowRunnerTest2() {
+	}
+	
+	@Before
+	public void setUp() throws Exception {
+		System.out.println("Create temp dir");
+		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager.registerJobType("java", JavaJob.class);
+		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		fakeProjectLoader = new MockProjectLoader(workingDir);
+		fakeExecutorLoader = new MockExecutorLoader();
+		project = new Project(1, "testProject");
+		
+		File dir = new File("unit/executions/embedded2");
+		prepareProject(dir);
+		
+		InteractiveTestJob.clearTestJobs();
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+		System.out.println("Teardown temp dir");
+		if (workingDir != null) {
+			FileUtils.deleteDirectory(workingDir);
+			workingDir = null;
+		}
+	}
+	
+	@Test
+	public void testBasicRun() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. STEP ONE: START FLOW
+		ExecutableFlow flow = runner.getExecutableFlow();
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		runFlowRunnerInThread(runner);
+		pause(1000);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		
+		compareStates(expectedStateMap, nodeMap);
+		ExecutableNode node = nodeMap.get("joba");
+		Props props = node.getInputProps();
+		Assert.assertEquals("joba.1", props.get("param1"));
+		Assert.assertEquals("test1.2", props.get("param2"));
+		Assert.assertEquals("test1.3", props.get("param3"));
+		Assert.assertEquals("override.4", props.get("param4"));
+		Assert.assertEquals("test2.5", props.get("param5"));
+		Assert.assertEquals("test2.6", props.get("param6"));
+		Assert.assertEquals("test2.7", props.get("param7"));
+		Assert.assertEquals("test2.8", props.get("param8"));
+		
+		// Make joba successful
+		
+		
+	}
+	
+	private Thread runFlowRunnerInThread(FlowRunner runner) {
+		Thread thread = new Thread(runner);
+		thread.start();
+		return thread;
+	}
+	
+	private void pause(long millisec) {
+		synchronized(this) {
+			try {
+				wait(millisec);
+			}
+			catch (InterruptedException e) {
+			}
+		}
+	}
+	
+	private void createExpectedStateMap(ExecutableFlowBase flow, Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+		for (ExecutableNode node: flow.getExecutableNodes()) {
+			expectedStateMap.put(node.getPrintableId(), node.getStatus());
+			nodeMap.put(node.getPrintableId(), node);
+			
+			if (node instanceof ExecutableFlowBase) {
+				createExpectedStateMap((ExecutableFlowBase)node, expectedStateMap, nodeMap);
+			}
+		}
+	}
+	
+	private void compareStates(Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+		for (String printedId: expectedStateMap.keySet()) {
+			Status expectedStatus = expectedStateMap.get(printedId);
+			ExecutableNode node = nodeMap.get(printedId);
+			
+			if (expectedStatus != node.getStatus()) {
+				Assert.fail("Expected values do not match for " + printedId + ". Expected " + expectedStatus + ", instead received " + node.getStatus());
+			}
+		}
+	}
+	
+	private void prepareProject(File directory) throws ProjectManagerException, IOException {
+		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+		loader.loadProjectFlow(directory);
+		if (!loader.getErrors().isEmpty()) {
+			for (String error: loader.getErrors()) {
+				System.out.println(error);
+			}
+			
+			throw new RuntimeException("Errors found in setup");
+		}
+		
+		flowMap = loader.getFlowMap();
+		project.setFlows(flowMap);
+		FileUtils.copyDirectory(directory, workingDir);
+	}
+	
+	private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName) throws Exception {
+		Flow flow = flowMap.get(flowName);
+
+		int exId = id++;
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		exFlow.setExecutionPath(workingDir.getPath());
+		exFlow.setExecutionId(exId);
+
+		Map<String, String> flowParam = new HashMap<String, String>();
+		flowParam.put("param4", "override.4");
+		flowParam.put("param10", "override.10");
+		flowParam.put("param11", "override.11");
+		exFlow.getExecutionOptions().addAllFlowParameters(flowParam);
+		fakeExecutorLoader.uploadExecutableFlow(exFlow);
+	
+		FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+
+		runner.addListener(eventCollector);
+		
+		return runner;
+	}
+}
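
testBasicRun() currently stops at the "// Make joba successful" marker. A plausible continuation using the InteractiveTestJob handles from the next file (hedged, since the remaining steps and their expected states are not part of this diff):

    // Drive joba to success and expect its dependents to start.
    InteractiveTestJob.getTestJob("joba").succeedJob();
    pause(1000);
    expectedStateMap.put("joba", Status.SUCCEEDED);
    expectedStateMap.put("jobb", Status.RUNNING); // follow-on states here are assumptions
    compareStates(expectedStateMap, nodeMap);
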
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 6b6ed94..4592720 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -119,7 +119,7 @@ public class ExecutableFlowTest {
 		
 		HashMap<String, String> flowProps = new HashMap<String,String>();
 		flowProps.put("la", "fa");
-		options.setFlowParameters(flowProps);
+		options.addAllFlowParameters(flowProps);
 		exFlow.setExecutionOptions(options);
 		
 		Object obj = exFlow.toObject();
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
new file mode 100644
index 0000000..3141304
--- /dev/null
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -0,0 +1,89 @@
+package azkaban.test.executor;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.utils.Props;
+
+public class InteractiveTestJob extends AbstractProcessJob {
+	private static ConcurrentHashMap<String, InteractiveTestJob> testJobs = new ConcurrentHashMap<String, InteractiveTestJob>();
+	private Props generatedProperties = new Props();
+	private boolean isWaiting = true;
+	private boolean succeed = true;
+
+	public static InteractiveTestJob getTestJob(String name) {
+		return testJobs.get(name);
+	}
+	
+	public static void clearTestJobs() {
+		testJobs.clear();
+	}
+	
+	public InteractiveTestJob(String jobId, Props sysProps, Props jobProps, Logger log) {
+		super(jobId, sysProps, jobProps, log);
+	}
+
+	@Override
+	public void run() throws Exception {
+		String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
+		String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + "," + this.getId();
+		testJobs.put(id, this);
+		
+		while(isWaiting) {
+			synchronized(this) {
+				try {
+					wait(30000);
+				} catch (InterruptedException e) {
+				}
+				
+				if (!isWaiting) {
+					if (!succeed) {
+						throw new RuntimeException("Forced failure of " + getId());
+					}
+					else {
+						info("Job " + getId() + " succeeded.");
+					}
+				}
+			}
+		}
+	}
+	
+	public void failJob() {
+		synchronized(this) {
+			succeed = false;
+			isWaiting = false;
+			this.notify();
+		}
+	}
+	
+	public void succeedJob() {
+		synchronized(this) {
+			succeed = true;
+			isWaiting = false;
+			this.notify();
+		}
+	}
+	
+	public void succeedJob(Props generatedProperties) {
+		synchronized(this) {
+			this.generatedProperties = generatedProperties;
+			succeed = true;
+			isWaiting = false;
+			this.notify();
+		}
+	}
+	
+	@Override
+	public Props getJobGeneratedProperties() {
+		return generatedProperties;
+	}
+
+	@Override
+	public void cancel() throws InterruptedException {
+		info("Killing job");
+		failJob();
+	}
+}
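
A design note on InteractiveTestJob: run() parks the job thread in a wait(30000) loop, so a test can hold a job in the RUNNING state indefinitely and then release it with succeedJob() or failJob(), both of which flip the flags and notify() under the same monitor. Keying the registry on the NESTED_FLOW_PATH-derived id is what lets a test address one specific instance of a job that appears in several embedded copies of innerFlow.
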
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 86995f5..a51ea2a 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -32,6 +32,7 @@ import azkaban.executor.Status;
 import azkaban.flow.Flow;
 
 import azkaban.database.DataSourceUtils;
+import azkaban.project.Project;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
@@ -409,7 +410,11 @@ public class JdbcExecutorLoaderTest {
 		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
 		
 		Flow flow = Flow.flowFromObject(flowObj);
-		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		Project project = new Project(1, "flow");
+		HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+		flowMap.put(flow.getId(), flow);
+		project.setFlows(flowMap);
+		ExecutableFlow execFlow = new ExecutableFlow(project, flow);
 		execFlow.setExecutionId(executionId);
 
 		return execFlow;
@@ -421,7 +426,11 @@ public class JdbcExecutorLoaderTest {
 		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
 		
 		Flow flow = Flow.flowFromObject(flowObj);
-		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		Project project = new Project(1, "flow");
+		HashMap<String, Flow> flowMap = new HashMap<String, Flow>();
+		flowMap.put(flow.getId(), flow);
+		project.setFlows(flowMap);
+		ExecutableFlow execFlow = new ExecutableFlow(project, flow);
 
 		return execFlow;
 	}