azkaban-aplcache
Changes
build.gradle 8(+7 -1)
src/main/java/azkaban/jobtype/JobTypeManager.java 408(+183 -225)
src/main/java/azkaban/jobtype/JobTypePluginSet.java 145(+145 -0)
Details
build.gradle 8(+7 -1)
diff --git a/build.gradle b/build.gradle
index 168c745..b508e46 100644
--- a/build.gradle
+++ b/build.gradle
@@ -100,7 +100,8 @@ dependencies {
)
testCompile (
- [group: 'junit', name:'junit', version: '4.11']
+ [group: 'junit', name:'junit', version: '4.11'],
+ [group: 'org.hamcrest', name:'hamcrest-all', version: '1.3']
)
}
@@ -110,6 +111,11 @@ sourceSets {
srcDirs 'src/main/java', 'src/restli/generatedJava', 'src/restli/java'
}
}
+ test {
+ java {
+ srcDirs 'unit/java'
+ }
+ }
}
jar {
diff --git a/src/main/java/azkaban/execapp/ExecutorServlet.java b/src/main/java/azkaban/execapp/ExecutorServlet.java
index 75a1a08..cb8a211 100644
--- a/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -88,6 +88,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else if (action.equals(PING_ACTION)) {
respMap.put("status", "alive");
}
+ else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
+ logger.info("Reloading Jobtype plugins");
+ handleReloadJobTypePlugins(respMap);
+ }
else {
int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
String user = getParam(req, USER_PARAM, null);
@@ -337,6 +341,17 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
+ private void handleReloadJobTypePlugins(Map<String, Object> respMap) throws ServletException {
+ try {
+ flowRunnerManager.reloadJobTypePlugins();
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ }
+ catch (Exception e) {
+ logger.error(e);
+ respMap.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
diff --git a/src/main/java/azkaban/execapp/FlowRunnerManager.java b/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f886e64..a9e2059 100644
--- a/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -47,6 +47,7 @@ import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
@@ -689,6 +690,7 @@ public class FlowRunnerManager implements EventListener {
return jobCount;
}
-
-
+ public void reloadJobTypePlugins() throws JobTypeManagerException {
+ jobtypeManager.loadPlugins();
+ }
}
diff --git a/src/main/java/azkaban/executor/ConnectorParams.java b/src/main/java/azkaban/executor/ConnectorParams.java
index c84436e..3b3f00e 100644
--- a/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/src/main/java/azkaban/executor/ConnectorParams.java
@@ -32,6 +32,7 @@ public interface ConnectorParams {
public static final String LOG_ACTION = "log";
public static final String ATTACHMENTS_ACTION = "attachments";
public static final String METADATA_ACTION = "metadata";
+ public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
src/main/java/azkaban/jobtype/JobTypeManager.java 408(+183 -225)
diff --git a/src/main/java/azkaban/jobtype/JobTypeManager.java b/src/main/java/azkaban/jobtype/JobTypeManager.java
index 6c275af..df9627e 100644
--- a/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -28,20 +28,18 @@ import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
import azkaban.jobExecutor.utils.JobExecutionException;
import java.io.File;
+import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.log4j.Logger;
public class JobTypeManager
{
-
- private final String jobtypePluginDir; // the dir for jobtype plugins
+ private final String jobTypePluginDir; // the dir for jobtype plugins
private final ClassLoader parentLoader;
public static final String DEFAULT_JOBTYPEPLUGINDIR = "plugins/jobtypes";
@@ -51,27 +49,25 @@ public class JobTypeManager
private static final String COMMONSYSCONFFILE = "commonprivate.properties"; // common private properties for multiple plugins
private static final Logger logger = Logger.getLogger(JobTypeManager.class);
- private Map<String, Class<? extends Job>> jobToClass;
- private Map<String, Props> jobtypeJobProps;
- private Map<String, Props> jobtypeSysProps;
+ private JobTypePluginSet pluginSet;
- public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader)
- {
- this.jobtypePluginDir = jobtypePluginDir;
+ public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader) {
+ this.jobTypePluginDir = jobtypePluginDir;
this.parentLoader = parentClassLoader;
- jobToClass = new HashMap<String, Class<? extends Job>>();
- jobtypeJobProps = new HashMap<String, Props>();
- jobtypeSysProps = new HashMap<String, Props>();
-
- loadDefaultTypes();
+ loadPlugins();
+ }
+
+ public void loadPlugins() throws JobTypeManagerException {
+ JobTypePluginSet plugins = new JobTypePluginSet();
- if(jobtypePluginDir != null) {
- File pluginDir = new File(jobtypePluginDir);
+ loadDefaultTypes(plugins);
+ if (jobTypePluginDir != null) {
+ File pluginDir = new File(jobTypePluginDir);
if (pluginDir.exists()) {
- logger.info("job type plugin directory set. Loading extra job types.");
+ logger.info("Job type plugin directory set. Loading extra job types from " + pluginDir);
try {
- loadPluginJobTypes();
+ loadPluginJobTypes(plugins);
}
catch (Exception e) {
logger.info("Plugin jobtypes failed to load. " + e.getCause());
@@ -80,186 +76,180 @@ public class JobTypeManager
}
}
+ // Swap the plugin set. If exception is thrown, then plugin isn't swapped.
+ synchronized (this) {
+ pluginSet = plugins;
+ }
}
-
- private void loadDefaultTypes() throws JobTypeManagerException{
- jobToClass.put("command", ProcessJob.class);
- jobToClass.put("javaprocess", JavaProcessJob.class);
- jobToClass.put("noop", NoopJob.class);
- jobToClass.put("python", PythonJob.class);
- jobToClass.put("ruby", RubyJob.class);
- jobToClass.put("script", ScriptJob.class);
+
+ private void loadDefaultTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+ logger.info("Loading plugin default job types");
+ plugins.addPluginClass("command", ProcessJob.class);
+ plugins.addPluginClass("javaprocess", JavaProcessJob.class);
+ plugins.addPluginClass("noop", NoopJob.class);
+ plugins.addPluginClass("python", PythonJob.class);
+ plugins.addPluginClass("ruby", RubyJob.class);
+ plugins.addPluginClass("script", ScriptJob.class);
}
// load Job Types from jobtype plugin dir
- private void loadPluginJobTypes() throws JobTypeManagerException
- {
- File jobPluginsDir = new File(jobtypePluginDir);
+ private void loadPluginJobTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+ File jobPluginsDir = new File(jobTypePluginDir);
if (!jobPluginsDir.exists()) {
+ logger.error("Job type plugin dir " + jobTypePluginDir + " doesn't exist. Will not load any external plugins.");
return;
}
-
- if (!jobPluginsDir.isDirectory()) {
- throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not a directory!");
+ else if (!jobPluginsDir.isDirectory()) {
+ throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not a directory!");
}
-
- if (!jobPluginsDir.canRead()) {
- throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not readable!");
+ else if (!jobPluginsDir.canRead()) {
+ throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not readable!");
}
- // look for global conf
- Props globalConf = null;
- Props globalSysConf = null;
- File confFile = findFilefromDir(jobPluginsDir, COMMONCONFFILE);
- File sysConfFile = findFilefromDir(jobPluginsDir, COMMONSYSCONFFILE);
- try {
- if(confFile != null) {
- globalConf = new Props(null, confFile);
+ // Load the common properties used by all jobs that are run
+ Props commonPluginJobProps = null;
+ File commonJobPropsFile = new File(jobPluginsDir, COMMONCONFFILE);
+ if (commonJobPropsFile.exists()) {
+ logger.info("Common plugin job props file " + commonJobPropsFile + " found. Attempt to load.");
+ try {
+ commonPluginJobProps = new Props(null, commonJobPropsFile);
}
- else {
- globalConf = new Props();
+ catch (IOException e) {
+ throw new JobTypeManagerException("Failed to load common plugin job properties" + e.getCause());
}
- if(sysConfFile != null) {
- globalSysConf = new Props(null, sysConfFile);
+ }
+ else {
+ logger.info("Common plugin job props file " + commonJobPropsFile + " not found. Using empty props.");
+ commonPluginJobProps = new Props();
+ }
+
+ // Loads the common properties used by all plugins when loading
+ Props commonPluginLoadProps = null;
+ File commonLoadPropsFile = new File(jobPluginsDir, COMMONSYSCONFFILE);
+ if (commonLoadPropsFile.exists()) {
+ logger.info("Common plugin load props file " + commonLoadPropsFile + " found. Attempt to load.");
+ try {
+ commonPluginLoadProps = new Props(null, commonLoadPropsFile);
}
- else {
- globalSysConf = new Props();
+ catch (IOException e) {
+ throw new JobTypeManagerException("Failed to load common plugin loader properties" + e.getCause());
}
}
- catch (Exception e) {
- throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
+ else {
+ logger.info("Common plugin load props file " + commonLoadPropsFile + " not found. Using empty props.");
+ commonPluginLoadProps = new Props();
}
+ plugins.setCommonPluginJobProps(commonPluginJobProps);
+ plugins.setCommonPluginLoadProps(commonPluginLoadProps);
- synchronized (this) {
- ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
- try{
- for(File dir : jobPluginsDir.listFiles()) {
- if(dir.isDirectory() && dir.canRead()) {
- // get its conf file
- try {
- loadJob(dir, globalConf, globalSysConf);
- Thread.currentThread().setContextClassLoader(prevCl);
- }
- catch (Exception e) {
- logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
- throw new JobTypeManagerException(e);
- }
- }
+ // Loading job types
+ for (File dir : jobPluginsDir.listFiles()) {
+ if (dir.isDirectory() && dir.canRead()) {
+ try {
+ loadJobTypes(dir, plugins);
}
- } catch(Exception e) {
- e.printStackTrace();
- throw new JobTypeManagerException(e);
- } catch(Throwable t) {
- t.printStackTrace();
- throw new JobTypeManagerException(t);
- } finally {
- Thread.currentThread().setContextClassLoader(prevCl);
- }
- }
- }
-
- public static File findFilefromDir(File dir, String fn){
- if(dir.isDirectory()) {
- for(File f : dir.listFiles()) {
- if(f.getName().equals(fn)) {
- return f;
+ catch (Exception e) {
+ logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
+ throw new JobTypeManagerException(e);
}
}
}
- return null;
}
-// private void loadJobType(File dir, Props globalConf, Props globalSysConf) throws JobTypeManagerException{
-//
-// // look for common conf
-// Props conf = null;
-// Props sysConf = null;
-// File confFile = findFilefromDir(dir, COMMONCONFFILE);
-// File sysConfFile = findFilefromDir(dir, COMMONSYSCONFFILE);
-//
-// try {
-// if(confFile != null) {
-// conf = new Props(globalConf, confFile);
-// }
-// else {
-// conf = globalConf;
-// }
-// if(sysConfFile != null) {
-// sysConf = new Props(globalSysConf, sysConfFile);
-// }
-// else {
-// sysConf = globalSysConf;
-// }
-// }
-// catch (Exception e) {
-// throw new JobTypeManagerException("Failed to get common jobtype properties" + e.getCause());
-// }
-//
-// // look for jobtypeConf.properties and load it
-// for(File f: dir.listFiles()) {
-// if(f.isFile() && f.getName().equals(JOBTYPESYSCONFFILE)) {
-// loadJob(dir, f, conf, sysConf);
-// return;
-// }
-// }
-//
-// // no hit, keep looking
-// for(File f : dir.listFiles()) {
-// if(f.isDirectory() && f.canRead())
-// loadJobType(f, conf, sysConf);
-// }
-//
-// }
-
@SuppressWarnings("unchecked")
- private void loadJob(File dir, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+ private void loadJobTypes(File pluginDir, JobTypePluginSet plugins) throws JobTypeManagerException {
+ // Directory is the jobtypeName
+ String jobTypeName = pluginDir.getName();
+ logger.info("Loading plugin " + jobTypeName);
+
+ Props pluginJobProps = null;
+ Props pluginLoadProps = null;
- Props conf = null;
- Props sysConf = null;
- File confFile = findFilefromDir(dir, JOBTYPECONFFILE);
- File sysConfFile = findFilefromDir(dir, JOBTYPESYSCONFFILE);
- if(sysConfFile == null) {
- logger.info("No job type found in " + dir.getAbsolutePath());
+ File pluginJobPropsFile = new File(pluginDir, JOBTYPECONFFILE);
+ File pluginLoadPropsFile = new File(pluginDir, JOBTYPESYSCONFFILE);
+
+ if (!pluginLoadPropsFile.exists()) {
+ logger.info("Plugin load props file " + pluginLoadPropsFile + " not found.");
return;
}
try {
- if(confFile != null) {
- conf = new Props(commonConf, confFile);
+ Props commonPluginJobProps = plugins.getCommonPluginJobProps();
+ Props commonPluginLoadProps = plugins.getCommonPluginLoadProps();
+ if (pluginJobPropsFile.exists()) {
+ pluginJobProps = new Props(commonPluginJobProps, pluginJobPropsFile);
}
else {
- conf = new Props(commonConf);
+ pluginJobProps = new Props(commonPluginJobProps);
}
- sysConf = new Props(commonSysConf, sysConfFile);
- sysConf = PropsUtils.resolveProps(sysConf);
-
+ pluginLoadProps = new Props(commonPluginLoadProps, pluginLoadPropsFile);
+ pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
}
catch (Exception e) {
throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
}
- sysConf.put("plugin.dir", dir.getAbsolutePath());
+ // Add properties into the plugin set
+ pluginLoadProps.put("plugin.dir", pluginDir.getAbsolutePath());
+ plugins.addPluginLoadProps(jobTypeName, pluginLoadProps);
+ if (pluginJobProps != null) {
+ plugins.addPluginJobProps(jobTypeName, pluginJobProps);
+ }
- // use directory name as job type name
- String jobtypeName = dir.getName();
+ ClassLoader jobTypeLoader = loadJobTypeClassLoader(pluginDir, jobTypeName, plugins);
+ String jobtypeClass = pluginLoadProps.get("jobtype.class");
- String jobtypeClass = sysConf.get("jobtype.class");
+ Class<? extends Job> clazz = null;
+ try {
+ clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
+ plugins.addPluginClass(jobTypeName, clazz);
+ }
+ catch (ClassNotFoundException e) {
+ throw new JobTypeManagerException(e);
+ }
- logger.info("Loading jobtype " + jobtypeName );
-
+ logger.info("Verifying job plugin " + jobTypeName);
+ try {
+ Props fakeSysProps = new Props(pluginLoadProps);
+ Props fakeJobProps = new Props(pluginJobProps);
+ @SuppressWarnings("unused")
+ Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+ }
+ catch (Exception e) {
+ logger.info("Jobtype " + jobTypeName + " failed test!", e);
+ throw new JobExecutionException(e);
+ }
+ catch (Throwable t) {
+ logger.info("Jobtype " + jobTypeName + " failed test!", t);
+ throw new JobExecutionException(t);
+ }
+
+ logger.info("Loaded jobtype " + jobTypeName + " " + jobtypeClass);
+ }
+
+ /**
+ * Creates and loads all plugin resources (jars) into a ClassLoader
+ *
+ * @param pluginDir
+ * @param jobTypeName
+ * @param plugins
+ * @return
+ */
+ private ClassLoader loadJobTypeClassLoader(File pluginDir, String jobTypeName, JobTypePluginSet plugins) {
// sysconf says what jars/confs to load
- List<URL> resources = new ArrayList<URL>();
+ List<URL> resources = new ArrayList<URL>();
+ Props pluginLoadProps = plugins.getPluginLoaderProps(jobTypeName);
try {
//first global classpath
- logger.info("Adding global resources.");
- List<String> typeGlobalClassPath = sysConf.getStringList("jobtype.global.classpath", null, ",");
- if(typeGlobalClassPath != null) {
- for(String jar : typeGlobalClassPath) {
+ logger.info("Adding global resources for " + jobTypeName);
+ List<String> typeGlobalClassPath = pluginLoadProps.getStringList("jobtype.global.classpath", null, ",");
+ if (typeGlobalClassPath != null) {
+ for (String jar : typeGlobalClassPath) {
URL cpItem = new File(jar).toURI().toURL();
- if(!resources.contains(cpItem)) {
+ if (!resources.contains(cpItem)) {
logger.info("adding to classpath " + cpItem);
resources.add(cpItem);
}
@@ -268,78 +258,51 @@ public class JobTypeManager
//type specific classpath
logger.info("Adding type resources.");
- List<String> typeClassPath = sysConf.getStringList("jobtype.classpath", null, ",");
- if(typeClassPath != null) {
- for(String jar : typeClassPath) {
+ List<String> typeClassPath = pluginLoadProps.getStringList("jobtype.classpath", null, ",");
+ if (typeClassPath != null) {
+ for (String jar : typeClassPath) {
URL cpItem = new File(jar).toURI().toURL();
- if(!resources.contains(cpItem)) {
+ if (!resources.contains(cpItem)) {
logger.info("adding to classpath " + cpItem);
resources.add(cpItem);
}
}
}
- List<String> jobtypeLibDirs = sysConf.getStringList("jobtype.lib.dir", null, ",");
- if(jobtypeLibDirs != null) {
- for(String libDir : jobtypeLibDirs) {
- for(File f : new File(libDir).listFiles()) {
- if(f.getName().endsWith(".jar")) {
- resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f.toURI().toURL());
+ List<String> jobtypeLibDirs = pluginLoadProps.getStringList("jobtype.lib.dir", null, ",");
+ if (jobtypeLibDirs != null) {
+ for (String libDir : jobtypeLibDirs) {
+ for (File f : new File(libDir).listFiles()) {
+ if (f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
}
}
}
}
logger.info("Adding type override resources.");
- for(File f : dir.listFiles()) {
- if(f.getName().endsWith(".jar")) {
- resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f.toURI().toURL());
+ for (File f : pluginDir.listFiles()) {
+ if (f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
}
}
- } catch (MalformedURLException e) {
+ }
+ catch (MalformedURLException e) {
throw new JobTypeManagerException(e);
}
// each job type can have a different class loader
ClassLoader jobTypeLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentLoader);
-
- Class<? extends Job> clazz = null;
- try {
- clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
- jobToClass.put(jobtypeName, clazz);
- }
- catch (ClassNotFoundException e) {
- throw new JobTypeManagerException(e);
- }
-
- logger.info("Doing simple testing...");
- try {
- Props fakeSysProps = new Props(sysConf);
-// fakeSysProps.put("type", jobtypeName);
- Props fakeJobProps = new Props(conf);
- @SuppressWarnings("unused")
- Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
- }
- catch (Exception e) {
- logger.info("Jobtype " + jobtypeName + " failed test!", e);
- throw new JobExecutionException(e);
- }
- catch (Throwable t) {
- logger.info("Jobtype " + jobtypeName + " failed test!", t);
- throw new JobExecutionException(t);
- }
-
- logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
-
- if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
- jobtypeSysProps.put(jobtypeName, sysConf);
-
+ return jobTypeLoader;
}
- public Job buildJobExecutor(String jobId, Props jobProps, Logger logger)
- throws JobTypeManagerException {
+ public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException {
+ // This is final because during build phase, you should never need to swap
+ // the pluginSet for safety reasons
+ final JobTypePluginSet pluginSet = getJobTypePluginSet();
+
Job job = null;
try {
String jobType = jobProps.getString("type");
@@ -352,44 +315,36 @@ public class JobTypeManager
logger.info("Building " + jobType + " job executor. ");
- Class<? extends Object> executorClass = jobToClass.get(jobType);
-
+ Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);
if (executorClass == null) {
throw new JobExecutionException(
String.format("Job type '" + jobType + "' is unrecognized. Could not construct job[%s] of type[%s].", jobProps, jobType));
}
- Props sysConf = jobtypeSysProps.get(jobType);
-
- Props jobConf = jobProps;
- if (jobtypeJobProps.containsKey(jobType)) {
- Props p = jobtypeJobProps.get(jobType);
- for (String k : p.getKeySet()) {
- if (!jobConf.containsKey(k)) {
- jobConf.put(k, p.get(k));
+ Props pluginJobProps = pluginSet.getPluginJobProps(jobType);
+ if (pluginJobProps != null) {
+ for (String k : pluginJobProps.getKeySet()) {
+ if (!jobProps.containsKey(k)) {
+ jobProps.put(k, pluginJobProps.get(k));
}
}
}
- jobConf = PropsUtils.resolveProps(jobConf);
+ jobProps = PropsUtils.resolveProps(jobProps);
- if (sysConf != null) {
- sysConf = PropsUtils.resolveProps(sysConf);
+ Props pluginLoadProps = pluginSet.getPluginLoaderProps(jobType);
+ if (pluginLoadProps != null) {
+ pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
}
else {
- sysConf = new Props();
+ pluginLoadProps = new Props();
}
-// logger.info("sysConf is " + sysConf);
-// logger.info("jobConf is " + jobConf);
-//
job = (Job) Utils.callConstructor(
- executorClass, jobId, sysConf, jobConf, logger);
+ executorClass, jobId, pluginLoadProps, jobProps, logger);
}
catch (Exception e) {
- //job = new InitErrorJob(jobId, e);
logger.error("Failed to build job executor for job " + jobId + e.getMessage());
throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
- //throw new JobTypeManagerException(e);
}
catch (Throwable t) {
logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
@@ -398,9 +353,12 @@ public class JobTypeManager
return job;
}
-
- public void registerJobType(String typeName, Class<? extends Job> jobTypeClass) {
- jobToClass.put(typeName, jobTypeClass);
+
+ /**
+ * Public for test reasons. Will need to move tests to the same package
+ */
+ public synchronized JobTypePluginSet getJobTypePluginSet() {
+ return this.pluginSet;
}
}
src/main/java/azkaban/jobtype/JobTypePluginSet.java 145(+145 -0)
diff --git a/src/main/java/azkaban/jobtype/JobTypePluginSet.java b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
new file mode 100644
index 0000000..9e6ded2
--- /dev/null
+++ b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobtype;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.jobExecutor.Job;
+import azkaban.utils.Props;
+
+/**
+ * Container for job type plugins
+ *
+ * This contains the jobClass objects, the properties for loading plugins, and the
+ * properties given by default to the plugin.
+ *
+ * This class is not thread safe, so adding to this class should only be populated
+ * and controlled by the JobTypeManager
+ */
+public class JobTypePluginSet {
+ private Map<String, Class<? extends Job>> jobToClass;
+ private Map<String, Props> pluginJobPropsMap;
+ private Map<String, Props> pluginLoadPropsMap;
+
+ private Props commonJobProps;
+ private Props commonLoadProps;
+
+ /**
+ * Base constructor
+ */
+ public JobTypePluginSet() {
+ jobToClass = new HashMap<String, Class<? extends Job>>();
+ pluginJobPropsMap = new HashMap<String, Props>();
+ pluginLoadPropsMap = new HashMap<String, Props>();
+ }
+
+ /**
+ * Copy constructor
+ * @param clone
+ */
+ public JobTypePluginSet(JobTypePluginSet clone) {
+ jobToClass = new HashMap<String, Class<? extends Job>>(clone.jobToClass);
+ pluginJobPropsMap = new HashMap<String, Props>(clone.pluginJobPropsMap);
+ pluginLoadPropsMap = new HashMap<String, Props>(clone.pluginLoadPropsMap);
+ commonJobProps = clone.commonJobProps;
+ commonLoadProps = clone.commonLoadProps;
+ }
+
+ /**
+ * Sets the common properties shared in every jobtype
+ * @param commonJobProps
+ */
+ public void setCommonPluginJobProps(Props commonJobProps) {
+ this.commonJobProps = commonJobProps;
+ }
+
+ /**
+ * Sets the common properties used to load every plugin
+ * @param commonLoadProps
+ */
+ public void setCommonPluginLoadProps(Props commonLoadProps) {
+ this.commonLoadProps = commonLoadProps;
+ }
+
+ /**
+ * Gets common properties for every jobtype
+ * @return
+ */
+ public Props getCommonPluginJobProps() {
+ return commonJobProps;
+ }
+
+ /**
+ * Gets the common properties used to load a plugin
+ * @return
+ */
+ public Props getCommonPluginLoadProps() {
+ return commonLoadProps;
+ }
+
+ /**
+ * Get the properties for a jobtype used to setup and load a plugin
+ *
+ * @param jobTypeName
+ * @return
+ */
+ public Props getPluginLoaderProps(String jobTypeName) {
+ return pluginLoadPropsMap.get(jobTypeName);
+ }
+
+ /**
+ * Get the properties that will be given to the plugin as default job
+ * properties.
+ *
+ * @param jobTypeName
+ * @return
+ */
+ public Props getPluginJobProps(String jobTypeName) {
+ return pluginJobPropsMap.get(jobTypeName);
+ }
+
+ /**
+ * Gets the plugin job runner class
+ *
+ * @param jobTypeName
+ * @return
+ */
+ public Class<? extends Job> getPluginClass(String jobTypeName) {
+ return jobToClass.get(jobTypeName);
+ }
+
+ /**
+ * Adds plugin jobtype class
+ */
+ public void addPluginClass(String jobTypeName, Class<? extends Job> jobTypeClass) {
+ jobToClass.put(jobTypeName, jobTypeClass);
+ }
+
+ /**
+ * Adds plugin job properties used as default runtime properties
+ */
+ public void addPluginJobProps(String jobTypeName, Props props) {
+ pluginJobPropsMap.put(jobTypeName, props);
+ }
+
+ /**
+ * Adds plugin load properties used to load the plugin
+ */
+ public void addPluginLoadProps(String jobTypeName, Props props) {
+ pluginLoadPropsMap.put(jobTypeName, props);
+ }
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index c87f2c6..cc3b1e5 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -37,7 +37,7 @@ public class LocalFlowWatcherTest {
@Before
public void setUp() throws Exception {
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
+ jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 45c3a85..d8e94dd 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -38,7 +38,7 @@ public class RemoteFlowWatcherTest {
@Before
public void setUp() throws Exception {
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
+ jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index aa1eee5..df9cac1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -23,6 +23,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
@@ -73,8 +74,10 @@ public class FlowRunnerPipelineTest {
}
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
- jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+
+ pluginSet.addPluginClass("java", JavaJob.class);
+ pluginSet.addPluginClass("test", InteractiveTestJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
fakeExecutorLoader = new MockExecutorLoader();
project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 21b43f1..2e708fe 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,8 +21,10 @@ import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.MockProjectLoader;
import azkaban.test.executor.InteractiveTestJob;
import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
@@ -46,8 +48,9 @@ public class FlowRunnerTest {
workingDir.mkdirs();
}
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
- jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+ pluginSet.addPluginClass("java", JavaJob.class);
+ pluginSet.addPluginClass("test", InteractiveTestJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
InteractiveTestJob.clearTestJobs();
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 7a67da6..0c9d7b9 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -21,6 +21,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
@@ -92,8 +93,10 @@ public class FlowRunnerTest2 {
}
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
- jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+
+ pluginSet.addPluginClass("java", JavaJob.class);
+ pluginSet.addPluginClass("test", InteractiveTestJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
fakeExecutorLoader = new MockExecutorLoader();
project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 69d6296..6fcbb11 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -42,7 +42,8 @@ public class JobRunnerTest {
}
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
+
+ jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
}
@After
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob.java b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
new file mode 100644
index 0000000..65ca1ba
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob extends JavaProcessJob {
+ public FakeJavaJob(String jobid, Props sysProps, Props jobProps, Logger log) {
+ super(jobid, sysProps, jobProps, log);
+ }
+}
+
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob2.java b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
new file mode 100644
index 0000000..581aeff
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob2 extends JavaProcessJob {
+ public FakeJavaJob2(String jobid, Props sysProps, Props jobProps, Logger log) {
+ super(jobid, sysProps, jobProps, log);
+ }
+}
+
diff --git a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
new file mode 100644
index 0000000..a4c5cb0
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.jobExecutor.Job;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
+import azkaban.utils.Props;
+
+
+/**
+ * Test the flow run, especially with embedded flows.
+ * Files are in unit/plugins/jobtypes
+ *
+ */
+public class JobTypeManagerTest {
+ public static String TEST_PLUGIN_DIR = "jobtypes_test";
+ private Logger logger = Logger.getLogger(JobTypeManagerTest.class);
+ private JobTypeManager manager;
+
+ public JobTypeManagerTest() {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ File jobTypeDir = new File(TEST_PLUGIN_DIR);
+ jobTypeDir.mkdirs();
+
+ FileUtils.copyDirectory(new File("unit/plugins/jobtypes"), jobTypeDir);
+ manager = new JobTypeManager(TEST_PLUGIN_DIR, this.getClass().getClassLoader());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.deleteDirectory(new File(TEST_PLUGIN_DIR));
+ }
+
+ /**
+ * Tests that the common and common private properties are loaded correctly
+ * @throws Exception
+ */
+ @Test
+ public void testCommonPluginProps() throws Exception {
+ JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+
+ Props props = pluginSet.getCommonPluginJobProps();
+ System.out.println(props.toString());
+ assertEquals("commonprop1", props.getString("commonprop1"));
+ assertEquals("commonprop2", props.getString("commonprop2"));
+ assertEquals("commonprop3", props.getString("commonprop3"));
+
+ Props priv = pluginSet.getCommonPluginLoadProps();
+ assertEquals("commonprivate1", priv.getString("commonprivate1"));
+ assertEquals("commonprivate2", priv.getString("commonprivate2"));
+ assertEquals("commonprivate3", priv.getString("commonprivate3"));
+ }
+
+ /**
+ * Tests that the proper classes were loaded and that the common and the load
+ * properties are properly loaded.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLoadedClasses() throws Exception {
+ JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+
+ Props props = pluginSet.getCommonPluginJobProps();
+ System.out.println(props.toString());
+ assertEquals("commonprop1", props.getString("commonprop1"));
+ assertEquals("commonprop2", props.getString("commonprop2"));
+ assertEquals("commonprop3", props.getString("commonprop3"));
+ assertNull(props.get("commonprivate1"));
+
+ Props priv = pluginSet.getCommonPluginLoadProps();
+ assertEquals("commonprivate1", priv.getString("commonprivate1"));
+ assertEquals("commonprivate2", priv.getString("commonprivate2"));
+ assertEquals("commonprivate3", priv.getString("commonprivate3"));
+
+ // Testing the anothertestjobtype
+ Class<? extends Job> aPluginClass = pluginSet.getPluginClass("anothertestjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob", aPluginClass.getName());
+ Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+ Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+
+ // Loader props
+ assertEquals("lib/*", aloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob", aloadProps.get("jobtype.class"));
+ assertEquals("commonprivate1", aloadProps.get("commonprivate1"));
+ assertEquals("commonprivate2", aloadProps.get("commonprivate2"));
+ assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+ // Job props
+ assertEquals("commonprop1", ajobProps.get("commonprop1"));
+ assertEquals("commonprop2", ajobProps.get("commonprop2"));
+ assertEquals("commonprop3", ajobProps.get("commonprop3"));
+ assertNull(ajobProps.get("commonprivate1"));
+
+ Class<? extends Job> tPluginClass = pluginSet.getPluginClass("testjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tPluginClass.getName());
+ Props tjobProps = pluginSet.getPluginJobProps("testjob");
+ Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+
+ // Loader props
+ assertNull(tloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+ assertEquals("commonprivate1", tloadProps.get("commonprivate1"));
+ assertEquals("commonprivate2", tloadProps.get("commonprivate2"));
+ assertEquals("private3", tloadProps.get("commonprivate3"));
+ assertEquals("0", tloadProps.get("testprivate"));
+ // Job props
+ assertEquals("commonprop1", tjobProps.get("commonprop1"));
+ assertEquals("commonprop2", tjobProps.get("commonprop2"));
+ assertEquals("1", tjobProps.get("pluginprops1"));
+ assertEquals("2", tjobProps.get("pluginprops2"));
+ assertEquals("3", tjobProps.get("pluginprops3"));
+ assertEquals("pluginprops", tjobProps.get("commonprop3"));
+ // Testing that the private properties aren't shared with the public ones
+ assertNull(tjobProps.get("commonprivate1"));
+ assertNull(tjobProps.get("testprivate"));
+ }
+
+ /**
+ * Test building classes
+ * @throws Exception
+ */
+ @Test
+ public void testBuildClass() throws Exception {
+ Props jobProps = new Props();
+ jobProps.put("type", "anothertestjob");
+ jobProps.put("test","test1");
+ jobProps.put("pluginprops3","4");
+ Job job = manager.buildJobExecutor("anothertestjob", jobProps, logger);
+
+ assertTrue(job instanceof FakeJavaJob);
+ FakeJavaJob fjj = (FakeJavaJob)job;
+
+ Props props = fjj.getJobProps();
+ assertEquals("test1", props.get("test"));
+ assertNull(props.get("pluginprops1"));
+ assertEquals("4", props.get("pluginprops3"));
+ assertEquals("commonprop1", props.get("commonprop1"));
+ assertEquals("commonprop2", props.get("commonprop2"));
+ assertEquals("commonprop3", props.get("commonprop3"));
+ assertNull(props.get("commonprivate1"));
+ }
+
+ /**
+ * Test building classes 2
+ * @throws Exception
+ */
+ @Test
+ public void testBuildClass2() throws Exception {
+ Props jobProps = new Props();
+ jobProps.put("type", "testjob");
+ jobProps.put("test","test1");
+ jobProps.put("pluginprops3","4");
+ Job job = manager.buildJobExecutor("testjob", jobProps, logger);
+
+ assertTrue(job instanceof FakeJavaJob2);
+ FakeJavaJob2 fjj = (FakeJavaJob2)job;
+
+ Props props = fjj.getJobProps();
+ assertEquals("test1", props.get("test"));
+ assertEquals("1", props.get("pluginprops1"));
+ assertEquals("2", props.get("pluginprops2"));
+ assertEquals("4", props.get("pluginprops3")); // Overridden value
+ assertEquals("commonprop1", props.get("commonprop1"));
+ assertEquals("commonprop2", props.get("commonprop2"));
+ assertEquals("pluginprops", props.get("commonprop3"));
+ assertNull(props.get("commonprivate1"));
+ }
+
+ /**
+ * Test out reloading properties
+ * @throws Exception
+ */
+ @Test
+ public void testResetPlugins() throws Exception {
+ // Add a plugins file to the anothertestjob folder
+ File anothertestfolder = new File(TEST_PLUGIN_DIR + "/anothertestjob");
+ Props pluginProps = new Props();
+ pluginProps.put("test1", "1");
+ pluginProps.put("test2", "2");
+ pluginProps.put("pluginprops3","4");
+ pluginProps.storeFlattened(new File(anothertestfolder, "plugin.properties"));
+
+ // clone the testjob folder
+ File testFolder = new File(TEST_PLUGIN_DIR + "/testjob");
+ FileUtils.copyDirectory(testFolder, new File(TEST_PLUGIN_DIR + "/newtestjob"));
+
+ // change the common properties
+ Props commonPlugin = new Props(null, TEST_PLUGIN_DIR + "/common.properties");
+ commonPlugin.put("commonprop1", "1");
+ commonPlugin.put("newcommonprop1", "2");
+ commonPlugin.removeLocal("commonprop2");
+ commonPlugin.storeFlattened(new File(TEST_PLUGIN_DIR + "/common.properties"));
+
+ // change the common properties
+ Props commonPrivate = new Props(null, TEST_PLUGIN_DIR + "/commonprivate.properties");
+ commonPrivate.put("commonprivate1", "1");
+ commonPrivate.put("newcommonprivate1", "2");
+ commonPrivate.removeLocal("commonprivate2");
+ commonPrivate.storeFlattened(new File(TEST_PLUGIN_DIR + "/commonprivate.properties"));
+
+ // change testjob private property
+ Props loadProps = new Props(null, TEST_PLUGIN_DIR + "/testjob/private.properties");
+ loadProps.put("privatetest", "test");
+
+ /*
+ * Reload the plugins here!!
+ */
+ manager.loadPlugins();
+
+ // Checkout common props
+ JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+ Props commonProps = pluginSet.getCommonPluginJobProps();
+ assertEquals("1", commonProps.get("commonprop1"));
+ assertEquals("commonprop3", commonProps.get("commonprop3"));
+ assertEquals("2", commonProps.get("newcommonprop1"));
+ assertNull(commonProps.get("commonprop2"));
+
+ // Checkout common private
+ Props commonPrivateProps = pluginSet.getCommonPluginLoadProps();
+ assertEquals("1", commonPrivateProps.get("commonprivate1"));
+ assertEquals("commonprivate3", commonPrivateProps.get("commonprivate3"));
+ assertEquals("2", commonPrivateProps.get("newcommonprivate1"));
+ assertNull(commonPrivateProps.get("commonprivate2"));
+
+ // Verify anothertestjob changes
+ Class<? extends Job> atjClass = pluginSet.getPluginClass("anothertestjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob", atjClass.getName());
+ Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+ assertEquals("1", ajobProps.get("test1"));
+ assertEquals("2", ajobProps.get("test2"));
+ assertEquals("4", ajobProps.get("pluginprops3"));
+ assertEquals("commonprop3", ajobProps.get("commonprop3"));
+
+ Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+ assertEquals("1", aloadProps.get("commonprivate1"));
+ assertNull(aloadProps.get("commonprivate2"));
+ assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+
+ // Verify testjob changes
+ Class<? extends Job> tjClass = pluginSet.getPluginClass("testjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tjClass.getName());
+ Props tjobProps = pluginSet.getPluginJobProps("testjob");
+ assertEquals("1", tjobProps.get("commonprop1"));
+ assertEquals("2", tjobProps.get("newcommonprop1"));
+ assertEquals("1", tjobProps.get("pluginprops1"));
+ assertEquals("2", tjobProps.get("pluginprops2"));
+ assertEquals("3", tjobProps.get("pluginprops3"));
+ assertEquals("pluginprops", tjobProps.get("commonprop3"));
+ assertNull(tjobProps.get("commonprop2"));
+
+ Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+ assertNull(tloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+ assertEquals("1", tloadProps.get("commonprivate1"));
+ assertNull(tloadProps.get("commonprivate2"));
+ assertEquals("private3", tloadProps.get("commonprivate3"));
+
+ // Verify newtestjob
+ Class<? extends Job> ntPluginClass = pluginSet.getPluginClass("newtestjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntPluginClass.getName());
+ Props ntjobProps = pluginSet.getPluginJobProps("newtestjob");
+ Props ntloadProps = pluginSet.getPluginLoaderProps("newtestjob");
+
+ // Loader props
+ assertNull(ntloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntloadProps.get("jobtype.class"));
+ assertEquals("1", ntloadProps.get("commonprivate1"));
+ assertNull(ntloadProps.get("commonprivate2"));
+ assertEquals("private3", ntloadProps.get("commonprivate3"));
+ assertEquals("0", ntloadProps.get("testprivate"));
+ // Job props
+ assertEquals("1", ntjobProps.get("commonprop1"));
+ assertNull(ntjobProps.get("commonprop2"));
+ assertEquals("1", ntjobProps.get("pluginprops1"));
+ assertEquals("2", ntjobProps.get("pluginprops2"));
+ assertEquals("3", ntjobProps.get("pluginprops3"));
+ assertEquals("pluginprops", ntjobProps.get("commonprop3"));
+ }
+}
diff --git a/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/anothertestjob/private.properties b/unit/plugins/jobtypes/anothertestjob/private.properties
new file mode 100644
index 0000000..8e95c94
--- /dev/null
+++ b/unit/plugins/jobtypes/anothertestjob/private.properties
@@ -0,0 +1,2 @@
+jobtype.classpath=lib/*
+jobtype.class=azkaban.test.jobtype.FakeJavaJob
diff --git a/unit/plugins/jobtypes/common.properties b/unit/plugins/jobtypes/common.properties
new file mode 100644
index 0000000..2823a83
--- /dev/null
+++ b/unit/plugins/jobtypes/common.properties
@@ -0,0 +1,3 @@
+commonprop1=commonprop1
+commonprop2=commonprop2
+commonprop3=commonprop3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/commonprivate.properties b/unit/plugins/jobtypes/commonprivate.properties
new file mode 100644
index 0000000..7d46d43
--- /dev/null
+++ b/unit/plugins/jobtypes/commonprivate.properties
@@ -0,0 +1,3 @@
+commonprivate1=commonprivate1
+commonprivate2=commonprivate2
+commonprivate3=commonprivate3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/fakejobtype.jar b/unit/plugins/jobtypes/testjob/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/testjob/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/testjob/plugin.properties b/unit/plugins/jobtypes/testjob/plugin.properties
new file mode 100644
index 0000000..587081a
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/plugin.properties
@@ -0,0 +1,4 @@
+pluginprops1=1
+pluginprops2=2
+pluginprops3=3
+commonprop3=pluginprops
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/private.properties b/unit/plugins/jobtypes/testjob/private.properties
new file mode 100644
index 0000000..2bba593
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/private.properties
@@ -0,0 +1,3 @@
+jobtype.class=azkaban.test.jobtype.FakeJavaJob2
+commonprivate3=private3
+testprivate=0
\ No newline at end of file