azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 11(+9 -2)
unit/executions/embedded2/jobb.job 1(+0 -1)
Details
src/java/azkaban/execapp/FlowRunner.java 11(+9 -2)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 9b3f199..491189b 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -420,6 +420,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ @SuppressWarnings("unchecked")
private void prepareJobProperties(ExecutableNode node) throws IOException {
Props props = null;
// The following is the hiearchical ordering of dependency resolution
@@ -441,14 +442,20 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- // 3. Output Properties
+ // 3. Flow Override properties
+ Map<String,String> flowParam = flow.getExecutionOptions().getFlowParameters();
+ if (flowParam != null && !flowParam.isEmpty()) {
+ props = new Props(props, flowParam);
+ }
+
+ // 4. Output Properties
Props outputProps = collectOutputProps(node);
if (outputProps != null) {
outputProps.setEarliestAncestor(props);
props = outputProps;
}
- // 4. The job source
+ // 5. The job source
Props jobSource = loadJobProps(node);
if (jobSource != null) {
jobSource.setParent(props);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index a6d5b95..cde9b6e 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -355,7 +355,7 @@ public class JobRunner extends EventHandler implements Runnable {
// If it's an embedded flow, we'll add the nested flow info to the job conf
if (node.getExecutableFlow() != node.getParentFlow()) {
- String subFlow = node.getExecutableFlow().getId() + "," + node.getPrintableId(",");
+ String subFlow = node.getExecutableFlow().getId() + ":" + node.getPrintableId(":");
props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
}
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index d89a412..cda8279 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -325,7 +325,7 @@ public class ExecutableFlowBase extends ExecutableNode {
nodeloop:
for (ExecutableNode node: executableNodes.values()) {
- if(Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) {
+ if(Status.isStatusFinished(node.getStatus())) {
continue;
}
@@ -333,6 +333,9 @@ public class ExecutableFlowBase extends ExecutableNode {
// If the flow is still running, we traverse into the flow
jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
}
+ else if (Status.isStatusRunning(node.getStatus())) {
+ continue;
+ }
else {
for (String dependency: node.getInNodes()) {
// We find that the outer-loop is unfinished.
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1a0b25d..34b9f09 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -332,7 +332,6 @@ public class JobTypeManager
// logger.info("jobConf is " + jobConf);
//
job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
- logger.info("job built.");
}
catch (Exception e) {
//job = new InitErrorJob(jobId, e);
unit/executions/embedded2/jobb.job 1(+0 -1)
diff --git a/unit/executions/embedded2/jobb.job b/unit/executions/embedded2/jobb.job
index efbaa95..8844d8c 100644
--- a/unit/executions/embedded2/jobb.job
+++ b/unit/executions/embedded2/jobb.job
@@ -1,4 +1,3 @@
type=flow
flow.name=innerFlow
dependencies=joba
-param
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index def90ca..a8f6d34 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -80,31 +80,74 @@ public class FlowRunnerTest2 {
Map<String, Status> expectedStateMap = new HashMap<String, Status>();
Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
- // 1. STEP ONE: START FLOW
+ // 1. START FLOW
ExecutableFlow flow = runner.getExecutableFlow();
createExpectedStateMap(flow, expectedStateMap, nodeMap);
runFlowRunnerInThread(runner);
- pause(1000);
+ pause(500);
// After it starts up, only joba should be running
expectedStateMap.put("joba", Status.RUNNING);
expectedStateMap.put("joba1", Status.RUNNING);
compareStates(expectedStateMap, nodeMap);
- ExecutableNode node = nodeMap.get("joba");
- Props props = node.getInputProps();
- Assert.assertEquals("joba.1", props.get("param1"));
- Assert.assertEquals("test1.2", props.get("param2"));
- Assert.assertEquals("test1.3", props.get("param3"));
- Assert.assertEquals("override.4", props.get("param4"));
- Assert.assertEquals("test2.5", props.get("param5"));
- Assert.assertEquals("test2.6", props.get("param6"));
- Assert.assertEquals("test2.7", props.get("param7"));
- Assert.assertEquals("test2.8", props.get("param8"));
+ Props joba = nodeMap.get("joba").getInputProps();
+ Assert.assertEquals("joba.1", joba.get("param1"));
+ Assert.assertEquals("test1.2", joba.get("param2"));
+ Assert.assertEquals("test1.3", joba.get("param3"));
+ Assert.assertEquals("override.4", joba.get("param4"));
+ Assert.assertEquals("test2.5", joba.get("param5"));
+ Assert.assertEquals("test2.6", joba.get("param6"));
+ Assert.assertEquals("test2.7", joba.get("param7"));
+ Assert.assertEquals("test2.8", joba.get("param8"));
- // Make joba successful
+ Props joba1 = nodeMap.get("joba1").getInputProps();
+ Assert.assertEquals("test1.1", joba1.get("param1"));
+ Assert.assertEquals("test1.2", joba1.get("param2"));
+ Assert.assertEquals("test1.3", joba1.get("param3"));
+ Assert.assertEquals("override.4", joba1.get("param4"));
+ Assert.assertEquals("test2.5", joba1.get("param5"));
+ Assert.assertEquals("test2.6", joba1.get("param6"));
+ Assert.assertEquals("test2.7", joba1.get("param7"));
+ Assert.assertEquals("test2.8", joba1.get("param8"));
+ // 2. JOB A COMPLETES SUCCESSFULLY
+ InteractiveTestJob testJoba = InteractiveTestJob.getTestJob("joba");
+ Props jobAOut = new Props();
+ jobAOut.put("output.joba", "joba");
+ testJoba.succeedJob(jobAOut);
+ pause(500);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("joba:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobc:innerJobA", Status.RUNNING);
+
+ Props jobb = nodeMap.get("jobb").getInputProps();
+ Assert.assertEquals("test1.1", jobb.get("param1"));
+ Assert.assertEquals("test1.2", jobb.get("param2"));
+ Assert.assertEquals("test1.3", jobb.get("param3"));
+ Assert.assertEquals("override.4", jobb.get("param4"));
+ Assert.assertEquals("test2.5", jobb.get("param5"));
+ Assert.assertEquals("test2.6", jobb.get("param6"));
+ Assert.assertEquals("test2.7", jobb.get("param7"));
+ Assert.assertEquals("test2.8", jobb.get("param8"));
+ Assert.assertEquals("test2.8", jobb.get("param8"));
+ Assert.assertEquals("joba", jobb.get("output.joba"));
+ Props jobbInnerJobA = nodeMap.get("jobb:innerJobA").getInputProps();
+ Assert.assertEquals("test1.1", jobbInnerJobA.get("param1"));
+ Assert.assertEquals("test1.2", jobbInnerJobA.get("param2"));
+ Assert.assertEquals("test1.3", jobbInnerJobA.get("param3"));
+ Assert.assertEquals("override.4", jobbInnerJobA.get("param4"));
+ Assert.assertEquals("test2.5", jobbInnerJobA.get("param5"));
+ Assert.assertEquals("test2.6", jobbInnerJobA.get("param6"));
+ Assert.assertEquals("test2.7", jobbInnerJobA.get("param7"));
+ Assert.assertEquals("test2.8", jobbInnerJobA.get("param8"));
+ Assert.assertEquals("joba", jobbInnerJobA.get("output.joba"));
}
private Thread runFlowRunnerInThread(FlowRunner runner) {
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
index 3141304..41d525c 100644
--- a/unit/java/azkaban/test/executor/InteractiveTestJob.java
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -29,7 +29,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
@Override
public void run() throws Exception {
String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
- String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + "," + this.getId();
+ String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + ":" + this.getId();
testJobs.put(id, this);
while(isWaiting) {