azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 11(+6 -5)
src/java/azkaban/execapp/JobRunner.java 41(+38 -3)
Details
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 8ca895e..1c24545 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -37,7 +37,7 @@ public abstract class FlowWatcher {
* Called to fire events to the JobRunner listeners
* @param jobId
*/
- protected synchronized void handleJobFinished(String jobId, Status status) {
+ protected synchronized void handleJobStatusChange(String jobId, Status status) {
BlockingStatus block = map.get(jobId);
if (block != null) {
block.changeStatus(status);
@@ -53,7 +53,8 @@ public abstract class FlowWatcher {
return null;
}
- ExecutableNode node = flow.getExecutableNode(jobId);
+ String[] split = jobId.split(":");
+ ExecutableNode node = flow.getExecutableNode(split);
if (node == null) {
return null;
}
@@ -68,7 +69,8 @@ public abstract class FlowWatcher {
}
public Status peekStatus(String jobId) {
- ExecutableNode node = flow.getExecutableNode(jobId);
+ String[] split = jobId.split(":");
+ ExecutableNode node = flow.getExecutableNode(split);
if (node != null) {
return node.getStatus();
}
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index cbf6333..1dd4ba5 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -40,22 +40,19 @@ public class LocalFlowWatcher extends FlowWatcher {
public void handleEvent(Event event) {
if (event.getType() == Type.JOB_FINISHED) {
if (event.getRunner() instanceof FlowRunner) {
+ // The flow runner will finish a job without it running
Object data = event.getData();
if (data instanceof ExecutableNode) {
ExecutableNode node = (ExecutableNode)data;
-
-// if (node.getId()) {
-//
-// }
-
- handleJobFinished(node.getId(), node.getStatus());
+ handleJobStatusChange(node.getNestedId(), node.getStatus());
}
}
else if (event.getRunner() instanceof JobRunner) {
+ // A job runner is finished
JobRunner runner = (JobRunner)event.getRunner();
ExecutableNode node = runner.getNode();
- handleJobFinished(node.getId(), node.getStatus());
+ handleJobStatusChange(node.getNestedId(), node.getStatus());
}
}
else if (event.getType() == Type.FLOW_FINISHED) {
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index 7beb47a..9d1c407 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -1,5 +1,8 @@
package azkaban.execapp.event;
+import java.util.ArrayList;
+import java.util.Map;
+
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
@@ -54,27 +57,24 @@ public class RemoteFlowWatcher extends FlowWatcher {
isShutdown = true;
}
+ long updateTime = 0;
if (flow == null) {
flow = updateFlow;
}
else {
+ Map<String, Object> updateData = updateFlow.toUpdateObject(updateTime);
+ ArrayList<ExecutableNode> updatedNodes = new ArrayList<ExecutableNode>();
+ flow.applyUpdateObject(updateData, updatedNodes);
+
flow.setStatus(updateFlow.getStatus());
flow.setEndTime(updateFlow.getEndTime());
flow.setUpdateTime(updateFlow.getUpdateTime());
- for (ExecutableNode node : flow.getExecutableNodes()) {
- String jobId = node.getId();
- ExecutableNode newNode = updateFlow.getExecutableNode(jobId);
- long updateTime = node.getUpdateTime();
- node.setUpdateTime(newNode.getUpdateTime());
- node.setStatus(newNode.getStatus());
- node.setStartTime(newNode.getStartTime());
- node.setEndTime(newNode.getEndTime());
-
- if (updateTime < newNode.getUpdateTime()) {
- handleJobFinished(jobId, newNode.getStatus());
- }
+ for (ExecutableNode node : updatedNodes) {
+ handleJobStatusChange(node.getNestedId(), node.getStatus());
}
+
+ updateTime = flow.getUpdateTime();
}
if (Status.isStatusFinished(flow.getStatus())) {
@@ -92,7 +92,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
}
}
-
+
@Override
public synchronized void stopWatcher() {
if(isShutdown) {
src/java/azkaban/execapp/FlowRunner.java 11(+6 -5)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 6680b7a..fbf4065 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -366,6 +366,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (node instanceof ExecutableFlowBase && ((ExecutableFlowBase)node).isFlowFinished()) {
finalizeFlow((ExecutableFlowBase)node);
+ fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
}
else if (nextStatus == Status.KILLED || isCancelled()) {
logger.info("Killing " + node.getId() + " due to prior errors.");
@@ -392,7 +393,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
private void finalizeFlow(ExecutableFlowBase flow) {
- String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
+ String id = flow == this.flow ? "" : flow.getNestedId() + " ";
// If it's not the starting flow, we'll create set of output props
// for the finished flow.
@@ -519,12 +520,12 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setStatus(Status.RUNNING);
node.setStartTime(System.currentTimeMillis());
- logger.info("Starting subflow " + node.getPrintableId() + ".");
+ logger.info("Starting subflow " + node.getNestedId() + ".");
}
else {
node.setStatus(Status.QUEUED);
JobRunner runner = createJobRunner(node);
- logger.info("Submitting job " + node.getPrintableId() + " to run.");
+ logger.info("Submitting job " + node.getNestedId() + " to run.");
try {
executorService.submit(runner);
activeJobRunners.add(runner);
@@ -707,7 +708,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (node.getStatus() == Status.FAILED) {
node.resetForRetry();
- logger.info("Re-enabling job " + node.getPrintableId() + " attempt " + node.getAttempt());
+ logger.info("Re-enabling job " + node.getNestedId() + " attempt " + node.getAttempt());
reEnableDependents(node);
}
else if (node.getStatus() == Status.KILLED) {
@@ -760,7 +761,7 @@ public class FlowRunner extends EventHandler implements Runnable {
ExecutableNode node = runner.getNode();
activeJobRunners.remove(node.getId());
- String id = node.getPrintableId();
+ String id = node.getNestedId();
logger.info("Job Finished " + id + " with status " + node.getStatus());
if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
logger.info("Job " + id + " had output props.");
src/java/azkaban/execapp/JobRunner.java 41(+38 -3)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 7013e4a..f948192 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -37,6 +37,7 @@ import azkaban.execapp.event.Event;
import azkaban.execapp.event.Event.Type;
import azkaban.execapp.event.EventHandler;
import azkaban.execapp.event.FlowWatcher;
+import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
@@ -117,14 +118,48 @@ public class JobRunner extends EventHandler implements Runnable {
this.pipelineLevel = pipelineLevel;
if (this.pipelineLevel == 1) {
- pipelineJobs.add(node.getId());
+ pipelineJobs.add(node.getNestedId());
}
else if (this.pipelineLevel == 2) {
- pipelineJobs.add(node.getId());
- pipelineJobs.addAll(node.getOutNodes());
+ pipelineJobs.add(node.getNestedId());
+ ExecutableFlowBase parentFlow = node.getParentFlow();
+ for (String outNode : node.getOutNodes()) {
+ ExecutableNode nextNode = parentFlow.getExecutableNode(outNode);
+
+ // If the next node is a nested flow, then we add the nested starting nodes
+ if (nextNode instanceof ExecutableFlowBase) {
+ ExecutableFlowBase nextFlow = (ExecutableFlowBase)nextNode;
+ findAllStartingNodes(nextFlow, pipelineJobs);
+ }
+ else {
+ pipelineJobs.add(nextNode.getNestedId());
+ }
+ }
+ }
+ }
+
+ private void findAllStartingNodes(ExecutableFlowBase flow, Set<String> pipelineJobs) {
+ for (String startingNode: flow.getStartNodes()) {
+ ExecutableNode node = flow.getExecutableNode(startingNode);
+ if (node instanceof ExecutableFlowBase) {
+ findAllStartingNodes((ExecutableFlowBase)node, pipelineJobs);
+ }
+ else {
+ pipelineJobs.add(node.getNestedId());
+ }
}
}
+ /**
+ * Returns a list of jobs that this JobRunner will wait upon to finish before starting.
+ * It is only relevant if pipeline is turned on.
+ *
+ * @return
+ */
+ public Set<String> getPipelineWatchedJobs() {
+ return pipelineJobs;
+ }
+
public void setDelayStart(long delayMS) {
delayStartMs = delayMS;
}
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index e99bbe2..35cd4f8 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -122,6 +122,30 @@ public class ExecutableFlowBase extends ExecutableNode {
return executableNodes.get(id);
}
+ public ExecutableNode getExecutableNode(String ... ids) {
+ return getExecutableNode(this, ids, 0);
+ }
+
+ private ExecutableNode getExecutableNode(ExecutableFlowBase flow, String[] ids, int currentIdIdx) {
+ ExecutableNode node = flow.getExecutableNode(ids[currentIdIdx]);
+ currentIdIdx++;
+
+ if (node == null) {
+ return null;
+ }
+
+ if (ids.length == currentIdIdx) {
+ return node;
+ }
+ else if (node instanceof ExecutableFlowBase) {
+ return getExecutableNode((ExecutableFlowBase)node, ids, currentIdIdx);
+ }
+ else {
+ return null;
+ }
+
+ }
+
public List<String> getStartNodes() {
if (startNodes == null) {
startNodes = new ArrayList<String>();
@@ -254,13 +278,17 @@ public class ExecutableFlowBase extends ExecutableNode {
return updateData;
}
- @SuppressWarnings("unchecked")
- public void applyUpdateObject(Map<String, Object> updateData) {
+ public void applyUpdateObject(Map<String, Object> updateData, List<ExecutableNode> updatedNodes) {
super.applyUpdateObject(updateData);
-
- List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+
if (updatedNodes != null) {
- for (Map<String,Object> node: updatedNodes) {
+ updatedNodes.add(this);
+ }
+
+ @SuppressWarnings("unchecked")
+ List<Map<String,Object>> nodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+ if (nodes != null) {
+ for (Map<String,Object> node: nodes) {
String id = (String)node.get(ID_PARAM);
if (id == null) {
@@ -269,9 +297,23 @@ public class ExecutableFlowBase extends ExecutableNode {
}
ExecutableNode exNode = executableNodes.get(id);
- exNode.applyUpdateObject(node);
+ if (updatedNodes != null) {
+ updatedNodes.add(exNode);
+ }
+
+ if (exNode instanceof ExecutableFlowBase) {
+ ((ExecutableFlowBase)exNode).applyUpdateObject(node, updatedNodes);
+ }
+ else {
+ exNode.applyUpdateObject(updateData);
+ }
}
}
+
+ }
+
+ public void applyUpdateObject(Map<String, Object> updateData) {
+ applyUpdateObject(updateData, null);
}
public void reEnableDependents(ExecutableNode ... nodes) {
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index c0333d0..db5fd8d 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -236,7 +236,7 @@ public class ExecutableNode {
return array;
}
- public String getPrintableId() {
+ public String getNestedId() {
return getPrintableId(":");
}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
new file mode 100644
index 0000000..d20f2f6
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -0,0 +1,262 @@
+package azkaban.test.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.LocalFlowWatcher;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.test.executor.InteractiveTestJob;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
+
+public class FlowRunnerPipelineTest {
+ private File workingDir;
+ private JobTypeManager jobtypeManager;
+ private ProjectLoader fakeProjectLoader;
+ private ExecutorLoader fakeExecutorLoader;
+ private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+ private Project project;
+ private Map<String, Flow> flowMap;
+ private static int id=101;
+
+ public FlowRunnerPipelineTest() {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("Create temp dir");
+ workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
+ jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager.registerJobType("java", JavaJob.class);
+ jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ fakeProjectLoader = new MockProjectLoader(workingDir);
+ fakeExecutorLoader = new MockExecutorLoader();
+ project = new Project(1, "testProject");
+
+ File dir = new File("unit/executions/embedded2");
+ prepareProject(dir);
+
+ InteractiveTestJob.clearTestJobs();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ System.out.println("Teardown temp dir");
+ if (workingDir != null) {
+ FileUtils.deleteDirectory(workingDir);
+ workingDir = null;
+ }
+ }
+
+ @Test
+ public void testBasicPipelineLevel1Run() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner previousRunner = createFlowRunner(eventCollector, "jobf", "prev");
+
+ ExecutionOptions options = new ExecutionOptions();
+ options.setPipelineExecutionId(previousRunner.getExecutableFlow().getExecutionId());
+ options.setPipelineLevel(1);
+ FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
+ FlowRunner pipelineRunner = createFlowRunner(eventCollector, "jobf", "pipe", options);
+ pipelineRunner.setFlowWatcher(watcher);
+
+ Map<String, Status> previousExpectedStateMap = new HashMap<String, Status>();
+ Map<String, Status> pipelineExpectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> previousNodeMap = new HashMap<String, ExecutableNode>();
+ Map<String, ExecutableNode> pipelineNodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
+ ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
+ createExpectedStateMap(previousFlow, previousExpectedStateMap, previousNodeMap);
+ createExpectedStateMap(pipelineFlow, pipelineExpectedStateMap, pipelineNodeMap);
+
+ Thread thread1 = runFlowRunnerInThread(previousRunner);
+ pause(250);
+ Thread thread2 = runFlowRunnerInThread(pipelineRunner);
+ pause(500);
+
+ previousExpectedStateMap.put("joba", Status.RUNNING);
+ previousExpectedStateMap.put("joba1", Status.RUNNING);
+ pipelineExpectedStateMap.put("joba", Status.QUEUED);
+ pipelineExpectedStateMap.put("joba1", Status.QUEUED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:joba").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("joba", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobb", Status.RUNNING);
+ previousExpectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ previousExpectedStateMap.put("jobd", Status.RUNNING);
+ previousExpectedStateMap.put("jobc", Status.RUNNING);
+ previousExpectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ pipelineExpectedStateMap.put("joba", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:jobb:innerJobA").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ previousExpectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("pipe:joba").succeedJob();
+ pause(250);
+ pipelineExpectedStateMap.put("joba", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobb", Status.RUNNING);
+ pipelineExpectedStateMap.put("jobd", Status.RUNNING);
+ pipelineExpectedStateMap.put("jobc", Status.QUEUED);
+ pipelineExpectedStateMap.put("jobd:innerJobA", Status.QUEUED);
+ pipelineExpectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:jobd:innerJobA").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+ pipelineExpectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+ // Finish the previous d side
+ InteractiveTestJob.getTestJob("prev:jobd:innerFlow2").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobd", Status.SUCCEEDED);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+
+ InteractiveTestJob.getTestJob("prev:jobb:innerJobB").succeedJob();
+ InteractiveTestJob.getTestJob("prev:jobb:innerJobC").succeedJob();
+ InteractiveTestJob.getTestJob("prev:jobc").succeedJob();
+ pause(250);
+ InteractiveTestJob.getTestJob("pipe:jobb:innerJobA").succeedJob();
+ pause(250);
+ previousExpectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ previousExpectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+ previousExpectedStateMap.put("jobc", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ pipelineExpectedStateMap.put("jobc", Status.RUNNING);
+ pipelineExpectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ pipelineExpectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(previousExpectedStateMap, previousNodeMap);
+ compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+// Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
+// Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
+// Assert.assertFalse(thread1.isAlive());
+// Assert.assertFalse(thread2.isAlive());
+ }
+
+ private Thread runFlowRunnerInThread(FlowRunner runner) {
+ Thread thread = new Thread(runner);
+ thread.start();
+ return thread;
+ }
+
+ private void pause(long millisec) {
+ synchronized(this) {
+ try {
+ wait(millisec);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private void createExpectedStateMap(ExecutableFlowBase flow, Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ expectedStateMap.put(node.getNestedId(), node.getStatus());
+ nodeMap.put(node.getNestedId(), node);
+
+ if (node instanceof ExecutableFlowBase) {
+ createExpectedStateMap((ExecutableFlowBase)node, expectedStateMap, nodeMap);
+ }
+ }
+ }
+
+ private void compareStates(Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+ for (String printedId: expectedStateMap.keySet()) {
+ Status expectedStatus = expectedStateMap.get(printedId);
+ ExecutableNode node = nodeMap.get(printedId);
+
+ if (expectedStatus != node.getStatus()) {
+ Assert.fail("Expected values do not match for " + printedId + ". Expected " + expectedStatus + ", instead received " + node.getStatus());
+ }
+ }
+ }
+
+ private void prepareProject(File directory) throws ProjectManagerException, IOException {
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ loader.loadProjectFlow(directory);
+ if (!loader.getErrors().isEmpty()) {
+ for (String error: loader.getErrors()) {
+ System.out.println(error);
+ }
+
+ throw new RuntimeException("Errors found in setup");
+ }
+
+ flowMap = loader.getFlowMap();
+ project.setFlows(flowMap);
+ FileUtils.copyDirectory(directory, workingDir);
+ }
+
+ private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName) throws Exception {
+ return createFlowRunner(eventCollector, flowName, groupName, new ExecutionOptions());
+ }
+
+ private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName, ExecutionOptions options) throws Exception {
+ Flow flow = flowMap.get(flowName);
+
+ int exId = id++;
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ exFlow.setExecutionPath(workingDir.getPath());
+ exFlow.setExecutionId(exId);
+
+ Map<String, String> flowParam = new HashMap<String, String>();
+ flowParam.put("group", groupName);
+ options.addAllFlowParameters(flowParam);
+ exFlow.setExecutionOptions(options);
+ fakeExecutorLoader.uploadExecutableFlow(exFlow);
+
+ FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+
+ runner.addListener(eventCollector);
+
+ return runner;
+ }
+
+}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 98f7818..0ac1fb6 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -830,8 +830,8 @@ public class FlowRunnerTest2 {
private void createExpectedStateMap(ExecutableFlowBase flow, Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
for (ExecutableNode node: flow.getExecutableNodes()) {
- expectedStateMap.put(node.getPrintableId(), node.getStatus());
- nodeMap.put(node.getPrintableId(), node);
+ expectedStateMap.put(node.getNestedId(), node.getStatus());
+ nodeMap.put(node.getNestedId(), node);
if (node instanceof ExecutableFlowBase) {
createExpectedStateMap((ExecutableFlowBase)node, expectedStateMap, nodeMap);
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
index f117d97..3c385de 100644
--- a/unit/java/azkaban/test/executor/InteractiveTestJob.java
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -29,7 +29,11 @@ public class InteractiveTestJob extends AbstractProcessJob {
@Override
public void run() throws Exception {
String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
+ String groupName = this.getJobProps().getString("group", null);
String id = nestedFlowPath == null ? this.getId() : nestedFlowPath;
+ if (groupName != null) {
+ id = groupName + ":" + id;
+ }
testJobs.put(id, this);
while(isWaiting) {