azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 28(+7 -21)
unit/executions/embedded2/jobb.job 2(+2 -0)
unit/executions/embedded2/jobc.job 3(+1 -2)
unit/executions/embedded2/jobd.job 3(+2 -1)
Details
src/java/azkaban/execapp/FlowRunner.java 28(+7 -21)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 491189b..5cb7188 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -334,11 +334,11 @@ public class FlowRunner extends EventHandler implements Runnable {
}
else {
if (!progressGraph(flow)) {
- if (flow.isFlowFinished() || flowCancelled ) {
+ if (flow.isFlowFinished() ) {
flowFinished = true;
break;
}
-
+
try {
mainSyncObj.wait(CHECK_WAIT_MS);
} catch (InterruptedException e) {
@@ -348,27 +348,12 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
- if (flowCancelled) {
- try {
- logger.info("Flow was force cancelled cleaning up.");
- for(JobRunner activeRunner : activeJobRunners) {
- activeRunner.cancel();
- }
-
- flow.killNode(System.currentTimeMillis());
- } catch (Exception e) {
- logger.error(e);
- }
-
- updateFlow();
- }
-
logger.info("Finishing up flow. Awaiting Termination");
executorService.shutdown();
- synchronized(mainSyncObj) {
- finalizeFlow(flow);
- }
+ finalizeFlow(flow);
+ updateFlow();
+ logger.info("Finished Flow");
}
private boolean progressGraph(ExecutableFlowBase flow) throws IOException {
@@ -405,6 +390,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private void finalizeFlow(ExecutableFlowBase flow) {
String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
+ flow.setEndTime(System.currentTimeMillis());
switch(flow.getStatus()) {
case FAILED_FINISHING:
logger.info("Setting flow " + id + "status to Failed.");
@@ -752,7 +738,7 @@ public class FlowRunner extends EventHandler implements Runnable {
String id = node.getPrintableId(":");
logger.info("Job Finished " + id + " with status " + node.getStatus());
- if (node.getOutputProps() != null) {
+ if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
logger.info("Job " + id + " had output props.");
}
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index cde9b6e..7013e4a 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -355,7 +355,7 @@ public class JobRunner extends EventHandler implements Runnable {
// If it's an embedded flow, we'll add the nested flow info to the job conf
if (node.getExecutableFlow() != node.getParentFlow()) {
- String subFlow = node.getExecutableFlow().getId() + ":" + node.getPrintableId(":");
+ String subFlow = node.getPrintableId(":");
props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
}
diff --git a/unit/executions/embedded2/innerFlow2.job b/unit/executions/embedded2/innerFlow2.job
new file mode 100644
index 0000000..35cbccb
--- /dev/null
+++ b/unit/executions/embedded2/innerFlow2.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobA
\ No newline at end of file
unit/executions/embedded2/jobb.job 2(+2 -0)
diff --git a/unit/executions/embedded2/jobb.job b/unit/executions/embedded2/jobb.job
index 8844d8c..4531028 100644
--- a/unit/executions/embedded2/jobb.job
+++ b/unit/executions/embedded2/jobb.job
@@ -1,3 +1,5 @@
type=flow
flow.name=innerFlow
dependencies=joba
+testprops=moo
+output.override=jobb
\ No newline at end of file
unit/executions/embedded2/jobc.job 3(+1 -2)
diff --git a/unit/executions/embedded2/jobc.job b/unit/executions/embedded2/jobc.job
index 8844d8c..2bfc5ff 100644
--- a/unit/executions/embedded2/jobc.job
+++ b/unit/executions/embedded2/jobc.job
@@ -1,3 +1,2 @@
-type=flow
-flow.name=innerFlow
+type=test
dependencies=joba
unit/executions/embedded2/jobd.job 3(+2 -1)
diff --git a/unit/executions/embedded2/jobd.job b/unit/executions/embedded2/jobd.job
index 8844d8c..e80f82b 100644
--- a/unit/executions/embedded2/jobd.job
+++ b/unit/executions/embedded2/jobd.job
@@ -1,3 +1,4 @@
type=flow
-flow.name=innerFlow
+flow.name=innerFlow2
dependencies=joba
+jobdprop=poop
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 7634a56..d7fe287 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -39,11 +39,13 @@ public class FlowRunnerTest {
@Before
public void setUp() throws Exception {
System.out.println("Create temp dir");
- workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
- if (workingDir.exists()) {
- FileUtils.deleteDirectory(workingDir);
+ synchronized ( this) {
+ workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
}
- workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
jobtypeManager.registerJobType("java", JavaJob.class);
jobtypeManager.registerJobType("test", InteractiveTestJob.class);
@@ -55,9 +57,11 @@ public class FlowRunnerTest {
@After
public void tearDown() throws IOException {
System.out.println("Teardown temp dir");
- if (workingDir != null) {
- FileUtils.deleteDirectory(workingDir);
- workingDir = null;
+ synchronized ( this) {
+ if (workingDir != null) {
+ FileUtils.deleteDirectory(workingDir);
+ workingDir = null;
+ }
}
}
@@ -194,8 +198,16 @@ public class FlowRunnerTest {
Assert.assertTrue(runner.isCancelled());
- Assert.assertTrue("Expected flow " + Status.KILLED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.KILLED);
+ Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+ synchronized(this) {
+ try {
+ wait(500);
+ } catch(InterruptedException e) {
+
+ }
+ }
+
testStatus(exFlow, "job1", Status.SUCCEEDED);
testStatus(exFlow, "job2d", Status.FAILED);
testStatus(exFlow, "job3", Status.KILLED);
@@ -231,6 +243,14 @@ public class FlowRunnerTest {
ExecutableFlow exFlow = runner.getExecutableFlow();
Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+ synchronized(this) {
+ try {
+ wait(500);
+ } catch(InterruptedException e) {
+
+ }
+ }
+
testStatus(exFlow, "job1", Status.SUCCEEDED);
testStatus(exFlow, "job2d", Status.FAILED);
testStatus(exFlow, "job3", Status.SUCCEEDED);
@@ -279,7 +299,7 @@ public class FlowRunnerTest {
synchronized(this) {
// Wait for cleanup.
try {
- wait(1000);
+ wait(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -345,7 +365,9 @@ public class FlowRunnerTest {
}
private ExecutableFlow prepareExecDir(File execDir, String flowName, int execId) throws IOException {
- FileUtils.copyDirectory(execDir, workingDir);
+ synchronized ( this) {
+ FileUtils.copyDirectory(execDir, workingDir);
+ }
File jsonFlowFile = new File(workingDir, flowName + ".flow");
@SuppressWarnings("unchecked")
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index a8f6d34..909d490 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -84,7 +84,7 @@ public class FlowRunnerTest2 {
ExecutableFlow flow = runner.getExecutableFlow();
createExpectedStateMap(flow, expectedStateMap, nodeMap);
runFlowRunnerInThread(runner);
- pause(500);
+ pause(250);
// After it starts up, only joba should be running
expectedStateMap.put("joba", Status.RUNNING);
@@ -112,22 +112,20 @@ public class FlowRunnerTest2 {
Assert.assertEquals("test2.8", joba1.get("param8"));
// 2. JOB A COMPLETES SUCCESSFULLY
- InteractiveTestJob testJoba = InteractiveTestJob.getTestJob("joba");
- Props jobAOut = new Props();
- jobAOut.put("output.joba", "joba");
- testJoba.succeedJob(jobAOut);
- pause(500);
+ InteractiveTestJob.getTestJob("joba").succeedJob(Props.of("output.joba", "joba", "output.override", "joba"));
+ pause(250);
expectedStateMap.put("joba", Status.SUCCEEDED);
expectedStateMap.put("joba1", Status.RUNNING);
expectedStateMap.put("jobb", Status.RUNNING);
expectedStateMap.put("jobc", Status.RUNNING);
expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("joba:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobc:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
Props jobb = nodeMap.get("jobb").getInputProps();
Assert.assertEquals("test1.1", jobb.get("param1"));
+ Assert.assertEquals("test1.1", jobb.get("param1"));
Assert.assertEquals("test1.2", jobb.get("param2"));
Assert.assertEquals("test1.3", jobb.get("param3"));
Assert.assertEquals("override.4", jobb.get("param4"));
@@ -136,6 +134,9 @@ public class FlowRunnerTest2 {
Assert.assertEquals("test2.7", jobb.get("param7"));
Assert.assertEquals("test2.8", jobb.get("param8"));
Assert.assertEquals("test2.8", jobb.get("param8"));
+ // Test that jobb properties overwrites the output properties
+ Assert.assertEquals("moo", jobb.get("testprops"));
+ Assert.assertEquals("jobb", jobb.get("output.override"));
Assert.assertEquals("joba", jobb.get("output.joba"));
Props jobbInnerJobA = nodeMap.get("jobb:innerJobA").getInputProps();
@@ -148,6 +149,79 @@ public class FlowRunnerTest2 {
Assert.assertEquals("test2.7", jobbInnerJobA.get("param7"));
Assert.assertEquals("test2.8", jobbInnerJobA.get("param8"));
Assert.assertEquals("joba", jobbInnerJobA.get("output.joba"));
+
+ // 3. jobb:Inner completes
+ /// innerJobA completes
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+ Props jobbInnerJobB = nodeMap.get("jobb:innerJobB").getInputProps();
+ Assert.assertEquals("test1.1", jobbInnerJobB.get("param1"));
+ Assert.assertEquals("override.4", jobbInnerJobB.get("param4"));
+ Assert.assertEquals("jobb.innerJobA", jobbInnerJobB.get("output.jobb.innerJobA"));
+ Assert.assertEquals("moo", jobbInnerJobB.get("testprops"));
+ /// innerJobB, C completes
+ InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
+ InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
+ pause(250);
+ expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ Props jobbInnerJobD = nodeMap.get("jobb:innerFlow").getInputProps();
+ Assert.assertEquals("test1.1", jobbInnerJobD.get("param1"));
+ Assert.assertEquals("override.4", jobbInnerJobD.get("param4"));
+ Assert.assertEquals("jobb.innerJobB", jobbInnerJobD.get("output.jobb.innerJobB"));
+ Assert.assertEquals("jobb.innerJobC", jobbInnerJobD.get("output.jobb.innerJobC"));
+
+ // 4. Finish up on inner flow for jobb
+ InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
+ pause(250);
+ expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ Props jobbOutput = nodeMap.get("jobb").getOutputProps();
+ Assert.assertEquals("test1", jobbOutput.get("output1.jobb"));
+ Assert.assertEquals("test2", jobbOutput.get("output2.jobb"));
+
+ // 5. Finish jobc, jobd
+ InteractiveTestJob.getTestJob("jobc").succeedJob(Props.of("output.jobc", "jobc"));
+ pause(250);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+ pause(250);
+ InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+ expectedStateMap.put("jobd", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ Props jobd = nodeMap.get("jobe").getInputProps();
+ Assert.assertEquals("test1", jobd.get("output1.jobb"));
+ Assert.assertEquals("jobc", jobd.get("output.jobc"));
+
+ // 6. Finish off flow
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ pause(250);
+ InteractiveTestJob.getTestJob("jobe").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.SUCCEEDED);
+ expectedStateMap.put("jobf", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobf").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobf", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
}
private Thread runFlowRunnerInThread(FlowRunner runner) {
@@ -225,4 +299,5 @@ public class FlowRunnerTest2 {
return runner;
}
+
}
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
index 41d525c..f117d97 100644
--- a/unit/java/azkaban/test/executor/InteractiveTestJob.java
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -29,7 +29,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
@Override
public void run() throws Exception {
String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
- String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + ":" + this.getId();
+ String id = nestedFlowPath == null ? this.getId() : nestedFlowPath;
testJobs.put(id, this);
while(isWaiting) {