azkaban-developers
Changes
build.gradle 288(+181 -107)
src/main/java/azkaban/jobtype/JobTypeManager.java 408(+183 -225)
src/main/java/azkaban/jobtype/JobTypePluginSet.java 145(+145 -0)
src/restli/.gitignore 2(+2 -0)
Details
build.gradle 288(+181 -107)
diff --git a/build.gradle b/build.gradle
index aaea92f..8ace64a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,15 @@
+buildscript {
+ repositories {
+ mavenCentral()
+ }
+ dependencies {
+ classpath 'de.obqo.gradle:gradle-lesscss-plugin:1.0-1.3.3'
+ }
+}
+
apply plugin: 'java'
apply plugin: 'eclipse'
+apply plugin: 'lesscss'
defaultTasks 'dist'
@@ -41,18 +51,21 @@ configurations {
compile {
description = 'compile classpath'
}
+ generateRestli {
+ transitive = true
+ }
test {
extendsFrom compile
}
}
-configurations.compile {
- description = 'compile classpath'
-}
+
+ext.pegasusVersion = '1.15.7'
dependencies {
compile (
[group: 'commons-collections', name:'commons-collections', version: '3.2.1'],
[group: 'commons-configuration', name:'commons-configuration', version: '1.8'],
+ [group: 'commons-codec', name:'commons-codec', version: '1.9'],
[group: 'commons-dbcp', name:'commons-dbcp', version: '1.4'],
[group: 'commons-dbutils', name:'commons-dbutils', version: '1.5'],
[group: 'org.apache.commons', name:'commons-email', version: '1.2'],
@@ -68,7 +81,6 @@ dependencies {
[group: 'org.apache.httpcomponents', name:'httpcore', version: '4.2.1'],
[group: 'org.codehaus.jackson', name:'jackson-core-asl', version: '1.9.5'],
[group: 'org.codehaus.jackson', name:'jackson-mapper-asl',version: '1.9.5'],
- [group: 'org.codehaus.jackson', name:'jackson-core-asl', version: '1.9.5'],
[group: 'org.mortbay.jetty', name:'jetty', version: '6.1.26'],
[group: 'org.mortbay.jetty', name:'jetty-util', version: '6.1.26'],
[group: 'joda-time', name:'joda-time', version: '2.0'],
@@ -78,14 +90,42 @@ dependencies {
[group: 'mysql', name:'mysql-connector-java', version: '5.1.28'],
[group: 'javax.servlet', name:'servlet-api', version: '2.5'],
[group: 'org.slf4j', name:'slf4j-api', version: '1.6.1'],
- [group: 'org.apache.velocity', name:'velocity', version: '1.7']
+ [group: 'org.apache.velocity', name:'velocity', version: '1.7'],
+ [group: 'com.linkedin.pegasus', name: 'gradle-plugins', version: pegasusVersion],
+ [group: 'com.linkedin.pegasus', name: 'pegasus-common', version: pegasusVersion],
+ [group: 'com.linkedin.pegasus', name: 'restli-common', version: pegasusVersion],
+ [group: 'com.linkedin.pegasus', name: 'restli-server', version: pegasusVersion],
+ [group: 'com.linkedin.pegasus', name: 'data', version: pegasusVersion],
+ [group: 'com.linkedin.pegasus', name: 'r2', version: pegasusVersion],
+ [group: 'com.linkedin.pegasus', name: 'li-jersey-uri', version: pegasusVersion],
+ [group: 'com.linkedin.parseq', name: 'parseq', version: '1.3.7'],
+ [group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.3.2']
+ )
+
+ generateRestli (
+ [group: 'com.linkedin.pegasus', name:'generator', version: pegasusVersion],
+ [group: 'com.linkedin.pegasus', name:'restli-tools', version: pegasusVersion]
)
testCompile (
- [group: 'junit', name:'junit', version: '4.11']
+ [group: 'junit', name:'junit', version: '4.11'],
+ [group: 'org.hamcrest', name:'hamcrest-all', version: '1.3']
)
}
+sourceSets {
+ main {
+ java {
+ srcDirs 'src/main/java', 'src/restli/generatedJava', 'src/restli/java'
+ }
+ }
+ test {
+ java {
+ srcDirs 'unit/java'
+ }
+ }
+}
+
jar {
baseName = 'azkaban'
manifest {
@@ -96,6 +136,27 @@ jar {
}
}
+task restliTemplateGenerator(type: JavaExec) {
+ mkdir 'src/restli/generatedJava'
+
+ main = 'com.linkedin.pegasus.generator.PegasusDataTemplateGenerator'
+ args = ['src/restli/generatedJava','src/restli/schemas']
+ classpath = configurations.generateRestli
+}
+
+task restliRestSpecGenerator(dependsOn: [restliTemplateGenerator], type: JavaExec) << {
+ mkdir 'src/restli/generatedRestSpec'
+
+ main = 'com.linkedin.restli.tools.idlgen.RestLiResourceModelExporterCmdLineApp'
+ args = ['-outdir', 'src/restli/generatedRestSpec', '-sourcepath', 'src/restli/java']
+ classpath = configurations.generateRestli
+}
+
+task restli(dependsOn: restliTemplateGenerator) << {
+}
+
+compileJava.dependsOn.add('restli')
+
eclipse.classpath.file {
// Erase the whole classpath
beforeMerged {
@@ -109,15 +170,12 @@ eclipse.classpath.file {
}
}
-/**
- * Invokes a makefile target that will compile less files
- */
-task compileLess(type:Exec) {
- workingDir 'src/main/less'
- commandLine 'make', '-e'
- environment (
- OBJ_DIR : file(new File(buildDir,'/less'))
- )
+lesscss {
+ source = fileTree('src/main/less') {
+ include 'azkaban.less'
+ include 'azkaban-graph.less'
+ }
+ dest = 'build/web/css'
}
/**
@@ -128,27 +186,22 @@ task compileDust(type:Exec) {
commandLine 'make', '-e'
environment (
OBJ_DIR : file(new File(buildDir,'/dust'))
- )
+ )
}
/**
* Copies web files to a build directory
*/
-task web(dependsOn: ['compileLess', 'compileDust']) << {
+task web(dependsOn: ['lesscss', 'compileDust']) << {
println 'Copying web files'
copy {
from('src/web')
into('build/web')
}
-
copy {
from('build/dust')
into('build/web/js')
}
- copy {
- from('build/less')
- into('build/web/css')
- }
}
/*
@@ -170,219 +223,240 @@ task createVersionFile() << {
versionFile.write(versionStr)
}
+ext.soloAppendix = 'solo-server'
+ext.soloPackageDir = 'build/package/' + jar.baseName + '-' + soloAppendix
+
/**
- * Packages the SoloServer version of Azkaban
+ * Copies the Azkaban Solo Server files into its package directory.
*/
-task packageSolo(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
- appendix = 'solo-server'
- packageDir = 'build/package/' + baseName + '-' + appendix
+task copySolo(dependsOn: ['jar', 'web', 'createVersionFile']) << {
+ delete soloPackageDir
+ mkdir soloPackageDir
- println 'Creating Azkaban Solo Server Package into ' + packageDir
- mkdir packageDir
- mkdir packageDir + '/extlib'
- mkdir packageDir + '/plugins'
+ println 'Creating Azkaban Solo Server Package into ' + soloPackageDir
+ mkdir soloPackageDir + '/lib'
+ mkdir soloPackageDir + '/extlib'
+ mkdir soloPackageDir + '/plugins'
- println 'Copying Soloserver bin & conf'
copy {
+ println 'Copying Soloserver bin & conf'
from('src/package/soloserver')
- into(packageDir)
+ into(soloPackageDir)
}
- println 'Copying Azkaban lib'
copy {
+ println 'Copying Azkaban lib'
from('build/libs')
- into(packageDir + '/lib')
+ into(soloPackageDir + '/lib')
}
- println 'Copying web'
copy {
+ println 'Copying web'
from('build/web')
- into(packageDir + '/web')
+ into(soloPackageDir + '/web')
}
- println 'Copying sql'
copy {
+ println 'Copying sql'
from('src/sql')
- into(packageDir + '/sql')
+ into(soloPackageDir + '/sql')
}
- println 'Copying dependency jars'
copy {
- into packageDir + '/lib'
+ println 'Copying dependency jars'
+ into soloPackageDir + '/lib'
from configurations.compile
}
copy {
- into packageDir
+ println 'Copying version file'
+ into soloPackageDir
from 'build/package/version.file'
}
+}
- println 'Tarballing Solo Package'
+/**
+ * Packages the SoloServer version of Azkaban
+ */
+task packageSolo(type: Tar, dependsOn: 'copySolo') {
+ appendix = soloAppendix
extension = 'tar.gz'
compression = Compression.GZIP
- basedir = baseName + '-' + appendix + '-' + version
- println 'Source is in ' + packageDir
+ ext.basedir = baseName + '-' + appendix + '-' + version
into(basedir) {
- from packageDir
+ from soloPackageDir
exclude 'bin'
}
- dst_bin = basedir + '/bin'
- src_bin = packageDir + '/bin'
+ ext.dst_bin = basedir + '/bin'
+ ext.src_bin = soloPackageDir + '/bin'
from(src_bin) {
into dst_bin
fileMode = 0755
}
}
+ext.sqlPackageDir = 'build/package/sql'
+
/**
- * Packages the Sql Scripts for Azkaban DB
+ * Copies the SQL files into its package directory.
*/
-task packageSql(type: Tar) {
- String packageDir = 'build/package/sql'
+task copySql() << {
+ println 'Creating Azkaban SQL Scripts into ' + sqlPackageDir
+ delete sqlPackageDir
+ mkdir sqlPackageDir
- println 'Creating Azkaban SQL Scripts into ' + packageDir
- mkdir packageDir
-
- println 'Copying SQL files'
copy {
+ println 'Copying SQL files'
from('src/sql')
- into(packageDir)
+ into(sqlPackageDir)
}
- String destFile = packageDir + '/create-all-sql-' + version + '.sql';
- println('Concating create scripts to ' + destFile)
+ String destFile = sqlPackageDir + '/create-all-sql-' + version + '.sql';
ant.concat(destfile:destFile, fixlastline:'yes') {
+ println('Concating create scripts to ' + destFile)
fileset(dir: 'src/sql') {
exclude(name: 'update.*.sql')
exclude(name: 'database.properties')
}
}
+}
- println 'Tarballing SQL Package'
+/**
+ * Packages the Sql Scripts for Azkaban DB
+ */
+task packageSql(type: Tar, dependsOn: 'copySql') {
extension = 'tar.gz'
compression = Compression.GZIP
appendix = 'sql'
- basedir = baseName + '-' + appendix + '-' + version
- packageDir = 'build/package/sql'
- println 'Source is in ' + packageDir
+ ext.basedir = baseName + '-' + appendix + '-' + version
into(basedir) {
- from packageDir
+ from sqlPackageDir
}
}
+ext.execAppendix = 'exec-server'
+ext.execPackageDir = 'build/package/' + jar.baseName + '-' + execAppendix
+
/**
- * Packages the Azkaban Executor Server
+ * Copies the Azkaban Executor Server files into its package directory.
*/
-task packageExec(type: Tar, dependsOn: [jar, 'createVersionFile']) {
- appendix = 'exec-server'
- String packageDir = 'build/package/' + baseName + '-' + appendix
-
- println 'Creating Azkaban Executor Server Package into ' + packageDir
- mkdir packageDir
- mkdir packageDir + '/extlib'
- mkdir packageDir + '/plugins'
+task copyExec(dependsOn: ['jar', 'createVersionFile']) << {
+ delete execPackageDir
+ println 'Creating Azkaban Executor Server Package into ' + execPackageDir
+ mkdir execPackageDir
+ mkdir execPackageDir + '/lib'
+ mkdir execPackageDir + '/extlib'
+ mkdir execPackageDir + '/plugins'
- println 'Copying Exec server bin & conf'
copy {
+ println 'Copying Exec server bin & conf'
from('src/package/execserver')
- into(packageDir)
+ into(execPackageDir)
}
- println 'Copying Azkaban lib'
copy {
+ println 'Copying Azkaban lib '
from('build/libs')
- into(packageDir + '/lib')
+ into(execPackageDir + '/lib')
}
- println 'Copying dependency jars'
copy {
- into packageDir + '/lib'
+ println 'Copying dependency jars'
+ into execPackageDir + '/lib'
from configurations.compile
}
copy {
- into packageDir
+ into execPackageDir
from 'build/package/version.file'
}
+}
- println 'Tarballing Web Package'
+/**
+ * Packages the Azkaban Executor Server
+ */
+task packageExec(type: Tar, dependsOn: 'copyExec') {
+ appendix = execAppendix
extension = 'tar.gz'
compression = Compression.GZIP
- basedir = baseName + '-' + appendix + '-' + version
- packageDir = 'build/package/' + baseName + '-' + appendix
- println 'Source is in ' + packageDir
-
+ ext.basedir = baseName + '-' + appendix + '-' + version
into(basedir) {
- from packageDir
+ from execPackageDir
exclude 'bin'
}
- dst_bin = basedir + '/bin'
- src_bin = packageDir + '/bin'
+ ext.dst_bin = basedir + '/bin'
+ ext.src_bin = execPackageDir + '/bin'
from(src_bin) {
into dst_bin
fileMode = 0755
}
}
+ext.webAppendix = 'web-server'
+ext.webPackageDir = 'build/package/' + jar.baseName + '-' + webAppendix
+
/**
- * Packages the Azkaban Web Server
+ * Copies the Azkaban Web Server files into its package directory.
*/
-task packageWeb(type: Tar, dependsOn: [jar, 'web', 'createVersionFile']) {
- appendix = 'web-server'
- String packageDir = 'build/package/' + baseName + '-' + appendix
-
- println 'Creating Azkaban Web Server Package into ' + packageDir
- mkdir packageDir
- mkdir packageDir + '/extlib'
- mkdir packageDir + '/plugins'
+task copyWeb(dependsOn: ['jar', 'web', 'createVersionFile']) << {
+ println 'Creating Azkaban Web Server Package into ' + webPackageDir
+ delete webPackageDir
+ mkdir webPackageDir
+ mkdir webPackageDir + '/lib'
+ mkdir webPackageDir + '/extlib'
+ mkdir webPackageDir + '/plugins'
println 'Copying Web server bin & conf'
copy {
from('src/package/webserver')
- into(packageDir)
+ into(webPackageDir)
}
- println 'Copying Azkaban lib'
+ println 'Copying Azkaban lib '
copy {
from('build/libs')
- into(packageDir + '/lib')
+ into(webPackageDir + '/lib')
}
println 'Copying web'
copy {
from('build/web')
- into(packageDir + '/web')
+ into(webPackageDir + '/web')
}
println 'Copying dependency jars'
copy {
- into packageDir + '/lib'
+ into webPackageDir + '/lib'
from configurations.compile
}
copy {
- into packageDir
+ into webPackageDir
from 'build/package/version.file'
}
+}
- println 'Tarballing Web Package'
+/**
+ * Packages the Azkaban Web Server
+ */
+task packageWeb(type: Tar, dependsOn: 'copyWeb') {
+ appendix = webAppendix
extension = 'tar.gz'
compression = Compression.GZIP
- basedir = baseName + '-' + appendix + '-' + version
- println 'Source is in ' + packageDir
+ ext.basedir = baseName + '-' + appendix + '-' + version
into(basedir) {
- from packageDir
+ from webPackageDir
exclude 'bin'
}
- dst_bin = basedir + '/bin'
- src_bin = packageDir + '/bin'
+ ext.dst_bin = basedir + '/bin'
+ ext.src_bin = webPackageDir + '/bin'
from(src_bin) {
into dst_bin
fileMode = 0755
diff --git a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index a6ee307..971f9c1 100644
--- a/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
@@ -86,6 +87,13 @@ public class AzkabanExecutorServer {
server = new Server(portNumber);
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
server.setThreadPool(httpThreadPool);
+
+ boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
+ logger.info("Setting up connector with stats on: " + isStatsOn);
+
+ for (Connector connector : server.getConnectors()) {
+ connector.setStatsOn(isStatsOn);
+ }
Context root = new Context(server, "/", Context.SESSIONS);
root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
diff --git a/src/main/java/azkaban/execapp/ExecutorServlet.java b/src/main/java/azkaban/execapp/ExecutorServlet.java
index 75a1a08..cb8a211 100644
--- a/src/main/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/main/java/azkaban/execapp/ExecutorServlet.java
@@ -88,6 +88,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else if (action.equals(PING_ACTION)) {
respMap.put("status", "alive");
}
+ else if (action.equals(RELOAD_JOBTYPE_PLUGINS_ACTION)) {
+ logger.info("Reloading Jobtype plugins");
+ handleReloadJobTypePlugins(respMap);
+ }
else {
int execid = Integer.parseInt(getParam(req, EXECID_PARAM));
String user = getParam(req, USER_PARAM, null);
@@ -337,6 +341,17 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
+ private void handleReloadJobTypePlugins(Map<String, Object> respMap) throws ServletException {
+ try {
+ flowRunnerManager.reloadJobTypePlugins();
+ respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
+ }
+ catch (Exception e) {
+ logger.error(e);
+ respMap.put(RESPONSE_ERROR, e.getMessage());
+ }
+ }
+
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
diff --git a/src/main/java/azkaban/execapp/FlowRunner.java b/src/main/java/azkaban/execapp/FlowRunner.java
index e6db63e..0e1c198 100644
--- a/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/src/main/java/azkaban/execapp/FlowRunner.java
@@ -966,8 +966,6 @@ public class FlowRunner extends EventHandler implements Runnable {
}
flow.setUpdateTime(System.currentTimeMillis());
flow.setEndTime(-1);
- flow.setStartTime(maxStartTime);
-
logger.info("Resetting flow '" + flow.getNestedId() + "' from " + oldFlowState + " to " + flow.getStatus());
}
diff --git a/src/main/java/azkaban/execapp/FlowRunnerManager.java b/src/main/java/azkaban/execapp/FlowRunnerManager.java
index f886e64..a9e2059 100644
--- a/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -47,6 +47,7 @@ import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
@@ -689,6 +690,7 @@ public class FlowRunnerManager implements EventListener {
return jobCount;
}
-
-
+ public void reloadJobTypePlugins() throws JobTypeManagerException {
+ jobtypeManager.loadPlugins();
+ }
}
diff --git a/src/main/java/azkaban/executor/ConnectorParams.java b/src/main/java/azkaban/executor/ConnectorParams.java
index c84436e..3b3f00e 100644
--- a/src/main/java/azkaban/executor/ConnectorParams.java
+++ b/src/main/java/azkaban/executor/ConnectorParams.java
@@ -32,6 +32,7 @@ public interface ConnectorParams {
public static final String LOG_ACTION = "log";
public static final String ATTACHMENTS_ACTION = "attachments";
public static final String METADATA_ACTION = "metadata";
+ public static final String RELOAD_JOBTYPE_PLUGINS_ACTION = "reloadJobTypePlugins";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
src/main/java/azkaban/jobtype/JobTypeManager.java 408(+183 -225)
diff --git a/src/main/java/azkaban/jobtype/JobTypeManager.java b/src/main/java/azkaban/jobtype/JobTypeManager.java
index 6c275af..df9627e 100644
--- a/src/main/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/main/java/azkaban/jobtype/JobTypeManager.java
@@ -28,20 +28,18 @@ import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
import azkaban.jobExecutor.utils.JobExecutionException;
import java.io.File;
+import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.log4j.Logger;
public class JobTypeManager
{
-
- private final String jobtypePluginDir; // the dir for jobtype plugins
+ private final String jobTypePluginDir; // the dir for jobtype plugins
private final ClassLoader parentLoader;
public static final String DEFAULT_JOBTYPEPLUGINDIR = "plugins/jobtypes";
@@ -51,27 +49,25 @@ public class JobTypeManager
private static final String COMMONSYSCONFFILE = "commonprivate.properties"; // common private properties for multiple plugins
private static final Logger logger = Logger.getLogger(JobTypeManager.class);
- private Map<String, Class<? extends Job>> jobToClass;
- private Map<String, Props> jobtypeJobProps;
- private Map<String, Props> jobtypeSysProps;
+ private JobTypePluginSet pluginSet;
- public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader)
- {
- this.jobtypePluginDir = jobtypePluginDir;
+ public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader) {
+ this.jobTypePluginDir = jobtypePluginDir;
this.parentLoader = parentClassLoader;
- jobToClass = new HashMap<String, Class<? extends Job>>();
- jobtypeJobProps = new HashMap<String, Props>();
- jobtypeSysProps = new HashMap<String, Props>();
-
- loadDefaultTypes();
+ loadPlugins();
+ }
+
+ public void loadPlugins() throws JobTypeManagerException {
+ JobTypePluginSet plugins = new JobTypePluginSet();
- if(jobtypePluginDir != null) {
- File pluginDir = new File(jobtypePluginDir);
+ loadDefaultTypes(plugins);
+ if (jobTypePluginDir != null) {
+ File pluginDir = new File(jobTypePluginDir);
if (pluginDir.exists()) {
- logger.info("job type plugin directory set. Loading extra job types.");
+ logger.info("Job type plugin directory set. Loading extra job types from " + pluginDir);
try {
- loadPluginJobTypes();
+ loadPluginJobTypes(plugins);
}
catch (Exception e) {
logger.info("Plugin jobtypes failed to load. " + e.getCause());
@@ -80,186 +76,180 @@ public class JobTypeManager
}
}
+ // Swap the plugin set. If exception is thrown, then plugin isn't swapped.
+ synchronized (this) {
+ pluginSet = plugins;
+ }
}
-
- private void loadDefaultTypes() throws JobTypeManagerException{
- jobToClass.put("command", ProcessJob.class);
- jobToClass.put("javaprocess", JavaProcessJob.class);
- jobToClass.put("noop", NoopJob.class);
- jobToClass.put("python", PythonJob.class);
- jobToClass.put("ruby", RubyJob.class);
- jobToClass.put("script", ScriptJob.class);
+
+ private void loadDefaultTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+ logger.info("Loading plugin default job types");
+ plugins.addPluginClass("command", ProcessJob.class);
+ plugins.addPluginClass("javaprocess", JavaProcessJob.class);
+ plugins.addPluginClass("noop", NoopJob.class);
+ plugins.addPluginClass("python", PythonJob.class);
+ plugins.addPluginClass("ruby", RubyJob.class);
+ plugins.addPluginClass("script", ScriptJob.class);
}
// load Job Types from jobtype plugin dir
- private void loadPluginJobTypes() throws JobTypeManagerException
- {
- File jobPluginsDir = new File(jobtypePluginDir);
+ private void loadPluginJobTypes(JobTypePluginSet plugins) throws JobTypeManagerException {
+ File jobPluginsDir = new File(jobTypePluginDir);
if (!jobPluginsDir.exists()) {
+ logger.error("Job type plugin dir " + jobTypePluginDir + " doesn't exist. Will not load any external plugins.");
return;
}
-
- if (!jobPluginsDir.isDirectory()) {
- throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not a directory!");
+ else if (!jobPluginsDir.isDirectory()) {
+ throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not a directory!");
}
-
- if (!jobPluginsDir.canRead()) {
- throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not readable!");
+ else if (!jobPluginsDir.canRead()) {
+ throw new JobTypeManagerException("Job type plugin dir " + jobTypePluginDir + " is not readable!");
}
- // look for global conf
- Props globalConf = null;
- Props globalSysConf = null;
- File confFile = findFilefromDir(jobPluginsDir, COMMONCONFFILE);
- File sysConfFile = findFilefromDir(jobPluginsDir, COMMONSYSCONFFILE);
- try {
- if(confFile != null) {
- globalConf = new Props(null, confFile);
+ // Load the common properties used by all jobs that are run
+ Props commonPluginJobProps = null;
+ File commonJobPropsFile = new File(jobPluginsDir, COMMONCONFFILE);
+ if (commonJobPropsFile.exists()) {
+ logger.info("Common plugin job props file " + commonJobPropsFile + " found. Attempt to load.");
+ try {
+ commonPluginJobProps = new Props(null, commonJobPropsFile);
}
- else {
- globalConf = new Props();
+ catch (IOException e) {
+ throw new JobTypeManagerException("Failed to load common plugin job properties" + e.getCause());
}
- if(sysConfFile != null) {
- globalSysConf = new Props(null, sysConfFile);
+ }
+ else {
+ logger.info("Common plugin job props file " + commonJobPropsFile + " not found. Using empty props.");
+ commonPluginJobProps = new Props();
+ }
+
+ // Loads the common properties used by all plugins when loading
+ Props commonPluginLoadProps = null;
+ File commonLoadPropsFile = new File(jobPluginsDir, COMMONSYSCONFFILE);
+ if (commonLoadPropsFile.exists()) {
+ logger.info("Common plugin load props file " + commonLoadPropsFile + " found. Attempt to load.");
+ try {
+ commonPluginLoadProps = new Props(null, commonLoadPropsFile);
}
- else {
- globalSysConf = new Props();
+ catch (IOException e) {
+ throw new JobTypeManagerException("Failed to load common plugin loader properties" + e.getCause());
}
}
- catch (Exception e) {
- throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
+ else {
+ logger.info("Common plugin load props file " + commonLoadPropsFile + " not found. Using empty props.");
+ commonPluginLoadProps = new Props();
}
+ plugins.setCommonPluginJobProps(commonPluginJobProps);
+ plugins.setCommonPluginLoadProps(commonPluginLoadProps);
- synchronized (this) {
- ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
- try{
- for(File dir : jobPluginsDir.listFiles()) {
- if(dir.isDirectory() && dir.canRead()) {
- // get its conf file
- try {
- loadJob(dir, globalConf, globalSysConf);
- Thread.currentThread().setContextClassLoader(prevCl);
- }
- catch (Exception e) {
- logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
- throw new JobTypeManagerException(e);
- }
- }
+ // Loading job types
+ for (File dir : jobPluginsDir.listFiles()) {
+ if (dir.isDirectory() && dir.canRead()) {
+ try {
+ loadJobTypes(dir, plugins);
}
- } catch(Exception e) {
- e.printStackTrace();
- throw new JobTypeManagerException(e);
- } catch(Throwable t) {
- t.printStackTrace();
- throw new JobTypeManagerException(t);
- } finally {
- Thread.currentThread().setContextClassLoader(prevCl);
- }
- }
- }
-
- public static File findFilefromDir(File dir, String fn){
- if(dir.isDirectory()) {
- for(File f : dir.listFiles()) {
- if(f.getName().equals(fn)) {
- return f;
+ catch (Exception e) {
+ logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
+ throw new JobTypeManagerException(e);
}
}
}
- return null;
}
-// private void loadJobType(File dir, Props globalConf, Props globalSysConf) throws JobTypeManagerException{
-//
-// // look for common conf
-// Props conf = null;
-// Props sysConf = null;
-// File confFile = findFilefromDir(dir, COMMONCONFFILE);
-// File sysConfFile = findFilefromDir(dir, COMMONSYSCONFFILE);
-//
-// try {
-// if(confFile != null) {
-// conf = new Props(globalConf, confFile);
-// }
-// else {
-// conf = globalConf;
-// }
-// if(sysConfFile != null) {
-// sysConf = new Props(globalSysConf, sysConfFile);
-// }
-// else {
-// sysConf = globalSysConf;
-// }
-// }
-// catch (Exception e) {
-// throw new JobTypeManagerException("Failed to get common jobtype properties" + e.getCause());
-// }
-//
-// // look for jobtypeConf.properties and load it
-// for(File f: dir.listFiles()) {
-// if(f.isFile() && f.getName().equals(JOBTYPESYSCONFFILE)) {
-// loadJob(dir, f, conf, sysConf);
-// return;
-// }
-// }
-//
-// // no hit, keep looking
-// for(File f : dir.listFiles()) {
-// if(f.isDirectory() && f.canRead())
-// loadJobType(f, conf, sysConf);
-// }
-//
-// }
-
@SuppressWarnings("unchecked")
- private void loadJob(File dir, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+ private void loadJobTypes(File pluginDir, JobTypePluginSet plugins) throws JobTypeManagerException {
+ // Directory is the jobtypeName
+ String jobTypeName = pluginDir.getName();
+ logger.info("Loading plugin " + jobTypeName);
+
+ Props pluginJobProps = null;
+ Props pluginLoadProps = null;
- Props conf = null;
- Props sysConf = null;
- File confFile = findFilefromDir(dir, JOBTYPECONFFILE);
- File sysConfFile = findFilefromDir(dir, JOBTYPESYSCONFFILE);
- if(sysConfFile == null) {
- logger.info("No job type found in " + dir.getAbsolutePath());
+ File pluginJobPropsFile = new File(pluginDir, JOBTYPECONFFILE);
+ File pluginLoadPropsFile = new File(pluginDir, JOBTYPESYSCONFFILE);
+
+ if (!pluginLoadPropsFile.exists()) {
+ logger.info("Plugin load props file " + pluginLoadPropsFile + " not found.");
return;
}
try {
- if(confFile != null) {
- conf = new Props(commonConf, confFile);
+ Props commonPluginJobProps = plugins.getCommonPluginJobProps();
+ Props commonPluginLoadProps = plugins.getCommonPluginLoadProps();
+ if (pluginJobPropsFile.exists()) {
+ pluginJobProps = new Props(commonPluginJobProps, pluginJobPropsFile);
}
else {
- conf = new Props(commonConf);
+ pluginJobProps = new Props(commonPluginJobProps);
}
- sysConf = new Props(commonSysConf, sysConfFile);
- sysConf = PropsUtils.resolveProps(sysConf);
-
+ pluginLoadProps = new Props(commonPluginLoadProps, pluginLoadPropsFile);
+ pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
}
catch (Exception e) {
throw new JobTypeManagerException("Failed to get jobtype properties" + e.getMessage());
}
- sysConf.put("plugin.dir", dir.getAbsolutePath());
+ // Add properties into the plugin set
+ pluginLoadProps.put("plugin.dir", pluginDir.getAbsolutePath());
+ plugins.addPluginLoadProps(jobTypeName, pluginLoadProps);
+ if (pluginJobProps != null) {
+ plugins.addPluginJobProps(jobTypeName, pluginJobProps);
+ }
- // use directory name as job type name
- String jobtypeName = dir.getName();
+ ClassLoader jobTypeLoader = loadJobTypeClassLoader(pluginDir, jobTypeName, plugins);
+ String jobtypeClass = pluginLoadProps.get("jobtype.class");
- String jobtypeClass = sysConf.get("jobtype.class");
+ Class<? extends Job> clazz = null;
+ try {
+ clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
+ plugins.addPluginClass(jobTypeName, clazz);
+ }
+ catch (ClassNotFoundException e) {
+ throw new JobTypeManagerException(e);
+ }
- logger.info("Loading jobtype " + jobtypeName );
-
+ logger.info("Verifying job plugin " + jobTypeName);
+ try {
+ Props fakeSysProps = new Props(pluginLoadProps);
+ Props fakeJobProps = new Props(pluginJobProps);
+ @SuppressWarnings("unused")
+ Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
+ }
+ catch (Exception e) {
+ logger.info("Jobtype " + jobTypeName + " failed test!", e);
+ throw new JobExecutionException(e);
+ }
+ catch (Throwable t) {
+ logger.info("Jobtype " + jobTypeName + " failed test!", t);
+ throw new JobExecutionException(t);
+ }
+
+ logger.info("Loaded jobtype " + jobTypeName + " " + jobtypeClass);
+ }
+
+ /**
+ * Creates and loads all plugin resources (jars) into a ClassLoader
+ *
+ * @param pluginDir
+ * @param jobTypeName
+ * @param plugins
+ * @return
+ */
+ private ClassLoader loadJobTypeClassLoader(File pluginDir, String jobTypeName, JobTypePluginSet plugins) {
// sysconf says what jars/confs to load
- List<URL> resources = new ArrayList<URL>();
+ List<URL> resources = new ArrayList<URL>();
+ Props pluginLoadProps = plugins.getPluginLoaderProps(jobTypeName);
try {
//first global classpath
- logger.info("Adding global resources.");
- List<String> typeGlobalClassPath = sysConf.getStringList("jobtype.global.classpath", null, ",");
- if(typeGlobalClassPath != null) {
- for(String jar : typeGlobalClassPath) {
+ logger.info("Adding global resources for " + jobTypeName);
+ List<String> typeGlobalClassPath = pluginLoadProps.getStringList("jobtype.global.classpath", null, ",");
+ if (typeGlobalClassPath != null) {
+ for (String jar : typeGlobalClassPath) {
URL cpItem = new File(jar).toURI().toURL();
- if(!resources.contains(cpItem)) {
+ if (!resources.contains(cpItem)) {
logger.info("adding to classpath " + cpItem);
resources.add(cpItem);
}
@@ -268,78 +258,51 @@ public class JobTypeManager
//type specific classpath
logger.info("Adding type resources.");
- List<String> typeClassPath = sysConf.getStringList("jobtype.classpath", null, ",");
- if(typeClassPath != null) {
- for(String jar : typeClassPath) {
+ List<String> typeClassPath = pluginLoadProps.getStringList("jobtype.classpath", null, ",");
+ if (typeClassPath != null) {
+ for (String jar : typeClassPath) {
URL cpItem = new File(jar).toURI().toURL();
- if(!resources.contains(cpItem)) {
+ if (!resources.contains(cpItem)) {
logger.info("adding to classpath " + cpItem);
resources.add(cpItem);
}
}
}
- List<String> jobtypeLibDirs = sysConf.getStringList("jobtype.lib.dir", null, ",");
- if(jobtypeLibDirs != null) {
- for(String libDir : jobtypeLibDirs) {
- for(File f : new File(libDir).listFiles()) {
- if(f.getName().endsWith(".jar")) {
- resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f.toURI().toURL());
+ List<String> jobtypeLibDirs = pluginLoadProps.getStringList("jobtype.lib.dir", null, ",");
+ if (jobtypeLibDirs != null) {
+ for (String libDir : jobtypeLibDirs) {
+ for (File f : new File(libDir).listFiles()) {
+ if (f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
}
}
}
}
logger.info("Adding type override resources.");
- for(File f : dir.listFiles()) {
- if(f.getName().endsWith(".jar")) {
- resources.add(f.toURI().toURL());
- logger.info("adding to classpath " + f.toURI().toURL());
+ for (File f : pluginDir.listFiles()) {
+ if (f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
}
}
- } catch (MalformedURLException e) {
+ }
+ catch (MalformedURLException e) {
throw new JobTypeManagerException(e);
}
// each job type can have a different class loader
ClassLoader jobTypeLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentLoader);
-
- Class<? extends Job> clazz = null;
- try {
- clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
- jobToClass.put(jobtypeName, clazz);
- }
- catch (ClassNotFoundException e) {
- throw new JobTypeManagerException(e);
- }
-
- logger.info("Doing simple testing...");
- try {
- Props fakeSysProps = new Props(sysConf);
-// fakeSysProps.put("type", jobtypeName);
- Props fakeJobProps = new Props(conf);
- @SuppressWarnings("unused")
- Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
- }
- catch (Exception e) {
- logger.info("Jobtype " + jobtypeName + " failed test!", e);
- throw new JobExecutionException(e);
- }
- catch (Throwable t) {
- logger.info("Jobtype " + jobtypeName + " failed test!", t);
- throw new JobExecutionException(t);
- }
-
- logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
-
- if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
- jobtypeSysProps.put(jobtypeName, sysConf);
-
+ return jobTypeLoader;
}
- public Job buildJobExecutor(String jobId, Props jobProps, Logger logger)
- throws JobTypeManagerException {
+ public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException {
+ // This is final because during build phase, you should never need to swap
+ // the pluginSet for safety reasons
+ final JobTypePluginSet pluginSet = getJobTypePluginSet();
+
Job job = null;
try {
String jobType = jobProps.getString("type");
@@ -352,44 +315,36 @@ public class JobTypeManager
logger.info("Building " + jobType + " job executor. ");
- Class<? extends Object> executorClass = jobToClass.get(jobType);
-
+ Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);
if (executorClass == null) {
throw new JobExecutionException(
String.format("Job type '" + jobType + "' is unrecognized. Could not construct job[%s] of type[%s].", jobProps, jobType));
}
- Props sysConf = jobtypeSysProps.get(jobType);
-
- Props jobConf = jobProps;
- if (jobtypeJobProps.containsKey(jobType)) {
- Props p = jobtypeJobProps.get(jobType);
- for (String k : p.getKeySet()) {
- if (!jobConf.containsKey(k)) {
- jobConf.put(k, p.get(k));
+ Props pluginJobProps = pluginSet.getPluginJobProps(jobType);
+ if (pluginJobProps != null) {
+ for (String k : pluginJobProps.getKeySet()) {
+ if (!jobProps.containsKey(k)) {
+ jobProps.put(k, pluginJobProps.get(k));
}
}
}
- jobConf = PropsUtils.resolveProps(jobConf);
+ jobProps = PropsUtils.resolveProps(jobProps);
- if (sysConf != null) {
- sysConf = PropsUtils.resolveProps(sysConf);
+ Props pluginLoadProps = pluginSet.getPluginLoaderProps(jobType);
+ if (pluginLoadProps != null) {
+ pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);
}
else {
- sysConf = new Props();
+ pluginLoadProps = new Props();
}
-// logger.info("sysConf is " + sysConf);
-// logger.info("jobConf is " + jobConf);
-//
job = (Job) Utils.callConstructor(
- executorClass, jobId, sysConf, jobConf, logger);
+ executorClass, jobId, pluginLoadProps, jobProps, logger);
}
catch (Exception e) {
- //job = new InitErrorJob(jobId, e);
logger.error("Failed to build job executor for job " + jobId + e.getMessage());
throw new JobTypeManagerException("Failed to build job executor for job " + jobId, e);
- //throw new JobTypeManagerException(e);
}
catch (Throwable t) {
logger.error("Failed to build job executor for job " + jobId + t.getMessage(), t);
@@ -398,9 +353,12 @@ public class JobTypeManager
return job;
}
-
- public void registerJobType(String typeName, Class<? extends Job> jobTypeClass) {
- jobToClass.put(typeName, jobTypeClass);
+
+ /**
+ * Public for test reasons. Will need to move tests to the same package
+ */
+ public synchronized JobTypePluginSet getJobTypePluginSet() {
+ return this.pluginSet;
}
}
src/main/java/azkaban/jobtype/JobTypePluginSet.java 145(+145 -0)
diff --git a/src/main/java/azkaban/jobtype/JobTypePluginSet.java b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
new file mode 100644
index 0000000..9e6ded2
--- /dev/null
+++ b/src/main/java/azkaban/jobtype/JobTypePluginSet.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobtype;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.jobExecutor.Job;
+import azkaban.utils.Props;
+
+/**
+ * Container for job type plugins
+ *
+ * This contains the jobClass objects, the properties for loading plugins, and the
+ * properties given by default to the plugin.
+ *
+ * This class is not thread safe, so adding to this class should only be populated
+ * and controlled by the JobTypeManager
+ */
+public class JobTypePluginSet {
+ private Map<String, Class<? extends Job>> jobToClass;
+ private Map<String, Props> pluginJobPropsMap;
+ private Map<String, Props> pluginLoadPropsMap;
+
+ private Props commonJobProps;
+ private Props commonLoadProps;
+
+ /**
+ * Base constructor
+ */
+ public JobTypePluginSet() {
+ jobToClass = new HashMap<String, Class<? extends Job>>();
+ pluginJobPropsMap = new HashMap<String, Props>();
+ pluginLoadPropsMap = new HashMap<String, Props>();
+ }
+
+ /**
+ * Copy constructor
+ * @param clone
+ */
+ public JobTypePluginSet(JobTypePluginSet clone) {
+ jobToClass = new HashMap<String, Class<? extends Job>>(clone.jobToClass);
+ pluginJobPropsMap = new HashMap<String, Props>(clone.pluginJobPropsMap);
+ pluginLoadPropsMap = new HashMap<String, Props>(clone.pluginLoadPropsMap);
+ commonJobProps = clone.commonJobProps;
+ commonLoadProps = clone.commonLoadProps;
+ }
+
+ /**
+ * Sets the common properties shared in every jobtype
+ * @param commonJobProps
+ */
+ public void setCommonPluginJobProps(Props commonJobProps) {
+ this.commonJobProps = commonJobProps;
+ }
+
+ /**
+ * Sets the common properties used to load every plugin
+ * @param commonLoadProps
+ */
+ public void setCommonPluginLoadProps(Props commonLoadProps) {
+ this.commonLoadProps = commonLoadProps;
+ }
+
+ /**
+ * Gets common properties for every jobtype
+ * @return
+ */
+ public Props getCommonPluginJobProps() {
+ return commonJobProps;
+ }
+
+ /**
+ * Gets the common properties used to load a plugin
+ * @return
+ */
+ public Props getCommonPluginLoadProps() {
+ return commonLoadProps;
+ }
+
+ /**
+ * Get the properties for a jobtype used to setup and load a plugin
+ *
+ * @param jobTypeName
+ * @return
+ */
+ public Props getPluginLoaderProps(String jobTypeName) {
+ return pluginLoadPropsMap.get(jobTypeName);
+ }
+
+ /**
+ * Get the properties that will be given to the plugin as default job
+ * properties.
+ *
+ * @param jobTypeName
+ * @return
+ */
+ public Props getPluginJobProps(String jobTypeName) {
+ return pluginJobPropsMap.get(jobTypeName);
+ }
+
+ /**
+ * Gets the plugin job runner class
+ *
+ * @param jobTypeName
+ * @return
+ */
+ public Class<? extends Job> getPluginClass(String jobTypeName) {
+ return jobToClass.get(jobTypeName);
+ }
+
+ /**
+ * Adds plugin jobtype class
+ */
+ public void addPluginClass(String jobTypeName, Class<? extends Job> jobTypeClass) {
+ jobToClass.put(jobTypeName, jobTypeClass);
+ }
+
+ /**
+ * Adds plugin job properties used as default runtime properties
+ */
+ public void addPluginJobProps(String jobTypeName, Props props) {
+ pluginJobPropsMap.put(jobTypeName, props);
+ }
+
+ /**
+ * Adds plugin load properties used to load the plugin
+ */
+ public void addPluginLoadProps(String jobTypeName, Props props) {
+ pluginLoadPropsMap.put(jobTypeName, props);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/azkaban/user/UserManager.java b/src/main/java/azkaban/user/UserManager.java
index 071b307..253e817 100644
--- a/src/main/java/azkaban/user/UserManager.java
+++ b/src/main/java/azkaban/user/UserManager.java
@@ -34,7 +34,7 @@ public interface UserManager {
* @throws UserManagerException If the username/password combination doesn't exist.
*/
public User getUser(String username, String password) throws UserManagerException;
-
+
/**
* Returns true if the user is valid. This is used when adding permissions for users
*
diff --git a/src/main/java/azkaban/webapp/AzkabanWebServer.java b/src/main/java/azkaban/webapp/AzkabanWebServer.java
index ebb0aa0..a0cacc8 100644
--- a/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -42,6 +42,7 @@ import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.bio.SocketConnector;
import org.mortbay.jetty.security.SslSocketConnector;
@@ -50,6 +51,8 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.thread.QueuedThreadPool;
+import com.linkedin.restli.server.RestliServlet;
+
import azkaban.alert.Alerter;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
@@ -59,7 +62,6 @@ import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
-
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.TriggerBasedScheduleLoader;
@@ -82,7 +84,6 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
import azkaban.webapp.servlet.AzkabanServletContextListener;
-
import azkaban.webapp.servlet.AbstractAzkabanServlet;
import azkaban.webapp.servlet.ExecutorServlet;
import azkaban.webapp.servlet.IndexRedirectServlet;
@@ -158,6 +159,10 @@ public class AzkabanWebServer extends AzkabanServer {
private MBeanServer mbeanServer;
private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+ public static AzkabanWebServer getInstance() {
+ return app;
+ }
+
/**
* Constructor usually called by tomcat AzkabanServletContext to create the
* initial server
@@ -684,7 +689,9 @@ public class AzkabanWebServer extends AzkabanServer {
}
int maxThreads = azkabanSettings.getInt("jetty.maxThreads", DEFAULT_THREAD_NUMBER);
-
+ boolean isStatsOn = azkabanSettings.getBoolean("jetty.connector.stats", true);
+ logger.info("Setting up connector with stats on: " + isStatsOn);
+
boolean ssl;
int port;
final Server server = new Server();
@@ -714,6 +721,11 @@ public class AzkabanWebServer extends AzkabanServer {
server.addConnector(connector);
}
+ // setting stats configuration for connectors
+ for (Connector connector : server.getConnectors()) {
+ connector.setStatsOn(isStatsOn);
+ }
+
String hostname = azkabanSettings.getString("jetty.hostname", "localhost");
azkabanSettings.put("server.hostname", hostname);
azkabanSettings.put("server.port", port);
@@ -762,6 +774,10 @@ public class AzkabanWebServer extends AzkabanServer {
root.addServlet(new ServletHolder(new ScheduleServlet()),"/schedule");
root.addServlet(new ServletHolder(new JMXHttpServlet()),"/jmx");
root.addServlet(new ServletHolder(new TriggerManagerServlet()),"/triggers");
+
+ ServletHolder restliHolder = new ServletHolder(new RestliServlet());
+ restliHolder.setInitParameter("resourcePackages", "azkaban.restli");
+ root.addServlet(restliHolder, "/restli/*");
String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine());
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 1f3b462..18012ee 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -21,3 +21,6 @@ executor.maxThreads=50
executor.port=12321
executor.flow.threads=30
+# JMX stats
+jetty.connector.stats=true
+executor.connector.stats=true
\ No newline at end of file
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 7524a14..86afc73 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -41,3 +41,8 @@ job.failure.email=
job.success.email=
lockdown.create.projects=false
+
+
+# JMX stats
+jetty.connector.stats=true
+executor.connector.stats=true
\ No newline at end of file
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 3ccb2f3..a7c3dfb 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -47,3 +47,7 @@ job.success.email=
lockdown.create.projects=false
cache.directory=cache
+
+# JMX stats
+jetty.connector.stats=true
+executor.connector.stats=true
\ No newline at end of file
src/restli/.gitignore 2(+2 -0)
diff --git a/src/restli/.gitignore b/src/restli/.gitignore
new file mode 100644
index 0000000..a6ad54c
--- /dev/null
+++ b/src/restli/.gitignore
@@ -0,0 +1,2 @@
+generatedJava
+generatedRestSpec
diff --git a/src/restli/java/azkaban/restli/ProjectManagerResource.java b/src/restli/java/azkaban/restli/ProjectManagerResource.java
new file mode 100644
index 0000000..faebbc8
--- /dev/null
+++ b/src/restli/java/azkaban/restli/ProjectManagerResource.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.restli;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.servlet.ServletException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.Permission;
+import azkaban.user.User;
+import azkaban.user.UserManagerException;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+
+import com.linkedin.restli.server.annotations.Action;
+import com.linkedin.restli.server.annotations.ActionParam;
+import com.linkedin.restli.server.annotations.RestLiActions;
+import com.linkedin.restli.server.resources.ResourceContextHolder;
+
+@RestLiActions(name = "project", namespace = "azkaban.restli")
+public class ProjectManagerResource extends ResourceContextHolder {
+ private static final Logger logger = Logger.getLogger(ProjectManagerResource.class);
+
+ public AzkabanWebServer getAzkaban() {
+ return AzkabanWebServer.getInstance();
+ }
+
+ @Action(name = "deploy")
+ public String deploy(
+ @ActionParam("sessionId") String sessionId,
+ @ActionParam("projectName") String projectName,
+ @ActionParam("packageUrl") String packageUrl)
+ throws ProjectManagerException, UserManagerException, ServletException, IOException {
+ logger.info("Deploy called. {sessionId: " + sessionId +
+ ", projectName: " + projectName +
+ ", packageUrl:" + packageUrl + "}");
+
+ String ip = (String)this.getContext().getRawRequestContext().getLocalAttr("REMOTE_ADDR");
+ User user = ResourceUtils.getUserFromSessionId(sessionId, ip);
+ ProjectManager projectManager = getAzkaban().getProjectManager();
+ Project project = projectManager.getProject(projectName);
+ if (project == null) {
+ throw new ProjectManagerException("Project '" + projectName + "' not found.");
+ }
+
+ if (!ResourceUtils.hasPermission(project, user, Permission.Type.WRITE)) {
+ String errorMsg = "User " + user.getUserId() + " has no permission to write to project " + project.getName();
+ logger.error(errorMsg);
+ throw new ProjectManagerException(errorMsg);
+ }
+
+ // Deploy stuff here. Move the code to a more formal area later.
+ logger.info("Downloading file from " + packageUrl);
+ URL url = null;
+ InputStream urlFileInputStream = null;
+ try {
+ url = new URL(packageUrl);
+ InputStream in = url.openStream();
+ urlFileInputStream = new BufferedInputStream(in);
+ } catch (MalformedURLException e) {
+ String errorMsg = "Url " + packageUrl + " is malformed.";
+ logger.error(errorMsg, e);
+ throw new ProjectManagerException(errorMsg, e);
+ } catch (IOException e) {
+ String errorMsg = "Error opening input stream to " + packageUrl;
+ logger.error(errorMsg, e);
+ throw new ProjectManagerException(errorMsg, e);
+ }
+
+ String filename = getFileName(url.getFile());
+
+ File tempDir = Utils.createTempDir();
+ OutputStream fileOutputStream = null;
+ try {
+ logger.error("Downloading " + filename);
+ File archiveFile = new File(tempDir, filename);
+ fileOutputStream = new BufferedOutputStream(new FileOutputStream(archiveFile));
+ IOUtils.copy(urlFileInputStream, fileOutputStream);
+
+ logger.error("Downloaded to " + archiveFile.toString() + " " + archiveFile.length() + " bytes.");
+ projectManager.uploadProject(project, archiveFile, "zip", user);
+ } catch (Exception e) {
+ logger.info("Installation Failed.", e);
+ String error = e.getMessage();
+ if (error.length() > 512) {
+ error = error.substring(0, 512) + "\nToo many errors to display.\n";
+ }
+
+ throw new ProjectManagerException("Installation failed: " + error);
+ }
+ finally {
+ if (tempDir.exists()) {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ if (urlFileInputStream != null) {
+ urlFileInputStream.close();
+ }
+ if (fileOutputStream != null) {
+ fileOutputStream.close();
+ }
+ }
+
+ return Integer.toString(project.getVersion());
+ }
+
+ private String getFileName(String file) {
+ return file.substring(file.lastIndexOf("/") + 1);
+ }
+}
diff --git a/src/restli/java/azkaban/restli/ResourceUtils.java b/src/restli/java/azkaban/restli/ResourceUtils.java
new file mode 100644
index 0000000..18229cf
--- /dev/null
+++ b/src/restli/java/azkaban/restli/ResourceUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.restli;
+
+import azkaban.project.Project;
+import azkaban.user.Permission;
+import azkaban.user.Role;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
+
+public class ResourceUtils {
+
+ public static boolean hasPermission(Project project, User user, Permission.Type type) {
+ UserManager userManager = AzkabanWebServer.getInstance().getUserManager();
+ if (project.hasPermission(user, type)) {
+ return true;
+ }
+
+ for (String roleName: user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type) ||
+ role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public static User getUserFromSessionId(String sessionId, String ip) throws UserManagerException {
+ Session session = AzkabanWebServer.getInstance().getSessionCache().getSession(sessionId);
+ if (session == null) {
+ throw new UserManagerException("Invalid session. Login required");
+ }
+ else if (!session.getIp().equals(ip)) {
+ throw new UserManagerException("Invalid session. Session expired.");
+ }
+
+ return session.getUser();
+ }
+}
diff --git a/src/restli/java/azkaban/restli/UserManagerResource.java b/src/restli/java/azkaban/restli/UserManagerResource.java
new file mode 100644
index 0000000..1a22263
--- /dev/null
+++ b/src/restli/java/azkaban/restli/UserManagerResource.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.restli;
+
+import java.util.UUID;
+import javax.servlet.ServletException;
+import org.apache.log4j.Logger;
+
+import azkaban.restli.user.User;
+import azkaban.user.UserManager;
+import azkaban.user.UserManagerException;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
+
+import com.linkedin.restli.server.annotations.Action;
+import com.linkedin.restli.server.annotations.ActionParam;
+import com.linkedin.restli.server.annotations.RestLiActions;
+import com.linkedin.restli.server.resources.ResourceContextHolder;
+
+
+@RestLiActions(name = "user", namespace = "azkaban.restli")
+public class UserManagerResource extends ResourceContextHolder {
+ private static final Logger logger = Logger.getLogger(UserManagerResource.class);
+
+ public AzkabanWebServer getAzkaban() {
+ return AzkabanWebServer.getInstance();
+ }
+
+ @Action(name = "login")
+ public String login(
+ @ActionParam("username") String username,
+ @ActionParam("password") String password)
+ throws UserManagerException, ServletException {
+ String ip = (String)this.getContext().getRawRequestContext().getLocalAttr("REMOTE_ADDR");
+ logger.info("Attempting to login for " + username + " from ip '" + ip + "'");
+
+ Session session = createSession(username, password, ip);
+
+ logger.info("Session id " + session.getSessionId() + " created for user '" + username + "' and ip " + ip);
+ return session.getSessionId();
+ }
+
+ @Action(name = "getUserFromSessionId")
+ public User getUserFromSessionId(@ActionParam("sessionId") String sessionId) {
+ String ip = (String)this.getContext().getRawRequestContext().getLocalAttr("REMOTE_ADDR");
+ Session session = getSessionFromSessionId(sessionId, ip);
+ azkaban.user.User azUser = session.getUser();
+
+ // Fill out the restli object with properties from the Azkaban user
+ User user = new User();
+ user.setUserId(azUser.getUserId());
+ user.setEmail(azUser.getEmail());
+ return user;
+ }
+
+ private Session createSession(String username, String password, String ip)
+ throws UserManagerException, ServletException {
+ UserManager manager = getAzkaban().getUserManager();
+ azkaban.user.User user = manager.getUser(username, password);
+
+ String randomUID = UUID.randomUUID().toString();
+ Session session = new Session(randomUID, user, ip);
+ getAzkaban().getSessionCache().addSession(session);
+
+ return session;
+ }
+
+ private Session getSessionFromSessionId(String sessionId, String remoteIp) {
+ if (sessionId == null) {
+ return null;
+ }
+
+ Session session = getAzkaban().getSessionCache().getSession(sessionId);
+ // Check if the IP's are equal. If not, we invalidate the sesson.
+ if (session == null || !remoteIp.equals(session.getIp())) {
+ return null;
+ }
+
+ return session;
+ }
+}
\ No newline at end of file
diff --git a/src/restli/schemas/azkaban/restli/user/User.pdsc b/src/restli/schemas/azkaban/restli/user/User.pdsc
new file mode 100644
index 0000000..156b380
--- /dev/null
+++ b/src/restli/schemas/azkaban/restli/user/User.pdsc
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "User",
+ "namespace": "azkaban.restli.user",
+ "doc": "Azkaban User restli info",
+ "fields": [
+ {"name": "userId", "type": "string","doc": "The username this session"},
+ {"name": "email", "type": "string","doc": "User email"}
+ ]
+}
\ No newline at end of file
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 5a844a7..f99fc61 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -145,7 +145,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
$(attemptBox).bind("contextmenu", attemptRightClick);
$(progressBar).before(attemptBox);
- attemptBox.job = nodeId;
+ attemptBox.job = node.id;
attemptBox.attempt = a;
}
}
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index c87f2c6..cc3b1e5 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -37,7 +37,7 @@ public class LocalFlowWatcherTest {
@Before
public void setUp() throws Exception {
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
+ jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
}
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 45c3a85..d8e94dd 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -38,7 +38,7 @@ public class RemoteFlowWatcherTest {
@Before
public void setUp() throws Exception {
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
+ jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
}
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index aa1eee5..df9cac1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -23,6 +23,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
@@ -73,8 +74,10 @@ public class FlowRunnerPipelineTest {
}
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
- jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+
+ pluginSet.addPluginClass("java", JavaJob.class);
+ pluginSet.addPluginClass("test", InteractiveTestJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
fakeExecutorLoader = new MockExecutorLoader();
project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 21b43f1..2e708fe 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,8 +21,10 @@ import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.test.execapp.MockProjectLoader;
import azkaban.test.executor.InteractiveTestJob;
import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
@@ -46,8 +48,9 @@ public class FlowRunnerTest {
workingDir.mkdirs();
}
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
- jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+ pluginSet.addPluginClass("java", JavaJob.class);
+ pluginSet.addPluginClass("test", InteractiveTestJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
InteractiveTestJob.clearTestJobs();
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 7a67da6..0c9d7b9 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -21,6 +21,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
@@ -92,8 +93,10 @@ public class FlowRunnerTest2 {
}
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
- jobtypeManager.registerJobType("test", InteractiveTestJob.class);
+ JobTypePluginSet pluginSet = jobtypeManager.getJobTypePluginSet();
+
+ pluginSet.addPluginClass("java", JavaJob.class);
+ pluginSet.addPluginClass("test", InteractiveTestJob.class);
fakeProjectLoader = new MockProjectLoader(workingDir);
fakeExecutorLoader = new MockExecutorLoader();
project = new Project(1, "testProject");
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 69d6296..6fcbb11 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -42,7 +42,8 @@ public class JobRunnerTest {
}
workingDir.mkdirs();
jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
- jobtypeManager.registerJobType("java", JavaJob.class);
+
+ jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
}
@After
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob.java b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
new file mode 100644
index 0000000..65ca1ba
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob extends JavaProcessJob {
+ public FakeJavaJob(String jobid, Props sysProps, Props jobProps, Logger log) {
+ super(jobid, sysProps, jobProps, log);
+ }
+}
+
diff --git a/unit/java/azkaban/test/jobtype/FakeJavaJob2.java b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
new file mode 100644
index 0000000..581aeff
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/FakeJavaJob2.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import org.apache.log4j.Logger;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.utils.Props;
+
+public class FakeJavaJob2 extends JavaProcessJob {
+ public FakeJavaJob2(String jobid, Props sysProps, Props jobProps, Logger log) {
+ super(jobid, sysProps, jobProps, log);
+ }
+}
+
diff --git a/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
new file mode 100644
index 0000000..a4c5cb0
--- /dev/null
+++ b/unit/java/azkaban/test/jobtype/JobTypeManagerTest.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.test.jobtype;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.jobExecutor.Job;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
+import azkaban.utils.Props;
+
+
+/**
+ * Test the flow run, especially with embedded flows.
+ * Files are in unit/plugins/jobtypes
+ *
+ */
+public class JobTypeManagerTest {
+ public static String TEST_PLUGIN_DIR = "jobtypes_test";
+ private Logger logger = Logger.getLogger(JobTypeManagerTest.class);
+ private JobTypeManager manager;
+
+ public JobTypeManagerTest() {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ File jobTypeDir = new File(TEST_PLUGIN_DIR);
+ jobTypeDir.mkdirs();
+
+ FileUtils.copyDirectory(new File("unit/plugins/jobtypes"), jobTypeDir);
+ manager = new JobTypeManager(TEST_PLUGIN_DIR, this.getClass().getClassLoader());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtils.deleteDirectory(new File(TEST_PLUGIN_DIR));
+ }
+
+ /**
+ * Tests that the common and common private properties are loaded correctly
+ * @throws Exception
+ */
+ @Test
+ public void testCommonPluginProps() throws Exception {
+ JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+
+ Props props = pluginSet.getCommonPluginJobProps();
+ System.out.println(props.toString());
+ assertEquals("commonprop1", props.getString("commonprop1"));
+ assertEquals("commonprop2", props.getString("commonprop2"));
+ assertEquals("commonprop3", props.getString("commonprop3"));
+
+ Props priv = pluginSet.getCommonPluginLoadProps();
+ assertEquals("commonprivate1", priv.getString("commonprivate1"));
+ assertEquals("commonprivate2", priv.getString("commonprivate2"));
+ assertEquals("commonprivate3", priv.getString("commonprivate3"));
+ }
+
+ /**
+ * Tests that the proper classes were loaded and that the common and the load
+ * properties are properly loaded.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLoadedClasses() throws Exception {
+ JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+
+ Props props = pluginSet.getCommonPluginJobProps();
+ System.out.println(props.toString());
+ assertEquals("commonprop1", props.getString("commonprop1"));
+ assertEquals("commonprop2", props.getString("commonprop2"));
+ assertEquals("commonprop3", props.getString("commonprop3"));
+ assertNull(props.get("commonprivate1"));
+
+ Props priv = pluginSet.getCommonPluginLoadProps();
+ assertEquals("commonprivate1", priv.getString("commonprivate1"));
+ assertEquals("commonprivate2", priv.getString("commonprivate2"));
+ assertEquals("commonprivate3", priv.getString("commonprivate3"));
+
+ // Testing the anothertestjobtype
+ Class<? extends Job> aPluginClass = pluginSet.getPluginClass("anothertestjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob", aPluginClass.getName());
+ Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+ Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+
+ // Loader props
+ assertEquals("lib/*", aloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob", aloadProps.get("jobtype.class"));
+ assertEquals("commonprivate1", aloadProps.get("commonprivate1"));
+ assertEquals("commonprivate2", aloadProps.get("commonprivate2"));
+ assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+ // Job props
+ assertEquals("commonprop1", ajobProps.get("commonprop1"));
+ assertEquals("commonprop2", ajobProps.get("commonprop2"));
+ assertEquals("commonprop3", ajobProps.get("commonprop3"));
+ assertNull(ajobProps.get("commonprivate1"));
+
+ Class<? extends Job> tPluginClass = pluginSet.getPluginClass("testjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tPluginClass.getName());
+ Props tjobProps = pluginSet.getPluginJobProps("testjob");
+ Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+
+ // Loader props
+ assertNull(tloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+ assertEquals("commonprivate1", tloadProps.get("commonprivate1"));
+ assertEquals("commonprivate2", tloadProps.get("commonprivate2"));
+ assertEquals("private3", tloadProps.get("commonprivate3"));
+ assertEquals("0", tloadProps.get("testprivate"));
+ // Job props
+ assertEquals("commonprop1", tjobProps.get("commonprop1"));
+ assertEquals("commonprop2", tjobProps.get("commonprop2"));
+ assertEquals("1", tjobProps.get("pluginprops1"));
+ assertEquals("2", tjobProps.get("pluginprops2"));
+ assertEquals("3", tjobProps.get("pluginprops3"));
+ assertEquals("pluginprops", tjobProps.get("commonprop3"));
+ // Testing that the private properties aren't shared with the public ones
+ assertNull(tjobProps.get("commonprivate1"));
+ assertNull(tjobProps.get("testprivate"));
+ }
+
+ /**
+ * Test building classes
+ * @throws Exception
+ */
+ @Test
+ public void testBuildClass() throws Exception {
+ Props jobProps = new Props();
+ jobProps.put("type", "anothertestjob");
+ jobProps.put("test","test1");
+ jobProps.put("pluginprops3","4");
+ Job job = manager.buildJobExecutor("anothertestjob", jobProps, logger);
+
+ assertTrue(job instanceof FakeJavaJob);
+ FakeJavaJob fjj = (FakeJavaJob)job;
+
+ Props props = fjj.getJobProps();
+ assertEquals("test1", props.get("test"));
+ assertNull(props.get("pluginprops1"));
+ assertEquals("4", props.get("pluginprops3"));
+ assertEquals("commonprop1", props.get("commonprop1"));
+ assertEquals("commonprop2", props.get("commonprop2"));
+ assertEquals("commonprop3", props.get("commonprop3"));
+ assertNull(props.get("commonprivate1"));
+ }
+
+ /**
+ * Test building classes 2
+ * @throws Exception
+ */
+ @Test
+ public void testBuildClass2() throws Exception {
+ Props jobProps = new Props();
+ jobProps.put("type", "testjob");
+ jobProps.put("test","test1");
+ jobProps.put("pluginprops3","4");
+ Job job = manager.buildJobExecutor("testjob", jobProps, logger);
+
+ assertTrue(job instanceof FakeJavaJob2);
+ FakeJavaJob2 fjj = (FakeJavaJob2)job;
+
+ Props props = fjj.getJobProps();
+ assertEquals("test1", props.get("test"));
+ assertEquals("1", props.get("pluginprops1"));
+ assertEquals("2", props.get("pluginprops2"));
+ assertEquals("4", props.get("pluginprops3")); // Overridden value
+ assertEquals("commonprop1", props.get("commonprop1"));
+ assertEquals("commonprop2", props.get("commonprop2"));
+ assertEquals("pluginprops", props.get("commonprop3"));
+ assertNull(props.get("commonprivate1"));
+ }
+
+ /**
+ * Test out reloading properties
+ * @throws Exception
+ */
+ @Test
+ public void testResetPlugins() throws Exception {
+ // Add a plugins file to the anothertestjob folder
+ File anothertestfolder = new File(TEST_PLUGIN_DIR + "/anothertestjob");
+ Props pluginProps = new Props();
+ pluginProps.put("test1", "1");
+ pluginProps.put("test2", "2");
+ pluginProps.put("pluginprops3","4");
+ pluginProps.storeFlattened(new File(anothertestfolder, "plugin.properties"));
+
+ // clone the testjob folder
+ File testFolder = new File(TEST_PLUGIN_DIR + "/testjob");
+ FileUtils.copyDirectory(testFolder, new File(TEST_PLUGIN_DIR + "/newtestjob"));
+
+ // change the common properties
+ Props commonPlugin = new Props(null, TEST_PLUGIN_DIR + "/common.properties");
+ commonPlugin.put("commonprop1", "1");
+ commonPlugin.put("newcommonprop1", "2");
+ commonPlugin.removeLocal("commonprop2");
+ commonPlugin.storeFlattened(new File(TEST_PLUGIN_DIR + "/common.properties"));
+
+ // change the common properties
+ Props commonPrivate = new Props(null, TEST_PLUGIN_DIR + "/commonprivate.properties");
+ commonPrivate.put("commonprivate1", "1");
+ commonPrivate.put("newcommonprivate1", "2");
+ commonPrivate.removeLocal("commonprivate2");
+ commonPrivate.storeFlattened(new File(TEST_PLUGIN_DIR + "/commonprivate.properties"));
+
+ // change testjob private property
+ Props loadProps = new Props(null, TEST_PLUGIN_DIR + "/testjob/private.properties");
+ loadProps.put("privatetest", "test");
+
+ /*
+ * Reload the plugins here!!
+ */
+ manager.loadPlugins();
+
+ // Checkout common props
+ JobTypePluginSet pluginSet = manager.getJobTypePluginSet();
+ Props commonProps = pluginSet.getCommonPluginJobProps();
+ assertEquals("1", commonProps.get("commonprop1"));
+ assertEquals("commonprop3", commonProps.get("commonprop3"));
+ assertEquals("2", commonProps.get("newcommonprop1"));
+ assertNull(commonProps.get("commonprop2"));
+
+ // Checkout common private
+ Props commonPrivateProps = pluginSet.getCommonPluginLoadProps();
+ assertEquals("1", commonPrivateProps.get("commonprivate1"));
+ assertEquals("commonprivate3", commonPrivateProps.get("commonprivate3"));
+ assertEquals("2", commonPrivateProps.get("newcommonprivate1"));
+ assertNull(commonPrivateProps.get("commonprivate2"));
+
+ // Verify anothertestjob changes
+ Class<? extends Job> atjClass = pluginSet.getPluginClass("anothertestjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob", atjClass.getName());
+ Props ajobProps = pluginSet.getPluginJobProps("anothertestjob");
+ assertEquals("1", ajobProps.get("test1"));
+ assertEquals("2", ajobProps.get("test2"));
+ assertEquals("4", ajobProps.get("pluginprops3"));
+ assertEquals("commonprop3", ajobProps.get("commonprop3"));
+
+ Props aloadProps = pluginSet.getPluginLoaderProps("anothertestjob");
+ assertEquals("1", aloadProps.get("commonprivate1"));
+ assertNull(aloadProps.get("commonprivate2"));
+ assertEquals("commonprivate3", aloadProps.get("commonprivate3"));
+
+ // Verify testjob changes
+ Class<? extends Job> tjClass = pluginSet.getPluginClass("testjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tjClass.getName());
+ Props tjobProps = pluginSet.getPluginJobProps("testjob");
+ assertEquals("1", tjobProps.get("commonprop1"));
+ assertEquals("2", tjobProps.get("newcommonprop1"));
+ assertEquals("1", tjobProps.get("pluginprops1"));
+ assertEquals("2", tjobProps.get("pluginprops2"));
+ assertEquals("3", tjobProps.get("pluginprops3"));
+ assertEquals("pluginprops", tjobProps.get("commonprop3"));
+ assertNull(tjobProps.get("commonprop2"));
+
+ Props tloadProps = pluginSet.getPluginLoaderProps("testjob");
+ assertNull(tloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", tloadProps.get("jobtype.class"));
+ assertEquals("1", tloadProps.get("commonprivate1"));
+ assertNull(tloadProps.get("commonprivate2"));
+ assertEquals("private3", tloadProps.get("commonprivate3"));
+
+ // Verify newtestjob
+ Class<? extends Job> ntPluginClass = pluginSet.getPluginClass("newtestjob");
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntPluginClass.getName());
+ Props ntjobProps = pluginSet.getPluginJobProps("newtestjob");
+ Props ntloadProps = pluginSet.getPluginLoaderProps("newtestjob");
+
+ // Loader props
+ assertNull(ntloadProps.get("jobtype.classpath"));
+ assertEquals("azkaban.test.jobtype.FakeJavaJob2", ntloadProps.get("jobtype.class"));
+ assertEquals("1", ntloadProps.get("commonprivate1"));
+ assertNull(ntloadProps.get("commonprivate2"));
+ assertEquals("private3", ntloadProps.get("commonprivate3"));
+ assertEquals("0", ntloadProps.get("testprivate"));
+ // Job props
+ assertEquals("1", ntjobProps.get("commonprop1"));
+ assertNull(ntjobProps.get("commonprop2"));
+ assertEquals("1", ntjobProps.get("pluginprops1"));
+ assertEquals("2", ntjobProps.get("pluginprops2"));
+ assertEquals("3", ntjobProps.get("pluginprops3"));
+ assertEquals("pluginprops", ntjobProps.get("commonprop3"));
+ }
+}
diff --git a/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/anothertestjob/lib/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/anothertestjob/private.properties b/unit/plugins/jobtypes/anothertestjob/private.properties
new file mode 100644
index 0000000..8e95c94
--- /dev/null
+++ b/unit/plugins/jobtypes/anothertestjob/private.properties
@@ -0,0 +1,2 @@
+jobtype.classpath=lib/*
+jobtype.class=azkaban.test.jobtype.FakeJavaJob
diff --git a/unit/plugins/jobtypes/common.properties b/unit/plugins/jobtypes/common.properties
new file mode 100644
index 0000000..2823a83
--- /dev/null
+++ b/unit/plugins/jobtypes/common.properties
@@ -0,0 +1,3 @@
+commonprop1=commonprop1
+commonprop2=commonprop2
+commonprop3=commonprop3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/commonprivate.properties b/unit/plugins/jobtypes/commonprivate.properties
new file mode 100644
index 0000000..7d46d43
--- /dev/null
+++ b/unit/plugins/jobtypes/commonprivate.properties
@@ -0,0 +1,3 @@
+commonprivate1=commonprivate1
+commonprivate2=commonprivate2
+commonprivate3=commonprivate3
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/fakejobtype.jar b/unit/plugins/jobtypes/testjob/fakejobtype.jar
new file mode 100644
index 0000000..51eb0de
Binary files /dev/null and b/unit/plugins/jobtypes/testjob/fakejobtype.jar differ
diff --git a/unit/plugins/jobtypes/testjob/plugin.properties b/unit/plugins/jobtypes/testjob/plugin.properties
new file mode 100644
index 0000000..587081a
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/plugin.properties
@@ -0,0 +1,4 @@
+pluginprops1=1
+pluginprops2=2
+pluginprops3=3
+commonprop3=pluginprops
\ No newline at end of file
diff --git a/unit/plugins/jobtypes/testjob/private.properties b/unit/plugins/jobtypes/testjob/private.properties
new file mode 100644
index 0000000..2bba593
--- /dev/null
+++ b/unit/plugins/jobtypes/testjob/private.properties
@@ -0,0 +1,3 @@
+jobtype.class=azkaban.test.jobtype.FakeJavaJob2
+commonprivate3=private3
+testprivate=0
\ No newline at end of file