azkaban-developers

Merge pull request #226 from rbpark/flowpropertiesfix Fixing

4/29/2014 6:29:16 PM

Details

diff --git a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 971f9c1..0cb7c55 100644
--- a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -107,15 +107,6 @@ public class AzkabanExecutorServer {
 		projectLoader = createProjectLoader(props);
 		runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, this.getClass().getClassLoader());
 		
-		String globalPropsPath = props.getString("executor.global.properties", null);
-		if (globalPropsPath == null) {
-			executorGlobalProps = new Props();
-		}
-		else {
-			executorGlobalProps = new Props(null, globalPropsPath);
-		}
-		runnerManager.setGlobalProps(executorGlobalProps);
-		
 		configureMBeanServer();
 
 		try {
diff --git a/src/main/java/azkaban/execapp/FlowRunner.java b/src/main/java/azkaban/execapp/FlowRunner.java
index fd8f116..c80ea63 100644
--- a/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/src/main/java/azkaban/execapp/FlowRunner.java
@@ -88,8 +88,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	// Properties map
 	private Map<String, Props> sharedProps = new HashMap<String, Props>();
-	
-	private Props globalProps;
 	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
@@ -167,11 +165,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return this;
 	}
 	
-	public FlowRunner setGlobalProps(Props globalProps) {
-		this.globalProps = globalProps;
-		return this;
-	}
-	
 	public FlowRunner setNumJobThreads(int jobs) {
 		numJobThreads = jobs;
 		return this;
@@ -239,7 +232,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		String flowId = flow.getFlowId();
 		
 		// Add a bunch of common azkaban properties
-		Props commonFlowProps = PropsUtils.addCommonFlowProperties(this.globalProps, flow);
+		Props commonFlowProps = PropsUtils.addCommonFlowProperties(null, flow);
 		
 		if (flow.getJobSource() != null) {
 			String source = flow.getJobSource();
@@ -618,47 +611,38 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	
-	@SuppressWarnings("unchecked")
 	private void prepareJobProperties(ExecutableNode node) throws IOException {
 		if (node instanceof ExecutableFlow) {
 			return;
 		}
 		
 		Props props = null;
+		// 1. Shared properties (i.e. *.properties) for the jobs only. This takes the
+		// least precedence
+		if (!(node instanceof ExecutableFlowBase)) {
+			String sharedProps = node.getPropsSource();
+			if (sharedProps != null) {
+				props = this.sharedProps.get(sharedProps);
+			}
+		}
+		
 		// The following is the hiearchical ordering of dependency resolution
-		// 1. Parent Flow Properties
+		// 2. Parent Flow Properties
 		ExecutableFlowBase parentFlow = node.getParentFlow();
 		if (parentFlow != null) {
-			props = parentFlow.getInputProps();
-		}
-		
-		// 2. Shared Properties
-		String sharedProps = node.getPropsSource();
-		if (sharedProps != null) {
-			Props shared = this.sharedProps.get(sharedProps);
-			if (shared != null) {
-				// Clone because we may clobber
-				shared = Props.clone(shared);
-				shared.setEarliestAncestor(props);
-				props = shared;
-			}
-		}
-
-		// 3. Flow Override properties
-		Map<String,String> flowParam = flow.getExecutionOptions().getFlowParameters();
-		if (flowParam != null && !flowParam.isEmpty()) {
-			props = new Props(props, flowParam);
+			Props flowProps = Props.clone(parentFlow.getInputProps());
+			flowProps.setEarliestAncestor(props);
+			props = flowProps;
 		}
 		
-		// 4. Output Properties
+		// 3. Output Properties. The call creates a clone, so we can overwrite it.
 		Props outputProps = collectOutputProps(node);
 		if (outputProps != null) {
 			outputProps.setEarliestAncestor(props);
 			props = outputProps;
 		}
 		
-		// 5. The job source
+		// 4. The job source.
 		Props jobSource = loadJobProps(node);
 		if (jobSource != null) {
 			jobSource.setParent(props);
diff --git a/src/main/java/azkaban/execapp/FlowRunnerManager.java b/src/main/java/azkaban/execapp/FlowRunnerManager.java
index a9e2059..7b75756 100644
--- a/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -84,7 +84,7 @@ public class FlowRunnerManager implements EventListener {
 	
 	private JobTypeManager jobtypeManager;
 	
-	private Props globalProps;
+	private Props globalProps = null;
 	
 	private final Props azkabanProps;
 	
@@ -139,7 +139,15 @@ public class FlowRunnerManager implements EventListener {
 		cleanerThread = new CleanerThread();
 		cleanerThread.start();
 		
-		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
+		String globalPropsPath = props.getString("executor.global.properties", null);
+		if (globalPropsPath != null) {
+			globalProps = new Props(null, globalPropsPath);
+		}
+		
+		jobtypeManager = new JobTypeManager(
+				props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR),
+				globalProps,
+				parentClassLoader);
 	}
 
 	private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -406,7 +414,6 @@ public class FlowRunnerManager implements EventListener {
 		runner.setFlowWatcher(watcher)
 			.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
 			.setValidateProxyUser(validateProxyUser)
-			.setGlobalProps(globalProps)
 			.setNumJobThreads(numJobThreads)
 			.addListener(this);
 		
diff --git a/src/main/java/azkaban/jobtype/JobTypeManager.java b/src/main/java/azkaban/jobtype/JobTypeManager.java
index df9627e..58344c4 100644
--- a/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -50,14 +50,16 @@ public class JobTypeManager
 	private static final Logger logger = Logger.getLogger(JobTypeManager.class);
 	
 	private JobTypePluginSet pluginSet;
-
-	public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader) {
+	private Props globalProperties;
+	
+	public JobTypeManager(String jobtypePluginDir, Props globalProperties, ClassLoader parentClassLoader) {
 		this.jobTypePluginDir = jobtypePluginDir;
 		this.parentLoader = parentClassLoader;
+		this.globalProperties = globalProperties;
 		
 		loadPlugins();
 	}
-
+	
 	public void loadPlugins() throws JobTypeManagerException {
 		JobTypePluginSet plugins = new JobTypePluginSet();
 		
@@ -113,7 +115,7 @@ public class JobTypeManager
 		if (commonJobPropsFile.exists()) {
 			logger.info("Common plugin job props file " + commonJobPropsFile + " found. Attempt to load.");
 			try {
-				commonPluginJobProps = new Props(null, commonJobPropsFile);
+				commonPluginJobProps = new Props(globalProperties, commonJobPropsFile);
 			}
 			catch (IOException e) {
 				throw new JobTypeManagerException("Failed to load common plugin job properties" + e.getCause());
diff --git a/unit/executions/execpropstest/innerflow.job b/unit/executions/execpropstest/innerflow.job
new file mode 100644
index 0000000..18def67
--- /dev/null
+++ b/unit/executions/execpropstest/innerflow.job
@@ -0,0 +1,6 @@
+type=flow
+flow.name=job4
+dependencies=job2
+props5=innerflow5
+props6=innerflow6
+props8=innerflow8
\ No newline at end of file
diff --git a/unit/executions/execpropstest/job1.job b/unit/executions/execpropstest/job1.job
new file mode 100644
index 0000000..d910f1a
--- /dev/null
+++ b/unit/executions/execpropstest/job1.job
@@ -0,0 +1,4 @@
+type=test
+props1=job1
+props2=job2
+props8=job8
\ No newline at end of file
diff --git a/unit/executions/execpropstest/job3.job b/unit/executions/execpropstest/job3.job
new file mode 100644
index 0000000..d9be481
--- /dev/null
+++ b/unit/executions/execpropstest/job3.job
@@ -0,0 +1,3 @@
+type=test
+dependencies=innerflow
+props3=job3
\ No newline at end of file
diff --git a/unit/executions/execpropstest/moo.properties b/unit/executions/execpropstest/moo.properties
new file mode 100644
index 0000000..7e4f399
--- /dev/null
+++ b/unit/executions/execpropstest/moo.properties
@@ -0,0 +1,3 @@
+props3=moo3
+props4=moo4
+props5=moo5
\ No newline at end of file
diff --git a/unit/executions/execpropstest/shared.properties b/unit/executions/execpropstest/shared.properties
new file mode 100644
index 0000000..29b2b7b
--- /dev/null
+++ b/unit/executions/execpropstest/shared.properties
@@ -0,0 +1,3 @@
+props1=shared1
+props2=shared2
+props6=shared6
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/job2.job b/unit/executions/execpropstest/subdir/job2.job
new file mode 100644
index 0000000..bfd48e3
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/job2.job
@@ -0,0 +1,3 @@
+type=test
+props2=job2
+props7=job7
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/job4.job b/unit/executions/execpropstest/subdir/job4.job
new file mode 100644
index 0000000..e26aaa6
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/job4.job
@@ -0,0 +1,4 @@
+type=test
+dependencies=job1
+props8=job8
+props9=job9
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/shared.properties b/unit/executions/execpropstest/subdir/shared.properties
new file mode 100644
index 0000000..2ba1880
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/shared.properties
@@ -0,0 +1,2 @@
+props4=shared4
+props8=shared8
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index cc3b1e5..fb13d04 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -36,7 +36,7 @@ public class LocalFlowWatcherTest {
 	
 	@Before
 	public void setUp() throws Exception {
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index d8e94dd..3764a81 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -37,7 +37,7 @@ public class RemoteFlowWatcherTest {
 	
 	@Before
 	public void setUp() throws Exception {
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index df9cac1..49bb4b5 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -73,7 +73,7 @@ public class FlowRunnerPipelineTest {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
 		
 		pluginSet.addPluginClass("java", JavaJob.class);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
new file mode 100644
index 0000000..04adf21
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.test.executor.InteractiveTestJob;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
+
+/**
+ * Test the property resolution of jobs in a flow.
+ * 
+ * The tests are contained in execpropstest, and should be resolved in the following fashion,
+ * where the later props take precedence over the previous ones.
+ * 
+ * 1. Global props (set in the FlowRunner)
+ * 2. Shared job props (depends on job directory)
+ * 3. Flow Override properties
+ * 4. Previous job outputs to the embedded flow (Only if contained in embedded flow)
+ * 5. Embedded flow properties (Only if contained in embedded flow)
+ * 6. Previous job outputs (if exists)
+ * 7. Job Props
+ * 
+ * The test contains the following structure:
+ * job2 -> innerFlow (job1 -> job4 ) -> job3
+ * 
+ * job2 and 4 are in nested directories so should have different shared properties than other jobs.
+ */
+public class FlowRunnerPropertyResolutionTest {
+	private File workingDir;
+	private JobTypeManager jobtypeManager;
+	private ProjectLoader fakeProjectLoader;
+	private ExecutorLoader fakeExecutorLoader;
+	private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+	private Project project;
+	private Map<String, Flow> flowMap;
+	private static int id=101;
+	
+	@Before
+	public void setUp() throws Exception {
+		System.out.println("Create temp dir");
+		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
+		jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
+		fakeProjectLoader = new MockProjectLoader(workingDir);
+		fakeExecutorLoader = new MockExecutorLoader();
+		project = new Project(1, "testProject");
+		
+		File dir = new File("unit/executions/execpropstest");
+		prepareProject(dir);
+		
+		InteractiveTestJob.clearTestJobs();
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+		System.out.println("Teardown temp dir");
+		if (workingDir != null) {
+			FileUtils.deleteDirectory(workingDir);
+			workingDir = null;
+		}
+	}
+	
+	/**
+	 * Tests the basic flow resolution. Flow is defined in execpropstest
+	 * @throws Exception
+	 */
+	@Test
+	public void testPropertyResolution() throws Exception {
+		HashMap<String, String> flowProps = new HashMap<String,String>();
+		flowProps.put("props7", "flow7");
+		flowProps.put("props6", "flow6");
+		flowProps.put("props5", "flow5");
+		FlowRunner runner = createFlowRunner("job3", flowProps);
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		createNodeMap(runner.getExecutableFlow(), nodeMap);
+		
+		// 1. Start flow. Job 2 should start
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// Job 2 is a normal job.
+		// Only the flow overrides and the shared properties matter
+		ExecutableNode node = nodeMap.get("job2");
+		Props job2Props = node.getInputProps();
+		Assert.assertEquals("shared1", job2Props.get("props1"));
+		Assert.assertEquals("job2", job2Props.get("props2"));
+		Assert.assertEquals("moo3", job2Props.get("props3"));
+		Assert.assertEquals("job7", job2Props.get("props7"));
+		Assert.assertEquals("flow5", job2Props.get("props5"));
+		Assert.assertEquals("flow6", job2Props.get("props6"));
+		Assert.assertEquals("shared4", job2Props.get("props4"));
+		Assert.assertEquals("shared8", job2Props.get("props8"));
+		
+		// Job 1 is inside another flow, and is nested in a different directory
+		// The priority order should be: job1->innerflow->job2.output->flow.overrides->job1 shared props
+		Props job2Generated = new Props();
+		job2Generated.put("props6","gjob6");
+		job2Generated.put("props9","gjob9");
+		job2Generated.put("props10","gjob10");
+		InteractiveTestJob.getTestJob("job2").succeedJob(job2Generated);
+		pause(250);
+		node = nodeMap.get("innerflow:job1");
+		Props job1Props = node.getInputProps();
+		Assert.assertEquals("job1", job1Props.get("props1"));
+		Assert.assertEquals("job2", job1Props.get("props2"));
+		Assert.assertEquals("job8", job1Props.get("props8"));
+		Assert.assertEquals("gjob9", job1Props.get("props9"));
+		Assert.assertEquals("gjob10", job1Props.get("props10"));
+		Assert.assertEquals("innerflow6", job1Props.get("props6"));
+		Assert.assertEquals("innerflow5", job1Props.get("props5"));
+		Assert.assertEquals("flow7", job1Props.get("props7"));
+		Assert.assertEquals("moo3", job1Props.get("props3"));
+		Assert.assertEquals("moo4", job1Props.get("props4"));
+		
+		// Job 4 is inside another flow and takes output from job 1
+		// The priority order should be: job4->job1.output->innerflow->job2.output->flow.overrides->job4 shared props
+		Props job1GeneratedProps = new Props();
+		job1GeneratedProps.put("props9", "g2job9");
+		job1GeneratedProps.put("props7", "g2job7");
+		InteractiveTestJob.getTestJob("innerflow:job1").succeedJob(job1GeneratedProps);
+		pause(250);
+		node = nodeMap.get("innerflow:job4");
+		Props job4Props = node.getInputProps();
+		Assert.assertEquals("job8", job4Props.get("props8"));
+		Assert.assertEquals("job9", job4Props.get("props9"));
+		Assert.assertEquals("g2job7", job4Props.get("props7"));
+		Assert.assertEquals("innerflow5", job4Props.get("props5"));
+		Assert.assertEquals("innerflow6", job4Props.get("props6"));
+		Assert.assertEquals("gjob10", job4Props.get("props10"));
+		Assert.assertEquals("shared4", job4Props.get("props4"));
+		Assert.assertEquals("shared1", job4Props.get("props1"));
+		Assert.assertEquals("shared2", job4Props.get("props2"));
+		Assert.assertEquals("moo3", job4Props.get("props3"));
+		
+		// Job 3 is a normal job taking props from an embedded flow
+		// The priority order should be: job3->innerflow.output->flow.overrides->job3.sharedprops
+		Props job4GeneratedProps = new Props();
+		job4GeneratedProps.put("props9", "g4job9");
+		job4GeneratedProps.put("props6", "g4job6");
+		InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(job4GeneratedProps);
+		pause(250);
+		node = nodeMap.get("job3");
+		Props job3Props = node.getInputProps();
+		Assert.assertEquals("job3", job3Props.get("props3"));
+		Assert.assertEquals("g4job6", job3Props.get("props6"));
+		Assert.assertEquals("g4job9", job3Props.get("props9"));
+		Assert.assertEquals("flow7", job3Props.get("props7"));
+		Assert.assertEquals("flow5", job3Props.get("props5"));
+		Assert.assertEquals("shared1", job3Props.get("props1"));
+		Assert.assertEquals("shared2", job3Props.get("props2"));
+		Assert.assertEquals("moo4", job3Props.get("props4"));
+	}
+	
+	private void prepareProject(File directory) throws ProjectManagerException, IOException {
+		DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+		loader.loadProjectFlow(directory);
+		if (!loader.getErrors().isEmpty()) {
+			for (String error: loader.getErrors()) {
+				System.out.println(error);
+			}
+			
+			throw new RuntimeException("Errors found in setup");
+		}
+		
+		flowMap = loader.getFlowMap();
+		project.setFlows(flowMap);
+		FileUtils.copyDirectory(directory, workingDir);
+	}
+	
+	private FlowRunner createFlowRunner(String flowName, HashMap<String, String> flowParams) throws Exception {
+		Flow flow = flowMap.get(flowName);
+
+		int exId = id++;
+		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+		exFlow.setExecutionPath(workingDir.getPath());
+		exFlow.setExecutionId(exId);
+
+		exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
+		fakeExecutorLoader.uploadExecutableFlow(exFlow);
+	
+		FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+		return runner;
+	}
+
+	private void createNodeMap(ExecutableFlowBase flow, Map<String, ExecutableNode> nodeMap) {
+		for (ExecutableNode node: flow.getExecutableNodes()) {
+			nodeMap.put(node.getNestedId(), node);
+			
+			if (node instanceof ExecutableFlowBase) {
+				createNodeMap((ExecutableFlowBase)node, nodeMap);
+			}
+		}
+	}
+
+	private Thread runFlowRunnerInThread(FlowRunner runner) {
+		Thread thread = new Thread(runner);
+		thread.start();
+		return thread;
+	}
+	
+	private void pause(long millisec) {
+		synchronized(this) {
+			try {
+				wait(millisec);
+			}
+			catch (InterruptedException e) {
+			}
+		}
+	}
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 2e708fe..911a48b 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -47,7 +47,7 @@ public class FlowRunnerTest {
 			}
 			workingDir.mkdirs();
 		}
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
 		pluginSet.addPluginClass("java", JavaJob.class);
 		pluginSet.addPluginClass("test", InteractiveTestJob.class);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 0c9d7b9..bc31261 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -92,7 +92,7 @@ public class FlowRunnerTest2 {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
 		
 		pluginSet.addPluginClass("java", JavaJob.class);
@@ -176,16 +176,7 @@ public class FlowRunnerTest2 {
 		ExecutableNode node = nodeMap.get("jobb");
 		Assert.assertEquals(Status.RUNNING, node.getStatus());
 		Props jobb = node.getInputProps();
-		Assert.assertEquals("test1.1", jobb.get("param1"));
-		Assert.assertEquals("test1.1", jobb.get("param1"));
-		Assert.assertEquals("test1.2", jobb.get("param2"));
-		Assert.assertEquals("test1.3", jobb.get("param3"));
 		Assert.assertEquals("override.4", jobb.get("param4"));
-		Assert.assertEquals("test2.5", jobb.get("param5"));
-		Assert.assertEquals("test2.6", jobb.get("param6"));
-		Assert.assertEquals("test2.7", jobb.get("param7"));
-		Assert.assertEquals("test2.8", jobb.get("param8"));
-		Assert.assertEquals("test2.8", jobb.get("param8"));
 		// Test that jobb properties overwrites the output properties
 		Assert.assertEquals("moo", jobb.get("testprops"));
 		Assert.assertEquals("jobb", jobb.get("output.override"));
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 6fcbb11..abee6f0 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -41,7 +41,7 @@ public class JobRunnerTest {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		
 		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 	}
diff --git a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
index a4c5cb0..9669b51 100644
--- a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
+++ b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
@@ -52,7 +52,7 @@ public class JobTypeManagerTest {
 		jobTypeDir.mkdirs();
 		
 		FileUtils.copyDirectory(new File("unit/plugins/jobtypes"), jobTypeDir);
-		manager = new JobTypeManager(TEST_PLUGIN_DIR, this.getClass().getClassLoader());
+		manager = new JobTypeManager(TEST_PLUGIN_DIR, null, this.getClass().getClassLoader());
 	}
 	
 	@After