azkaban-developers

Details

diff --git a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 971f9c1..0cb7c55 100644
--- a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -107,15 +107,6 @@ public class AzkabanExecutorServer {
 		projectLoader = createProjectLoader(props);
 		runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, this.getClass().getClassLoader());
 		
-		String globalPropsPath = props.getString("executor.global.properties", null);
-		if (globalPropsPath == null) {
-			executorGlobalProps = new Props();
-		}
-		else {
-			executorGlobalProps = new Props(null, globalPropsPath);
-		}
-		runnerManager.setGlobalProps(executorGlobalProps);
-		
 		configureMBeanServer();
 
 		try {
diff --git a/src/main/java/azkaban/execapp/FlowRunner.java b/src/main/java/azkaban/execapp/FlowRunner.java
index 0e1c198..c80ea63 100644
--- a/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/src/main/java/azkaban/execapp/FlowRunner.java
@@ -88,8 +88,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	// Properties map
 	private Map<String, Props> sharedProps = new HashMap<String, Props>();
-	
-	private Props globalProps;
 	private final JobTypeManager jobtypeManager;
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
@@ -167,11 +165,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return this;
 	}
 	
-	public FlowRunner setGlobalProps(Props globalProps) {
-		this.globalProps = globalProps;
-		return this;
-	}
-	
 	public FlowRunner setNumJobThreads(int jobs) {
 		numJobThreads = jobs;
 		return this;
@@ -239,7 +232,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		String flowId = flow.getFlowId();
 		
 		// Add a bunch of common azkaban properties
-		Props commonFlowProps = PropsUtils.addCommonFlowProperties(this.globalProps, flow);
+		Props commonFlowProps = PropsUtils.addCommonFlowProperties(null, flow);
 		
 		if (flow.getJobSource() != null) {
 			String source = flow.getJobSource();
diff --git a/src/main/java/azkaban/execapp/FlowRunnerManager.java b/src/main/java/azkaban/execapp/FlowRunnerManager.java
index a9e2059..7b75756 100644
--- a/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -84,7 +84,7 @@ public class FlowRunnerManager implements EventListener {
 	
 	private JobTypeManager jobtypeManager;
 	
-	private Props globalProps;
+	private Props globalProps = null;
 	
 	private final Props azkabanProps;
 	
@@ -139,7 +139,15 @@ public class FlowRunnerManager implements EventListener {
 		cleanerThread = new CleanerThread();
 		cleanerThread.start();
 		
-		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
+		String globalPropsPath = props.getString("executor.global.properties", null);
+		if (globalPropsPath != null) {
+			globalProps = new Props(null, globalPropsPath);
+		}
+		
+		jobtypeManager = new JobTypeManager(
+				props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR),
+				globalProps,
+				parentClassLoader);
 	}
 
 	private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -406,7 +414,6 @@ public class FlowRunnerManager implements EventListener {
 		runner.setFlowWatcher(watcher)
 			.setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
 			.setValidateProxyUser(validateProxyUser)
-			.setGlobalProps(globalProps)
 			.setNumJobThreads(numJobThreads)
 			.addListener(this);
 		
diff --git a/src/main/java/azkaban/jobtype/JobTypeManager.java b/src/main/java/azkaban/jobtype/JobTypeManager.java
index df9627e..58344c4 100644
--- a/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -50,14 +50,16 @@ public class JobTypeManager
 	private static final Logger logger = Logger.getLogger(JobTypeManager.class);
 	
 	private JobTypePluginSet pluginSet;
-
-	public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader) {
+	private Props globalProperties;
+	
+	public JobTypeManager(String jobtypePluginDir, Props globalProperties, ClassLoader parentClassLoader) {
 		this.jobTypePluginDir = jobtypePluginDir;
 		this.parentLoader = parentClassLoader;
+		this.globalProperties = globalProperties;
 		
 		loadPlugins();
 	}
-
+	
 	public void loadPlugins() throws JobTypeManagerException {
 		JobTypePluginSet plugins = new JobTypePluginSet();
 		
@@ -113,7 +115,7 @@ public class JobTypeManager
 		if (commonJobPropsFile.exists()) {
 			logger.info("Common plugin job props file " + commonJobPropsFile + " found. Attempt to load.");
 			try {
-				commonPluginJobProps = new Props(null, commonJobPropsFile);
+				commonPluginJobProps = new Props(globalProperties, commonJobPropsFile);
 			}
 			catch (IOException e) {
 				throw new JobTypeManagerException("Failed to load common plugin job properties" + e.getCause());
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index cc3b1e5..fb13d04 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -36,7 +36,7 @@ public class LocalFlowWatcherTest {
 	
 	@Before
 	public void setUp() throws Exception {
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index d8e94dd..3764a81 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -37,7 +37,7 @@ public class RemoteFlowWatcherTest {
 	
 	@Before
 	public void setUp() throws Exception {
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index df9cac1..49bb4b5 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -73,7 +73,7 @@ public class FlowRunnerPipelineTest {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
 		
 		pluginSet.addPluginClass("java", JavaJob.class);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
index afddd0c..24ac2cc 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPropertyResolutionTest.java
@@ -64,7 +64,7 @@ public class FlowRunnerPropertyResolutionTest {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 2e708fe..911a48b 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -47,7 +47,7 @@ public class FlowRunnerTest {
 			}
 			workingDir.mkdirs();
 		}
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
 		pluginSet.addPluginClass("java", JavaJob.class);
 		pluginSet.addPluginClass("test", InteractiveTestJob.class);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 0c9d7b9..8635f46 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -92,7 +92,7 @@ public class FlowRunnerTest2 {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
 		
 		pluginSet.addPluginClass("java", JavaJob.class);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 6fcbb11..abee6f0 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -41,7 +41,7 @@ public class JobRunnerTest {
 			FileUtils.deleteDirectory(workingDir);
 		}
 		workingDir.mkdirs();
-		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+		jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
 		
 		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 	}
diff --git a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
index a4c5cb0..9669b51 100644
--- a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
+++ b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
@@ -52,7 +52,7 @@ public class JobTypeManagerTest {
 		jobTypeDir.mkdirs();
 		
 		FileUtils.copyDirectory(new File("unit/plugins/jobtypes"), jobTypeDir);
-		manager = new JobTypeManager(TEST_PLUGIN_DIR, this.getClass().getClassLoader());
+		manager = new JobTypeManager(TEST_PLUGIN_DIR, null, this.getClass().getClassLoader());
 	}
 	
 	@After