azkaban-uncached
Changes
.classpath 7(+2 -5)
src/java/azkaban/execapp/FlowRunner.java 14(+5 -9)
src/java/azkaban/execapp/JobRunner.java 65(+9 -56)
src/java/azkaban/jobtype/JobTypeManager.java 290(+290 -0)
src/java/azkaban/utils/FileIOUtils.java 18(+18 -0)
src/java/azkaban/webapp/AzkabanWebServer.java 173(+135 -38)
src/java/azkaban/webapp/session/Session.java 12(+12 -0)
unit/executions/logtest/largeLog1.log.short 68(+68 -0)
Details
.classpath 7(+2 -5)
diff --git a/.classpath b/.classpath
index 12ee55d..ed3781d 100644
--- a/.classpath
+++ b/.classpath
@@ -23,14 +23,11 @@
<classpathentry kind="lib" path="lib/commons-io-2.4.jar"/>
<classpathentry kind="lib" path="lib/httpcore-4.2.1.jar"/>
<classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
- <classpathentry kind="lib" path="lib/hadoop-core-1.0.2-p1.jar"/>
<classpathentry kind="lib" path="lib/guava-13.0.1.jar"/>
- <classpathentry kind="lib" path="lib/pig-0.9.1-withouthadoop.jar"/>
- <classpathentry kind="lib" path="plugins/ldap/li-azkaban-ldap.jar"/>
<classpathentry kind="lib" path="lib/commons-email-1.2.jar"/>
<classpathentry kind="lib" path="lib/mail-1.4.5.jar"/>
- <classpathentry kind="lib" path="lib/avro-1.4.1.jar"/>
<classpathentry kind="lib" path="lib/commons-configuration-1.8.jar"/>
- <classpathentry kind="lib" path="lib/voldemort-0.96.jar"/>
+ <classpathentry kind="lib" path="lib/commons-dbutils-1.5.jar"/>
+ <classpathentry kind="lib" path="lib/commons-dbcp-1.4.jar"/>
<classpathentry kind="output" path="dist/classes"/>
</classpath>
src/java/azkaban/execapp/FlowRunner.java 14(+5 -9)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index beab501..6712ba8 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -32,8 +32,7 @@ import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.FlowProps;
-import azkaban.jobtype.JobtypeManager;
-import azkaban.security.HadoopSecurityManager;
+import azkaban.jobtype.JobTypeManager;
import azkaban.utils.Props;
public class FlowRunner extends EventHandler implements Runnable {
@@ -59,8 +58,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private Map<String, Props> jobOutputProps = new HashMap<String, Props>();
private Props globalProps;
- private final JobtypeManager jobtypeManager;
- private final HadoopSecurityManager hadoopSecurityManager;
+ private final JobTypeManager jobtypeManager;
private JobRunnerEventListener listener = new JobRunnerEventListener();
private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
@@ -74,14 +72,13 @@ public class FlowRunner extends EventHandler implements Runnable {
private boolean flowFinished = false;
private boolean flowCancelled = false;
- public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, JobtypeManager jobtypeManager, HadoopSecurityManager hadoopSecurityManager) throws ExecutorManagerException {
+ public FlowRunner(ExecutableFlow flow, ExecutorLoader executorLoader, JobTypeManager jobtypeManager) throws ExecutorManagerException {
this.execId = flow.getExecutionId();
this.flow = flow;
this.executorLoader = executorLoader;
this.executorService = Executors.newFixedThreadPool(numThreads);
this.execDir = new File(flow.getExecutionPath());
this.jobtypeManager = jobtypeManager;
- this.hadoopSecurityManager = hadoopSecurityManager;
}
public FlowRunner setGlobalProps(Props globalProps) {
@@ -316,7 +313,8 @@ public class FlowRunner extends EventHandler implements Runnable {
logger.error("Error loading job file " + source + " for job " + node.getJobId());
}
- JobRunner jobRunner = new JobRunner(node, prop, path.getParentFile(), executorLoader, jobtypeManager, hadoopSecurityManager);
+ // should have one prop with system secrets, the other user level props
+ JobRunner jobRunner = new JobRunner(node, new Props(), prop, path.getParentFile(), executorLoader, jobtypeManager);
jobRunner.addListener(listener);
return jobRunner;
@@ -510,8 +508,6 @@ public class FlowRunner extends EventHandler implements Runnable {
queueNextJobs(node);
}
- runner.cleanup();
-
if (isFlowFinished()) {
logger.info("Flow appears finished. Cleaning up.");
flowFinished = true;
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 74e5ce4..3cb9978 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -40,10 +40,8 @@ import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
-import azkaban.jobtype.JobtypeManager;
+import azkaban.jobtype.JobTypeManager;
-import azkaban.security.DefaultHadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
@@ -76,8 +74,7 @@ public class FlowRunnerManager implements EventListener {
private ExecutorLoader executorLoader;
private ProjectLoader projectLoader;
- private JobtypeManager jobtypeManager;
- private HadoopSecurityManager hadoopSecurityManager;
+ private JobTypeManager jobtypeManager;
private Props globalProps;
@@ -107,35 +104,8 @@ public class FlowRunnerManager implements EventListener {
cleanerThread = new CleanerThread();
cleanerThread.start();
- jobtypeManager = new JobtypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, null), parentClassLoader);
+ jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, null), parentClassLoader);
- hadoopSecurityManager = loadHadoopSecurityManager(props);
-
- }
-
- private HadoopSecurityManager loadHadoopSecurityManager(Props props) {
-
- Class<?> hadoopSecurityManagerClass = props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, null);
- logger.info("Loading hadoop security manager class " + hadoopSecurityManagerClass.getName());
- HadoopSecurityManager hadoopSecurityManager = null;
-
- if (hadoopSecurityManagerClass != null && hadoopSecurityManagerClass.getConstructors().length > 0) {
-
- try {
- Constructor<?> hsmConstructor = hadoopSecurityManagerClass.getConstructor(Props.class);
- hadoopSecurityManager = (HadoopSecurityManager) hsmConstructor.newInstance(props);
- }
- catch (Exception e) {
- logger.error("Could not instantiate Hadoop Security Manager "+ hadoopSecurityManagerClass.getName());
- throw new RuntimeException(e);
- }
- }
- else {
- hadoopSecurityManager = new DefaultHadoopSecurityManager();
- }
-
- return hadoopSecurityManager;
-
}
public Props getGlobalProps() {
@@ -274,7 +244,7 @@ public class FlowRunnerManager implements EventListener {
setupFlow(flow);
// Setup flow runner
- FlowRunner runner = new FlowRunner(flow, executorLoader, jobtypeManager, hadoopSecurityManager);
+ FlowRunner runner = new FlowRunner(flow, executorLoader, jobtypeManager);
runner.setGlobalProps(globalProps);
runner.addListener(this);
src/java/azkaban/execapp/JobRunner.java 65(+9 -56)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 298fa6c..66a8a76 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -18,7 +18,6 @@ package azkaban.execapp;
import java.io.File;
import java.io.IOException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
@@ -35,17 +34,16 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.jobExecutor.Job;
-import azkaban.jobtype.JobtypeManager;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypeManagerException;
-import azkaban.security.HadoopSecurityManager;
-import azkaban.security.SecurityManagerException;
import azkaban.utils.Props;
-import azkaban.utils.SecurityUtils;
public class JobRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
private ExecutorLoader loader;
+ private Props sysProps;
private Props props;
private Props outputProps;
private ExecutableNode node;
@@ -62,17 +60,16 @@ public class JobRunner extends EventHandler implements Runnable {
private static final Object logCreatorLock = new Object();
private Object syncObject = new Object();
- private final JobtypeManager jobtypeManager;
- private final HadoopSecurityManager hadoopSecurityManager;
+ private final JobTypeManager jobtypeManager;
- public JobRunner(ExecutableNode node, Props props, File workingDir, ExecutorLoader loader, JobtypeManager jobtypeManager, HadoopSecurityManager hadoopSecurityManager) {
+ public JobRunner(ExecutableNode node, Props sysProps, Props props, File workingDir, ExecutorLoader loader, JobTypeManager jobtypeManager) {
+ this.sysProps = sysProps;
this.props = props;
this.node = node;
this.workingDir = workingDir;
this.executionId = node.getExecutionId();
this.loader = loader;
this.jobtypeManager = jobtypeManager;
- this.hadoopSecurityManager = hadoopSecurityManager;
}
public ExecutableNode getNode() {
@@ -186,7 +183,7 @@ public class JobRunner extends EventHandler implements Runnable {
this.fireEventListeners(event);
}
- private boolean prepareJob() {
+ private boolean prepareJob() throws RuntimeException{
// Check pre conditions
if (props == null) {
node.setStatus(Status.FAILED);
@@ -201,43 +198,15 @@ public class JobRunner extends EventHandler implements Runnable {
logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
node.setStatus(Status.RUNNING);
-
+ props.put(AbstractProcessJob.JOB_FULLPATH, props.getSource());
props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
//job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getJobId(), props, logger);
- job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
- if(props.containsKey(HadoopSecurityManager.TO_PROXY)) {
- try{
- getHadoopTokens(props);
- }
- catch (Exception e) {
- logger.error("Failed to get Hadoop tokens. " + e);
- }
- }
+ job = jobtypeManager.buildJobExecutor(node.getJobId(), sysProps, props, logger);
}
return true;
}
- protected void getHadoopTokens(Props props) {
-
- File tokenFile = null;
- try {
- tokenFile = File.createTempFile("mr-azkaban", ".token");
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- try {
- hadoopSecurityManager.prefetchToken(tokenFile, props.getString(HadoopSecurityManager.TO_PROXY));
- } catch (SecurityManagerException e) {
- e.printStackTrace();
- }
-
- props.put("env."+UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
-
- }
-
private void runJob() {
try {
job.run();
@@ -298,20 +267,4 @@ public class JobRunner extends EventHandler implements Runnable {
return logFile;
}
- public void cleanup() {
- // clean up the tokens.
- try{
- if(props.containsKey("env."+UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION)) {
- String tokenFile = props.getString("env."+UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if(tokenFile != null) {
- File f = new File(tokenFile);
- f.delete();
- }
- }
- }
- catch(Exception e) {
- logger.error("Failed to clean up hadoop token file.");
- }
-
- }
}
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 560df54..75b2723 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -120,7 +120,7 @@ public class ExecutorMailer {
message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
}
for (String reasons: extraReasons) {
- message.println("<li>" + extraReasons + "</li>");
+ message.println("<li>" + reasons + "</li>");
}
message.println("</ul>");
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index e0a9604..d278d59 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -17,7 +17,6 @@
package azkaban.executor;
import java.io.IOException;
-import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 7fa242a..f6a49c1 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -48,24 +48,35 @@ public abstract class AbstractProcessJob extends AbstractJob {
protected final String _jobPath;
- protected volatile Props _props;
+ protected volatile Props jobProps;
+ protected volatile Props sysProps;
protected String _cwd;
private volatile Props generatedPropeties;
- protected AbstractProcessJob(String jobid, final Props props, final Logger log) {
+ protected AbstractProcessJob(String jobid, final Props sysProps, final Props jobProps, final Logger log) {
super(jobid, log);
- _props = props;
- _jobPath = props.getString(JOB_FULLPATH, props.getSource());
+ this.jobProps = jobProps;
+ this.sysProps = sysProps;
+ _jobPath = jobProps.getString(JOB_FULLPATH, jobProps.getSource());
_cwd = getWorkingDirectory();
this.log = log;
}
+ @Deprecated
public Props getProps() {
- return _props;
+ return jobProps;
+ }
+
+ public Props getJobProps() {
+ return jobProps;
+ }
+
+ public Props getSysProps() {
+ return sysProps;
}
public String getJobPath() {
@@ -73,7 +84,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
}
protected void resolveProps() {
- _props = PropsUtils.resolveProps(_props);
+ jobProps = PropsUtils.resolveProps(jobProps);
}
@Override
@@ -91,11 +102,11 @@ public abstract class AbstractProcessJob extends AbstractJob {
File[] files = new File[2];
files[0] = createFlattenedPropsFile(_cwd);
- _props.put(ENV_PREFIX + JOB_PROP_ENV, files[0].getAbsolutePath());
- _props.put(ENV_PREFIX + JOB_NAME_ENV, getId());
+ jobProps.put(ENV_PREFIX + JOB_PROP_ENV, files[0].getAbsolutePath());
+ jobProps.put(ENV_PREFIX + JOB_NAME_ENV, getId());
files[1] = createOutputPropsFile(getId(), _cwd);
- _props.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
+ jobProps.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
files[1].getAbsolutePath());
return files;
@@ -158,7 +169,7 @@ public abstract class AbstractProcessJob extends AbstractJob {
File tempFile = null;
try {
tempFile = File.createTempFile(getId() + "_", "_tmp", directory);
- _props.storeFlattened(tempFile);
+ jobProps.storeFlattened(tempFile);
} catch (IOException e) {
throw new RuntimeException("Failed to create temp property file ", e);
}
diff --git a/src/java/azkaban/jobExecutor/JavaProcessJob.java b/src/java/azkaban/jobExecutor/JavaProcessJob.java
index 1b3d115..0009e32 100644
--- a/src/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/src/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -40,8 +40,8 @@ public class JavaProcessJob extends ProcessJob {
public static String JAVA_COMMAND = "java";
- public JavaProcessJob(String jobid, Props prop, Logger logger) {
- super(jobid, prop, logger);
+ public JavaProcessJob(String jobid, Props sysProps, Props jobProps, Logger logger) {
+ super(jobid, sysProps, jobProps, logger);
}
@Override
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
index 425ef57..4d28f08 100644
--- a/src/java/azkaban/jobExecutor/LongArgJob.java
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -39,13 +39,13 @@ public abstract class LongArgJob extends AbstractProcessJob {
private final AzkabanProcessBuilder builder;
private volatile AzkabanProcess process;
- public LongArgJob(String jobid, String[] command, Props prop, Logger log) {
- this(jobid, command, prop, log, new HashSet<String>(0));
+ public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProps, Logger log) {
+ this(jobid, command, sysProps, jobProps, log, new HashSet<String>(0));
}
- public LongArgJob(String jobid, String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
+ public LongArgJob(String jobid, String[] command, Props sysProps, Props jobProp, Logger log, Set<String> suppressedKeys) {
//super(command, desc);
- super(jobid, prop, log);
+ super(jobid, sysProps, jobProp, log);
//String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
this.builder = new AzkabanProcessBuilder(command).
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
index 8d42a4d..50233c0 100644
--- a/src/java/azkaban/jobExecutor/ProcessJob.java
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -39,8 +39,8 @@ public class ProcessJob extends AbstractProcessJob {
private static final long KILL_TIME_MS = 5000;
private volatile AzkabanProcess process;
- public ProcessJob(final String jobId, final Props props, final Logger log) {
- super(jobId, props, log);
+ public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log) {
+ super(jobId, sysProps, jobProps, log);
}
@Override
@@ -61,12 +61,12 @@ public class ProcessJob extends AbstractProcessJob {
Map<String, String> envVars = getEnvironmentVariables();
for (String command : commands) {
+ info("Command: " + command);
AzkabanProcessBuilder builder = new AzkabanProcessBuilder(partitionCommandLine(command))
.setEnv(envVars)
.setWorkingDir(getCwd())
.setLogger(getLog());
-
- info("Command: " + builder.getCommandString());
+
if (builder.getEnv().size() > 0) {
info("Environment variables: " + builder.getEnv());
}
@@ -97,9 +97,9 @@ public class ProcessJob extends AbstractProcessJob {
protected List<String> getCommandList() {
List<String> commands = new ArrayList<String>();
- commands.add(_props.getString(COMMAND));
- for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
- commands.add(_props.getString(COMMAND + "." + i));
+ commands.add(jobProps.getString(COMMAND));
+ for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
+ commands.add(jobProps.getString(COMMAND + "." + i));
}
return commands;
@@ -127,7 +127,7 @@ public class ProcessJob extends AbstractProcessJob {
@Override
public Props getProps() {
- return _props;
+ return jobProps;
}
public String getPath() {
diff --git a/src/java/azkaban/jobExecutor/PythonJob.java b/src/java/azkaban/jobExecutor/PythonJob.java
index 6a66965..8989ff5 100644
--- a/src/java/azkaban/jobExecutor/PythonJob.java
+++ b/src/java/azkaban/jobExecutor/PythonJob.java
@@ -26,10 +26,11 @@ public class PythonJob extends LongArgJob {
private static final String PYTHON_BINARY_KEY = "python";
private static final String SCRIPT_KEY = "script";
- public PythonJob(String jobid, Props props, Logger log) {
+ public PythonJob(String jobid, Props sysProps, Props jobProps, Logger log) {
super(jobid,
- new String[] { props.getString(PYTHON_BINARY_KEY, "python"),props.getString(SCRIPT_KEY) },
- props,
+ new String[] { jobProps.getString(PYTHON_BINARY_KEY, "python"),jobProps.getString(SCRIPT_KEY) },
+ sysProps,
+ jobProps,
log,
ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
}
diff --git a/src/java/azkaban/jobExecutor/RubyJob.java b/src/java/azkaban/jobExecutor/RubyJob.java
index 497d203..c913e97 100644
--- a/src/java/azkaban/jobExecutor/RubyJob.java
+++ b/src/java/azkaban/jobExecutor/RubyJob.java
@@ -26,10 +26,11 @@ public class RubyJob extends LongArgJob {
private static final String RUBY_BINARY_KEY = "ruby";
private static final String SCRIPT_KEY = "script";
- public RubyJob(String jobid, Props props, Logger log) {
+ public RubyJob(String jobid, Props sysProps, Props jobProps, Logger log) {
super(jobid,
- new String[] { props.getString(RUBY_BINARY_KEY, "ruby"), props.getString(SCRIPT_KEY) },
- props,
+ new String[] { jobProps.getString(RUBY_BINARY_KEY, "ruby"), jobProps.getString(SCRIPT_KEY) },
+ sysProps,
+ jobProps,
log,
ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
}
diff --git a/src/java/azkaban/jobExecutor/ScriptJob.java b/src/java/azkaban/jobExecutor/ScriptJob.java
index 5ed0e04..52623e7 100644
--- a/src/java/azkaban/jobExecutor/ScriptJob.java
+++ b/src/java/azkaban/jobExecutor/ScriptJob.java
@@ -32,10 +32,11 @@ public class ScriptJob extends LongArgJob {
private static final String DEFAULT_EXECUTABLE_KEY = "executable";
private static final String SCRIPT_KEY = "script";
- public ScriptJob(String jobid, Props props, Logger log) {
+ public ScriptJob(String jobid, Props sysProps, Props jobProps, Logger log) {
super(jobid,
- new String[] { props.getString(DEFAULT_EXECUTABLE_KEY),props.getString(SCRIPT_KEY) },
- props,
+ new String[] { jobProps.getString(DEFAULT_EXECUTABLE_KEY), jobProps.getString(SCRIPT_KEY) },
+ sysProps,
+ jobProps,
log,
ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
}
src/java/azkaban/jobtype/JobTypeManager.java 290(+290 -0)
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
new file mode 100644
index 0000000..845f3e9
--- /dev/null
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -0,0 +1,290 @@
+package azkaban.jobtype;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.jobExecutor.Job;
+import azkaban.jobExecutor.NoopJob;
+import azkaban.jobExecutor.ProcessJob;
+import azkaban.jobExecutor.PythonJob;
+import azkaban.jobExecutor.RubyJob;
+import azkaban.jobExecutor.ScriptJob;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Utils;
+import azkaban.jobExecutor.utils.InitErrorJob;
+import azkaban.jobExecutor.utils.JobExecutionException;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class JobTypeManager
+{
+
+ private final String jobtypePluginDir; // the dir for jobtype plugins
+ private final ClassLoader parentLoader;
+
+ private static final String jobtypeConfFile = "plugin.properties"; // need jars.to.include property, will be loaded with user property
+ private static final String jobtypeSysConfFile = "private.properties"; // not exposed to users
+ private static final String commonConfFile = "types.properties"; // common properties for multiple plugins
+ private static final String commonSysConfFile = "typesprivate.properties"; // common private properties for multiple plugins
+ private static final Logger logger = Logger.getLogger(JobTypeManager.class);
+
+ private Map<String, Class<? extends Job>> jobToClass;
+ private Map<String, Props> jobtypeJobProps;
+ private Map<String, Props> jobtypeSysProps;
+
+ public JobTypeManager(String jobtypePluginDir, ClassLoader parentClassLoader)
+ {
+ this.jobtypePluginDir = jobtypePluginDir;
+ this.parentLoader = parentClassLoader;
+
+ jobToClass = new HashMap<String, Class<? extends Job>>();
+ jobtypeJobProps = new HashMap<String, Props>();
+ jobtypeSysProps = new HashMap<String, Props>();
+
+ loadDefaultTypes();
+
+ if(jobtypePluginDir != null) {
+ logger.info("job type plugin directory set. Loading extra job types.");
+ loadPluginJobTypes();
+ }
+
+ }
+
+ private void loadDefaultTypes() throws JobTypeManagerException{
+// jobToClass.put("java", JavaJob.class);
+ jobToClass.put("command", ProcessJob.class);
+ jobToClass.put("javaprocess", JavaProcessJob.class);
+// jobToClass.put("pig", PigProcessJob.class);
+ jobToClass.put("propertyPusher", NoopJob.class);
+ jobToClass.put("python", PythonJob.class);
+ jobToClass.put("ruby", RubyJob.class);
+ jobToClass.put("script", ScriptJob.class);
+// jobToClass.put("myjobtype", MyTestJobType.class);
+
+ }
+
+ // load Job Typs from dir
+ private void loadPluginJobTypes() throws JobTypeManagerException
+ {
+ if(jobtypePluginDir == null || parentLoader == null) throw new JobTypeManagerException("JobTypeDir not set! JobTypeManager not properly initiated!");
+
+ File jobPluginsDir = new File(jobtypePluginDir);
+ if(!jobPluginsDir.isDirectory()) throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not a directory!");
+ if(!jobPluginsDir.canRead()) throw new JobTypeManagerException("Job type plugin dir " + jobtypePluginDir + " is not readable!");
+
+ // look for global conf
+ Props globalConf = null;
+ Props globalSysConf = null;
+ File confFile = findFilefromDir(jobPluginsDir, commonConfFile);
+ File sysConfFile = findFilefromDir(jobPluginsDir, commonSysConfFile);
+ try {
+ if(confFile != null) {
+ globalConf = new Props(null, confFile);
+ }
+ if(sysConfFile != null) {
+ globalSysConf = new Props(null, sysConfFile);
+ }
+ }
+ catch (Exception e) {
+ throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
+ }
+
+ for(File dir : jobPluginsDir.listFiles()) {
+ if(dir.isDirectory() && dir.canRead()) {
+ // get its conf file
+ try {
+ loadJobType(dir, globalConf, globalSysConf);
+ }
+ catch (Exception e) {
+ throw new JobTypeManagerException(e);
+ }
+ }
+ }
+
+ }
+
+ public static File findFilefromDir(File dir, String fn){
+ if(dir.isDirectory()) {
+ for(File f : dir.listFiles()) {
+ if(f.getName().equals(fn)) {
+ return f;
+ }
+ }
+ }
+ return null;
+ }
+
+ public void loadJobType(File dir, Props globalConf, Props globalSysConf) throws JobTypeManagerException{
+
+ // look for common conf
+ Props conf = null;
+ Props sysConf = null;
+ File confFile = findFilefromDir(dir, commonConfFile);
+ File sysConfFile = findFilefromDir(dir, commonSysConfFile);
+
+ try {
+ if(confFile != null) {
+ conf = new Props(globalConf, confFile);
+ }
+ else {
+ conf = globalConf;
+ }
+ if(sysConfFile != null) {
+ sysConf = new Props(globalSysConf, sysConfFile);
+ }
+ else {
+ sysConf = globalSysConf;
+ }
+ }
+ catch (Exception e) {
+ throw new JobTypeManagerException("Failed to get common jobtype properties" + e.getCause());
+ }
+
+ // look for jobtypeConf.properties and load it
+ for(File f: dir.listFiles()) {
+ if(f.isFile() && f.getName().equals(jobtypeSysConfFile)) {
+ loadJob(dir, f, conf, sysConf);
+ return;
+ }
+ }
+
+ // no hit, keep looking
+ for(File f : dir.listFiles()) {
+ if(f.isDirectory() && f.canRead())
+ loadJobType(f, conf, sysConf);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public void loadJob(File dir, File jobConfFile, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
+ Props conf = null;
+ Props sysConf = null;
+ File confFile = findFilefromDir(dir, jobtypeConfFile);
+ File sysConfFile = findFilefromDir(dir, jobtypeSysConfFile);
+
+ try {
+ if(confFile != null) {
+ conf = new Props(commonConf, confFile);
+ conf = PropsUtils.resolveProps(conf);
+ }
+ if(sysConfFile != null) {
+ sysConf = new Props(commonSysConf, sysConfFile);
+ sysConf = PropsUtils.resolveProps(sysConf);
+ }
+ }
+ catch (Exception e) {
+ throw new JobTypeManagerException("Failed to get jobtype properties", e);
+ }
+
+ // use directory name as job type name
+ String jobtypeName = dir.getName();
+
+ String jobtypeClass = sysConf.get("jobtype.class");
+
+ logger.info("Loading jobtype " + jobtypeName );
+
+ // sysconf says what jars/confs to load
+ List<String> jobtypeClasspath = sysConf.getStringList("jobtype.classpath", null, ",");
+ List<URL> resources = new ArrayList<URL>();
+ for(String s : jobtypeClasspath) {
+ try {
+ File path = new File(s);
+ if(path.isDirectory()) {
+ for(File f : path.listFiles()) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f);
+ }
+ }
+ resources.add(path.toURI().toURL());
+ logger.info("adding to classpath " + path);
+ } catch (MalformedURLException e) {
+ // TODO Auto-generated catch block
+ throw new JobTypeManagerException(e);
+ }
+ }
+
+ // each job type can have a different class loader
+ ClassLoader jobTypeLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentLoader);
+
+ Class<? extends Job> clazz = null;
+ try {
+ clazz = (Class<? extends Job>)jobTypeLoader.loadClass(jobtypeClass);
+ jobToClass.put(jobtypeName, clazz);
+ }
+ catch (ClassNotFoundException e) {
+ throw new JobTypeManagerException(e);
+ }
+ logger.info("Loaded jobtype " + jobtypeName + " " + jobtypeClass);
+
+ if(conf != null) jobtypeJobProps.put(jobtypeName, conf);
+ jobtypeSysProps.put(jobtypeName, sysConf);
+
+ }
+
+ public Job buildJobExecutor(String jobId, Props sysProps, Props jobProps, Logger logger) throws JobTypeManagerException
+ {
+
+ Job job;
+ try {
+ String jobType = jobProps.getString("type");
+ if (jobType == null || jobType.length() == 0) {
+ /*throw an exception when job name is null or empty*/
+ throw new JobExecutionException (
+ String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
+ }
+
+ logger.info("Building " + jobType + " job executor. ");
+ Class<? extends Object> executorClass = jobToClass.get(jobType);
+
+ if (executorClass == null) {
+ throw new JobExecutionException(
+ String.format("Could not construct job[%s] of type[%s].", jobProps, jobType));
+ }
+
+ Props sysConf = jobtypeSysProps.containsKey(jobType) ? new Props(sysProps, jobtypeSysProps.get(jobType)) : sysProps;
+ Props jobConf = jobtypeJobProps.containsKey(jobType) ? new Props(jobProps, jobtypeJobProps.get(jobType)) : jobProps;
+ sysConf = PropsUtils.resolveProps(sysConf);
+ jobConf = PropsUtils.resolveProps(jobConf);
+
+// logger.info("sysConf is " + sysConf);
+// logger.info("jobConf is " + jobConf);
+
+ job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
+
+ }
+ catch (Exception e) {
+ job = new InitErrorJob(jobId, e);
+ //throw new JobTypeManagerException(e);
+ }
+
+ return job;
+ }
+
+ public void registerJobType(String typeName, Class<? extends Job> jobTypeClass) {
+ jobToClass.put(typeName, jobTypeClass);
+ }
+}
+
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 7ba04f8..ef0f076 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -223,10 +223,15 @@ public class ScheduleManager {
* @param flow
*/
public synchronized void insertSchedule(Schedule s) {
+ boolean exist = scheduleIDMap.containsKey(s.getScheduleId());
if(s.updateTime()) {
internalSchedule(s);
try {
- loader.insertSchedule(s);
+ if(!exist) {
+ loader.insertSchedule(s);
+ }
+ else
+ loader.updateSchedule(s);
} catch (ScheduleManagerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
src/java/azkaban/utils/FileIOUtils.java 18(+18 -0)
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 7bda63b..d09b3a0 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -12,6 +12,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.StringTokenizer;
import org.apache.commons.io.IOUtils;
@@ -19,6 +20,23 @@ import org.apache.commons.io.IOUtils;
* Runs a few unix commands. Created this so that I can move to JNI in the future.
*/
public class FileIOUtils {
+ public static String getSourcePathFromClass(Class<?> containedClass) {
+ File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+ if (!file.isDirectory() && file.getName().endsWith(".class")) {
+ String name = containedClass.getName();
+ StringTokenizer tokenizer = new StringTokenizer(name, ".");
+ while(tokenizer.hasMoreTokens()) {
+ tokenizer.nextElement();
+ file = file.getParentFile();
+ }
+ return file.getPath();
+ }
+ else {
+ return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+ }
+ }
+
/**
* Run a unix command that will symlink files, and recurse into directories.
*/
diff --git a/src/java/azkaban/utils/GZIPUtils.java b/src/java/azkaban/utils/GZIPUtils.java
index 147cb55..b402067 100644
--- a/src/java/azkaban/utils/GZIPUtils.java
+++ b/src/java/azkaban/utils/GZIPUtils.java
@@ -6,7 +6,7 @@ import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.io.IOUtils;
public class GZIPUtils {
@@ -36,7 +36,7 @@ public class GZIPUtils {
GZIPInputStream gzipInputStream = new GZIPInputStream(byteInputStream);
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
- IOUtils.copyBytes(gzipInputStream, byteOutputStream, 1024);
+ IOUtils.copy(gzipInputStream, byteOutputStream);
return byteOutputStream.toByteArray();
}
src/java/azkaban/webapp/AzkabanWebServer.java 173(+135 -38)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index fbe9878..01261a8 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -20,17 +20,22 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
import java.util.TimeZone;
+import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
import org.joda.time.DateTimeZone;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.security.SslSocketConnector;
@@ -46,20 +51,22 @@ import azkaban.project.ProjectManager;
import azkaban.scheduler.JdbcScheduleLoader;
import azkaban.scheduler.ScheduleManager;
-import azkaban.security.DefaultHadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
+import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
import azkaban.webapp.servlet.AzkabanServletContextListener;
+import azkaban.webapp.servlet.AbstractAzkabanServlet;
import azkaban.webapp.servlet.ExecutorServlet;
-import azkaban.webapp.servlet.HdfsBrowserServlet;
+import azkaban.webapp.servlet.LoginAbstractAzkabanServlet;
import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.IndexServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.ViewerPlugin;
import azkaban.webapp.session.SessionCache;
import joptsimple.OptionParser;
@@ -104,7 +111,6 @@ public class AzkabanWebServer implements AzkabanServer {
private static final int DEFAULT_THREAD_NUMBER = 20;
private static final String VELOCITY_DEV_MODE_PARAM = "velocity.dev.mode";
private static final String USER_MANAGER_CLASS_PARAM = "user.manager.class";
- private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM = "hadoop.security.manager.class";
private static final String DEFAULT_STATIC_DIR = "";
private final VelocityEngine velocityEngine;
@@ -115,11 +121,11 @@ public class AzkabanWebServer implements AzkabanServer {
private ScheduleManager scheduleManager;
private final ClassLoader baseClassLoader;
- private HadoopSecurityManager hadoopSecurityManager;
private Props props;
private SessionCache sessionCache;
private File tempDir;
+ private List<ViewerPlugin> viewerPlugins;
/**
* Constructor usually called by tomcat AzkabanServletContext to create the
@@ -141,7 +147,6 @@ public class AzkabanWebServer implements AzkabanServer {
executorManager = loadExecutorManager(props);
scheduleManager = loadScheduleManager(executorManager, props);
baseClassLoader = getBaseClassloader();
- hadoopSecurityManager = loadHadoopSecurityManager(props);
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -155,6 +160,10 @@ public class AzkabanWebServer implements AzkabanServer {
}
}
+ private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
+ this.viewerPlugins = viewerPlugins;
+ }
+
private UserManager loadUserManager(Props props) {
Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
logger.info("Loading user manager class " + userManagerClass.getName());
@@ -253,10 +262,12 @@ public class AzkabanWebServer implements AzkabanServer {
*/
private VelocityEngine configureVelocityEngine(final boolean devMode) {
VelocityEngine engine = new VelocityEngine();
- engine.setProperty("resource.loader", "classpath");
+ engine.setProperty("resource.loader", "classpath, jar");
engine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
engine.setProperty("classpath.resource.loader.cache", !devMode);
engine.setProperty("classpath.resource.loader.modificationCheckInterval", 5L);
+ engine.setProperty("jar.resource.loader.class", JarResourceLoader.class.getName());
+ engine.setProperty("jar.resource.loader.cache", !devMode);
engine.setProperty("resource.manager.logwhenfound", false);
engine.setProperty("input.encoding", "UTF-8");
engine.setProperty("output.encoding", "UTF-8");
@@ -297,35 +308,6 @@ public class AzkabanWebServer implements AzkabanServer {
return retVal;
}
- public HadoopSecurityManager getHadoopSecurityManager() {
- return hadoopSecurityManager;
- }
-
- private HadoopSecurityManager loadHadoopSecurityManager(Props props) {
-
- Class<?> hadoopSecurityManagerClass = props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, null);
- logger.info("Loading hadoop security manager class " + hadoopSecurityManagerClass.getName());
- HadoopSecurityManager hadoopSecurityManager = null;
-
- if (hadoopSecurityManagerClass != null && hadoopSecurityManagerClass.getConstructors().length > 0) {
-
- try {
- Constructor<?> hsmConstructor = hadoopSecurityManagerClass.getConstructor(Props.class);
- hadoopSecurityManager = (HadoopSecurityManager) hsmConstructor.newInstance(props);
- }
- catch (Exception e) {
- logger.error("Could not instantiate Hadoop Security Manager "+ hadoopSecurityManagerClass.getName());
- throw new RuntimeException(e);
- }
- }
- else {
- hadoopSecurityManager = new DefaultHadoopSecurityManager();
- }
-
- return hadoopSecurityManager;
-
- }
-
public ClassLoader getClassLoader() {
return baseClassLoader;
}
@@ -426,10 +408,16 @@ public class AzkabanWebServer implements AzkabanServer {
root.addServlet(new ServletHolder(new ExecutorServlet()),"/executor");
root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
root.addServlet(new ServletHolder(new ScheduleServlet()),"/schedule");
- root.addServlet(new ServletHolder(new HdfsBrowserServlet()), "/hdfs/*");
+
+ String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
+ List<String> viewerPlugins = azkabanSettings.getStringList("viewer.plugins", (List<String>) null);
+ if (viewerPlugins != null) {
+ app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, viewerPlugins, app.getVelocityEngine()));
+ }
+ //root.addServlet(new ServletHolder(new HdfsBrowserServlet()), "/hdfs/*");
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
-
+
try {
server.start();
}
@@ -455,6 +443,115 @@ public class AzkabanWebServer implements AzkabanServer {
logger.info("Server running on port " + sslPortNumber + ".");
}
+ private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, List<String> plugins, VelocityEngine ve) {
+ ArrayList<ViewerPlugin> installedViewerPlugins = new ArrayList<ViewerPlugin>();
+
+ File viewerPluginPath = new File(pluginPath);
+ ClassLoader parentLoader = AzkabanWebServer.class.getClassLoader();
+ ArrayList<String> jarPaths = new ArrayList<String>();
+ for (String plug: plugins) {
+ File pluginDir = new File(viewerPluginPath, plug);
+ if (!pluginDir.exists()) {
+ logger.error("Error viewer plugin path " + pluginDir.getPath() + " doesn't exist.");
+ continue;
+ }
+
+ if (!pluginDir.isDirectory()) {
+ logger.error("The plugin path " + pluginDir + " is not a directory.");
+ continue;
+ }
+
+ // Load the conf directory
+ File propertiesDir = new File(pluginDir, "conf");
+ Props pluginProps = null;
+ if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+ pluginProps = PropsUtils.loadPropsInDir(propertiesDir, "properties");
+ }
+ else {
+ logger.error("Plugin conf path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ String pluginName = pluginProps.getString("viewer.name");
+ String pluginWebPath = pluginProps.getString("viewer.path");
+ String pluginClass = pluginProps.getString("viewer.servlet.class");
+ if (pluginClass == null) {
+ logger.error("Viewer class is not set.");
+ }
+ else {
+ logger.error("Plugin class " + pluginClass);
+ }
+
+ URLClassLoader urlClassLoader = null;
+ File libDir = new File(pluginDir, "lib");
+ if (libDir.exists() && libDir.isDirectory()) {
+ File[] files = libDir.listFiles();
+
+ URL[] url = new URL[files.length];
+ for (int i=0; i < files.length; ++i) {
+ try {
+ url[i] = files[i].toURI().toURL();
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+
+ urlClassLoader = new URLClassLoader(url, parentLoader);
+ }
+ else {
+ logger.error("Library path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ Class<?> viewerClass = null;
+ try {
+ viewerClass = urlClassLoader.loadClass(pluginClass);
+ }
+ catch (ClassNotFoundException e) {
+ logger.error("Class " + pluginClass + " not found.");
+ continue;
+ }
+
+ String source = FileIOUtils.getSourcePathFromClass(viewerClass);
+ logger.info("Source jar " + source);
+ jarPaths.add("jar:file:" + source);
+
+ Constructor<?> constructor = null;
+ try {
+ constructor = viewerClass.getConstructor(Props.class);
+ } catch (NoSuchMethodException e) {
+ logger.error("Constructor not found in " + pluginClass);
+ continue;
+ }
+
+ Object obj = null;
+ try {
+ obj = constructor.newInstance(pluginProps);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+
+ if (!(obj instanceof AbstractAzkabanServlet)) {
+ logger.error("The object is not an AbstractViewerServlet");
+ continue;
+ }
+
+ AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet)obj;
+ root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
+ installedViewerPlugins.add(new ViewerPlugin(pluginName, pluginWebPath));
+ }
+
+ String jarResourcePath = StringUtils.join(jarPaths, ", ");
+ logger.info("Setting jar resource path " + jarResourcePath);
+ ve.addProperty("jar.resource.loader.path", jarResourcePath);
+
+ return installedViewerPlugins;
+ }
+
+ public List<ViewerPlugin> getViewerPlugins() {
+ return viewerPlugins;
+ }
+
/**
* Loads the Azkaban property file from the AZKABAN_HOME conf directory
*
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index e1fd52c..9817b56 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -17,6 +17,7 @@
package azkaban.webapp.servlet;
import java.io.IOException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
@@ -30,6 +31,8 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.velocity.Template;
+import org.apache.velocity.app.VelocityEngine;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
@@ -64,6 +67,8 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
private String label;
private String color;
+ private List<ViewerPlugin> viewerPlugins;
+
/**
* To retrieve the application for the servlet
*
@@ -86,6 +91,11 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
name = props.getString("azkaban.name", "");
label = props.getString("azkaban.label", "");
color = props.getString("azkaban.color", "#FF0000");
+
+ if (application instanceof AzkabanWebServer) {
+ AzkabanWebServer server = (AzkabanWebServer)application;
+ viewerPlugins = server.getViewerPlugins();
+ }
}
/**
@@ -329,6 +339,14 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
page.add("success_message", successMsg == null || successMsg.isEmpty() ? "null" : successMsg);
setSuccessMessageInCookie(resp, null);
+ //@TODO, allow more than one type of viewer. For time sake, I only install the first one
+ if (viewerPlugins != null && !viewerPlugins.isEmpty()) {
+ page.add("viewers", viewerPlugins);
+ ViewerPlugin plugin = viewerPlugins.get(0);
+ page.add("viewerName", plugin.getPluginName());
+ page.add("viewerPath", plugin.getPluginPath());
+ }
+
return page;
}
@@ -348,9 +366,18 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
page.add("timezone", ZONE_FORMATTER.print(System.currentTimeMillis()));
page.add("currentTime", (new DateTime()).getMillis());
page.add("context", req.getContextPath());
+
+ //@TODO, allow more than one type of viewer. For time sake, I only install the first one
+ if (viewerPlugins != null && !viewerPlugins.isEmpty()) {
+ page.add("viewers", viewerPlugins);
+ ViewerPlugin plugin = viewerPlugins.get(0);
+ page.add("viewerName", plugin.getPluginName());
+ page.add("viewerPath", plugin.getPluginPath());
+ }
+
return page;
}
-
+
/**
* Writes json out to the stream.
*
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 8fb522f..9932e12 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -661,11 +661,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
for(String roleName: user.getRoles()) {
Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type)) {
+ if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
return true;
}
}
- return true;
+ return false;
}
}
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 5455044..c66aff3 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -22,16 +22,20 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.log4j.Logger;
import azkaban.user.User;
import azkaban.user.UserManager;
import azkaban.user.UserManagerException;
+import azkaban.utils.Props;
+import azkaban.webapp.AzkabanServer;
import azkaban.webapp.session.Session;
/**
@@ -44,7 +48,17 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
private static final Logger logger = Logger.getLogger(LoginAbstractAzkabanServlet.class.getName());
private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
-
+ private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
+
+ private MultipartParser multipartParser;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+
+ multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
+ }
+
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// Set session id
@@ -72,7 +86,7 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
}
}
- private Session getSessionFromRequest(HttpServletRequest req) {
+ private Session getSessionFromRequest(HttpServletRequest req) throws ServletException {
String remoteIp = req.getRemoteAddr();
Cookie cookie = getCookieByName(req, SESSION_ID_NAME);
String sessionId = null;
@@ -81,17 +95,25 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
sessionId = cookie.getValue();
logger.info("Session id " + sessionId);
}
+
+ if (sessionId == null && hasParam(req, "session.id")) {
+ sessionId = getParam(req, "session.id");
+ }
+ return getSessionFromSessionId(sessionId, remoteIp);
+ }
+
+ private Session getSessionFromSessionId(String sessionId, String remoteIp) {
if (sessionId == null) {
return null;
- } else {
- Session session = getApplication().getSessionCache().getSession(sessionId);
- // Check if the IP's are equal. If not, we invalidate the sesson.
- if (session == null || !remoteIp.equals(session.getIp())) {
- return null;
- }
-
- return session;
}
+
+ Session session = getApplication().getSessionCache().getSession(sessionId);
+ // Check if the IP's are equal. If not, we invalidate the sesson.
+ if (session == null || !remoteIp.equals(session.getIp())) {
+ return null;
+ }
+
+ return session;
}
private void handleLogin(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@@ -109,32 +131,62 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
- if (hasParam(req, "action")) {
- String action = getParam(req, "action");
- if (action.equals("login")) {
- HashMap<String,Object> obj = new HashMap<String,Object>();
- handleAjaxLoginAction(req, resp, obj);
- this.writeJSON(resp, obj);
- }
- else {
- Session session = getSessionFromRequest(req);
- if (session == null) {
- if (isAjaxCall(req)) {
- String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
- writeResponse(resp, response);
- }
- else {
- handleLogin(req, resp, "Enter username and password");
+ Session session = getSessionFromRequest(req);
+
+ // Handle Multipart differently from other post messages
+ if(ServletFileUpload.isMultipartContent(req)) {
+ Map<String, Object> params = multipartParser.parseMultipart(req);
+ if (session == null) {
+ // See if the session id is properly set.
+ if (params.containsKey("session.id")) {
+ String sessionId = (String)params.get("session.id");
+ String ip = req.getRemoteAddr();
+
+ session = getSessionFromSessionId(sessionId, ip);
+ if (session != null) {
+ handleMultiformPost(req, resp, params, session);
+ return;
}
- }
- else {
- handlePost(req, resp, session);
+ }
+
+ // if there's no valid session, see if it's a one time session.
+ if (!params.containsKey("username") || !params.containsKey("password")) {
+ writeResponse(resp, "Login error. Need username and password");
+ return;
+ }
+
+ String username = (String)params.get("username");
+ String password = (String)params.get("password");
+ String ip = req.getRemoteAddr();
+
+ try {
+ session = createSession(username, password, ip);
+ } catch (UserManagerException e) {
+ writeResponse(resp, "Login error: " + e.getMessage());
+ return;
}
}
- }
- else {
- Session session = getSessionFromRequest(req);
- if (session == null) {
+
+ handleMultiformPost(req, resp, params, session);
+ }
+ else if (hasParam(req, "action") && getParam(req, "action").equals("login")) {
+ HashMap<String,Object> obj = new HashMap<String,Object>();
+ handleAjaxLoginAction(req, resp, obj);
+ this.writeJSON(resp, obj);
+ }
+ else if (session == null) {
+ if (hasParam(req, "username") && hasParam(req, "password")) {
+ // If it's a post command with curl, we create a temporary session
+ try {
+ session = createSession(req);
+ } catch (UserManagerException e) {
+ writeResponse(resp, "Login error: " + e.getMessage());
+ }
+
+ handlePost(req, resp, session);
+ }
+ else {
+ // There are no valid sessions and temporary logins, no we either pass back a message or redirect.
if (isAjaxCall(req)) {
String response = createJsonResponse("error", "Invalid Session. Need to re-login", "login", null);
writeResponse(resp, response);
@@ -142,37 +194,47 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
else {
handleLogin(req, resp, "Enter username and password");
}
- }
- else {
- handlePost(req, resp, session);
}
}
+ else {
+ handlePost(req, resp, session);
+ }
}
+ private Session createSession(HttpServletRequest req) throws UserManagerException, ServletException {
+ String username = getParam(req, "username");
+ String password = getParam(req, "password");
+ String ip = req.getRemoteAddr();
+
+ return createSession(username, password, ip);
+ }
+
+ private Session createSession(String username, String password, String ip) throws UserManagerException, ServletException {
+ UserManager manager = getApplication().getUserManager();
+ User user = manager.getUser(username, password);
+
+ String randomUID = UUID.randomUUID().toString();
+ Session session = new Session(randomUID, user, ip);
+
+ return session;
+ }
+
protected void handleAjaxLoginAction(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> ret) throws ServletException {
if (hasParam(req, "username") && hasParam(req, "password")) {
- String username = getParam(req, "username");
- String password = getParam(req, "password");
-
- UserManager manager = getApplication().getUserManager();
-
- User user = null;
+ Session session = null;
try {
- user = manager.getUser(username, password);
- }
- catch (UserManagerException e) {
- ret.put("error", "Incorrect Login. " + e.getCause());
+ session = createSession(req);
+ } catch (UserManagerException e) {
+ ret.put("error", "Incorrect Login. " + e.getMessage());
return;
}
-
- String ip = req.getRemoteAddr();
- String randomUID = UUID.randomUUID().toString();
- Session session = new Session(randomUID, user, ip);
- Cookie cookie = new Cookie(SESSION_ID_NAME, randomUID);
+
+ Cookie cookie = new Cookie(SESSION_ID_NAME, session.getSessionId());
cookie.setPath("/");
resp.addCookie(cookie);
getApplication().getSessionCache().addSession(session);
ret.put("status", "success");
+ ret.put("session.id", session.getSessionId());
}
else {
ret.put("error", "Incorrect Login.");
@@ -218,4 +280,17 @@ public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet
* @throws IOException
*/
protected abstract void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException;
+
+ /**
+ * The post request is handed off to the implementor after the user is
+ * logged in.
+ *
+ * @param req
+ * @param resp
+ * @param session
+ * @throws ServletException
+ * @throws IOException
+ */
+ protected void handleMultiformPost(HttpServletRequest req, HttpServletResponse resp, Map<String,Object> multipart, Session session) throws ServletException, IOException {
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/Page.java b/src/java/azkaban/webapp/servlet/Page.java
index 7d44f8f..d8d5194 100644
--- a/src/java/azkaban/webapp/servlet/Page.java
+++ b/src/java/azkaban/webapp/servlet/Page.java
@@ -62,8 +62,7 @@ public class Page {
public void render() {
try {
response.setContentType(mimeType);
- engine.mergeTemplate(template, "UTF-8", context,
- response.getWriter());
+ engine.mergeTemplate(template, "UTF-8", context, response.getWriter());
} catch (Exception e) {
throw new PageRenderException(e);
}
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index dcefce6..0bbe2ba 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -73,15 +73,12 @@ import azkaban.webapp.servlet.MultipartParser;
public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1;
private static final Logger logger = Logger.getLogger(ProjectManagerServlet.class);
- private static final int DEFAULT_UPLOAD_DISK_SPOOL_SIZE = 20 * 1024 * 1024;
private static final NodeLevelComparator NODE_LEVEL_COMPARATOR = new NodeLevelComparator();
private ProjectManager projectManager;
private ExecutorManager executorManager;
private ScheduleManager scheduleManager;
private UserManager userManager;
-
- private MultipartParser multipartParser;
private static Comparator<Flow> FLOW_ID_COMPARATOR = new Comparator<Flow>() {
@Override
@@ -99,8 +96,6 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
executorManager = server.getExecutorManager();
scheduleManager = server.getScheduleManager();
userManager = server.getUserManager();
-
- multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
}
@Override
@@ -145,18 +140,18 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
}
@Override
- protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
- if (ServletFileUpload.isMultipartContent(req)) {
- logger.info("Post is multipart");
- Map<String, Object> params = multipartParser.parseMultipart(req);
- if (params.containsKey("action")) {
- String action = (String)params.get("action");
- if (action.equals("upload")) {
- handleUpload(req, resp, params, session);
- }
+ protected void handleMultiformPost(HttpServletRequest req, HttpServletResponse resp, Map<String, Object> params, Session session) throws ServletException, IOException {
+ if (params.containsKey("action")) {
+ String action = (String)params.get("action");
+ if (action.equals("upload")) {
+ handleUpload(req, resp, params, session);
}
}
- else if (hasParam(req, "action")) {
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ if (hasParam(req, "action")) {
String action = getParam(req, "action");
if (action.equals("create")) {
handleCreate(req, resp, session);
@@ -1161,12 +1156,12 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
for(String roleName: user.getRoles()) {
Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type)) {
+ if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
return true;
}
}
- return true;
+ return false;
}
private Permission getPermissionObject(Project project, User user, Permission.Type type) {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 85731a8..480ea9a 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -207,7 +207,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
//project.info("User '" + user.getUserId() + "' has scheduled " + flow.getId() + "[" + schedFlow.toNiceString() + "].");
Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId());
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId +")" + "].");
- projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
+ projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
ret.put("status", "success");
ret.put("message", projectName + "." + flowName + " scheduled.");
@@ -239,11 +239,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
for(String roleName: user.getRoles()) {
Role role = userManager.getRole(roleName);
- if (role.getPermission().isPermissionSet(type)) {
+ if (role.getPermission().isPermissionSet(type) || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
return true;
}
}
- return true;
+ return false;
}
}
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 5d98006..6988e36 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -19,6 +19,8 @@
function navMenuClick(url) {
window.location.href=url;
}
+
+ function
</script>
<ul id="nav" class="nav">
@@ -26,14 +28,18 @@
<li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
<li id="executing-jobs-tab" #if($current_page == 'executing')class="selected"#end onClick="navMenuClick('$!context/executor')"><a href="$!context/executor">Executing</a></li>
<li id="history-jobs-tab" #if($current_page == 'history')class="selected"#end onClick="navMenuClick('$!context/history')"><a href="$!context/history">History</a></li>
- <li id="hdfs-browser-tab" #if($current_page == 'hdfsbrowser')class="selected"#end onClick="navMenuClick('$!context/hdfs')"><a href="$!context/hdfs">HDFS</a></li>
+
+ #if ($viewers)
+ <li id="viewer-tab" #if($current_page == 'viewer') class="selected"#end onClick="navMenuClick('$!context/$viewerPath')">
+ <a href="$!context/$viewerPath">$viewerName</a>
+ </li>
+ #end
</ul>
-
<div id="user-id">
<a>${user_id}<div id="user-down"></div></a>
<div id="user-menu">
<ul><li><a id="logout" href="$!context?logout">logout</a></li></ul>
</div>
</div>
- </div>
+ </div>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ViewerPlugin.java b/src/java/azkaban/webapp/servlet/ViewerPlugin.java
new file mode 100644
index 0000000..725473f
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/ViewerPlugin.java
@@ -0,0 +1,19 @@
+package azkaban.webapp.servlet;
+
+public class ViewerPlugin {
+ private final String pluginName;
+ private final String pluginPath;
+
+ public ViewerPlugin(String pluginName, String pluginPath) {
+ this.pluginName = pluginName;
+ this.pluginPath = pluginPath;
+ }
+
+ public String getPluginName() {
+ return pluginName;
+ }
+
+ public String getPluginPath() {
+ return pluginPath;
+ }
+}
src/java/azkaban/webapp/session/Session.java 12(+12 -0)
diff --git a/src/java/azkaban/webapp/session/Session.java b/src/java/azkaban/webapp/session/Session.java
index d12b1f4..38f4aa0 100644
--- a/src/java/azkaban/webapp/session/Session.java
+++ b/src/java/azkaban/webapp/session/Session.java
@@ -15,6 +15,9 @@
*/
package azkaban.webapp.session;
+import java.util.HashMap;
+import java.util.Map;
+
import azkaban.user.User;
/**
@@ -24,6 +27,7 @@ public class Session {
private final User user;
private final String sessionId;
private final String ip;
+ private Map<String, Object> sessionData = new HashMap<String, Object>();
/**
* Constructor for the session
@@ -58,4 +62,12 @@ public class Session {
public String getIp() {
return ip;
}
+
+ public void setSessionData(String key, Object value) {
+ sessionData.put(key, value);
+ }
+
+ public Object getSessionData(String key) {
+ return sessionData.get(key);
+ }
}
unit/executions/logtest/largeLog1.log.short 68(+68 -0)
diff --git a/unit/executions/logtest/largeLog1.log.short b/unit/executions/logtest/largeLog1.log.short
new file mode 100644
index 0000000..83d535f
--- /dev/null
+++ b/unit/executions/logtest/largeLog1.log.short
@@ -0,0 +1,68 @@
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an
+...
+Log file is too big. Skipping 101991 in the middle.
+...
+ore.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
+I'm Henery the Eighth, I am,
+Henery the Eighth I am, I am!
+I got married to the widow next door,
+She's been married seven times before.
+And every one was an Henery
+It wouldn't be a Willie or a Sam
+I'm her eighth old man I'm Henery
+Henery the Eighth, I am!
+Second verse, same as the first!
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 57c6777..e4496bb 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -21,18 +21,15 @@ import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorLoader;
import azkaban.flow.Flow;
-import azkaban.jobtype.JobtypeManager;
-import azkaban.security.HadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager_H_1_0_2;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.test.executor.JavaJob;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
public class FlowRunnerTest {
private File workingDir;
private Logger logger = Logger.getLogger(FlowRunnerTest.class);
- private JobtypeManager jobtypeManager;
- private HadoopSecurityManager hadoopSecurityManager;
-
+ private JobTypeManager jobtypeManager;
public FlowRunnerTest() {
}
@@ -45,8 +42,8 @@ public class FlowRunnerTest {
FileUtils.deleteDirectory(workingDir);
}
workingDir.mkdirs();
- jobtypeManager = new JobtypeManager(null, this.getClass().getClassLoader());
- hadoopSecurityManager = new HadoopSecurityManager_H_1_0_2(new Props());
+ jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager.registerJobType("java", JavaJob.class);
}
@After
@@ -355,7 +352,7 @@ public class FlowRunnerTest {
MockProjectLoader projectLoader = new MockProjectLoader(new File(flow.getExecutionPath()));
loader.uploadExecutableFlow(flow);
- FlowRunner runner = new FlowRunner(flow, loader, jobtypeManager, hadoopSecurityManager);
+ FlowRunner runner = new FlowRunner(flow, loader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
@@ -367,7 +364,7 @@ public class FlowRunnerTest {
MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
loader.uploadExecutableFlow(exFlow);
- FlowRunner runner = new FlowRunner(exFlow, loader, jobtypeManager, hadoopSecurityManager);
+ FlowRunner runner = new FlowRunner(exFlow, loader, jobtypeManager);
runner.addListener(eventCollector);
return runner;
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 9bbcfd3..4009f87 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -2,6 +2,7 @@ package azkaban.test.execapp;
import java.io.File;
import java.io.IOException;
+import java.util.StringTokenizer;
import junit.framework.Assert;
@@ -17,17 +18,15 @@ import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.ExecutorLoader;
-import azkaban.jobExecutor.JavaJob;
import azkaban.jobExecutor.ProcessJob;
-import azkaban.jobtype.JobtypeManager;
-import azkaban.security.HadoopSecurityManager;
-import azkaban.security.HadoopSecurityManager_H_1_0_2;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.test.executor.JavaJob;
+import azkaban.test.executor.SleepJavaJob;
import azkaban.utils.Props;
public class JobRunnerTest {
private File workingDir;
- private JobtypeManager jobtypeManager;
- private HadoopSecurityManager hadoopSecurityManager;
+ private JobTypeManager jobtypeManager;
public JobRunnerTest() {
@@ -41,8 +40,8 @@ public class JobRunnerTest {
FileUtils.deleteDirectory(workingDir);
}
workingDir.mkdirs();
- jobtypeManager = new JobtypeManager(null, this.getClass().getClassLoader());
- hadoopSecurityManager = new HadoopSecurityManager_H_1_0_2(new Props());
+ jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
+ jobtypeManager.registerJobType("java", JavaJob.class);
}
@After
@@ -244,7 +243,8 @@ public class JobRunnerTest {
private Props createProps( int sleepSec, boolean fail) {
Props props = new Props();
props.put("type", "java");
- props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
+
+ props.put(JavaJob.JOB_CLASS, SleepJavaJob.class.getName());
props.put("seconds", sleepSec);
props.put(ProcessJob.WORKING_DIR, workingDir.getPath());
props.put("fail", String.valueOf(fail));
@@ -260,8 +260,26 @@ public class JobRunnerTest {
node.setExecutableFlow(flow);
Props props = createProps(time, fail);
- JobRunner runner = new JobRunner(node, props, workingDir, loader, jobtypeManager, hadoopSecurityManager);
+
+ JobRunner runner = new JobRunner(node, props, props, workingDir, loader, jobtypeManager);
runner.addListener(listener);
return runner;
}
+
+ private static String getSourcePathFromClass(Class containedClass) {
+ File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+ if (!file.isDirectory() && file.getName().endsWith(".class")) {
+ String name = containedClass.getName();
+ StringTokenizer tokenizer = new StringTokenizer(name, ".");
+ while(tokenizer.hasMoreTokens()) {
+ tokenizer.nextElement();
+ file = file.getParentFile();
+ }
+ return file.getPath();
+ }
+ else {
+ return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+ }
+ }
}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
index 4579331..83e55b2 100644
--- a/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
+++ b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
@@ -5,7 +5,7 @@ import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class)
-@SuiteClasses({ JavaJobTest.class, ProcessJobTest.class, PythonJobTest.class })
+@SuiteClasses({ JavaProcessJobTest.class, ProcessJobTest.class, PythonJobTest.class })
public class AllJobExecutorTests {
}
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
index fabefe3..fde3e53 100644
--- a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -34,7 +34,7 @@ public class ProcessJobTest
//
// EasyMock.replay(props);
- job = new ProcessJob("TestProcess", props, log);
+ job = new ProcessJob("TestProcess", props, props, log);
}
diff --git a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
index 0881eff..d42cd22 100644
--- a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
@@ -90,7 +90,7 @@ public class PythonJobTest
// EasyMock.expect(descriptor.getProps()).andReturn(props).times(3);
// EasyMock.expect(descriptor.getFullPath()).andReturn(".").times(1);
// EasyMock.replay(descriptor);
- job = new PythonJob("TestProcess", props, log);
+ job = new PythonJob("TestProcess", props, props, log);
// EasyMock.verify(descriptor);
try
{