azkaban-developers

Merge pull request #222 from rbpark/jobtypeupdate Refactoring

4/24/2014 8:06:18 PM

Details

build.gradle 8(+7 -1)

diff --git a/build.gradle b/build.gradle
index 168c745..b508e46 100644
--- a/build.gradle
+++ b/build.gradle
@@ -100,7 +100,8 @@ dependencies {
   )
 
   testCompile (
-    [group: 'junit', name:'junit', version: '4.11']
+    [group: 'junit', name:'junit', version: '4.11'],
+    [group: 'org.hamcrest', name:'hamcrest-all', version: '1.3']
   )
 }
 
@@ -110,6 +111,11 @@ sourceSets {
             srcDirs 'src/main/java', 'src/restli/generatedJava', 'src/restli/java'
         }
     }
+    test {
+        java {
+            srcDirs 'unit/java'
+        }
+    }
 }
 
 jar {
diff --git a/src/main/java/azkaban/execapp/ExecutorServlet.java b/src/main/java/azkaban/execapp/ExecutorServlet.java
index 75a1a08..cb8a211 100644
--- a/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -88,6 +88,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				else if (action.equals(PING_ACTION)) {
 					respMap.put("status", "alive");
 				}
+				else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
+					logger.info("Reloading Jobtype plugins");
+					handleReloadJobTypePlugins(respMap);
+				}
 				else {
 					int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
 					String user = getParam(req, USER_PARAM, null);
@@ -337,6 +341,17 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 	
+	private void handleReloadJobTypePlugins(Map<String, Object> respMap) throws ServletException {
+		try {
+			flowRunnerManager.reloadJobTypePlugins();
+			respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+		}
+		catch (Exception e) {
+			logger.error(e);
+			respMap.put(RESPONSE_ERROR, e.getMessage());
+		}
+	}
+	
 	@Override
 	public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		
diff --git a/src/main/java/azkaban/execapp/FlowRunnerManager.java b/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f886e64..a9e2059 100644
--- a/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -47,6 +47,7 @@ import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
 
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -689,6 +690,7 @@ public class FlowRunnerManager implements EventListener {
 		return jobCount;
 	}
 
-	
-	
+	public void reloadJobTypePlugins() throws JobTypeManagerException {
+		jobtypeManager.loadPlugins();
+	}
 }
diff --git a/src/main/java/azkaban/executor/ConnectorParams.java b/src/main/java/azkaban/executor/ConnectorParams.java
index c84436e..3b3f00e 100644
--- a/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/src/main/java/azkaban/executor/ConnectorParams.java
@@ -32,6 +32,7 @@ public interface ConnectorParams {
 	public static final String LOG_ACTION = "log";
 	public static final String ATTACHMENTS_ACTION = "attachments";
 	public static final String METADATA_ACTION = "metadata";
+	public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
 	public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/src/main/java/azkaban/jobtype/JobTypeManager.java b/src/main/java/azkaban/jobtype/JobTypeManager.java
index 6c275af..df9627e 100644
--- a/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -28,20 +28,18 @@ import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
 import azkaban.jobExecutor.utils.JobExecutionException;
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.log4j.Logger;
 
 public class JobTypeManager 
 {
-	
-	private final String jobtypePluginDir;	// the dir for jobtype plugins
+	private final String jobTypePluginDir;	// the dir for jobtype plugins
 	private final ClassLoader parentLoader; 
 	
 	public static final String DEFAULT_JOBTYPEPLUGINDIR = "plugins/jobtypes";
@@ -51,27 +49,25 @@ public class JobTypeManager
 	private static final String COMMONSYSCONFFILE = "commonprivate.properties"; // common private properties for multiple plugins
 	private static final Logger logger = Logger.getLogger(JobTypeManager.class);
 	
-	private Map<String, Class<? extends Job>> jobToClass;
-	private Map<String, Props> jobtypeJobProps;
-	private Map<String, Props> jobtypeSysProps;
+	private JobTypePluginSet pluginSet;
 
-	public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader)
-	{
-		this.jobtypePluginDir = jobtypePluginDir;
+	public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader) {
+		this.jobTypePluginDir = jobtypePluginDir;
 		this.parentLoader = parentClassLoader;
 		
-		jobToClass = new HashMap<String, Class<? extends Job>>();
-		jobtypeJobProps = new HashMap<String, Props>();
-		jobtypeSysProps = new HashMap<String, Props>();
-		
-		loadDefaultTypes();
+		loadPlugins();
+	}
+
+	public void loadPlugins() throws JobTypeManagerException {
+		JobTypePluginSet plugins = new JobTypePluginSet();
 		
-		if(jobtypePluginDir != null) {
-			File pluginDir = new File(jobtypePluginDir);
+		loadDefaultTypes(plugins);
+		if (jobTypePluginDir != null) {
+			File pluginDir = new File(jobTypePluginDir);
 			if (pluginDir.exists()) {
-			logger.info("job type plugin directory set. Loading extra job types.");
+				logger.info("Job type plugin directory set. Loading extra job types from " + pluginDir);
 				try {
-					loadPluginJobTypes();
+					loadPluginJobTypes(plugins);
 				}
 				catch (Exception e) {
 					logger.info("Plugin jobtypes failed to load. " + e.getCause());
@@ -80,186 +76,180 @@ public class JobTypeManager
 			}
 		}
 		
+		// Swap the plugin set. If exception is thrown, then plugin isn't swapped.
+		synchronized (this) {
+			pluginSet = plugins;
+		}
 	}
-
-	private void loadDefaultTypes() throws JobTypeManagerException{
-		jobToClass.put("command", ProcessJob.class);
-		jobToClass.put("javaprocess", JavaProcessJob.class);
-		jobToClass.put("noop", NoopJob.class);
-		jobToClass.put("python", PythonJob.class);
-		jobToClass.put("ruby", RubyJob.class);
-		jobToClass.put("script", ScriptJob.class);	
+	
+	private void loadDefaultTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+		logger.info("Loading plugin default job types");
+		plugins.addPluginClass("command", ProcessJob.class);
+		plugins.addPluginClass("javaprocess", JavaProcessJob.class);
+		plugins.addPluginClass("noop", NoopJob.class);
+		plugins.addPluginClass("python", PythonJob.class);
+		plugins.addPluginClass("ruby", RubyJob.class);
+		plugins.addPluginClass("script", ScriptJob.class);	
 	}
 
 	// load Job Types from jobtype plugin dir
-	private void loadPluginJobTypes() throws JobTypeManagerException
-	{
-		File jobPluginsDir = new File(jobtypePluginDir);
+	private void loadPluginJobTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+		File jobPluginsDir = new File(jobTypePluginDir);
 		
 		if (!jobPluginsDir.exists()) {
+			logger.error("Job type plugin dir " + jobTypePluginDir + " doesn't exist. Will not load any external plugins.");
 			return;
 		}
-
-		if (!jobPluginsDir.isDirectory()) {
-			throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not a directory!");
+		else if (!jobPluginsDir.isDirectory()) {
+			throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not a directory!");
 		}
-		
-		if (!jobPluginsDir.canRead()) {
-			throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not readable!");
+		else if (!jobPluginsDir.canRead()) {
+			throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not readable!");
 		}
 		
-		// look for global conf
-		Props globalConf = null;
-		Props globalSysConf = null;
-		File confFile = findFilefromDir(jobPluginsDir, COMMONCONFFILE);
-		File sysConfFile = findFilefromDir(jobPluginsDir, COMMONSYSCONFFILE);
-		try {
-			if(confFile != null) {
-				globalConf = new Props(null, confFile);
+		// Load the common properties used by all jobs that are run
+		Props commonPluginJobProps = null;
+		File commonJobPropsFile = new File(jobPluginsDir, COMMONCONFFILE);
+		if (commonJobPropsFile.exists()) {
+			logger.info("Common plugin job props file " + commonJobPropsFile + " found. Attempt to load.");
+			try {
+				commonPluginJobProps = new Props(null, commonJobPropsFile);
 			}
-			else {
-				globalConf = new Props();
+			catch (IOException e) {
+				throw new JobTypeManagerException("Failed to load common plugin job properties" + e.getCause());
 			}
-			if(sysConfFile != null) {
-				globalSysConf = new Props(null, sysConfFile);
+		}
+		else {
+			logger.info("Common plugin job props file " + commonJobPropsFile + " not found. Using empty props.");
+			commonPluginJobProps = new Props();
+		}
+		
+		// Loads the common properties used by all plugins when loading
+		Props commonPluginLoadProps = null;
+		File commonLoadPropsFile = new File(jobPluginsDir, COMMONSYSCONFFILE);
+		if (commonLoadPropsFile.exists()) {
+			logger.info("Common plugin load props file " + commonLoadPropsFile + " found. Attempt to load.");
+			try {
+				commonPluginLoadProps = new Props(null, commonLoadPropsFile);
 			}
-			else {
-				globalSysConf = new Props();
+			catch (IOException e) {
+				throw new JobTypeManagerException("Failed to load common plugin loader properties" + e.getCause());
 			}
 		}
-		catch (Exception e) {
-			throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
+		else {
+			logger.info("Common plugin load props file " + commonLoadPropsFile + " not found. Using empty props.");
+			commonPluginLoadProps = new Props();
 		}
 		
+		plugins.setCommonPluginJobProps(commonPluginJobProps);
+		plugins.setCommonPluginLoadProps(commonPluginLoadProps);
 		
-		synchronized (this) {
-			ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
-			try{
-				for(File dir : jobPluginsDir.listFiles()) {
-					if(dir.isDirectory() && dir.canRead()) {
-						// get its conf file
-						try {
-							loadJob(dir, globalConf, globalSysConf);
-							Thread.currentThread().setContextClassLoader(prevCl);
-						}
-						catch (Exception e) {
-							logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
-							throw new JobTypeManagerException(e);
-						}
-					}
+		// Loading job types
+		for (File dir : jobPluginsDir.listFiles()) {
+			if (dir.isDirectory() && dir.canRead()) {
+				try {
+					loadJobTypes(dir, plugins);
 				}
-			} catch(Exception e) {
-				e.printStackTrace();
-				throw new JobTypeManagerException(e);
-			} catch(Throwable t) {
-				t.printStackTrace();
-				throw new JobTypeManagerException(t);
-			} finally {
-				Thread.currentThread().setContextClassLoader(prevCl);
-			}
- 		}
-	}
-	
-	public static File findFilefromDir(File dir, String fn){
-		if(dir.isDirectory()) {
-			for(File f : dir.listFiles()) {
-				if(f.getName().equals(fn)) {
-					return f;
+				catch (Exception e) {
+					logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
+					throw new JobTypeManagerException(e);
 				}
 			}
 		}
-		return null;
 	}
 	
-//	private void loadJobType(File dir, Props globalConf, Props globalSysConf) throws JobTypeManagerException{
-//		
-//		// look for common conf
-//		Props conf = null;
-//		Props sysConf = null;
-//		File confFile = findFilefromDir(dir, COMMONCONFFILE);
-//		File sysConfFile = findFilefromDir(dir, COMMONSYSCONFFILE);
-//		
-//		try {
-//			if(confFile != null) {
-//				conf = new Props(globalConf, confFile);
-//			}
-//			else {
-//				conf = globalConf;
-//			}
-//			if(sysConfFile != null) {
-//				sysConf = new Props(globalSysConf, sysConfFile);
-//			}
-//			else {
-//				sysConf = globalSysConf;
-//			}
-//		}
-//		catch (Exception e) {
-//			throw new JobTypeManagerException("Failed to get common jobtype properties" + e.getCause());
-//		}
-//		
-//		// look for jobtypeConf.properties and load it 		
-//		for(File f: dir.listFiles()) {
-//			if(f.isFile() && f.getName().equals(JOBTYPESYSCONFFILE)) {
-//				loadJob(dir, f, conf, sysConf);
-//				return;
-//			}
-//		}
-//
-//		// no hit, keep looking
-//		for(File f : dir.listFiles()) {
-//			if(f.isDirectory() && f.canRead())
-//				loadJobType(f, conf, sysConf);
-//		}
-//		
-//	}
-	
 	@SuppressWarnings("unchecked")
-	private void loadJob(File dir, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+	private void loadJobTypes(File pluginDir, JobTypePluginSet plugins) throws JobTypeManagerException {
+		// Directory is the jobtypeName
+		String jobTypeName = pluginDir.getName();
+		logger.info("Loading plugin " + jobTypeName);
+		
+		Props pluginJobProps = null;
+		Props pluginLoadProps = null;
 		
-		Props conf = null;
-		Props sysConf = null;
-		File confFile = findFilefromDir(dir, JOBTYPECONFFILE);		
-		File sysConfFile = findFilefromDir(dir, JOBTYPESYSCONFFILE);
-		if(sysConfFile == null) {
-			logger.info("No job type found in " + dir.getAbsolutePath());
+		File pluginJobPropsFile = new File(pluginDir, JOBTYPECONFFILE);
+		File pluginLoadPropsFile = new File(pluginDir, JOBTYPESYSCONFFILE);
+
+		if (!pluginLoadPropsFile.exists()) {
+			logger.info("Plugin load props file " + pluginLoadPropsFile + " not found.");
 			return;
 		}
 		
 		try {
-			if(confFile != null) {
-				conf = new Props(commonConf, confFile);
+			Props commonPluginJobProps = plugins.getCommonPluginJobProps();
+			Props commonPluginLoadProps = plugins.getCommonPluginLoadProps();
+			if (pluginJobPropsFile.exists()) {
+				pluginJobProps = new Props(commonPluginJobProps, pluginJobPropsFile);
 			}
 			else {
-				conf = new Props(commonConf);
+				pluginJobProps = new Props(commonPluginJobProps);
 			}
 			
-			sysConf = new Props(commonSysConf, sysConfFile);
-			sysConf = PropsUtils.resolveProps(sysConf);
-
+			pluginLoadProps = new Props(commonPluginLoadProps, pluginLoadPropsFile);
+			pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
 		}
 		catch (Exception e) {
 			throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
 		}
-		sysConf.put("plugin.dir", dir.getAbsolutePath());
+		// Add properties into the plugin set
+		pluginLoadProps.put("plugin.dir", pluginDir.getAbsolutePath());	
+		plugins.addPluginLoadProps(jobTypeName, pluginLoadProps);
+		if (pluginJobProps != null) {
+			plugins.addPluginJobProps(jobTypeName, pluginJobProps);
+		}
 		
-		// use directory name as job type name
-		String jobtypeName = dir.getName();
+		ClassLoader jobTypeLoader = loadJobTypeClassLoader(pluginDir, jobTypeName, plugins);
+		String jobtypeClass = pluginLoadProps.get("jobtype.class");
 		
-		String jobtypeClass = sysConf.get("jobtype.class");
+		Class<? extends Job> clazz = null;
+		try {
+			clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
+			plugins.addPluginClass(jobTypeName, clazz);
+		}
+		catch (ClassNotFoundException e) {
+			throw new JobTypeManagerException(e);
+		}
 		
-		logger.info("Loading jobtype " + jobtypeName );
-
+		logger.info("Verifying job plugin " + jobTypeName);
+		try {
+			Props fakeSysProps = new Props(pluginLoadProps);
+			Props fakeJobProps = new Props(pluginJobProps);
+			@SuppressWarnings("unused")
+			Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+		}
+		catch (Exception e) {
+			logger.info("Jobtype " + jobTypeName + " failed test!", e);
+			throw new JobExecutionException(e);
+		}
+		catch (Throwable t) {
+			logger.info("Jobtype " + jobTypeName + " failed test!", t);
+			throw new JobExecutionException(t);
+		}
+		
+		logger.info("Loaded jobtype " + jobTypeName + " " + jobtypeClass);
+	}
+	
+	/**
+	 * Creates and loads all plugin resources (jars) into a ClassLoader
+	 * 
+	 * @param pluginDir
+	 * @param jobTypeName
+	 * @param plugins
+	 * @return
+	 */
+	private ClassLoader loadJobTypeClassLoader(File pluginDir, String jobTypeName, JobTypePluginSet plugins) {
 		// sysconf says what jars/confs to load
-		List<URL> resources = new ArrayList<URL>();		
+		List<URL> resources = new ArrayList<URL>();
+		Props pluginLoadProps = plugins.getPluginLoaderProps(jobTypeName);
 		
 		try {
 			//first global classpath
-			logger.info("Adding global resources.");
-			List<String> typeGlobalClassPath = sysConf.getStringList("jobtype.global.classpath", null, ",");
-			if(typeGlobalClassPath != null) {
-				for(String jar : typeGlobalClassPath) {
+			logger.info("Adding global resources for " + jobTypeName);
+			List<String> typeGlobalClassPath = pluginLoadProps.getStringList("jobtype.global.classpath", null, ",");
+			if (typeGlobalClassPath != null) {
+				for (String jar : typeGlobalClassPath) {
 					URL cpItem = new File(jar).toURI().toURL();
-					if(!resources.contains(cpItem)) {
+					if (!resources.contains(cpItem)) {
 						logger.info("adding to classpath " + cpItem);
 						resources.add(cpItem);
 					}
@@ -268,78 +258,51 @@ public class JobTypeManager
 			
 			//type specific classpath
 			logger.info("Adding type resources.");
-			List<String> typeClassPath = sysConf.getStringList("jobtype.classpath", null, ",");
-			if(typeClassPath != null) {
-				for(String jar : typeClassPath) {
+			List<String> typeClassPath = pluginLoadProps.getStringList("jobtype.classpath", null, ",");
+			if (typeClassPath != null) {
+				for (String jar : typeClassPath) {
 					URL cpItem = new File(jar).toURI().toURL();
-					if(!resources.contains(cpItem)) {
+					if (!resources.contains(cpItem)) {
 						logger.info("adding to classpath " + cpItem);
 						resources.add(cpItem);
 					}
 				}
 			}			
-			List<String> jobtypeLibDirs = sysConf.getStringList("jobtype.lib.dir", null, ",");
-			if(jobtypeLibDirs != null) {
-				for(String libDir : jobtypeLibDirs) {
-					for(File f : new File(libDir).listFiles()) {
-						if(f.getName().endsWith(".jar")) {
-								resources.add(f.toURI().toURL());
-								logger.info("adding to classpath " + f.toURI().toURL());
+			List<String> jobtypeLibDirs = pluginLoadProps.getStringList("jobtype.lib.dir", null, ",");
+			if (jobtypeLibDirs != null) {
+				for (String libDir : jobtypeLibDirs) {
+					for (File f : new File(libDir).listFiles()) {
+						if (f.getName().endsWith(".jar")) {
+							resources.add(f.toURI().toURL());
+							logger.info("adding to classpath " + f.toURI().toURL());
 						}
 					}
 				}
 			}
 			
 			logger.info("Adding type override resources.");
-			for(File f : dir.listFiles()) {
-				if(f.getName().endsWith(".jar")) {
-						resources.add(f.toURI().toURL());
-						logger.info("adding to classpath " + f.toURI().toURL());
+			for (File f : pluginDir.listFiles()) {
+				if (f.getName().endsWith(".jar")) {
+					resources.add(f.toURI().toURL());
+					logger.info("adding to classpath " + f.toURI().toURL());
 				}
 			}
 			
-		} catch (MalformedURLException e) {
+		}
+		catch (MalformedURLException e) {
 			throw new JobTypeManagerException(e);
 		}
 		
 		// each job type can have a different class loader
 		ClassLoader jobTypeLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentLoader);
-		
-		Class<? extends Job> clazz = null;
-		try {
-			clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
-			jobToClass.put(jobtypeName, clazz);
-		}
-		catch (ClassNotFoundException e) {
-			throw new JobTypeManagerException(e);
-		}
-		
-		logger.info("Doing simple testing...");
-		try {
-			Props fakeSysProps = new Props(sysConf);
-//			fakeSysProps.put("type", jobtypeName);
-			Props fakeJobProps = new Props(conf);
-			@SuppressWarnings("unused")
-			Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
-		}
-		catch (Exception e) {
-			logger.info("Jobtype " + jobtypeName + " failed test!", e);
-			throw new JobExecutionException(e);
-		}
-		catch (Throwable t) {
-			logger.info("Jobtype " + jobtypeName + " failed test!", t);
-			throw new JobExecutionException(t);
-		}
-		
-		logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
-		
-		if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
-		jobtypeSysProps.put(jobtypeName, sysConf);
-		
+		return jobTypeLoader;
 	}
 	
-	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) 
-			throws JobTypeManagerException {
+	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException {
+		// This is final because during build phase, you should never need to swap
+		// the pluginSet for safety reasons
+		final JobTypePluginSet pluginSet = getJobTypePluginSet();
+		
 		Job job = null;
 		try {
 			String jobType = jobProps.getString("type");
@@ -352,44 +315,36 @@ public class JobTypeManager
 			
 			logger.info("Building " + jobType + " job executor. ");
 			
-			Class<? extends Object> executorClass = jobToClass.get(jobType);
-
+			Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);
 			if (executorClass == null) {
 				throw new JobExecutionException(
 						String.format("Job type '" + jobType + "' is unrecognized. Could not construct job[%s] of type[%s].", jobProps, jobType));
 			}
 			
-			Props sysConf = jobtypeSysProps.get(jobType);
-			
-			Props jobConf = jobProps;
-			if (jobtypeJobProps.containsKey(jobType)) {
-				Props p = jobtypeJobProps.get(jobType);
-				for (String k : p.getKeySet()) {
-					if (!jobConf.containsKey(k)) {
-						jobConf.put(k, p.get(k));
+			Props pluginJobProps = pluginSet.getPluginJobProps(jobType);
+			if (pluginJobProps != null) {
+				for (String k : pluginJobProps.getKeySet()) {
+					if (!jobProps.containsKey(k)) {
+						jobProps.put(k, pluginJobProps.get(k));
 					}
 				}
 			}
-			jobConf = PropsUtils.resolveProps(jobConf);
+			jobProps = PropsUtils.resolveProps(jobProps);
 			
-			if (sysConf != null) {
-				sysConf = PropsUtils.resolveProps(sysConf);
+			Props pluginLoadProps = pluginSet.getPluginLoaderProps(jobType);
+			if (pluginLoadProps != null) {
+				pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
 			}
 			else {
-				sysConf = new Props();
+				pluginLoadProps = new Props();
 			}
 			
-//			logger.info("sysConf is " + sysConf);
-//			logger.info("jobConf is " + jobConf);
-//			
 			job = (Job) Utils.callConstructor(
-					executorClass, jobId, sysConf, jobConf, logger);
+					executorClass, jobId, pluginLoadProps, jobProps, logger);
 		}
 		catch (Exception e) {
-			//job = new InitErrorJob(jobId, e);
 			logger.error("Failed to build job executor for job " + jobId + e.getMessage());
 			throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
-			//throw new JobTypeManagerException(e);
 		}
 		catch (Throwable t) {
 			logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
@@ -398,9 +353,12 @@ public class JobTypeManager
 
 		return job;
 	}
-
-	public void registerJobType(String typeName, Class<? extends Job> jobTypeClass) {
-		jobToClass.put(typeName, jobTypeClass);
+	
+	/**
+	 * Public for test reasons. Will need to move tests to the same package
+	 */
+	public synchronized JobTypePluginSet getJobTypePluginSet() {
+		return this.pluginSet;
 	}
 }
 
diff --git a/src/main/java/azkaban/jobtype/JobTypePluginSet.java b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
new file mode 100644
index 0000000..9e6ded2
--- /dev/null
+++ b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobtype;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.jobExecutor.Job;
+import azkaban.utils.Props;
+
+/**
+ * Container for job type plugins
+ * 
+ * This contains the jobClass objects, the properties for loading plugins, and the
+ * properties given by default to the plugin.
+ * 
+ * This class is not thread safe, so adding to this class should only be populated 
+ * and controlled by the JobTypeManager
+ */
+public class JobTypePluginSet {
+	private Map<String, Class<? extends Job>> jobToClass;
+	private Map<String, Props> pluginJobPropsMap;
+	private Map<String, Props> pluginLoadPropsMap;
+	
+	private Props commonJobProps;
+	private Props commonLoadProps;
+	
+	/**
+	 * Base constructor
+	 */
+	public JobTypePluginSet() {
+		jobToClass = new HashMap<String, Class<? extends Job>>();
+		pluginJobPropsMap = new HashMap<String, Props>();
+		pluginLoadPropsMap = new HashMap<String, Props>();
+	}
+	
+	/**
+	 * Copy constructor
+	 * @param clone
+	 */
+	public JobTypePluginSet(JobTypePluginSet clone) {
+		jobToClass = new HashMap<String, Class<? extends Job>>(clone.jobToClass);
+		pluginJobPropsMap = new HashMap<String, Props>(clone.pluginJobPropsMap);
+		pluginLoadPropsMap = new HashMap<String, Props>(clone.pluginLoadPropsMap);
+		commonJobProps = clone.commonJobProps;
+		commonLoadProps = clone.commonLoadProps;
+	}
+	
+	/**
+	 * Sets the common properties shared in every jobtype
+	 * @param commonJobProps
+	 */
+	public void setCommonPluginJobProps(Props commonJobProps) {
+		this.commonJobProps = commonJobProps;
+	}
+	
+	/**
+	 * Sets the common properties used to load every plugin
+	 * @param commonLoadProps
+	 */
+	public void setCommonPluginLoadProps(Props commonLoadProps) {
+		this.commonLoadProps = commonLoadProps;
+	}
+	
+	/**
+	 * Gets common properties for every jobtype
+	 * @return
+	 */
+	public Props getCommonPluginJobProps() {
+		return commonJobProps;
+	}
+	
+	/**
+	 * Gets the common properties used to load a plugin
+	 * @return
+	 */
+	public Props getCommonPluginLoadProps() {
+		return commonLoadProps;
+	}
+	
+	/**
+	 * Get the properties for a jobtype used to setup and load a plugin
+	 * 
+	 * @param jobTypeName
+	 * @return
+	 */
+	public Props getPluginLoaderProps(String jobTypeName) {
+		return pluginLoadPropsMap.get(jobTypeName);
+	}
+	
+	/**
+	 * Get the properties that will be given to the plugin as default job
+	 * properties.
+	 * 
+	 * @param jobTypeName
+	 * @return
+	 */
+	public Props getPluginJobProps(String jobTypeName) {
+		return pluginJobPropsMap.get(jobTypeName);
+	}
+	
+	/**
+	 * Gets the plugin job runner class
+	 * 
+	 * @param jobTypeName
+	 * @return
+	 */
+	public Class<? extends Job> getPluginClass(String jobTypeName) {
+		return jobToClass.get(jobTypeName);
+	}
+	
+	/**
+	 * Adds plugin jobtype class
+	 */
+	public void addPluginClass(String jobTypeName, Class<? extends Job> jobTypeClass) {
+		jobToClass.put(jobTypeName, jobTypeClass);
+	}
+	
+	/**
+	 * Adds plugin job properties used as default runtime properties
+	 */
+	public void addPluginJobProps(String jobTypeName, Props props) {
+		pluginJobPropsMap.put(jobTypeName, props);
+	}
+	
+	/**
+	 * Adds plugin load properties used to load the plugin
+	 */
+	public void addPluginLoadProps(String jobTypeName, Props props) {
+		pluginLoadPropsMap.put(jobTypeName, props);
+	}
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index c87f2c6..cc3b1e5 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -37,7 +37,7 @@ public class LocalFlowWatcherTest {
 	@Before
 	public void setUp() throws Exception {
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
 	
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 45c3a85..d8e94dd 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -38,7 +38,7 @@ public class RemoteFlowWatcherTest {
 	@Before
 	public void setUp() throws Exception {
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
 	
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index aa1eee5..df9cac1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -23,6 +23,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
@@ -73,8 +74,10 @@ public class FlowRunnerPipelineTest {
 		}
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
-		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+		
+		pluginSet.addPluginClass("java", JavaJob.class);
+		pluginSet.addPluginClass("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 		fakeExecutorLoader = new MockExecutorLoader();
 		project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 21b43f1..2e708fe 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,8 +21,10 @@ import azkaban.executor.Status;
 
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.MockProjectLoader;
 import azkaban.test.executor.InteractiveTestJob;
 import azkaban.test.executor.JavaJob;
 import azkaban.utils.JSONUtils;
@@ -46,8 +48,9 @@ public class FlowRunnerTest {
 			workingDir.mkdirs();
 		}
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
-		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+		pluginSet.addPluginClass("java", JavaJob.class);
+		pluginSet.addPluginClass("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 		
 		InteractiveTestJob.clearTestJobs();
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 7a67da6..0c9d7b9 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -21,6 +21,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
@@ -92,8 +93,10 @@ public class FlowRunnerTest2 {
 		}
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
-		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+		
+		pluginSet.addPluginClass("java", JavaJob.class);
+		pluginSet.addPluginClass("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 		fakeExecutorLoader = new MockExecutorLoader();
 		project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 69d6296..6fcbb11 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -42,7 +42,8 @@ public class JobRunnerTest {
 		}
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
+		
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 	}
 
 	@After
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob.java b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
new file mode 100644
index 0000000..65ca1ba
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob extends JavaProcessJob {
+	public FakeJavaJob(String jobid, Props sysProps, Props jobProps, Logger log) {
+		super(jobid, sysProps, jobProps, log);
+	}
+}
+
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob2.java b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
new file mode 100644
index 0000000..581aeff
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob2 extends JavaProcessJob {
+	public FakeJavaJob2(String jobid, Props sysProps, Props jobProps, Logger log) {
+		super(jobid, sysProps, jobProps, log);
+	}
+}
+
diff --git a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
new file mode 100644
index 0000000..a4c5cb0
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.jobExecutor.Job;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
+import azkaban.utils.Props;
+
+
+/**
+ * Test the flow run, especially with embedded flows.
+ * Files are in unit/plugins/jobtypes
+ * 
+ */
+public class JobTypeManagerTest {
+	public static String TEST_PLUGIN_DIR = "jobtypes_test";
+	private Logger logger = Logger.getLogger(JobTypeManagerTest.class);
+	private JobTypeManager manager;
+	
+	public JobTypeManagerTest() {
+	}
+	
+	@Before
+	public void setUp() throws Exception {
+		File jobTypeDir = new File(TEST_PLUGIN_DIR);
+		jobTypeDir.mkdirs();
+		
+		FileUtils.copyDirectory(new File("unit/plugins/jobtypes"), jobTypeDir);
+		manager = new JobTypeManager(TEST_PLUGIN_DIR, this.getClass().getClassLoader());
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+		FileUtils.deleteDirectory(new File(TEST_PLUGIN_DIR));
+	}
+	
+	/**
+	 * Tests that the common and common private properties are loaded correctly
+	 * @throws Exception
+	 */
+	@Test
+	public void testCommonPluginProps() throws Exception {
+		JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+		
+		Props props = pluginSet.getCommonPluginJobProps();
+		System.out.println(props.toString());
+		assertEquals("commonprop1", props.getString("commonprop1"));
+		assertEquals("commonprop2", props.getString("commonprop2"));
+		assertEquals("commonprop3", props.getString("commonprop3"));
+		
+		Props priv = pluginSet.getCommonPluginLoadProps();
+		assertEquals("commonprivate1", priv.getString("commonprivate1"));
+		assertEquals("commonprivate2", priv.getString("commonprivate2"));
+		assertEquals("commonprivate3", priv.getString("commonprivate3"));
+	}
+
+	/**
+	 * Tests that the proper classes were loaded and that the common and the load
+	 * properties are properly loaded.
+	 * 
+	 * @throws Exception
+	 */
+	@Test
+	public void testLoadedClasses() throws Exception {
+		JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+		
+		Props props = pluginSet.getCommonPluginJobProps();
+		System.out.println(props.toString());
+		assertEquals("commonprop1", props.getString("commonprop1"));
+		assertEquals("commonprop2", props.getString("commonprop2"));
+		assertEquals("commonprop3", props.getString("commonprop3"));
+		assertNull(props.get("commonprivate1"));
+		
+		Props priv = pluginSet.getCommonPluginLoadProps();
+		assertEquals("commonprivate1", priv.getString("commonprivate1"));
+		assertEquals("commonprivate2", priv.getString("commonprivate2"));
+		assertEquals("commonprivate3", priv.getString("commonprivate3"));
+		
+		// Testing the anothertestjobtype
+		Class<? extends Job> aPluginClass = pluginSet.getPluginClass("anothertestjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob", aPluginClass.getName());
+		Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+		Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+
+		// Loader props
+		assertEquals("lib/*", aloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob", aloadProps.get("jobtype.class"));
+		assertEquals("commonprivate1", aloadProps.get("commonprivate1"));
+		assertEquals("commonprivate2", aloadProps.get("commonprivate2"));
+		assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+		// Job props
+		assertEquals("commonprop1", ajobProps.get("commonprop1"));
+		assertEquals("commonprop2", ajobProps.get("commonprop2"));
+		assertEquals("commonprop3", ajobProps.get("commonprop3"));
+		assertNull(ajobProps.get("commonprivate1"));
+		
+		Class<? extends Job> tPluginClass = pluginSet.getPluginClass("testjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tPluginClass.getName());
+		Props tjobProps = pluginSet.getPluginJobProps("testjob");
+		Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+
+		// Loader props
+		assertNull(tloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+		assertEquals("commonprivate1", tloadProps.get("commonprivate1"));
+		assertEquals("commonprivate2", tloadProps.get("commonprivate2"));
+		assertEquals("private3", tloadProps.get("commonprivate3"));
+		assertEquals("0", tloadProps.get("testprivate"));
+		// Job props
+		assertEquals("commonprop1", tjobProps.get("commonprop1"));
+		assertEquals("commonprop2", tjobProps.get("commonprop2"));
+		assertEquals("1", tjobProps.get("pluginprops1"));
+		assertEquals("2", tjobProps.get("pluginprops2"));
+		assertEquals("3", tjobProps.get("pluginprops3"));
+		assertEquals("pluginprops", tjobProps.get("commonprop3"));
+		// Testing that the private properties aren't shared with the public ones
+		assertNull(tjobProps.get("commonprivate1"));
+		assertNull(tjobProps.get("testprivate"));
+	}
+	
+	/**
+	 * Test building classes
+	 * @throws Exception
+	 */
+	@Test
+	public void testBuildClass() throws Exception {
+		Props jobProps = new Props();
+		jobProps.put("type", "anothertestjob");
+		jobProps.put("test","test1");
+		jobProps.put("pluginprops3","4");
+		Job job = manager.buildJobExecutor("anothertestjob", jobProps, logger);
+		
+		assertTrue(job instanceof FakeJavaJob);
+		FakeJavaJob fjj = (FakeJavaJob)job;
+		
+		Props props = fjj.getJobProps();
+		assertEquals("test1", props.get("test"));
+		assertNull(props.get("pluginprops1"));
+		assertEquals("4", props.get("pluginprops3"));
+		assertEquals("commonprop1", props.get("commonprop1"));
+		assertEquals("commonprop2", props.get("commonprop2"));
+		assertEquals("commonprop3", props.get("commonprop3"));
+		assertNull(props.get("commonprivate1"));
+	}
+	
+	/**
+	 * Test building classes 2
+	 * @throws Exception
+	 */
+	@Test
+	public void testBuildClass2() throws Exception {
+		Props jobProps = new Props();
+		jobProps.put("type", "testjob");
+		jobProps.put("test","test1");
+		jobProps.put("pluginprops3","4");
+		Job job = manager.buildJobExecutor("testjob", jobProps, logger);
+		
+		assertTrue(job instanceof FakeJavaJob2);
+		FakeJavaJob2 fjj = (FakeJavaJob2)job;
+		
+		Props props = fjj.getJobProps();
+		assertEquals("test1", props.get("test"));
+		assertEquals("1", props.get("pluginprops1"));
+		assertEquals("2", props.get("pluginprops2"));
+		assertEquals("4", props.get("pluginprops3")); // Overridden value
+		assertEquals("commonprop1", props.get("commonprop1"));
+		assertEquals("commonprop2", props.get("commonprop2"));
+		assertEquals("pluginprops", props.get("commonprop3"));
+		assertNull(props.get("commonprivate1"));
+	}
+	
+	/**
+	 * Test out reloading properties
+	 * @throws Exception
+	 */
+	@Test
+	public void testResetPlugins() throws Exception {
+		// Add a plugins file to the anothertestjob folder
+		File anothertestfolder = new File(TEST_PLUGIN_DIR + "/anothertestjob");
+		Props pluginProps = new Props();
+		pluginProps.put("test1", "1");
+		pluginProps.put("test2", "2");
+		pluginProps.put("pluginprops3","4");
+		pluginProps.storeFlattened(new File(anothertestfolder, "plugin.properties"));
+		
+		// clone the testjob folder
+		File testFolder = new File(TEST_PLUGIN_DIR + "/testjob");
+		FileUtils.copyDirectory(testFolder, new File(TEST_PLUGIN_DIR + "/newtestjob"));
+		
+		// change the common properties
+		Props commonPlugin = new Props(null, TEST_PLUGIN_DIR + "/common.properties");
+		commonPlugin.put("commonprop1", "1");
+		commonPlugin.put("newcommonprop1", "2");
+		commonPlugin.removeLocal("commonprop2");
+		commonPlugin.storeFlattened(new File(TEST_PLUGIN_DIR + "/common.properties"));
+		
+		// change the common properties
+		Props commonPrivate = new Props(null, TEST_PLUGIN_DIR + "/commonprivate.properties");
+		commonPrivate.put("commonprivate1", "1");
+		commonPrivate.put("newcommonprivate1", "2");
+		commonPrivate.removeLocal("commonprivate2");
+		commonPrivate.storeFlattened(new File(TEST_PLUGIN_DIR + "/commonprivate.properties"));
+		
+		// change testjob private property
+		Props loadProps = new Props(null, TEST_PLUGIN_DIR + "/testjob/private.properties");
+		loadProps.put("privatetest", "test");
+		
+		/*
+		 * Reload the plugins here!!
+		 */
+		manager.loadPlugins();
+		
+		// Checkout common props
+		JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+		Props commonProps = pluginSet.getCommonPluginJobProps();
+		assertEquals("1", commonProps.get("commonprop1"));
+		assertEquals("commonprop3", commonProps.get("commonprop3"));
+		assertEquals("2", commonProps.get("newcommonprop1"));
+		assertNull(commonProps.get("commonprop2"));
+		
+		// Checkout common private
+		Props commonPrivateProps = pluginSet.getCommonPluginLoadProps();
+		assertEquals("1", commonPrivateProps.get("commonprivate1"));
+		assertEquals("commonprivate3", commonPrivateProps.get("commonprivate3"));
+		assertEquals("2", commonPrivateProps.get("newcommonprivate1"));
+		assertNull(commonPrivateProps.get("commonprivate2"));
+		
+		// Verify anothertestjob changes
+		Class<? extends Job> atjClass = pluginSet.getPluginClass("anothertestjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob", atjClass.getName());
+		Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+		assertEquals("1", ajobProps.get("test1"));
+		assertEquals("2", ajobProps.get("test2"));
+		assertEquals("4", ajobProps.get("pluginprops3"));
+		assertEquals("commonprop3", ajobProps.get("commonprop3"));
+
+		Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+		assertEquals("1", aloadProps.get("commonprivate1"));
+		assertNull(aloadProps.get("commonprivate2"));
+		assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+		
+		// Verify testjob changes
+		Class<? extends Job> tjClass = pluginSet.getPluginClass("testjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tjClass.getName());
+		Props tjobProps = pluginSet.getPluginJobProps("testjob");
+		assertEquals("1", tjobProps.get("commonprop1"));
+		assertEquals("2", tjobProps.get("newcommonprop1"));
+		assertEquals("1", tjobProps.get("pluginprops1"));
+		assertEquals("2", tjobProps.get("pluginprops2"));
+		assertEquals("3", tjobProps.get("pluginprops3"));
+		assertEquals("pluginprops", tjobProps.get("commonprop3"));
+		assertNull(tjobProps.get("commonprop2"));
+
+		Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+		assertNull(tloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+		assertEquals("1", tloadProps.get("commonprivate1"));
+		assertNull(tloadProps.get("commonprivate2"));
+		assertEquals("private3", tloadProps.get("commonprivate3"));
+		
+		// Verify newtestjob
+		Class<? extends Job> ntPluginClass = pluginSet.getPluginClass("newtestjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntPluginClass.getName());
+		Props ntjobProps = pluginSet.getPluginJobProps("newtestjob");
+		Props ntloadProps = pluginSet.getPluginLoaderProps("newtestjob");
+
+		// Loader props
+		assertNull(ntloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntloadProps.get("jobtype.class"));
+		assertEquals("1", ntloadProps.get("commonprivate1"));
+		assertNull(ntloadProps.get("commonprivate2"));
+		assertEquals("private3", ntloadProps.get("commonprivate3"));
+		assertEquals("0", ntloadProps.get("testprivate"));
+		// Job props
+		assertEquals("1", ntjobProps.get("commonprop1"));
+		assertNull(ntjobProps.get("commonprop2"));
+		assertEquals("1", ntjobProps.get("pluginprops1"));
+		assertEquals("2", ntjobProps.get("pluginprops2"));
+		assertEquals("3", ntjobProps.get("pluginprops3"));
+		assertEquals("pluginprops", ntjobProps.get("commonprop3"));
+	}
+}
diff --git a/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/anothertestjob/private.properties b/unit/plugins/jobtypes/anothertestjob/private.properties
new file mode 100644
index 0000000..8e95c94
--- /dev/null
+++ b/unit/plugins/jobtypes/anothertestjob/private.properties
@@ -0,0 +1,2 @@
+jobtype.classpath=lib/*
+jobtype.class=azkaban.test.jobtype.FakeJavaJob
diff --git a/unit/plugins/jobtypes/common.properties b/unit/plugins/jobtypes/common.properties
new file mode 100644
index 0000000..2823a83
--- /dev/null
+++ b/unit/plugins/jobtypes/common.properties
@@ -0,0 +1,3 @@
+commonprop1=commonprop1
+commonprop2=commonprop2
+commonprop3=commonprop3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/commonprivate.properties b/unit/plugins/jobtypes/commonprivate.properties
new file mode 100644
index 0000000..7d46d43
--- /dev/null
+++ b/unit/plugins/jobtypes/commonprivate.properties
@@ -0,0 +1,3 @@
+commonprivate1=commonprivate1
+commonprivate2=commonprivate2
+commonprivate3=commonprivate3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/fakejobtype.jar b/unit/plugins/jobtypes/testjob/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/testjob/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/testjob/plugin.properties b/unit/plugins/jobtypes/testjob/plugin.properties
new file mode 100644
index 0000000..587081a
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/plugin.properties
@@ -0,0 +1,4 @@
+pluginprops1=1
+pluginprops2=2
+pluginprops3=3
+commonprop3=pluginprops
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/private.properties b/unit/plugins/jobtypes/testjob/private.properties
new file mode 100644
index 0000000..2bba593
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/private.properties
@@ -0,0 +1,3 @@
+jobtype.class=azkaban.test.jobtype.FakeJavaJob2
+commonprivate3=private3
+testprivate=0
\ No newline at end of file