azkaban-uncached

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 9b3f199..491189b 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -420,6 +420,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	@SuppressWarnings("unchecked")
 	private void prepareJobProperties(ExecutableNode node) throws IOException {
 		Props props = null;
 		// The following is the hiearchical ordering of dependency resolution
@@ -441,14 +442,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 		}
 
-		// 3. Output Properties
+		// 3. Flow Override properties
+		Map<String,String> flowParam = flow.getExecutionOptions().getFlowParameters();
+		if (flowParam != null && !flowParam.isEmpty()) {
+			props = new Props(props, flowParam);
+		}
+		
+		// 4. Output Properties
 		Props outputProps = collectOutputProps(node);
 		if (outputProps != null) {
 			outputProps.setEarliestAncestor(props);
 			props = outputProps;
 		}
 		
-		// 4. The job source
+		// 5. The job source
 		Props jobSource = loadJobProps(node);
 		if (jobSource != null) {
 			jobSource.setParent(props);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index a6d5b95..cde9b6e 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -355,7 +355,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			// If it's an embedded flow, we'll add the nested flow info to the job conf
 			if (node.getExecutableFlow() != node.getParentFlow()) {
-				String subFlow = node.getExecutableFlow().getId() + "," + node.getPrintableId(",");
+				String subFlow = node.getExecutableFlow().getId() + ":" + node.getPrintableId(":");
 				props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
 			}
 			
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index d89a412..cda8279 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -325,7 +325,7 @@ public class ExecutableFlowBase extends ExecutableNode {
 		
 		nodeloop:
 		for (ExecutableNode node: executableNodes.values()) {
-			if(Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
+			if(Status.isStatusFinished(node.getStatus())) {
 				continue;
 			}
 
@@ -333,6 +333,9 @@ public class ExecutableFlowBase extends ExecutableNode {
 				// If the flow is still running, we traverse into the flow
 				jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
 			}
+			else if (Status.isStatusRunning(node.getStatus())) {
+				continue;
+			}
 			else {
 				for (String dependency: node.getInNodes()) {
 					// We find that the outer-loop is unfinished.
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1a0b25d..34b9f09 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -332,7 +332,6 @@ public class JobTypeManager
 //			logger.info("jobConf is " + jobConf);
 //			
 			job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
-			logger.info("job built.");
 		}
 		catch (Exception e) {
 			//job = new InitErrorJob(jobId, e);
diff --git a/unit/executions/embedded2/jobb.job b/unit/executions/embedded2/jobb.job
index efbaa95..8844d8c 100644
--- a/unit/executions/embedded2/jobb.job
+++ b/unit/executions/embedded2/jobb.job
@@ -1,4 +1,3 @@
 type=flow
 flow.name=innerFlow
 dependencies=joba
-param
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index def90ca..a8f6d34 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -80,31 +80,74 @@ public class FlowRunnerTest2 {
 		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
 		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
 		
-		// 1. STEP ONE: START FLOW
+		// 1. START FLOW
 		ExecutableFlow flow = runner.getExecutableFlow();
 		createExpectedStateMap(flow, expectedStateMap, nodeMap);
 		runFlowRunnerInThread(runner);
-		pause(1000);
+		pause(500);
 		
 		// After it starts up, only joba should be running
 		expectedStateMap.put("joba", Status.RUNNING);
 		expectedStateMap.put("joba1", Status.RUNNING);
 		
 		compareStates(expectedStateMap, nodeMap);
-		ExecutableNode node = nodeMap.get("joba");
-		Props props = node.getInputProps();
-		Assert.assertEquals("joba.1", props.get("param1"));
-		Assert.assertEquals("test1.2", props.get("param2"));
-		Assert.assertEquals("test1.3", props.get("param3"));
-		Assert.assertEquals("override.4", props.get("param4"));
-		Assert.assertEquals("test2.5", props.get("param5"));
-		Assert.assertEquals("test2.6", props.get("param6"));
-		Assert.assertEquals("test2.7", props.get("param7"));
-		Assert.assertEquals("test2.8", props.get("param8"));
+		Props joba = nodeMap.get("joba").getInputProps();
+		Assert.assertEquals("joba.1", joba.get("param1"));
+		Assert.assertEquals("test1.2", joba.get("param2"));
+		Assert.assertEquals("test1.3", joba.get("param3"));
+		Assert.assertEquals("override.4", joba.get("param4"));
+		Assert.assertEquals("test2.5", joba.get("param5"));
+		Assert.assertEquals("test2.6", joba.get("param6"));
+		Assert.assertEquals("test2.7", joba.get("param7"));
+		Assert.assertEquals("test2.8", joba.get("param8"));
 		
-		// Make joba successful
+		Props joba1 = nodeMap.get("joba1").getInputProps();
+		Assert.assertEquals("test1.1", joba1.get("param1"));
+		Assert.assertEquals("test1.2", joba1.get("param2"));
+		Assert.assertEquals("test1.3", joba1.get("param3"));
+		Assert.assertEquals("override.4", joba1.get("param4"));
+		Assert.assertEquals("test2.5", joba1.get("param5"));
+		Assert.assertEquals("test2.6", joba1.get("param6"));
+		Assert.assertEquals("test2.7", joba1.get("param7"));
+		Assert.assertEquals("test2.8", joba1.get("param8"));
 		
+		// 2. JOB A COMPLETES SUCCESSFULLY 
+		InteractiveTestJob testJoba = InteractiveTestJob.getTestJob("joba");
+		Props jobAOut = new Props();
+		jobAOut.put("output.joba", "joba");
+		testJoba.succeedJob(jobAOut);
+		pause(500);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("joba:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobc:innerJobA", Status.RUNNING);
+		
+		Props jobb = nodeMap.get("jobb").getInputProps();
+		Assert.assertEquals("test1.1", jobb.get("param1"));
+		Assert.assertEquals("test1.2", jobb.get("param2"));
+		Assert.assertEquals("test1.3", jobb.get("param3"));
+		Assert.assertEquals("override.4", jobb.get("param4"));
+		Assert.assertEquals("test2.5", jobb.get("param5"));
+		Assert.assertEquals("test2.6", jobb.get("param6"));
+		Assert.assertEquals("test2.7", jobb.get("param7"));
+		Assert.assertEquals("test2.8", jobb.get("param8"));
+		Assert.assertEquals("test2.8", jobb.get("param8"));
+		Assert.assertEquals("joba", jobb.get("output.joba"));
 		
+		Props jobbInnerJobA = nodeMap.get("jobb:innerJobA").getInputProps();
+		Assert.assertEquals("test1.1", jobbInnerJobA.get("param1"));
+		Assert.assertEquals("test1.2", jobbInnerJobA.get("param2"));
+		Assert.assertEquals("test1.3", jobbInnerJobA.get("param3"));
+		Assert.assertEquals("override.4", jobbInnerJobA.get("param4"));
+		Assert.assertEquals("test2.5", jobbInnerJobA.get("param5"));
+		Assert.assertEquals("test2.6", jobbInnerJobA.get("param6"));
+		Assert.assertEquals("test2.7", jobbInnerJobA.get("param7"));
+		Assert.assertEquals("test2.8", jobbInnerJobA.get("param8"));
+		Assert.assertEquals("joba", jobbInnerJobA.get("output.joba"));
 	}
 	
 	private Thread runFlowRunnerInThread(FlowRunner runner) {
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
index 3141304..41d525c 100644
--- a/unit/java/azkaban/test/executor/InteractiveTestJob.java
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -29,7 +29,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
 	@Override
 	public void run() throws Exception {
 		String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
-		String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + "," + this.getId();
+		String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + ":" + this.getId();
 		testJobs.put(id, this);
 		
 		while(isWaiting) {