azkaban-aplcache
Changes
conf/azkaban.properties 7(+6 -1)
src/java/azkaban/executor/JobRunner.java 32(+31 -1)
src/java/azkaban/jobExecutor/AbstractJob.java 108(+108 -0)
src/java/azkaban/jobExecutor/AbstractProcessJob.java 201(+201 -0)
src/java/azkaban/jobExecutor/JavaJob.java 103(+103 -0)
src/java/azkaban/jobExecutor/JavaJobRunnerMain.java 279(+279 -0)
src/java/azkaban/jobExecutor/JavaProcessJob.java 148(+148 -0)
src/java/azkaban/jobExecutor/Job.java 75(+75 -0)
src/java/azkaban/jobExecutor/LongArgJob.java 129(+129 -0)
src/java/azkaban/jobExecutor/NoopJob.java 64(+64 -0)
src/java/azkaban/jobExecutor/PigProcessJob.java 179(+179 -0)
src/java/azkaban/jobExecutor/ProcessJob.java 347(+347 -0)
src/java/azkaban/jobExecutor/PythonJob.java 43(+43 -0)
src/java/azkaban/jobExecutor/RubyJob.java 40(+40 -0)
src/java/azkaban/jobExecutor/ScriptJob.java 46(+46 -0)
src/java/azkaban/jobExecutor/utils/PropsUtils.java 174(+174 -0)
src/java/azkaban/scheduler/ScheduledFlow.java 263(+263 -0)
src/java/azkaban/scheduler/ScheduleManager.java 410(+410 -0)
src/java/azkaban/utils/CircularBuffer.java 75(+75 -0)
src/java/azkaban/utils/JSONUtils.java 169(+168 -1)
src/java/azkaban/utils/Utils.java 57(+57 -0)
src/java/azkaban/webapp/servlet/ScheduleServlet.java 172(+172 -0)
src/web/js/azkaban.flow.view.js 72(+72 -0)
unit/java/azkaban/scheduler/MockLoader.java 36(+36 -0)
unit/java/azkaban/test/jobExecutor/JavaJobTest.java 132(+132 -0)
Details
conf/azkaban.properties 7(+6 -1)
diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index ee78616..40d3d0b 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -19,11 +19,16 @@ project.global.properties=conf/global.properties
execution.directory=execution
execution.use.symlink=true
+#Schedule files
+schedule.directory=schedule
+schedule.path=schedule
+schedule.backup=backup
+
# Velocity dev mode
velocity.dev.mode=true
# Azkaban Jetty server properties. Ignored in tomcat
-jetty.maxThreads=10
+jetty.maxThreads=25
jetty.ssl.port=8443
jetty.port=8081
jetty.keystore=keystore
src/java/azkaban/executor/JobRunner.java 32(+31 -1)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index ca0ddb6..ff5f1e4 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -15,6 +15,9 @@ import azkaban.executor.ExecutableFlow.Status;
import azkaban.executor.event.Event;
import azkaban.executor.event.Event.Type;
import azkaban.executor.event.EventHandler;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.Job;
+import azkaban.jobExecutor.utils.JobWrappingFactory;
import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
@@ -30,6 +33,8 @@ public class JobRunner extends EventHandler implements Runnable {
private Layout loggerLayout = DEFAULT_LAYOUT;
private Appender jobAppender;
+ private Job job;
+
private static final Object logCreatorLock = new Object();
public JobRunner(ExecutableNode node, Props props, File workingDir) {
@@ -85,7 +90,6 @@ public class JobRunner extends EventHandler implements Runnable {
node.setStatus(Status.RUNNING);
this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
- //Just for testing 5 sec each round.
synchronized(this) {
try {
wait(5000);
@@ -94,12 +98,26 @@ public class JobRunner extends EventHandler implements Runnable {
logger.info("Job cancelled.");
}
}
+
// Run Job
boolean succeeded = true;
+ props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+ JobWrappingFactory factory = JobWrappingFactory.getJobWrappingFactory();
+ job = factory.buildJobExecutor(props, logger);
+
+ try {
+ job.run();
+ } catch (Exception e) {
+ succeeded = false;
+ //logger.error("job run failed!");
+ e.printStackTrace();
+ }
+
node.setEndTime(System.currentTimeMillis());
if (succeeded) {
node.setStatus(Status.SUCCEEDED);
+ outputProps = job.getJobGeneratedProperties();
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
} else {
node.setStatus(Status.FAILED);
@@ -111,6 +129,18 @@ public class JobRunner extends EventHandler implements Runnable {
public synchronized void cancel() {
// Cancel code here
+ if(job == null) {
+ logger.error("Job doesn't exist!");
+ return;
+ }
+
+ try {
+ job.cancel();
+ } catch (Exception e) {
+ logger.error("Failed trying to cancel job!");
+ e.printStackTrace();
+ }
+
// will just interrupt, I guess, until the code is finished.
this.notifyAll();
src/java/azkaban/jobExecutor/AbstractJob.java 108(+108 -0)
diff --git a/src/java/azkaban/jobExecutor/AbstractJob.java b/src/java/azkaban/jobExecutor/AbstractJob.java
new file mode 100644
index 0000000..6292dee
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/AbstractJob.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+public abstract class AbstractJob implements Job {
+
+ public static final String JOB_TYPE = "type";
+ public static final String JOB_CLASS = "job.class";
+ public static final String JOB_PATH = "job.path";
+ public static final String JOB_FULLPATH = "job.fullpath";
+ public static final String JOB_ID = "job.id";
+
+
+ private final String _id;
+ private final Logger _log;
+ private volatile double _progress;
+
+// protected AbstractJob(String id) {
+// this(id, Logger.getLogger(id));
+// }
+
+ protected AbstractJob(String id, Logger log) {
+ _id = id;
+ _log = log;
+ _progress = 0.0;
+ }
+
+ public String getId() {
+ return _id;
+ }
+
+ public double getProgress() throws Exception {
+ return _progress;
+ }
+
+ public void setProgress(double progress) {
+ this._progress = progress;
+ }
+
+ public void cancel() throws Exception {
+ throw new RuntimeException("Job " + _id + " does not support cancellation!");
+ }
+
+ public Logger getLog() {
+ return this._log;
+ }
+
+ public void debug(String message) {
+ this._log.debug(message);
+ }
+
+ public void debug(String message, Throwable t) {
+ this._log.debug(message, t);
+ }
+
+ public void info(String message) {
+ this._log.info(message);
+ }
+
+ public void info(String message, Throwable t) {
+ this._log.info(message, t);
+ }
+
+ public void warn(String message) {
+ this._log.warn(message);
+ }
+
+ public void warn(String message, Throwable t) {
+ this._log.warn(message, t);
+ }
+
+ public void error(String message) {
+ this._log.error(message);
+ }
+
+ public void error(String message, Throwable t) {
+ this._log.error(message, t);
+ }
+
+ public Props getJobGeneratedProperties() {
+ return new Props();
+ }
+
+ public abstract void run() throws Exception;
+
+ public boolean isCanceled() {
+ return false;
+ }
+
+}
src/java/azkaban/jobExecutor/AbstractProcessJob.java 201(+201 -0)
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
new file mode 100644
index 0000000..dd860a1
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.commons.fileupload.util.Streams;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.utils.JSONToJava;
+import azkaban.jobExecutor.utils.PropsUtils;
+
+/*
+ * A revised process-based job
+ *
+ * @author jkreps
+ *
+ */
+public abstract class AbstractProcessJob extends AbstractJob {
+
+// private static final Logger log = Logger
+// .getLogger(AbstractProcessJob.class);
+
+ private final Logger log;
+ public static final String ENV_PREFIX = "env.";
+ public static final String ENV_PREFIX_UCASE = "ENV.";
+ public static final String WORKING_DIR = "working.dir";
+ public static final String JOB_PROP_ENV = "JOB_PROP_FILE";
+ public static final String JOB_NAME_ENV = "JOB_NAME";
+ public static final String JOB_OUTPUT_PROP_FILE = "JOB_OUTPUT_PROP_FILE";
+
+ private static final JSONToJava jsonToJava = new JSONToJava();
+
+ protected final String _jobPath;
+
+// protected final Props props;
+ protected volatile Props _props;
+
+ protected String _cwd;
+
+ private volatile Props generatedPropeties;
+
+ protected AbstractProcessJob(final Props props, final Logger log) {
+ super(props.getString(JOB_ID, "unkownjob"), log);
+
+ _props = props;
+ _jobPath = props.getString(JOB_FULLPATH, new File(".").getAbsolutePath());
+
+ _cwd = getWorkingDirectory();
+ this.log = log;
+ }
+
+ public Props getProps() {
+ return _props;
+ }
+
+ public String getJobPath() {
+ return _jobPath;
+ }
+
+ protected void resolveProps() {
+ _props = PropsUtils.resolveProps(_props);
+ }
+
+ @Override
+ public Props getJobGeneratedProperties() {
+ return generatedPropeties;
+ }
+
+ /**
+ * initialize temporary and final property file
+ *
+ * @return {tmpPropFile, outputPropFile}
+ */
+ public File[] initPropsFiles() {
+ // Create properties file with additionally all input generated
+ // properties.
+ File[] files = new File[2];
+ files[0] = createFlattenedPropsFile(_cwd);
+
+ _props.put(ENV_PREFIX + JOB_PROP_ENV, files[0].getAbsolutePath());
+ _props.put(ENV_PREFIX + JOB_NAME_ENV, getId());
+
+ files[1] = createOutputPropsFile(getId(), _cwd);
+ _props.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
+ files[1].getAbsolutePath());
+
+ return files;
+ }
+
+ public String getCwd() {
+ return _cwd;
+ }
+
+ public Map<String, String> getEnvironmentVariables() {
+ Props props = getProps();
+ Map<String, String> envMap = props.getMapByPrefix(ENV_PREFIX);
+ envMap.putAll(props.getMapByPrefix(ENV_PREFIX_UCASE));
+ return envMap;
+ }
+
+ public String getWorkingDirectory() {
+ return getProps()//.getString(WORKING_DIR, ".");
+ .getString(WORKING_DIR, new File(_jobPath).getParent());
+ }
+
+ public Props loadOutputFileProps(final File outputPropertiesFile) {
+ InputStream reader = null;
+ try {
+ System.err.println("output properties file="
+ + outputPropertiesFile.getAbsolutePath());
+ reader = new BufferedInputStream(new FileInputStream(
+ outputPropertiesFile));
+
+ Props outputProps = new Props();
+ final String content = Streams.asString(reader).trim();
+
+ if (!content.isEmpty()) {
+ Map<String, Object> propMap = jsonToJava.apply(new JSONObject(
+ content));
+
+ for (Map.Entry<String, Object> entry : propMap.entrySet()) {
+ outputProps
+ .put(entry.getKey(), entry.getValue().toString());
+ }
+ }
+ return outputProps;
+ } catch (FileNotFoundException e) {
+ log.info(String.format(
+ "File[%s] wasn't found, returning empty props.",
+ outputPropertiesFile));
+ return new Props();
+ } catch (Exception e) {
+ log.error(
+ "Exception thrown when trying to load output file props. Returning empty Props instead of failing. Is this really the best thing to do?",
+ e);
+ return new Props();
+ } finally {
+ IOUtils.closeQuietly(reader);
+ }
+ }
+
+ public File createFlattenedPropsFile(final String workingDir) {
+ File directory = new File(workingDir);
+ File tempFile = null;
+ try {
+ tempFile = File.createTempFile(getId() + "_", "_tmp", directory);
+ _props.storeFlattened(tempFile);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create temp property file ",
+ e);
+ }
+
+ return tempFile;
+ }
+
+ public static File createOutputPropsFile(final String id,
+ final String workingDir) {
+ System.err.println("cwd=" + workingDir);
+
+ File directory = new File(workingDir);
+ File tempFile = null;
+ try {
+ tempFile = File.createTempFile(id + "_output_", "_tmp", directory);
+ } catch (IOException e) {
+ System.err
+ .println("Failed to create temp output property file :\n");
+ e.printStackTrace(System.err);
+ throw new RuntimeException(
+ "Failed to create temp output property file ", e);
+ }
+ return tempFile;
+ }
+
+ public void generateProperties(final File outputFile) {
+ generatedPropeties = loadOutputFileProps(outputFile);
+ }
+
+}
src/java/azkaban/jobExecutor/JavaJob.java 103(+103 -0)
diff --git a/src/java/azkaban/jobExecutor/JavaJob.java b/src/java/azkaban/jobExecutor/JavaJob.java
new file mode 100644
index 0000000..c951de1
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/JavaJob.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import java.io.File;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+public class JavaJob extends JavaProcessJob {
+
+ public static final String RUN_METHOD_PARAM = "method.run";
+ public static final String CANCEL_METHOD_PARAM = "method.cancel";
+ public static final String PROGRESS_METHOD_PARAM = "method.progress";
+
+ public static final String JOB_CLASS = "job.class";
+ public static final String DEFAULT_CANCEL_METHOD = "cancel";
+ public static final String DEFAULT_RUN_METHOD = "run";
+ public static final String DEFAULT_PROGRESS_METHOD = "getProgress";
+
+ private String _runMethod;
+ private String _cancelMethod;
+ private String _progressMethod;
+
+ private Object _javaObject = null;
+ private String props;
+
+ public JavaJob(Props props, Logger log) {
+ super(props, log);
+ }
+
+ @Override
+ protected List<String> getClassPaths() {
+ List<String> classPath = super.getClassPaths();
+
+ classPath.add(getSourcePathFromClass(JavaJobRunnerMain.class));
+ String loggerPath = getSourcePathFromClass(org.apache.log4j.Logger.class);
+ if (!classPath.contains(loggerPath)) {
+ classPath.add(loggerPath);
+ }
+
+ // Add hadoop home to classpath
+ String hadoopHome = System.getenv("HADOOP_HOME");
+ if (hadoopHome == null) {
+ info("HADOOP_HOME not set, using default hadoop config.");
+ } else {
+ info("Using hadoop config found in " + hadoopHome);
+ classPath.add(new File(hadoopHome, "conf").getPath());
+ }
+
+ return classPath;
+ }
+
+ private static String getSourcePathFromClass(Class containedClass) {
+ File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+ if (!file.isDirectory() && file.getName().endsWith(".class")) {
+ String name = containedClass.getName();
+ StringTokenizer tokenizer = new StringTokenizer(name, ".");
+ while(tokenizer.hasMoreTokens()) {
+ tokenizer.nextElement();
+
+ file = file.getParentFile();
+ }
+
+ return file.getPath();
+ }
+ else {
+ return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+ }
+ }
+
+ @Override
+ protected String getJavaClass() {
+ return JavaJobRunnerMain.class.getName();
+ }
+
+ @Override
+ public String toString() {
+ return "JavaJob{" + "_runMethod='" + _runMethod + '\''
+ + ", _cancelMethod='" + _cancelMethod + '\''
+ + ", _progressMethod='" + _progressMethod + '\''
+ + ", _javaObject=" + _javaObject + ", props="
+ + props + '}';
+ }
+}
src/java/azkaban/jobExecutor/JavaJobRunnerMain.java 279(+279 -0)
diff --git a/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
new file mode 100644
index 0000000..eaff543
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.utils.SecurityUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class JavaJobRunnerMain {
+
+ public static final String JOB_CLASS = "job.class";
+ public static final String DEFAULT_RUN_METHOD = "run";
+ public static final String DEFAULT_CANCEL_METHOD = "cancel";
+
+ // This is the Job interface method to get the properties generated by the job.
+ public static final String GET_GENERATED_PROPERTIES_METHOD = "getJobGeneratedProperties";
+
+ public static final String CANCEL_METHOD_PARAM = "method.cancel";
+ public static final String RUN_METHOD_PARAM = "method.run";
+ public static final String PROPS_CLASS = "azkaban.utils.Props";
+
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%p %m\n");
+
+ public final Logger _logger;
+
+ public String _cancelMethod;
+ public String _jobName;
+ public Object _javaObject;
+ private boolean _isFinished = false;
+
+ public static void main(String[] args) throws Exception {
+ @SuppressWarnings("unused")
+ JavaJobRunnerMain wrapper = new JavaJobRunnerMain();
+ }
+
+ public JavaJobRunnerMain() throws Exception {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ cancelJob();
+ }
+ }
+ );
+
+ try {
+ _jobName = System.getenv(ProcessJob.JOB_NAME_ENV);
+ String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
+
+ _logger = Logger.getRootLogger();
+ _logger.removeAllAppenders();
+ ConsoleAppender appender = new ConsoleAppender(DEFAULT_LAYOUT);
+ appender.activateOptions();
+ _logger.addAppender(appender);
+
+ Properties prop = new Properties();
+ prop.load(new BufferedReader(new FileReader(propsFile)));
+
+ _logger.info("Running job " + _jobName);
+ String className = prop.getProperty(JOB_CLASS);
+ if(className == null) {
+ throw new Exception("Class name is not set.");
+ }
+ _logger.info("Class name " + className);
+
+ // Create the object using proxy
+ if (SecurityUtils.shouldProxy(prop)) {
+ _javaObject = getObjectAsProxyUser(prop, _logger, _jobName, className);
+ }
+ else {
+ _javaObject = getObject(_jobName, className, prop);
+ }
+ if(_javaObject == null) {
+ _logger.info("Could not create java object to run job: " + className);
+ throw new Exception("Could not create running object");
+ }
+
+ _cancelMethod = prop.getProperty(CANCEL_METHOD_PARAM, DEFAULT_CANCEL_METHOD);
+
+ final String runMethod = prop.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
+ _logger.info("Invoking method " + runMethod);
+
+ if(SecurityUtils.shouldProxy(prop)) {
+ _logger.info("Proxying enabled.");
+ runMethodAsProxyUser(prop, _javaObject, runMethod);
+ } else {
+ _logger.info("Proxy check failed, not proxying run.");
+ runMethod(_javaObject, runMethod);
+ }
+ _isFinished = true;
+
+ // Get the generated properties and store them to disk, to be read by ProcessJob.
+ try {
+ final Method generatedPropertiesMethod = _javaObject.getClass().getMethod(GET_GENERATED_PROPERTIES_METHOD, new Class<?>[]{});
+ Props outputGendProps = (Props) generatedPropertiesMethod.invoke(_javaObject, new Object[] {});
+ outputGeneratedProperties(outputGendProps);
+ } catch (NoSuchMethodException e) {
+ _logger.info(
+ String.format(
+ "Apparently there isn't a method[%s] on object[%s], using empty Props object instead.",
+ GET_GENERATED_PROPERTIES_METHOD,
+ _javaObject
+ )
+ );
+ outputGeneratedProperties(new Props());
+ }
+ } catch (Exception e) {
+ _isFinished = true;
+ throw e;
+ }
+ }
+
+ private void runMethodAsProxyUser(Properties prop, final Object obj, final String runMethod) throws IOException, InterruptedException {
+ SecurityUtils.getProxiedUser(prop, _logger, new Configuration()).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ runMethod(obj, runMethod);
+ return null;
+ }
+ });
+ }
+
+
+
+ private void runMethod(Object obj, String runMethod) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
+ }
+
+ private void outputGeneratedProperties(Props outputProperties)
+ {
+ _logger.info("Outputting generated properties to " + ProcessJob.JOB_OUTPUT_PROP_FILE);
+ if (outputProperties == null) {
+ _logger.info(" no gend props");
+ return;
+ }
+ for (String key : outputProperties.getKeySet()) {
+ _logger.info(" gend prop " + key + " value:" + outputProperties.get(key));
+ }
+ String outputFileStr = System.getenv(ProcessJob.JOB_OUTPUT_PROP_FILE);
+ if (outputFileStr == null) {
+ return;
+ }
+
+ Map<String, String> properties = new LinkedHashMap<String, String>();
+ for (String key : outputProperties.getKeySet()) {
+ properties.put(key, outputProperties.get(key));
+ }
+
+ OutputStream writer = null;
+ try {
+ writer = new BufferedOutputStream(new FileOutputStream(outputFileStr));
+ // Manually serialize into JSON instead of adding org.json to external classpath
+ writer.write("{\n".getBytes());
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ writer.write(String.format(
+ " '%s':'%s',\n",
+ entry.getKey().replace("'", "\\'"),
+ entry.getValue().replace("'", "\\'")
+ ).getBytes());
+ }
+ writer.write("}".getBytes());
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unable to store output properties to: " + outputFileStr, e);
+ }
+ finally {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ public void cancelJob() {
+ if (_isFinished) {
+ return;
+ }
+ _logger.info("Attempting to call cancel on this job");
+ if (_javaObject != null) {
+ Method method = null;
+
+ try {
+ method = _javaObject.getClass().getMethod(_cancelMethod);
+ } catch(SecurityException e) {
+ } catch(NoSuchMethodException e) {
+ }
+
+ if (method != null)
+ try {
+ method.invoke(_javaObject);
+ } catch(Exception e) {
+ if (_logger != null) {
+ _logger.error("Cancel method failed! ", e);
+ }
+ }
+ else {
+ throw new RuntimeException("Job " + _jobName + " does not have cancel method " + _cancelMethod);
+ }
+ }
+ }
+
+ private static Object getObjectAsProxyUser(final Properties prop, final Logger logger, final String jobName, final String className) throws Exception{
+ Object obj = SecurityUtils.getProxiedUser(prop, logger, new Configuration()).doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ return getObject(jobName, className, prop);
+ }
+ });
+
+ return obj;
+ }
+
+ private static Object getObject(String jobName, String className, Properties properties)
+ throws Exception {
+ Class<?> runningClass = JavaJobRunnerMain.class.getClassLoader().loadClass(className);
+
+ if(runningClass == null) {
+ throw new Exception("Class " + className + " was not found. Cannot run job.");
+ }
+
+ Class<?> propsClass = JavaJobRunnerMain.class.getClassLoader().loadClass(PROPS_CLASS);
+
+ Object obj = null;
+ if(propsClass != null && getConstructor(runningClass, String.class, propsClass) != null) {
+ // This case covers the use of azkaban.common.utils.Props with the (String, Props) constructor.
+ Constructor<?> con = getConstructor(propsClass, propsClass, Properties[].class);
+ Object props = con.newInstance(null, new Properties[] { properties });
+ obj = getConstructor(runningClass, String.class, propsClass).newInstance(jobName, props);
+ } else if(getConstructor(runningClass, String.class, Properties.class) != null) {
+ obj = getConstructor(runningClass, String.class, Properties.class).newInstance(jobName,
+ properties);
+ } else if(getConstructor(runningClass, String.class) != null) {
+ obj = getConstructor(runningClass, String.class).newInstance(jobName);
+ } else if(getConstructor(runningClass) != null) {
+ obj = getConstructor(runningClass).newInstance();
+ }
+ return obj;
+ }
+
+ private static Constructor<?> getConstructor(Class<?> c, Class<?>... args) {
+ try {
+ Constructor<?> cons = c.getConstructor(args);
+ return cons;
+ } catch(NoSuchMethodException e) {
+ return null;
+ }
+ }
+
+}
src/java/azkaban/jobExecutor/JavaProcessJob.java 148(+148 -0)
diff --git a/src/java/azkaban/jobExecutor/JavaProcessJob.java b/src/java/azkaban/jobExecutor/JavaProcessJob.java
new file mode 100644
index 0000000..bd41935
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+public class JavaProcessJob extends ProcessJob {
+// private static final Logger log = Logger
+// .getLogger(JavaProcessJob.class);
+// private Logger log;
+
+ public static final String CLASSPATH = "classpath";
+ public static final String GLOBAL_CLASSPATH = "global.classpaths";
+ public static final String JAVA_CLASS = "java.class";
+ public static final String INITIAL_MEMORY_SIZE = "Xms";
+ public static final String MAX_MEMORY_SIZE = "Xmx";
+ public static final String MAIN_ARGS = "main.args";
+ public static final String JVM_PARAMS = "jvm.args";
+ public static final String GLOBAL_JVM_PARAMS = "global.jvm.args";
+
+ public static final String DEFAULT_INITIAL_MEMORY_SIZE = "64M";
+ public static final String DEFAULT_MAX_MEMORY_SIZE = "256M";
+
+ public static String JAVA_COMMAND = "java";
+
+ public JavaProcessJob(Props prop, Logger logger) {
+ super(prop, logger);
+ }
+
+ @Override
+ protected List<String> getCommandList() {
+ ArrayList<String> list = new ArrayList<String>();
+ list.add(createCommandLine());
+ return list;
+ }
+
+ protected String createCommandLine() {
+ String command = JAVA_COMMAND + " ";
+ command += getJVMArguments() + " ";
+ command += "-Xms" + getInitialMemorySize() + " ";
+ command += "-Xmx" + getMaxMemorySize() + " ";
+ command += "-cp " + createArguments(getClassPaths(), ":") + " ";
+ command += getJavaClass() + " ";
+ command += getMainArguments();
+
+ return command;
+ }
+
+ protected String getJavaClass() {
+ return getProps().getString(JAVA_CLASS);
+ }
+
+ protected String getClassPathParam() {
+ List<String> classPath = getClassPaths();
+ if (classPath == null || classPath.size() == 0) {
+ return "";
+ }
+
+ return "-cp " + createArguments(classPath, ":") + " ";
+ }
+
+ protected List<String> getClassPaths() {
+ List<String> classPaths = getProps().getStringList(CLASSPATH, null, ",");
+
+ ArrayList<String> classpathList = new ArrayList<String>();
+ // Adding global properties used system wide.
+ if (getProps().containsKey(GLOBAL_CLASSPATH)) {
+ List<String> globalClasspath = getProps().getStringList(GLOBAL_CLASSPATH);
+ for (String global: globalClasspath) {
+ getLog().info("Adding to global classpath:" + global);
+ classpathList.add(global);
+ }
+ }
+
+ if (classPaths == null) {
+ File path = new File(getPath());
+ File parent = path.getParentFile();
+
+ for (File file : parent.listFiles()) {
+ if (file.getName().endsWith(".jar")) {
+ //log.info("Adding to classpath:" + file.getName());
+ classpathList.add(file.getName());
+ }
+ }
+ }
+ else {
+ classpathList.addAll(classPaths);
+ }
+
+ return classpathList;
+ }
+
+ protected String getInitialMemorySize() {
+ return getProps().getString(INITIAL_MEMORY_SIZE,
+ DEFAULT_INITIAL_MEMORY_SIZE);
+ }
+
+ protected String getMaxMemorySize() {
+ return getProps().getString(MAX_MEMORY_SIZE, DEFAULT_MAX_MEMORY_SIZE);
+ }
+
+ protected String getMainArguments() {
+ return getProps().getString(MAIN_ARGS, "");
+ }
+
+ protected String getJVMArguments() {
+ String globalJVMArgs = getProps().getString(GLOBAL_JVM_PARAMS, null);
+
+ if (globalJVMArgs == null) {
+ return getProps().getString(JVM_PARAMS, "");
+ }
+
+ return globalJVMArgs + " " + getProps().getString(JVM_PARAMS, "");
+ }
+
+ protected String createArguments(List<String> arguments, String separator) {
+ if (arguments != null && arguments.size() > 0) {
+ String param = "";
+ for (String arg : arguments) {
+ param += arg + separator;
+ }
+
+ return param.substring(0, param.length() - 1);
+ }
+
+ return "";
+ }
+}
src/java/azkaban/jobExecutor/Job.java 75(+75 -0)
diff --git a/src/java/azkaban/jobExecutor/Job.java b/src/java/azkaban/jobExecutor/Job.java
new file mode 100644
index 0000000..a7c59e5
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/Job.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import azkaban.utils.Props;
+
+
+
+/**
+ * This interface defines a Raw Job interface. Each job defines
+ * <ul>
+ * <li>Job Type : {HADOOP, UNIX, JAVA, SUCCESS_TEST, CONTROLLER}</li>
+ * <li>Job ID/Name : {String}</li>
+ * <li>Arguments: Key/Value Map for Strings</li>
+ * </ul>
+ *
+ * A job is required to have a constructor Job(String jobId, Props props)
+ */
+
public interface Job {

  /**
   * Returns a unique(should be checked in xml) string name/id for the Job.
   *
   * @return the unique id of this job
   */
  public String getId();

  /**
   * Run the job. In general this method can only be run once. Must either
   * succeed or throw an exception.
   */
  public void run() throws Exception;

  /**
   * Best effort attempt to cancel the job.
   *
   * @throws Exception If cancel fails
   */
  public void cancel() throws Exception;

  /**
   * Returns a progress report between [0 - 1.0] to indicate the percentage
   * complete
   *
   * @return the fraction of the job completed so far, in the range [0, 1.0]
   * @throws Exception If getting progress fails
   */
  public double getProgress() throws Exception;

  /**
   * Get the generated properties from this job.
   * @return the properties produced by this job for downstream consumers
   */
  public Props getJobGeneratedProperties();

  /**
   * Determine if the job was cancelled.
   * @return true if the job was cancelled, false otherwise
   */
  public boolean isCanceled();
}
src/java/azkaban/jobExecutor/LongArgJob.java 129(+129 -0)
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
new file mode 100644
index 0000000..3bee6d5
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.utils.process.AzkabanProcess;
+import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
+
+
+/**
+ * A job that passes all the job properties as command line arguments in "long" format,
+ * e.g. --key1 value1 --key2 value2 ...
+ *
+ * @author jkreps
+ *
+ */
+public abstract class LongArgJob extends AbstractProcessJob {
+
+ private static final long KILL_TIME_MS = 5000;
+ private final AzkabanProcessBuilder builder;
+ private volatile AzkabanProcess process;
+
+ public LongArgJob(String[] command, Props prop, Logger log) {
+ this(command, prop, log, new HashSet<String>(0));
+ }
+
+ public LongArgJob(String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
+ //super(command, desc);
+ super(prop, log);
+ //String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
+
+ this.builder = new AzkabanProcessBuilder(command).
+ setEnv(getProps().getMapByPrefix(ENV_PREFIX)).
+ setWorkingDir(getCwd()).
+ setLogger(getLog());
+ appendProps(suppressedKeys);
+ }
+
+ public void run() throws Exception {
+
+ resolveProps();
+
+ long startMs = System.currentTimeMillis();
+ info("Command: " + builder.getCommandString());
+ if(builder.getEnv().size() > 0)
+ info("Environment variables: " + builder.getEnv());
+ info("Working directory: " + builder.getWorkingDir());
+
+ File [] propFiles = initPropsFiles( );
+ //System.err.println("outputfile=" + propFiles[1]);
+
+ boolean success = false;
+ this.process = builder.build();
+ try {
+ this.process.run();
+ success = true;
+ }
+ catch (Exception e) {
+ for (File file: propFiles) if (file != null && file.exists()) file.delete();
+ throw new RuntimeException (e);
+ }
+ finally {
+ this.process = null;
+ info("Process completed " + (success? "successfully" : "unsuccessfully") + " in " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+ }
+
+ // Get the output properties from this job.
+ generateProperties(propFiles[1]);
+
+ for (File file: propFiles)
+ if (file != null && file.exists()) file.delete();
+ }
+
+
+
+ /**
+ * This gives access to the process builder used to construct the process. An overriding class can use this to
+ * add to the command being executed.
+ */
+ protected AzkabanProcessBuilder getBuilder() {
+ return this.builder;
+ }
+
+ @Override
+ public void cancel() throws InterruptedException {
+ if(process == null)
+ throw new IllegalStateException("Not started.");
+ boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+ if(!killed) {
+ warn("Kill with signal TERM failed. Killing with KILL signal.");
+ process.hardKill();
+ }
+ }
+
+ @Override
+ public double getProgress() {
+ return process != null && process.isComplete()? 1.0 : 0.0;
+ }
+
+ private void appendProps(Set<String> suppressed) {
+ AzkabanProcessBuilder builder = this.getBuilder();
+ Props props = getProps();
+ for(String key: props.getKeySet())
+ if(!suppressed.contains(key))
+ builder.addArg("--" + key, props.get(key));
+ }
+
+
+}
src/java/azkaban/jobExecutor/NoopJob.java 64(+64 -0)
diff --git a/src/java/azkaban/jobExecutor/NoopJob.java b/src/java/azkaban/jobExecutor/NoopJob.java
new file mode 100644
index 0000000..e364b2f
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/NoopJob.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/**
+ *
+ */
+public class NoopJob implements Job
+{
+ public NoopJob(Props props, Logger log)
+ {
+
+ }
+
+ @Override
+ public String getId()
+ {
+ return "Azkaban!! -- " + getClass().getName();
+ }
+
+ @Override
+ public void run() throws Exception
+ {
+ }
+
+ @Override
+ public void cancel() throws Exception
+ {
+ }
+
+ @Override
+ public double getProgress() throws Exception
+ {
+ return 0;
+ }
+
+ @Override
+ public Props getJobGeneratedProperties()
+ {
+ return new Props();
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return false;
+ }
+}
src/java/azkaban/jobExecutor/PigProcessJob.java 179(+179 -0)
diff --git a/src/java/azkaban/jobExecutor/PigProcessJob.java b/src/java/azkaban/jobExecutor/PigProcessJob.java
new file mode 100644
index 0000000..3a5a15d
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/PigProcessJob.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import azkaban.jobExecutor.utils.StringUtils;
+import azkaban.utils.Props;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+import static azkaban.jobExecutor.utils.SecurityUtils.PROXY_KEYTAB_LOCATION;
+import static azkaban.jobExecutor.utils.SecurityUtils.PROXY_USER;
+import static azkaban.jobExecutor.utils.SecurityUtils.TO_PROXY;
+import static azkaban.jobExecutor.utils.SecurityUtils.shouldProxy;
+import static azkaban.jobExecutor.SecurePigWrapper.OBTAIN_BINARY_TOKEN;
+
+public class PigProcessJob extends JavaProcessJob {
+
+ public static final String PIG_SCRIPT = "pig.script";
+ public static final String UDF_IMPORT = "udf.import.list";
+ public static final String PIG_PARAM_PREFIX = "param.";
+ public static final String PIG_PARAM_FILES = "paramfile";
+ public static final String HADOOP_UGI = "hadoop.job.ugi";
+ public static final String DEBUG = "debug";
+
+ public static final String PIG_JAVA_CLASS = "org.apache.pig.Main";
+ public static final String SECURE_PIG_WRAPPER = "azkaban.jobExecutor.SecurePigWrapper";
+
+ public PigProcessJob(Props props, Logger log) {
+ super(props, log);
+ }
+
+ @Override
+ protected String getJavaClass() {
+ return shouldProxy(getProps().toProperties()) ? SECURE_PIG_WRAPPER : PIG_JAVA_CLASS;
+ }
+
+ @Override
+ protected String getJVMArguments() {
+ String args = super.getJVMArguments();
+
+ List<String> udfImport = getUDFImportList();
+ if (udfImport != null) {
+ args += " -Dudf.import.list=" + super.createArguments(udfImport, ":");
+ }
+
+ String hadoopUGI = getHadoopUGI();
+ if (hadoopUGI != null) {
+ args += " -Dhadoop.job.ugi=" + hadoopUGI;
+ }
+
+ if(shouldProxy(getProps().toProperties())) {
+ info("Setting up secure proxy info for child process");
+ String secure;
+ Properties p = getProps().toProperties();
+ secure = " -D" + PROXY_USER + "=" + p.getProperty(PROXY_USER);
+ secure += " -D" + PROXY_KEYTAB_LOCATION + "=" + p.getProperty(PROXY_KEYTAB_LOCATION);
+ secure += " -D" + TO_PROXY + "=" + p.getProperty(TO_PROXY);
+ String extraToken = p.getProperty(OBTAIN_BINARY_TOKEN);
+ if(extraToken != null) {
+ secure += " -D" + OBTAIN_BINARY_TOKEN + "=" + extraToken;
+ }
+ info("Secure settings = " + secure);
+ args += secure;
+ } else {
+ info("Not setting up secure proxy info for child process");
+ }
+
+ return args;
+ }
+
+ @Override
+ protected String getMainArguments() {
+ ArrayList<String> list = new ArrayList<String>();
+ Map<String, String> map = getPigParams();
+ if (map != null) {
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ list.add("-param " + StringUtils.shellQuote(entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
+ }
+ }
+
+ List<String> paramFiles = getPigParamFiles();
+ if (paramFiles != null) {
+ for (String paramFile : paramFiles) {
+ list.add("-param_file " + paramFile);
+ }
+ }
+
+ if (getDebug()) {
+ list.add("-debug");
+ }
+
+ list.add(getScript());
+
+ return org.apache.commons.lang.StringUtils.join(list, " ");
+ }
+
+ @Override
+ protected List<String> getClassPaths() {
+ List<String> classPath = super.getClassPaths();
+
+ // Add hadoop home setting.
+ String hadoopHome = System.getenv("HADOOP_HOME");
+ if (hadoopHome == null) {
+ info("HADOOP_HOME not set, using default hadoop config.");
+ } else {
+ info("Using hadoop config found in " + hadoopHome);
+ classPath.add(new File(hadoopHome, "conf").getPath());
+ }
+
+ if(shouldProxy(getProps().toProperties())) {
+ classPath.add(getSourcePathFromClass(SecurePigWrapper.class));
+ }
+ return classPath;
+ }
+
+ protected boolean getDebug() {
+ return getProps().getBoolean(DEBUG, false);
+ }
+
+ protected String getScript() {
+ return getProps().getString(PIG_SCRIPT, getJobName() + ".pig");
+ }
+
+ protected List<String> getUDFImportList() {
+ return getProps().getStringList(UDF_IMPORT, null, ",");
+ }
+
+ protected String getHadoopUGI() {
+ return getProps().getString(HADOOP_UGI, null);
+ }
+
+ protected Map<String, String> getPigParams() {
+ return getProps().getMapByPrefix(PIG_PARAM_PREFIX);
+ }
+
+ protected List<String> getPigParamFiles() {
+ return getProps().getStringList(PIG_PARAM_FILES, null, ",");
+ }
+
+
+ private static String getSourcePathFromClass(Class containedClass) {
+ File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+ if (!file.isDirectory() && file.getName().endsWith(".class")) {
+ String name = containedClass.getName();
+ StringTokenizer tokenizer = new StringTokenizer(name, ".");
+ while(tokenizer.hasMoreTokens()) {
+ tokenizer.nextElement();
+ file = file.getParentFile();
+ }
+
+ return file.getPath();
+ }
+ else {
+ return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+ }
+ }
+}
src/java/azkaban/jobExecutor/ProcessJob.java 347(+347 -0)
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
new file mode 100644
index 0000000..06050c8
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/*
+ * A job that runs a simple unix command
+ *
+ * @author jkreps
+ *
+ */
+public class ProcessJob extends AbstractProcessJob implements Job {
+
+ public static final String COMMAND = "command";
+ public static final int CLEAN_UP_TIME_MS = 1000;
+
+ private volatile Process _process;
+ private volatile boolean _isComplete;
+ private volatile boolean _isCancelled;
+
+ public ProcessJob(final Props props, final Logger log) {
+ super(props, log);
+ }
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ _isCancelled = false;
+ }
+ resolveProps();
+
+ // Sets a list of all the commands that need to be run.
+ List<String> commands = getCommandList();
+ info(commands.size() + " commands to execute.");
+
+ File[] propFiles = initPropsFiles();
+
+ // System.err.println("in process job outputFile=" +propFiles[1]);
+
+ // For each of the jobs, set up a process and run them.
+ for (String command : commands) {
+ info("Executing command: " + command);
+ String[] cmdPieces = partitionCommandLine(command);
+
+ ProcessBuilder builder = new ProcessBuilder(cmdPieces);
+
+ builder.directory(new File(getCwd()));
+ builder.environment().putAll(getEnvironmentVariables());
+
+ try {
+ _process = builder.start();
+ } catch (IOException e) {
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+ throw new RuntimeException(e);
+ }
+ LoggingGobbler outputGobbler = new LoggingGobbler(
+ new InputStreamReader(_process.getInputStream()),
+ Level.INFO);
+ LoggingGobbler errorGobbler = new LoggingGobbler(
+ new InputStreamReader(_process.getErrorStream()),
+ Level.ERROR);
+
+ int processId = getProcessId();
+ if (processId == 0) {
+ info("Spawned thread. Unknowned processId");
+ } else {
+ info("Spawned thread with processId " + processId);
+ }
+ outputGobbler.start();
+ errorGobbler.start();
+ int exitCode = -999;
+ try {
+ exitCode = _process.waitFor();
+
+ _isComplete = true;
+ if (exitCode != 0) {
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+ throw new RuntimeException(
+ "Processes ended with exit code " + exitCode + ".");
+ }
+
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.join(1000);
+ errorGobbler.join(1000);
+ } catch (InterruptedException e) {
+ } finally {
+ outputGobbler.close();
+ errorGobbler.close();
+ }
+ }
+
+ // Get the output properties from this job.
+ generateProperties(propFiles[1]);
+
+ for (File file : propFiles) {
+ if (file != null && file.exists()) {
+ file.delete();
+ }
+ }
+
+ }
+
+ protected List<String> getCommandList() {
+ List<String> commands = new ArrayList<String>();
+ commands.add(_props.getString(COMMAND));
+ for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
+ commands.add(_props.getString(COMMAND + "." + i));
+ }
+
+ return commands;
+ }
+
+ @Override
+ public void cancel() throws Exception {
+ if (_process != null) {
+ int processId = getProcessId();
+ if (processId != 0) {
+ warn("Attempting to kill the process " + processId);
+ try {
+ Runtime.getRuntime().exec("kill " + processId);
+ synchronized (this) {
+ wait(CLEAN_UP_TIME_MS);
+ }
+ } catch (InterruptedException e) {
+ // Do nothing. We don't really care.
+ }
+ if (!_isComplete) {
+ error("After "
+ + CLEAN_UP_TIME_MS
+ + " ms, the job hasn't terminated. Will force terminate the job.");
+ }
+ } else {
+ info("Cound not get process id");
+ }
+
+ if (!_isComplete) {
+ warn("Force kill the process");
+ _process.destroy();
+ }
+ synchronized (this) {
+ _isCancelled = true;
+ }
+ }
+ }
+
+ public int getProcessId() {
+ int processId = 0;
+
+ try {
+ Field f = _process.getClass().getDeclaredField("pid");
+ f.setAccessible(true);
+
+ processId = f.getInt(_process);
+ } catch (Throwable e) {
+ }
+
+ return processId;
+ }
+
+ @Override
+ public double getProgress() {
+ return _isComplete ? 1.0 : 0.0;
+ }
+
+ private class LoggingGobbler extends Thread {
+
+ private final BufferedReader _inputReader;
+ private final Level _loggingLevel;
+
+ public LoggingGobbler(final InputStreamReader inputReader,
+ final Level level) {
+ _inputReader = new BufferedReader(inputReader);
+ _loggingLevel = level;
+ }
+
+ public void close() {
+ if (_inputReader != null) {
+ try {
+ _inputReader.close();
+ } catch (IOException e) {
+ error("Error cleaning up logging stream reader:", e);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ String line = _inputReader.readLine();
+ if (line == null) {
+ return;
+ }
+
+ logMessage(line);
+ }
+ } catch (IOException e) {
+ error("Error reading from logging stream:", e);
+ }
+ }
+
+ private void logMessage(final String message) {
+ if (message.startsWith(Level.DEBUG.toString())) {
+ String newMsg = message.substring(Level.DEBUG.toString()
+ .length());
+ getLog().debug(newMsg);
+ } else if (message.startsWith(Level.ERROR.toString())) {
+ String newMsg = message.substring(Level.ERROR.toString()
+ .length());
+ getLog().error(newMsg);
+ } else if (message.startsWith(Level.INFO.toString())) {
+ String newMsg = message.substring(Level.INFO.toString()
+ .length());
+ getLog().info(newMsg);
+ } else if (message.startsWith(Level.WARN.toString())) {
+ String newMsg = message.substring(Level.WARN.toString()
+ .length());
+ getLog().warn(newMsg);
+ } else if (message.startsWith(Level.FATAL.toString())) {
+ String newMsg = message.substring(Level.FATAL.toString()
+ .length());
+ getLog().fatal(newMsg);
+ } else if (message.startsWith(Level.TRACE.toString())) {
+ String newMsg = message.substring(Level.TRACE.toString()
+ .length());
+ getLog().trace(newMsg);
+ } else {
+ getLog().log(_loggingLevel, message);
+ }
+
+ }
+ }
+
+ @Override
+ public Props getProps() {
+ return _props;
+ }
+
+ public String getPath() {
+ return _jobPath;
+ }
+
+ public String getJobName() {
+ return getId();
+ }
+
+
+ /**
+ * Splits the command into a unix like command line structure. Quotes and
+ * single quotes are treated as nested strings.
+ *
+ * @param command
+ * @return
+ */
+ public static String[] partitionCommandLine(final String command) {
+ ArrayList<String> commands = new ArrayList<String>();
+
+ int index = 0;
+
+ StringBuffer buffer = new StringBuffer(command.length());
+
+ boolean isApos = false;
+ boolean isQuote = false;
+ while (index < command.length()) {
+ char c = command.charAt(index);
+
+ switch (c) {
+ case ' ':
+ if (!isQuote && !isApos) {
+ String arg = buffer.toString();
+ buffer = new StringBuffer(command.length() - index);
+ if (arg.length() > 0) {
+ commands.add(arg);
+ }
+ } else {
+ buffer.append(c);
+ }
+ break;
+ case '\'':
+ if (!isQuote) {
+ isApos = !isApos;
+ } else {
+ buffer.append(c);
+ }
+ break;
+ case '"':
+ if (!isApos) {
+ isQuote = !isQuote;
+ } else {
+ buffer.append(c);
+ }
+ break;
+ default:
+ buffer.append(c);
+ }
+
+ index++;
+ }
+
+ if (buffer.length() > 0) {
+ String arg = buffer.toString();
+ commands.add(arg);
+ }
+
+ return commands.toArray(new String[commands.size()]);
+ }
+
+ @Override
+ public synchronized boolean isCanceled() {
+ return _isCancelled;
+ }
+
+}
src/java/azkaban/jobExecutor/PythonJob.java 43(+43 -0)
diff --git a/src/java/azkaban/jobExecutor/PythonJob.java b/src/java/azkaban/jobExecutor/PythonJob.java
new file mode 100644
index 0000000..ad83c38
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/PythonJob.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+import azkaban.utils.Props;
+
+
+
+public class PythonJob extends LongArgJob {
+
+ private static final String PYTHON_BINARY_KEY = "python";
+ private static final String SCRIPT_KEY = "script";
+
+
+ public PythonJob(Props props, Logger log) {
+ super(new String[]{props.getString(PYTHON_BINARY_KEY, "python"),
+ props.getString(SCRIPT_KEY)},
+ props,
+ log,
+ ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+ }
+
+
+
+
+}
src/java/azkaban/jobExecutor/RubyJob.java 40(+40 -0)
diff --git a/src/java/azkaban/jobExecutor/RubyJob.java b/src/java/azkaban/jobExecutor/RubyJob.java
new file mode 100644
index 0000000..7f7febc
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/RubyJob.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+import com.google.common.collect.ImmutableSet;
+
+
+public class RubyJob extends LongArgJob {
+
+ private static final String RUBY_BINARY_KEY = "ruby";
+ private static final String SCRIPT_KEY = "script";
+
+ public RubyJob(Props props, Logger log) {
+ super(new String[]{props.getString(RUBY_BINARY_KEY, "ruby"),
+ props.getString(SCRIPT_KEY)},
+ props,
+ log,
+ ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+ }
+
+
+
+}
src/java/azkaban/jobExecutor/ScriptJob.java 46(+46 -0)
diff --git a/src/java/azkaban/jobExecutor/ScriptJob.java b/src/java/azkaban/jobExecutor/ScriptJob.java
new file mode 100644
index 0000000..1879afe
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/ScriptJob.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+import azkaban.utils.Props;
+
+/**
+ * A script job issues a command of the form
+ * [EXECUTABLE] [SCRIPT] --key1 val1 ... --key2 val2
+ * executable -- the interpretor command to execute
+ * script -- the script to pass in (requried)
+ *
+ * @author jkreps
+ *
+ */
/**
 * A script job issues a command of the form
 * [EXECUTABLE] [SCRIPT] --key1 val1 ... --key2 val2
 * executable -- the interpreter command to execute (required)
 * script -- the script to pass in (required)
 *
 * @author jkreps
 *
 */
public class ScriptJob extends LongArgJob {

    // Property key naming the interpreter executable to run the script with.
    private static final String DEFAULT_EXECUTABLE_KEY = "executable";
    // Property key naming the script file handed to the interpreter.
    private static final String SCRIPT_KEY = "script";

    public ScriptJob(Props props, Logger log) {
        // Both properties are required; the keys themselves (and the job type)
        // are suppressed so they are not also forwarded as --key arguments.
        super(new String[] {props.getString(DEFAULT_EXECUTABLE_KEY), props.getString(SCRIPT_KEY)},
              props,
              log,
              ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
    }


}
diff --git a/src/java/azkaban/jobExecutor/SecurePigWrapper.java b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
new file mode 100644
index 0000000..3010089
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.pig.Main;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import static azkaban.jobExecutor.utils.SecurityUtils.getProxiedUser;
+
/**
 * Entry point that launches Pig's Main as a proxied (secure Hadoop) user.
 * When requested via the obtain.binary.token property, it first fetches a
 * JobTracker delegation token and writes it to a temp credentials file whose
 * path is published through mapreduce.job.credentials.binary.
 */
public class SecurePigWrapper {

  public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
  public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";

  public static void main(final String[] args) throws IOException, InterruptedException {
    final Logger logger = Logger.getRootLogger();
    final Properties p = System.getProperties();
    final Configuration conf = new Configuration();

    // Everything (token prefetch AND Pig itself) must run under doAs() so the
    // proxied user's credentials are in effect throughout.
    getProxiedUser(p, logger, conf).doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        prefetchToken();
        Main.main(args);
        return null;
      }

      // For Pig jobs that need to do extra communication with the JobTracker,
      // it's necessary to pre-fetch a token and include it in the credentials
      // cache
      private void prefetchToken() throws InterruptedException, IOException {
        String shouldPrefetch = p.getProperty(OBTAIN_BINARY_TOKEN);
        if(shouldPrefetch != null && shouldPrefetch.equals("true") ) {
          logger.info("Pre-fetching token");
          // The Job instance exists only to carry the credentials cache.
          Job job = new Job(conf, "totally phony, extremely fake, not real job");

          JobConf jc = new JobConf(conf);
          JobClient jobClient = new JobClient(jc);
          logger.info("Pre-fetching: Got new JobClient: " + jc);
          Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("hi"));
          job.getCredentials().addToken(new Text("howdy"), mrdt);

          // Persist the credentials to a temp file that the MR framework will
          // read back via MAPREDUCE_JOB_CREDENTIALS_BINARY.
          File temp = File.createTempFile("mr-azkaban", ".token");
          temp.deleteOnExit();

          FileOutputStream fos = null;
          DataOutputStream dos = null;
          try {
            fos = new FileOutputStream(temp);
            dos = new DataOutputStream(fos);
            job.getCredentials().writeTokenStorageToStream(dos);
          } finally {
            // Close the streams in reverse order of creation; closing dos
            // flushes through to fos.
            if(dos != null) {
              dos.close();
            }
            if(fos != null) {
              fos.close();
            }
          }
          logger.info("Setting " + MAPREDUCE_JOB_CREDENTIALS_BINARY + " to " + temp.getAbsolutePath());
          System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, temp.getAbsolutePath());
        } else {
          logger.info("Not pre-fetching token");
        }
      }
    });

  }
}
+
diff --git a/src/java/azkaban/jobExecutor/utils/InitErrorJob.java b/src/java/azkaban/jobExecutor/utils/InitErrorJob.java
new file mode 100644
index 0000000..1766287
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/InitErrorJob.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import org.apache.log4j.Logger;
+
+import azkaban.jobExecutor.AbstractJob;
+
+/**
+ * this job is used to throw out exception caught in initialization stage
+ *
+ * @author lguo
+ *
+ */
+public class InitErrorJob extends AbstractJob
+{
+
+ private Exception exception;
+
+ public InitErrorJob (String id, Exception e) {
+ super(id, Logger.getLogger(AbstractJob.class));
+ exception = e;
+ }
+
+ @Override
+ public void run() throws Exception
+ {
+ throw exception;
+ }
+
+}
diff --git a/src/java/azkaban/jobExecutor/utils/JobExecutionException.java b/src/java/azkaban/jobExecutor/utils/JobExecutionException.java
new file mode 100644
index 0000000..ffb855d
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/JobExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+public class JobExecutionException extends RuntimeException {
+
+ private final static long serialVersionUID = 1;
+
+ public JobExecutionException(String message) {
+ super(message);
+ }
+
+ public JobExecutionException(Throwable cause) {
+ super(cause);
+ }
+
+ public JobExecutionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
\ No newline at end of file
diff --git a/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
new file mode 100644
index 0000000..b93196c
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import azkaban.jobExecutor.JavaJob;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.jobExecutor.Job;
+import azkaban.jobExecutor.NoopJob;
+import azkaban.jobExecutor.PigProcessJob;
+import azkaban.jobExecutor.ProcessJob;
+import azkaban.jobExecutor.PythonJob;
+import azkaban.jobExecutor.RubyJob;
+import azkaban.jobExecutor.ScriptJob;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+import azkaban.jobExecutor.utils.JobExecutionException;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class JobWrappingFactory
+{
+ private static JobWrappingFactory jobWrappingFactory = null;
+
+ //private String _defaultType;
+ private Map<String, Class<? extends Job>> _jobToClass;
+
+ private JobWrappingFactory(final Map<String, Class<? extends Job>> jobTypeToClassMap
+ )
+ {
+ //this._defaultType = defaultType;
+ this._jobToClass = jobTypeToClassMap;
+ }
+
+ public static JobWrappingFactory getJobWrappingFactory ()
+ {
+ if(jobWrappingFactory == null)
+ {
+ jobWrappingFactory = new JobWrappingFactory(new ImmutableMap.Builder<String, Class<? extends Job>>()
+ .put("java", JavaJob.class)
+ .put("command", ProcessJob.class)
+ .put("javaprocess", JavaProcessJob.class)
+ .put("pig", PigProcessJob.class)
+ .put("propertyPusher", NoopJob.class)
+ .put("python", PythonJob.class)
+ .put("ruby", RubyJob.class)
+ .put("script", ScriptJob.class).build());
+ }
+ return jobWrappingFactory;
+ }
+
+ public void registerJobExecutors(final Map<String, Class<? extends Job>> newJobExecutors)
+ {
+ _jobToClass = newJobExecutors;
+ }
+
+ public Job buildJobExecutor(Props props, Logger logger)
+ {
+
+ Job job;
+ try {
+ String jobType = props.getString("type");
+ if (jobType == null || jobType.length() == 0) {
+ /*throw an exception when job name is null or empty*/
+ throw new JobExecutionException (
+ String.format("The 'type' parameter for job[%s] is null or empty", props, logger));
+ }
+ Class<? extends Object> executorClass = _jobToClass.get(jobType);
+
+ if (executorClass == null) {
+ throw new JobExecutionException(
+ String.format(
+ "Could not construct job[%s] of type[%s].",
+ props,
+ jobType
+ ));
+ }
+
+ job = (Job)Utils.callConstructor(executorClass, props, logger);
+
+ }
+ catch (Exception e) {
+ job = new InitErrorJob(props.getString("jobId"), e);
+ }
+
+// // wrap up job in logging proxy
+// if (jobDescriptor.getLoggerPattern() != null) {
+// job = new LoggingJob(_logDir, job, job.getId(), jobDescriptor.getLoggerPattern());
+// }
+// else {
+// job = new LoggingJob(_logDir, job, job.getId());
+// }
+
+ return job;
+ }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/jobExecutor/utils/JSONToJava.java b/src/java/azkaban/jobExecutor/utils/JSONToJava.java
new file mode 100644
index 0000000..19b9e87
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/JSONToJava.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import com.google.common.base.Function;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.*;
+
+/**
+ * Recursively converts a JSONObject (or JSONArray) into plain Java Maps, Lists and Strings.
+ */
+public class JSONToJava implements Function<JSONObject, Map<String, Object>>
+{
+ @Override
+ public Map<String, Object> apply(JSONObject jsonObject)
+ {
+ Map<String, Object> retVal = new HashMap<String, Object>();
+
+ Iterator keyIterator = jsonObject.keys();
+ while (keyIterator.hasNext()) {
+ String key = keyIterator.next().toString();
+
+ try {
+ retVal.put(key, dispatchCorrectly(jsonObject.get(key)));
+ }
+ catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return retVal;
+ }
+
+ public List<Object> apply(JSONArray jsonArray)
+ {
+ List<Object> retVal = new ArrayList<Object>(jsonArray.length());
+
+ for (int i = 0; i < jsonArray.length(); ++i) {
+ try {
+ retVal.add(dispatchCorrectly(jsonArray.get(i)));
+ }
+ catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return retVal;
+ }
+
+ public Object dispatchCorrectly(Object o)
+ {
+ if (o instanceof JSONObject) {
+ return apply((JSONObject) o);
+ }
+ else if (o instanceof JSONArray) {
+ return apply((JSONArray) o);
+ }
+ else {
+ return o.toString();
+ }
+ }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
new file mode 100644
index 0000000..f22923d
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils.process;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.CircularBuffer;
+
+import com.google.common.base.Joiner;
+
+/**
+ * A more robust alternative to java.lang.Process.
+ *
+ * Output is read by separate threads to avoid deadlock and logged to log4j
+ * loggers.
+ *
+ * @author jkreps
+ *
+ */
+public class AzkabanProcess {
+
+ private final String workingDir;
+ private final List<String> cmd;
+ private final Map<String, String> env;
+ private final Logger logger;
+ private final CountDownLatch startupLatch;
+ private final CountDownLatch completeLatch;
+ private volatile int processId;
+ private volatile Process process;
+
+ public AzkabanProcess(final List<String> cmd,
+ final Map<String, String> env, final String workingDir,
+ final Logger logger) {
+ this.cmd = cmd;
+ this.env = env;
+ this.workingDir = workingDir;
+ this.processId = -1;
+ this.startupLatch = new CountDownLatch(1);
+ this.completeLatch = new CountDownLatch(1);
+ this.logger = logger;
+ }
+
+ /**
+ * Execute this process, blocking until it has completed.
+ */
+ public void run() throws IOException {
+ if (this.isStarted() || this.isComplete()) {
+ throw new IllegalStateException(
+ "The process can only be used once.");
+ }
+
+ ProcessBuilder builder = new ProcessBuilder(cmd);
+ builder.directory(new File(workingDir));
+ builder.environment().putAll(env);
+ this.process = builder.start();
+ this.processId = processId(process);
+ if (processId == 0) {
+ logger.debug("Spawned thread with unknown process id");
+ } else {
+ logger.debug("Spawned thread with process id " + processId);
+ }
+
+ this.startupLatch.countDown();
+
+ LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(
+ process.getInputStream()), logger, Level.INFO, 30);
+ LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(
+ process.getErrorStream()), logger, Level.ERROR, 30);
+
+ outputGobbler.start();
+ errorGobbler.start();
+ int exitCode = -1;
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ logger.info("Process interrupted.", e);
+ }
+
+ completeLatch.countDown();
+ if (exitCode != 0) {
+ throw new ProcessFailureException(exitCode,
+ errorGobbler.getRecentLog());
+ }
+
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.awaitCompletion(5000);
+ errorGobbler.awaitCompletion(5000);
+ }
+
+ /**
+ * Await the completion of this process
+ *
+ * @throws InterruptedException
+ * if the thread is interrupted while waiting.
+ */
+ public void awaitCompletion() throws InterruptedException {
+ this.completeLatch.await();
+ }
+
+ /**
+ * Await the start of this process
+ *
+ * @throws InterruptedException
+ * if the thread is interrupted while waiting.
+ */
+ public void awaitStartup() throws InterruptedException {
+ this.startupLatch.await();
+ }
+
+ /**
+ * Get the process id for this process, if it has started.
+ *
+ * @return The process id or -1 if it cannot be fetched
+ */
+ public int getProcessId() {
+ checkStarted();
+ return this.processId;
+ }
+
+ /**
+ * Attempt to kill the process, waiting up to the given time for it to die
+ *
+ * @param time
+ * The amount of time to wait
+ * @param unit
+ * The time unit
+ * @return true iff this soft kill kills the process in the given wait time.
+ */
+ public boolean softKill(final long time, final TimeUnit unit)
+ throws InterruptedException {
+ checkStarted();
+ if (processId != 0 && isStarted()) {
+ try {
+ Runtime.getRuntime().exec("kill " + processId);
+ return completeLatch.await(time, unit);
+ } catch (IOException e) {
+ logger.error("Kill attempt failed.", e);
+ }
+ return false;
+ }
+ return false;
+ }
+
+ /**
+ * Force kill this process
+ */
+ public void hardKill() {
+ checkStarted();
+ if (isRunning()) {
+ process.destroy();
+ }
+ }
+
+ /**
+ * Attempt to get the process id for this process
+ *
+ * @param process
+ * The process to get the id from
+ * @return The id of the process
+ */
+ private int processId(final java.lang.Process process) {
+ int processId = 0;
+ try {
+ Field f = process.getClass().getDeclaredField("pid");
+ f.setAccessible(true);
+
+ processId = f.getInt(process);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+
+ return processId;
+ }
+
+ /**
+ * @return true iff the process has been started
+ */
+ public boolean isStarted() {
+ return startupLatch.getCount() == 0L;
+ }
+
+ /**
+ * @return true iff the process has completed
+ */
+ public boolean isComplete() {
+ return completeLatch.getCount() == 0L;
+ }
+
+ /**
+ * @return true iff the process is currently running
+ */
+ public boolean isRunning() {
+ return isStarted() && !isComplete();
+ }
+
+ public void checkStarted() {
+ if (!isStarted()) {
+ throw new IllegalStateException("Process has not yet started.");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
+ + ", cwd = " + workingDir + ")";
+ }
+
+ private static class LogGobbler extends Thread {
+
+ private final BufferedReader inputReader;
+ private final Logger logger;
+ private final Level loggingLevel;
+ private final CircularBuffer<String> buffer;
+
+ public LogGobbler(final Reader inputReader, final Logger logger,
+ final Level level, final int bufferLines) {
+ this.inputReader = new BufferedReader(inputReader);
+ this.logger = logger;
+ this.loggingLevel = level;
+ buffer = new CircularBuffer<String>(bufferLines);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ String line = inputReader.readLine();
+ if (line == null) {
+ return;
+ }
+
+ buffer.append(line);
+ logger.log(loggingLevel, line);
+ }
+ } catch (IOException e) {
+ logger.error("Error reading from logging stream:", e);
+ }
+ }
+
+ public void awaitCompletion(final long waitMs) {
+ try {
+ join(waitMs);
+ } catch (InterruptedException e) {
+ logger.info("I/O thread interrupted.", e);
+ }
+ }
+
+ public String getRecentLog() {
+ return Joiner.on(System.getProperty("line.separator")).join(buffer);
+ }
+
+ }
+
+}
diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
new file mode 100644
index 0000000..ec9fd54
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils.process;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Helper code for building a process
+ *
+ * @author jkreps
+ */
+public class AzkabanProcessBuilder {
+
+ private List<String> cmd = new ArrayList<String>();
+ private Map<String, String> env = new HashMap<String, String>();
+ private String workingDir = System.getProperty("user.dir");
+ private Logger logger = Logger.getLogger(AzkabanProcess.class);
+ private int stdErrSnippetSize = 30;
+ private int stdOutSnippetSize = 30;
+
+ public AzkabanProcessBuilder(String...command) {
+ addArg(command);
+ }
+
+ public AzkabanProcessBuilder addArg(String...command) {
+ for(String c: command)
+ cmd.add(c);
+ return this;
+ }
+
+ public AzkabanProcessBuilder setWorkingDir(String dir) {
+ this.workingDir = dir;
+ return this;
+ }
+
+ public AzkabanProcessBuilder setWorkingDir(File f) {
+ return setWorkingDir(f.getAbsolutePath());
+ }
+
+ public String getWorkingDir() {
+ return this.workingDir;
+ }
+
+ public AzkabanProcessBuilder addEnv(String variable, String value) {
+ env.put(variable, value);
+ return this;
+ }
+
+ public AzkabanProcessBuilder setEnv(Map<String, String> m) {
+ this.env = m;
+ return this;
+ }
+
+ public Map<String, String> getEnv() {
+ return this.env;
+ }
+
+ public AzkabanProcessBuilder setStdErrorSnippetSize(int size) {
+ this.stdErrSnippetSize = size;
+ return this;
+ }
+
+ public AzkabanProcessBuilder setStdOutSnippetSize(int size) {
+ this.stdOutSnippetSize = size;
+ return this;
+ }
+
+ public AzkabanProcessBuilder setLogger(Logger logger) {
+ this.logger = logger;
+ return this;
+ }
+
+ public AzkabanProcess build() {
+ return new AzkabanProcess(cmd, env, workingDir, logger);
+ }
+
+ public List<String> getCommand() {
+ return this.cmd;
+ }
+
+ public String getCommandString() {
+ return Joiner.on(" ").join(getCommand());
+ }
+
+ @Override
+ public String toString() {
+ return "ProcessBuilder(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env + ", cwd = " + workingDir + ")";
+ }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java b/src/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
new file mode 100644
index 0000000..d146cf9
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils.process;
+
+public class ProcessFailureException extends RuntimeException {
+
+ private static final long serialVersionUID = 1;
+
+ private final int exitCode;
+ private final String logSnippet;
+
+ public ProcessFailureException(int exitCode, String logSnippet) {
+ this.exitCode = exitCode;
+ this.logSnippet = logSnippet;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public String getLogSnippet() {
+ return this.logSnippet;
+ }
+
+}
src/java/azkaban/jobExecutor/utils/PropsUtils.java 174(+174 -0)
diff --git a/src/java/azkaban/jobExecutor/utils/PropsUtils.java b/src/java/azkaban/jobExecutor/utils/PropsUtils.java
new file mode 100644
index 0000000..b879074
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/PropsUtils.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import azkaban.utils.Props;
+import azkaban.utils.UndefinedPropertyException;
+//import azkaban.jobExecutor.jobs.dependency.ExecutableFlow;
+import azkaban.executor.ExecutableFlow;
+import org.joda.time.DateTime;
+
+public class PropsUtils {
+
+ /**
+ * Load job schedules from the given directory.
+ * @param dir The directory to look in
+ *
+ * @param suffixes File suffixes to load
+ * @return The loaded set of schedules
+ */
+ public static Props loadPropsInDir(File dir, String... suffixes) {
+ return loadPropsInDir(null, dir, suffixes);
+ }
+
+ /**
+ * Load job schedules from the given directories
+ *
+ * @param parent The parent properties for these properties
+ * @param dir The directory to look in
+ * @param suffixes File suffixes to load
+ * @return The loaded set of schedules
+ */
+ public static Props loadPropsInDir(Props parent, File dir, String... suffixes) {
+ try {
+ Props props = new Props(parent);
+ File[] files = dir.listFiles();
+ if(files != null) {
+ for(File f: files) {
+ if(f.isFile() && endsWith(f, suffixes)) {
+ props.putAll(new Props(null, f.getAbsolutePath()));
+ }
+ }
+ }
+ return props;
+ } catch(IOException e) {
+ throw new RuntimeException("Error loading properties.", e);
+ }
+ }
+
+ /**
+ * Load job schedules from the given directories
+ *
+ * @param dirs The directories to check for properties
+ * @param suffixes The suffixes to load
+ * @return The properties
+ */
+ public static Props loadPropsInDirs(List<File> dirs, String... suffixes) {
+ Props props = new Props();
+ for(File dir: dirs) {
+ props.putLocal(loadPropsInDir(dir, suffixes));
+ }
+ return props;
+ }
+
+ /**
+ * Load properties from the given path
+ *
+ * @param jobPath The path to load from
+ * @param props The parent properties for loaded properties
+ * @param suffixes The suffixes of files to load
+ */
+ public static void loadPropsBySuffix(File jobPath, Props props, String... suffixes) {
+ try {
+ if(jobPath.isDirectory()) {
+ File[] files = jobPath.listFiles();
+ if(files != null) {
+ for(File file: files)
+ loadPropsBySuffix(file, props, suffixes);
+ }
+ } else if(endsWith(jobPath, suffixes)) {
+ props.putAll(new Props(null, jobPath.getAbsolutePath()));
+ }
+ } catch(IOException e) {
+ throw new RuntimeException("Error loading schedule properties.", e);
+ }
+ }
+
+ public static boolean endsWith(File file, String... suffixes) {
+ for(String suffix: suffixes)
+ if(file.getName().endsWith(suffix))
+ return true;
+ return false;
+ }
+
+ private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([a-zA-Z_.0-9]+)\\}");
+
+ public static Props resolveProps(Props props) {
+ Props resolvedProps = new Props();
+
+ for(String key : props.getKeySet()) {
+ StringBuffer replaced = new StringBuffer();
+ String value = props.get(key);
+ Matcher matcher = VARIABLE_PATTERN.matcher(value);
+ while(matcher.find()) {
+ String variableName = matcher.group(1);
+
+ if (variableName.equals(key)) {
+ throw new IllegalArgumentException(
+ String.format("Circular property definition starting from property[%s]", key)
+ );
+ }
+
+ String replacement = props.get(variableName);
+ if(replacement == null)
+ throw new UndefinedPropertyException("Could not find variable substitution for variable '"
+ + variableName + "' in key '" + key + "'.");
+
+ replacement = replacement.replaceAll("\\\\", "\\\\\\\\");
+ replacement = replacement.replaceAll("\\$", "\\\\\\$");
+
+ matcher.appendReplacement(replaced, replacement);
+ matcher.appendTail(replaced);
+
+ value = replaced.toString();
+ replaced = new StringBuffer();
+ matcher = VARIABLE_PATTERN.matcher(value);
+ }
+ matcher.appendTail(replaced);
+ resolvedProps.put(key, replaced.toString());
+ }
+
+ return resolvedProps;
+ }
+
+ public static Props produceParentProperties(final ExecutableFlow flow) {
+ Props parentProps = new Props();
+
+ parentProps.put("azkaban.flow.id", flow.getFlowId());
+ parentProps.put("azkaban.flow.uuid", UUID.randomUUID().toString());
+
+ DateTime loadTime = new DateTime();
+
+ parentProps.put("azkaban.flow.start.timestamp", loadTime.toString());
+ parentProps.put("azkaban.flow.start.year", loadTime.toString("yyyy"));
+ parentProps.put("azkaban.flow.start.month", loadTime.toString("MM"));
+ parentProps.put("azkaban.flow.start.day", loadTime.toString("dd"));
+ parentProps.put("azkaban.flow.start.hour", loadTime.toString("HH"));
+ parentProps.put("azkaban.flow.start.minute", loadTime.toString("mm"));
+ parentProps.put("azkaban.flow.start.seconds", loadTime.toString("ss"));
+ parentProps.put("azkaban.flow.start.milliseconds", loadTime.toString("SSS"));
+ parentProps.put("azkaban.flow.start.timezone", loadTime.toString("ZZZZ"));
+ return parentProps;
+ }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/SecurityUtils.java b/src/java/azkaban/jobExecutor/utils/SecurityUtils.java
new file mode 100644
index 0000000..92f7011
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/SecurityUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class SecurityUtils {
+ // Secure Hadoop proxy user params
+ public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
+ public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
+ public static final String PROXY_USER = "proxy.user";
+ public static final String TO_PROXY = "user.to.proxy";
+
+ private static UserGroupInformation loginUser = null;
+
+ /**
+ * Create a proxied user based on the explicit user name, taking other parameters
+ * necessary from properties file.
+ */
+ public static synchronized UserGroupInformation getProxiedUser(String toProxy, Properties prop, Logger log, Configuration conf) throws IOException {
+ if(toProxy == null) {
+ throw new IllegalArgumentException("toProxy can't be null");
+ }
+ if(conf == null) {
+ throw new IllegalArgumentException("conf can't be null");
+ }
+
+ if (loginUser == null) {
+ log.info("No login user. Creating login user");
+ String keytab = verifySecureProperty(prop, PROXY_KEYTAB_LOCATION, log);
+ String proxyUser = verifySecureProperty(prop, PROXY_USER, log);
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(proxyUser, keytab);
+ loginUser = UserGroupInformation.getLoginUser();
+ log.info("Logged in with user " + loginUser);
+ } else {
+ log.info("loginUser (" + loginUser + ") already created, refreshing tgt.");
+ loginUser.checkTGTAndReloginFromKeytab();
+ }
+
+ return UserGroupInformation.createProxyUser(toProxy, loginUser);
+ }
+
+ /**
+ * Create a proxied user, taking all parameters, including which user to proxy
+ * from provided Properties.
+ */
+ public static UserGroupInformation getProxiedUser(Properties prop, Logger log, Configuration conf) throws IOException {
+ String toProxy = verifySecureProperty(prop, TO_PROXY, log);
+ return getProxiedUser(toProxy, prop, log, conf);
+ }
+
+ public static String verifySecureProperty(Properties properties, String s, Logger l) throws IOException {
+ String value = properties.getProperty(s);
+
+ if(value == null) throw new IOException(s + " not set in properties. Cannot use secure proxy");
+ l.info("Secure proxy configuration: Property " + s + " = " + value);
+ return value;
+ }
+
+ public static boolean shouldProxy(Properties prop) {
+ String shouldProxy = prop.getProperty(ENABLE_PROXYING);
+
+ return shouldProxy != null && shouldProxy.equals("true");
+ }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/StringUtils.java b/src/java/azkaban/jobExecutor/utils/StringUtils.java
new file mode 100644
index 0000000..00a57f5
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/StringUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils;
+
+public class StringUtils
+{
+ public static final char SINGLE_QUOTE = '\'';
+ public static final char DOUBLE_QUOTE = '\"';
+
+ public static String shellQuote(String s, char quoteCh)
+ {
+ StringBuffer buf = new StringBuffer(s.length()+2);
+
+ buf.append(quoteCh);
+ for (int i = 0; i < s.length(); i++) {
+ final char ch = s.charAt(i);
+ if (ch == quoteCh) {
+ buf.append('\\');
+ }
+ buf.append(ch);
+ }
+ buf.append(quoteCh);
+
+ return buf.toString();
+ }
+}
diff --git a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
new file mode 100644
index 0000000..b61d8a7
--- /dev/null
+++ b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
@@ -0,0 +1,360 @@
+package azkaban.scheduler;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.user.User;
+import azkaban.utils.Props;
+import azkaban.utils.JSONUtils;
+
+/**
+ * Loads the schedule from a schedule file that is JSON like. The format would be as follows:
+ *
+ * {
+ * schedule: [
+ * {
+ * "project": "<project>",
+ * "user": "<user>",
+ * "flow": "<flow>",
+ * "time": "<time>",
+ * "recurrence":"<period>",
+ * "dependency":<boolean>
+ * }
+ * ]
+ * }
+ *
+ *
+ * @author rpark
+ *
+ */
+public class LocalFileScheduleLoader implements ScheduleLoader {
+ private static final String SCHEDULEID = "schedule";
+ private static final String USER = "user";
+ private static final String USERSUBMIT = "userSubmit";
+ private static final String SUBMITTIME = "submitTime";
+ private static final String FIRSTSCHEDTIME = "firstSchedTime";
+
+ private static final String SCHEDULE = "schedule";
+ private static final String NEXTEXECTIME = "nextExecTime";
+ private static final String TIMEZONE = "timezone";
+ private static final String RECURRENCE = "recurrence";
+
+ private static final String SCHEDULESTATUS = "schedulestatus";
+
+ private static DateTimeFormatter FILE_DATEFORMAT = DateTimeFormat.forPattern("yyyy-MM-dd.HH.mm.ss.SSS");
+ private static Logger logger = Logger.getLogger(LocalFileScheduleLoader.class);
+
+ private File basePath;
+ private File scheduleFile;
+ private File backupScheduleFile;
+
+ public LocalFileScheduleLoader(Props props) throws IOException {
+
+
+ basePath = new File(props.getString("schedule.directory"));
+ if (!basePath.exists()) {
+ logger.info("Schedule directory " + basePath + " not found.");
+ if (basePath.mkdirs()) {
+ logger.info("Schedule directory " + basePath + " created.");
+ }
+ else {
+ throw new RuntimeException("Schedule directory " + basePath + " does not exist and cannot be created.");
+ }
+ }
+
+ scheduleFile = new File(basePath, "schedule");
+ if(!scheduleFile.exists() || scheduleFile.isDirectory()) {
+ logger.info("Schedule file " + scheduleFile + " not found.");
+ if(scheduleFile.createNewFile() && scheduleFile.canRead() && scheduleFile.canWrite()) {
+ logger.info("Schedule file " + scheduleFile + " created.");
+ }
+ else {
+ throw new RuntimeException("Schedule file " + scheduleFile + " cannot be created.");
+ }
+ }
+
+ backupScheduleFile = new File(basePath, "backup");
+ if(!backupScheduleFile.exists() || backupScheduleFile.isDirectory()) {
+ logger.info("Backup schedule file " + backupScheduleFile + " not found.");
+ if(backupScheduleFile.createNewFile() && backupScheduleFile.canRead() && backupScheduleFile.canWrite()) {
+ logger.info("Backup schedule file " + backupScheduleFile + " created.");
+ }
+ else {
+ throw new RuntimeException("Backup schedule file " + backupScheduleFile + " cannot be created.");
+ }
+ }
+
+ }
+
+
+ @Override
+ public List<ScheduledFlow> loadSchedule() {
+ if (scheduleFile != null && backupScheduleFile != null) {
+ if (scheduleFile.exists()) {
+ if(scheduleFile.length() == 0)
+ return new ArrayList<ScheduledFlow>();
+ return loadFromFile(scheduleFile);
+ }
+ else if (backupScheduleFile.exists()) {
+ backupScheduleFile.renameTo(scheduleFile);
+ return loadFromFile(scheduleFile);
+ }
+ else {
+ logger.warn("No schedule files found looking for " + scheduleFile.getAbsolutePath());
+ }
+ }
+
+ return new ArrayList<ScheduledFlow>();
+ }
+
+ @Override
+ public void saveSchedule(List<ScheduledFlow> schedule) {
+ if (scheduleFile != null && backupScheduleFile != null) {
+ // Delete the backup if it exists and a current file exists.
+ if (backupScheduleFile.exists() && scheduleFile.exists()) {
+ backupScheduleFile.delete();
+ }
+
+ // Rename the schedule if it exists.
+ if (scheduleFile.exists()) {
+ scheduleFile.renameTo(backupScheduleFile);
+ }
+
+ HashMap<String,Object> obj = new HashMap<String,Object>();
+ ArrayList<Object> schedules = new ArrayList<Object>();
+ obj.put(SCHEDULE, schedules);
+ //Write out schedule.
+
+ for (ScheduledFlow schedFlow : schedule) {
+ schedules.add(createJSONObject(schedFlow));
+ }
+
+ try {
+ FileWriter writer = new FileWriter(scheduleFile);
+ writer.write(JSONUtils.toJSONString(obj, 4));
+ writer.flush();
+ } catch (Exception e) {
+ throw new RuntimeException("Error saving flow file", e);
+ }
+ logger.info("schedule saved");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<ScheduledFlow> loadFromFile(File schedulefile)
+ {
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new FileReader(schedulefile));
+ } catch (FileNotFoundException e) {
+ // TODO Auto-generated catch block
+ logger.error("Error loading schedule file ", e);
+ }
+ List<ScheduledFlow> scheduleList = new ArrayList<ScheduledFlow>();
+
+ HashMap<String, Object> schedule;
+ try {
+ //TODO handle first time empty schedule file
+ schedule = (HashMap<String,Object>)JSONUtils.fromJSONStream(reader);
+ } catch (Exception e) {
+ //schedule = loadLegacyFile(schedulefile);
+ logger.error("Error parsing the schedule file", e);
+ throw new RuntimeException("Error parsing the schedule file", e);
+ }
+ finally {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ }
+ }
+
+ ArrayList<Object> array = (ArrayList<Object>)schedule.get("schedule");
+ for (int i = 0; i < array.size(); ++i) {
+ HashMap<String, Object> schedItem = (HashMap<String, Object>)array.get(i);
+ ScheduledFlow sched = createScheduledFlow(schedItem);
+ if (sched != null) {
+ scheduleList.add(sched);
+ }
+ }
+
+ return scheduleList;
+ }
+
+ private ScheduledFlow createScheduledFlow(HashMap<String, Object> obj) {
+ String scheduleId = (String)obj.get(SCHEDULEID);
+ String user = (String)obj.get(USER);
+ String userSubmit = (String)obj.get(USERSUBMIT);
+ String submitTimeRaw = (String)obj.get(SUBMITTIME);
+ String firstSchedTimeRaw = (String)obj.get(FIRSTSCHEDTIME);
+ String nextExecTimeRaw = (String)obj.get(NEXTEXECTIME);
+ String timezone = (String)obj.get(TIMEZONE);
+ String recurrence = (String)obj.get(RECURRENCE);
+// String scheduleStatus = (String)obj.get(SCHEDULESTATUS);
+
+ DateTime nextExecTime = FILE_DATEFORMAT.parseDateTime(nextExecTimeRaw);
+ DateTime submitTime = FILE_DATEFORMAT.parseDateTime(submitTimeRaw);
+ DateTime firstSchedTime = FILE_DATEFORMAT.parseDateTime(firstSchedTimeRaw);
+
+ if (nextExecTime == null) {
+ logger.error("No next execution time has been set");
+ return null;
+ }
+
+ if (submitTime == null) {
+ logger.error("No submitTime has been set");
+ }
+
+ if(firstSchedTime == null){
+ logger.error("No first scheduled time has been set");
+ }
+
+ if (timezone != null) {
+ nextExecTime = nextExecTime.withZoneRetainFields(DateTimeZone.forID(timezone));
+ }
+
+ ReadablePeriod period = null;
+ if (recurrence != null) {
+ period = parsePeriodString(scheduleId, recurrence);
+ }
+
+ ScheduledFlow scheduledFlow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, period);
+ if (scheduledFlow.updateTime()) {
+ return scheduledFlow;
+ }
+
+ logger.info("Removed " + scheduleId + " off out of scheduled. It is not recurring.");
+ return null;
+ }
+
+ private HashMap<String,Object> createJSONObject(ScheduledFlow flow) {
+ HashMap<String,Object> object = new HashMap<String,Object>();
+ object.put(SCHEDULEID, flow.getScheduleId());
+ object.put(USER, flow.getUser());
+ object.put(USERSUBMIT, flow.getUserSubmit());
+
+ object.put(SUBMITTIME, FILE_DATEFORMAT.print(flow.getSubmitTime()));
+ object.put(FIRSTSCHEDTIME, FILE_DATEFORMAT.print(flow.getFirstSchedTime()));
+
+ object.put(NEXTEXECTIME, FILE_DATEFORMAT.print(flow.getNextExecTime()));
+ object.put(TIMEZONE, flow.getNextExecTime().getZone().getID());
+ object.put(RECURRENCE, createPeriodString(flow.getPeriod()));
+// object.put(SCHEDULESTATUS, flow.getSchedStatus());
+
+ return object;
+ }
+
+ private ReadablePeriod parsePeriodString(String scheduleId, String periodStr)
+ {
+ ReadablePeriod period;
+ char periodUnit = periodStr.charAt(periodStr.length() - 1);
+ if (periodUnit == 'n') {
+ return null;
+ }
+
+ int periodInt = Integer.parseInt(periodStr.substring(0, periodStr.length() - 1));
+ switch (periodUnit) {
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '" + periodUnit + "' for flow " + scheduleId);
+ }
+
+ return period;
+ }
+
+ private String createPeriodString(ReadablePeriod period)
+ {
+ String periodStr = "n";
+
+ if (period == null) {
+ return "n";
+ }
+
+ if (period.get(DurationFieldType.days()) > 0) {
+ int days = period.get(DurationFieldType.days());
+ periodStr = days + "d";
+ }
+ else if (period.get(DurationFieldType.hours()) > 0) {
+ int hours = period.get(DurationFieldType.hours());
+ periodStr = hours + "h";
+ }
+ else if (period.get(DurationFieldType.minutes()) > 0) {
+ int minutes = period.get(DurationFieldType.minutes());
+ periodStr = minutes + "m";
+ }
+ else if (period.get(DurationFieldType.seconds()) > 0) {
+ int seconds = period.get(DurationFieldType.seconds());
+ periodStr = seconds + "s";
+ }
+
+ return periodStr;
+ }
+
+// private HashMap<String,Object> loadLegacyFile(File schedulefile) {
+// Props schedule = null;
+// try {
+// schedule = new Props(null, schedulefile.getAbsolutePath());
+// } catch(Exception e) {
+// throw new RuntimeException("Error loading schedule from " + schedulefile);
+// }
+//
+// ArrayList<Object> flowScheduleList = new ArrayList<Object>();
+// for(String key: schedule.getKeySet()) {
+// HashMap<String,Object> scheduledMap = parseScheduledFlow(key, schedule.get(key));
+// if (scheduledMap == null) {
+// flowScheduleList.add(scheduledMap);
+// }
+// }
+//
+// HashMap<String,Object> scheduleMap = new HashMap<String,Object>();
+// scheduleMap.put(SCHEDULE, flowScheduleList );
+//
+// return scheduleMap;
+// }
+
+// private HashMap<String,Object> parseScheduledFlow(String name, String flow) {
+// String[] pieces = flow.split("\\s+");
+//
+// if(pieces.length != 3) {
+// logger.warn("Error loading schedule from file " + name);
+// return null;
+// }
+//
+// HashMap<String,Object> scheduledFlow = new HashMap<String,Object>();
+// scheduledFlow.put(PROJECTID, name);
+// scheduledFlow.put(TIME, pieces[0]);
+// scheduledFlow.put(RECURRENCE, pieces[1]);
+// Boolean dependency = Boolean.parseBoolean(pieces[2]);
+//
+// return scheduledFlow;
+// }
+}
\ No newline at end of file
src/java/azkaban/scheduler/ScheduledFlow.java 263(+263 -0)
diff --git a/src/java/azkaban/scheduler/ScheduledFlow.java b/src/java/azkaban/scheduler/ScheduledFlow.java
new file mode 100644
index 0000000..38c94b3
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduledFlow.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.user.User;
+import azkaban.utils.Utils;
+
+
+/**
+ * Schedule for a job instance. This is decoupled from the execution.
+ *
+ * @author Richard Park
+ *
+ */
+public class ScheduledFlow {
+
+ //use projectId.flowId to form a unique scheduleId
+ private final String scheduleId;
+
+ private final ReadablePeriod period;
+ private DateTime nextExecTime;
+ private final String user;
+ private final String userSubmit;
+ private final DateTime submitTime;
+ private final DateTime firstSchedTime;
+// private SchedStatus schedStatus;
+
+ public enum SchedStatus {
+ LASTSUCCESS ("lastsuccess"),
+ LASTFAILED ("lastfailed"),
+ LASTPAUSED ("lastpaused");
+
+ private final String status;
+ SchedStatus(String status){
+ this.status = status;
+ }
+ private String status(){
+ return this.status;
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param jobName Unique job name
+ * @param nextExecution The next execution time
+ * @param ignoreDependency
+ */
+ public ScheduledFlow(String scheduleId,
+ String user,
+ String userSubmit,
+ DateTime submitTime,
+ DateTime firstSchedTime,
+ DateTime nextExecution) {
+ this(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecution, null);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param jobId
+ * @param nextExecution
+ * @param period
+ * @param ignoreDependency
+ */
+ public ScheduledFlow(String scheduleId,
+ String user,
+ String userSubmit,
+ DateTime submitTime,
+ DateTime firstSchedTime,
+ DateTime nextExecution,
+ ReadablePeriod period) {
+ super();
+ this.scheduleId = Utils.nonNull(scheduleId);
+ this.user = user;
+ this.userSubmit = userSubmit;
+ this.submitTime = submitTime;
+ this.firstSchedTime = firstSchedTime;
+ this.period = period;
+ this.nextExecTime = Utils.nonNull(nextExecution);
+// this.schedStatus = SchedStatus.LASTSUCCESS;
+ }
+
+ public ScheduledFlow(String scheduleId,
+ String user,
+ String userSubmit,
+ DateTime submitTime,
+ DateTime firstSchedTime,
+ ReadablePeriod period) {
+ super();
+ this.scheduleId = Utils.nonNull(scheduleId);
+ this.user = user;
+ this.userSubmit = userSubmit;
+ this.submitTime = submitTime;
+ this.firstSchedTime = firstSchedTime;
+ this.period = period;
+ this.nextExecTime = new DateTime();
+// this.schedStatus = SchedStatus.LASTSUCCESS;
+ }
+
+ public ScheduledFlow(String scheduleId,
+ String user,
+ String userSubmit,
+ DateTime submitTime,
+ DateTime firstSchedTime) {
+ super();
+ this.scheduleId = Utils.nonNull(scheduleId);
+ this.user = user;
+ this.userSubmit = userSubmit;
+ this.submitTime = submitTime;
+ this.firstSchedTime = firstSchedTime;
+ this.period = null;
+ this.nextExecTime = new DateTime();
+// this.schedStatus = SchedStatus.LASTSUCCESS;
+ }
+
+// public SchedStatus getSchedStatus() {
+// return this.schedStatus;
+// }
+//
+// public void setSchedStatus(SchedStatus schedStatus) {
+// this.schedStatus = schedStatus;
+// }
+
+ /**
+ * Updates the time to a future time after 'now' that matches the period description.
+ *
+ * @return
+ */
+ public boolean updateTime() {
+ if (nextExecTime.isAfterNow()) {
+ return true;
+ }
+
+ if (period != null) {
+ DateTime other = getNextRuntime(nextExecTime, period);
+
+ this.nextExecTime = other;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Calculates the next runtime by adding the period.
+ *
+ * @param scheduledDate
+ * @param period
+ * @return
+ */
+ private DateTime getNextRuntime(DateTime scheduledDate, ReadablePeriod period)
+ {
+ DateTime now = new DateTime();
+ DateTime date = new DateTime(scheduledDate);
+ int count = 0;
+ while (!now.isBefore(date)) {
+ if (count > 100000) {
+ throw new IllegalStateException("100000 increments of period did not get to present time.");
+ }
+
+ if (period == null) {
+ break;
+ }
+ else {
+ date = date.plus(period);
+ }
+
+ count += 1;
+ }
+
+ return date;
+ }
+
+ /**
+ * Returns the unique id of the job to be run.
+ * @return
+ */
+
+ /**
+ * Returns true if the job recurrs in the future
+ * @return
+ */
+ public boolean isRecurring() {
+ return this.period != null;
+ }
+
+ /**
+ * Returns the recurrance period. Or null if not applicable
+ * @return
+ */
+ public ReadablePeriod getPeriod() {
+ return period;
+ }
+
+ public DateTime getFirstSchedTime() {
+ return firstSchedTime;
+ }
+
+ /**
+ * Returns the next scheduled execution
+ * @return
+ */
+ public DateTime getNextExecTime() {
+ return nextExecTime;
+ }
+
+ public String getUserSubmit() {
+ return userSubmit;
+ }
+
+ public DateTime getSubmitTime() {
+ return submitTime;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ScheduledFlow{" +
+// "scheduleStatus=" + schedStatus +
+ "nextExecTime=" + nextExecTime +
+ ", period=" + period +
+ ", firstSchedTime=" + firstSchedTime +
+ ", submitTime=" + submitTime +
+ ", userSubmit=" + userSubmit +
+ ", user=" + user +
+ ", scheduleId='" + scheduleId + '\'' +
+ '}';
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getScheduleId() {
+ return scheduleId;
+ }
+
+ public String getFlowId(){
+ return this.scheduleId.split("\\.")[1];
+ }
+
+ public String getProjectId(){
+ return this.scheduleId.split("\\.")[0];
+ }
+}
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
new file mode 100644
index 0000000..6997264
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -0,0 +1,11 @@
+package azkaban.scheduler;
+
+import java.util.List;
+
+
/**
 * Persistence interface for the flow schedule. Implementations load the
 * schedule at startup and save it whenever it changes (see
 * LocalFileScheduleLoader for the file-based implementation).
 */
public interface ScheduleLoader {
    /**
     * Persists the given schedule, replacing any previously saved schedule.
     *
     * @param schedule the complete current schedule to save
     */
    public void saveSchedule(List<ScheduledFlow> schedule);

    /**
     * Loads the previously saved schedule, or an empty list if none exists.
     *
     * @return the loaded scheduled flows; never null
     */
    public List<ScheduledFlow> loadSchedule();

}
\ No newline at end of file
src/java/azkaban/scheduler/ScheduleManager.java 410(+410 -0)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
new file mode 100644
index 0000000..12c9184
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -0,0 +1,410 @@
+package azkaban.scheduler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.PeriodFormat;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.flow.Flow;
+import azkaban.jobExecutor.utils.JobExecutionException;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.scheduler.ScheduledFlow.SchedStatus;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.utils.Props;
+
+
+
+/**
+ * The ScheduleManager stores and executes the schedule. It uses a single thread instead
+ * and waits until correct loading time for the flow. It will not remove the flow from the
+ * schedule when it is run, which can potentially allow the flow to and overlap each other.
+ *
+ * @author Richard
+ */
+public class ScheduleManager {
+ private static Logger logger = Logger.getLogger(ScheduleManager.class);
+
+ private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+ private ScheduleLoader loader;
+ private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>();
+ private final ScheduleRunner runner;
+ private final ExecutorManager executorManager;
+ private final ProjectManager projectManager;
+
+ /**
+ * Give the schedule manager a loader class that will properly load the schedule.
+ *
+ * @param loader
+ */
+ public ScheduleManager(
+ ExecutorManager executorManager,
+ ProjectManager projectManager,
+ ScheduleLoader loader)
+ {
+ this.executorManager = executorManager;
+ this.projectManager = projectManager;
+ this.loader = loader;
+ this.runner = new ScheduleRunner();
+
+ List<ScheduledFlow> scheduleList = loader.loadSchedule();
+ for (ScheduledFlow flow: scheduleList) {
+ internalSchedule(flow);
+ }
+
+ this.runner.start();
+ }
+
+ /**
+ * Shutdowns the scheduler thread. After shutdown, it may not be safe to use it again.
+ */
+ public void shutdown() {
+ this.runner.shutdown();
+ }
+
+ /**
+ * Retrieves a copy of the list of schedules.
+ *
+ * @return
+ */
+ public synchronized List<ScheduledFlow> getSchedule() {
+ return runner.getSchedule();
+ }
+
+ /**
+ * Returns the scheduled flow for the flow name
+ *
+ * @param id
+ * @return
+ */
+ public ScheduledFlow getSchedule(String scheduleId) {
+ return scheduleIDMap.get(scheduleId);
+ }
+
+ /**
+ * Removes the flow from the schedule if it exists.
+ *
+ * @param id
+ */
+ public synchronized void removeScheduledFlow(String scheduleId) {
+ ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+ scheduleIDMap.remove(scheduleId);
+ runner.removeScheduledFlow(flow);
+
+ loader.saveSchedule(getSchedule());
+ }
+
+// public synchronized void pauseScheduledFlow(String scheduleId){
+// try{
+// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+// flow.setSchedStatus(SchedStatus.LASTPAUSED);
+// loader.saveSchedule(getSchedule());
+// }
+// catch (Exception e) {
+// throw new RuntimeException("Error pausing a schedule " + scheduleId);
+// }
+// }
+//
+// public synchronized void resumeScheduledFlow(String scheduleId){
+// try {
+// ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+// flow.setSchedStatus(SchedStatus.LASTSUCCESS);
+// loader.saveSchedule(getSchedule());
+// }
+// catch (Exception e) {
+// throw new RuntimeException("Error resuming a schedule " + scheduleId);
+// }
+// }
+
+ public void schedule(final String scheduleId,
+ final String user,
+ final String userSubmit,
+ final DateTime submitTime,
+ final DateTime firstSchedTime,
+ final ReadablePeriod period
+ ) {
+ //TODO: should validate projectId and flowId?
+ logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime)
+ + " with a period of " + PeriodFormat.getDefault().print(period));
+ schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, period));
+ }
+
+ /**
+ * Schedule the flow
+ * @param flowId
+ * @param date
+ * @param ignoreDep
+ */
+ public void schedule(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime) {
+ logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
+ schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime));
+ }
+
+ /**
+ * Schedules the flow, but doesn't save the schedule afterwards.
+ * @param flow
+ */
+ private synchronized void internalSchedule(ScheduledFlow flow) {
+ ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
+ flow.updateTime();
+ if (existing != null) {
+ this.runner.removeScheduledFlow(existing);
+ }
+
+ this.runner.addScheduledFlow(flow);
+ scheduleIDMap.put(flow.getScheduleId(), flow);
+ }
+
+ /**
+ * Adds a flow to the schedule.
+ *
+ * @param flow
+ */
+ public synchronized void schedule(ScheduledFlow flow) {
+ internalSchedule(flow);
+ saveSchedule();
+ }
+
+ /**
+ * Save the schedule
+ */
+ private void saveSchedule() {
+ loader.saveSchedule(getSchedule());
+ }
+
+
+ /**
+ * Thread that simply invokes the running of flows when the schedule is
+ * ready.
+ *
+ * @author Richard Park
+ *
+ */
+ public class ScheduleRunner extends Thread {
+ private final PriorityBlockingQueue<ScheduledFlow> schedule;
+ private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+ // Five minute minimum intervals
+ private static final int TIMEOUT_MS = 300000;
+
+ public ScheduleRunner() {
+ schedule = new PriorityBlockingQueue<ScheduledFlow>(1, new ScheduleComparator());
+ }
+
+ public void shutdown() {
+ logger.error("Shutting down scheduler thread");
+ stillAlive.set(false);
+ this.interrupt();
+ }
+
+ /**
+ * Return a list of scheduled flow
+ * @return
+ */
+ public synchronized List<ScheduledFlow> getSchedule() {
+ return new ArrayList<ScheduledFlow>(schedule);
+ }
+
+ /**
+ * Adds the flow to the schedule and then interrupts so it will update its wait time.
+ * @param flow
+ */
+ public synchronized void addScheduledFlow(ScheduledFlow flow) {
+ logger.info("Adding " + flow + " to schedule.");
+ schedule.add(flow);
+// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+// System.currentTimeMillis(),
+// WorkflowAction.SCHEDULE_WORKFLOW,
+// WorkflowState.NOP,
+// flow.getId());
+
+ this.interrupt();
+ }
+
+ /**
+ * Remove scheduled flows. Does not interrupt.
+ *
+ * @param flow
+ */
+ public synchronized void removeScheduledFlow(ScheduledFlow flow) {
+ logger.info("Removing " + flow + " from the schedule.");
+ schedule.remove(flow);
+// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
+// System.currentTimeMillis(),
+// WorkflowAction.UNSCHEDULE_WORKFLOW,
+// WorkflowState.NOP,
+// flow.getId());
+ // Don't need to interrupt, because if this is originally on the top of the queue,
+ // it'll just skip it.
+ }
+
+ public void run() {
+ while(stillAlive.get()) {
+ synchronized (this) {
+ try {
+ //TODO clear up the exception handling
+ ScheduledFlow schedFlow = schedule.peek();
+
+ if (schedFlow == null) {
+ // If null, wake up every minute or so to see if there's something to do.
+ // Most likely there will not be.
+ try {
+ this.wait(TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ // interruption should occur when items are added or removed from the queue.
+ }
+ }
+ else {
+ // We've passed the flow execution time, so we will run.
+ if (!schedFlow.getNextExecTime().isAfterNow()) {
+ // Run flow. The invocation of flows should be quick.
+ ScheduledFlow runningFlow = schedule.poll();
+ logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());
+
+ // Execute the flow here
+ try {
+ Project project = projectManager.getProject(runningFlow.getProjectId());
+ if (project == null) {
+ logger.error("Scheduled Project "+runningFlow.getProjectId()+" does not exist!");
+ throw new RuntimeException("Error finding the scheduled project. " + runningFlow.getScheduleId());
+ }
+
+ Flow flow = project.getFlow(runningFlow.getFlowId());
+ if (flow == null) {
+ logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
+ throw new RuntimeException("Error finding the scheduled flow. " + runningFlow.getScheduleId());
+ }
+
+ HashMap<String, Props> sources;
+ try {
+ sources = projectManager.getAllFlowProperties(project, runningFlow.getFlowId());
+ }
+ catch (ProjectManagerException e) {
+ logger.error(e.getMessage());
+ throw new RuntimeException("Error getting the flow resources. " + runningFlow.getScheduleId());
+ }
+
+ // Create ExecutableFlow
+ ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
+ exflow.setSubmitUser(runningFlow.getUser());
+ //TODO make disabled in scheduled flow
+// Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
+// for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
+// boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
+// exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+// }
+
+ // Create directory
+ try {
+ executorManager.setupExecutableFlow(exflow);
+ } catch (ExecutorManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+ logger.error(e.getMessage());
+ return;
+ }
+
+ // Copy files to the source.
+ File executionDir = new File(exflow.getExecutionPath());
+ try {
+ projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
+ } catch (ProjectManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+ logger.error(e.getMessage());
+ return;
+ }
+
+
+ try {
+ executorManager.executeFlow(exflow);
+ } catch (ExecutorManagerException e) {
+ try {
+ executorManager.cleanupAll(exflow);
+ } catch (ExecutorManagerException e1) {
+ e1.printStackTrace();
+ }
+
+ logger.error(e.getMessage());
+ return;
+ }
+ } catch (JobExecutionException e) {
+ logger.info("Could not run flow. " + e.getMessage());
+ }
+ schedule.remove(runningFlow);
+
+ // Immediately reschedule if it's possible. Let the execution manager
+ // handle any duplicate runs.
+ if (runningFlow.updateTime()) {
+ schedule.add(runningFlow);
+ }
+ saveSchedule();
+ }
+ else {
+ // wait until flow run
+ long millisWait = Math.max(0, schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis());
+ try {
+ this.wait(Math.min(millisWait, TIMEOUT_MS));
+ } catch (InterruptedException e) {
+ // interruption should occur when items are added or removed from the queue.
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.error("Unexpected exception has been thrown in scheduler", e);
+ }
+ catch (Throwable e) {
+ logger.error("Unexpected throwable has been thrown in scheduler", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Class to sort the schedule based on time.
+ *
+ * @author Richard Park
+ */
+ private class ScheduleComparator implements Comparator<ScheduledFlow>{
+ @Override
+ public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
+ DateTime first = arg1.getNextExecTime();
+ DateTime second = arg0.getNextExecTime();
+
+ if (first.isEqual(second)) {
+ return 0;
+ }
+ else if (first.isBefore(second)) {
+ return 1;
+ }
+
+ return -1;
+ }
+ }
+ }
+}
\ No newline at end of file
src/java/azkaban/utils/CircularBuffer.java 75(+75 -0)
diff --git a/src/java/azkaban/utils/CircularBuffer.java b/src/java/azkaban/utils/CircularBuffer.java
new file mode 100644
index 0000000..1e62259
--- /dev/null
+++ b/src/java/azkaban/utils/CircularBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterators;
+
/**
 * A circular buffer of items of a given length. It will grow up to the given
 * size as items are appended, then it will begin to overwrite the oldest items.
 *
 * Not thread-safe.
 *
 * @author jkreps
 *
 * @param <T> The type of the item contained.
 */
public class CircularBuffer<T> implements Iterable<T> {

    private final List<T> lines;  // backing storage, at most 'size' elements
    private final int size;       // maximum capacity
    private int start;            // index of the oldest element once full

    public CircularBuffer(int size) {
        this.lines = new ArrayList<T>();
        this.size = size;
        this.start = 0;
    }

    /**
     * Appends an item, overwriting the oldest element once the buffer is full.
     */
    public void append(T line) {
        if (lines.size() < size) {
            lines.add(line);
        } else {
            lines.set(start, line);
            start = (start + 1) % size;
        }
    }

    /**
     * Renders the contents oldest-first.
     *
     * Fix: previously joined the raw backing list, which shows the wrong order
     * once the buffer has wrapped. Also drops the Guava Joiner dependency.
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder("[");
        boolean first = true;
        for (T line : this) {
            if (!first) {
                builder.append(", ");
            }
            builder.append(line);
            first = false;
        }
        return builder.append("]").toString();
    }

    /**
     * Iterates oldest-to-newest. When the buffer has wrapped, a snapshot copy
     * in logical order is iterated (replaces Guava's Iterators.concat).
     */
    public Iterator<T> iterator() {
        if (start == 0)
            return lines.iterator();

        List<T> ordered = new ArrayList<T>(lines.size());
        ordered.addAll(lines.subList(start, lines.size()));
        ordered.addAll(lines.subList(0, start));
        return ordered.iterator();
    }

    public int getMaxSize() {
        return this.size;
    }

    public int getSize() {
        return this.lines.size();
    }

}
\ No newline at end of file
src/java/azkaban/utils/JSONUtils.java 169(+168 -1)
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index d682dbe..cae5958 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -7,23 +7,190 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectWriter;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
public class JSONUtils {
+
+
/**
- * Prevent the instantiation of this helper class.
+ * Private constructor to prevent instantiation of this utility class.
*/
private JSONUtils() {
}
+
+ /**
+ * Takes a reader to stream the JSON string. The reader is not wrapped in a BufferedReader
+ * so it is up to the user to employ such optimizations if so desired.
+ *
+ * The results will be Maps, Lists and other Java mapping of Json types (i.e. String, Number, Boolean).
+ *
+ * @param reader
+ * @return
+ * @throws Exception
+ */
+ public static Map<String, Object> fromJSONStream(Reader reader) throws Exception {
+ JSONObject jsonObj = new JSONObject(new JSONTokener(reader));
+ Map<String, Object> results = createObject(jsonObj);
+
+ return results;
+ }
+
+ /**
+ * Converts a json string to Objects.
+ *
+ * The results will be Maps, Lists and other Java mapping of Json types (i.e. String, Number, Boolean).
+ *
+ * @param str
+ * @return
+ * @throws Exception
+ */
+ public static Map<String, Object> fromJSONString(String str) throws Exception {
+ JSONObject jsonObj = new JSONObject(str);
+ Map<String, Object> results = createObject(jsonObj);
+ return results;
+ }
+
+ /**
+ * Recurses through the json object to create a Map/List/Object equivalent.
+ *
+ * @param obj
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ private static Map<String, Object> createObject(JSONObject obj) {
+ LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
+
+ Iterator<String> iterator = obj.keys();
+ while(iterator.hasNext()) {
+ String key = iterator.next();
+ Object value = null;
+ try {
+ value = obj.get(key);
+ } catch (JSONException e) {
+ // Since we get the value from the key given by the JSONObject,
+ // this exception shouldn't be thrown.
+ }
+
+ if (value instanceof JSONArray) {
+ value = createArray((JSONArray)value);
+ }
+ else if (value instanceof JSONObject) {
+ value = createObject((JSONObject)value);
+ }
+
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ /**
+ * Recurses through the json array to create a List equivalent.
+ *
+ * @param array
+ * @return
+ */
+ private static List<Object> createArray(JSONArray array) {
+ ArrayList<Object> list = new ArrayList<Object>();
+ for (int i = 0; i < array.length(); ++i) {
+ Object value = null;
+ try {
+ value = array.get(i);
+ } catch (JSONException e) {
+ // Value is fetched by a valid index, so this exception should not occur.
+ }
+
+ if (value instanceof JSONArray) {
+ value = createArray((JSONArray)value);
+ }
+ else if (value instanceof JSONObject) {
+ value = createObject((JSONObject)value);
+ }
+
+ list.add(value);
+ }
+
+ return list;
+ }
+
+ /**
+ * Creates a json string from Map/List/Primitive object.
+ *
+ * @param list
+ * @return
+ */
+ public static String toJSONString(List<?> list) {
+ JSONArray jsonList = new JSONArray(list);
+ try {
+ return jsonList.toString();
+ } catch (Exception e) {
+ return "";
+ }
+ }
+
+ /**
+ * Creates a json string from Map/List/Primitive object.
+ *
+ * @param list
+ * @param indent
+ * @return
+ */
+ public static String toJSONString(List<?> list, int indent) {
+ JSONArray jsonList = new JSONArray(list);
+ try {
+ return jsonList.toString(indent);
+ } catch (Exception e) {
+ return "";
+ }
+ }
+
+ /**
+ * Creates a json string from Map/List/Primitive object.
+ *
+ * @param obj
+ * @return
+ */
+ public static String toJSONString(Map<String, Object> obj) {
+ JSONObject jsonObj = new JSONObject(obj);
+ try {
+ return jsonObj.toString();
+ } catch (Exception e) {
+ return "";
+ }
+ }
+
+ /**
+ * Creates a json pretty string from Map/List/Primitive object
+ *
+ * @param obj
+ * @param indent
+ * @return
+ */
+ public static String toJSONString(Map<String, Object> obj, int indent) {
+ JSONObject jsonObj = new JSONObject(obj);
+ try {
+ return jsonObj.toString(indent);
+ } catch (Exception e) {
+ return "";
+ }
+ }
+
public static String toJSON(Object obj) {
return toJSON(obj, false);
}
src/java/azkaban/utils/Utils.java 57(+57 -0)
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index c00951a..a4d5db3 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -24,6 +24,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Random;
@@ -174,4 +176,59 @@ public class Utils {
return (Double) obj;
}
+
+ /**
+ * Rethrows the root cause of the given InvocationTargetException as a RuntimeException.
+ *
+ * @param e The InvocationTargetException whose cause is extracted
+ * @return Never returns normally; the cause (or a wrapping IllegalStateException) is always thrown
+ */
+ private static RuntimeException getCause(InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ throw (RuntimeException) cause;
+ else
+ throw new IllegalStateException(e.getCause());
+ }
+
+ /**
+ * Get the Class of all the objects
+ *
+ * @param args The objects to get the Classes from
+ * @return The classes as an array
+ */
+ public static Class<?>[] getTypes(Object... args) {
+ Class<?>[] argTypes = new Class<?>[args.length];
+ for(int i = 0; i < argTypes.length; i++)
+ argTypes[i] = args[i].getClass();
+ return argTypes;
+ }
+
+ public static Object callConstructor(Class<?> c, Object... args) {
+ return callConstructor(c, getTypes(args), args);
+ }
+
+ /**
+ * Call the class constructor with the given arguments
+ *
+ * @param c The class
+ * @param args The arguments
+ * @return The constructed object
+ */
+ public static Object callConstructor(Class<?> c, Class<?>[] argTypes, Object[] args) {
+ try {
+ Constructor<?> cons = c.getConstructor(argTypes);
+ return cons.newInstance(args);
+ } catch(InvocationTargetException e) {
+ throw getCause(e);
+ } catch(IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ } catch(NoSuchMethodException e) {
+ throw new IllegalStateException(e);
+ } catch(InstantiationException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 173379c..85b3c3f 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -38,6 +38,8 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorManager;
import azkaban.project.FileProjectManager;
import azkaban.project.ProjectManager;
+import azkaban.scheduler.LocalFileScheduleLoader;
+import azkaban.scheduler.ScheduleManager;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.Props;
@@ -45,6 +47,7 @@ import azkaban.utils.Utils;
import azkaban.webapp.servlet.AzkabanServletContextListener;
import azkaban.webapp.servlet.ExecutionServlet;
import azkaban.webapp.servlet.FlowExecutorServlet;
+import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.IndexServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
@@ -96,7 +99,9 @@ public class AzkabanWebServer {
private UserManager userManager;
private ProjectManager projectManager;
private ExecutorManager executorManager;
-
+
+ private ScheduleManager scheduleManager;
+
private Props props;
private SessionCache sessionCache;
private File tempDir;
@@ -119,6 +124,7 @@ public class AzkabanWebServer {
userManager = loadUserManager(props);
projectManager = loadProjectManager(props);
executorManager = loadExecutorManager(props);
+ scheduleManager = loadScheduleManager(executorManager, props);
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -186,6 +192,13 @@ public class AzkabanWebServer {
return execManager;
}
+ private ScheduleManager loadScheduleManager(ExecutorManager execManager, Props props ) throws Exception {
+
+ ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, new LocalFileScheduleLoader(props));
+ return schedManager;
+ }
+
+
/**
* Returns the web session cache.
*
@@ -226,6 +239,10 @@ public class AzkabanWebServer {
public ExecutorManager getExecutorManager() {
return executorManager;
}
+
+ public ScheduleManager getScheduleManager() {
+ return scheduleManager;
+ }
/**
* Creates and configures the velocity engine.
@@ -350,6 +367,7 @@ public class AzkabanWebServer {
root.addServlet(new ServletHolder(new FlowExecutorServlet()),"/executor");
root.addServlet(new ServletHolder(new ExecutionServlet()),"/executions");
root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
+ root.addServlet(new ServletHolder(new ScheduleServlet()),"/schedule");
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
@@ -440,4 +458,6 @@ public class AzkabanWebServer {
return null;
}
+
+
}
src/java/azkaban/webapp/servlet/ScheduleServlet.java 172(+172 -0)
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
new file mode 100644
index 0000000..376bb0d
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -0,0 +1,172 @@
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.Hours;
+import org.joda.time.LocalDateTime;
+import org.joda.time.Minutes;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormat;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.User;
+import azkaban.user.Permission.Type;
+import azkaban.webapp.session.Session;
+import azkaban.scheduler.ScheduleManager;
+
+public class ScheduleServlet extends LoginAbstractAzkabanServlet {
+ private static final long serialVersionUID = 1L;
+ private ProjectManager projectManager;
+ private ScheduleManager scheduleManager;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+ projectManager = this.getApplication().getProjectManager();
+ scheduleManager = this.getApplication().getScheduleManager();
+ }
+
+ @Override
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ }
+// else if (hasParam(req, "execid")) {
+// handleExecutionFlowPage(req, resp, session);
+// }
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ if (hasParam(req, "action")) {
+ String action = getParam(req, "action");
+ if (action.equals("scheduleFlow")) {
+ ajaxScheduleFlow(req, ret, session.getUser());
+ }
+ }
+ this.writeJSON(resp, ret);
+ }
+
+ private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ String ajaxName = getParam(req, "ajax");
+
+//// if (hasParam(req, "execid")) {
+//// if (ajaxName.equals("fetchexecflow")) {
+//// ajaxFetchExecutableFlow(req, resp, ret, session.getUser());
+//// }
+////// else if (ajaxName.equals("fetchexecflowupdate")) {
+////// ajaxFetchExecutableFlowUpdate(req, resp, ret, session.getUser());
+////// }
+// }
+// if(hasParam(req, "schedule")) {
+
+ if (ajaxName.equals("scheduleFlow")) {
+ ajaxScheduleFlow(req, ret, session.getUser());
+ }
+// }
+ this.writeJSON(resp, ret);
+ }
+
+ private void ajaxScheduleFlow(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException {
+ String projectId = getParam(req, "projectId");
+ String flowId = getParam(req, "flowId");
+
+ Project project = projectManager.getProject(projectId);
+
+ if (project == null) {
+ ret.put("message", "Project " + projectId + " does not exist");
+ ret.put("status", "error");
+ return;
+ }
+
+ if (!project.hasPermission(user, Type.SCHEDULE)) {
+ ret.put("status", "error");
+ ret.put("message", "Permission denied. Cannot execute " + flowId);
+ return;
+ }
+
+ Flow flow = project.getFlow(flowId);
+ if (flow == null) {
+ ret.put("status", "error");
+ ret.put("message", "Flow " + flowId + " cannot be found in project " + project);
+ return;
+ }
+
+// int hour = getIntParam(req, "hour");
+// int minutes = getIntParam(req, "minutes");
+// boolean isPm = getParam(req, "am_pm").equalsIgnoreCase("pm");
+ int hour = 0;
+ int minutes = 0;
+ boolean isPm = false;
+ String scheduledDate = req.getParameter("date");
+ DateTime day = null;
+ if(scheduledDate == null || scheduledDate.trim().length() == 0) {
+ day = new LocalDateTime().toDateTime();
+ } else {
+ try {
+ day = DateTimeFormat.forPattern("MM-dd-yyyy").parseDateTime(scheduledDate);
+ } catch(IllegalArgumentException e) {
+ ret.put("error", "Invalid date: '" + scheduledDate + "'");
+ return;
+ }
+ }
+
+ ReadablePeriod thePeriod = null;
+ if(hasParam(req, "is_recurring"))
+ thePeriod = parsePeriod(req);
+
+ if(isPm && hour < 12)
+ hour += 12;
+ hour %= 24;
+
+ String userSubmit = user.getUserId();
+ String userExec = getParam(req, "userExec");
+ String scheduleId = projectId + "." + flowId;
+ DateTime submitTime = new DateTime();
+ DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
+
+ scheduleManager.schedule(scheduleId,userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
+
+
+
+ ret.put("status", "success");
+ ret.put("message", scheduleId + " scheduled.");
+
+ }
+
+
+
+ private ReadablePeriod parsePeriod(HttpServletRequest req) throws ServletException {
+// int period = getIntParam(req, "period");
+// String periodUnits = getParam(req, "period_units");
+ int period = 10;
+ String periodUnits = "m";
+ if("d".equals(periodUnits))
+ return Days.days(period);
+ else if("h".equals(periodUnits))
+ return Hours.hours(period);
+ else if("m".equals(periodUnits))
+ return Minutes.minutes(period);
+ else if("s".equals(periodUnits))
+ return Seconds.seconds(period);
+ else
+ throw new ServletException("Unknown period unit: " + periodUnits);
+ }
+
+}
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 83e720c..a916d1c 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -49,7 +49,7 @@
</div>
<div id="executebtn" class="btn1">Execute</div>
- <div id="schedulebtn" class="btn2">Schedule Flow</div>
+ <div id="scheduleflowbtn" class="btn2 scheduleflow">Schedule Flow</div>
</div>
<div id="headertabs" class="headertabs">
@@ -71,7 +71,7 @@
</div>
<div id="svgDiv" >
<svg id="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
- </svg>
+ </svg>
</div>
</div>
</div>
@@ -107,6 +107,38 @@
</div>
</div>
</div>
+ <!-- modal content -->
+ <div id="schedule-flow" class="modal">
+ <h3>Schedule A Flow</h3>
+ <div id="errorMsg" class="box-error-message">$errorMsg</div>
+
+ <div class="message">
+ <fieldset>
+ <dl>
+ <dt><label for="path">Project Name</label></dt>
+ <dd><input id="path" name="project" type="text" size="20" title="The project name."/></dd>
+ <dt>Description</dt>
+ <dd><textarea id="description" name="description" rows="2" cols="40"></textarea></dd>
+
+ <input name="action" type="hidden" value="create" />
+ <input name="redirect" type="hidden" value="$!context/" />
+ </dl>
+ </fieldset>
+ </div>
+
+ <div class="actions">
+ <a class="yes btn3" id="schedule-btn" href="#">Schedule The Flow</a>
+ <a class="no simplemodal-close btn4" href="#">Cancel</a>
+ </div>
+ <div id="invalid-session" class="modal">
+ <h3>Invalid Session</h3>
+ <p>Session has expired. Please re-login.</p>
+ <div class="actions">
+ <a class="yes btn3" id="login-btn" href="#">Re-login</a>
+ </div>
+ </div>
+ </div>
+
#end
<ul id="jobMenu" class="contextMenu flowSubmenu">
<li class="open"><a href="#open">Open...</a></li>
src/web/js/azkaban.flow.view.js 72(+72 -0)
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 868d8d3..5af2b50 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -855,6 +855,60 @@ azkaban.GraphModel = Backbone.Model.extend({});
var executionModel;
azkaban.ExecutionModel = Backbone.Model.extend({});
+var scheduleFlowView;
+azkaban.ScheduleFlowView = Backbone.View.extend({
+ events : {
+ "click #schedule-btn": "handleScheduleFlow"
+ },
+ initialize : function(settings) {
+ $("#errorMsg").hide();
+ },
+ handleScheduleFlow : function(evt) {
+ // First make sure we can upload
+ //var projectName = $('#path').val();
+ //var description = $('#description').val();
+
+ console.log("Creating schedule");
+ $.ajax({
+ async: "false",
+ url: "schedule",
+ dataType: "json",
+ type: "POST",
+ data: {
+ action:"scheduleFlow",
+ projectId:projectName,
+ flowId:flowName,
+ userExec:"dummy",
+ is_recurring:true
+ },
+ success: function(data) {
+ if (data.status == "success") {
+ if (data.action == "redirect") {
+ window.location = data.path;
+ }
+ else{
+ $("#errorMsg").text("Flow " + projectName + "." + flowName + " scheduled!" );
+ }
+ }
+ else {
+ if (data.action == "login") {
+ window.location = "";
+ }
+ else {
+ $("#errorMsg").text("ERROR: " + data.message);
+ $("#errorMsg").slideDown("fast");
+ }
+ }
+ }
+ });
+
+ },
+ render: function() {
+ }
+});
+
+
+
$(function() {
var selected;
// Execution model has to be created before the window switches the tabs.
@@ -867,6 +921,7 @@ $(function() {
svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel});
jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel});
contextMenu = new azkaban.ContextMenu({el:$('#jobMenu')});
+ scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow')});
var requestURL = contextURL + "/manager";
@@ -947,4 +1002,21 @@ $(function() {
);
});
+
+ $('#scheduleflowbtn').click( function() {
+ console.log("schedule button clicked");
+ $('#schedule-flow').modal({
+ closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+ position: ["20%",],
+ containerId: 'confirm-container',
+ containerCss: {
+ 'height': '220px',
+ 'width': '565px'
+ },
+ onShow: function (dialog) {
+ var modal = this;
+ $("#errorMsg").hide();
+ }
+ });
+ });
});
diff --git a/unit/java/azkaban/scheduler/MockJobExecutorManager.java b/unit/java/azkaban/scheduler/MockJobExecutorManager.java
new file mode 100644
index 0000000..ce07adc
--- /dev/null
+++ b/unit/java/azkaban/scheduler/MockJobExecutorManager.java
@@ -0,0 +1,99 @@
+package azkaban.scheduler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.joda.time.DateTime;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
+//import azkaban.flow.FlowExecutionHolder;
+import azkaban.executor.ExecutorManager;
+
+public class MockJobExecutorManager extends ExecutorManager {
+
+ private ArrayList<ExecutionRecord> executionList = new ArrayList<ExecutionRecord>();
+ private RuntimeException throwException = null;
+
+ public MockJobExecutorManager() throws IOException, ExecutorManagerException
+ {
+ super(null);
+ }
+
+ public void setThrowException(RuntimeException throwException) {
+ this.throwException = throwException;
+ }
+
+// @Override
+// public void execute(String id, boolean ignoreDep) {
+// DateTime time = new DateTime();
+// executionList.add(new ExecutionRecord(id, ignoreDep, time, throwException));
+// System.out.println("Running " + id + " at time " + time);
+// if (throwException != null) {
+// throw throwException;
+// }
+// }
+
+ @Override
+ public void executeFlow(ExecutableFlow flow) {
+ //System.out.println("Did not expect");
+ DateTime nextExecTime = new DateTime();
+ DateTime submitTime = new DateTime();
+ DateTime firstSchedTime = new DateTime();
+ String scheduleId = flow.getProjectId()+"."+flow.getFlowId();
+ executionList.add(new ExecutionRecord(scheduleId, "pymk", "cyu", submitTime, firstSchedTime, nextExecTime, throwException));
+ System.out.println("Running " + scheduleId + " at time " + nextExecTime);
+ if (throwException != null) {
+ throw throwException;
+ }
+ }
+
+// @Override
+//// public void execute(FlowExecutionHolder holder) {
+//// System.out.println("Did not expect");
+//// }
+
+ public ArrayList<ExecutionRecord> getExecutionList() {
+ return executionList;
+ }
+
+ public void clearExecutionList() {
+ executionList.clear();
+ }
+
+ public class ExecutionRecord {
+ private final String scheduleId;
+ private final String user;
+ private final String userSubmit;
+ private final DateTime submitTime;
+ private final DateTime firstSchedTime;
+ private final DateTime nextExecTime;
+ private final Exception throwException;
+
+ public ExecutionRecord(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExecTime) {
+ this(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, null);
+ }
+
+ public ExecutionRecord(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExecTime, Exception throwException) {
+ this.scheduleId = scheduleId;
+ this.user = user;
+ this.userSubmit = userSubmit;
+ this.submitTime = submitTime;
+ this.firstSchedTime = firstSchedTime;
+ this.nextExecTime = nextExecTime;
+ this.throwException = throwException;
+ }
+
+ public String getScheduleId() {
+ return scheduleId;
+ }
+
+ public DateTime getNextExecTime() {
+ return nextExecTime;
+ }
+
+ public Exception getThrowException() {
+ return throwException;
+ }
+ }
+}
\ No newline at end of file
unit/java/azkaban/scheduler/MockLoader.java 36(+36 -0)
diff --git a/unit/java/azkaban/scheduler/MockLoader.java b/unit/java/azkaban/scheduler/MockLoader.java
new file mode 100644
index 0000000..0d1f5eb
--- /dev/null
+++ b/unit/java/azkaban/scheduler/MockLoader.java
@@ -0,0 +1,36 @@
+package azkaban.scheduler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+public class MockLoader implements ScheduleLoader {
+ private ArrayList<ScheduledFlow> scheduledFlow = new ArrayList<ScheduledFlow>();
+
+ public void addScheduledFlow(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExec, Period recurrence) {
+ ScheduledFlow flow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExec, recurrence);
+ addScheduleFlow(flow);
+ }
+
+ public void addScheduleFlow(ScheduledFlow flow) {
+ scheduledFlow.add(flow);
+ }
+
+ public void clearSchedule() {
+ scheduledFlow.clear();
+ }
+
+ @Override
+ public List<ScheduledFlow> loadSchedule() {
+ return scheduledFlow;
+ }
+
+ @Override
+ public void saveSchedule(List<ScheduledFlow> schedule) {
+ scheduledFlow.clear();
+ scheduledFlow.addAll(schedule);
+ }
+
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
new file mode 100644
index 0000000..4579331
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
@@ -0,0 +1,11 @@
+package azkaban.test.jobExecutor;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+
+@RunWith(Suite.class)
+@SuiteClasses({ JavaJobTest.class, ProcessJobTest.class, PythonJobTest.class })
+public class AllJobExecutorTests {
+
+}
unit/java/azkaban/test/jobExecutor/JavaJobTest.java 132(+132 -0)
diff --git a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
new file mode 100644
index 0000000..27a7881
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
@@ -0,0 +1,132 @@
+package azkaban.test.jobExecutor;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+import org.easymock.classextension.EasyMock;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.JavaJob;
+import azkaban.jobExecutor.ProcessJob;
+
+public class JavaJobTest
+{
+
+ private JavaJob job = null;
+// private JobDescriptor descriptor = null;
+ private Props props = null;
+ private Logger log = Logger.getLogger(JavaJob.class);
+ private static String classPaths ;
+
+ private static final String inputContent =
+ "Quick Change in Strategy for a Bookseller \n" +
+ " By JULIE BOSMAN \n" +
+ "Published: August 11, 2010 \n" +
+ " \n" +
+ "Twelve years later, it may be Joe Fox�s turn to worry. Readers have gone from skipping small \n" +
+ "bookstores to wondering if they need bookstores at all. More people are ordering books online \n" +
+ "or plucking them from the best-seller bin at Wal-Mart";
+
+ private static final String errorInputContent =
+ inputContent + "\n stop_here " +
+ "But the threat that has the industry and some readers the most rattled is the growth of e-books. \n" +
+ " In the first five months of 2009, e-books made up 2.9 percent of trade book sales. In the same period \n" +
+ "in 2010, sales of e-books, which generally cost less than hardcover books, grew to 8.5 percent, according \n" +
+ "to the Association of American Publishers, spurred by sales of the Amazon Kindle and the new Apple iPad. \n" +
+ "For Barnes & Noble, long the largest and most powerful bookstore chain in the country, the new competition \n" +
+ "has led to declining profits and store traffic.";
+
+
+ private static String inputFile ;
+ private static String errorInputFile ;
+ private static String outputFile ;
+
+ @BeforeClass
+ public static void init() {
+ // get the classpath
+ Properties prop = System.getProperties();
+ classPaths = String.format("'%s'", prop.getProperty("java.class.path", null));
+
+ long time = (new Date()).getTime();
+ inputFile = "/tmp/azkaban_input_" + time;
+ errorInputFile = "/tmp/azkaban_input_error_" + time;
+ outputFile = "/tmp/azkaban_output_" + time;
+ // dump input files
+ try {
+ Utils.dumpFile(inputFile, inputContent);
+ Utils.dumpFile(errorInputFile, errorInputContent);
+ }
+ catch (IOException e) {
+ e.printStackTrace(System.err);
+ Assert.fail("error in creating input file:" + e.getLocalizedMessage());
+ }
+
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ // remove the input file and error input file
+ Utils.removeFile(inputFile);
+ Utils.removeFile(errorInputFile);
+ //Utils.removeFile(outputFile);
+ }
+
+ @Before
+ public void setUp() {
+
+ /* initialize job */
+// descriptor = EasyMock.createMock(JobDescriptor.class);
+
+ props = new Props();
+ props.put(AbstractProcessJob.WORKING_DIR, ".");
+ props.put("type", "java");
+ props.put("fullPath", ".");
+
+// EasyMock.expect(descriptor.getId()).andReturn("java").times(1);
+// EasyMock.expect(descriptor.getProps()).andReturn(props).times(1);
+// EasyMock.expect(descriptor.getFullPath()).andReturn(".").times(1);
+//
+// EasyMock.replay(descriptor);
+
+ job = new JavaJob(props, log);
+
+// EasyMock.verify(descriptor);
+ }
+
+ @Test
+ public void testJavaJob() {
+ /* initialize the Props */
+ props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
+ props.put(ProcessJob.WORKING_DIR, ".");
+ props.put("input", inputFile);
+ props.put("output", outputFile);
+ props.put("classpath", classPaths);
+ job.run();
+ }
+
+ @Test
+ public void testFailedJavaJob() {
+ props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
+ props.put(ProcessJob.WORKING_DIR, ".");
+ props.put("input", errorInputFile);
+ props.put("output", outputFile);
+ props.put("classpath", classPaths);
+
+ try {
+ job.run();
+ }
+ catch (RuntimeException e) {
+ Assert.assertTrue(true);
+ }
+ }
+
+}
+
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
new file mode 100644
index 0000000..d859e02
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -0,0 +1,79 @@
+package azkaban.test.jobExecutor;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.log4j.Logger;
+import org.easymock.classextension.EasyMock;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.ProcessJob;
+
+
+public class ProcessJobTest
+{
+ private ProcessJob job = null;
+// private JobDescriptor descriptor = null;
+ private Props props = null;
+ private Logger log = Logger.getLogger(ProcessJob.class);
+ @Before
+ public void setUp() {
+
+ /* initialize job */
+// props = EasyMock.createMock(Props.class);
+
+ props = new Props();
+ props.put(AbstractProcessJob.WORKING_DIR, ".");
+ props.put("type", "command");
+ props.put("fullPath", ".");
+
+
+// EasyMock.expect(props.getString("type")).andReturn("command").times(1);
+// EasyMock.expect(props.getProps()).andReturn(props).times(1);
+// EasyMock.expect(props.getString("fullPath")).andReturn(".").times(1);
+//
+// EasyMock.replay(props);
+
+ job = new ProcessJob(props, log);
+
+
+ }
+
+ @Test
+ public void testOneUnixCommand() {
+ /* initialize the Props */
+ props.put(ProcessJob.COMMAND, "ls -al");
+ props.put(ProcessJob.WORKING_DIR, ".");
+
+ job.run();
+
+ }
+
+ @Test
+ public void testFailedUnixCommand() {
+ /* initialize the Props */
+ props.put(ProcessJob.COMMAND, "xls -al");
+ props.put(ProcessJob.WORKING_DIR, ".");
+
+ try {
+ job.run();
+ }catch (RuntimeException e) {
+ Assert.assertTrue(true);
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testMultipleUnixCommands( ) {
+ /* initialize the Props */
+ props.put(ProcessJob.WORKING_DIR, ".");
+ props.put(ProcessJob.COMMAND, "pwd");
+ props.put("command.1", "date");
+ props.put("command.2", "whoami");
+
+ job.run();
+ }
+}
+
+
diff --git a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
new file mode 100644
index 0000000..48cfd42
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
@@ -0,0 +1,107 @@
+package azkaban.test.jobExecutor;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+import org.easymock.classextension.EasyMock;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.PythonJob;
+
+public class PythonJobTest
+{
+ private PythonJob job = null;
+// private JobDescriptor descriptor = null;
+ private Props props = null;
+ private Logger log = Logger.getLogger(PythonJob.class);
+
+ private static final String scriptContent =
+ "#!/usr/bin/python \n" +
+ "import re, string, sys \n" +
+ "# if no arguments were given, print a helpful message \n" +
+ "l=len(sys.argv) \n" +
+ "if l < 1: \n"+
+ "\tprint 'Usage: celsium --t temp' \n" +
+ "\tsys.exit(1) \n" +
+ "\n" +
+ "# Loop over the arguments \n" +
+ "i=1 \n" +
+ "while i < l-1 : \n" +
+ "\tname = sys.argv[i] \n" +
+ "\tvalue = sys.argv[i+1] \n" +
+ "\tif name == \"--t\": \n" +
+ "\t\ttry: \n" +
+ "\t\t\tfahrenheit = float(string.atoi(value)) \n" +
+ "\t\texcept string.atoi_error: \n" +
+ "\t\t\tprint repr(value), \" not a numeric value\" \n" +
+ "\t\telse: \n" +
+ "\t\t\tcelsius=(fahrenheit-32)*5.0/9.0 \n" +
+ "\t\t\tprint '%i F = %iC' % (int(fahrenheit), int(celsius+.5)) \n" +
+ "\t\t\tsys.exit(0) \n" +
+ "\t\ti=i+2\n" ;
+
+
+ private static String scriptFile ;
+
+
+
+ @BeforeClass
+ public static void init() {
+
+ long time = (new Date()).getTime();
+ scriptFile = "/tmp/azkaban_python" + time + ".py";
+ // dump script file
+ try {
+ Utils.dumpFile(scriptFile, scriptContent);
+ }
+ catch (IOException e) {
+ e.printStackTrace(System.err);
+ Assert.fail("error in creating script file:" + e.getLocalizedMessage());
+ }
+
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ // remove the input file and error input file
+ Utils.removeFile(scriptFile);
+ }
+
+ @Test
+ public void testPythonJob() {
+
+ /* initialize job */
+// descriptor = EasyMock.createMock(JobDescriptor.class);
+
+ props = new Props();
+ props.put(AbstractProcessJob.WORKING_DIR, ".");
+ props.put("type", "python");
+ props.put("script", scriptFile);
+ props.put("t", "90");
+ props.put("type", "script");
+ props.put("fullPath", ".");
+
+// EasyMock.expect(descriptor.getId()).andReturn("script").times(1);
+// EasyMock.expect(descriptor.getProps()).andReturn(props).times(3);
+// EasyMock.expect(descriptor.getFullPath()).andReturn(".").times(1);
+// EasyMock.replay(descriptor);
+ job = new PythonJob(props, log);
+// EasyMock.verify(descriptor);
+ try
+ {
+ job.run();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.err);
+ Assert.fail("Python job failed:" + e.getLocalizedMessage());
+ }
+ }
+
+}
diff --git a/unit/java/azkaban/test/jobExecutor/Utils.java b/unit/java/azkaban/test/jobExecutor/Utils.java
new file mode 100644
index 0000000..0c012cf
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/Utils.java
@@ -0,0 +1,23 @@
+package azkaban.test.jobExecutor;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
/** Small file helpers shared by the jobExecutor tests. */
public class Utils {

  /**
   * Writes {@code filecontent} to {@code filename}, overwriting any
   * existing file. The writer is always closed, even when writing fails.
   *
   * @param filename    path of the file to (over)write
   * @param filecontent text to write verbatim
   * @throws IOException if the file cannot be opened for writing
   */
  public static void dumpFile(String filename, String filecontent)
      throws IOException {
    PrintWriter writer = new PrintWriter(new FileWriter(filename));
    try {
      writer.print(filecontent);
    } finally {
      // was leaked in the original when print threw
      writer.close();
    }
  }

  /**
   * Best-effort deletion of {@code filename}; failures are ignored to
   * match the cleanup semantics the tests expect.
   */
  public static void removeFile(String filename) {
    File file = new File(filename);
    file.delete();
  }
}
diff --git a/unit/java/azkaban/test/jobExecutor/WordCountLocal.java b/unit/java/azkaban/test/jobExecutor/WordCountLocal.java
new file mode 100644
index 0000000..c22119e
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/WordCountLocal.java
@@ -0,0 +1,80 @@
+package azkaban.test.jobExecutor;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+import azkaban.jobExecutor.AbstractJob;
+import azkaban.utils.Props;
+
+public class WordCountLocal extends AbstractJob {
+
+ private String _input = null;
+ private String _output = null;
+ private Map<String, Integer> _dic = new HashMap<String,Integer>();
+
+ public WordCountLocal(String id, Props prop)
+ {
+ super(id, Logger.getLogger(WordCountLocal.class));
+ _input = prop.getString("input");
+ _output = prop.getString("output");
+ }
+
+
+ public void run () throws Exception {
+
+ if (_input == null) throw new Exception ("input file is null");
+ if (_output == null) throw new Exception ("output file is null");
+ BufferedReader in = new BufferedReader (new InputStreamReader( new FileInputStream(_input)));
+
+ String line = null;
+ while ( (line = in.readLine()) != null ) {
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ while (tokenizer.hasMoreTokens()) {
+ String word =tokenizer.nextToken();
+
+ if (word.toString().equals("end_here")) { //expect an out-of-bound exception
+ String [] errArray = new String[1];
+ System.out.println("string in possition 2 is " + errArray[1]);
+ }
+
+ if (_dic.containsKey(word)) {
+ Integer num = _dic.get(word);
+ _dic.put(word, num +1);
+ }
+ else {
+ _dic.put(word, 1);
+ }
+ }
+ }
+ in.close();
+
+ PrintWriter out = new PrintWriter(new FileOutputStream(_output));
+ for (Map.Entry<String, Integer> entry: _dic.entrySet()) {
+ out.println (entry.getKey() + "\t" + entry.getValue());
+ }
+ out.close();
+ }
+
+ @Override
+ public Props getJobGeneratedProperties()
+ {
+ return new Props();
+ }
+
+ @Override
+ public boolean isCanceled()
+ {
+ return false;
+ }
+
+
+ }
+
\ No newline at end of file