azkaban-developers
Changes
src/main/java/azkaban/execapp/FlowRunner.java 48(+16 -32)
Details
diff --git a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 971f9c1..0cb7c55 100644
--- a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -107,15 +107,6 @@ public class AzkabanExecutorServer {
projectLoader = createProjectLoader(props);
runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, this.getClass().getClassLoader());
- String globalPropsPath = props.getString("executor.global.properties", null);
- if (globalPropsPath == null) {
- executorGlobalProps = new Props();
- }
- else {
- executorGlobalProps = new Props(null, globalPropsPath);
- }
- runnerManager.setGlobalProps(executorGlobalProps);
-
configureMBeanServer();
try {
src/main/java/azkaban/execapp/FlowRunner.java 48(+16 -32)
diff --git a/src/main/java/azkaban/execapp/FlowRunner.java b/src/main/java/azkaban/execapp/FlowRunner.java
index fd8f116..c80ea63 100644
--- a/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/src/main/java/azkaban/execapp/FlowRunner.java
@@ -88,8 +88,6 @@ public class FlowRunner extends EventHandler implements Runnable {
// Properties map
private Map<String, Props> sharedProps = new HashMap<String, Props>();
-
- private Props globalProps;
private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
@@ -167,11 +165,6 @@ public class FlowRunner extends EventHandler implements Runnable {
return this;
}
- public FlowRunner setGlobalProps(Props globalProps) {
- this.globalProps = globalProps;
- return this;
- }
-
public FlowRunner setNumJobThreads(int jobs) {
numJobThreads = jobs;
return this;
@@ -239,7 +232,7 @@ public class FlowRunner extends EventHandler implements Runnable {
String flowId = flow.getFlowId();
// Add a bunch of common azkaban properties
- Props commonFlowProps = PropsUtils.addCommonFlowProperties(this.globalProps, flow);
+ Props commonFlowProps = PropsUtils.addCommonFlowProperties(null, flow);
if (flow.getJobSource() != null) {
String source = flow.getJobSource();
@@ -618,47 +611,38 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
-
- @SuppressWarnings("unchecked")
private void prepareJobProperties(ExecutableNode node) throws IOException {
if (node instanceof ExecutableFlow) {
return;
}
Props props = null;
+ // 1. Shared properties (i.e. *.properties) for the jobs only. This takes the
+ // least precedence
+ if (!(node instanceof ExecutableFlowBase)) {
+ String sharedProps = node.getPropsSource();
+ if (sharedProps != null) {
+ props = this.sharedProps.get(sharedProps);
+ }
+ }
+
// The following is the hiearchical ordering of dependency resolution
- // 1. Parent Flow Properties
+ // 2. Parent Flow Properties
ExecutableFlowBase parentFlow = node.getParentFlow();
if (parentFlow != null) {
- props = parentFlow.getInputProps();
- }
-
- // 2. Shared Properties
- String sharedProps = node.getPropsSource();
- if (sharedProps != null) {
- Props shared = this.sharedProps.get(sharedProps);
- if (shared != null) {
- // Clone because we may clobber
- shared = Props.clone(shared);
- shared.setEarliestAncestor(props);
- props = shared;
- }
- }
-
- // 3. Flow Override properties
- Map<String,String> flowParam = flow.getExecutionOptions().getFlowParameters();
- if (flowParam != null && !flowParam.isEmpty()) {
- props = new Props(props, flowParam);
+ Props flowProps = Props.clone(parentFlow.getInputProps());
+ flowProps.setEarliestAncestor(props);
+ props = flowProps;
}
- // 4. Output Properties
+ // 3. Output Properties. The call creates a clone, so we can overwrite it.
Props outputProps = collectOutputProps(node);
if (outputProps != null) {
outputProps.setEarliestAncestor(props);
props = outputProps;
}
- // 5. The job source
+ // 4. The job source.
Props jobSource = loadJobProps(node);
if (jobSource != null) {
jobSource.setParent(props);
diff --git a/src/main/java/azkaban/execapp/FlowRunnerManager.java b/src/main/java/azkaban/execapp/FlowRunnerManager.java
index a9e2059..7b75756 100644
--- a/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -84,7 +84,7 @@ public class FlowRunnerManager implements EventListener {
private JobTypeManager jobtypeManager;
- private Props globalProps;
+ private Props globalProps = null;
private final Props azkabanProps;
@@ -139,7 +139,15 @@ public class FlowRunnerManager implements EventListener {
cleanerThread = new CleanerThread();
cleanerThread.start();
- jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
+ String globalPropsPath = props.getString("executor.global.properties", null);
+ if (globalPropsPath != null) {
+ globalProps = new Props(null, globalPropsPath);
+ }
+
+ jobtypeManager = new JobTypeManager(
+ props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR),
+ globalProps,
+ parentClassLoader);
}
private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -406,7 +414,6 @@ public class FlowRunnerManager implements EventListener {
runner.setFlowWatcher(watcher)
.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
.setValidateProxyUser(validateProxyUser)
- .setGlobalProps(globalProps)
.setNumJobThreads(numJobThreads)
.addListener(this);
diff --git a/src/main/java/azkaban/jobtype/JobTypeManager.java b/src/main/java/azkaban/jobtype/JobTypeManager.java
index df9627e..58344c4 100644
--- a/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -50,14 +50,16 @@ public class JobTypeManager
private static final Logger logger = Logger.getLogger(JobTypeManager.class);
private JobTypePluginSet pluginSet;
-
- public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader) {
+ private Props globalProperties;
+
+ public JobTypeManager(String jobtypePluginDir, Props globalProperties, ClassLoader parentClassLoader) {
this.jobTypePluginDir = jobtypePluginDir;
this.parentLoader = parentClassLoader;
+ this.globalProperties = globalProperties;
loadPlugins();
}
-
+
public void loadPlugins() throws JobTypeManagerException {
JobTypePluginSet plugins = new JobTypePluginSet();
@@ -113,7 +115,7 @@ public class JobTypeManager
if (commonJobPropsFile.exists()) {
logger.info("Common plugin job props file " + commonJobPropsFile + " found. Attempt to load.");
try {
- commonPluginJobProps = new Props(null, commonJobPropsFile);
+ commonPluginJobProps = new Props(globalProperties, commonJobPropsFile);
}
catch (IOException e) {
throw new JobTypeManagerException("Failed to load common plugin job properties" + e.getCause());
diff --git a/unit/executions/execpropstest/innerflow.job b/unit/executions/execpropstest/innerflow.job
new file mode 100644
index 0000000..18def67
--- /dev/null
+++ b/unit/executions/execpropstest/innerflow.job
@@ -0,0 +1,6 @@
+type=flow
+flow.name=job4
+dependencies=job2
+props5=innerflow5
+props6=innerflow6
+props8=innerflow8
\ No newline at end of file
diff --git a/unit/executions/execpropstest/job1.job b/unit/executions/execpropstest/job1.job
new file mode 100644
index 0000000..d910f1a
--- /dev/null
+++ b/unit/executions/execpropstest/job1.job
@@ -0,0 +1,4 @@
+type=test
+props1=job1
+props2=job2
+props8=job8
\ No newline at end of file
diff --git a/unit/executions/execpropstest/job3.job b/unit/executions/execpropstest/job3.job
new file mode 100644
index 0000000..d9be481
--- /dev/null
+++ b/unit/executions/execpropstest/job3.job
@@ -0,0 +1,3 @@
+type=test
+dependencies=innerflow
+props3=job3
\ No newline at end of file
diff --git a/unit/executions/execpropstest/moo.properties b/unit/executions/execpropstest/moo.properties
new file mode 100644
index 0000000..7e4f399
--- /dev/null
+++ b/unit/executions/execpropstest/moo.properties
@@ -0,0 +1,3 @@
+props3=moo3
+props4=moo4
+props5=moo5
\ No newline at end of file
diff --git a/unit/executions/execpropstest/shared.properties b/unit/executions/execpropstest/shared.properties
new file mode 100644
index 0000000..29b2b7b
--- /dev/null
+++ b/unit/executions/execpropstest/shared.properties
@@ -0,0 +1,3 @@
+props1=shared1
+props2=shared2
+props6=shared6
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/job2.job b/unit/executions/execpropstest/subdir/job2.job
new file mode 100644
index 0000000..bfd48e3
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/job2.job
@@ -0,0 +1,3 @@
+type=test
+props2=job2
+props7=job7
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/job4.job b/unit/executions/execpropstest/subdir/job4.job
new file mode 100644
index 0000000..e26aaa6
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/job4.job
@@ -0,0 +1,4 @@
+type=test
+dependencies=job1
+props8=job8
+props9=job9
\ No newline at end of file
diff --git a/unit/executions/execpropstest/subdir/shared.properties b/unit/executions/execpropstest/subdir/shared.properties
new file mode 100644
index 0000000..2ba1880
--- /dev/null
+++ b/unit/executions/execpropstest/subdir/shared.properties
@@ -0,0 +1,2 @@
+props4=shared4
+props8=shared8
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index cc3b1e5..fb13d04 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -36,7 +36,7 @@ public class LocalFlowWatcherTest {
@Before
public void setUp() throws Exception {
- jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index d8e94dd..3764a81 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -37,7 +37,7 @@ public class RemoteFlowWatcherTest {
@Before
public void setUp() throws Exception {
- jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index df9cac1..49bb4b5 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -73,7 +73,7 @@ public class FlowRunnerPipelineTest {
FileUtils.deleteDirectory(workingDir);
}
workingDir.mkdirs();
- jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
pluginSet.addPluginClass("java", JavaJob.class);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
new file mode 100644
index 0000000..04adf21
--- /dev/null
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorLoader;
+import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
+import azkaban.test.executor.InteractiveTestJob;
+import azkaban.test.executor.JavaJob;
+import azkaban.utils.DirectoryFlowLoader;
+import azkaban.utils.Props;
+
+/**
+ * Test the property resolution of jobs in a flow.
+ *
+ * The tests are contained in execpropstest, and should be resolved in the following fashion,
+ * where the later props take precedence over the previous ones.
+ *
+ * 1. Global props (set in the FlowRunner)
+ * 2. Shared job props (depends on job directory)
+ * 3. Flow Override properties
+ * 4. Previous job outputs to the embedded flow (Only if contained in embedded flow)
+ * 5. Embedded flow properties (Only if contained in embedded flow)
+ * 6. Previous job outputs (if exists)
+ * 7. Job Props
+ *
+ * The test contains the following structure:
+ * job2 -> innerFlow (job1 -> job4 ) -> job3
+ *
+ * job2 and 4 are in nested directories so should have different shared properties than other jobs.
+ */
+public class FlowRunnerPropertyResolutionTest {
+ private File workingDir;
+ private JobTypeManager jobtypeManager;
+ private ProjectLoader fakeProjectLoader;
+ private ExecutorLoader fakeExecutorLoader;
+ private Logger logger = Logger.getLogger(FlowRunnerTest2.class);
+ private Project project;
+ private Map<String, Flow> flowMap;
+ private static int id=101;
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("Create temp dir");
+ workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+ if (workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ workingDir.mkdirs();
+ jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
+ jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
+ jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
+ fakeProjectLoader = new MockProjectLoader(workingDir);
+ fakeExecutorLoader = new MockExecutorLoader();
+ project = new Project(1, "testProject");
+
+ File dir = new File("unit/executions/execpropstest");
+ prepareProject(dir);
+
+ InteractiveTestJob.clearTestJobs();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ System.out.println("Teardown temp dir");
+ if (workingDir != null) {
+ FileUtils.deleteDirectory(workingDir);
+ workingDir = null;
+ }
+ }
+
+ /**
+ * Tests the basic flow resolution. Flow is defined in execpropstest
+ * @throws Exception
+ */
+ @Test
+ public void testPropertyResolution() throws Exception {
+ HashMap<String, String> flowProps = new HashMap<String,String>();
+ flowProps.put("props7", "flow7");
+ flowProps.put("props6", "flow6");
+ flowProps.put("props5", "flow5");
+ FlowRunner runner = createFlowRunner("job3", flowProps);
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+ createNodeMap(runner.getExecutableFlow(), nodeMap);
+
+ // 1. Start flow. Job 2 should start
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // Job 2 is a normal job.
+ // Only the flow overrides and the shared properties matter
+ ExecutableNode node = nodeMap.get("job2");
+ Props job2Props = node.getInputProps();
+ Assert.assertEquals("shared1", job2Props.get("props1"));
+ Assert.assertEquals("job2", job2Props.get("props2"));
+ Assert.assertEquals("moo3", job2Props.get("props3"));
+ Assert.assertEquals("job7", job2Props.get("props7"));
+ Assert.assertEquals("flow5", job2Props.get("props5"));
+ Assert.assertEquals("flow6", job2Props.get("props6"));
+ Assert.assertEquals("shared4", job2Props.get("props4"));
+ Assert.assertEquals("shared8", job2Props.get("props8"));
+
+ // Job 1 is inside another flow, and is nested in a different directory
+ // The priority order should be: job1->innerflow->job2.output->flow.overrides->job1 shared props
+ Props job2Generated = new Props();
+ job2Generated.put("props6","gjob6");
+ job2Generated.put("props9","gjob9");
+ job2Generated.put("props10","gjob10");
+ InteractiveTestJob.getTestJob("job2").succeedJob(job2Generated);
+ pause(250);
+ node = nodeMap.get("innerflow:job1");
+ Props job1Props = node.getInputProps();
+ Assert.assertEquals("job1", job1Props.get("props1"));
+ Assert.assertEquals("job2", job1Props.get("props2"));
+ Assert.assertEquals("job8", job1Props.get("props8"));
+ Assert.assertEquals("gjob9", job1Props.get("props9"));
+ Assert.assertEquals("gjob10", job1Props.get("props10"));
+ Assert.assertEquals("innerflow6", job1Props.get("props6"));
+ Assert.assertEquals("innerflow5", job1Props.get("props5"));
+ Assert.assertEquals("flow7", job1Props.get("props7"));
+ Assert.assertEquals("moo3", job1Props.get("props3"));
+ Assert.assertEquals("moo4", job1Props.get("props4"));
+
+ // Job 4 is inside another flow and takes output from job 1
+ // The priority order should be: job4->job1.output->innerflow->job2.output->flow.overrides->job4 shared props
+ Props job1GeneratedProps = new Props();
+ job1GeneratedProps.put("props9", "g2job9");
+ job1GeneratedProps.put("props7", "g2job7");
+ InteractiveTestJob.getTestJob("innerflow:job1").succeedJob(job1GeneratedProps);
+ pause(250);
+ node = nodeMap.get("innerflow:job4");
+ Props job4Props = node.getInputProps();
+ Assert.assertEquals("job8", job4Props.get("props8"));
+ Assert.assertEquals("job9", job4Props.get("props9"));
+ Assert.assertEquals("g2job7", job4Props.get("props7"));
+ Assert.assertEquals("innerflow5", job4Props.get("props5"));
+ Assert.assertEquals("innerflow6", job4Props.get("props6"));
+ Assert.assertEquals("gjob10", job4Props.get("props10"));
+ Assert.assertEquals("shared4", job4Props.get("props4"));
+ Assert.assertEquals("shared1", job4Props.get("props1"));
+ Assert.assertEquals("shared2", job4Props.get("props2"));
+ Assert.assertEquals("moo3", job4Props.get("props3"));
+
+ // Job 3 is a normal job taking props from an embedded flow
+ // The priority order should be: job3->innerflow.output->flow.overrides->job3.sharedprops
+ Props job4GeneratedProps = new Props();
+ job4GeneratedProps.put("props9", "g4job9");
+ job4GeneratedProps.put("props6", "g4job6");
+ InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(job4GeneratedProps);
+ pause(250);
+ node = nodeMap.get("job3");
+ Props job3Props = node.getInputProps();
+ Assert.assertEquals("job3", job3Props.get("props3"));
+ Assert.assertEquals("g4job6", job3Props.get("props6"));
+ Assert.assertEquals("g4job9", job3Props.get("props9"));
+ Assert.assertEquals("flow7", job3Props.get("props7"));
+ Assert.assertEquals("flow5", job3Props.get("props5"));
+ Assert.assertEquals("shared1", job3Props.get("props1"));
+ Assert.assertEquals("shared2", job3Props.get("props2"));
+ Assert.assertEquals("moo4", job3Props.get("props4"));
+ }
+
+ private void prepareProject(File directory) throws ProjectManagerException, IOException {
+ DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
+ loader.loadProjectFlow(directory);
+ if (!loader.getErrors().isEmpty()) {
+ for (String error: loader.getErrors()) {
+ System.out.println(error);
+ }
+
+ throw new RuntimeException("Errors found in setup");
+ }
+
+ flowMap = loader.getFlowMap();
+ project.setFlows(flowMap);
+ FileUtils.copyDirectory(directory, workingDir);
+ }
+
+ private FlowRunner createFlowRunner(String flowName, HashMap<String, String> flowParams) throws Exception {
+ Flow flow = flowMap.get(flowName);
+
+ int exId = id++;
+ ExecutableFlow exFlow = new ExecutableFlow(project, flow);
+ exFlow.setExecutionPath(workingDir.getPath());
+ exFlow.setExecutionId(exId);
+
+ exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
+ fakeExecutorLoader.uploadExecutableFlow(exFlow);
+
+ FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);
+ return runner;
+ }
+
+ private void createNodeMap(ExecutableFlowBase flow, Map<String, ExecutableNode> nodeMap) {
+ for (ExecutableNode node: flow.getExecutableNodes()) {
+ nodeMap.put(node.getNestedId(), node);
+
+ if (node instanceof ExecutableFlowBase) {
+ createNodeMap((ExecutableFlowBase)node, nodeMap);
+ }
+ }
+ }
+
+ private Thread runFlowRunnerInThread(FlowRunner runner) {
+ Thread thread = new Thread(runner);
+ thread.start();
+ return thread;
+ }
+
+ private void pause(long millisec) {
+ synchronized(this) {
+ try {
+ wait(millisec);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 2e708fe..911a48b 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -47,7 +47,7 @@ public class FlowRunnerTest {
}
workingDir.mkdirs();
}
- jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
pluginSet.addPluginClass("java", JavaJob.class);
pluginSet.addPluginClass("test", InteractiveTestJob.class);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 0c9d7b9..bc31261 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -92,7 +92,7 @@ public class FlowRunnerTest2 {
FileUtils.deleteDirectory(workingDir);
}
workingDir.mkdirs();
- jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
pluginSet.addPluginClass("java", JavaJob.class);
@@ -176,16 +176,7 @@ public class FlowRunnerTest2 {
ExecutableNode node = nodeMap.get("jobb");
Assert.assertEquals(Status.RUNNING, node.getStatus());
Props jobb = node.getInputProps();
- Assert.assertEquals("test1.1", jobb.get("param1"));
- Assert.assertEquals("test1.1", jobb.get("param1"));
- Assert.assertEquals("test1.2", jobb.get("param2"));
- Assert.assertEquals("test1.3", jobb.get("param3"));
Assert.assertEquals("override.4", jobb.get("param4"));
- Assert.assertEquals("test2.5", jobb.get("param5"));
- Assert.assertEquals("test2.6", jobb.get("param6"));
- Assert.assertEquals("test2.7", jobb.get("param7"));
- Assert.assertEquals("test2.8", jobb.get("param8"));
- Assert.assertEquals("test2.8", jobb.get("param8"));
// Test that jobb properties overwrites the output properties
Assert.assertEquals("moo", jobb.get("testprops"));
Assert.assertEquals("jobb", jobb.get("output.override"));
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 6fcbb11..abee6f0 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -41,7 +41,7 @@ public class JobRunnerTest {
FileUtils.deleteDirectory(workingDir);
}
workingDir.mkdirs();
- jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
}
diff --git a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
index a4c5cb0..9669b51 100644
--- a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
+++ b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
@@ -52,7 +52,7 @@ public class JobTypeManagerTest {
jobTypeDir.mkdirs();
FileUtils.copyDirectory(new File("unit/plugins/jobtypes"), jobTypeDir);
- manager = new JobTypeManager(TEST_PLUGIN_DIR, this.getClass().getClassLoader());
+ manager = new JobTypeManager(TEST_PLUGIN_DIR, null, this.getClass().getClassLoader());
}
@After