azkaban-developers

Merge branch 'master' of https://github.com/azkaban/azkaban2

4/29/2014 2:55:43 PM

Changes

build.gradle 288(+181 -107)

src/main/less/Makefile 34(+0 -34)

Details

build.gradle 288(+181 -107)

diff --git a/build.gradle b/build.gradle
index aaea92f..8ace64a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,15 @@
+buildscript {
+  repositories {
+    mavenCentral()
+  }
+  dependencies {
+    classpath 'de.obqo.gradle:gradle-lesscss-plugin:1.0-1.3.3'
+  }
+}
+
 apply plugin: 'java'
 apply plugin: 'eclipse'
+apply plugin: 'lesscss'
 
 defaultTasks 'dist'
 
@@ -41,18 +51,21 @@ configurations {
     compile {
         description = 'compile classpath'
     }
+    generateRestli {
+        transitive = true
+    }
     test {
         extendsFrom compile
     }
 }
-configurations.compile {
-    description = 'compile classpath'
-}
+
+ext.pegasusVersion = '1.15.7'
 
 dependencies {
   compile (
     [group: 'commons-collections', name:'commons-collections', version: '3.2.1'],
     [group: 'commons-configuration', name:'commons-configuration', version: '1.8'],
+    [group: 'commons-codec', name:'commons-codec', version: '1.9'],
     [group: 'commons-dbcp', name:'commons-dbcp', version: '1.4'],
     [group: 'commons-dbutils', name:'commons-dbutils', version: '1.5'],
     [group: 'org.apache.commons', name:'commons-email', version: '1.2'],
@@ -68,7 +81,6 @@ dependencies {
     [group: 'org.apache.httpcomponents', name:'httpcore', version: '4.2.1'],
     [group: 'org.codehaus.jackson', name:'jackson-core-asl', version: '1.9.5'],
     [group: 'org.codehaus.jackson', name:'jackson-mapper-asl',version: '1.9.5'],
-    [group: 'org.codehaus.jackson', name:'jackson-core-asl', version: '1.9.5'],
     [group: 'org.mortbay.jetty', name:'jetty', version: '6.1.26'],
     [group: 'org.mortbay.jetty', name:'jetty-util', version: '6.1.26'],
     [group: 'joda-time', name:'joda-time', version: '2.0'],
@@ -78,14 +90,42 @@ dependencies {
     [group: 'mysql', name:'mysql-connector-java', version: '5.1.28'],
     [group: 'javax.servlet', name:'servlet-api', version: '2.5'],
     [group: 'org.slf4j', name:'slf4j-api', version: '1.6.1'],
-    [group: 'org.apache.velocity', name:'velocity', version: '1.7']
+    [group: 'org.apache.velocity', name:'velocity', version: '1.7'],
+    [group: 'com.linkedin.pegasus', name: 'gradle-plugins', version: pegasusVersion],
+    [group: 'com.linkedin.pegasus', name: 'pegasus-common', version: pegasusVersion],
+    [group: 'com.linkedin.pegasus', name: 'restli-common', version: pegasusVersion],
+    [group: 'com.linkedin.pegasus', name: 'restli-server', version: pegasusVersion],
+    [group: 'com.linkedin.pegasus', name: 'data', version: pegasusVersion],
+    [group: 'com.linkedin.pegasus', name: 'r2', version: pegasusVersion],
+    [group: 'com.linkedin.pegasus', name: 'li-jersey-uri', version: pegasusVersion],
+    [group: 'com.linkedin.parseq', name: 'parseq', version: '1.3.7'],
+    [group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.3.2']
+  )
+
+  generateRestli (
+      [group: 'com.linkedin.pegasus', name:'generator', version: pegasusVersion],
+      [group: 'com.linkedin.pegasus', name:'restli-tools', version: pegasusVersion]
   )
 
   testCompile (
-    [group: 'junit', name:'junit', version: '4.11']
+    [group: 'junit', name:'junit', version: '4.11'],
+    [group: 'org.hamcrest', name:'hamcrest-all', version: '1.3']
   )
 }
 
+sourceSets {
+    main {
+        java {
+            srcDirs 'src/main/java', 'src/restli/generatedJava', 'src/restli/java'
+        }
+    }
+    test {
+        java {
+            srcDirs 'unit/java'
+        }
+    }
+}
+
 jar {
     baseName =  'azkaban'
     manifest {
@@ -96,6 +136,27 @@ jar {
     }
 }
 
+task restliTemplateGenerator(type: JavaExec) {
+    mkdir 'src/restli/generatedJava'
+
+    main = 'com.linkedin.pegasus.generator.PegasusDataTemplateGenerator'
+    args = ['src/restli/generatedJava','src/restli/schemas']
+    classpath = configurations.generateRestli
+}
+
+task restliRestSpecGenerator(dependsOn: [restliTemplateGenerator], type: JavaExec) << {
+    mkdir 'src/restli/generatedRestSpec'
+
+    main = 'com.linkedin.restli.tools.idlgen.RestLiResourceModelExporterCmdLineApp'
+    args = ['-outdir', 'src/restli/generatedRestSpec', '-sourcepath', 'src/restli/java']
+    classpath = configurations.generateRestli
+}
+
+task restli(dependsOn: restliTemplateGenerator) << {
+}
+
+compileJava.dependsOn.add('restli')
+
 eclipse.classpath.file {
     // Erase the whole classpath
     beforeMerged {
@@ -109,15 +170,12 @@ eclipse.classpath.file {
     }
 }
 
-/**
- * Invokes a makefile target that will compile less files
- */
-task compileLess(type:Exec) {
-    workingDir 'src/main/less'
-    commandLine 'make', '-e'
-    environment (
-      OBJ_DIR : file(new File(buildDir,'/less'))
-   )
+lesscss {
+  source = fileTree('src/main/less') {
+    include 'azkaban.less'
+    include 'azkaban-graph.less'
+  }
+  dest = 'build/web/css'
 }
 
 /**
@@ -128,27 +186,22 @@ task compileDust(type:Exec) {
     commandLine 'make', '-e'
     environment (
       OBJ_DIR : file(new File(buildDir,'/dust'))
-   )
+    )
 }
 
 /**
  * Copies web files to a build directory
  */
-task web(dependsOn: ['compileLess', 'compileDust']) << {
+task web(dependsOn: ['lesscss', 'compileDust']) << {
     println 'Copying web files'
     copy {
         from('src/web')
         into('build/web')
     }
-
     copy {
         from('build/dust')
         into('build/web/js')
     }
-    copy {
-        from('build/less')
-        into('build/web/css')
-    }
 }
 
 /*
@@ -170,219 +223,240 @@ task createVersionFile() << {
     versionFile.write(versionStr)
 }
 
+ext.soloAppendix = 'solo-server'
+ext.soloPackageDir = 'build/package/' + jar.baseName + '-' + soloAppendix
+
 /**
- * Packages the SoloServer version of Azkaban
+ * Copies the Azkaban Solo Server files into its package directory.
  */
-task packageSolo(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
-    appendix = 'solo-server'
-    packageDir = 'build/package/' + baseName + '-' + appendix
+task copySolo(dependsOn: ['jar', 'web', 'createVersionFile']) << {
+    delete soloPackageDir
+    mkdir soloPackageDir
 
-    println 'Creating Azkaban Solo Server Package into ' + packageDir
-    mkdir packageDir
-    mkdir packageDir + '/extlib'
-    mkdir packageDir + '/plugins'
+    println 'Creating Azkaban Solo Server Package into ' + soloPackageDir
+    mkdir soloPackageDir + '/lib'
+    mkdir soloPackageDir + '/extlib'
+    mkdir soloPackageDir + '/plugins'
 
-    println 'Copying Soloserver bin & conf'
     copy {
+        println 'Copying Soloserver bin & conf'
         from('src/package/soloserver')
-        into(packageDir)
+        into(soloPackageDir)
     }
 
-    println 'Copying Azkaban lib'
     copy {
+        println 'Copying Azkaban lib'
         from('build/libs')
-        into(packageDir + '/lib')
+        into(soloPackageDir + '/lib')
     }
 
-    println 'Copying web'
     copy {
+        println 'Copying web'
         from('build/web')
-        into(packageDir + '/web')
+        into(soloPackageDir + '/web')
     }
 
-    println 'Copying sql'
     copy {
+        println 'Copying sql'
         from('src/sql')
-        into(packageDir + '/sql')
+        into(soloPackageDir + '/sql')
     }
 
-    println 'Copying dependency jars'
     copy {
-        into packageDir + '/lib'
+        println 'Copying dependency jars'
+        into soloPackageDir + '/lib'
         from configurations.compile
     }
 
     copy {
-        into packageDir
+        println 'Copying version file'
+        into soloPackageDir
         from 'build/package/version.file'
     }
+}
 
-    println 'Tarballing Solo Package'
+/**
+ * Packages the SoloServer version of Azkaban
+ */
+task packageSolo(type: Tar, dependsOn: 'copySolo') {
+    appendix = soloAppendix
     extension = 'tar.gz'
     compression = Compression.GZIP
 
-    basedir = baseName + '-' + appendix + '-' + version
-    println 'Source is in ' + packageDir
+    ext.basedir = baseName + '-' + appendix + '-' + version
     into(basedir) {
-        from packageDir
+        from soloPackageDir
         exclude 'bin'
     }
 
-    dst_bin = basedir + '/bin'
-    src_bin = packageDir + '/bin'
+    ext.dst_bin = basedir + '/bin'
+    ext.src_bin = soloPackageDir + '/bin'
     from(src_bin) {
         into dst_bin
         fileMode = 0755
     }
 }
 
+ext.sqlPackageDir = 'build/package/sql'
+
 /**
- * Packages the Sql Scripts for Azkaban DB
+ * Copies the SQL files into its package directory.
  */
-task packageSql(type: Tar) {
-    String packageDir = 'build/package/sql'
+task copySql() << {
+    println 'Creating Azkaban SQL Scripts into ' + sqlPackageDir
+    delete sqlPackageDir
+    mkdir sqlPackageDir
 
-    println 'Creating Azkaban SQL Scripts into ' + packageDir
-    mkdir packageDir
-
-    println 'Copying SQL files'
     copy {
+        println 'Copying SQL files'
         from('src/sql')
-        into(packageDir)
+        into(sqlPackageDir)
     }
 
-    String destFile = packageDir + '/create-all-sql-' + version + '.sql';
-    println('Concating create scripts to ' + destFile)
+    String destFile = sqlPackageDir + '/create-all-sql-' + version + '.sql';
     ant.concat(destfile:destFile, fixlastline:'yes') {
+        println('Concating create scripts to ' + destFile)
         fileset(dir: 'src/sql') {
             exclude(name: 'update.*.sql')
             exclude(name: 'database.properties')
         }
     }
+}
 
-    println 'Tarballing SQL Package'
+/**
+ * Packages the Sql Scripts for Azkaban DB
+ */
+task packageSql(type: Tar, dependsOn: 'copySql') {
     extension = 'tar.gz'
     compression = Compression.GZIP
     appendix = 'sql'
 
-    basedir = baseName + '-' + appendix + '-' + version
-    packageDir = 'build/package/sql'
-    println 'Source is in ' + packageDir
+    ext.basedir = baseName + '-' + appendix + '-' + version
     into(basedir) {
-        from packageDir
+        from sqlPackageDir
     }
 }
 
+ext.execAppendix = 'exec-server'
+ext.execPackageDir = 'build/package/' + jar.baseName + '-' + execAppendix
+
 /**
- * Packages the Azkaban Executor Server
+ * Copies the Azkaban Executor Server files into its package directory.
  */
-task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
-    appendix = 'exec-server'
-    String packageDir = 'build/package/' + baseName + '-' + appendix
-
-    println 'Creating Azkaban Executor Server Package into ' + packageDir
-    mkdir packageDir
-    mkdir packageDir + '/extlib'
-    mkdir packageDir + '/plugins'
+task copyExec(dependsOn: ['jar', 'createVersionFile']) << {
+    delete execPackageDir
+    println 'Creating Azkaban Executor Server Package into ' + execPackageDir
+    mkdir execPackageDir
+    mkdir execPackageDir + '/lib'
+    mkdir execPackageDir + '/extlib'
+    mkdir execPackageDir + '/plugins'
 
-    println 'Copying Exec server bin & conf'
     copy {
+        println 'Copying Exec server bin & conf'
         from('src/package/execserver')
-        into(packageDir)
+        into(execPackageDir)
     }
 
-    println 'Copying Azkaban lib'
     copy {
+        println 'Copying Azkaban lib '
         from('build/libs')
-        into(packageDir + '/lib')
+        into(execPackageDir + '/lib')
     }
 
-    println 'Copying dependency jars'
     copy {
-        into packageDir + '/lib'
+        println 'Copying dependency jars'
+        into execPackageDir + '/lib'
         from configurations.compile
     }
 
     copy {
-        into packageDir
+        into execPackageDir
         from 'build/package/version.file'
     }
+}
 
-    println 'Tarballing Web Package'
+/**
+ * Packages the Azkaban Executor Server
+ */
+task packageExec(type: Tar, dependsOn: 'copyExec') {
+    appendix = execAppendix
     extension = 'tar.gz'
     compression = Compression.GZIP
 
-    basedir = baseName + '-' + appendix + '-' + version
-    packageDir = 'build/package/' + baseName + '-' + appendix
-    println 'Source is in ' + packageDir
-
+    ext.basedir = baseName + '-' + appendix + '-' + version
     into(basedir) {
-        from packageDir
+        from execPackageDir
         exclude 'bin'
     }
 
-    dst_bin = basedir + '/bin'
-    src_bin = packageDir + '/bin'
+    ext.dst_bin = basedir + '/bin'
+    ext.src_bin = execPackageDir + '/bin'
     from(src_bin) {
         into dst_bin
         fileMode = 0755
     }
 }
 
+ext.webAppendix = 'web-server'
+ext.webPackageDir = 'build/package/' + jar.baseName + '-' + webAppendix
+
 /**
- * Packages the Azkaban Web Server
+ * Copies the Azkaban Web Server files into its package directory.
  */
-task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
-    appendix = 'web-server'
-    String packageDir = 'build/package/' + baseName + '-' + appendix
-
-    println 'Creating Azkaban Web Server Package into ' + packageDir
-    mkdir packageDir
-    mkdir packageDir + '/extlib'
-    mkdir packageDir + '/plugins'
+task copyWeb(dependsOn: ['jar', 'web', 'createVersionFile']) << {
+    println 'Creating Azkaban Web Server Package into ' + webPackageDir
+    delete webPackageDir
+    mkdir webPackageDir
+    mkdir webPackageDir + '/lib'
+    mkdir webPackageDir + '/extlib'
+    mkdir webPackageDir + '/plugins'
 
     println 'Copying Web server bin & conf'
     copy {
         from('src/package/webserver')
-        into(packageDir)
+        into(webPackageDir)
     }
 
-    println 'Copying Azkaban lib'
+    println 'Copying Azkaban lib '
     copy {
         from('build/libs')
-        into(packageDir + '/lib')
+        into(webPackageDir + '/lib')
     }
 
     println 'Copying web'
     copy {
         from('build/web')
-        into(packageDir + '/web')
+        into(webPackageDir + '/web')
     }
 
     println 'Copying dependency jars'
     copy {
-        into packageDir + '/lib'
+        into webPackageDir + '/lib'
         from configurations.compile
     }
 
     copy {
-        into packageDir
+        into webPackageDir
         from 'build/package/version.file'
     }
+}
 
-    println 'Tarballing Web Package'
+/**
+ * Packages the Azkaban Web Server
+ */
+task packageWeb(type: Tar, dependsOn: 'copyWeb') {
+    appendix = webAppendix
     extension = 'tar.gz'
     compression = Compression.GZIP
 
-    basedir = baseName + '-' + appendix + '-' + version
-    println 'Source is in ' + packageDir
+    ext.basedir = baseName + '-' + appendix + '-' + version
     into(basedir) {
-        from packageDir
+        from webPackageDir
         exclude 'bin'
     }
 
-    dst_bin = basedir + '/bin'
-    src_bin = packageDir + '/bin'
+    ext.dst_bin = basedir + '/bin'
+    ext.src_bin = webPackageDir + '/bin'
     from(src_bin) {
         into dst_bin
         fileMode = 0755
diff --git a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index a6ee307..971f9c1 100644
--- a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import org.apache.log4j.Logger;
 import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
@@ -86,6 +87,13 @@ public class AzkabanExecutorServer {
 		server = new Server(portNumber);
 		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
 		server.setThreadPool(httpThreadPool);
+		
+		boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
+		logger.info("Setting up connector with stats on: " + isStatsOn);
+		
+		for (Connector connector : server.getConnectors()) {
+			connector.setStatsOn(isStatsOn);
+		}
 
 		Context root = new Context(server, "/", Context.SESSIONS);
 		root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
diff --git a/src/main/java/azkaban/execapp/ExecutorServlet.java b/src/main/java/azkaban/execapp/ExecutorServlet.java
index 75a1a08..cb8a211 100644
--- a/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -88,6 +88,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				else if (action.equals(PING_ACTION)) {
 					respMap.put("status", "alive");
 				}
+				else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
+					logger.info("Reloading Jobtype plugins");
+					handleReloadJobTypePlugins(respMap);
+				}
 				else {
 					int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
 					String user = getParam(req, USER_PARAM, null);
@@ -337,6 +341,17 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 	
+	private void handleReloadJobTypePlugins(Map<String, Object> respMap) throws ServletException {
+		try {
+			flowRunnerManager.reloadJobTypePlugins();
+			respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+		}
+		catch (Exception e) {
+			logger.error(e);
+			respMap.put(RESPONSE_ERROR, e.getMessage());
+		}
+	}
+	
 	@Override
 	public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		
diff --git a/src/main/java/azkaban/execapp/FlowRunner.java b/src/main/java/azkaban/execapp/FlowRunner.java
index e6db63e..0e1c198 100644
--- a/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/src/main/java/azkaban/execapp/FlowRunner.java
@@ -966,8 +966,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 		flow.setUpdateTime(System.currentTimeMillis());
 		flow.setEndTime(-1);
-		flow.setStartTime(maxStartTime);
-		
 		logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
 	}
 	
diff --git a/src/main/java/azkaban/execapp/FlowRunnerManager.java b/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f886e64..a9e2059 100644
--- a/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -47,6 +47,7 @@ import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
 
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -689,6 +690,7 @@ public class FlowRunnerManager implements EventListener {
 		return jobCount;
 	}
 
-	
-	
+	public void reloadJobTypePlugins() throws JobTypeManagerException {
+		jobtypeManager.loadPlugins();
+	}
 }
diff --git a/src/main/java/azkaban/executor/ConnectorParams.java b/src/main/java/azkaban/executor/ConnectorParams.java
index c84436e..3b3f00e 100644
--- a/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/src/main/java/azkaban/executor/ConnectorParams.java
@@ -32,6 +32,7 @@ public interface ConnectorParams {
 	public static final String LOG_ACTION = "log";
 	public static final String ATTACHMENTS_ACTION = "attachments";
 	public static final String METADATA_ACTION = "metadata";
+	public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
 	public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/src/main/java/azkaban/jobtype/JobTypeManager.java b/src/main/java/azkaban/jobtype/JobTypeManager.java
index 6c275af..df9627e 100644
--- a/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -28,20 +28,18 @@ import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
 import azkaban.jobExecutor.utils.JobExecutionException;
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.log4j.Logger;
 
 public class JobTypeManager 
 {
-	
-	private final String jobtypePluginDir;	// the dir for jobtype plugins
+	private final String jobTypePluginDir;	// the dir for jobtype plugins
 	private final ClassLoader parentLoader; 
 	
 	public static final String DEFAULT_JOBTYPEPLUGINDIR = "plugins/jobtypes";
@@ -51,27 +49,25 @@ public class JobTypeManager
 	private static final String COMMONSYSCONFFILE = "commonprivate.properties"; // common private properties for multiple plugins
 	private static final Logger logger = Logger.getLogger(JobTypeManager.class);
 	
-	private Map<String, Class<? extends Job>> jobToClass;
-	private Map<String, Props> jobtypeJobProps;
-	private Map<String, Props> jobtypeSysProps;
+	private JobTypePluginSet pluginSet;
 
-	public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader)
-	{
-		this.jobtypePluginDir = jobtypePluginDir;
+	public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader) {
+		this.jobTypePluginDir = jobtypePluginDir;
 		this.parentLoader = parentClassLoader;
 		
-		jobToClass = new HashMap<String, Class<? extends Job>>();
-		jobtypeJobProps = new HashMap<String, Props>();
-		jobtypeSysProps = new HashMap<String, Props>();
-		
-		loadDefaultTypes();
+		loadPlugins();
+	}
+
+	public void loadPlugins() throws JobTypeManagerException {
+		JobTypePluginSet plugins = new JobTypePluginSet();
 		
-		if(jobtypePluginDir != null) {
-			File pluginDir = new File(jobtypePluginDir);
+		loadDefaultTypes(plugins);
+		if (jobTypePluginDir != null) {
+			File pluginDir = new File(jobTypePluginDir);
 			if (pluginDir.exists()) {
-			logger.info("job type plugin directory set. Loading extra job types.");
+				logger.info("Job type plugin directory set. Loading extra job types from " + pluginDir);
 				try {
-					loadPluginJobTypes();
+					loadPluginJobTypes(plugins);
 				}
 				catch (Exception e) {
 					logger.info("Plugin jobtypes failed to load. " + e.getCause());
@@ -80,186 +76,180 @@ public class JobTypeManager
 			}
 		}
 		
+		// Swap the plugin set. If exception is thrown, then plugin isn't swapped.
+		synchronized (this) {
+			pluginSet = plugins;
+		}
 	}
-
-	private void loadDefaultTypes() throws JobTypeManagerException{
-		jobToClass.put("command", ProcessJob.class);
-		jobToClass.put("javaprocess", JavaProcessJob.class);
-		jobToClass.put("noop", NoopJob.class);
-		jobToClass.put("python", PythonJob.class);
-		jobToClass.put("ruby", RubyJob.class);
-		jobToClass.put("script", ScriptJob.class);	
+	
+	private void loadDefaultTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+		logger.info("Loading plugin default job types");
+		plugins.addPluginClass("command", ProcessJob.class);
+		plugins.addPluginClass("javaprocess", JavaProcessJob.class);
+		plugins.addPluginClass("noop", NoopJob.class);
+		plugins.addPluginClass("python", PythonJob.class);
+		plugins.addPluginClass("ruby", RubyJob.class);
+		plugins.addPluginClass("script", ScriptJob.class);	
 	}
 
 	// load Job Types from jobtype plugin dir
-	private void loadPluginJobTypes() throws JobTypeManagerException
-	{
-		File jobPluginsDir = new File(jobtypePluginDir);
+	private void loadPluginJobTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+		File jobPluginsDir = new File(jobTypePluginDir);
 		
 		if (!jobPluginsDir.exists()) {
+			logger.error("Job type plugin dir " + jobTypePluginDir + " doesn't exist. Will not load any external plugins.");
 			return;
 		}
-
-		if (!jobPluginsDir.isDirectory()) {
-			throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not a directory!");
+		else if (!jobPluginsDir.isDirectory()) {
+			throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not a directory!");
 		}
-		
-		if (!jobPluginsDir.canRead()) {
-			throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not readable!");
+		else if (!jobPluginsDir.canRead()) {
+			throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not readable!");
 		}
 		
-		// look for global conf
-		Props globalConf = null;
-		Props globalSysConf = null;
-		File confFile = findFilefromDir(jobPluginsDir, COMMONCONFFILE);
-		File sysConfFile = findFilefromDir(jobPluginsDir, COMMONSYSCONFFILE);
-		try {
-			if(confFile != null) {
-				globalConf = new Props(null, confFile);
+		// Load the common properties used by all jobs that are run
+		Props commonPluginJobProps = null;
+		File commonJobPropsFile = new File(jobPluginsDir, COMMONCONFFILE);
+		if (commonJobPropsFile.exists()) {
+			logger.info("Common plugin job props file " + commonJobPropsFile + " found. Attempt to load.");
+			try {
+				commonPluginJobProps = new Props(null, commonJobPropsFile);
 			}
-			else {
-				globalConf = new Props();
+			catch (IOException e) {
+				throw new JobTypeManagerException("Failed to load common plugin job properties" + e.getCause());
 			}
-			if(sysConfFile != null) {
-				globalSysConf = new Props(null, sysConfFile);
+		}
+		else {
+			logger.info("Common plugin job props file " + commonJobPropsFile + " not found. Using empty props.");
+			commonPluginJobProps = new Props();
+		}
+		
+		// Loads the common properties used by all plugins when loading
+		Props commonPluginLoadProps = null;
+		File commonLoadPropsFile = new File(jobPluginsDir, COMMONSYSCONFFILE);
+		if (commonLoadPropsFile.exists()) {
+			logger.info("Common plugin load props file " + commonLoadPropsFile + " found. Attempt to load.");
+			try {
+				commonPluginLoadProps = new Props(null, commonLoadPropsFile);
 			}
-			else {
-				globalSysConf = new Props();
+			catch (IOException e) {
+				throw new JobTypeManagerException("Failed to load common plugin loader properties" + e.getCause());
 			}
 		}
-		catch (Exception e) {
-			throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
+		else {
+			logger.info("Common plugin load props file " + commonLoadPropsFile + " not found. Using empty props.");
+			commonPluginLoadProps = new Props();
 		}
 		
+		plugins.setCommonPluginJobProps(commonPluginJobProps);
+		plugins.setCommonPluginLoadProps(commonPluginLoadProps);
 		
-		synchronized (this) {
-			ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
-			try{
-				for(File dir : jobPluginsDir.listFiles()) {
-					if(dir.isDirectory() && dir.canRead()) {
-						// get its conf file
-						try {
-							loadJob(dir, globalConf, globalSysConf);
-							Thread.currentThread().setContextClassLoader(prevCl);
-						}
-						catch (Exception e) {
-							logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
-							throw new JobTypeManagerException(e);
-						}
-					}
+		// Loading job types
+		for (File dir : jobPluginsDir.listFiles()) {
+			if (dir.isDirectory() && dir.canRead()) {
+				try {
+					loadJobTypes(dir, plugins);
 				}
-			} catch(Exception e) {
-				e.printStackTrace();
-				throw new JobTypeManagerException(e);
-			} catch(Throwable t) {
-				t.printStackTrace();
-				throw new JobTypeManagerException(t);
-			} finally {
-				Thread.currentThread().setContextClassLoader(prevCl);
-			}
- 		}
-	}
-	
-	public static File findFilefromDir(File dir, String fn){
-		if(dir.isDirectory()) {
-			for(File f : dir.listFiles()) {
-				if(f.getName().equals(fn)) {
-					return f;
+				catch (Exception e) {
+					logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
+					throw new JobTypeManagerException(e);
 				}
 			}
 		}
-		return null;
 	}
 	
-//	private void loadJobType(File dir, Props globalConf, Props globalSysConf) throws JobTypeManagerException{
-//		
-//		// look for common conf
-//		Props conf = null;
-//		Props sysConf = null;
-//		File confFile = findFilefromDir(dir, COMMONCONFFILE);
-//		File sysConfFile = findFilefromDir(dir, COMMONSYSCONFFILE);
-//		
-//		try {
-//			if(confFile != null) {
-//				conf = new Props(globalConf, confFile);
-//			}
-//			else {
-//				conf = globalConf;
-//			}
-//			if(sysConfFile != null) {
-//				sysConf = new Props(globalSysConf, sysConfFile);
-//			}
-//			else {
-//				sysConf = globalSysConf;
-//			}
-//		}
-//		catch (Exception e) {
-//			throw new JobTypeManagerException("Failed to get common jobtype properties" + e.getCause());
-//		}
-//		
-//		// look for jobtypeConf.properties and load it 		
-//		for(File f: dir.listFiles()) {
-//			if(f.isFile() && f.getName().equals(JOBTYPESYSCONFFILE)) {
-//				loadJob(dir, f, conf, sysConf);
-//				return;
-//			}
-//		}
-//
-//		// no hit, keep looking
-//		for(File f : dir.listFiles()) {
-//			if(f.isDirectory() && f.canRead())
-//				loadJobType(f, conf, sysConf);
-//		}
-//		
-//	}
-	
 	@SuppressWarnings("unchecked")
-	private void loadJob(File dir, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+	private void loadJobTypes(File pluginDir, JobTypePluginSet plugins) throws JobTypeManagerException {
+		// Directory is the jobtypeName
+		String jobTypeName = pluginDir.getName();
+		logger.info("Loading plugin " + jobTypeName);
+		
+		Props pluginJobProps = null;
+		Props pluginLoadProps = null;
 		
-		Props conf = null;
-		Props sysConf = null;
-		File confFile = findFilefromDir(dir, JOBTYPECONFFILE);		
-		File sysConfFile = findFilefromDir(dir, JOBTYPESYSCONFFILE);
-		if(sysConfFile == null) {
-			logger.info("No job type found in " + dir.getAbsolutePath());
+		File pluginJobPropsFile = new File(pluginDir, JOBTYPECONFFILE);
+		File pluginLoadPropsFile = new File(pluginDir, JOBTYPESYSCONFFILE);
+
+		if (!pluginLoadPropsFile.exists()) {
+			logger.info("Plugin load props file " + pluginLoadPropsFile + " not found.");
 			return;
 		}
 		
 		try {
-			if(confFile != null) {
-				conf = new Props(commonConf, confFile);
+			Props commonPluginJobProps = plugins.getCommonPluginJobProps();
+			Props commonPluginLoadProps = plugins.getCommonPluginLoadProps();
+			if (pluginJobPropsFile.exists()) {
+				pluginJobProps = new Props(commonPluginJobProps, pluginJobPropsFile);
 			}
 			else {
-				conf = new Props(commonConf);
+				pluginJobProps = new Props(commonPluginJobProps);
 			}
 			
-			sysConf = new Props(commonSysConf, sysConfFile);
-			sysConf = PropsUtils.resolveProps(sysConf);
-
+			pluginLoadProps = new Props(commonPluginLoadProps, pluginLoadPropsFile);
+			pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
 		}
 		catch (Exception e) {
 			throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
 		}
-		sysConf.put("plugin.dir", dir.getAbsolutePath());
+		// Add properties into the plugin set
+		pluginLoadProps.put("plugin.dir", pluginDir.getAbsolutePath());	
+		plugins.addPluginLoadProps(jobTypeName, pluginLoadProps);
+		if (pluginJobProps != null) {
+			plugins.addPluginJobProps(jobTypeName, pluginJobProps);
+		}
 		
-		// use directory name as job type name
-		String jobtypeName = dir.getName();
+		ClassLoader jobTypeLoader = loadJobTypeClassLoader(pluginDir, jobTypeName, plugins);
+		String jobtypeClass = pluginLoadProps.get("jobtype.class");
 		
-		String jobtypeClass = sysConf.get("jobtype.class");
+		Class<? extends Job> clazz = null;
+		try {
+			clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
+			plugins.addPluginClass(jobTypeName, clazz);
+		}
+		catch (ClassNotFoundException e) {
+			throw new JobTypeManagerException(e);
+		}
 		
-		logger.info("Loading jobtype " + jobtypeName );
-
+		logger.info("Verifying job plugin " + jobTypeName);
+		try {
+			Props fakeSysProps = new Props(pluginLoadProps);
+			Props fakeJobProps = new Props(pluginJobProps);
+			@SuppressWarnings("unused")
+			Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+		}
+		catch (Exception e) {
+			logger.info("Jobtype " + jobTypeName + " failed test!", e);
+			throw new JobExecutionException(e);
+		}
+		catch (Throwable t) {
+			logger.info("Jobtype " + jobTypeName + " failed test!", t);
+			throw new JobExecutionException(t);
+		}
+		
+		logger.info("Loaded jobtype " + jobTypeName + " " + jobtypeClass);
+	}
+	
+	/**
+	 * Creates and loads all plugin resources (jars) into a ClassLoader
+	 * 
+	 * @param pluginDir
+	 * @param jobTypeName
+	 * @param plugins
+	 * @return
+	 */
+	private ClassLoader loadJobTypeClassLoader(File pluginDir, String jobTypeName, JobTypePluginSet plugins) {
 		// sysconf says what jars/confs to load
-		List<URL> resources = new ArrayList<URL>();		
+		List<URL> resources = new ArrayList<URL>();
+		Props pluginLoadProps = plugins.getPluginLoaderProps(jobTypeName);
 		
 		try {
 			//first global classpath
-			logger.info("Adding global resources.");
-			List<String> typeGlobalClassPath = sysConf.getStringList("jobtype.global.classpath", null, ",");
-			if(typeGlobalClassPath != null) {
-				for(String jar : typeGlobalClassPath) {
+			logger.info("Adding global resources for " + jobTypeName);
+			List<String> typeGlobalClassPath = pluginLoadProps.getStringList("jobtype.global.classpath", null, ",");
+			if (typeGlobalClassPath != null) {
+				for (String jar : typeGlobalClassPath) {
 					URL cpItem = new File(jar).toURI().toURL();
-					if(!resources.contains(cpItem)) {
+					if (!resources.contains(cpItem)) {
 						logger.info("adding to classpath " + cpItem);
 						resources.add(cpItem);
 					}
@@ -268,78 +258,51 @@ public class JobTypeManager
 			
 			//type specific classpath
 			logger.info("Adding type resources.");
-			List<String> typeClassPath = sysConf.getStringList("jobtype.classpath", null, ",");
-			if(typeClassPath != null) {
-				for(String jar : typeClassPath) {
+			List<String> typeClassPath = pluginLoadProps.getStringList("jobtype.classpath", null, ",");
+			if (typeClassPath != null) {
+				for (String jar : typeClassPath) {
 					URL cpItem = new File(jar).toURI().toURL();
-					if(!resources.contains(cpItem)) {
+					if (!resources.contains(cpItem)) {
 						logger.info("adding to classpath " + cpItem);
 						resources.add(cpItem);
 					}
 				}
 			}			
-			List<String> jobtypeLibDirs = sysConf.getStringList("jobtype.lib.dir", null, ",");
-			if(jobtypeLibDirs != null) {
-				for(String libDir : jobtypeLibDirs) {
-					for(File f : new File(libDir).listFiles()) {
-						if(f.getName().endsWith(".jar")) {
-								resources.add(f.toURI().toURL());
-								logger.info("adding to classpath " + f.toURI().toURL());
+			List<String> jobtypeLibDirs = pluginLoadProps.getStringList("jobtype.lib.dir", null, ",");
+			if (jobtypeLibDirs != null) {
+				for (String libDir : jobtypeLibDirs) {
+					for (File f : new File(libDir).listFiles()) {
+						if (f.getName().endsWith(".jar")) {
+							resources.add(f.toURI().toURL());
+							logger.info("adding to classpath " + f.toURI().toURL());
 						}
 					}
 				}
 			}
 			
 			logger.info("Adding type override resources.");
-			for(File f : dir.listFiles()) {
-				if(f.getName().endsWith(".jar")) {
-						resources.add(f.toURI().toURL());
-						logger.info("adding to classpath " + f.toURI().toURL());
+			for (File f : pluginDir.listFiles()) {
+				if (f.getName().endsWith(".jar")) {
+					resources.add(f.toURI().toURL());
+					logger.info("adding to classpath " + f.toURI().toURL());
 				}
 			}
 			
-		} catch (MalformedURLException e) {
+		}
+		catch (MalformedURLException e) {
 			throw new JobTypeManagerException(e);
 		}
 		
 		// each job type can have a different class loader
 		ClassLoader jobTypeLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentLoader);
-		
-		Class<? extends Job> clazz = null;
-		try {
-			clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
-			jobToClass.put(jobtypeName, clazz);
-		}
-		catch (ClassNotFoundException e) {
-			throw new JobTypeManagerException(e);
-		}
-		
-		logger.info("Doing simple testing...");
-		try {
-			Props fakeSysProps = new Props(sysConf);
-//			fakeSysProps.put("type", jobtypeName);
-			Props fakeJobProps = new Props(conf);
-			@SuppressWarnings("unused")
-			Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
-		}
-		catch (Exception e) {
-			logger.info("Jobtype " + jobtypeName + " failed test!", e);
-			throw new JobExecutionException(e);
-		}
-		catch (Throwable t) {
-			logger.info("Jobtype " + jobtypeName + " failed test!", t);
-			throw new JobExecutionException(t);
-		}
-		
-		logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
-		
-		if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
-		jobtypeSysProps.put(jobtypeName, sysConf);
-		
+		return jobTypeLoader;
 	}
 	
-	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) 
-			throws JobTypeManagerException {
+	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException {
+		// This is final because during build phase, you should never need to swap
+		// the pluginSet for safety reasons
+		final JobTypePluginSet pluginSet = getJobTypePluginSet();
+		
 		Job job = null;
 		try {
 			String jobType = jobProps.getString("type");
@@ -352,44 +315,36 @@ public class JobTypeManager
 			
 			logger.info("Building " + jobType + " job executor. ");
 			
-			Class<? extends Object> executorClass = jobToClass.get(jobType);
-
+			Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);
 			if (executorClass == null) {
 				throw new JobExecutionException(
 						String.format("Job type '" + jobType + "' is unrecognized. Could not construct job[%s] of type[%s].", jobProps, jobType));
 			}
 			
-			Props sysConf = jobtypeSysProps.get(jobType);
-			
-			Props jobConf = jobProps;
-			if (jobtypeJobProps.containsKey(jobType)) {
-				Props p = jobtypeJobProps.get(jobType);
-				for (String k : p.getKeySet()) {
-					if (!jobConf.containsKey(k)) {
-						jobConf.put(k, p.get(k));
+			Props pluginJobProps = pluginSet.getPluginJobProps(jobType);
+			if (pluginJobProps != null) {
+				for (String k : pluginJobProps.getKeySet()) {
+					if (!jobProps.containsKey(k)) {
+						jobProps.put(k, pluginJobProps.get(k));
 					}
 				}
 			}
-			jobConf = PropsUtils.resolveProps(jobConf);
+			jobProps = PropsUtils.resolveProps(jobProps);
 			
-			if (sysConf != null) {
-				sysConf = PropsUtils.resolveProps(sysConf);
+			Props pluginLoadProps = pluginSet.getPluginLoaderProps(jobType);
+			if (pluginLoadProps != null) {
+				pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
 			}
 			else {
-				sysConf = new Props();
+				pluginLoadProps = new Props();
 			}
 			
-//			logger.info("sysConf is " + sysConf);
-//			logger.info("jobConf is " + jobConf);
-//			
 			job = (Job) Utils.callConstructor(
-					executorClass, jobId, sysConf, jobConf, logger);
+					executorClass, jobId, pluginLoadProps, jobProps, logger);
 		}
 		catch (Exception e) {
-			//job = new InitErrorJob(jobId, e);
 			logger.error("Failed to build job executor for job " + jobId + e.getMessage());
 			throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
-			//throw new JobTypeManagerException(e);
 		}
 		catch (Throwable t) {
 			logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
@@ -398,9 +353,12 @@ public class JobTypeManager
 
 		return job;
 	}
-
-	public void registerJobType(String typeName, Class<? extends Job> jobTypeClass) {
-		jobToClass.put(typeName, jobTypeClass);
+	
+	/**
+	 * Public for test reasons. Will need to move tests to the same package
+	 */
+	public synchronized JobTypePluginSet getJobTypePluginSet() {
+		return this.pluginSet;
 	}
 }
 
diff --git a/src/main/java/azkaban/jobtype/JobTypePluginSet.java b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
new file mode 100644
index 0000000..9e6ded2
--- /dev/null
+++ b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobtype;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.jobExecutor.Job;
+import azkaban.utils.Props;
+
+/**
+ * Container for job type plugins
+ * 
+ * This contains the jobClass objects, the properties for loading plugins, and the
+ * properties given by default to the plugin.
+ * 
+ * This class is not thread safe, so adding to this class should only be populated 
+ * and controlled by the JobTypeManager
+ */
+public class JobTypePluginSet {
+	private Map<String, Class<? extends Job>> jobToClass;
+	private Map<String, Props> pluginJobPropsMap;
+	private Map<String, Props> pluginLoadPropsMap;
+	
+	private Props commonJobProps;
+	private Props commonLoadProps;
+	
+	/**
+	 * Base constructor
+	 */
+	public JobTypePluginSet() {
+		jobToClass = new HashMap<String, Class<? extends Job>>();
+		pluginJobPropsMap = new HashMap<String, Props>();
+		pluginLoadPropsMap = new HashMap<String, Props>();
+	}
+	
+	/**
+	 * Copy constructor
+	 * @param clone
+	 */
+	public JobTypePluginSet(JobTypePluginSet clone) {
+		jobToClass = new HashMap<String, Class<? extends Job>>(clone.jobToClass);
+		pluginJobPropsMap = new HashMap<String, Props>(clone.pluginJobPropsMap);
+		pluginLoadPropsMap = new HashMap<String, Props>(clone.pluginLoadPropsMap);
+		commonJobProps = clone.commonJobProps;
+		commonLoadProps = clone.commonLoadProps;
+	}
+	
+	/**
+	 * Sets the common properties shared in every jobtype
+	 * @param commonJobProps
+	 */
+	public void setCommonPluginJobProps(Props commonJobProps) {
+		this.commonJobProps = commonJobProps;
+	}
+	
+	/**
+	 * Sets the common properties used to load every plugin
+	 * @param commonLoadProps
+	 */
+	public void setCommonPluginLoadProps(Props commonLoadProps) {
+		this.commonLoadProps = commonLoadProps;
+	}
+	
+	/**
+	 * Gets common properties for every jobtype
+	 * @return
+	 */
+	public Props getCommonPluginJobProps() {
+		return commonJobProps;
+	}
+	
+	/**
+	 * Gets the common properties used to load a plugin
+	 * @return
+	 */
+	public Props getCommonPluginLoadProps() {
+		return commonLoadProps;
+	}
+	
+	/**
+	 * Get the properties for a jobtype used to setup and load a plugin
+	 * 
+	 * @param jobTypeName
+	 * @return
+	 */
+	public Props getPluginLoaderProps(String jobTypeName) {
+		return pluginLoadPropsMap.get(jobTypeName);
+	}
+	
+	/**
+	 * Get the properties that will be given to the plugin as default job
+	 * properties.
+	 * 
+	 * @param jobTypeName
+	 * @return
+	 */
+	public Props getPluginJobProps(String jobTypeName) {
+		return pluginJobPropsMap.get(jobTypeName);
+	}
+	
+	/**
+	 * Gets the plugin job runner class
+	 * 
+	 * @param jobTypeName
+	 * @return
+	 */
+	public Class<? extends Job> getPluginClass(String jobTypeName) {
+		return jobToClass.get(jobTypeName);
+	}
+	
+	/**
+	 * Adds plugin jobtype class
+	 */
+	public void addPluginClass(String jobTypeName, Class<? extends Job> jobTypeClass) {
+		jobToClass.put(jobTypeName, jobTypeClass);
+	}
+	
+	/**
+	 * Adds plugin job properties used as default runtime properties
+	 */
+	public void addPluginJobProps(String jobTypeName, Props props) {
+		pluginJobPropsMap.put(jobTypeName, props);
+	}
+	
+	/**
+	 * Adds plugin load properties used to load the plugin
+	 */
+	public void addPluginLoadProps(String jobTypeName, Props props) {
+		pluginLoadPropsMap.put(jobTypeName, props);
+	}
+}
\ No newline at end of file
diff --git a/src/main/java/azkaban/user/UserManager.java b/src/main/java/azkaban/user/UserManager.java
index 071b307..253e817 100644
--- a/src/main/java/azkaban/user/UserManager.java
+++ b/src/main/java/azkaban/user/UserManager.java
@@ -34,7 +34,7 @@ public interface UserManager {
 	 * @throws UserManagerException If the username/password combination doesn't exist.
 	 */
 	public User getUser(String username, String password) throws UserManagerException;
-	
+
 	/**
 	 * Returns true if the user is valid. This is used when adding permissions for users
 	 * 
diff --git a/src/main/java/azkaban/webapp/AzkabanWebServer.java b/src/main/java/azkaban/webapp/AzkabanWebServer.java
index ebb0aa0..a0cacc8 100644
--- a/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -42,6 +42,7 @@ import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
 import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
 import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
 import org.mortbay.jetty.security.SslSocketConnector;
@@ -50,6 +51,8 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import com.linkedin.restli.server.RestliServlet;
+
 import azkaban.alert.Alerter;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
@@ -59,7 +62,6 @@ import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
-
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
@@ -82,7 +84,6 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
 import azkaban.webapp.servlet.AzkabanServletContextListener;
-
 import azkaban.webapp.servlet.AbstractAzkabanServlet;
 import azkaban.webapp.servlet.ExecutorServlet;
 import azkaban.webapp.servlet.IndexRedirectServlet;
@@ -158,6 +159,10 @@ public class AzkabanWebServer extends AzkabanServer {
 	private MBeanServer mbeanServer;
 	private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
 
+	public static AzkabanWebServer getInstance() {
+		return app;
+	}
+
 	/**
 	 * Constructor usually called by tomcat AzkabanServletContext to create the
 	 * initial server
@@ -684,7 +689,9 @@ public class AzkabanWebServer extends AzkabanServer {
 		}
 
 		int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
-
+		boolean isStatsOn = azkabanSettings.getBoolean("jetty.connector.stats", true);
+		logger.info("Setting up connector with stats on: " + isStatsOn);
+		
 		boolean ssl;
 		int port;
 		final Server server = new Server();
@@ -714,6 +721,11 @@ public class AzkabanWebServer extends AzkabanServer {
 			server.addConnector(connector);
 		}
 		
+		// setting stats configuration for connectors
+		for (Connector connector : server.getConnectors()) {
+			connector.setStatsOn(isStatsOn);
+		}
+		
 		String hostname = azkabanSettings.getString("jetty.hostname", "localhost");
 		azkabanSettings.put("server.hostname", hostname);
 		azkabanSettings.put("server.port", port);
@@ -762,6 +774,10 @@ public class AzkabanWebServer extends AzkabanServer {
 		root.addServlet(new ServletHolder(new ScheduleServlet()),"/schedule");
 		root.addServlet(new ServletHolder(new JMXHttpServlet()),"/jmx");
 		root.addServlet(new ServletHolder(new TriggerManagerServlet()),"/triggers");
+
+		ServletHolder restliHolder = new ServletHolder(new RestliServlet());
+		restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
+		root.addServlet(restliHolder, "/restli/*");
 		
 		String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
 		loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine());
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 1f3b462..18012ee 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -21,3 +21,6 @@ executor.maxThreads=50
 executor.port=12321
 executor.flow.threads=30
 
+# JMX stats
+jetty.connector.stats=true
+executor.connector.stats=true
\ No newline at end of file
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 7524a14..86afc73 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -41,3 +41,8 @@ job.failure.email=
 job.success.email=
 
 lockdown.create.projects=false
+
+
+# JMX stats
+jetty.connector.stats=true
+executor.connector.stats=true
\ No newline at end of file
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 3ccb2f3..a7c3dfb 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -47,3 +47,7 @@ job.success.email=
 lockdown.create.projects=false
 
 cache.directory=cache
+
+# JMX stats
+jetty.connector.stats=true
+executor.connector.stats=true
\ No newline at end of file
diff --git a/src/restli/.gitignore b/src/restli/.gitignore
new file mode 100644
index 0000000..a6ad54c
--- /dev/null
+++ b/src/restli/.gitignore
@@ -0,0 +1,2 @@
+generatedJava
+generatedRestSpec
diff --git a/src/restli/java/azkaban/restli/ProjectManagerResource.java b/src/restli/java/azkaban/restli/ProjectManagerResource.java
new file mode 100644
index 0000000..faebbc8
--- /dev/null
+++ b/src/restli/java/azkaban/restli/ProjectManagerResource.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.restli;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.servlet.ServletException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.Permission;
+import azkaban.user.User;
+import azkaban.user.UserManagerException;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+
+import com.linkedin.restli.server.annotations.Action;
+import com.linkedin.restli.server.annotations.ActionParam;
+import com.linkedin.restli.server.annotations.RestLiActions;
+import com.linkedin.restli.server.resources.ResourceContextHolder;
+
+@RestLiActions(name = "project", namespace = "azkaban.restli")
+public class ProjectManagerResource extends ResourceContextHolder {
+	private static final Logger logger = Logger.getLogger(ProjectManagerResource.class);
+	
+	public AzkabanWebServer getAzkaban() {
+		return AzkabanWebServer.getInstance();
+	}
+	
+	@Action(name = "deploy")
+	public String deploy(
+			@ActionParam("sessionId") String sessionId,
+			@ActionParam("projectName") String projectName,
+			@ActionParam("packageUrl") String packageUrl)
+			throws ProjectManagerException, UserManagerException, ServletException, IOException {
+		logger.info("Deploy called. {sessionId: " + sessionId +
+				", projectName: " + projectName + 
+				", packageUrl:" + packageUrl + "}");
+		
+		String ip = (String)this.getContext().getRawRequestContext().getLocalAttr("REMOTE_ADDR");
+		User user = ResourceUtils.getUserFromSessionId(sessionId, ip);
+		ProjectManager projectManager = getAzkaban().getProjectManager();
+		Project project = projectManager.getProject(projectName);
+		if (project == null) {
+			throw new ProjectManagerException("Project '" + projectName + "' not found.");
+		}
+		
+		if (!ResourceUtils.hasPermission(project, user, Permission.Type.WRITE)) {
+			String errorMsg = "User " + user.getUserId() + " has no permission to write to project " + project.getName();
+			logger.error(errorMsg);
+			throw new ProjectManagerException(errorMsg);
+		}
+
+		// Deploy stuff here. Move the code to a more formal area later.
+		logger.info("Downloading file from " + packageUrl);
+		URL url = null;
+		InputStream urlFileInputStream = null;
+		try {
+			url = new URL(packageUrl);
+			InputStream in = url.openStream();
+			urlFileInputStream = new BufferedInputStream(in);
+		} catch (MalformedURLException e) {
+			String errorMsg = "Url " + packageUrl + " is malformed.";
+			logger.error(errorMsg, e);
+			throw new ProjectManagerException(errorMsg, e);
+		} catch (IOException e) {
+			String errorMsg = "Error opening input stream to " + packageUrl;
+			logger.error(errorMsg, e);
+			throw new ProjectManagerException(errorMsg, e);
+		}
+		
+		String filename = getFileName(url.getFile());
+
+		File tempDir = Utils.createTempDir();
+		OutputStream fileOutputStream = null;
+		try {
+			logger.error("Downloading " + filename);
+			File archiveFile = new File(tempDir, filename);
+			fileOutputStream = new BufferedOutputStream(new FileOutputStream(archiveFile));
+			IOUtils.copy(urlFileInputStream, fileOutputStream);
+			
+			logger.error("Downloaded to " + archiveFile.toString() + " " + archiveFile.length() + " bytes.");
+			projectManager.uploadProject(project, archiveFile, "zip", user);
+		} catch (Exception e) {
+			logger.info("Installation Failed.", e);
+			String error = e.getMessage();
+			if (error.length() > 512) {
+				error = error.substring(0, 512) + "\nToo many errors to display.\n";
+			}
+			
+			throw new ProjectManagerException("Installation failed: " + error);
+		}
+		finally {
+			if (tempDir.exists()) {
+				FileUtils.deleteDirectory(tempDir);
+			}
+			if (urlFileInputStream != null) {
+				urlFileInputStream.close();
+			}
+			if (fileOutputStream != null) {
+				fileOutputStream.close();
+			}
+		}
+		
+		return Integer.toString(project.getVersion());
+	}
+
+	private String getFileName(String file) {
+		return file.substring(file.lastIndexOf("/") + 1);
+	}
+}
diff --git a/src/restli/java/azkaban/restli/ResourceUtils.java b/src/restli/java/azkaban/restli/ResourceUtils.java
new file mode 100644
index 0000000..18229cf
--- /dev/null
+++ b/src/restli/java/azkaban/restli/ResourceUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.restli;
+
+import azkaban.project.Project;
+import azkaban.user.Permission;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
+
+public class ResourceUtils {
+	
+	public static boolean hasPermission(Project project, User user, Permission.Type type) {
+		UserManager userManager = AzkabanWebServer.getInstance().getUserManager();
+		if (project.hasPermission(user, type)) {
+			return true;
+		}
+		
+		for (String roleName: user.getRoles()) {
+			Role role = userManager.getRole(roleName);
+			if (role.getPermission().isPermissionSet(type) || 
+				role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+				return true;
+			}
+		}
+		
+		return false;
+	}
+	
+	public static User getUserFromSessionId(String sessionId, String ip) throws UserManagerException {
+		Session session = AzkabanWebServer.getInstance().getSessionCache().getSession(sessionId);
+		if (session == null) {
+			throw new UserManagerException("Invalid session. Login required");
+		}
+		else if (!session.getIp().equals(ip)) {
+			throw new UserManagerException("Invalid session. Session expired.");
+		}
+		
+		return session.getUser();
+	}
+}
diff --git a/src/restli/java/azkaban/restli/UserManagerResource.java b/src/restli/java/azkaban/restli/UserManagerResource.java
new file mode 100644
index 0000000..1a22263
--- /dev/null
+++ b/src/restli/java/azkaban/restli/UserManagerResource.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.restli;
+
+import java.util.UUID;
+import javax.servlet.ServletException;
+import org.apache.log4j.Logger;
+
+import azkaban.restli.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
+
+import com.linkedin.restli.server.annotations.Action;
+import com.linkedin.restli.server.annotations.ActionParam;
+import com.linkedin.restli.server.annotations.RestLiActions;
+import com.linkedin.restli.server.resources.ResourceContextHolder;
+
+
+@RestLiActions(name = "user", namespace = "azkaban.restli")
+public class UserManagerResource extends ResourceContextHolder {
+	private static final Logger logger = Logger.getLogger(UserManagerResource.class);
+	
+	public AzkabanWebServer getAzkaban() {
+		return AzkabanWebServer.getInstance();
+	}
+	
+	@Action(name = "login")
+	public String login(
+			@ActionParam("username") String username,
+			@ActionParam("password") String password)
+			throws UserManagerException, ServletException {
+		String ip = (String)this.getContext().getRawRequestContext().getLocalAttr("REMOTE_ADDR");
+		logger.info("Attempting to login for " + username + " from ip '" + ip + "'");
+		
+		Session session = createSession(username, password, ip);
+		
+		logger.info("Session id " + session.getSessionId() + " created for user '" + username + "' and ip " + ip);
+		return session.getSessionId();
+	}
+
+	@Action(name = "getUserFromSessionId")
+	public User getUserFromSessionId(@ActionParam("sessionId") String sessionId) {
+		String ip = (String)this.getContext().getRawRequestContext().getLocalAttr("REMOTE_ADDR");
+		Session session = getSessionFromSessionId(sessionId, ip);
+		azkaban.user.User azUser = session.getUser();
+
+		// Fill out the restli object with properties from the Azkaban user
+		User user = new User();
+		user.setUserId(azUser.getUserId());
+		user.setEmail(azUser.getEmail());
+		return user;
+	}
+
+	private Session createSession(String username, String password, String ip)
+			throws UserManagerException, ServletException {
+		UserManager manager = getAzkaban().getUserManager();
+		azkaban.user.User user = manager.getUser(username, password);
+
+		String randomUID = UUID.randomUUID().toString();
+		Session session = new Session(randomUID, user, ip);
+		getAzkaban().getSessionCache().addSession(session);
+		
+		return session;
+	}
+
+	private Session getSessionFromSessionId(String sessionId, String remoteIp) {
+		if (sessionId == null) {
+			return null;
+		}
+
+		Session session = getAzkaban().getSessionCache().getSession(sessionId);
+		// Check if the IP's are equal. If not, we invalidate the sesson.
+		if (session == null || !remoteIp.equals(session.getIp())) {
+			return null;
+		}
+
+		return session;
+	}
+}
\ No newline at end of file
diff --git a/src/restli/schemas/azkaban/restli/user/User.pdsc b/src/restli/schemas/azkaban/restli/user/User.pdsc
new file mode 100644
index 0000000..156b380
--- /dev/null
+++ b/src/restli/schemas/azkaban/restli/user/User.pdsc
@@ -0,0 +1,10 @@
+{
+  "type": "record",
+  "name": "User",
+  "namespace": "azkaban.restli.user",
+  "doc": "Azkaban User restli info",
+  "fields": [
+    {"name": "userId", "type": "string","doc": "The username this session"},
+    {"name": "email", "type": "string","doc": "User email"}
+  ]
+}
\ No newline at end of file
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 5a844a7..f99fc61 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -145,7 +145,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 						$(attemptBox).bind("contextmenu", attemptRightClick);
 						
 						$(progressBar).before(attemptBox);
-						attemptBox.job = nodeId;
+						attemptBox.job = node.id;
 						attemptBox.attempt = a;
 					}
 				}
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index c87f2c6..cc3b1e5 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -37,7 +37,7 @@ public class LocalFlowWatcherTest {
 	@Before
 	public void setUp() throws Exception {
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
 	
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 45c3a85..d8e94dd 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -38,7 +38,7 @@ public class RemoteFlowWatcherTest {
 	@Before
 	public void setUp() throws Exception {
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 	}
 	
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index aa1eee5..df9cac1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -23,6 +23,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
@@ -73,8 +74,10 @@ public class FlowRunnerPipelineTest {
 		}
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
-		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+		
+		pluginSet.addPluginClass("java", JavaJob.class);
+		pluginSet.addPluginClass("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 		fakeExecutorLoader = new MockExecutorLoader();
 		project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 21b43f1..2e708fe 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,8 +21,10 @@ import azkaban.executor.Status;
 
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.MockProjectLoader;
 import azkaban.test.executor.InteractiveTestJob;
 import azkaban.test.executor.JavaJob;
 import azkaban.utils.JSONUtils;
@@ -46,8 +48,9 @@ public class FlowRunnerTest {
 			workingDir.mkdirs();
 		}
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
-		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+		pluginSet.addPluginClass("java", JavaJob.class);
+		pluginSet.addPluginClass("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 		
 		InteractiveTestJob.clearTestJobs();
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 7a67da6..0c9d7b9 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -21,6 +21,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
@@ -92,8 +93,10 @@ public class FlowRunnerTest2 {
 		}
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
-		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+		JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+		
+		pluginSet.addPluginClass("java", JavaJob.class);
+		pluginSet.addPluginClass("test", InteractiveTestJob.class);
 		fakeProjectLoader = new MockProjectLoader(workingDir);
 		fakeExecutorLoader = new MockExecutorLoader();
 		project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 69d6296..6fcbb11 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -42,7 +42,8 @@ public class JobRunnerTest {
 		}
 		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
-		jobtypeManager.registerJobType("java", JavaJob.class);
+		
+		jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
 	}
 
 	@After
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob.java b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
new file mode 100644
index 0000000..65ca1ba
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob extends JavaProcessJob {
+	public FakeJavaJob(String jobid, Props sysProps, Props jobProps, Logger log) {
+		super(jobid, sysProps, jobProps, log);
+	}
+}
+
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob2.java b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
new file mode 100644
index 0000000..581aeff
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob2 extends JavaProcessJob {
+	public FakeJavaJob2(String jobid, Props sysProps, Props jobProps, Logger log) {
+		super(jobid, sysProps, jobProps, log);
+	}
+}
+
diff --git a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
new file mode 100644
index 0000000..a4c5cb0
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.jobExecutor.Job;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
+import azkaban.utils.Props;
+
+
+/**
+ * Test the flow run, especially with embedded flows.
+ * Files are in unit/plugins/jobtypes
+ * 
+ */
+public class JobTypeManagerTest {
+	public static String TEST_PLUGIN_DIR = "jobtypes_test";
+	private Logger logger = Logger.getLogger(JobTypeManagerTest.class);
+	private JobTypeManager manager;
+	
+	public JobTypeManagerTest() {
+	}
+	
+	@Before
+	public void setUp() throws Exception {
+		File jobTypeDir = new File(TEST_PLUGIN_DIR);
+		jobTypeDir.mkdirs();
+		
+		FileUtils.copyDirectory(new File("unit/plugins/jobtypes"), jobTypeDir);
+		manager = new JobTypeManager(TEST_PLUGIN_DIR, this.getClass().getClassLoader());
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+		FileUtils.deleteDirectory(new File(TEST_PLUGIN_DIR));
+	}
+	
+	/**
+	 * Tests that the common and common private properties are loaded correctly
+	 * @throws Exception
+	 */
+	@Test
+	public void testCommonPluginProps() throws Exception {
+		JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+		
+		Props props = pluginSet.getCommonPluginJobProps();
+		System.out.println(props.toString());
+		assertEquals("commonprop1", props.getString("commonprop1"));
+		assertEquals("commonprop2", props.getString("commonprop2"));
+		assertEquals("commonprop3", props.getString("commonprop3"));
+		
+		Props priv = pluginSet.getCommonPluginLoadProps();
+		assertEquals("commonprivate1", priv.getString("commonprivate1"));
+		assertEquals("commonprivate2", priv.getString("commonprivate2"));
+		assertEquals("commonprivate3", priv.getString("commonprivate3"));
+	}
+
+	/**
+	 * Tests that the proper classes were loaded and that the common and the load
+	 * properties are properly loaded.
+	 * 
+	 * @throws Exception
+	 */
+	@Test
+	public void testLoadedClasses() throws Exception {
+		JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+		
+		Props props = pluginSet.getCommonPluginJobProps();
+		System.out.println(props.toString());
+		assertEquals("commonprop1", props.getString("commonprop1"));
+		assertEquals("commonprop2", props.getString("commonprop2"));
+		assertEquals("commonprop3", props.getString("commonprop3"));
+		assertNull(props.get("commonprivate1"));
+		
+		Props priv = pluginSet.getCommonPluginLoadProps();
+		assertEquals("commonprivate1", priv.getString("commonprivate1"));
+		assertEquals("commonprivate2", priv.getString("commonprivate2"));
+		assertEquals("commonprivate3", priv.getString("commonprivate3"));
+		
+		// Testing the anothertestjobtype
+		Class<? extends Job> aPluginClass = pluginSet.getPluginClass("anothertestjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob", aPluginClass.getName());
+		Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+		Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+
+		// Loader props
+		assertEquals("lib/*", aloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob", aloadProps.get("jobtype.class"));
+		assertEquals("commonprivate1", aloadProps.get("commonprivate1"));
+		assertEquals("commonprivate2", aloadProps.get("commonprivate2"));
+		assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+		// Job props
+		assertEquals("commonprop1", ajobProps.get("commonprop1"));
+		assertEquals("commonprop2", ajobProps.get("commonprop2"));
+		assertEquals("commonprop3", ajobProps.get("commonprop3"));
+		assertNull(ajobProps.get("commonprivate1"));
+		
+		Class<? extends Job> tPluginClass = pluginSet.getPluginClass("testjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tPluginClass.getName());
+		Props tjobProps = pluginSet.getPluginJobProps("testjob");
+		Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+
+		// Loader props
+		assertNull(tloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+		assertEquals("commonprivate1", tloadProps.get("commonprivate1"));
+		assertEquals("commonprivate2", tloadProps.get("commonprivate2"));
+		assertEquals("private3", tloadProps.get("commonprivate3"));
+		assertEquals("0", tloadProps.get("testprivate"));
+		// Job props
+		assertEquals("commonprop1", tjobProps.get("commonprop1"));
+		assertEquals("commonprop2", tjobProps.get("commonprop2"));
+		assertEquals("1", tjobProps.get("pluginprops1"));
+		assertEquals("2", tjobProps.get("pluginprops2"));
+		assertEquals("3", tjobProps.get("pluginprops3"));
+		assertEquals("pluginprops", tjobProps.get("commonprop3"));
+		// Testing that the private properties aren't shared with the public ones
+		assertNull(tjobProps.get("commonprivate1"));
+		assertNull(tjobProps.get("testprivate"));
+	}
+	
+	/**
+	 * Test building classes
+	 * @throws Exception
+	 */
+	@Test
+	public void testBuildClass() throws Exception {
+		Props jobProps = new Props();
+		jobProps.put("type", "anothertestjob");
+		jobProps.put("test","test1");
+		jobProps.put("pluginprops3","4");
+		Job job = manager.buildJobExecutor("anothertestjob", jobProps, logger);
+		
+		assertTrue(job instanceof FakeJavaJob);
+		FakeJavaJob fjj = (FakeJavaJob)job;
+		
+		Props props = fjj.getJobProps();
+		assertEquals("test1", props.get("test"));
+		assertNull(props.get("pluginprops1"));
+		assertEquals("4", props.get("pluginprops3"));
+		assertEquals("commonprop1", props.get("commonprop1"));
+		assertEquals("commonprop2", props.get("commonprop2"));
+		assertEquals("commonprop3", props.get("commonprop3"));
+		assertNull(props.get("commonprivate1"));
+	}
+	
+	/**
+	 * Test building classes 2
+	 * @throws Exception
+	 */
+	@Test
+	public void testBuildClass2() throws Exception {
+		Props jobProps = new Props();
+		jobProps.put("type", "testjob");
+		jobProps.put("test","test1");
+		jobProps.put("pluginprops3","4");
+		Job job = manager.buildJobExecutor("testjob", jobProps, logger);
+		
+		assertTrue(job instanceof FakeJavaJob2);
+		FakeJavaJob2 fjj = (FakeJavaJob2)job;
+		
+		Props props = fjj.getJobProps();
+		assertEquals("test1", props.get("test"));
+		assertEquals("1", props.get("pluginprops1"));
+		assertEquals("2", props.get("pluginprops2"));
+		assertEquals("4", props.get("pluginprops3")); // Overridden value
+		assertEquals("commonprop1", props.get("commonprop1"));
+		assertEquals("commonprop2", props.get("commonprop2"));
+		assertEquals("pluginprops", props.get("commonprop3"));
+		assertNull(props.get("commonprivate1"));
+	}
+	
+	/**
+	 * Test out reloading properties
+	 * @throws Exception
+	 */
+	@Test
+	public void testResetPlugins() throws Exception {
+		// Add a plugins file to the anothertestjob folder
+		File anothertestfolder = new File(TEST_PLUGIN_DIR + "/anothertestjob");
+		Props pluginProps = new Props();
+		pluginProps.put("test1", "1");
+		pluginProps.put("test2", "2");
+		pluginProps.put("pluginprops3","4");
+		pluginProps.storeFlattened(new File(anothertestfolder, "plugin.properties"));
+		
+		// clone the testjob folder
+		File testFolder = new File(TEST_PLUGIN_DIR + "/testjob");
+		FileUtils.copyDirectory(testFolder, new File(TEST_PLUGIN_DIR + "/newtestjob"));
+		
+		// change the common properties
+		Props commonPlugin = new Props(null, TEST_PLUGIN_DIR + "/common.properties");
+		commonPlugin.put("commonprop1", "1");
+		commonPlugin.put("newcommonprop1", "2");
+		commonPlugin.removeLocal("commonprop2");
+		commonPlugin.storeFlattened(new File(TEST_PLUGIN_DIR + "/common.properties"));
+		
+		// change the common properties
+		Props commonPrivate = new Props(null, TEST_PLUGIN_DIR + "/commonprivate.properties");
+		commonPrivate.put("commonprivate1", "1");
+		commonPrivate.put("newcommonprivate1", "2");
+		commonPrivate.removeLocal("commonprivate2");
+		commonPrivate.storeFlattened(new File(TEST_PLUGIN_DIR + "/commonprivate.properties"));
+		
+		// change testjob private property
+		Props loadProps = new Props(null, TEST_PLUGIN_DIR + "/testjob/private.properties");
+		loadProps.put("privatetest", "test");
+		
+		/*
+		 * Reload the plugins here!!
+		 */
+		manager.loadPlugins();
+		
+		// Checkout common props
+		JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+		Props commonProps = pluginSet.getCommonPluginJobProps();
+		assertEquals("1", commonProps.get("commonprop1"));
+		assertEquals("commonprop3", commonProps.get("commonprop3"));
+		assertEquals("2", commonProps.get("newcommonprop1"));
+		assertNull(commonProps.get("commonprop2"));
+		
+		// Checkout common private
+		Props commonPrivateProps = pluginSet.getCommonPluginLoadProps();
+		assertEquals("1", commonPrivateProps.get("commonprivate1"));
+		assertEquals("commonprivate3", commonPrivateProps.get("commonprivate3"));
+		assertEquals("2", commonPrivateProps.get("newcommonprivate1"));
+		assertNull(commonPrivateProps.get("commonprivate2"));
+		
+		// Verify anothertestjob changes
+		Class<? extends Job> atjClass = pluginSet.getPluginClass("anothertestjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob", atjClass.getName());
+		Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+		assertEquals("1", ajobProps.get("test1"));
+		assertEquals("2", ajobProps.get("test2"));
+		assertEquals("4", ajobProps.get("pluginprops3"));
+		assertEquals("commonprop3", ajobProps.get("commonprop3"));
+
+		Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+		assertEquals("1", aloadProps.get("commonprivate1"));
+		assertNull(aloadProps.get("commonprivate2"));
+		assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+		
+		// Verify testjob changes
+		Class<? extends Job> tjClass = pluginSet.getPluginClass("testjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tjClass.getName());
+		Props tjobProps = pluginSet.getPluginJobProps("testjob");
+		assertEquals("1", tjobProps.get("commonprop1"));
+		assertEquals("2", tjobProps.get("newcommonprop1"));
+		assertEquals("1", tjobProps.get("pluginprops1"));
+		assertEquals("2", tjobProps.get("pluginprops2"));
+		assertEquals("3", tjobProps.get("pluginprops3"));
+		assertEquals("pluginprops", tjobProps.get("commonprop3"));
+		assertNull(tjobProps.get("commonprop2"));
+
+		Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+		assertNull(tloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+		assertEquals("1", tloadProps.get("commonprivate1"));
+		assertNull(tloadProps.get("commonprivate2"));
+		assertEquals("private3", tloadProps.get("commonprivate3"));
+		
+		// Verify newtestjob
+		Class<? extends Job> ntPluginClass = pluginSet.getPluginClass("newtestjob");
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntPluginClass.getName());
+		Props ntjobProps = pluginSet.getPluginJobProps("newtestjob");
+		Props ntloadProps = pluginSet.getPluginLoaderProps("newtestjob");
+
+		// Loader props
+		assertNull(ntloadProps.get("jobtype.classpath"));
+		assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntloadProps.get("jobtype.class"));
+		assertEquals("1", ntloadProps.get("commonprivate1"));
+		assertNull(ntloadProps.get("commonprivate2"));
+		assertEquals("private3", ntloadProps.get("commonprivate3"));
+		assertEquals("0", ntloadProps.get("testprivate"));
+		// Job props
+		assertEquals("1", ntjobProps.get("commonprop1"));
+		assertNull(ntjobProps.get("commonprop2"));
+		assertEquals("1", ntjobProps.get("pluginprops1"));
+		assertEquals("2", ntjobProps.get("pluginprops2"));
+		assertEquals("3", ntjobProps.get("pluginprops3"));
+		assertEquals("pluginprops", ntjobProps.get("commonprop3"));
+	}
+}
diff --git a/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/anothertestjob/private.properties b/unit/plugins/jobtypes/anothertestjob/private.properties
new file mode 100644
index 0000000..8e95c94
--- /dev/null
+++ b/unit/plugins/jobtypes/anothertestjob/private.properties
@@ -0,0 +1,2 @@
+jobtype.classpath=lib/*
+jobtype.class=azkaban.test.jobtype.FakeJavaJob
diff --git a/unit/plugins/jobtypes/common.properties b/unit/plugins/jobtypes/common.properties
new file mode 100644
index 0000000..2823a83
--- /dev/null
+++ b/unit/plugins/jobtypes/common.properties
@@ -0,0 +1,3 @@
+commonprop1=commonprop1
+commonprop2=commonprop2
+commonprop3=commonprop3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/commonprivate.properties b/unit/plugins/jobtypes/commonprivate.properties
new file mode 100644
index 0000000..7d46d43
--- /dev/null
+++ b/unit/plugins/jobtypes/commonprivate.properties
@@ -0,0 +1,3 @@
+commonprivate1=commonprivate1
+commonprivate2=commonprivate2
+commonprivate3=commonprivate3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/fakejobtype.jar b/unit/plugins/jobtypes/testjob/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/testjob/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/testjob/plugin.properties b/unit/plugins/jobtypes/testjob/plugin.properties
new file mode 100644
index 0000000..587081a
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/plugin.properties
@@ -0,0 +1,4 @@
+pluginprops1=1
+pluginprops2=2
+pluginprops3=3
+commonprop3=pluginprops
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/private.properties b/unit/plugins/jobtypes/testjob/private.properties
new file mode 100644
index 0000000..2bba593
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/private.properties
@@ -0,0 +1,3 @@
+jobtype.class=azkaban.test.jobtype.FakeJavaJob2
+commonprivate3=private3
+testprivate=0
\ No newline at end of file