azkaban-developers

Changes

Details

diff --git a/conf/azkaban.properties b/conf/azkaban.properties
index ee78616..40d3d0b 100644
--- a/conf/azkaban.properties
+++ b/conf/azkaban.properties
@@ -19,11 +19,16 @@ project.global.properties=conf/global.properties
 execution.directory=execution
 execution.use.symlink=true
 
+#Schedule files
+schedule.directory=schedule
+schedule.path=schedule
+schedule.backup=backup
+
 # Velocity dev mode
 velocity.dev.mode=true
 
 # Azkaban Jetty server properties. Ignored in tomcat
-jetty.maxThreads=10
+jetty.maxThreads=25
 jetty.ssl.port=8443
 jetty.port=8081
 jetty.keystore=keystore
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index ca0ddb6..ff5f1e4 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -15,6 +15,9 @@ import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
 import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventHandler;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.Job;
+import azkaban.jobExecutor.utils.JobWrappingFactory;
 import azkaban.utils.Props;
 
 public class JobRunner extends EventHandler implements Runnable {
@@ -30,6 +33,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	private Layout loggerLayout = DEFAULT_LAYOUT;
 	private Appender jobAppender;
 	
+	private Job job;
+	
 	private static final Object logCreatorLock = new Object();
 	
 	public JobRunner(ExecutableNode node, Props props, File workingDir) {
@@ -85,7 +90,6 @@ public class JobRunner extends EventHandler implements Runnable {
 		node.setStatus(Status.RUNNING);
 		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
 
-		//Just for testing 5 sec each round.
 		synchronized(this) {
 			try {
 				wait(5000);
@@ -94,12 +98,26 @@ public class JobRunner extends EventHandler implements Runnable {
 				logger.info("Job cancelled.");
 			}
 		}
+
 		// Run Job
 		boolean succeeded = true;
 
+		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+		JobWrappingFactory factory  = JobWrappingFactory.getJobWrappingFactory();
+        job = factory.buildJobExecutor(props, logger);
+
+        try {
+                job.run();
+        } catch (Exception e) {
+                succeeded = false;
+                logger.error("job run failed!", e);
+                // stack trace is included in the logged error above
+        }
+		
 		node.setEndTime(System.currentTimeMillis());
 		if (succeeded) {
 			node.setStatus(Status.SUCCEEDED);
+			outputProps = job.getJobGeneratedProperties();
 			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
 		} else {
 			node.setStatus(Status.FAILED);
@@ -111,6 +129,18 @@ public class JobRunner extends EventHandler implements Runnable {
 
 	public synchronized void cancel() {
 		// Cancel code here
+		if(job == null) {
+            logger.error("Job doesn't exist!");
+            return;
+		}
+
+		try {
+            job.cancel();
+		} catch (Exception e) {
+            logger.error("Failed trying to cancel job!", e);
+            // stack trace is included in the logged error above
+		}
+
 		// will just interrupt, I guess, until the code is finished.
 		this.notifyAll();
 		
diff --git a/src/java/azkaban/jobExecutor/AbstractJob.java b/src/java/azkaban/jobExecutor/AbstractJob.java
new file mode 100644
index 0000000..6292dee
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/AbstractJob.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+public abstract class AbstractJob implements Job {
+
+	public static final String JOB_TYPE = "type";
+    public static final String JOB_CLASS = "job.class";
+    public static final String JOB_PATH = "job.path";
+    public static final String JOB_FULLPATH = "job.fullpath";
+    public static final String JOB_ID = "job.id";
+    
+	
+    private final String _id;
+    private final Logger _log;
+    private volatile double _progress;
+
+//    protected AbstractJob(String id) {
+//        this(id, Logger.getLogger(id));
+//    }
+
+    protected AbstractJob(String id, Logger log) {
+        _id = id;
+        _log = log;
+        _progress = 0.0;
+    }
+    
+    public String getId() {
+        return _id;
+    }
+
+    public double getProgress() throws Exception {
+        return _progress;
+    }
+
+    public void setProgress(double progress) {
+        this._progress = progress;
+    }
+
+    public void cancel() throws Exception {
+        throw new RuntimeException("Job " + _id + " does not support cancellation!");
+    }
+
+    public Logger getLog() {
+        return this._log;
+    }
+
+    public void debug(String message) {
+        this._log.debug(message);
+    }
+
+    public void debug(String message, Throwable t) {
+        this._log.debug(message, t);
+    }
+
+    public void info(String message) {
+        this._log.info(message);
+    }
+
+    public void info(String message, Throwable t) {
+        this._log.info(message, t);
+    }
+
+    public void warn(String message) {
+        this._log.warn(message);
+    }
+
+    public void warn(String message, Throwable t) {
+        this._log.warn(message, t);
+    }
+
+    public void error(String message) {
+        this._log.error(message);
+    }
+
+    public void error(String message, Throwable t) {
+        this._log.error(message, t);
+    }
+    
+    public Props getJobGeneratedProperties() {
+        return new Props();
+    }
+    
+    public abstract void run() throws Exception;
+    
+    public boolean isCanceled() {
+        return false;
+    }
+
+}
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
new file mode 100644
index 0000000..dd860a1
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.commons.fileupload.util.Streams;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+import org.json.JSONObject;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.utils.JSONToJava;
+import azkaban.jobExecutor.utils.PropsUtils;
+
+/*
+ * A revised process-based job
+ * 
+ * @author jkreps
+ * 
+ */
+public abstract class AbstractProcessJob extends AbstractJob {
+
+//    private static final Logger log = Logger
+//            .getLogger(AbstractProcessJob.class);
+
+	private final Logger log;
+    public static final String ENV_PREFIX = "env.";
+    public static final String ENV_PREFIX_UCASE = "ENV.";
+    public static final String WORKING_DIR = "working.dir";
+    public static final String JOB_PROP_ENV = "JOB_PROP_FILE";
+    public static final String JOB_NAME_ENV = "JOB_NAME";
+    public static final String JOB_OUTPUT_PROP_FILE = "JOB_OUTPUT_PROP_FILE";
+
+    private static final JSONToJava jsonToJava = new JSONToJava();
+
+    protected final String _jobPath;
+
+//    protected final Props props;
+    protected volatile Props _props;
+
+    protected String _cwd;
+
+    private volatile Props generatedPropeties;
+
+    protected AbstractProcessJob(final Props props, final Logger log) {
+        super(props.getString(JOB_ID, "unknownjob"), log);
+
+        _props = props;
+        _jobPath = props.getString(JOB_FULLPATH, new File(".").getAbsolutePath());
+
+        _cwd = getWorkingDirectory();
+        this.log = log;
+    }
+
+    public Props getProps() {
+        return _props;
+    }
+
+    public String getJobPath() {
+        return _jobPath;
+    }
+
+    protected void resolveProps() {
+        _props = PropsUtils.resolveProps(_props);
+    }
+
+    @Override
+    public Props getJobGeneratedProperties() {
+        return generatedPropeties;
+    }
+
+    /**
+     * initialize temporary and final property file
+     * 
+     * @return {tmpPropFile, outputPropFile}
+     */
+    public File[] initPropsFiles() {
+        // Create properties file with additionally all input generated
+        // properties.
+        File[] files = new File[2];
+        files[0] = createFlattenedPropsFile(_cwd);
+
+        _props.put(ENV_PREFIX + JOB_PROP_ENV, files[0].getAbsolutePath());
+        _props.put(ENV_PREFIX + JOB_NAME_ENV, getId());
+
+        files[1] = createOutputPropsFile(getId(), _cwd);
+        _props.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE,
+                files[1].getAbsolutePath());
+
+        return files;
+    }
+
+    public String getCwd() {
+        return _cwd;
+    }
+
+    public Map<String, String> getEnvironmentVariables() {
+        Props props = getProps();
+        Map<String, String> envMap = props.getMapByPrefix(ENV_PREFIX);
+        envMap.putAll(props.getMapByPrefix(ENV_PREFIX_UCASE));
+        return envMap;
+    }
+
+    public String getWorkingDirectory() {
+        return getProps()//.getString(WORKING_DIR, ".");
+                .getString(WORKING_DIR, new File(_jobPath).getParent());
+    }
+
+    public Props loadOutputFileProps(final File outputPropertiesFile) {
+        InputStream reader = null;
+        try {
+            System.err.println("output properties file="
+                    + outputPropertiesFile.getAbsolutePath());
+            reader = new BufferedInputStream(new FileInputStream(
+                    outputPropertiesFile));
+
+            Props outputProps = new Props();
+            final String content = Streams.asString(reader).trim();
+
+            if (!content.isEmpty()) {
+                Map<String, Object> propMap = jsonToJava.apply(new JSONObject(
+                        content));
+
+                for (Map.Entry<String, Object> entry : propMap.entrySet()) {
+                    outputProps
+                            .put(entry.getKey(), entry.getValue().toString());
+                }
+            }
+            return outputProps;
+        } catch (FileNotFoundException e) {
+            log.info(String.format(
+                    "File[%s] wasn't found, returning empty props.",
+                    outputPropertiesFile));
+            return new Props();
+        } catch (Exception e) {
+            log.error(
+                    "Exception thrown when trying to load output file props.  Returning empty Props instead of failing.  Is this really the best thing to do?",
+                    e);
+            return new Props();
+        } finally {
+            IOUtils.closeQuietly(reader);
+        }
+    }
+
+    public File createFlattenedPropsFile(final String workingDir) {
+        File directory = new File(workingDir);
+        File tempFile = null;
+        try {
+            tempFile = File.createTempFile(getId() + "_", "_tmp", directory);
+            _props.storeFlattened(tempFile);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create temp property file ",
+                    e);
+        }
+
+        return tempFile;
+    }
+
+    public static File createOutputPropsFile(final String id,
+            final String workingDir) {
+        System.err.println("cwd=" + workingDir);
+
+        File directory = new File(workingDir);
+        File tempFile = null;
+        try {
+            tempFile = File.createTempFile(id + "_output_", "_tmp", directory);
+        } catch (IOException e) {
+            System.err
+                    .println("Failed to create temp output property file :\n");
+            e.printStackTrace(System.err);
+            throw new RuntimeException(
+                    "Failed to create temp output property file ", e);
+        }
+        return tempFile;
+    }
+
+    public void generateProperties(final File outputFile) {
+        generatedPropeties = loadOutputFileProps(outputFile);
+    }
+
+}
diff --git a/src/java/azkaban/jobExecutor/JavaJob.java b/src/java/azkaban/jobExecutor/JavaJob.java
new file mode 100644
index 0000000..c951de1
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/JavaJob.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import java.io.File;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+public class JavaJob extends JavaProcessJob {
+
+	public static final String RUN_METHOD_PARAM = "method.run";
+	public static final String CANCEL_METHOD_PARAM = "method.cancel";
+	public static final String PROGRESS_METHOD_PARAM = "method.progress";
+
+	public static final String JOB_CLASS = "job.class";
+	public static final String DEFAULT_CANCEL_METHOD = "cancel";
+	public static final String DEFAULT_RUN_METHOD = "run";
+	public static final String DEFAULT_PROGRESS_METHOD = "getProgress";
+
+	private String _runMethod;
+	private String _cancelMethod;
+	private String _progressMethod;
+
+	private Object _javaObject = null;
+	private String props;
+
+	public JavaJob(Props props, Logger log) {
+		super(props, log);
+	}
+
+	@Override
+    protected List<String> getClassPaths() {
+        List<String> classPath = super.getClassPaths();
+        
+        classPath.add(getSourcePathFromClass(JavaJobRunnerMain.class));
+        String loggerPath = getSourcePathFromClass(org.apache.log4j.Logger.class);
+        if (!classPath.contains(loggerPath)) {
+            classPath.add(loggerPath);
+        }
+        
+        // Add hadoop home to classpath
+        String hadoopHome = System.getenv("HADOOP_HOME");
+        if (hadoopHome == null) {
+            info("HADOOP_HOME not set, using default hadoop config.");
+        } else {
+            info("Using hadoop config found in " + hadoopHome);
+            classPath.add(new File(hadoopHome, "conf").getPath());
+        }
+        
+        return classPath;
+	}
+
+	private static String getSourcePathFromClass(Class containedClass) {
+	    File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+	    
+        if (!file.isDirectory() && file.getName().endsWith(".class")) {
+            String name = containedClass.getName();
+            StringTokenizer tokenizer = new StringTokenizer(name, ".");
+            while(tokenizer.hasMoreTokens()) {
+                tokenizer.nextElement();
+                
+                file = file.getParentFile();
+            }
+            
+            return file.getPath();  
+        }
+        else {
+            return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+        }
+	}
+	
+    @Override
+    protected String getJavaClass() {
+        return JavaJobRunnerMain.class.getName();
+    }
+    
+	@Override
+	public String toString() {
+		return "JavaJob{" + "_runMethod='" + _runMethod + '\''
+				+ ", _cancelMethod='" + _cancelMethod + '\''
+				+ ", _progressMethod='" + _progressMethod + '\''
+				+ ", _javaObject=" + _javaObject + ", props="
+				+ props + '}';
+	}
+}
diff --git a/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
new file mode 100644
index 0000000..eaff543
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.utils.SecurityUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class JavaJobRunnerMain {
+
+    public static final String JOB_CLASS = "job.class";
+    public static final String DEFAULT_RUN_METHOD = "run";
+    public static final String DEFAULT_CANCEL_METHOD = "cancel";
+
+    // This is the Job interface method to get the properties generated by the job.
+    public static final String GET_GENERATED_PROPERTIES_METHOD = "getJobGeneratedProperties";
+
+    public static final String CANCEL_METHOD_PARAM = "method.cancel";
+    public static final String RUN_METHOD_PARAM = "method.run";
+    public static final String PROPS_CLASS = "azkaban.utils.Props";
+
+    private static final Layout DEFAULT_LAYOUT = new PatternLayout("%p %m\n");
+
+    public final Logger _logger;
+
+    public String _cancelMethod;
+    public String _jobName;
+    public Object _javaObject;
+    private boolean _isFinished = false;
+
+    public static void main(String[] args) throws Exception {
+        @SuppressWarnings("unused")
+        JavaJobRunnerMain wrapper = new JavaJobRunnerMain();
+    }
+
+    public JavaJobRunnerMain() throws Exception {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                cancelJob();
+            }
+        }
+        );
+
+        try {
+            _jobName = System.getenv(ProcessJob.JOB_NAME_ENV);
+            String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
+
+            _logger = Logger.getRootLogger();
+            _logger.removeAllAppenders();
+            ConsoleAppender appender = new ConsoleAppender(DEFAULT_LAYOUT);
+            appender.activateOptions();
+            _logger.addAppender(appender);
+
+            Properties prop = new Properties();
+            prop.load(new BufferedReader(new FileReader(propsFile)));
+
+            _logger.info("Running job " + _jobName);
+            String className = prop.getProperty(JOB_CLASS);
+            if(className == null) {
+                throw new Exception("Class name is not set.");
+            }
+            _logger.info("Class name " + className);
+
+            // Create the object using proxy
+            if (SecurityUtils.shouldProxy(prop)) {
+                _javaObject = getObjectAsProxyUser(prop, _logger, _jobName, className);
+            }
+            else {
+                _javaObject = getObject(_jobName, className, prop);
+            }
+            if(_javaObject == null) {
+                _logger.info("Could not create java object to run job: " + className);
+                throw new Exception("Could not create running object");
+            }
+
+            _cancelMethod = prop.getProperty(CANCEL_METHOD_PARAM, DEFAULT_CANCEL_METHOD);
+
+            final String runMethod = prop.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
+            _logger.info("Invoking method " + runMethod);
+
+            if(SecurityUtils.shouldProxy(prop)) {
+              _logger.info("Proxying enabled.");
+              runMethodAsProxyUser(prop, _javaObject, runMethod);
+            } else {
+              _logger.info("Proxy check failed, not proxying run.");
+              runMethod(_javaObject, runMethod);
+            }
+            _isFinished = true;
+
+            // Get the generated properties and store them to disk, to be read by ProcessJob.
+            try {
+                final Method generatedPropertiesMethod = _javaObject.getClass().getMethod(GET_GENERATED_PROPERTIES_METHOD, new Class<?>[]{});
+                Props outputGendProps = (Props) generatedPropertiesMethod.invoke(_javaObject, new Object[] {});
+                outputGeneratedProperties(outputGendProps);
+            } catch (NoSuchMethodException e) {
+                _logger.info(
+                        String.format(
+                                "Apparently there isn't a method[%s] on object[%s], using empty Props object instead.",
+                                GET_GENERATED_PROPERTIES_METHOD,
+                                _javaObject
+                        )
+                );
+                outputGeneratedProperties(new Props());
+            }
+        } catch (Exception e) {
+            _isFinished = true;
+            throw e;
+        }
+    }
+
+  private void runMethodAsProxyUser(Properties prop, final Object obj, final String runMethod) throws IOException, InterruptedException {
+    SecurityUtils.getProxiedUser(prop, _logger, new Configuration()).doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        runMethod(obj, runMethod);
+        return null;
+      }
+    });
+  }
+
+
+
+  private void runMethod(Object obj, String runMethod) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+    obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
+  }
+
+  private void outputGeneratedProperties(Props outputProperties)
+    {
+        _logger.info("Outputting generated properties to " + ProcessJob.JOB_OUTPUT_PROP_FILE);
+        if (outputProperties == null) {
+            _logger.info("  no gend props");
+            return;
+        }
+        for (String key : outputProperties.getKeySet()) {
+            _logger.info("  gend prop " + key + " value:" + outputProperties.get(key));
+        }
+        String outputFileStr = System.getenv(ProcessJob.JOB_OUTPUT_PROP_FILE);
+        if (outputFileStr == null) {
+            return;
+        }
+
+        Map<String, String> properties = new LinkedHashMap<String, String>();
+        for (String key : outputProperties.getKeySet()) {
+            properties.put(key, outputProperties.get(key));
+        }
+
+        OutputStream writer = null;
+        try {
+            writer = new BufferedOutputStream(new FileOutputStream(outputFileStr));
+            // Manually serialize into JSON instead of adding org.json to external classpath
+            writer.write("{\n".getBytes());
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                writer.write(String.format(
+                        "  '%s':'%s',\n",
+                        entry.getKey().replace("'", "\\'"),
+                        entry.getValue().replace("'", "\\'")
+                ).getBytes());
+            }
+            writer.write("}".getBytes());
+        }
+        catch (Exception e) {
+            // Previously the RuntimeException was constructed but never thrown,
+            // silently swallowing write failures; throw it and preserve the cause.
+            throw new RuntimeException("Unable to store output properties to: " + outputFileStr, e);
+        }
+        finally {
+            try {
+                if (writer != null) writer.close();
+            } catch (IOException ignored) {
+            }
+        }
+    }
+
+    public void cancelJob() {
+        if (_isFinished) {
+            return;
+        }
+        _logger.info("Attempting to call cancel on this job");
+        if (_javaObject != null) {
+            Method method = null;
+
+             try {
+                method = _javaObject.getClass().getMethod(_cancelMethod);
+            } catch(SecurityException e) {
+            } catch(NoSuchMethodException e) {
+            }
+
+            if (method != null)
+                try {
+                    method.invoke(_javaObject);
+                } catch(Exception e) {
+                    if (_logger != null) {
+                        _logger.error("Cancel method failed! ", e);
+                    }
+                }
+            else {
+                throw new RuntimeException("Job " + _jobName  + " does not have cancel method " + _cancelMethod);
+            }
+        }
+    }
+
+    private static Object getObjectAsProxyUser(final Properties prop, final Logger logger, final String jobName, final String className) throws Exception{
+        Object obj = SecurityUtils.getProxiedUser(prop, logger, new Configuration()).doAs(new PrivilegedExceptionAction<Object>() {
+            @Override
+            public Object run() throws Exception {
+                return getObject(jobName, className, prop);
+            }
+          });
+        
+        return obj;
+    }
+    
+    private static Object getObject(String jobName, String className, Properties properties)
+            throws Exception {
+        Class<?> runningClass = JavaJobRunnerMain.class.getClassLoader().loadClass(className);
+
+        if(runningClass == null) {
+            throw new Exception("Class " + className + " was not found. Cannot run job.");
+        }
+
+        Class<?> propsClass = JavaJobRunnerMain.class.getClassLoader().loadClass(PROPS_CLASS);
+
+        Object obj = null;
+        if(propsClass != null && getConstructor(runningClass, String.class, propsClass) != null) {
+            // This case covers the use of azkaban.common.utils.Props with the
+            Constructor<?> con = getConstructor(propsClass, propsClass, Properties[].class);
+            Object props = con.newInstance(null, new Properties[] { properties });
+            obj = getConstructor(runningClass, String.class, propsClass).newInstance(jobName, props);
+        } else if(getConstructor(runningClass, String.class, Properties.class) != null) {
+            obj = getConstructor(runningClass, String.class, Properties.class).newInstance(jobName,
+                                                                                           properties);
+        } else if(getConstructor(runningClass, String.class) != null) {
+            obj = getConstructor(runningClass, String.class).newInstance(jobName);
+        } else if(getConstructor(runningClass) != null) {
+            obj = getConstructor(runningClass).newInstance();
+        }
+        return obj;
+    }
+
+    private static Constructor<?> getConstructor(Class<?> c, Class<?>... args) {
+        try {
+            Constructor<?> cons = c.getConstructor(args);
+            return cons;
+        } catch(NoSuchMethodException e) {
+            return null;
+        }
+    }
+
+}
diff --git a/src/java/azkaban/jobExecutor/JavaProcessJob.java b/src/java/azkaban/jobExecutor/JavaProcessJob.java
new file mode 100644
index 0000000..bd41935
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+public class JavaProcessJob extends ProcessJob {
+//    private static final Logger log = Logger
+//            .getLogger(JavaProcessJob.class);
+//    private Logger log;
+    
+	public static final String CLASSPATH = "classpath";
+	public static final String GLOBAL_CLASSPATH = "global.classpaths";
+	public static final String JAVA_CLASS = "java.class";
+	public static final String INITIAL_MEMORY_SIZE = "Xms";
+	public static final String MAX_MEMORY_SIZE = "Xmx";
+	public static final String MAIN_ARGS = "main.args";
+	public static final String JVM_PARAMS = "jvm.args";
+    public static final String GLOBAL_JVM_PARAMS = "global.jvm.args";
+    
+	public static final String DEFAULT_INITIAL_MEMORY_SIZE = "64M";
+	public static final String DEFAULT_MAX_MEMORY_SIZE = "256M";
+
+	public static String JAVA_COMMAND = "java";
+
+	public JavaProcessJob(Props prop, Logger logger) {
+		super(prop, logger);
+	}
+
+	@Override
+	protected List<String> getCommandList() {
+		ArrayList<String> list = new ArrayList<String>();
+		list.add(createCommandLine());
+		return list;
+	}
+
+	protected String createCommandLine() {
+		String command = JAVA_COMMAND + " ";
+		command += getJVMArguments() + " ";
+		command += "-Xms" + getInitialMemorySize() + " ";
+		command += "-Xmx" + getMaxMemorySize() + " ";
+		command += "-cp " + createArguments(getClassPaths(), ":") + " ";
+		command += getJavaClass() + " ";
+		command += getMainArguments();
+
+		return command;
+	}
+
+	protected String getJavaClass() {
+		return getProps().getString(JAVA_CLASS);
+	}
+
+	protected String getClassPathParam() {
+		List<String> classPath = getClassPaths();
+		if (classPath == null || classPath.size() == 0) {
+			return "";
+		}
+
+		return "-cp " + createArguments(classPath, ":") + " ";
+	}
+
+	protected List<String> getClassPaths() {
+		List<String> classPaths = getProps().getStringList(CLASSPATH, null, ",");
+
+	    ArrayList<String> classpathList = new ArrayList<String>();
+	    // Adding global properties used system wide.
+        if (getProps().containsKey(GLOBAL_CLASSPATH)) {
+            List<String> globalClasspath = getProps().getStringList(GLOBAL_CLASSPATH);
+            for (String global: globalClasspath) {
+                getLog().info("Adding to global classpath:" + global);
+                classpathList.add(global);
+            }
+        }
+		
+		if (classPaths == null) {
+			File path = new File(getPath());
+			File parent = path.getParentFile();
+			
+			for (File file : parent.listFiles()) {
+				if (file.getName().endsWith(".jar")) {
+	                //log.info("Adding to classpath:" + file.getName());
+	                classpathList.add(file.getName());
+				}
+			}
+		}
+		else {
+		    classpathList.addAll(classPaths);
+		}
+
+		return classpathList;
+	}
+
+	protected String getInitialMemorySize() {
+		return getProps().getString(INITIAL_MEMORY_SIZE,
+				DEFAULT_INITIAL_MEMORY_SIZE);
+	}
+
+	protected String getMaxMemorySize() {
+		return getProps().getString(MAX_MEMORY_SIZE, DEFAULT_MAX_MEMORY_SIZE);
+	}
+
+	protected String getMainArguments() {
+		return getProps().getString(MAIN_ARGS, "");
+	}
+
+    protected String getJVMArguments() {
+        String globalJVMArgs = getProps().getString(GLOBAL_JVM_PARAMS, null);
+        
+        if (globalJVMArgs == null) {
+            return getProps().getString(JVM_PARAMS, "");
+        }
+
+        return globalJVMArgs + " " + getProps().getString(JVM_PARAMS, "");
+    }
+
+	protected String createArguments(List<String> arguments, String separator) {
+		if (arguments != null && arguments.size() > 0) {
+			String param = "";
+			for (String arg : arguments) {
+				param += arg + separator;
+			}
+
+			return param.substring(0, param.length() - 1);
+		}
+
+		return "";
+	}
+}
diff --git a/src/java/azkaban/jobExecutor/Job.java b/src/java/azkaban/jobExecutor/Job.java
new file mode 100644
index 0000000..a7c59e5
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/Job.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import azkaban.utils.Props;
+
+
+
+/**
+ * This interface defines a Raw Job interface. Each job defines
+ * <ul>
+ * <li>Job Type : {HADOOP, UNIX, JAVA, SUCCESS_TEST, CONTROLLER}</li>
+ * <li>Job ID/Name : {String}</li>
+ * <li>Arguments: Key/Value Map for Strings</li>
+ * </ul>
+ * 
+ * A job is required to have a constructor Job(String jobId, Props props)
+ */
+
+public interface Job {
+
+    /**
+     * Returns a unique (should be checked in XML) string name/id for the Job.
+     * 
+     * @return the unique name/id of this job
+     */
+    public String getId();
+
+    /**
+     * Run the job. In general this method can only be run once. Must either
+     * succeed or throw an exception.
+     */
+    public void run() throws Exception;
+
+    /**
+     * Best effort attempt to cancel the job.
+     * 
+     * @throws Exception If cancel fails
+     */
+    public void cancel() throws Exception;
+
+    /**
+     * Returns a progress report between [0 - 1.0] to indicate the percentage
+     * complete
+     * 
+     * @throws Exception If getting progress fails
+     */
+    public double getProgress() throws Exception;
+    
+    /**
+     * Get the generated properties from this job.
+     * @return the properties generated by this job, possibly empty
+     */
+    public Props getJobGeneratedProperties();
+    
+    /**
+     * Determine if the job was cancelled.
+     * @return
+     */
+    public boolean isCanceled();
+}
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
new file mode 100644
index 0000000..3bee6d5
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.utils.process.AzkabanProcess;
+import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
+
+
+/**
+ * A job that passes all the job properties as command line arguments in "long" format, 
+ * e.g. --key1 value1 --key2 value2 ...
+ * 
+ * @author jkreps
+ *
+ */
+public abstract class LongArgJob extends AbstractProcessJob {
+    
+    private static final long KILL_TIME_MS = 5000;
+    private final AzkabanProcessBuilder builder;
+    private volatile AzkabanProcess process;
+
+    public LongArgJob(String[] command, Props prop, Logger log) {
+        this(command, prop, log, new HashSet<String>(0));
+    }
+    
+    public LongArgJob(String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
+        //super(command, desc);
+         super(prop, log);
+        //String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
+       
+        this.builder = new AzkabanProcessBuilder(command).
+            setEnv(getProps().getMapByPrefix(ENV_PREFIX)).
+            setWorkingDir(getCwd()).
+            setLogger(getLog());
+        appendProps(suppressedKeys);
+    }
+
+    public void run() throws Exception {
+        
+        resolveProps();
+        
+        long startMs = System.currentTimeMillis();
+        info("Command: " + builder.getCommandString());
+        if(builder.getEnv().size() > 0)
+            info("Environment variables: " + builder.getEnv());
+        info("Working directory: " + builder.getWorkingDir());
+        
+       File [] propFiles = initPropsFiles( );
+       //System.err.println("outputfile=" + propFiles[1]);
+        
+        boolean success = false;
+        this.process = builder.build();
+        try {
+            this.process.run();
+            success = true;
+        }
+        catch   (Exception e) {
+            for (File file: propFiles)  if (file != null && file.exists()) file.delete();
+            throw new RuntimeException (e);
+        }
+        finally {
+            this.process = null;
+            info("Process completed " + (success? "successfully" : "unsuccessfully") + " in " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+        }
+        
+        // Get the output properties from this job.
+        generateProperties(propFiles[1]);
+                
+        for (File file: propFiles)
+            if (file != null && file.exists()) file.delete();
+    }
+    
+    
+    
+    /**
+     * This gives access to the process builder used to construct the process. An overriding class can use this to 
+     * add to the command being executed.
+     */
+    protected AzkabanProcessBuilder getBuilder() {
+        return this.builder;
+    }
+    
+    @Override
+    public void cancel() throws InterruptedException {
+        if(process == null)
+            throw new IllegalStateException("Not started.");
+        boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+        if(!killed) {
+            warn("Kill with signal TERM failed. Killing with KILL signal.");
+            process.hardKill();
+        }
+    }
+    
+    @Override
+    public double getProgress() {
+        return process != null && process.isComplete()? 1.0 : 0.0;
+    }
+
+    private void appendProps(Set<String> suppressed) {
+        AzkabanProcessBuilder builder = this.getBuilder();
+        Props props = getProps();
+        for(String key: props.getKeySet())
+            if(!suppressed.contains(key))
+                builder.addArg("--" + key, props.get(key));
+    }
+   
+
+}
diff --git a/src/java/azkaban/jobExecutor/NoopJob.java b/src/java/azkaban/jobExecutor/NoopJob.java
new file mode 100644
index 0000000..e364b2f
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/NoopJob.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/**
+ *
+ */
+public class NoopJob implements Job
+{
+    public NoopJob(Props props, Logger log)
+    {
+        
+    }
+
+    @Override
+    public String getId()
+    {
+        return "Azkaban!! -- " + getClass().getName();
+    }
+
+    @Override
+    public void run() throws Exception
+    {
+    }
+
+    @Override
+    public void cancel() throws Exception
+    {
+    }
+
+    @Override
+    public double getProgress() throws Exception
+    {
+        return 0;
+    }
+
+    @Override
+    public Props getJobGeneratedProperties()
+    {
+        return new Props();
+    }
+
+    @Override
+    public boolean isCanceled() {
+        return false;
+    }
+}
diff --git a/src/java/azkaban/jobExecutor/PigProcessJob.java b/src/java/azkaban/jobExecutor/PigProcessJob.java
new file mode 100644
index 0000000..3a5a15d
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/PigProcessJob.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import azkaban.jobExecutor.utils.StringUtils;
+import azkaban.utils.Props;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+import static azkaban.jobExecutor.utils.SecurityUtils.PROXY_KEYTAB_LOCATION;
+import static azkaban.jobExecutor.utils.SecurityUtils.PROXY_USER;
+import static azkaban.jobExecutor.utils.SecurityUtils.TO_PROXY;
+import static azkaban.jobExecutor.utils.SecurityUtils.shouldProxy;
+import static azkaban.jobExecutor.SecurePigWrapper.OBTAIN_BINARY_TOKEN;
+
+public class PigProcessJob extends JavaProcessJob {
+    
+	public static final String PIG_SCRIPT = "pig.script";
+	public static final String UDF_IMPORT = "udf.import.list";
+	public static final String PIG_PARAM_PREFIX = "param.";
+	public static final String PIG_PARAM_FILES = "paramfile";
+	public static final String HADOOP_UGI = "hadoop.job.ugi";
+	public static final String DEBUG = "debug";
+
+  public static final String PIG_JAVA_CLASS = "org.apache.pig.Main";
+  public static final String SECURE_PIG_WRAPPER = "azkaban.jobExecutor.SecurePigWrapper";
+
+	public PigProcessJob(Props props, Logger log) {
+		super(props, log);
+	}
+
+	@Override
+	protected String getJavaClass() {
+    return shouldProxy(getProps().toProperties()) ? SECURE_PIG_WRAPPER : PIG_JAVA_CLASS;
+	}
+
+	@Override
+	protected String getJVMArguments() {
+		String args = super.getJVMArguments();
+
+		List<String> udfImport = getUDFImportList();
+		if (udfImport != null) {
+			args += " -Dudf.import.list=" + super.createArguments(udfImport, ":");
+		}
+
+		String hadoopUGI = getHadoopUGI();
+		if (hadoopUGI != null) {
+			args += " -Dhadoop.job.ugi=" + hadoopUGI;
+		}
+
+    if(shouldProxy(getProps().toProperties())) {
+      info("Setting up secure proxy info for child process");
+      String secure;
+      Properties p = getProps().toProperties();
+      secure = " -D" + PROXY_USER + "=" + p.getProperty(PROXY_USER);
+      secure += " -D" + PROXY_KEYTAB_LOCATION + "=" + p.getProperty(PROXY_KEYTAB_LOCATION);
+      secure += " -D" + TO_PROXY + "=" + p.getProperty(TO_PROXY);
+      String extraToken = p.getProperty(OBTAIN_BINARY_TOKEN);
+      if(extraToken != null) {
+        secure += " -D" + OBTAIN_BINARY_TOKEN + "=" + extraToken;
+      }
+      info("Secure settings = " + secure);
+      args += secure;
+    } else {
+      info("Not setting up secure proxy info for child process");
+    }
+
+		return args;
+	}
+
+	@Override
+	protected String getMainArguments() {
+		ArrayList<String> list = new ArrayList<String>();
+		Map<String, String> map = getPigParams();
+		if (map != null) {
+			for (Map.Entry<String, String> entry : map.entrySet()) {
+				list.add("-param " + StringUtils.shellQuote(entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
+			}
+		}
+
+		List<String> paramFiles = getPigParamFiles();
+		if (paramFiles != null) {
+			for (String paramFile : paramFiles) {
+				list.add("-param_file " + paramFile);
+			}
+		}
+		
+		if (getDebug()) {
+			list.add("-debug");
+		}
+
+		list.add(getScript());
+
+		return org.apache.commons.lang.StringUtils.join(list, " ");
+	}
+
+	@Override
+	protected List<String> getClassPaths() {
+		List<String> classPath = super.getClassPaths();
+
+		// Add hadoop home setting.
+		String hadoopHome = System.getenv("HADOOP_HOME");
+		if (hadoopHome == null) {
+			info("HADOOP_HOME not set, using default hadoop config.");
+		} else {
+			info("Using hadoop config found in " + hadoopHome);
+			classPath.add(new File(hadoopHome, "conf").getPath());
+		}
+
+		if(shouldProxy(getProps().toProperties())) {
+	        classPath.add(getSourcePathFromClass(SecurePigWrapper.class));
+		}
+		return classPath;
+	}
+
+	protected boolean getDebug() {
+		return getProps().getBoolean(DEBUG, false);
+	}
+
+	protected String getScript() {
+		return getProps().getString(PIG_SCRIPT, getJobName() + ".pig");
+	}
+
+	protected List<String> getUDFImportList() {
+		return getProps().getStringList(UDF_IMPORT, null, ",");
+	}
+
+	protected String getHadoopUGI() {
+		return getProps().getString(HADOOP_UGI, null);
+	}
+	
+	protected Map<String, String> getPigParams() {
+		return getProps().getMapByPrefix(PIG_PARAM_PREFIX);
+	}
+
+	protected List<String> getPigParamFiles() {
+		return getProps().getStringList(PIG_PARAM_FILES, null, ",");
+	}
+	
+	
+	private static String getSourcePathFromClass(Class containedClass) {
+	    File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+	    if (!file.isDirectory() && file.getName().endsWith(".class")) {
+	        String name = containedClass.getName();
+	        StringTokenizer tokenizer = new StringTokenizer(name, ".");
+	        while(tokenizer.hasMoreTokens()) {
+	            tokenizer.nextElement();
+	            file = file.getParentFile();
+	        }
+	            
+	        return file.getPath();  
+	    }
+	    else {
+	        return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+	    }
+	}
+}
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
new file mode 100644
index 0000000..06050c8
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+/*
+ * A job that runs a simple unix command
+ * 
+ * @author jkreps
+ * 
+ */
+public class ProcessJob extends AbstractProcessJob implements Job {
+
+    public static final String COMMAND = "command";
+    public static final int CLEAN_UP_TIME_MS = 1000;
+
+    private volatile Process _process;
+    private volatile boolean _isComplete;
+    private volatile boolean _isCancelled;
+
+    public ProcessJob(final Props props, final Logger log) {
+        super(props, log);
+    }
+
+    @Override
+    public void run() {
+        synchronized (this) {
+            _isCancelled = false;
+        }
+        resolveProps();
+
+        // Sets a list of all the commands that need to be run.
+        List<String> commands = getCommandList();
+        info(commands.size() + " commands to execute.");
+
+        File[] propFiles = initPropsFiles();
+
+        // System.err.println("in process job outputFile=" +propFiles[1]);
+
+        // For each of the jobs, set up a process and run them.
+        for (String command : commands) {
+            info("Executing command: " + command);
+            String[] cmdPieces = partitionCommandLine(command);
+
+            ProcessBuilder builder = new ProcessBuilder(cmdPieces);
+
+            builder.directory(new File(getCwd()));
+            builder.environment().putAll(getEnvironmentVariables());
+
+            try {
+                _process = builder.start();
+            } catch (IOException e) {
+                for (File file : propFiles) {
+                    if (file != null && file.exists()) {
+                        file.delete();
+                    }
+                }
+                throw new RuntimeException(e);
+            }
+            LoggingGobbler outputGobbler = new LoggingGobbler(
+                    new InputStreamReader(_process.getInputStream()),
+                    Level.INFO);
+            LoggingGobbler errorGobbler = new LoggingGobbler(
+                    new InputStreamReader(_process.getErrorStream()),
+                    Level.ERROR);
+
+            int processId = getProcessId();
+            if (processId == 0) {
+                info("Spawned thread. Unknown processId");
+            } else {
+                info("Spawned thread with processId " + processId);
+            }
+            outputGobbler.start();
+            errorGobbler.start();
+            int exitCode = -999;
+            try {
+                exitCode = _process.waitFor();
+
+                _isComplete = true;
+                if (exitCode != 0) {
+                    for (File file : propFiles) {
+                        if (file != null && file.exists()) {
+                            file.delete();
+                        }
+                    }
+                    throw new RuntimeException(
+                            "Processes ended with exit code " + exitCode + ".");
+                }
+
+                // try to wait for everything to get logged out before exiting
+                outputGobbler.join(1000);
+                errorGobbler.join(1000);
+            } catch (InterruptedException e) {
+            } finally {
+                outputGobbler.close();
+                errorGobbler.close();
+            }
+        }
+
+        // Get the output properties from this job.
+        generateProperties(propFiles[1]);
+
+        for (File file : propFiles) {
+            if (file != null && file.exists()) {
+                file.delete();
+            }
+        }
+
+    }
+
+    protected List<String> getCommandList() {
+        List<String> commands = new ArrayList<String>();
+        commands.add(_props.getString(COMMAND));
+        for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
+            commands.add(_props.getString(COMMAND + "." + i));
+        }
+
+        return commands;
+    }
+
+    @Override
+    public void cancel() throws Exception {
+        if (_process != null) {
+            int processId = getProcessId();
+            if (processId != 0) {
+                warn("Attempting to kill the process " + processId);
+                try {
+                    Runtime.getRuntime().exec("kill " + processId);
+                    synchronized (this) {
+                        wait(CLEAN_UP_TIME_MS);
+                    }
+                } catch (InterruptedException e) {
+                    // Do nothing. We don't really care.
+                }
+                if (!_isComplete) {
+                    error("After "
+                            + CLEAN_UP_TIME_MS
+                            + " ms, the job hasn't terminated. Will force terminate the job.");
+                }
+            } else {
+                info("Could not get process id");
+            }
+
+            if (!_isComplete) {
+                warn("Force kill the process");
+                _process.destroy();
+            }
+            synchronized (this) {
+                _isCancelled = true;
+            }
+        }
+    }
+
+    public int getProcessId() {
+        int processId = 0;
+
+        try {
+            Field f = _process.getClass().getDeclaredField("pid");
+            f.setAccessible(true);
+
+            processId = f.getInt(_process);
+        } catch (Throwable e) {
+        }
+
+        return processId;
+    }
+
+    @Override
+    public double getProgress() {
+        return _isComplete ? 1.0 : 0.0;
+    }
+
+    private class LoggingGobbler extends Thread {
+
+        private final BufferedReader _inputReader;
+        private final Level _loggingLevel;
+
+        public LoggingGobbler(final InputStreamReader inputReader,
+                final Level level) {
+            _inputReader = new BufferedReader(inputReader);
+            _loggingLevel = level;
+        }
+
+        public void close() {
+            if (_inputReader != null) {
+                try {
+                    _inputReader.close();
+                } catch (IOException e) {
+                    error("Error cleaning up logging stream reader:", e);
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    String line = _inputReader.readLine();
+                    if (line == null) {
+                        return;
+                    }
+
+                    logMessage(line);
+                }
+            } catch (IOException e) {
+                error("Error reading from logging stream:", e);
+            }
+        }
+
+        private void logMessage(final String message) {
+            if (message.startsWith(Level.DEBUG.toString())) {
+                String newMsg = message.substring(Level.DEBUG.toString()
+                        .length());
+                getLog().debug(newMsg);
+            } else if (message.startsWith(Level.ERROR.toString())) {
+                String newMsg = message.substring(Level.ERROR.toString()
+                        .length());
+                getLog().error(newMsg);
+            } else if (message.startsWith(Level.INFO.toString())) {
+                String newMsg = message.substring(Level.INFO.toString()
+                        .length());
+                getLog().info(newMsg);
+            } else if (message.startsWith(Level.WARN.toString())) {
+                String newMsg = message.substring(Level.WARN.toString()
+                        .length());
+                getLog().warn(newMsg);
+            } else if (message.startsWith(Level.FATAL.toString())) {
+                String newMsg = message.substring(Level.FATAL.toString()
+                        .length());
+                getLog().fatal(newMsg);
+            } else if (message.startsWith(Level.TRACE.toString())) {
+                String newMsg = message.substring(Level.TRACE.toString()
+                        .length());
+                getLog().trace(newMsg);
+            } else {
+                getLog().log(_loggingLevel, message);
+            }
+
+        }
+    }
+
+    @Override
+    public Props getProps() {
+        return _props;
+    }
+
+    public String getPath() {
+        return _jobPath;
+    }
+
+    public String getJobName() {
+        return getId();
+    }
+
+
+    /**
+     * Splits the command into a unix like command line structure. Quotes and
+     * single quotes are treated as nested strings.
+     * 
+     * @param command
+     * @return
+     */
+    public static String[] partitionCommandLine(final String command) {
+        ArrayList<String> commands = new ArrayList<String>();
+
+        int index = 0;
+
+        StringBuffer buffer = new StringBuffer(command.length());
+
+        boolean isApos = false;
+        boolean isQuote = false;
+        while (index < command.length()) {
+            char c = command.charAt(index);
+
+            switch (c) {
+            case ' ':
+                if (!isQuote && !isApos) {
+                    String arg = buffer.toString();
+                    buffer = new StringBuffer(command.length() - index);
+                    if (arg.length() > 0) {
+                        commands.add(arg);
+                    }
+                } else {
+                    buffer.append(c);
+                }
+                break;
+            case '\'':
+                if (!isQuote) {
+                    isApos = !isApos;
+                } else {
+                    buffer.append(c);
+                }
+                break;
+            case '"':
+                if (!isApos) {
+                    isQuote = !isQuote;
+                } else {
+                    buffer.append(c);
+                }
+                break;
+            default:
+                buffer.append(c);
+            }
+
+            index++;
+        }
+
+        if (buffer.length() > 0) {
+            String arg = buffer.toString();
+            commands.add(arg);
+        }
+
+        return commands.toArray(new String[commands.size()]);
+    }
+
+    @Override
+    public synchronized boolean isCanceled() {
+        return _isCancelled;
+    }
+
+}
diff --git a/src/java/azkaban/jobExecutor/PythonJob.java b/src/java/azkaban/jobExecutor/PythonJob.java
new file mode 100644
index 0000000..ad83c38
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/PythonJob.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+import azkaban.utils.Props;
+
+
+
+public class PythonJob extends LongArgJob {
+    
+    private static final String PYTHON_BINARY_KEY = "python";
+    private static final String SCRIPT_KEY = "script";
+
+
+    public PythonJob(Props props, Logger log) {
+        super(new String[]{props.getString(PYTHON_BINARY_KEY, "python"), 
+                           props.getString(SCRIPT_KEY)}, 
+              props,
+              log, 
+              ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+    }
+
+
+
+    
+}
diff --git a/src/java/azkaban/jobExecutor/RubyJob.java b/src/java/azkaban/jobExecutor/RubyJob.java
new file mode 100644
index 0000000..7f7febc
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/RubyJob.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+import com.google.common.collect.ImmutableSet;
+
+
+public class RubyJob extends LongArgJob {
+
+    private static final String RUBY_BINARY_KEY = "ruby";
+    private static final String SCRIPT_KEY = "script";
+
+    public RubyJob(Props props, Logger log) {
+        super(new String[]{props.getString(RUBY_BINARY_KEY, "ruby"), 
+                           props.getString(SCRIPT_KEY)}, 
+              props, 
+              log, 
+              ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+    }
+
+   
+    
+}
diff --git a/src/java/azkaban/jobExecutor/ScriptJob.java b/src/java/azkaban/jobExecutor/ScriptJob.java
new file mode 100644
index 0000000..1879afe
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/ScriptJob.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+import azkaban.utils.Props;
+
+/**
+ * A script job issues a command of the form
+ *    [EXECUTABLE] [SCRIPT] --key1 val1 ... --key2 val2
+ *   executable -- the interpreter command to execute
+ *   script -- the script to pass in (required)
+ * 
+ * @author jkreps
+ *
+ */
+public class ScriptJob extends LongArgJob {
+
+    private static final String DEFAULT_EXECUTABLE_KEY = "executable";
+    private static final String SCRIPT_KEY = "script";
+    
+    public ScriptJob(Props props, Logger log) {
+        super(new String[] {props.getString(DEFAULT_EXECUTABLE_KEY), props.getString(SCRIPT_KEY)}, 
+              props, 
+              log, 
+              ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
+    }
+ 
+
+}
diff --git a/src/java/azkaban/jobExecutor/SecurePigWrapper.java b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
new file mode 100644
index 0000000..3010089
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.pig.Main;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import static azkaban.jobExecutor.utils.SecurityUtils.getProxiedUser;
+
+public class SecurePigWrapper {
+
+  public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
+  public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
+
+  public static void main(final String[] args) throws IOException, InterruptedException {
+    final Logger logger = Logger.getRootLogger();
+    final Properties p = System.getProperties();
+    final Configuration conf = new Configuration();
+
+    getProxiedUser(p, logger, conf).doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        prefetchToken();
+        Main.main(args);
+        return null;
+      }
+
+      // For Pig jobs that need to do extra communication with the JobTracker,
+      // it's necessary to pre-fetch a token and include it in the credentials
+      // cache
+      private void prefetchToken() throws InterruptedException, IOException {
+        String shouldPrefetch = p.getProperty(OBTAIN_BINARY_TOKEN);
+        if(shouldPrefetch != null && shouldPrefetch.equals("true") ) {
+          logger.info("Pre-fetching token");
+          Job job = new Job(conf, "totally phony, extremely fake, not real job");
+
+          JobConf jc = new JobConf(conf);
+          JobClient jobClient = new JobClient(jc);
+          logger.info("Pre-fetching: Got new JobClient: " + jc);
+          Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("hi"));
+          job.getCredentials().addToken(new Text("howdy"), mrdt);
+
+          File temp = File.createTempFile("mr-azkaban", ".token");
+          temp.deleteOnExit();
+
+          FileOutputStream fos = null;
+          DataOutputStream dos = null;
+          try {
+            fos = new FileOutputStream(temp);
+            dos = new DataOutputStream(fos);
+            job.getCredentials().writeTokenStorageToStream(dos);
+          } finally {
+            if(dos != null) {
+              dos.close();
+            }
+            if(fos != null) {
+              fos.close();
+            }
+          }
+          logger.info("Setting " + MAPREDUCE_JOB_CREDENTIALS_BINARY + " to " + temp.getAbsolutePath());
+          System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, temp.getAbsolutePath());
+        } else {
+          logger.info("Not pre-fetching token");
+        }
+      }
+    });
+
+  }
+}
+
diff --git a/src/java/azkaban/jobExecutor/utils/InitErrorJob.java b/src/java/azkaban/jobExecutor/utils/InitErrorJob.java
new file mode 100644
index 0000000..1766287
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/InitErrorJob.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import org.apache.log4j.Logger;
+
+import azkaban.jobExecutor.AbstractJob;
+
+/**
+ * This job is used to re-throw an exception caught during the initialization stage.
+ * 
+ * @author lguo
+ *
+ */
+public class InitErrorJob extends AbstractJob
+{
+
+  private Exception exception;
+  
+  public InitErrorJob (String id, Exception e) {
+     super(id, Logger.getLogger(AbstractJob.class));
+     exception = e;
+  }
+  
+  @Override
+  public void run() throws Exception
+  {
+    throw exception;
+  }
+
+}
diff --git a/src/java/azkaban/jobExecutor/utils/JobExecutionException.java b/src/java/azkaban/jobExecutor/utils/JobExecutionException.java
new file mode 100644
index 0000000..ffb855d
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/JobExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+public class JobExecutionException extends RuntimeException {
+
+    private final static long serialVersionUID = 1;
+
+    public JobExecutionException(String message) {
+        super(message);
+    }
+
+    public JobExecutionException(Throwable cause) {
+        super(cause);
+    }
+
+    public JobExecutionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
\ No newline at end of file
diff --git a/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
new file mode 100644
index 0000000..b93196c
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import azkaban.jobExecutor.JavaJob;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.jobExecutor.Job;
+import azkaban.jobExecutor.NoopJob;
+import azkaban.jobExecutor.PigProcessJob;
+import azkaban.jobExecutor.ProcessJob;
+import azkaban.jobExecutor.PythonJob;
+import azkaban.jobExecutor.RubyJob;
+import azkaban.jobExecutor.ScriptJob;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+import azkaban.jobExecutor.utils.JobExecutionException;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class JobWrappingFactory 
+{
+	private static JobWrappingFactory jobWrappingFactory = null;
+	
+    //private String _defaultType;
+    private Map<String, Class<? extends Job>> _jobToClass;
+
+    private JobWrappingFactory(final Map<String, Class<? extends Job>> jobTypeToClassMap
+    )
+    {
+        //this._defaultType = defaultType;
+        this._jobToClass = jobTypeToClassMap;
+    }
+    
+    public static JobWrappingFactory getJobWrappingFactory ()
+    {
+    	if(jobWrappingFactory == null)
+    	{
+    		jobWrappingFactory = new JobWrappingFactory(new ImmutableMap.Builder<String, Class<? extends Job>>()
+                    .put("java", JavaJob.class)
+                    .put("command", ProcessJob.class)
+                    .put("javaprocess", JavaProcessJob.class)
+                    .put("pig", PigProcessJob.class)
+                    .put("propertyPusher", NoopJob.class)
+                    .put("python", PythonJob.class)
+                    .put("ruby", RubyJob.class)
+                    .put("script", ScriptJob.class).build());
+    	}
+    	return jobWrappingFactory;
+    }
+
+    public void registerJobExecutors(final Map<String, Class<? extends Job>> newJobExecutors)
+    {
+    	_jobToClass = newJobExecutors;
+    }
+    
+    public Job buildJobExecutor(Props props, Logger logger)
+    {
+      
+      Job job;
+      try {
+        String jobType = props.getString("type");
+        if (jobType == null || jobType.length() == 0) {
+           /*throw an exception when job name is null or empty*/
+          throw new JobExecutionException (
+                                           String.format("The 'type' parameter for job[%s] is null or empty", props, logger));
+        }
+        Class<? extends Object> executorClass = _jobToClass.get(jobType);
+
+        if (executorClass == null) {
+            throw new JobExecutionException(
+                    String.format(
+                            "Could not construct job[%s] of type[%s].",
+                            props,
+                            jobType
+                    ));
+        }
+        
+        job = (Job)Utils.callConstructor(executorClass, props, logger);
+
+      }
+      catch (Exception e) {
+          job = new InitErrorJob(props.getString("jobId"), e);
+      }
+
+//        // wrap up job in logging proxy
+//        if (jobDescriptor.getLoggerPattern() != null) {
+//        	job = new LoggingJob(_logDir, job, job.getId(), jobDescriptor.getLoggerPattern());	
+//        }
+//        else {
+//        	job = new LoggingJob(_logDir, job, job.getId());	
+//        }
+        
+        return job;
+    }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/jobExecutor/utils/JSONToJava.java b/src/java/azkaban/jobExecutor/utils/JSONToJava.java
new file mode 100644
index 0000000..19b9e87
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/JSONToJava.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import com.google.common.base.Function;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class JSONToJava implements Function<JSONObject, Map<String, Object>>
+{
+    @Override
+    public Map<String, Object> apply(JSONObject jsonObject)
+    {
+        Map<String, Object> retVal = new HashMap<String, Object>();
+
+        Iterator keyIterator = jsonObject.keys();
+        while (keyIterator.hasNext()) {
+            String key = keyIterator.next().toString();
+
+            try {
+                retVal.put(key, dispatchCorrectly(jsonObject.get(key)));
+            }
+            catch (JSONException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        return retVal;
+    }
+
+    public List<Object> apply(JSONArray jsonArray)
+    {
+        List<Object> retVal = new ArrayList<Object>(jsonArray.length());
+
+        for (int i = 0; i < jsonArray.length(); ++i) {
+            try {
+                retVal.add(dispatchCorrectly(jsonArray.get(i)));
+            }
+            catch (JSONException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        return retVal;
+    }
+
+    public Object dispatchCorrectly(Object o)
+    {
+        if (o instanceof JSONObject) {
+            return apply((JSONObject) o);
+        }
+        else if (o instanceof JSONArray) {
+            return apply((JSONArray) o);
+        }
+        else {
+            return o.toString();
+        }
+    }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
new file mode 100644
index 0000000..f22923d
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils.process;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.CircularBuffer;
+
+import com.google.common.base.Joiner;
+
+/**
+ * A more robust alternative to java.lang.Process.
+ * 
+ * Output is read by separate threads to avoid deadlock and logged to log4j
+ * loggers.
+ * 
+ * @author jkreps
+ * 
+ */
+public class AzkabanProcess {
+
+    private final String workingDir;
+    private final List<String> cmd;
+    private final Map<String, String> env;
+    private final Logger logger;
+    private final CountDownLatch startupLatch;
+    private final CountDownLatch completeLatch;
+    private volatile int processId;
+    private volatile Process process;
+
+    public AzkabanProcess(final List<String> cmd,
+            final Map<String, String> env, final String workingDir,
+            final Logger logger) {
+        this.cmd = cmd;
+        this.env = env;
+        this.workingDir = workingDir;
+        this.processId = -1;
+        this.startupLatch = new CountDownLatch(1);
+        this.completeLatch = new CountDownLatch(1);
+        this.logger = logger;
+    }
+
+    /**
+     * Execute this process, blocking until it has completed.
+     */
+    public void run() throws IOException {
+        if (this.isStarted() || this.isComplete()) {
+            throw new IllegalStateException(
+                    "The process can only be used once.");
+        }
+
+        ProcessBuilder builder = new ProcessBuilder(cmd);
+        builder.directory(new File(workingDir));
+        builder.environment().putAll(env);
+        this.process = builder.start();
+        this.processId = processId(process);
+        if (processId == 0) {
+            logger.debug("Spawned thread with unknown process id");
+        } else {
+            logger.debug("Spawned thread with process id " + processId);
+        }
+
+        this.startupLatch.countDown();
+
+        LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(
+                process.getInputStream()), logger, Level.INFO, 30);
+        LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(
+                process.getErrorStream()), logger, Level.ERROR, 30);
+
+        outputGobbler.start();
+        errorGobbler.start();
+        int exitCode = -1;
+        try {
+            exitCode = process.waitFor();
+        } catch (InterruptedException e) {
+            logger.info("Process interrupted.", e);
+        }
+
+        completeLatch.countDown();
+        if (exitCode != 0) {
+            throw new ProcessFailureException(exitCode,
+                    errorGobbler.getRecentLog());
+        }
+
+        // try to wait for everything to get logged out before exiting
+        outputGobbler.awaitCompletion(5000);
+        errorGobbler.awaitCompletion(5000);
+    }
+
+    /**
+     * Await the completion of this process
+     * 
+     * @throws InterruptedException
+     *             if the thread is interrupted while waiting.
+     */
+    public void awaitCompletion() throws InterruptedException {
+        this.completeLatch.await();
+    }
+
+    /**
+     * Await the start of this process
+     * 
+     * @throws InterruptedException
+     *             if the thread is interrupted while waiting.
+     */
+    public void awaitStartup() throws InterruptedException {
+        this.startupLatch.await();
+    }
+
+    /**
+     * Get the process id for this process, if it has started.
+     * 
+     * @return The process id or -1 if it cannot be fetched
+     */
+    public int getProcessId() {
+        checkStarted();
+        return this.processId;
+    }
+
+    /**
+     * Attempt to kill the process, waiting up to the given time for it to die
+     * 
+     * @param time
+     *            The amount of time to wait
+     * @param unit
+     *            The time unit
+     * @return true iff this soft kill kills the process in the given wait time.
+     */
+    public boolean softKill(final long time, final TimeUnit unit)
+            throws InterruptedException {
+        checkStarted();
+        if (processId != 0 && isStarted()) {
+            try {
+                Runtime.getRuntime().exec("kill " + processId);
+                return completeLatch.await(time, unit);
+            } catch (IOException e) {
+                logger.error("Kill attempt failed.", e);
+            }
+            return false;
+        }
+        return false;
+    }
+
+    /**
+     * Force kill this process
+     */
+    public void hardKill() {
+        checkStarted();
+        if (isRunning()) {
+            process.destroy();
+        }
+    }
+
+    /**
+     * Attempt to get the process id for this process
+     * 
+     * @param process
+     *            The process to get the id from
+     * @return The id of the process
+     */
+    private int processId(final java.lang.Process process) {
+        int processId = 0;
+        try {
+            Field f = process.getClass().getDeclaredField("pid");
+            f.setAccessible(true);
+
+            processId = f.getInt(process);
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+
+        return processId;
+    }
+
+    /**
+     * @return true iff the process has been started
+     */
+    public boolean isStarted() {
+        return startupLatch.getCount() == 0L;
+    }
+
+    /**
+     * @return true iff the process has completed
+     */
+    public boolean isComplete() {
+        return completeLatch.getCount() == 0L;
+    }
+
+    /**
+     * @return true iff the process is currently running
+     */
+    public boolean isRunning() {
+        return isStarted() && !isComplete();
+    }
+
+    public void checkStarted() {
+        if (!isStarted()) {
+            throw new IllegalStateException("Process has not yet started.");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
+                + ", cwd = " + workingDir + ")";
+    }
+
+    private static class LogGobbler extends Thread {
+
+        private final BufferedReader inputReader;
+        private final Logger logger;
+        private final Level loggingLevel;
+        private final CircularBuffer<String> buffer;
+
+        public LogGobbler(final Reader inputReader, final Logger logger,
+                final Level level, final int bufferLines) {
+            this.inputReader = new BufferedReader(inputReader);
+            this.logger = logger;
+            this.loggingLevel = level;
+            buffer = new CircularBuffer<String>(bufferLines);
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    String line = inputReader.readLine();
+                    if (line == null) {
+                        return;
+                    }
+
+                    buffer.append(line);
+                    logger.log(loggingLevel, line);
+                }
+            } catch (IOException e) {
+                logger.error("Error reading from logging stream:", e);
+            }
+        }
+
+        public void awaitCompletion(final long waitMs) {
+            try {
+                join(waitMs);
+            } catch (InterruptedException e) {
+                logger.info("I/O thread interrupted.", e);
+            }
+        }
+
+        public String getRecentLog() {
+            return Joiner.on(System.getProperty("line.separator")).join(buffer);
+        }
+
+    }
+
+}
diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
new file mode 100644
index 0000000..ec9fd54
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcessBuilder.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils.process;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Helper code for building a process
+ * 
+ * @author jkreps
+ */
+public class AzkabanProcessBuilder {
+
+    private List<String> cmd = new ArrayList<String>();
+    private Map<String, String> env = new HashMap<String, String>();
+    private String workingDir = System.getProperty("user.dir");
+    private Logger logger = Logger.getLogger(AzkabanProcess.class);
+    private int stdErrSnippetSize = 30;
+    private int stdOutSnippetSize = 30;
+    
+    public AzkabanProcessBuilder(String...command) {
+        addArg(command);
+    }
+    
+    public AzkabanProcessBuilder addArg(String...command) {
+        for(String c: command)
+            cmd.add(c);
+        return this;
+    }
+    
+    public AzkabanProcessBuilder setWorkingDir(String dir) {
+        this.workingDir = dir;
+        return this;
+    }
+    
+    public AzkabanProcessBuilder setWorkingDir(File f) {
+        return setWorkingDir(f.getAbsolutePath());
+    }
+    
+    public String getWorkingDir() {
+        return this.workingDir;
+    }
+    
+    public AzkabanProcessBuilder addEnv(String variable, String value) {
+        env.put(variable, value);
+        return this;
+    }
+    
+    public AzkabanProcessBuilder setEnv(Map<String, String> m) {
+        this.env = m;
+        return this;
+    }
+    
+    public Map<String, String> getEnv() {
+        return this.env;
+    }
+    
+    public AzkabanProcessBuilder setStdErrorSnippetSize(int size) {
+        this.stdErrSnippetSize = size;
+        return this;
+    }
+    
+    public AzkabanProcessBuilder setStdOutSnippetSize(int size) {
+        this.stdOutSnippetSize = size;
+        return this;
+    }
+    
+    public AzkabanProcessBuilder setLogger(Logger logger) {
+        this.logger = logger;
+        return this;
+    }
+    
+    public AzkabanProcess build() {
+        return new AzkabanProcess(cmd, env, workingDir, logger);
+    }
+    
+    public List<String> getCommand() {
+        return this.cmd;
+    }
+    
+    public String getCommandString() {
+        return Joiner.on(" ").join(getCommand());
+    }
+    
+    @Override
+    public String toString() {
+        return "ProcessBuilder(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env + ", cwd = " + workingDir + ")";
+    }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java b/src/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
new file mode 100644
index 0000000..d146cf9
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils.process;
+
+public class ProcessFailureException extends RuntimeException {
+
+	private static final long serialVersionUID = 1;
+	
+	private final int exitCode;
+	private final String logSnippet;
+	
+	public ProcessFailureException(int exitCode, String logSnippet) {
+		this.exitCode = exitCode;
+		this.logSnippet = logSnippet;
+	}
+
+	public int getExitCode() {
+		return exitCode;
+	}
+	
+	public String getLogSnippet() {
+	    return this.logSnippet;
+	}
+	
+}
diff --git a/src/java/azkaban/jobExecutor/utils/PropsUtils.java b/src/java/azkaban/jobExecutor/utils/PropsUtils.java
new file mode 100644
index 0000000..b879074
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/PropsUtils.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobExecutor.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import azkaban.utils.Props;
+import azkaban.utils.UndefinedPropertyException;
+//import azkaban.jobExecutor.jobs.dependency.ExecutableFlow;
+import azkaban.executor.ExecutableFlow;
+import org.joda.time.DateTime;
+
+public class PropsUtils {
+
+    /**
+     * Load job schedules from the given directory.
+     * @param dir The directory to look in
+     *
+     * @param suffixes File suffixes to load
+     * @return The loaded set of schedules
+     */
+    public static Props loadPropsInDir(File dir, String... suffixes) {
+        return loadPropsInDir(null, dir, suffixes);
+    }
+
+    /**
+     * Load job schedules from the given directories
+     *
+     * @param parent The parent properties for these properties
+     * @param dir The directory to look in
+     * @param suffixes File suffixes to load
+     * @return The loaded set of schedules
+     */
+    public static Props loadPropsInDir(Props parent, File dir, String... suffixes) {
+        try {
+            Props props = new Props(parent);
+            File[] files = dir.listFiles();
+            if(files != null) {
+                for(File f: files) {
+                    if(f.isFile() && endsWith(f, suffixes)) {
+                        props.putAll(new Props(null, f.getAbsolutePath()));
+                    }
+                }
+            }
+            return props;
+        } catch(IOException e) {
+            throw new RuntimeException("Error loading properties.", e);
+        }
+    }
+
+    /**
+     * Load job schedules from the given directories
+     *
+     * @param dirs The directories to check for properties
+     * @param suffixes The suffixes to load
+     * @return The properties
+     */
+    public static Props loadPropsInDirs(List<File> dirs, String... suffixes) {
+        Props props = new Props();
+        for(File dir: dirs) {
+            props.putLocal(loadPropsInDir(dir, suffixes));
+        }
+        return props;
+    }
+
+    /**
+     * Load properties from the given path
+     *
+     * @param jobPath The path to load from
+     * @param props The parent properties for loaded properties
+     * @param suffixes The suffixes of files to load
+     */
+    public static void loadPropsBySuffix(File jobPath, Props props, String... suffixes) {
+        try {
+            if(jobPath.isDirectory()) {
+                File[] files = jobPath.listFiles();
+                if(files != null) {
+                    for(File file: files)
+                        loadPropsBySuffix(file, props, suffixes);
+                }
+            } else if(endsWith(jobPath, suffixes)) {
+                props.putAll(new Props(null, jobPath.getAbsolutePath()));
+            }
+        } catch(IOException e) {
+            throw new RuntimeException("Error loading schedule properties.", e);
+        }
+    }
+
+    public static boolean endsWith(File file, String... suffixes) {
+        for(String suffix: suffixes)
+            if(file.getName().endsWith(suffix))
+                return true;
+        return false;
+    }
+
+    private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([a-zA-Z_.0-9]+)\\}");
+
+    public static Props resolveProps(Props props) {
+    	Props resolvedProps = new Props();
+
+    	for(String key : props.getKeySet()) {
+	        StringBuffer replaced = new StringBuffer();
+	        String value = props.get(key);
+	        Matcher matcher = VARIABLE_PATTERN.matcher(value);
+	        while(matcher.find()) {
+	            String variableName = matcher.group(1);
+
+	            if (variableName.equals(key)) {
+	                throw new IllegalArgumentException(
+	                        String.format("Circular property definition starting from property[%s]", key)
+	                );
+	            }
+
+	            String replacement = props.get(variableName);
+	            if(replacement == null)
+	                throw new UndefinedPropertyException("Could not find variable substitution for variable '"
+	                                                     + variableName + "' in key '" + key + "'.");
+
+	            replacement = replacement.replaceAll("\\\\", "\\\\\\\\");
+	            replacement = replacement.replaceAll("\\$", "\\\\\\$");
+
+	            matcher.appendReplacement(replaced, replacement);
+	            matcher.appendTail(replaced);
+
+	            value = replaced.toString();
+	            replaced = new StringBuffer();
+	            matcher = VARIABLE_PATTERN.matcher(value);
+	        }
+	        matcher.appendTail(replaced);
+	        resolvedProps.put(key, replaced.toString());
+    	}
+
+    	return resolvedProps;
+    }
+
+    public static Props produceParentProperties(final ExecutableFlow flow) {
+        Props parentProps = new Props();
+
+        parentProps.put("azkaban.flow.id", flow.getFlowId());
+        parentProps.put("azkaban.flow.uuid", UUID.randomUUID().toString());
+
+        DateTime loadTime = new DateTime();
+
+        parentProps.put("azkaban.flow.start.timestamp", loadTime.toString());
+        parentProps.put("azkaban.flow.start.year", loadTime.toString("yyyy"));
+        parentProps.put("azkaban.flow.start.month", loadTime.toString("MM"));
+        parentProps.put("azkaban.flow.start.day", loadTime.toString("dd"));
+        parentProps.put("azkaban.flow.start.hour", loadTime.toString("HH"));
+        parentProps.put("azkaban.flow.start.minute", loadTime.toString("mm"));
+        parentProps.put("azkaban.flow.start.seconds", loadTime.toString("ss"));
+        parentProps.put("azkaban.flow.start.milliseconds", loadTime.toString("SSS"));
+        parentProps.put("azkaban.flow.start.timezone", loadTime.toString("ZZZZ"));
+        return parentProps;
+    }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/SecurityUtils.java b/src/java/azkaban/jobExecutor/utils/SecurityUtils.java
new file mode 100644
index 0000000..92f7011
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/SecurityUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Utilities for logging in to a secure Hadoop cluster from a keytab and for
+ * creating proxied users that act on behalf of other users.
+ *
+ * NOTE(review): the keytab login is cached in static state, so every caller in
+ * the JVM shares one login identity — confirm that is intended.
+ */
+public class SecurityUtils {
+  // Secure Hadoop proxy user params
+  public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
+  public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
+  public static final String PROXY_USER = "proxy.user";
+  public static final String TO_PROXY = "user.to.proxy";
+
+  // Cached keytab login; created on first use and re-used (with TGT refresh)
+  // by all subsequent calls. Guarded by the synchronized getProxiedUser.
+  private static UserGroupInformation loginUser = null;
+
+  /**
+   * Create a proxied user based on the explicit user name, taking other parameters
+   * necessary from properties file.
+   *
+   * @param toProxy user name to proxy as; must not be null
+   * @param prop must contain PROXY_KEYTAB_LOCATION and PROXY_USER on the first call
+   * @param log destination for progress/diagnostic messages
+   * @param conf Hadoop configuration used for the login; must not be null
+   * @throws IOException if a required property is missing or the login fails
+   */
+  public static synchronized UserGroupInformation getProxiedUser(String toProxy, Properties prop, Logger log, Configuration conf) throws IOException {
+    if(toProxy == null) {
+      throw new IllegalArgumentException("toProxy can't be null");
+    }
+    if(conf == null) {
+      throw new IllegalArgumentException("conf can't be null");
+    }
+
+    if (loginUser == null) {
+      log.info("No login user. Creating login user");
+      String keytab = verifySecureProperty(prop, PROXY_KEYTAB_LOCATION, log);
+      String proxyUser = verifySecureProperty(prop, PROXY_USER, log);
+      UserGroupInformation.setConfiguration(conf);
+      UserGroupInformation.loginUserFromKeytab(proxyUser, keytab);
+      loginUser = UserGroupInformation.getLoginUser();
+      log.info("Logged in with user " + loginUser);
+    } else {
+      log.info("loginUser (" + loginUser + ") already created, refreshing tgt.");
+      // Renew the Kerberos ticket from the keytab if it is close to expiry.
+      loginUser.checkTGTAndReloginFromKeytab();
+    }
+
+    return UserGroupInformation.createProxyUser(toProxy, loginUser);
+  }
+
+  /**
+   * Create a proxied user, taking all parameters, including which user to proxy
+   * (the TO_PROXY property), from the provided Properties.
+   */
+  public static UserGroupInformation getProxiedUser(Properties prop, Logger log, Configuration conf) throws IOException {
+    String toProxy = verifySecureProperty(prop, TO_PROXY, log);
+    return getProxiedUser(toProxy, prop, log, conf);
+  }
+
+  /**
+   * Returns the value of property {@code s}, logging it and throwing if unset.
+   *
+   * @throws IOException if the property is not set
+   */
+  public static String verifySecureProperty(Properties properties, String s, Logger l) throws IOException {
+    String value = properties.getProperty(s);
+
+    if(value == null) throw new IOException(s + " not set in properties. Cannot use secure proxy");
+    l.info("Secure proxy configuration: Property " + s + " = " + value);
+    return value;
+  }
+
+  /** Returns true only when ENABLE_PROXYING is set to exactly "true". */
+  public static boolean shouldProxy(Properties prop) {
+    String shouldProxy = prop.getProperty(ENABLE_PROXYING);
+
+    return shouldProxy != null && shouldProxy.equals("true");
+  }
+}
diff --git a/src/java/azkaban/jobExecutor/utils/StringUtils.java b/src/java/azkaban/jobExecutor/utils/StringUtils.java
new file mode 100644
index 0000000..00a57f5
--- /dev/null
+++ b/src/java/azkaban/jobExecutor/utils/StringUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jobExecutor.utils;
+
+/**
+ * String helpers for building shell command lines.
+ */
+public class StringUtils
+{
+  public static final char SINGLE_QUOTE = '\'';
+  public static final char DOUBLE_QUOTE = '\"';
+  
+  /**
+   * Wraps {@code s} in the given quote character, backslash-escaping each
+   * occurrence of that same character inside the string.
+   *
+   * NOTE(review): pre-existing backslashes in {@code s} are NOT escaped, and
+   * in POSIX shells a backslash inside single quotes is not an escape at all
+   * — confirm against the target shell before using this on untrusted input.
+   *
+   * @param s the raw string to quote
+   * @param quoteCh the quote character to wrap with (e.g. SINGLE_QUOTE)
+   * @return the quoted string
+   */
+  public static String shellQuote(String s, char quoteCh)
+  {
+    StringBuffer buf = new StringBuffer(s.length()+2);
+
+    buf.append(quoteCh);
+    for (int i = 0; i < s.length(); i++) {
+      final char ch = s.charAt(i);
+      if (ch == quoteCh) {
+        buf.append('\\');
+      }
+      buf.append(ch);
+    }
+    buf.append(quoteCh);
+    
+    return buf.toString();
+  }
+}
diff --git a/src/java/azkaban/scheduler/LocalFileScheduleLoader.java b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
new file mode 100644
index 0000000..b61d8a7
--- /dev/null
+++ b/src/java/azkaban/scheduler/LocalFileScheduleLoader.java
@@ -0,0 +1,360 @@
+package azkaban.scheduler;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.user.User;
+import azkaban.utils.Props;
+import azkaban.utils.JSONUtils;
+
+/**
+ * Loads the schedule from a schedule file that is JSON like. The format would be as follows:
+ * 
+ * {
+ * 		schedule: [
+ * 			{
+ * 				"project": "<project>",
+ * 				"user": "<user>",
+ * 				"flow": "<flow>",
+ * 				"time": "<time>",
+ * 				"recurrence":"<period>",
+ * 				"dependency":<boolean>
+ * 			}
+ * 		]
+ * }	
+ * 
+ * 
+ * @author rpark
+ *
+ */
+public class LocalFileScheduleLoader implements ScheduleLoader {
+	// JSON keys used in the persisted schedule file.
+	private static final String SCHEDULEID = "schedule";
+	private static final String USER = "user";
+	private static final String USERSUBMIT = "userSubmit";
+	private static final String SUBMITTIME = "submitTime";
+	private static final String FIRSTSCHEDTIME = "firstSchedTime";
+
+	private static final String SCHEDULE = "schedule";
+	private static final String NEXTEXECTIME = "nextExecTime";
+	private static final String TIMEZONE = "timezone";
+	private static final String RECURRENCE = "recurrence";
+
+	// Format used for every timestamp persisted to the schedule file.
+	private static final DateTimeFormatter FILE_DATEFORMAT = DateTimeFormat.forPattern("yyyy-MM-dd.HH.mm.ss.SSS");
+	private static final Logger logger = Logger.getLogger(LocalFileScheduleLoader.class);
+
+	private File basePath;
+	private File scheduleFile;
+	private File backupScheduleFile;
+
+	/**
+	 * Creates the loader and ensures that the schedule directory, the schedule
+	 * file and its backup all exist, creating any that are missing.
+	 *
+	 * @param props must contain the "schedule.directory" property
+	 * @throws IOException if a schedule file cannot be created
+	 * @throws RuntimeException if the directory or files cannot be set up
+	 */
+	public LocalFileScheduleLoader(Props props) throws IOException {
+		basePath = new File(props.getString("schedule.directory"));
+		if (!basePath.exists()) {
+			logger.info("Schedule directory " + basePath + " not found.");
+			if (basePath.mkdirs()) {
+				logger.info("Schedule directory " + basePath + " created.");
+			}
+			else {
+				throw new RuntimeException("Schedule directory " + basePath + " does not exist and cannot be created.");
+			}
+		}
+
+		scheduleFile = new File(basePath, "schedule");
+		if (!scheduleFile.exists() || scheduleFile.isDirectory()) {
+			logger.info("Schedule file " + scheduleFile + " not found.");
+			if (scheduleFile.createNewFile() && scheduleFile.canRead() && scheduleFile.canWrite()) {
+				logger.info("Schedule file " + scheduleFile + " created.");
+			}
+			else {
+				throw new RuntimeException("Schedule file " + scheduleFile + " cannot be created.");
+			}
+		}
+
+		backupScheduleFile = new File(basePath, "backup");
+		if (!backupScheduleFile.exists() || backupScheduleFile.isDirectory()) {
+			logger.info("Backup schedule file " + backupScheduleFile + " not found.");
+			if (backupScheduleFile.createNewFile() && backupScheduleFile.canRead() && backupScheduleFile.canWrite()) {
+				logger.info("Backup schedule file " + backupScheduleFile + " created.");
+			}
+			else {
+				throw new RuntimeException("Backup schedule file " + backupScheduleFile + " cannot be created.");
+			}
+		}
+	}
+
+	/**
+	 * Loads the schedule from the schedule file, falling back to the backup
+	 * file if the primary file is missing. A missing or empty schedule yields
+	 * an empty list rather than an error.
+	 */
+	@Override
+	public List<ScheduledFlow> loadSchedule() {
+		if (scheduleFile != null && backupScheduleFile != null) {
+			if (scheduleFile.exists()) {
+				// A zero-length file is the normal first-run state.
+				if (scheduleFile.length() == 0) {
+					return new ArrayList<ScheduledFlow>();
+				}
+				return loadFromFile(scheduleFile);
+			}
+			else if (backupScheduleFile.exists()) {
+				backupScheduleFile.renameTo(scheduleFile);
+				return loadFromFile(scheduleFile);
+			}
+			else {
+				logger.warn("No schedule files found looking for " + scheduleFile.getAbsolutePath());
+			}
+		}
+
+		return new ArrayList<ScheduledFlow>();
+	}
+
+	/**
+	 * Saves the schedule, first rotating the current file to the backup so a
+	 * crash mid-write leaves a recoverable copy.
+	 */
+	@Override
+	public void saveSchedule(List<ScheduledFlow> schedule) {
+		if (scheduleFile != null && backupScheduleFile != null) {
+			// Delete the backup if it exists and a current file exists.
+			if (backupScheduleFile.exists() && scheduleFile.exists()) {
+				backupScheduleFile.delete();
+			}
+
+			// Rename the schedule if it exists.
+			if (scheduleFile.exists()) {
+				scheduleFile.renameTo(backupScheduleFile);
+			}
+
+			HashMap<String,Object> obj = new HashMap<String,Object>();
+			ArrayList<Object> schedules = new ArrayList<Object>();
+			obj.put(SCHEDULE, schedules);
+
+			// Write out schedule.
+			for (ScheduledFlow schedFlow : schedule) {
+				schedules.add(createJSONObject(schedFlow));
+			}
+
+			try {
+				FileWriter writer = new FileWriter(scheduleFile);
+				// Always close the writer; the previous version leaked the
+				// file handle by never calling close().
+				try {
+					writer.write(JSONUtils.toJSONString(obj, 4));
+					writer.flush();
+				}
+				finally {
+					writer.close();
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Error saving flow file", e);
+			}
+			logger.info("schedule saved");
+		}
+	}
+
+	/**
+	 * Parses the JSON schedule file into ScheduledFlow instances. Entries that
+	 * cannot be turned into a valid future schedule are dropped.
+	 */
+	@SuppressWarnings("unchecked")
+	private List<ScheduledFlow> loadFromFile(File schedulefile) {
+		BufferedReader reader;
+		try {
+			reader = new BufferedReader(new FileReader(schedulefile));
+		} catch (FileNotFoundException e) {
+			// Previously execution fell through with a null reader and threw
+			// a NullPointerException below; bail out with an empty schedule.
+			logger.error("Error loading schedule file ", e);
+			return new ArrayList<ScheduledFlow>();
+		}
+
+		List<ScheduledFlow> scheduleList = new ArrayList<ScheduledFlow>();
+		HashMap<String, Object> schedule;
+		try {
+			schedule = (HashMap<String,Object>)JSONUtils.fromJSONStream(reader);
+		} catch (Exception e) {
+			logger.error("Error parsing the schedule file", e);
+			throw new RuntimeException("Error parsing the schedule file", e);
+		}
+		finally {
+			try {
+				reader.close();
+			} catch (IOException e) {
+				// Best-effort close; nothing useful to do here.
+			}
+		}
+
+		ArrayList<Object> array = (ArrayList<Object>)schedule.get(SCHEDULE);
+		for (int i = 0; i < array.size(); ++i) {
+			HashMap<String, Object> schedItem = (HashMap<String, Object>)array.get(i);
+			ScheduledFlow sched = createScheduledFlow(schedItem);
+			if (sched != null) {
+				scheduleList.add(sched);
+			}
+		}
+
+		return scheduleList;
+	}
+
+	/**
+	 * Turns one persisted JSON entry into a ScheduledFlow, or null if the
+	 * entry is invalid or its one-off execution time has already passed.
+	 */
+	private ScheduledFlow createScheduledFlow(HashMap<String, Object> obj) {
+		String scheduleId = (String)obj.get(SCHEDULEID);
+		String user = (String)obj.get(USER);
+		String userSubmit = (String)obj.get(USERSUBMIT);
+		String submitTimeRaw = (String)obj.get(SUBMITTIME);
+		String firstSchedTimeRaw = (String)obj.get(FIRSTSCHEDTIME);
+		String nextExecTimeRaw = (String)obj.get(NEXTEXECTIME);
+		String timezone = (String)obj.get(TIMEZONE);
+		String recurrence = (String)obj.get(RECURRENCE);
+
+		// Validate BEFORE parsing: Joda's parseDateTime throws on null input,
+		// so the original's post-parse null checks could never execute.
+		if (nextExecTimeRaw == null) {
+			logger.error("No next execution time has been set");
+			return null;
+		}
+		if (submitTimeRaw == null) {
+			logger.error("No submitTime has been set");
+		}
+		if (firstSchedTimeRaw == null) {
+			logger.error("No first scheduled time has been set");
+		}
+
+		DateTime nextExecTime = FILE_DATEFORMAT.parseDateTime(nextExecTimeRaw);
+		DateTime submitTime = submitTimeRaw == null ? null : FILE_DATEFORMAT.parseDateTime(submitTimeRaw);
+		DateTime firstSchedTime = firstSchedTimeRaw == null ? null : FILE_DATEFORMAT.parseDateTime(firstSchedTimeRaw);
+
+		if (timezone != null) {
+			nextExecTime = nextExecTime.withZoneRetainFields(DateTimeZone.forID(timezone));
+		}
+
+		ReadablePeriod period = null;
+		if (recurrence != null) {
+			period = parsePeriodString(scheduleId, recurrence);
+		}
+
+		ScheduledFlow scheduledFlow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, period);
+		if (scheduledFlow.updateTime()) {
+			return scheduledFlow;
+		}
+
+		// Non-recurring entry whose time has passed; drop it from the schedule.
+		logger.info("Removed " + scheduleId + " from the schedule. It is not recurring.");
+		return null;
+	}
+
+	/**
+	 * Serializes a ScheduledFlow into the map form written to the JSON file.
+	 */
+	private HashMap<String,Object> createJSONObject(ScheduledFlow flow) {
+		HashMap<String,Object> object = new HashMap<String,Object>();
+		object.put(SCHEDULEID, flow.getScheduleId());
+		object.put(USER, flow.getUser());
+		object.put(USERSUBMIT, flow.getUserSubmit());
+
+		object.put(SUBMITTIME, FILE_DATEFORMAT.print(flow.getSubmitTime()));
+		object.put(FIRSTSCHEDTIME, FILE_DATEFORMAT.print(flow.getFirstSchedTime()));
+
+		object.put(NEXTEXECTIME, FILE_DATEFORMAT.print(flow.getNextExecTime()));
+		object.put(TIMEZONE, flow.getNextExecTime().getZone().getID());
+		object.put(RECURRENCE, createPeriodString(flow.getPeriod()));
+
+		return object;
+	}
+
+	/**
+	 * Parses a period string of the form "<int><unit>" where unit is one of
+	 * d(ays), h(ours), m(inutes) or s(econds). "n" means no recurrence.
+	 *
+	 * @throws IllegalArgumentException for an unrecognized unit
+	 */
+	private ReadablePeriod parsePeriodString(String scheduleId, String periodStr)
+	{
+		ReadablePeriod period;
+		char periodUnit = periodStr.charAt(periodStr.length() - 1);
+		if (periodUnit == 'n') {
+			return null;
+		}
+
+		int periodInt = Integer.parseInt(periodStr.substring(0, periodStr.length() - 1));
+		switch (periodUnit) {
+			case 'd':
+				period = Days.days(periodInt);
+				break;
+			case 'h':
+				period = Hours.hours(periodInt);
+				break;
+			case 'm':
+				period = Minutes.minutes(periodInt);
+				break;
+			case 's':
+				period = Seconds.seconds(periodInt);
+				break;
+			default:
+				throw new IllegalArgumentException("Invalid schedule period unit '" + periodUnit + "' for flow " + scheduleId);
+		}
+
+		return period;
+	}
+
+	/**
+	 * Inverse of parsePeriodString: renders a period as "<int><unit>", or "n"
+	 * when there is no recurrence. Only the largest non-zero field is kept.
+	 */
+	private String createPeriodString(ReadablePeriod period)
+	{
+		String periodStr = "n";
+
+		if (period == null) {
+			return "n";
+		}
+
+		if (period.get(DurationFieldType.days()) > 0) {
+			int days = period.get(DurationFieldType.days());
+			periodStr = days + "d";
+		}
+		else if (period.get(DurationFieldType.hours()) > 0) {
+			int hours = period.get(DurationFieldType.hours());
+			periodStr = hours + "h";
+		}
+		else if (period.get(DurationFieldType.minutes()) > 0) {
+			int minutes = period.get(DurationFieldType.minutes());
+			periodStr = minutes + "m";
+		}
+		else if (period.get(DurationFieldType.seconds()) > 0) {
+			int seconds = period.get(DurationFieldType.seconds());
+			periodStr = seconds + "s";
+		}
+
+		return periodStr;
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduledFlow.java b/src/java/azkaban/scheduler/ScheduledFlow.java
new file mode 100644
index 0000000..38c94b3
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduledFlow.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.scheduler;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.user.User;
+import azkaban.utils.Utils;
+
+
+/**
+ * Schedule for a job instance. This is decoupled from the execution.
+ * 
+ * @author Richard Park
+ * 
+ */
+public class ScheduledFlow {
+
+	//use projectId.flowId to form a unique scheduleId
+	private final String scheduleId;	
+    
+    private final ReadablePeriod period;
+    private DateTime nextExecTime;
+    private final String user;
+    private final String userSubmit;
+    private final DateTime submitTime;
+    private final DateTime firstSchedTime;
+//    private SchedStatus schedStatus;
+    
+    // Last-run status values, identified by their lowercase string form.
+    public enum SchedStatus {
+    	LASTSUCCESS ("lastsuccess"), 
+    	LASTFAILED ("lastfailed"), 
+    	LASTPAUSED ("lastpaused");
+    	
+    	private final String status;
+    	SchedStatus(String status){
+    		this.status = status;
+    	}
+    	private String status(){
+    		return this.status;
+    	}
+    }
+
+    /**
+     * Constructor for a non-recurring schedule entry.
+     * 
+     * @param scheduleId unique id of the form projectId.flowId; must not be null
+     * @param user user the flow is scheduled to run as
+     * @param userSubmit user who submitted the schedule
+     * @param submitTime time the schedule was submitted
+     * @param firstSchedTime first scheduled execution time
+     * @param nextExecution the next execution time; must not be null
+     */
+    public ScheduledFlow(String scheduleId,
+    					String user,
+    					String userSubmit,
+    					DateTime submitTime,
+    					DateTime firstSchedTime,
+                        DateTime nextExecution) {
+        this(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecution, null);
+    }
+
+    /**
+     * Constructor for a (possibly) recurring schedule entry.
+     * 
+     * @param scheduleId unique id of the form projectId.flowId; must not be null
+     * @param user user the flow is scheduled to run as
+     * @param userSubmit user who submitted the schedule
+     * @param submitTime time the schedule was submitted
+     * @param firstSchedTime first scheduled execution time
+     * @param nextExecution the next execution time; must not be null
+     * @param period recurrence period, or null for a one-off schedule
+     */
+    public ScheduledFlow(String scheduleId,
+    					String user,
+    					String userSubmit,
+    					DateTime submitTime,
+    					DateTime firstSchedTime,
+                        DateTime nextExecution,
+                        ReadablePeriod period) {
+        super();
+        this.scheduleId = Utils.nonNull(scheduleId);
+        this.user = user;
+        this.userSubmit = userSubmit;
+        this.submitTime = submitTime;
+        this.firstSchedTime = firstSchedTime;
+        this.period = period;
+        this.nextExecTime = Utils.nonNull(nextExecution);
+//        this.schedStatus = SchedStatus.LASTSUCCESS;
+    }
+    
+    /**
+     * Constructor for a recurring entry whose next execution time starts at
+     * "now" (construction time).
+     */
+    public ScheduledFlow(String scheduleId, 
+    		String user, 
+    		String userSubmit,
+			DateTime submitTime, 
+			DateTime firstSchedTime,
+			ReadablePeriod period) {
+    	super();
+        this.scheduleId = Utils.nonNull(scheduleId);
+        this.user = user;
+        this.userSubmit = userSubmit;
+        this.submitTime = submitTime;
+        this.firstSchedTime = firstSchedTime;
+        this.period = period;
+        this.nextExecTime = new DateTime();
+//        this.schedStatus = SchedStatus.LASTSUCCESS;
+	}
+
+    /**
+     * Constructor for a one-off entry whose next execution time starts at
+     * "now" (construction time).
+     */
+    public ScheduledFlow(String scheduleId, 
+    		String user, 
+    		String userSubmit,
+			DateTime submitTime, 
+			DateTime firstSchedTime) {
+    	super();
+        this.scheduleId = Utils.nonNull(scheduleId);
+        this.user = user;
+        this.userSubmit = userSubmit;
+        this.submitTime = submitTime;
+        this.firstSchedTime = firstSchedTime;
+        this.period = null;
+        this.nextExecTime = new DateTime();
+//        this.schedStatus = SchedStatus.LASTSUCCESS;
+	}
+
+//	public SchedStatus getSchedStatus() {
+//		return this.schedStatus;
+//	}
+//
+//	public void setSchedStatus(SchedStatus schedStatus) {
+//		this.schedStatus = schedStatus;
+//	}
+
+	/**
+     * Updates the time to a future time after 'now' that matches the period description.
+     * 
+     * @return true if the entry is still schedulable: either its next
+     *         execution time is already in the future, or it recurs and has
+     *         been advanced past now. Returns false for an expired one-off.
+     */
+    public boolean updateTime() {
+    	if (nextExecTime.isAfterNow()) {
+    		return true;
+    	}
+    	
+        if (period != null) {
+        	DateTime other = getNextRuntime(nextExecTime, period);
+    		
+    		this.nextExecTime = other;
+    		return true;
+    	}
+
+        return false;
+    }
+    
+    /**
+     * Calculates the next runtime by repeatedly adding the period until the
+     * result is after "now".
+     * 
+     * @param scheduledDate the base execution time to advance from
+     * @param period the recurrence period; if null, scheduledDate is returned
+     * @return the first reachable time after now
+     */
+    private DateTime getNextRuntime(DateTime scheduledDate, ReadablePeriod period)
+    {
+        DateTime now = new DateTime();
+        DateTime date = new DateTime(scheduledDate);
+        int count = 0;
+        while (!now.isBefore(date)) {
+            // Guard against a zero/negative period looping forever.
+            if (count > 100000) {
+                throw new IllegalStateException("100000 increments of period did not get to present time.");
+            }
+
+            if (period == null) {
+                break;
+            }
+            else {
+                date = date.plus(period);
+            }
+
+            count += 1;
+        }
+
+        return date;
+    }
+    
+    /**
+     * Returns true if the job recurs in the future
+     * @return
+     */
+    public boolean isRecurring() {
+        return this.period != null;
+    }
+
+    /**
+     * Returns the recurrence period. Or null if not applicable
+     * @return
+     */
+    public ReadablePeriod getPeriod() {
+        return period;
+    }
+
+	/** Returns the first scheduled execution time. */
+	public DateTime getFirstSchedTime() {
+		return firstSchedTime;
+	}
+
+	/**
+     * Returns the next scheduled execution
+     * @return
+     */
+    public DateTime getNextExecTime() {
+        return nextExecTime;
+    }
+
+	/** Returns the user who submitted this schedule. */
+	public String getUserSubmit() {
+		return userSubmit;
+	}
+
+	/** Returns the time this schedule was submitted. */
+	public DateTime getSubmitTime() {
+		return submitTime;
+	}
+
+	@Override
+    public String toString()
+    {
+        return "ScheduledFlow{" +
+//        	   "scheduleStatus=" + schedStatus +
+               "nextExecTime=" + nextExecTime +
+               ", period=" + period +
+               ", firstSchedTime=" + firstSchedTime +
+               ", submitTime=" + submitTime +
+               ", userSubmit=" + userSubmit +
+               ", user=" + user +
+               ", scheduleId='" + scheduleId + '\'' +
+               '}';
+    }
+
+	/** Returns the user the flow runs as. */
+	public String getUser() {
+		return user;
+	}
+	
+	/** Returns the unique schedule id ("projectId.flowId"). */
+	public String getScheduleId() {
+		return scheduleId;
+	}
+	
+	// NOTE(review): assumes scheduleId is exactly "projectId.flowId"; a flow
+	// id containing '.' would be truncated here — confirm the id format.
+	public String getFlowId(){
+		return this.scheduleId.split("\\.")[1];
+	}
+	
+	public String getProjectId(){
+		return this.scheduleId.split("\\.")[0];
+	}
+}
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
new file mode 100644
index 0000000..6997264
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -0,0 +1,11 @@
+package azkaban.scheduler;
+
+import java.util.List;
+
+
+/**
+ * Persistence interface for the flow schedule: implementations load the
+ * saved schedule at startup and save it whenever it changes.
+ */
+public interface ScheduleLoader {
+	/** Persists the full schedule, replacing any previously saved state. */
+	public void saveSchedule(List<ScheduledFlow> schedule);
+	
+	/** Loads the previously saved schedule; returns an empty list when none exists. */
+	public List<ScheduledFlow> loadSchedule();
+
+}
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
new file mode 100644
index 0000000..12c9184
--- /dev/null
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -0,0 +1,410 @@
+package azkaban.scheduler;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.PeriodFormat;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.flow.Flow;
+import azkaban.jobExecutor.utils.JobExecutionException;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.scheduler.ScheduledFlow.SchedStatus;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.utils.Props;
+
+
+
+/**
+ * The ScheduleManager stores and executes the schedule. It uses a single thread that
+ * waits until the correct launch time for each flow. It does not remove a flow from the
+ * schedule when it is run, so successive executions of the same flow can potentially overlap.
+ * 
+ * @author Richard
+ */
+public class ScheduleManager {
+	private static Logger logger = Logger.getLogger(ScheduleManager.class);
+
+    private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+	private ScheduleLoader loader;
+    private Map<String, ScheduledFlow> scheduleIDMap = new LinkedHashMap<String, ScheduledFlow>(); 
+    private final ScheduleRunner runner;
+    private final ExecutorManager executorManager;
+    private final ProjectManager projectManager;
+    
+    /**
+     * Give the schedule manager a loader class that will properly load the schedule.
+     * 
+     * @param loader
+     */
+    public ScheduleManager(
+    		ExecutorManager executorManager,
+    		ProjectManager projectManager,
+    		ScheduleLoader loader) 
+    {
+    	this.executorManager = executorManager;
+    	this.projectManager = projectManager;
+    	this.loader = loader;
+    	this.runner = new ScheduleRunner();
+    	
+    	List<ScheduledFlow> scheduleList = loader.loadSchedule();
+    	for (ScheduledFlow flow: scheduleList) {
+    		internalSchedule(flow);
+    	}
+    	
+    	this.runner.start();
+    }
+    
+    /**
+     * Shutdowns the scheduler thread. After shutdown, it may not be safe to use it again.
+     */
+    public void shutdown() {
+    	this.runner.shutdown();
+    }
+    
+    /**
+     * Retrieves a copy of the list of schedules.
+     * 
+     * @return
+     */
+    public synchronized List<ScheduledFlow> getSchedule() {
+    	return runner.getSchedule();
+    }
+
+    /**
+     * Returns the scheduled flow for the flow name
+     * 
+     * @param id
+     * @return
+     */
+    public ScheduledFlow getSchedule(String scheduleId) {
+    	return scheduleIDMap.get(scheduleId);
+    }
+    
+    /**
+     * Removes the flow from the schedule if it exists.
+     * 
+     * @param id
+     */
+    public synchronized void removeScheduledFlow(String scheduleId) {
+    	ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+    	scheduleIDMap.remove(scheduleId);
+    	runner.removeScheduledFlow(flow);
+    	
+    	loader.saveSchedule(getSchedule());
+    }
+    
+//    public synchronized void pauseScheduledFlow(String scheduleId){
+//    	try{
+//        	ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+//        	flow.setSchedStatus(SchedStatus.LASTPAUSED);
+//        	loader.saveSchedule(getSchedule());
+//    	}
+//    	catch (Exception e) {
+//    		throw new RuntimeException("Error pausing a schedule " + scheduleId);
+//		}
+//    }
+//    
+//    public synchronized void resumeScheduledFlow(String scheduleId){
+//    	try {
+//    		ScheduledFlow flow = scheduleIDMap.get(scheduleId);
+//        	flow.setSchedStatus(SchedStatus.LASTSUCCESS);
+//        	loader.saveSchedule(getSchedule());			
+//		} 
+//    	catch (Exception e) {
+//			throw new RuntimeException("Error resuming a schedule " + scheduleId);
+//		}
+//    }
+    
+    public void schedule(final String scheduleId,
+    		final String user,
+    		final String userSubmit,
+    		final DateTime submitTime,
+    		final DateTime firstSchedTime,
+    		final ReadablePeriod period
+            ) {
+    	//TODO: should validate projectId and flowId?
+		logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime)
+		+ " with a period of " + PeriodFormat.getDefault().print(period));
+		schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, period));
+	}
+    
+    /**
+     * Schedule the flow
+     * @param flowId
+     * @param date
+     * @param ignoreDep
+     */
+    public void schedule(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime) {
+        logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
+        schedule(new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime));
+    }
+    
+    /**
+     * Schedules the flow, but doesn't save the schedule afterwards.
+     * @param flow
+     */
+    private synchronized void internalSchedule(ScheduledFlow flow) {
+    	ScheduledFlow existing = scheduleIDMap.get(flow.getScheduleId());
+    	flow.updateTime();
+    	if (existing != null) {
+    		this.runner.removeScheduledFlow(existing);
+    	}
+    	
+		this.runner.addScheduledFlow(flow);
+    	scheduleIDMap.put(flow.getScheduleId(), flow);
+    }
+    
+    /**
+     * Adds a flow to the schedule.
+     * 
+     * @param flow
+     */
+    public synchronized void schedule(ScheduledFlow flow) {
+    	internalSchedule(flow);
+    	saveSchedule();
+    }
+    
+    /**
+     * Save the schedule
+     */
+    private void saveSchedule() {
+    	loader.saveSchedule(getSchedule());
+    }
+    
+    
+    /**
+     * Thread that simply invokes the running of flows when the schedule is
+     * ready.
+     * 
+     * @author Richard Park
+     *
+     */
+    public class ScheduleRunner extends Thread {
+    	private final PriorityBlockingQueue<ScheduledFlow> schedule;
+    	private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+        	// Five minute minimum intervals
+    	private static final int TIMEOUT_MS = 300000;
+    	
+    	public ScheduleRunner() {
+    		schedule = new PriorityBlockingQueue<ScheduledFlow>(1, new ScheduleComparator());
+    	}
+    	
+    	public void shutdown() {
+    		logger.error("Shutting down scheduler thread");
+    		stillAlive.set(false);
+    		this.interrupt();
+    	}
+    	
+    	/**
+    	 * Return a list of scheduled flow
+    	 * @return
+    	 */
+    	public synchronized List<ScheduledFlow> getSchedule() {
+    		return new ArrayList<ScheduledFlow>(schedule);
+    	}
+    	
+    	/**
+    	 * Adds the flow to the schedule and then interrupts so it will update its wait time.
+    	 * @param flow
+    	 */
+        public synchronized void addScheduledFlow(ScheduledFlow flow) {
+        	logger.info("Adding " + flow + " to schedule.");
+        	schedule.add(flow);
+//            MonitorImpl.getInternalMonitorInterface().workflowEvent(null, 
+//                    System.currentTimeMillis(),
+//                    WorkflowAction.SCHEDULE_WORKFLOW, 
+//                    WorkflowState.NOP,
+//                    flow.getId());
+
+        	this.interrupt();
+        }
+
+        /**
+         * Remove scheduled flows. Does not interrupt.
+         * 
+         * @param flow
+         */
+        public synchronized void removeScheduledFlow(ScheduledFlow flow) {
+        	logger.info("Removing " + flow + " from the schedule.");
+        	schedule.remove(flow);
+//            MonitorImpl.getInternalMonitorInterface().workflowEvent(null, 
+//                    System.currentTimeMillis(),
+//                    WorkflowAction.UNSCHEDULE_WORKFLOW,
+//                    WorkflowState.NOP,
+//                    flow.getId());
+        	// Don't need to interrupt, because if this is originally on the top of the queue,
+        	// it'll just skip it.
+        }
+        
+        public void run() {
+        	while(stillAlive.get()) {
+        		synchronized (this) {
+        			try {
+        				//TODO clear up the exception handling
+        				ScheduledFlow schedFlow = schedule.peek();
+	    	    		
+	    	    		if (schedFlow == null) {
+	    	    			// If null, wake up every minute or so to see if there's something to do.
+	    	    			// Most likely there will not be.
+	    	    			try {
+	    	    				this.wait(TIMEOUT_MS);
+	    	    			} catch (InterruptedException e) {
+	    	    				// interruption should occur when items are added or removed from the queue.
+	    	    			}
+	    	    		}
+	    	    		else {
+	    	    			// We've passed the flow execution time, so we will run.
+	    	    			if (!schedFlow.getNextExecTime().isAfterNow()) {
+	    	    				// Run flow. The invocation of flows should be quick.
+	    	    				ScheduledFlow runningFlow = schedule.poll();
+	    	    				logger.info("Scheduler attempting to run " + runningFlow.getScheduleId());
+	
+	    	    				// Execute the flow here
+	    	    				try {
+	    	    					Project project = projectManager.getProject(runningFlow.getProjectId());
+	    	    					if (project == null) {
+	    	    						logger.error("Scheduled Project "+runningFlow.getProjectId()+" does not exist!");
+	    	    						throw new RuntimeException("Error finding the scheduled project. " + runningFlow.getScheduleId());
+	    	    					}
+	    	    					
+	    	    					Flow flow = project.getFlow(runningFlow.getFlowId());
+	    	    					if (flow == null) {
+	    	    						logger.error("Flow " + runningFlow.getFlowId() + " cannot be found in project " + project.getName());
+	    	    						throw new RuntimeException("Error finding the scheduled flow. " + runningFlow.getScheduleId());
+	    	    					}
+	    	    					
+	    	    					HashMap<String, Props> sources;
+	    	    					try {
+	    	    						sources = projectManager.getAllFlowProperties(project, runningFlow.getFlowId());
+	    	    					}
+	    	    					catch (ProjectManagerException e) {
+	    	    						logger.error(e.getMessage());
+	    	    						throw new RuntimeException("Error getting the flow resources. " + runningFlow.getScheduleId());
+	    	    					}
+	    	    				    	    					
+	    	    					// Create ExecutableFlow
+	    	    					ExecutableFlow exflow = executorManager.createExecutableFlow(flow);
+	    	    					exflow.setSubmitUser(runningFlow.getUser());
+	    	    					//TODO make disabled in scheduled flow
+//	    	    					Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
+//	    	    					for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
+//	    	    						boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
+//	    	    						exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+//	    	    					}
+	    	    					
+	    	    					// Create directory
+	    	    					try {
+	    	    						executorManager.setupExecutableFlow(exflow);
+	    	    					} catch (ExecutorManagerException e) {
+	    	    						try {
+	    	    							executorManager.cleanupAll(exflow);
+	    	    						} catch (ExecutorManagerException e1) {
+	    	    							e1.printStackTrace();
+	    	    						}
+	    	    						logger.error(e.getMessage());
+	    	    						return;
+	    	    					}
+
+	    	    					// Copy files to the source.
+	    	    					File executionDir = new File(exflow.getExecutionPath());
+	    	    					try {
+	    	    						projectManager.copyProjectSourceFilesToDirectory(project, executionDir);
+	    	    					} catch (ProjectManagerException e) {
+	    	    						try {
+	    	    							executorManager.cleanupAll(exflow);
+	    	    						} catch (ExecutorManagerException e1) {
+	    	    							e1.printStackTrace();
+	    	    						}
+	    	    						logger.error(e.getMessage());
+	    	    						return;
+	    	    					}
+	    	    					
+
+	    	    					try {
+	    	    						executorManager.executeFlow(exflow);
+	    	    					} catch (ExecutorManagerException e) {
+	    	    						try {
+	    	    							executorManager.cleanupAll(exflow);
+	    	    						} catch (ExecutorManagerException e1) {
+	    	    							e1.printStackTrace();
+	    	    						}
+	    	    						
+	    	    						logger.error(e.getMessage());
+	    	    						return;
+	    	    					}
+	    	    				} catch (JobExecutionException e) {
+	    	    					logger.info("Could not run flow. " + e.getMessage());
+	    	    				}
+	    	    	        	schedule.remove(runningFlow);
+	    	    				
+	    	    				// Immediately reschedule if it's possible. Let the execution manager
+	    	    				// handle any duplicate runs.
+	    	    				if (runningFlow.updateTime()) {
+	    	    					schedule.add(runningFlow);
+	    	    				}
+    	    					saveSchedule();
+	    	    			}
+	    	    			else {
+	    	    				// wait until flow run
+	    	    				long millisWait = Math.max(0, schedFlow.getNextExecTime().getMillis() - (new DateTime()).getMillis());
+	    	    				try {
+	    							this.wait(Math.min(millisWait, TIMEOUT_MS));
+	    						} catch (InterruptedException e) {
+	    							// interruption should occur when items are added or removed from the queue.
+	    						}
+	    	    			}
+	    	    		}
+        			}
+        			catch (Exception e) {
+        				logger.error("Unexpected exception has been thrown in scheduler", e);
+        			}
+        			catch (Throwable e) {
+        				logger.error("Unexpected throwable has been thrown in scheduler", e);
+        			}
+        		}
+        	}
+        }
+        
+        /**
+         * Class to sort the schedule based on time.
+         * 
+         * @author Richard Park
+         */
+        private class ScheduleComparator implements Comparator<ScheduledFlow>{
+    		@Override
+    		public int compare(ScheduledFlow arg0, ScheduledFlow arg1) {
+    			DateTime first = arg1.getNextExecTime();
+    			DateTime second = arg0.getNextExecTime();
+    			
+    			if (first.isEqual(second)) {
+    				return 0;
+    			}
+    			else if (first.isBefore(second)) {
+    				return 1;
+    			}
+    			
+    			return -1;
+    		}	
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/CircularBuffer.java b/src/java/azkaban/utils/CircularBuffer.java
new file mode 100644
index 0000000..1e62259
--- /dev/null
+++ b/src/java/azkaban/utils/CircularBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterators;
+
+/**
+ * A circular buffer of items of a given length. It will grow up to the give size as items are appended, then
+ * it will begin to overwrite older items.
+ * 
+ * @author jkreps
+ *
+ * @param <T> The type of the item contained.
+ */
+public class CircularBuffer<T> implements Iterable<T> {
+	
+	private final List<T> lines;
+	private final int size;
+	private int start;
+	
+	public CircularBuffer(int size) {
+		this.lines = new ArrayList<T>();
+		this.size = size;
+		this.start = 0;
+	}
+	
+	/**
+	 * Appends an item. Once the buffer holds 'size' items, the oldest item is
+	 * overwritten and the start pointer advances.
+	 */
+	public void append(T line) {
+		if (lines.size() >= size) {
+			// Buffer is full: overwrite the oldest slot.
+			lines.set(start, line);
+			start = (start + 1) % size;
+		} else {
+			lines.add(line);
+		}
+	}
+			
+	@Override
+	public String toString() {
+		return "[" + Joiner.on(", ").join(lines) + "]";
+	}
+
+	/** Iterates from the oldest item to the newest. */
+	public Iterator<T> iterator() {
+		if (start != 0) {
+			// Wrapped: oldest entries live at [start, end), newest at [0, start).
+			return Iterators.concat(
+					lines.subList(start, lines.size()).iterator(),
+					lines.subList(0, start).iterator());
+		}
+		return lines.iterator();
+	}
+	
+	/** The configured capacity. */
+	public int getMaxSize() {
+	    return this.size;
+	}
+	
+	/** The number of items currently held. */
+	public int getSize() {
+	    return this.lines.size();
+	}
+	
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index d682dbe..cae5958 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -7,23 +7,190 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.Reader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectWriter;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
 
 public class JSONUtils {
+
+	
 	/**
-	 * Prevent the instantiation of this helper class.
+	 * The constructor. Cannot construct this class.
 	 */
 	private JSONUtils() {
 	}
+	
+	/**
+	 * Streams JSON from the given reader and converts it into plain Java
+	 * objects (Map, List, String, Number, Boolean). The reader is not wrapped
+	 * in a BufferedReader, so callers should buffer themselves if desired.
+	 * 
+	 * @param reader source of the JSON text
+	 * @return the top-level JSON object converted to a Map
+	 * @throws Exception if the stream does not contain a valid JSON object
+	 */
+	public static Map<String, Object> fromJSONStream(Reader reader) throws Exception {
+		return createObject(new JSONObject(new JSONTokener(reader)));
+	}
+	
+	/**
+	 * Converts a json string into plain Java objects (Map, List, String,
+	 * Number, Boolean).
+	 * 
+	 * @param str the json text
+	 * @return the top-level JSON object converted to a Map
+	 * @throws Exception if the string is not a valid JSON object
+	 */
+	public static Map<String, Object> fromJSONString(String str) throws Exception {
+		return createObject(new JSONObject(str));
+	}
+
+	/**
+	 * Recurses through the json object to create a Map/List/Object equivalent.
+	 * Key insertion order is preserved (LinkedHashMap).
+	 * 
+	 * @param obj the JSON object to convert
+	 * @return a map of key to converted value
+	 */
+	@SuppressWarnings("unchecked")
+	private static Map<String, Object> createObject(JSONObject obj) {
+		LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
+		
+		Iterator<String> iterator = obj.keys();
+		while(iterator.hasNext()) {
+			String key = iterator.next();
+			Object value = null;
+			try {
+				value = obj.get(key);
+			} catch (JSONException e) {
+				// Since we get the value from the key given by the JSONObject, 
+				// this exception shouldn't be thrown.
+			}
+			
+			// Nested structures are converted recursively; all other values
+			// (String, Number, Boolean, null) are kept as-is.
+			if (value instanceof JSONArray) {
+				value = createArray((JSONArray)value);
+			}
+			else if (value instanceof JSONObject) {
+				value = createObject((JSONObject)value);
+			}
+			
+			map.put(key, value);
+		}
 
+		return map;
+	}
+	
+	/**
+	 * Recurses through the json array to create a List of converted values.
+	 * 
+	 * @param array the JSON array to convert
+	 * @return a list of converted values in array order
+	 */
+	private static List<Object> createArray(JSONArray array) {
+		ArrayList<Object> list = new ArrayList<Object>();
+		for (int i = 0; i < array.length(); ++i) {
+			Object value = null;
+			try {
+				value = array.get(i);
+			} catch (JSONException e) {
+				// Ugh... JSON's over done exception throwing.
+			}
+			
+			// Nested structures are converted recursively; primitives kept as-is.
+			if (value instanceof JSONArray) {
+				value = createArray((JSONArray)value);
+			}
+			else if (value instanceof JSONObject) {
+				value = createObject((JSONObject)value);
+			}
+			
+			list.add(value);
+		}
+		
+		return list;
+	}
+	
+	/**
+	 * Creates a json string from a List of Map/List/primitive values.
+	 * 
+	 * @param list the values to serialize
+	 * @return the json text, or the empty string if serialization fails
+	 */
+	public static String toJSONString(List<?> list) {
+		JSONArray jsonList = new JSONArray(list);
+		try {
+			return jsonList.toString();
+		} catch (Exception e) {
+			return "";
+		}
+	}
+	
+	/**
+	 * Creates an indented (pretty-printed) json string from a List of
+	 * Map/List/primitive values.
+	 * 
+	 * @param list the values to serialize
+	 * @param indent the number of spaces per indentation level
+	 * @return the json text, or the empty string if serialization fails
+	 */
+	public static String toJSONString(List<?> list, int indent) {
+		JSONArray jsonList = new JSONArray(list);
+		try {
+			return jsonList.toString(indent);
+		} catch (Exception e) {
+			return "";
+		}
+	}
+	
+	/**
+	 * Creates a json string from a Map of Map/List/primitive values.
+	 * 
+	 * @param obj the key/value pairs to serialize
+	 * @return the json text, or the empty string if serialization fails
+	 */
+	public static String toJSONString(Map<String, Object> obj) {
+		JSONObject jsonObj = new JSONObject(obj);
+		try {
+			return jsonObj.toString();
+		} catch (Exception e) {
+			return "";
+		}
+	}
+	
+	/**
+	 * Creates an indented (pretty-printed) json string from a Map of
+	 * Map/List/primitive values.
+	 * 
+	 * @param obj the key/value pairs to serialize
+	 * @param indent the number of spaces per indentation level
+	 * @return the json text, or the empty string if serialization fails
+	 */
+	public static String toJSONString(Map<String, Object> obj, int indent) {
+		JSONObject jsonObj = new JSONObject(obj);
+		try {
+			return jsonObj.toString(indent);
+		} catch (Exception e) {
+			return "";
+		}
+	}
+	
 	public static String toJSON(Object obj) {
 		return toJSON(obj, false);
 	}
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index c00951a..a4d5db3 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -24,6 +24,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.Random;
@@ -174,4 +176,59 @@ public class Utils {
 
 		return (Double) obj;
 	}
+	
+    /**
+     * Unwraps an InvocationTargetException into an unchecked exception the
+     * caller can throw.
+     *
+     * BUGFIX: this method previously threw the cause itself instead of
+     * returning it, which made the declared return value -- and the caller's
+     * "throw getCause(e)" -- dead code. It now returns the exception.
+     * 
+     * @param e The reflective invocation failure
+     * @return the cause if it is a RuntimeException, otherwise an
+     *         IllegalStateException wrapping the cause
+     */
+    private static RuntimeException getCause(InvocationTargetException e) {
+        Throwable cause = e.getCause();
+        if(cause instanceof RuntimeException)
+            return (RuntimeException) cause;
+        else
+            return new IllegalStateException(cause);
+    }
+	
+    /**
+     * Get the Class of all the objects
+     * 
+     * @param args The objects to get the Classes from
+     * @return The classes as an array
+     */
+    public static Class<?>[] getTypes(Object... args) {
+        Class<?>[] argTypes = new Class<?>[args.length];
+        for(int i = 0; i < argTypes.length; i++)
+            argTypes[i] = args[i].getClass();
+        return argTypes;
+    }
+	
+    /**
+     * Calls the public constructor of c whose parameter types exactly match
+     * the runtime classes of args (no widening or boxing resolution).
+     */
+    public static Object callConstructor(Class<?> c, Object... args) {
+        return callConstructor(c, getTypes(args), args);
+    }
+
+    /**
+     * Call the class constructor with the given arguments
+     * 
+     * @param c The class
+     * @param argTypes The constructor's declared parameter types
+     * @param args The arguments
+     * @return The constructed object
+     * @throws IllegalStateException if no matching public constructor exists
+     *         or the constructor cannot be invoked; a RuntimeException thrown
+     *         by the constructor itself is rethrown as-is via getCause
+     */
+    public static Object callConstructor(Class<?> c, Class<?>[] argTypes, Object[] args) {
+        try {
+            Constructor<?> cons = c.getConstructor(argTypes);
+            return cons.newInstance(args);
+        } catch(InvocationTargetException e) {
+            throw getCause(e);
+        } catch(IllegalAccessException e) {
+            throw new IllegalStateException(e);
+        } catch(NoSuchMethodException e) {
+            throw new IllegalStateException(e);
+        } catch(InstantiationException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+	
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 79a14fa..994546f 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -38,12 +38,16 @@ import org.mortbay.thread.QueuedThreadPool;
 import azkaban.executor.ExecutorManager;
 import azkaban.project.FileProjectManager;
 import azkaban.project.ProjectManager;
+import azkaban.scheduler.LocalFileScheduleLoader;
+import azkaban.scheduler.ScheduleManager;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 import azkaban.webapp.servlet.AzkabanServletContextListener;
+
 import azkaban.webapp.servlet.ExecutorServlet;
+import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.IndexServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
@@ -95,7 +99,9 @@ public class AzkabanWebServer {
 	private UserManager userManager;
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
-
+	
+	private ScheduleManager scheduleManager;
+	
 	private Props props;
 	private SessionCache sessionCache;
 	private File tempDir;
@@ -118,6 +124,7 @@ public class AzkabanWebServer {
 		userManager = loadUserManager(props);
 		projectManager = loadProjectManager(props);
 		executorManager = loadExecutorManager(props);
+		scheduleManager = loadScheduleManager(executorManager, props);
 
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
 
@@ -185,6 +192,13 @@ public class AzkabanWebServer {
 		return execManager;
 	}
 
+	/**
+	 * Creates the ScheduleManager backed by a local-file schedule loader.
+	 *
+	 * @param execManager the executor manager used to launch scheduled flows
+	 * @param props server configuration (presumably supplies the schedule
+	 *              file locations -- see the schedule.* properties)
+	 * @throws Exception if the schedule loader cannot be created
+	 */
+	private ScheduleManager loadScheduleManager(ExecutorManager execManager, Props props ) throws Exception {
+
+		ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, new LocalFileScheduleLoader(props));
+		return schedManager;
+	}
+
+	
 	/**
 	 * Returns the web session cache.
 	 * 
@@ -225,6 +239,10 @@ public class AzkabanWebServer {
 	public ExecutorManager getExecutorManager() {
 		return executorManager;
 	}
+	
+	public ScheduleManager getScheduleManager() {
+		return scheduleManager;
+	}
 
 	/**
 	 * Creates and configures the velocity engine.
@@ -348,6 +366,7 @@ public class AzkabanWebServer {
 		root.addServlet(new ServletHolder(new ProjectManagerServlet()),"/manager");
 		root.addServlet(new ServletHolder(new ExecutorServlet()),"/executor");
 		root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
+		root.addServlet(new ServletHolder(new ScheduleServlet()),"/schedule");
 		
 		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
 
@@ -438,4 +457,6 @@ public class AzkabanWebServer {
 
 		return null;
 	}
+
+
 }
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
new file mode 100644
index 0000000..376bb0d
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -0,0 +1,172 @@
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.joda.time.Hours;
+import org.joda.time.LocalDateTime;
+import org.joda.time.Minutes;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormat;
+
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.project.ProjectManagerException;
+import azkaban.user.User;
+import azkaban.user.Permission.Type;
+import azkaban.webapp.session.Session;
+import azkaban.scheduler.ScheduleManager;
+
+/**
+ * Servlet handling flow scheduling. Exposes a "scheduleFlow" action via both
+ * POST and ajax GET. Results are reported as JSON with "status" ("success" or
+ * "error") and a human-readable "message".
+ */
+public class ScheduleServlet extends LoginAbstractAzkabanServlet {
+	private static final long serialVersionUID = 1L;
+	private ProjectManager projectManager;
+	private ScheduleManager scheduleManager;
+
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		super.init(config);
+		projectManager = this.getApplication().getProjectManager();
+		scheduleManager = this.getApplication().getScheduleManager();
+	}
+
+	@Override
+	protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		if (hasParam(req, "ajax")) {
+			handleAJAXAction(req, resp, session);
+		}
+	}
+
+	@Override
+	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		HashMap<String, Object> ret = new HashMap<String, Object>();
+		if (hasParam(req, "action")) {
+			String action = getParam(req, "action");
+			if (action.equals("scheduleFlow")) {
+				ajaxScheduleFlow(req, ret, session.getUser());
+			}
+		}
+		this.writeJSON(resp, ret);
+	}
+
+	/** Dispatches ajax GET actions. Only "scheduleFlow" is supported for now. */
+	private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		HashMap<String, Object> ret = new HashMap<String, Object>();
+		String ajaxName = getParam(req, "ajax");
+
+		if (ajaxName.equals("scheduleFlow")) {
+			ajaxScheduleFlow(req, ret, session.getUser());
+		}
+		this.writeJSON(resp, ret);
+	}
+
+	/**
+	 * Validates the project/flow and the user's SCHEDULE permission, parses
+	 * the requested date and optional recurrence period, and registers the
+	 * schedule with the ScheduleManager.
+	 */
+	private void ajaxScheduleFlow(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException {
+		String projectId = getParam(req, "projectId");
+		String flowId = getParam(req, "flowId");
+
+		Project project = projectManager.getProject(projectId);
+
+		if (project == null) {
+			ret.put("message", "Project " + projectId + " does not exist");
+			ret.put("status", "error");
+			return;
+		}
+
+		if (!project.hasPermission(user, Type.SCHEDULE)) {
+			ret.put("status", "error");
+			ret.put("message", "Permission denied. Cannot execute " + flowId);
+			return;
+		}
+
+		Flow flow = project.getFlow(flowId);
+		if (flow == null) {
+			ret.put("status", "error");
+			ret.put("message", "Flow " + flowId + " cannot be found in project " + project);
+			return;
+		}
+
+		// TODO: read hour/minutes/am_pm from the request once the UI sends them.
+		int hour = 0;
+		int minutes = 0;
+		boolean isPm = false;
+		String scheduledDate = req.getParameter("date");
+		DateTime day = null;
+		if (scheduledDate == null || scheduledDate.trim().length() == 0) {
+			day = new LocalDateTime().toDateTime();
+		} else {
+			try {
+				day = DateTimeFormat.forPattern("MM-dd-yyyy").parseDateTime(scheduledDate);
+			} catch (IllegalArgumentException e) {
+				// BUGFIX: report the failure under "status"/"message" -- the
+				// client only reads those keys and never read "error", so the
+				// user previously got no feedback on a bad date.
+				ret.put("status", "error");
+				ret.put("message", "Invalid date: '" + scheduledDate + "'");
+				return;
+			}
+		}
+
+		ReadablePeriod thePeriod = null;
+		if (hasParam(req, "is_recurring"))
+			thePeriod = parsePeriod(req);
+
+		// Normalize to a 24-hour clock.
+		if (isPm && hour < 12)
+			hour += 12;
+		hour %= 24;
+
+		String userSubmit = user.getUserId();
+		String userExec = getParam(req, "userExec");
+		String scheduleId = projectId + "." + flowId;
+		DateTime submitTime = new DateTime();
+		DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
+
+		scheduleManager.schedule(scheduleId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
+
+		ret.put("status", "success");
+		ret.put("message", scheduleId + " scheduled.");
+	}
+
+	/**
+	 * Parses the recurrence period. Currently hard-coded to 10 minutes until
+	 * the UI supplies "period" and "period_units" parameters.
+	 */
+	private ReadablePeriod parsePeriod(HttpServletRequest req) throws ServletException {
+		// TODO: read period/period_units from the request.
+		int period = 10;
+		String periodUnits = "m";
+		if ("d".equals(periodUnits))
+			return Days.days(period);
+		else if ("h".equals(periodUnits))
+			return Hours.hours(period);
+		else if ("m".equals(periodUnits))
+			return Minutes.minutes(period);
+		else if ("s".equals(periodUnits))
+			return Seconds.seconds(period);
+		else
+			throw new ServletException("Unknown period unit: " + periodUnits);
+	}
+
+}
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 83e720c..a916d1c 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -49,7 +49,7 @@
 						</div>
 						
 						<div id="executebtn" class="btn1">Execute</div>
-						<div id="schedulebtn" class="btn2">Schedule Flow</div>
+						<div id="scheduleflowbtn" class="btn2 scheduleflow">Schedule Flow</div>
 					</div>
 					
 					<div id="headertabs" class="headertabs">
@@ -71,7 +71,7 @@
 							</div>
 							<div id="svgDiv" >
 								<svg id="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
-								</svg>
+								</svg>
 							</div>
 						</div>
 					</div>
@@ -107,6 +107,38 @@
 						</div>
 					</div>
 				</div>
+		<!-- modal content -->
+                <div id="schedule-flow" class="modal">
+                        <h3>Schedule A Flow</h3>
+                        <div id="errorMsg" class="box-error-message">$errorMsg</div>
+
+                        <div class="message">
+                                <fieldset>
+                                        <dl>
+                                                <dt><label for="path">Project Name</label></dt>
+                                                <dd><input id="path" name="project" type="text" size="20" title="The project name."/></dd>
+                                                <dt>Description</dt>
+                                                <dd><textarea id="description" name="description" rows="2" cols="40"></textarea></dd>
+
+                                                <input name="action" type="hidden" value="create" />
+                                                <input name="redirect" type="hidden" value="$!context/" />
+                                        </dl>
+                                </fieldset>
+                        </div>
+
+                        <div class="actions">
+                                <a class="yes btn3" id="schedule-btn" href="#">Schedule The Flow</a>
+                                <a class="no simplemodal-close btn4" href="#">Cancel</a>
+                        </div>
+                        <div id="invalid-session" class="modal">
+                                <h3>Invalid Session</h3>
+                                <p>Session has expired. Please re-login.</p>
+                                <div class="actions">
+                                        <a class="yes btn3" id="login-btn" href="#">Re-login</a>
+                                </div>
+                        </div>
+                </div>
+
 #end
 		<ul id="jobMenu" class="contextMenu flowSubmenu">  
 			<li class="open"><a href="#open">Open...</a></li>
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 73c244f..7fe381a 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -855,6 +855,60 @@ azkaban.GraphModel = Backbone.Model.extend({});
 var executionModel;
 azkaban.ExecutionModel = Backbone.Model.extend({});
 
+var scheduleFlowView;
+azkaban.ScheduleFlowView = Backbone.View.extend({
+  events : {
+    "click #schedule-btn": "handleScheduleFlow"
+  },
+  initialize : function(settings) {
+    $("#errorMsg").hide();
+  },
+  handleScheduleFlow : function(evt) {
+         // First make sure we can upload
+         //var projectName = $('#path').val();
+         //var description = $('#description').val();
+
+     console.log("Creating schedule");
+     $.ajax({
+        async: "false",
+        url: "schedule",
+        dataType: "json",
+        type: "POST",
+        data: {
+		action:"scheduleFlow", 
+		projectId:projectName, 
+		flowId:flowName,
+		userExec:"dummy",
+		is_recurring:true
+		},
+        success: function(data) {
+                if (data.status == "success") {
+                        if (data.action == "redirect") {
+                                window.location = data.path;
+                        }
+			else{
+				$("#errorMsg").text("Flow " + projectName + "." + flowName + " scheduled!" );			
+			}
+                }
+                else {
+                        if (data.action == "login") {
+                                        window.location = "";
+                        }
+                        else {
+                                $("#errorMsg").text("ERROR: " + data.message);
+                                $("#errorMsg").slideDown("fast");
+                        }
+                }
+        }
+     });
+
+  },
+  render: function() {
+  }
+});
+
+
+
 $(function() {
 	var selected;
 	// Execution model has to be created before the window switches the tabs.
@@ -867,6 +921,7 @@ $(function() {
 	svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel});
 	jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel});
 	contextMenu = new azkaban.ContextMenu({el:$('#jobMenu')});
+	scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow')});
 	
 	var requestURL = contextURL + "/manager";
 
@@ -947,4 +1002,21 @@ $(function() {
 		);
 		
 	});
+
+	$('#scheduleflowbtn').click( function() {
+	  console.log("schedule button clicked");
+	  $('#schedule-flow').modal({
+          closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+          position: ["20%",],
+          containerId: 'confirm-container',
+          containerCss: {
+            'height': '220px',
+            'width': '565px'
+          },
+          onShow: function (dialog) {
+            var modal = this;
+            $("#errorMsg").hide();
+          }
+        });
+	});
 });
diff --git a/unit/java/azkaban/scheduler/MockJobExecutorManager.java b/unit/java/azkaban/scheduler/MockJobExecutorManager.java
new file mode 100644
index 0000000..ce07adc
--- /dev/null
+++ b/unit/java/azkaban/scheduler/MockJobExecutorManager.java
@@ -0,0 +1,99 @@
+package azkaban.scheduler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.joda.time.DateTime;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
+//import azkaban.flow.FlowExecutionHolder;
+import azkaban.executor.ExecutorManager;
+
+/**
+ * Test double for ExecutorManager: records every executeFlow() call in
+ * memory instead of launching a real execution, and can be armed to
+ * throw after recording, so scheduler error handling can be exercised.
+ */
+public class MockJobExecutorManager extends ExecutorManager {
+	
+	private ArrayList<ExecutionRecord> executionList = new ArrayList<ExecutionRecord>();
+	private RuntimeException throwException = null;
+	
+	public MockJobExecutorManager() throws IOException, ExecutorManagerException 
+	{
+		// The mock never touches the props the real manager requires.
+		super(null);
+	}
+	
+	/** When non-null, executeFlow() throws this after recording the call. */
+	public void setThrowException(RuntimeException throwException) {
+		this.throwException = throwException;
+	}
+	
+	@Override
+	public void executeFlow(ExecutableFlow flow) {
+		DateTime nextExecTime = new DateTime();
+		DateTime submitTime = new DateTime();
+		DateTime firstSchedTime = new DateTime();
+		String scheduleId = flow.getProjectId() + "." + flow.getFlowId();
+		// NOTE(review): user "pymk" / submitter "cyu" are hard-coded
+		// fixtures -- confirm no test asserts on real user names.
+		executionList.add(new ExecutionRecord(scheduleId, "pymk", "cyu", submitTime, firstSchedTime, nextExecTime, throwException));
+		System.out.println("Running " + scheduleId + " at time " + nextExecTime);
+		if (throwException != null) {
+			throw throwException;
+		}
+	}
+	
+	public ArrayList<ExecutionRecord> getExecutionList() {
+		return executionList;
+	}
+	
+	public void clearExecutionList() {
+		executionList.clear();
+	}
+	
+	/** Immutable record of a single executeFlow() invocation. */
+	public class ExecutionRecord {
+		private final String scheduleId;
+		private final String user;
+		private final String userSubmit;
+		private final DateTime submitTime;
+		private final DateTime firstSchedTime;
+		private final DateTime nextExecTime;
+		private final Exception throwException;
+		
+		public ExecutionRecord(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExecTime) {
+			this(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExecTime, null);
+		}
+		
+		public ExecutionRecord(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExecTime, Exception throwException) {
+			this.scheduleId = scheduleId;
+			this.user = user;
+			this.userSubmit = userSubmit;
+			this.submitTime = submitTime;
+			this.firstSchedTime = firstSchedTime;
+			this.nextExecTime = nextExecTime;
+			this.throwException = throwException;
+		}
+
+		public String getScheduleId() {
+			return scheduleId;
+		}
+
+		// Getters added so the recorded fields are actually readable by
+		// assertions; previously user/userSubmit/submitTime/firstSchedTime
+		// were stored but inaccessible.
+		public String getUser() {
+			return user;
+		}
+
+		public String getUserSubmit() {
+			return userSubmit;
+		}
+
+		public DateTime getSubmitTime() {
+			return submitTime;
+		}
+
+		public DateTime getFirstSchedTime() {
+			return firstSchedTime;
+		}
+
+		public DateTime getNextExecTime() {
+			return nextExecTime;
+		}
+
+		public Exception getThrowException() {
+			return throwException;
+		}
+	}
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/scheduler/MockLoader.java b/unit/java/azkaban/scheduler/MockLoader.java
new file mode 100644
index 0000000..0d1f5eb
--- /dev/null
+++ b/unit/java/azkaban/scheduler/MockLoader.java
@@ -0,0 +1,36 @@
+package azkaban.scheduler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+/**
+ * In-memory ScheduleLoader used by scheduler unit tests.
+ */
+public class MockLoader implements ScheduleLoader {
+	private ArrayList<ScheduledFlow> scheduledFlow = new ArrayList<ScheduledFlow>();
+	
+	/** Convenience overload that builds the ScheduledFlow before adding it. */
+	public void addScheduledFlow(String scheduleId, String user, String userSubmit, DateTime submitTime, DateTime firstSchedTime, DateTime nextExec, Period recurrence) {
+		ScheduledFlow flow = new ScheduledFlow(scheduleId, user, userSubmit, submitTime, firstSchedTime, nextExec, recurrence);
+		addScheduleFlow(flow);
+	}
+	
+	public void addScheduleFlow(ScheduledFlow flow) {
+		scheduledFlow.add(flow);
+	}
+	
+	public void clearSchedule() {
+		scheduledFlow.clear();
+	}
+	
+	@Override
+	public List<ScheduledFlow> loadSchedule() {
+		// Return a copy: handing out the internal list meant that
+		// saveSchedule(loadSchedule()) cleared the very list it was about
+		// to copy from, silently wiping the schedule.
+		return new ArrayList<ScheduledFlow>(scheduledFlow);
+	}
+
+	@Override
+	public void saveSchedule(List<ScheduledFlow> schedule) {
+		// Guard against being handed our own backing list.
+		if (schedule != scheduledFlow) {
+			scheduledFlow.clear();
+			scheduledFlow.addAll(schedule);
+		}
+	}
+	
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
new file mode 100644
index 0000000..4579331
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/AllJobExecutorTests.java
@@ -0,0 +1,11 @@
+package azkaban.test.jobExecutor;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+
+// Convenience JUnit 4 suite: runs all jobExecutor unit tests together.
+@RunWith(Suite.class)
+@SuiteClasses({ JavaJobTest.class, ProcessJobTest.class, PythonJobTest.class })
+public class AllJobExecutorTests {
+
+}
diff --git a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
new file mode 100644
index 0000000..27a7881
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
@@ -0,0 +1,132 @@
+package azkaban.test.jobExecutor;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+import org.easymock.classextension.EasyMock;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.JavaJob;
+import azkaban.jobExecutor.ProcessJob;
+
+/**
+ * Runs the WordCountLocal fixture through JavaJob, once with clean input
+ * (must succeed) and once with input containing the "end_here" abort
+ * marker (must fail).
+ */
+public class JavaJobTest
+{
+
+  private JavaJob job = null;
+  private Props props = null;
+  private Logger log = Logger.getLogger(JavaJob.class);
+  private static String classPaths ;
+
+  // "Fox's" repaired: the original byte was a mis-encoded character that
+  // can break compilation under a strict source encoding.
+  private static final String inputContent = 
+    "Quick Change in Strategy for a Bookseller \n" +
+    " By JULIE BOSMAN \n" +
+    "Published: August 11, 2010 \n" +
+    " \n" +
+    "Twelve years later, it may be Joe Fox's turn to worry. Readers have gone from skipping small \n" +
+    "bookstores to wondering if they need bookstores at all. More people are ordering books online  \n" +
+    "or plucking them from the best-seller bin at Wal-Mart";
+
+  // Marker changed to "end_here": that is the token WordCountLocal aborts
+  // on; the previous "stop_here" never triggered the failure path.
+  private static final String errorInputContent = 
+      inputContent + "\n end_here " +
+      "But the threat that has the industry and some readers the most rattled is the growth of e-books. \n" +
+      " In the first five months of 2009, e-books made up 2.9 percent of trade book sales. In the same period \n" +
+      "in 2010, sales of e-books, which generally cost less than hardcover books, grew to 8.5 percent, according \n" +
+      "to the Association of American Publishers, spurred by sales of the Amazon Kindle and the new Apple iPad. \n" +
+      "For Barnes & Noble, long the largest and most powerful bookstore chain in the country, the new competition \n" +
+      "has led to declining profits and store traffic.";
+  
+ 
+  private static String inputFile ;
+  private static String errorInputFile ;
+  private static String outputFile ;
+  
+  /** Captures the test JVM's classpath and writes the input fixtures. */
+  @BeforeClass
+  public static void init() {
+    // The spawned java job must see the same classpath as this JVM.
+    Properties prop = System.getProperties();
+    classPaths = String.format("'%s'", prop.getProperty("java.class.path", null));
+ 
+    long time = (new Date()).getTime();
+    inputFile = "/tmp/azkaban_input_" + time;
+    errorInputFile = "/tmp/azkaban_input_error_" + time;
+    outputFile = "/tmp/azkaban_output_" + time;
+    try {
+      Utils.dumpFile(inputFile, inputContent);
+      Utils.dumpFile(errorInputFile, errorInputContent);
+    }
+    catch (IOException e) {
+      e.printStackTrace(System.err);
+      Assert.fail("error in creating input file:" + e.getLocalizedMessage());
+    }
+  }
+  
+  /** Removes the input fixtures (output is kept for debugging). */
+  @AfterClass
+  public static void cleanup() {
+    Utils.removeFile(inputFile);
+    Utils.removeFile(errorInputFile);
+    //Utils.removeFile(outputFile);
+  }
+  
+  @Before
+  public void setUp() {
+    // Minimal props a JavaJob needs to construct.
+    props = new Props();
+    props.put(AbstractProcessJob.WORKING_DIR, ".");
+    props.put("type", "java");
+    props.put("fullPath", ".");
+    
+    job = new JavaJob(props, log);
+  }
+  
+  @Test
+  public void testJavaJob() {
+    props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
+    props.put(ProcessJob.WORKING_DIR, ".");
+    props.put("input", inputFile);
+    props.put("output", outputFile);
+    props.put("classpath",  classPaths);
+    job.run();
+  }
+  
+  @Test
+  public void testFailedJavaJob() {
+    props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
+    props.put(ProcessJob.WORKING_DIR, ".");
+    props.put("input", errorInputFile);
+    props.put("output", outputFile);
+    props.put("classpath", classPaths);
+    
+    try {
+      job.run();
+      // Without this the test also passed when no exception was thrown,
+      // making it vacuous.
+      Assert.fail("expected the java job to fail on the error input");
+    }
+    catch (RuntimeException e) {
+      // expected: the "end_here" marker makes WordCountLocal abort
+    }
+  }
+  
+}
+
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
new file mode 100644
index 0000000..d859e02
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -0,0 +1,79 @@
+package azkaban.test.jobExecutor;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.log4j.Logger;
+import org.easymock.classextension.EasyMock;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.ProcessJob;
+
+
+/**
+ * Exercises ProcessJob with simple unix commands: one that succeeds, one
+ * that fails, and a multi-command job.
+ */
+public class ProcessJobTest
+{
+  private ProcessJob job = null;
+  private Props props = null;
+  private Logger log = Logger.getLogger(ProcessJob.class);
+
+  @Before
+  public void setUp() {
+    // Minimal props a ProcessJob needs to construct.
+    props = new Props();
+    props.put(AbstractProcessJob.WORKING_DIR, ".");
+    props.put("type", "command");
+    props.put("fullPath", ".");
+    
+    job = new ProcessJob(props, log);
+  }
+  
+  @Test
+  public void testOneUnixCommand() {
+    props.put(ProcessJob.COMMAND, "ls -al");
+    props.put(ProcessJob.WORKING_DIR, ".");
+
+    job.run();
+  }
+
+  @Test
+  public void testFailedUnixCommand() {
+    // "xls" does not exist, so the process must exit nonzero.
+    props.put(ProcessJob.COMMAND, "xls -al");
+    props.put(ProcessJob.WORKING_DIR, ".");
+
+    try {
+      job.run();
+      // Without this the test also passed when no exception was thrown,
+      // making it vacuous.
+      Assert.fail("expected 'xls -al' to fail as an unknown command");
+    }
+    catch (RuntimeException e) {
+      // expected: nonzero exit status surfaces as a RuntimeException
+    }
+  }
+    
+  @Test
+  public void testMultipleUnixCommands( ) {
+    props.put(ProcessJob.WORKING_DIR, ".");
+    props.put(ProcessJob.COMMAND, "pwd");
+    props.put("command.1", "date");
+    props.put("command.2", "whoami");
+    
+    job.run();
+  }
+}
+
+
diff --git a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
new file mode 100644
index 0000000..48cfd42
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
@@ -0,0 +1,107 @@
+package azkaban.test.jobExecutor;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+import org.easymock.classextension.EasyMock;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+import azkaban.jobExecutor.AbstractProcessJob;
+import azkaban.jobExecutor.PythonJob;
+
+/**
+ * Exercises PythonJob end to end: writes a small fahrenheit-to-celsius
+ * script to /tmp and runs it through the job wrapper with "--t 90".
+ */
+public class PythonJobTest
+{
+  private PythonJob job = null;
+  private Props props = null;
+  private Logger log = Logger.getLogger(PythonJob.class);
+  
+  // NOTE(review): the script's usage check "if l < 1" can never fire,
+  // since len(sys.argv) is always >= 1 -- harmless here because the test
+  // always passes arguments, but worth confirming the intent.
+  private static final String scriptContent = 
+    "#!/usr/bin/python  \n" +
+    "import re, string, sys  \n" +
+    "# if no arguments were given, print a helpful message \n" +
+    "l=len(sys.argv) \n" +
+    "if l < 1: \n"+
+        "\tprint 'Usage: celsium --t temp' \n" +
+        "\tsys.exit(1) \n" +
+    "\n" +
+    "# Loop over the arguments \n" +
+    "i=1 \n" +
+    "while i < l-1 : \n" +
+        "\tname = sys.argv[i] \n" +
+        "\tvalue = sys.argv[i+1] \n" +
+        "\tif name == \"--t\": \n" +
+        "\t\ttry: \n" +
+                "\t\t\tfahrenheit = float(string.atoi(value)) \n" +
+        "\t\texcept string.atoi_error: \n" +
+               "\t\t\tprint repr(value), \" not a numeric value\" \n" +
+        "\t\telse: \n" +
+                "\t\t\tcelsius=(fahrenheit-32)*5.0/9.0 \n" +
+                "\t\t\tprint '%i F = %iC' % (int(fahrenheit), int(celsius+.5)) \n" +
+                "\t\t\tsys.exit(0) \n" +
+        "\t\ti=i+2\n" ;
+ 
+ 
+  private static String scriptFile ;
+  
+  /** Dumps the python script to a unique /tmp path before the tests run. */
+  @BeforeClass
+  public static void init() {
+ 
+    long time = (new Date()).getTime();
+    scriptFile = "/tmp/azkaban_python" + time + ".py";
+    try {
+      Utils.dumpFile(scriptFile, scriptContent);
+    }
+    catch (IOException e) {
+      e.printStackTrace(System.err);
+      Assert.fail("error in creating script file:" + e.getLocalizedMessage());
+    }
+  }
+  
+  /** Removes the generated script after the suite finishes. */
+  @AfterClass
+  public static void cleanup() {
+    Utils.removeFile(scriptFile);
+  }
+  
+  @Test
+  public void testPythonJob() {
+    props = new Props();
+    props.put(AbstractProcessJob.WORKING_DIR, ".");
+    props.put("type", "python");
+    props.put("script",  scriptFile);
+    props.put("t",  "90");
+    // NOTE(review): "type" is set twice ("python" then "script"); the
+    // second value wins -- confirm which one the job factory expects.
+    props.put("type", "script");
+    props.put("fullPath", ".");
+
+    job = new PythonJob(props, log);
+    try
+    {
+      job.run();
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace(System.err);
+      Assert.fail("Python job failed:" + e.getLocalizedMessage());
+    }
+  }
+ 
+}
diff --git a/unit/java/azkaban/test/jobExecutor/Utils.java b/unit/java/azkaban/test/jobExecutor/Utils.java
new file mode 100644
index 0000000..0c012cf
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/Utils.java
@@ -0,0 +1,23 @@
+package azkaban.test.jobExecutor;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/** Small file helpers shared by the jobExecutor tests. */
+public class Utils
+{
+
+    /**
+     * Writes filecontent to filename, creating or truncating the file.
+     *
+     * @throws IOException if the file cannot be opened for writing
+     */
+    public static void dumpFile (String filename, String filecontent) 
+    throws IOException {
+      PrintWriter writer = new PrintWriter(new FileWriter(filename));
+      try {
+        writer.print(filecontent);
+      }
+      finally {
+        // Release the handle even if print fails; previously an exception
+        // between open and close leaked the writer.
+        writer.close();
+      }
+    }
+    
+    /** Best-effort delete; a missing file is not treated as an error. */
+    public static void removeFile (String filename) {
+      File file = new File (filename);
+      file.delete();
+    }
+    
+}
diff --git a/unit/java/azkaban/test/jobExecutor/WordCountLocal.java b/unit/java/azkaban/test/jobExecutor/WordCountLocal.java
new file mode 100644
index 0000000..c22119e
--- /dev/null
+++ b/unit/java/azkaban/test/jobExecutor/WordCountLocal.java
@@ -0,0 +1,80 @@
+package azkaban.test.jobExecutor;
+    
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+import azkaban.jobExecutor.AbstractJob;
+import azkaban.utils.Props;
+
+/**
+ * Tiny word-count job used as a fixture by JavaJobTest. Reads the
+ * "input" file, counts whitespace-separated tokens, and writes
+ * "word&lt;TAB&gt;count" lines to the "output" file. Deliberately fails when
+ * the input contains the token "end_here", so tests can exercise the
+ * job-failure path.
+ */
+public class WordCountLocal extends AbstractJob {
+    
+    private String _input = null;
+    private String _output = null;
+    private Map<String, Integer> _dic = new HashMap<String,Integer>();
+    
+    public WordCountLocal(String id, Props prop)
+    {
+        super(id, Logger.getLogger(WordCountLocal.class));
+        _input = prop.getString("input");
+        _output = prop.getString("output");
+    }
+    
+    public void run ()  throws Exception {
+    
+        if (_input == null) throw new Exception ("input file is null");
+        if (_output == null) throw new Exception ("output file is null");
+        
+        BufferedReader in = new BufferedReader (new InputStreamReader( new FileInputStream(_input)));
+        try {
+            String line = null;
+            while ( (line = in.readLine()) != null ) {
+                StringTokenizer tokenizer = new StringTokenizer(line);
+                while (tokenizer.hasMoreTokens()) {
+                    String word = tokenizer.nextToken();
+                    
+                    // Sentinel token: fail explicitly (still a
+                    // RuntimeException, as before, when the old code
+                    // indexed past the end of a one-element array).
+                    if (word.equals("end_here")) {
+                        throw new RuntimeException("found end_here marker, failing on purpose");
+                    }
+                    
+                    Integer num = _dic.get(word);
+                    _dic.put(word, num == null ? 1 : num + 1);
+                }
+            }
+        }
+        finally {
+            // Close the reader even when the sentinel aborts the run;
+            // previously it leaked on the failure path.
+            in.close();
+        }
+        
+        PrintWriter out = new PrintWriter(new FileOutputStream(_output));
+        try {
+            for (Map.Entry<String, Integer> entry: _dic.entrySet()) {
+                out.println (entry.getKey() + "\t" + entry.getValue());
+            }
+        }
+        finally {
+            out.close();
+        }
+    }
+ 
+    @Override
+    public Props getJobGeneratedProperties()
+    {
+      return new Props();
+    }
+
+    @Override
+    public boolean isCanceled()
+    {
+      return false;
+    }
+    
+    }
+    
\ No newline at end of file