azkaban-memoizeit

diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 8ca895e..1c24545 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -37,7 +37,7 @@ public abstract class FlowWatcher {
 	 * Called to fire events to the JobRunner listeners
 	 * @param jobId
 	 */
-	protected synchronized void handleJobFinished(String jobId, Status status) {
+	protected synchronized void handleJobStatusChange(String jobId, Status status) {
 		BlockingStatus block = map.get(jobId);
 		if (block != null) {
 			block.changeStatus(status);
@@ -53,7 +53,8 @@ public abstract class FlowWatcher {
 			return null;
 		}
 		
-		ExecutableNode node = flow.getExecutableNode(jobId);
+		String[] split = jobId.split(":");
+		ExecutableNode node = flow.getExecutableNode(split);
 		if (node == null) {
 			return null;
 		}
@@ -68,7 +69,8 @@ public abstract class FlowWatcher {
 	}
 	
 	public Status peekStatus(String jobId) {
-		ExecutableNode node = flow.getExecutableNode(jobId);
+		String[] split = jobId.split(":");
+		ExecutableNode node = flow.getExecutableNode(split);
 		if (node != null) {
 			return node.getStatus();
 		}
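
FlowWatcher.handleJobStatusChange pushes each reported status into a per-job BlockingStatus that a pipelined JobRunner blocks on, and peekStatus/getBlockingStatus now split the colon-separated nested id (e.g. "jobd:innerJobA") before the lookup. Below is a minimal, self-contained sketch of the wait/notify pattern behind that blocking map; JobGate and its method names are hypothetical stand-ins for Azkaban's BlockingStatus, not its real API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BlockingStatusSketch {
    enum Status { READY, RUNNING, SUCCEEDED, FAILED }

    // Simplified stand-in for a per-job blocking status: one gate per watched job.
    static class JobGate {
        private Status status = Status.READY;

        synchronized void changeStatus(Status newStatus) {
            status = newStatus;
            notifyAll();                       // wake anyone blocked on this job
        }

        synchronized Status waitUntilFinished() throws InterruptedException {
            while (status != Status.SUCCEEDED && status != Status.FAILED) {
                wait();                        // block until a terminal status arrives
            }
            return status;
        }
    }

    static final Map<String, JobGate> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        JobGate gate = map.computeIfAbsent("jobd:innerJobA", k -> new JobGate());
        Thread waiter = new Thread(() -> {
            try {
                System.out.println("watched job finished: " + gate.waitUntilFinished());
            } catch (InterruptedException ignored) { }
        });
        waiter.start();
        Thread.sleep(100);
        gate.changeStatus(Status.SUCCEEDED);   // what a status-change callback would trigger
        waiter.join();
    }
}
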
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index cbf6333..1dd4ba5 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -40,22 +40,19 @@ public class LocalFlowWatcher extends FlowWatcher {
 		public void handleEvent(Event event) {
 			if (event.getType() == Type.JOB_FINISHED) {
 				if (event.getRunner() instanceof FlowRunner) {
+					// The flow runner will finish a job without it running
 					Object data = event.getData();
 					if (data instanceof ExecutableNode) {
 						ExecutableNode node = (ExecutableNode)data;
-						
-//						if (node.getId()) {
-//							
-//						}
-						
-						handleJobFinished(node.getId(), node.getStatus());
+						handleJobStatusChange(node.getNestedId(), node.getStatus());
 					}
 				}
 				else if (event.getRunner() instanceof JobRunner) {
+					// A job runner is finished
 					JobRunner runner = (JobRunner)event.getRunner();
 					ExecutableNode node = runner.getNode();
 					
-					handleJobFinished(node.getId(), node.getStatus());
+					handleJobStatusChange(node.getNestedId(), node.getStatus());
 				}
 			}
 			else if (event.getType() == Type.FLOW_FINISHED) {
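
Both branches of the listener now key the callback by the node's nested id rather than its bare id, so a job inside an embedded flow (say innerJobA under jobb) no longer collides with a top-level job of the same name. The renamed getNestedId simply delegates to getPrintableId(":"); the sketch below shows one hypothetical way such an id can be composed by walking parent flows, using toy classes rather than ExecutableNode/ExecutableFlowBase.

public class NestedIdSketch {
    // Toy node with a parent pointer; top-level nodes of the root flow have no parent here.
    static class Node {
        final String id;
        final Node parentFlow;
        Node(String id, Node parentFlow) { this.id = id; this.parentFlow = parentFlow; }

        // Join ids from the outermost sub-flow down to this node with ':'.
        String nestedId() {
            return parentFlow == null ? id : parentFlow.nestedId() + ":" + id;
        }
    }

    public static void main(String[] args) {
        Node jobb = new Node("jobb", null);                // embedded flow node in the root flow
        Node innerJobA = new Node("innerJobA", jobb);      // job inside that embedded flow
        System.out.println(innerJobA.nestedId());          // prints "jobb:innerJobA"
    }
}
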
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index 7beb47a..9d1c407 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -1,5 +1,8 @@
 package azkaban.execapp.event;
 
+import java.util.ArrayList;
+import java.util.Map;
+
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
@@ -54,27 +57,24 @@ public class RemoteFlowWatcher extends FlowWatcher {
 					isShutdown = true;
 				}
 				
+				long updateTime = 0;
 				if (flow == null) {
 					flow = updateFlow;
 				}
 				else {
+					Map<String, Object> updateData = updateFlow.toUpdateObject(updateTime);
+					ArrayList<ExecutableNode> updatedNodes = new ArrayList<ExecutableNode>();
+					flow.applyUpdateObject(updateData, updatedNodes);
+
 					flow.setStatus(updateFlow.getStatus());
 					flow.setEndTime(updateFlow.getEndTime());
 					flow.setUpdateTime(updateFlow.getUpdateTime());
 					
-					for (ExecutableNode node : flow.getExecutableNodes()) {
-						String jobId = node.getId();
-						ExecutableNode newNode = updateFlow.getExecutableNode(jobId);
-						long updateTime = node.getUpdateTime();
-						node.setUpdateTime(newNode.getUpdateTime());
-						node.setStatus(newNode.getStatus());
-						node.setStartTime(newNode.getStartTime());
-						node.setEndTime(newNode.getEndTime());
-						
-						if (updateTime < newNode.getUpdateTime()) {
-							handleJobFinished(jobId, newNode.getStatus());
-						}
+					for (ExecutableNode node : updatedNodes) {
+						handleJobStatusChange(node.getNestedId(), node.getStatus());
 					}
+					
+					updateTime = flow.getUpdateTime();
 				}
 				
 				if (Status.isStatusFinished(flow.getStatus())) {
@@ -92,7 +92,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
 		}
 		
 	}
-
+	
 	@Override
 	public synchronized void stopWatcher() {
 		if(isShutdown) {
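
Instead of copying every node's fields on each poll, the remote watcher now asks the fetched flow for an update object and replays it into the cached flow, collecting exactly the nodes that changed so only those trigger handleJobStatusChange. The following is a simplified, self-contained sketch of that poll-and-apply idea; the Node type, field names, and applyUpdates helper are illustrative (not the Azkaban types), and the sketch carries updateTime across iterations so each poll only reports newer changes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RemotePollSketch {
    enum Status { READY, RUNNING, SUCCEEDED }

    static class Node {
        final String id;
        Status status = Status.READY;
        long updateTime;
        Node(String id) { this.id = id; }
    }

    // "Remote" state we poll, and the locally cached copy the watcher keeps in sync.
    static final Map<String, Node> remote = new LinkedHashMap<>();
    static final Map<String, Node> cached = new LinkedHashMap<>();

    // Copy every remote node newer than sinceTime into the cache; return only the changed ones.
    static List<Node> applyUpdates(long sinceTime) {
        List<Node> updated = new ArrayList<>();
        for (Node r : remote.values()) {
            if (r.updateTime > sinceTime) {
                Node c = cached.computeIfAbsent(r.id, Node::new);
                c.status = r.status;
                c.updateTime = r.updateTime;
                updated.add(c);
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        remote.put("joba", new Node("joba"));
        remote.put("jobb", new Node("jobb"));

        long updateTime = 0;
        remote.get("joba").status = Status.RUNNING;
        remote.get("joba").updateTime = 1;
        for (Node n : applyUpdates(updateTime)) {
            System.out.println(n.id + " -> " + n.status);  // fire the status-change callback here
            updateTime = Math.max(updateTime, n.updateTime);
        }

        remote.get("joba").status = Status.SUCCEEDED;
        remote.get("joba").updateTime = 2;
        for (Node n : applyUpdates(updateTime)) {
            System.out.println(n.id + " -> " + n.status);  // only joba is reported again
        }
    }
}
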
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 6680b7a..fbf4065 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -366,6 +366,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			
 			if (node instanceof ExecutableFlowBase && ((ExecutableFlowBase)node).isFlowFinished()) {
 				finalizeFlow((ExecutableFlowBase)node);
+				fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
 			}
 			else if (nextStatus == Status.KILLED || isCancelled()) {
 				logger.info("Killing " + node.getId() + " due to prior errors.");
@@ -392,7 +393,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	private void finalizeFlow(ExecutableFlowBase flow) {
-		String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
+		String id = flow == this.flow ? "" : flow.getNestedId() + " ";
 
 		// If it's not the starting flow, we'll create set of output props
 		// for the finished flow.
@@ -519,12 +520,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 			node.setStatus(Status.RUNNING);
 			node.setStartTime(System.currentTimeMillis());
 			
-			logger.info("Starting subflow " + node.getPrintableId() + ".");
+			logger.info("Starting subflow " + node.getNestedId() + ".");
 		}
 		else {
 			node.setStatus(Status.QUEUED);
 			JobRunner runner = createJobRunner(node);
-			logger.info("Submitting job " + node.getPrintableId() + " to run.");
+			logger.info("Submitting job " + node.getNestedId() + " to run.");
 			try {
 				executorService.submit(runner);
 				activeJobRunners.add(runner);
@@ -707,7 +708,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			
 			if (node.getStatus() == Status.FAILED) {
 				node.resetForRetry();
-				logger.info("Re-enabling job " + node.getPrintableId() + " attempt " + node.getAttempt());
+				logger.info("Re-enabling job " + node.getNestedId() + " attempt " + node.getAttempt());
 				reEnableDependents(node);
 			}
 			else if (node.getStatus() == Status.KILLED) {
@@ -760,7 +761,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 					ExecutableNode node = runner.getNode();
 					activeJobRunners.remove(node.getId());
 					
-					String id = node.getPrintableId();
+					String id = node.getNestedId();
 					logger.info("Job Finished " + id + " with status " + node.getStatus());
 					if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
 						logger.info("Job " + id + " had output props.");
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 7013e4a..f948192 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -37,6 +37,7 @@ import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
 import azkaban.execapp.event.EventHandler;
 import azkaban.execapp.event.FlowWatcher;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
@@ -117,14 +118,48 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.pipelineLevel = pipelineLevel;
 
 		if (this.pipelineLevel == 1) {
-			pipelineJobs.add(node.getId());
+			pipelineJobs.add(node.getNestedId());
 		}
 		else if (this.pipelineLevel == 2) {
-			pipelineJobs.add(node.getId());
-			pipelineJobs.addAll(node.getOutNodes());
+			pipelineJobs.add(node.getNestedId());
+			ExecutableFlowBase parentFlow = node.getParentFlow();
+			for (String outNode : node.getOutNodes()) {
+				ExecutableNode nextNode = parentFlow.getExecutableNode(outNode);
+
+				// If the next node is a nested flow, then we add the nested starting nodes 
+				if (nextNode instanceof ExecutableFlowBase) {
+					ExecutableFlowBase nextFlow = (ExecutableFlowBase)nextNode;
+					findAllStartingNodes(nextFlow, pipelineJobs);
+				}
+				else {
+					pipelineJobs.add(nextNode.getNestedId());
+				}
+			}
+		}
+	}
+	
+	private void findAllStartingNodes(ExecutableFlowBase flow, Set<String> pipelineJobs) {
+		for (String startingNode: flow.getStartNodes()) {
+			ExecutableNode node = flow.getExecutableNode(startingNode);
+			if (node instanceof ExecutableFlowBase) {
+				findAllStartingNodes((ExecutableFlowBase)node, pipelineJobs);
+			}
+			else {
+				pipelineJobs.add(node.getNestedId());
+			}
 		}
 	}
 	
+	/**
+	 * Returns a list of jobs that this JobRunner will wait upon to finish before starting.
+	 * It is only relevant if pipeline is turned on.
+	 * 
+	 * @return
+	 */
+	public Set<String> getPipelineWatchedJobs() {
+		return pipelineJobs;
+	}
+	
 	public void setDelayStart(long delayMS) {
 		delayStartMs = delayMS;
 	}
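
With pipelineLevel 2 a job now waits on its counterpart in the previous execution plus that counterpart's children, and when a child is itself an embedded flow the wait set expands recursively to the flow's starting jobs (findAllStartingNodes). Below is a standalone sketch of that expansion over a toy node tree; the class names and ids are illustrative, not the Azkaban types.

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PipelineJobsSketch {
    // Toy stand-ins for ExecutableNode / ExecutableFlowBase.
    static class Node {
        final String nestedId;
        Node(String nestedId) { this.nestedId = nestedId; }
    }

    static class FlowNode extends Node {
        final List<Node> startNodes = new ArrayList<>();
        FlowNode(String nestedId) { super(nestedId); }
    }

    // Collect the leaf jobs a level-2 pipeline must wait on for one downstream child.
    static void addWatchedJobs(Node child, Set<String> pipelineJobs) {
        if (child instanceof FlowNode) {
            for (Node start : ((FlowNode) child).startNodes) {
                addWatchedJobs(start, pipelineJobs);      // descend into nested flows
            }
        } else {
            pipelineJobs.add(child.nestedId);
        }
    }

    public static void main(String[] args) {
        Node jobc = new Node("jobc");                     // a plain downstream job
        FlowNode jobd = new FlowNode("jobd");             // an embedded downstream flow
        jobd.startNodes.add(new Node("jobd:innerJobA"));  // its starting job

        Set<String> pipelineJobs = new LinkedHashSet<>();
        pipelineJobs.add("joba");                         // level 1: the job itself
        addWatchedJobs(jobc, pipelineJobs);               // level 2: plain children as-is...
        addWatchedJobs(jobd, pipelineJobs);               // ...embedded flows expanded to start jobs

        System.out.println(pipelineJobs);                 // [joba, jobc, jobd:innerJobA]
    }
}
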
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index e99bbe2..35cd4f8 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -122,6 +122,30 @@ public class ExecutableFlowBase extends ExecutableNode {
 		return executableNodes.get(id);
 	}
 	
+	public ExecutableNode getExecutableNode(String ... ids) {
+		return getExecutableNode(this, ids, 0);
+	}
+	
+	private ExecutableNode getExecutableNode(ExecutableFlowBase flow, String[] ids, int currentIdIdx) {
+		ExecutableNode node = flow.getExecutableNode(ids[currentIdIdx]);
+		currentIdIdx++;
+		
+		if (node == null) {
+			return null;
+		}
+		
+		if (ids.length == currentIdIdx) {
+			return node;
+		}
+		else if (node instanceof ExecutableFlowBase) {
+			return getExecutableNode((ExecutableFlowBase)node, ids, currentIdIdx);
+		}
+		else {
+			return null;
+		}
+		
+	}
+	
 	public List<String> getStartNodes() {
 		if (startNodes == null) {
 			startNodes = new ArrayList<String>();
@@ -254,13 +278,17 @@ public class ExecutableFlowBase extends ExecutableNode {
 		return updateData;
 	}
 	
-	@SuppressWarnings("unchecked")
-	public void applyUpdateObject(Map<String, Object> updateData) {
+	public void applyUpdateObject(Map<String, Object> updateData, List<ExecutableNode> updatedNodes) {
 		super.applyUpdateObject(updateData);
-
-		List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+		
 		if (updatedNodes != null) {
-			for (Map<String,Object> node: updatedNodes) {
+			updatedNodes.add(this);
+		}
+		
+		@SuppressWarnings("unchecked")
+		List<Map<String,Object>> nodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+		if (nodes != null) {
+			for (Map<String,Object> node: nodes) {
 	
 				String id = (String)node.get(ID_PARAM);
 				if (id == null) {
@@ -269,9 +297,23 @@ public class ExecutableFlowBase extends ExecutableNode {
 				}
 	
 				ExecutableNode exNode = executableNodes.get(id);
-				exNode.applyUpdateObject(node);
+				if (updatedNodes != null) {
+					updatedNodes.add(exNode);
+				}
+				
+				if (exNode instanceof ExecutableFlowBase) {
+					((ExecutableFlowBase)exNode).applyUpdateObject(node, updatedNodes);
+				}
+				else {
+					exNode.applyUpdateObject(node);
+				}
 			}
 		}
+		
+	}
+	
+	public void applyUpdateObject(Map<String, Object> updateData) {
+		applyUpdateObject(updateData, null);
 	}
 	
 	public void reEnableDependents(ExecutableNode ... nodes) {
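
The new getExecutableNode(String...) consumes one segment of the split nested id per level of flow nesting, returning null if a segment is unknown or if the path tries to descend into a plain job. A simplified, self-contained sketch of the same traversal over toy nodes (not the Azkaban classes):

import java.util.HashMap;
import java.util.Map;

public class NestedLookupSketch {
    // Toy node: a flow carries child nodes keyed by id, a plain job carries none.
    static class Node {
        final String id;
        final Map<String, Node> children = new HashMap<>();
        Node(String id) { this.id = id; }
        boolean isFlow() { return !children.isEmpty(); }
    }

    // Walk one path segment per nesting level, e.g. {"jobd", "innerJobA"}.
    static Node getExecutableNode(Node flow, String[] ids, int idx) {
        Node node = flow.children.get(ids[idx]);
        if (node == null) {
            return null;                           // unknown id at this level
        }
        if (idx == ids.length - 1) {
            return node;                           // consumed the whole path
        }
        if (node.isFlow()) {
            return getExecutableNode(node, ids, idx + 1);
        }
        return null;                               // path continues past a plain job
    }

    public static void main(String[] args) {
        Node root = new Node("root");
        Node jobd = new Node("jobd");
        jobd.children.put("innerJobA", new Node("innerJobA"));
        root.children.put("jobd", jobd);

        Node found = getExecutableNode(root, "jobd:innerJobA".split(":"), 0);
        System.out.println(found != null ? found.id : "not found");   // innerJobA
    }
}
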
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index c0333d0..db5fd8d 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -236,7 +236,7 @@ public class ExecutableNode {
 		return array;
 	}
 	
-	public String getPrintableId() {
+	public String getNestedId() {
 		return getPrintableId(":");
 	}
 	
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
new file mode 100644
index 0000000..d20f2f6
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -0,0 +1,262 @@
+package azkaban.test.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.execapp.event.FlowWatcher;
+import azkaban.execapp.event.LocalFlowWatcher;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.Status;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.test.executor.InteractiveTestJob;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
+
+public class FlowRunnerPipelineTest {
+	private File workingDir;
+	private JobTypeManager jobtypeManager;
+	private ProjectLoader fakeProjectLoader;
+	private ExecutorLoader fakeExecutorLoader;
+	private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+	private Project project;
+	private Map<String, Flow> flowMap;
+	private static int id=101;
+	
+	public FlowRunnerPipelineTest() {
+	}
+	
+	@Before
+	public void setUp() throws Exception {
+		System.out.println("Create temp dir");
+		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager.registerJobType("java", JavaJob.class);
+		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		fakeProjectLoader = new MockProjectLoader(workingDir);
+		fakeExecutorLoader = new MockExecutorLoader();
+		project = new Project(1, "testProject");
+		
+		File dir = new File("unit/executions/embedded2");
+		prepareProject(dir);
+		
+		InteractiveTestJob.clearTestJobs();
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+		System.out.println("Teardown temp dir");
+		if (workingDir != null) {
+			FileUtils.deleteDirectory(workingDir);
+			workingDir = null;
+		}
+	}
+	
+	@Test
+	public void testBasicPipelineLevel1Run() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner previousRunner = createFlowRunner(eventCollector, "jobf", "prev");
+		
+		ExecutionOptions options = new ExecutionOptions();
+		options.setPipelineExecutionId(previousRunner.getExecutableFlow().getExecutionId());
+		options.setPipelineLevel(1);
+		FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
+		FlowRunner pipelineRunner = createFlowRunner(eventCollector, "jobf", "pipe", options);
+		pipelineRunner.setFlowWatcher(watcher);
+		
+		Map<String, Status> previousExpectedStateMap = new HashMap<String, Status>();
+		Map<String, Status> pipelineExpectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> previousNodeMap = new HashMap<String, ExecutableNode>();
+		Map<String, ExecutableNode> pipelineNodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
+		ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
+		createExpectedStateMap(previousFlow, previousExpectedStateMap, previousNodeMap);
+		createExpectedStateMap(pipelineFlow, pipelineExpectedStateMap, pipelineNodeMap);
+		
+		Thread thread1 = runFlowRunnerInThread(previousRunner);
+		pause(250);
+		Thread thread2 = runFlowRunnerInThread(pipelineRunner);
+		pause(500);
+		
+		previousExpectedStateMap.put("joba", Status.RUNNING);
+		previousExpectedStateMap.put("joba1", Status.RUNNING);
+		pipelineExpectedStateMap.put("joba", Status.QUEUED);
+		pipelineExpectedStateMap.put("joba1", Status.QUEUED);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		InteractiveTestJob.getTestJob("prev:joba").succeedJob();
+		pause(250);
+		previousExpectedStateMap.put("joba", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobb", Status.RUNNING);
+		previousExpectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		previousExpectedStateMap.put("jobd", Status.RUNNING);
+		previousExpectedStateMap.put("jobc", Status.RUNNING);
+		previousExpectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		pipelineExpectedStateMap.put("joba", Status.RUNNING);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+		InteractiveTestJob.getTestJob("prev:jobb:innerJobA").succeedJob();
+		pause(250);
+		previousExpectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		previousExpectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		InteractiveTestJob.getTestJob("pipe:joba").succeedJob();
+		pause(250);
+		pipelineExpectedStateMap.put("joba", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobb", Status.RUNNING);
+		pipelineExpectedStateMap.put("jobd", Status.RUNNING);
+		pipelineExpectedStateMap.put("jobc", Status.QUEUED);
+		pipelineExpectedStateMap.put("jobd:innerJobA", Status.QUEUED);
+		pipelineExpectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		InteractiveTestJob.getTestJob("prev:jobd:innerJobA").succeedJob();
+		pause(250);
+		previousExpectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+		pipelineExpectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		// Finish the previous d side
+		InteractiveTestJob.getTestJob("prev:jobd:innerFlow2").succeedJob();
+		pause(250);
+		previousExpectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobd", Status.SUCCEEDED);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		
+		InteractiveTestJob.getTestJob("prev:jobb:innerJobB").succeedJob();
+		InteractiveTestJob.getTestJob("prev:jobb:innerJobC").succeedJob();
+		InteractiveTestJob.getTestJob("prev:jobc").succeedJob();
+		pause(250);
+		InteractiveTestJob.getTestJob("pipe:jobb:innerJobA").succeedJob();
+		pause(250);
+		previousExpectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+		previousExpectedStateMap.put("jobc", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobc", Status.RUNNING);
+		pipelineExpectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		pipelineExpectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+//		Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
+//		Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
+//		Assert.assertFalse(thread1.isAlive());
+//		Assert.assertFalse(thread2.isAlive());
+	}
+	
+	private Thread runFlowRunnerInThread(FlowRunner runner) {
+		Thread thread = new Thread(runner);
+		thread.start();
+		return thread;
+	}
+	
+	private void pause(long millisec) {
+		synchronized(this) {
+			try {
+				wait(millisec);
+			}
+			catch (InterruptedException e) {
+			}
+		}
+	}
+	
+	private void createExpectedStateMap(ExecutableFlowBase flow, Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+		for (ExecutableNode node: flow.getExecutableNodes()) {
+			expectedStateMap.put(node.getNestedId(), node.getStatus());
+			nodeMap.put(node.getNestedId(), node);
+			
+			if (node instanceof ExecutableFlowBase) {
+				createExpectedStateMap((ExecutableFlowBase)node, expectedStateMap, nodeMap);
+			}
+		}
+	}
+	
+	private void compareStates(Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
+		for (String printedId: expectedStateMap.keySet()) {
+			Status expectedStatus = expectedStateMap.get(printedId);
+			ExecutableNode node = nodeMap.get(printedId);
+			
+			if (expectedStatus != node.getStatus()) {
+				Assert.fail("Expected values do not match for " + printedId + ". Expected " + expectedStatus + ", instead received " + node.getStatus());
+			}
+		}
+	}
+	
+	private void prepareProject(File directory) throws ProjectManagerException, IOException {
+		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+		loader.loadProjectFlow(directory);
+		if (!loader.getErrors().isEmpty()) {
+			for (String error: loader.getErrors()) {
+				System.out.println(error);
+			}
+			
+			throw new RuntimeException("Errors found in setup");
+		}
+		
+		flowMap = loader.getFlowMap();
+		project.setFlows(flowMap);
+		FileUtils.copyDirectory(directory, workingDir);
+	}
+	
+	private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName) throws Exception {
+		return createFlowRunner(eventCollector, flowName, groupName, new ExecutionOptions());
+	}
+	
+	private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName, ExecutionOptions options) throws Exception {
+		Flow flow = flowMap.get(flowName);
+
+		int exId = id++;
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		exFlow.setExecutionPath(workingDir.getPath());
+		exFlow.setExecutionId(exId);
+
+		Map<String, String> flowParam = new HashMap<String, String>();
+		flowParam.put("group", groupName);
+		options.addAllFlowParameters(flowParam);
+		exFlow.setExecutionOptions(options);
+		fakeExecutorLoader.uploadExecutableFlow(exFlow);
+	
+		FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+
+		runner.addListener(eventCollector);
+		
+		return runner;
+	}
+
+}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 98f7818..0ac1fb6 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -830,8 +830,8 @@ public class FlowRunnerTest2 {
 	
 	private void createExpectedStateMap(ExecutableFlowBase flow, Map<String, Status> expectedStateMap, Map<String, ExecutableNode> nodeMap) {
 		for (ExecutableNode node: flow.getExecutableNodes()) {
-			expectedStateMap.put(node.getPrintableId(), node.getStatus());
-			nodeMap.put(node.getPrintableId(), node);
+			expectedStateMap.put(node.getNestedId(), node.getStatus());
+			nodeMap.put(node.getNestedId(), node);
 			
 			if (node instanceof ExecutableFlowBase) {
 				createExpectedStateMap((ExecutableFlowBase)node, expectedStateMap, nodeMap);
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
index f117d97..3c385de 100644
--- a/unit/java/azkaban/test/executor/InteractiveTestJob.java
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -29,7 +29,11 @@ public class InteractiveTestJob extends AbstractProcessJob {
 	@Override
 	public void run() throws Exception {
 		String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
+		String groupName = this.getJobProps().getString("group", null);
 		String id = nestedFlowPath == null ? this.getId() : nestedFlowPath;
+		if (groupName != null) {
+			id = groupName + ":" + id;
+		}
 		testJobs.put(id, this);
 		
 		while(isWaiting) {
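
InteractiveTestJob now prefixes its registration key with an optional "group" flow parameter, so two concurrent executions of the same flow ("prev" and "pipe" in FlowRunnerPipelineTest) register distinct job instances. A minimal sketch of that namespacing, with a hypothetical registry in place of the test job's static map:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GroupedTestJobSketch {
    static final Map<String, String> testJobs = new ConcurrentHashMap<>();

    // Register a running job under "group:nestedId" when a group is set, else just "nestedId".
    static void register(String group, String nestedId) {
        String key = group != null ? group + ":" + nestedId : nestedId;
        testJobs.put(key, "running");
    }

    public static void main(String[] args) {
        register("prev", "joba");                 // previous execution
        register("pipe", "joba");                 // pipelined execution of the same flow
        System.out.println(testJobs.keySet());    // [prev:joba, pipe:joba] (order may vary)
    }
}
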