azkaban-memoizeit

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 5fb5355..7d0efba 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -22,7 +22,7 @@ public class ExecutableFlow {
 	private long lastCheckedTime;
 	
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
-	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
+	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
 	private ArrayList<String> startNodes;
 	private ArrayList<String> endNodes;
 	
@@ -452,8 +452,7 @@ public class ExecutableFlow {
 			this.flow = flow;
 		}
 		
-		private ExecutableNode() {
-			
+		public ExecutableNode() {
 		}
 		
 		public String getId() {
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 9fc657a..d479d83 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -31,6 +31,7 @@ import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventHandler;
 import azkaban.executor.event.EventListener;
 import azkaban.flow.FlowProps;
+import azkaban.jobExecutor.utils.JobWrappingFactory;
 import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.Props;
 
@@ -61,7 +62,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 	private Thread currentThread;
 	
-	private Set<String> emailAddress;
 	private List<String> jobsFinished;
 
 	public enum FailedFlowOptions {
@@ -69,14 +69,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 
 	private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
-
+	
 	public FlowRunner(ExecutableFlow flow) {
 		this.flow = flow;
 		this.basePath = new File(flow.getExecutionPath());
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
 		this.listener = new JobRunnerEventListener(this);
-		this.emailAddress = new HashSet<String>();
 		this.jobsFinished = new ArrayList<String>();
 
 		createLogger();
@@ -85,10 +84,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 	public ExecutableFlow getFlow() {
 		return flow;
 	}
-
-	public Set<String> getEmails() {
-		return emailAddress;
-	}
 	
 	public List<String> getJobsFinished() {
 		return jobsFinished;
@@ -459,7 +454,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (event.getType() == Type.JOB_SUCCEEDED) {
 				logger.info("Job Succeeded " + jobID + " in "
 						+ (node.getEndTime() - node.getStartTime()) + " ms");
-				emailAddress.addAll(runner.getNotifyEmails());
+
 				jobsFinished.add(jobID);
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
@@ -468,7 +463,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 				logger.info("Job Failed " + jobID + " in "
 						+ (node.getEndTime() - node.getStartTime()) + " ms");
-				emailAddress.addAll(runner.getNotifyEmails());
+
 				jobsFinished.add(jobID);
 				logger.info(jobID + " FAILED");
 				flowRunner.handleFailedJob(runner.getNode());
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 37048f0..2e41f11 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -4,22 +4,14 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import javax.servlet.ServletRequest;
-
 import org.apache.log4j.Logger;
-import org.joda.time.Duration;
-import org.joda.time.format.PeriodFormat;
 
 import azkaban.utils.Utils;
 import azkaban.executor.ExecutableFlow.Status;
@@ -48,8 +40,6 @@ public class FlowRunnerManager {
 	private FlowRunnerEventListener eventListener;
 
 	private Mailman mailer;
-	//private String defaultFailureEmail;
-	//private String defaultSuccessEmail;
 	private String senderAddress;
 	private String clientHostname;
 	private String clientPortNumber;
@@ -58,8 +48,7 @@ public class FlowRunnerManager {
 	
 	public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
 		this.mailer = mailer;
-//		this.defaultFailureEmail = props.getString("job.failure.email");
-//		this.defaultSuccessEmail = props.getString("job.success.email");
+		
 		this.senderAddress = props.getString("mail.sender");
 		this.clientHostname = props.getString("jetty.hostname", "localhost");
 		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
@@ -200,7 +189,7 @@ public class FlowRunnerManager {
      */
     private void sendErrorEmail(FlowRunner runner) {
     	ExecutableFlow flow = runner.getFlow();
-    	List<String> emailList = new ArrayList<String>(runner.getEmails());
+    	List<String> emailList = flow.getFailureEmails();
         if(emailList != null && !emailList.isEmpty() && mailer != null) {
         	
         	
@@ -231,7 +220,8 @@ public class FlowRunnerManager {
     private void sendSuccessEmail(FlowRunner runner) {
     	
     	ExecutableFlow flow = runner.getFlow();
-    	List<String> emailList = new ArrayList<String>(runner.getEmails());
+
+    	List<String> emailList = flow.getSuccessEmails();
         
         if(emailList != null && !emailList.isEmpty() && mailer != null) {
             try {
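
With the per-job email aggregation removed, notification recipients now come straight from the flow. A minimal sketch of the new lookup, assuming ExecutableFlow exposes the getFailureEmails()/getSuccessEmails() getters called above (the helper name below is hypothetical, not part of this change):

    // Returns the recipient list for a finished flow, never null.
    private static List<String> recipientsFor(ExecutableFlow flow, boolean failed) {
        List<String> emails = failed ? flow.getFailureEmails() : flow.getSuccessEmails();
        return emails == null ? java.util.Collections.<String>emptyList() : emails;
    }
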
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 9c22d94..71e4e87 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -3,7 +3,6 @@ package azkaban.executor;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.log4j.Appender;
@@ -12,7 +11,6 @@ import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
-
 import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
@@ -26,19 +24,19 @@ import azkaban.utils.Props;
 public class JobRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout(
 			"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-	
-	private static final String EMAILLIST = "notify.emails";
-	
+
 	private Props props;
 	private Props outputProps;
 	private ExecutableNode node;
 	private File workingDir;
-	
+
 	private Logger logger = null;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
 	private Appender jobAppender;
+	private File logFile;
 	
 	private Job job;
+	private String executionId = null;
 	
 	private static final Object logCreatorLock = new Object();
 	
@@ -46,23 +44,35 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
+		this.executionId = node.getFlow().getExecutionId();
 	}
 
+	public JobRunner(String executionId, ExecutableNode node, Props props, File workingDir) {
+		this.props = props;
+		this.node = node;
+		this.workingDir = workingDir;
+		this.executionId = executionId;
+	}
+	
 	public ExecutableNode getNode() {
 		return node;
 	}
-
+	
+	public String getLogFilePath() {
+		return logFile == null ? null : logFile.getPath();
+	}
+	
 	private void createLogger() {
 		// Create logger
-		synchronized(logCreatorLock) {
-			String loggerName = System.currentTimeMillis() + "." + node.getFlow().getExecutionId() + "." + node.getId();
+		synchronized (logCreatorLock) {
+			String loggerName = System.currentTimeMillis() + "." + executionId + "." + node.getId();
 			logger = Logger.getLogger(loggerName);
-	
+
 			// Create file appender
-			String logName = "_job." + node.getFlow().getExecutionId() + "." + node.getId() + ".log";
-			File logFile = new File(workingDir, logName);
+			String logName = "_job." + executionId + "." + node.getId() + ".log";
+			logFile = new File(workingDir, logName);
 			String absolutePath = logFile.getAbsolutePath();
-	
+
 			jobAppender = null;
 			try {
 				jobAppender = new FileAppender(loggerLayout, absolutePath, false);
@@ -79,52 +89,41 @@ public class JobRunner extends EventHandler implements Runnable {
 			jobAppender.close();
 		}
 	}
-	
+
 	@Override
 	public void run() {
+		node.setStartTime(System.currentTimeMillis());
 		if (node.getStatus() == Status.DISABLED) {
 			node.setStatus(Status.SKIPPED);
+			node.setEndTime(System.currentTimeMillis());
 			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
 			return;
 		} else if (node.getStatus() == Status.KILLED) {
+			node.setEndTime(System.currentTimeMillis());
 			this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
 			return;
 		}
-		
+
 		createLogger();
 		this.node.setStatus(Status.WAITING);
-		node.setStartTime(System.currentTimeMillis());
+
 		logInfo("Starting job " + node.getId() + " at " + node.getStartTime());
 		node.setStatus(Status.RUNNING);
 		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
-		
+
 		boolean succeeded = true;
-//		synchronized(this) {
-//			try {
-//				wait(5000);
-//			}
-//			catch (InterruptedException e) {
-//				logger.info("Job cancelled.");
-//				succeeded = false;
-//			}
-//		}
 
+		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
+		job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getId(), props, logger);
 
-		// Run Job
-		//boolean succeeded = true;
+		try {
+			job.run();
+		} catch (Throwable e) {
+			succeeded = false;
+			logError("Job run failed!");
+			e.printStackTrace();
+		}
 
-		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
-		JobWrappingFactory factory  = JobWrappingFactory.getJobWrappingFactory();
-        job = factory.buildJobExecutor(props, logger);
-
-        try {
-                job.run();
-        } catch (Exception e) {
-                succeeded = false;
-                logError("Job run failed!");
-                e.printStackTrace();
-        }
-		
 		node.setEndTime(System.currentTimeMillis());
 		if (succeeded) {
 			node.setStatus(Status.SUCCEEDED);
@@ -142,21 +141,21 @@ public class JobRunner extends EventHandler implements Runnable {
 
 	public synchronized void cancel() {
 		// Cancel code here
-		if(job == null) {
-			logError("Job doesn't exisit!");
-            return;
+		if (job == null) {
+			logError("Job doesn't exist!");
+			return;
 		}
 
 		try {
-            job.cancel();
+			job.cancel();
 		} catch (Exception e) {
 			logError("Failed trying to cancel job!");
-            e.printStackTrace();
+			e.printStackTrace();
 		}
 
 		// will just interrupt, I guess, until the code is finished.
 		this.notifyAll();
-		
+
 		node.setStatus(Status.KILLED);
 	}
 
@@ -168,22 +167,16 @@ public class JobRunner extends EventHandler implements Runnable {
 		return outputProps;
 	}
 
-	public List<String> getNotifyEmails() {
-		List<String> emails = new ArrayList<String>();
-		emails = this.props.getStringList(EMAILLIST);
-		return emails;
-	}
-	
 	private void logError(String message) {
 		if (logger != null) {
 			logger.error(message);
 		}
 	}
-	
+
 	private void logInfo(String message) {
 		if (logger != null) {
 			logger.info(message);
 		}
 	}
-	
+
 }
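
JobRunner can now be built with an explicit execution id instead of reaching through node.getFlow(), and it remembers the path of the log file it creates. A sketch of the new usage, assuming node, props, and workingDir are already prepared by the flow setup (the "exec_123" id is made up):

    JobRunner runner = new JobRunner("exec_123", node, props, workingDir);
    runner.run();                              // builds the job via JobWrappingFactory and runs it
    String logPath = runner.getLogFilePath();  // null until the runner has created its logger
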
diff --git a/src/java/azkaban/jobExecutor/AbstractProcessJob.java b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
index 26ff388..5f03d7f 100644
--- a/src/java/azkaban/jobExecutor/AbstractProcessJob.java
+++ b/src/java/azkaban/jobExecutor/AbstractProcessJob.java
@@ -59,8 +59,8 @@ public abstract class AbstractProcessJob extends AbstractJob {
 
     private volatile Props generatedPropeties;
 
-    protected AbstractProcessJob(final Props props, final Logger log) {
-        super(props.getString(JOB_ID, "unkownjob"), log);
+    protected AbstractProcessJob(String jobid, final Props props, final Logger log) {
+        super(jobid, log);
 
         _props = props;
         _jobPath = props.getString(JOB_FULLPATH, new File(".").getAbsolutePath());
diff --git a/src/java/azkaban/jobExecutor/JavaJob.java b/src/java/azkaban/jobExecutor/JavaJob.java
index c951de1..6533875 100644
--- a/src/java/azkaban/jobExecutor/JavaJob.java
+++ b/src/java/azkaban/jobExecutor/JavaJob.java
@@ -42,8 +42,8 @@ public class JavaJob extends JavaProcessJob {
 	private Object _javaObject = null;
 	private String props;
 
-	public JavaJob(Props props, Logger log) {
-		super(props, log);
+	public JavaJob(String jobid, Props props, Logger log) {
+		super(jobid, props, log);
 	}
 
 	@Override
diff --git a/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
index eaff543..0ee1126 100644
--- a/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
+++ b/src/java/azkaban/jobExecutor/JavaJobRunnerMain.java
@@ -34,246 +34,275 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
 public class JavaJobRunnerMain {
 
-    public static final String JOB_CLASS = "job.class";
-    public static final String DEFAULT_RUN_METHOD = "run";
-    public static final String DEFAULT_CANCEL_METHOD = "cancel";
-
-    // This is the Job interface method to get the properties generated by the job.
-    public static final String GET_GENERATED_PROPERTIES_METHOD = "getJobGeneratedProperties";
-
-    public static final String CANCEL_METHOD_PARAM = "method.cancel";
-    public static final String RUN_METHOD_PARAM = "method.run";
-    public static final String PROPS_CLASS = "azkaban.utils.Props";
-
-    private static final Layout DEFAULT_LAYOUT = new PatternLayout("%p %m\n");
-
-    public final Logger _logger;
-
-    public String _cancelMethod;
-    public String _jobName;
-    public Object _javaObject;
-    private boolean _isFinished = false;
-
-    public static void main(String[] args) throws Exception {
-        @SuppressWarnings("unused")
-        JavaJobRunnerMain wrapper = new JavaJobRunnerMain();
-    }
-
-    public JavaJobRunnerMain() throws Exception {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                cancelJob();
-            }
-        }
-        );
-
-        try {
-            _jobName = System.getenv(ProcessJob.JOB_NAME_ENV);
-            String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
-
-            _logger = Logger.getRootLogger();
-            _logger.removeAllAppenders();
-            ConsoleAppender appender = new ConsoleAppender(DEFAULT_LAYOUT);
-            appender.activateOptions();
-            _logger.addAppender(appender);
-
-            Properties prop = new Properties();
-            prop.load(new BufferedReader(new FileReader(propsFile)));
-
-            _logger.info("Running job " + _jobName);
-            String className = prop.getProperty(JOB_CLASS);
-            if(className == null) {
-                throw new Exception("Class name is not set.");
-            }
-            _logger.info("Class name " + className);
-
-            // Create the object using proxy
-            if (SecurityUtils.shouldProxy(prop)) {
-                _javaObject = getObjectAsProxyUser(prop, _logger, _jobName, className);
-            }
-            else {
-                _javaObject = getObject(_jobName, className, prop);
-            }
-            if(_javaObject == null) {
-                _logger.info("Could not create java object to run job: " + className);
-                throw new Exception("Could not create running object");
-            }
-
-            _cancelMethod = prop.getProperty(CANCEL_METHOD_PARAM, DEFAULT_CANCEL_METHOD);
-
-            final String runMethod = prop.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
-            _logger.info("Invoking method " + runMethod);
-
-            if(SecurityUtils.shouldProxy(prop)) {
-              _logger.info("Proxying enabled.");
-              runMethodAsProxyUser(prop, _javaObject, runMethod);
-            } else {
-              _logger.info("Proxy check failed, not proxying run.");
-              runMethod(_javaObject, runMethod);
-            }
-            _isFinished = true;
-
-            // Get the generated properties and store them to disk, to be read by ProcessJob.
-            try {
-                final Method generatedPropertiesMethod = _javaObject.getClass().getMethod(GET_GENERATED_PROPERTIES_METHOD, new Class<?>[]{});
-                Props outputGendProps = (Props) generatedPropertiesMethod.invoke(_javaObject, new Object[] {});
-                outputGeneratedProperties(outputGendProps);
-            } catch (NoSuchMethodException e) {
-                _logger.info(
-                        String.format(
-                                "Apparently there isn't a method[%s] on object[%s], using empty Props object instead.",
-                                GET_GENERATED_PROPERTIES_METHOD,
-                                _javaObject
-                        )
-                );
-                outputGeneratedProperties(new Props());
-            }
-        } catch (Exception e) {
-            _isFinished = true;
-            throw e;
-        }
-    }
-
-  private void runMethodAsProxyUser(Properties prop, final Object obj, final String runMethod) throws IOException, InterruptedException {
-    SecurityUtils.getProxiedUser(prop, _logger, new Configuration()).doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        runMethod(obj, runMethod);
-        return null;
-      }
-    });
-  }
-
-
-
-  private void runMethod(Object obj, String runMethod) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-    obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
-  }
-
-  private void outputGeneratedProperties(Props outputProperties)
-    {
-        _logger.info("Outputting generated properties to " + ProcessJob.JOB_OUTPUT_PROP_FILE);
-        if (outputProperties == null) {
-            _logger.info("  no gend props");
-            return;
-        }
-        for (String key : outputProperties.getKeySet()) {
-            _logger.info("  gend prop " + key + " value:" + outputProperties.get(key));
-        }
-        String outputFileStr = System.getenv(ProcessJob.JOB_OUTPUT_PROP_FILE);
-        if (outputFileStr == null) {
-            return;
-        }
-
-        Map<String, String> properties = new LinkedHashMap<String, String>();
-        for (String key : outputProperties.getKeySet()) {
-            properties.put(key, outputProperties.get(key));
-        }
-
-        OutputStream writer = null;
-        try {
-            writer = new BufferedOutputStream(new FileOutputStream(outputFileStr));
-            // Manually serialize into JSON instead of adding org.json to external classpath
-            writer.write("{\n".getBytes());
-            for (Map.Entry<String, String> entry : properties.entrySet()) {
-                writer.write(String.format(
-                        "  '%s':'%s',\n",
-                        entry.getKey().replace("'", "\\'"),
-                        entry.getValue().replace("'", "\\'")
-                ).getBytes());
-            }
-            writer.write("}".getBytes());
-        }
-        catch (Exception e) {
-            new RuntimeException("Unable to store output properties to: " + outputFileStr);
-        }
-        finally {
-        	try {
+	public static final String JOB_CLASS = "job.class";
+	public static final String DEFAULT_RUN_METHOD = "run";
+	public static final String DEFAULT_CANCEL_METHOD = "cancel";
+
+	// This is the Job interface method to get the properties generated by the
+	// job.
+	public static final String GET_GENERATED_PROPERTIES_METHOD = "getJobGeneratedProperties";
+
+	public static final String CANCEL_METHOD_PARAM = "method.cancel";
+	public static final String RUN_METHOD_PARAM = "method.run";
+	public static final String[] PROPS_CLASSES = new String[] { "azkaban.utils.Props", "azkaban.common.utils.Props" };
+
+	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%p %m\n");
+
+	public final Logger _logger;
+
+	public String _cancelMethod;
+	public String _jobName;
+	public Object _javaObject;
+	private boolean _isFinished = false;
+
+	public static void main(String[] args) throws Exception {
+		@SuppressWarnings("unused")
+		JavaJobRunnerMain wrapper = new JavaJobRunnerMain();
+	}
+
+	public JavaJobRunnerMain() throws Exception {
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+			public void run() {
+				cancelJob();
+			}
+		});
+
+		try {
+			_jobName = System.getenv(ProcessJob.JOB_NAME_ENV);
+			String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
+
+			_logger = Logger.getRootLogger();
+			_logger.removeAllAppenders();
+			ConsoleAppender appender = new ConsoleAppender(DEFAULT_LAYOUT);
+			appender.activateOptions();
+			_logger.addAppender(appender);
+
+			Properties prop = new Properties();
+			prop.load(new BufferedReader(new FileReader(propsFile)));
+
+			_logger.info("Running job " + _jobName);
+			String className = prop.getProperty(JOB_CLASS);
+			if (className == null) {
+				throw new Exception("Class name is not set.");
+			}
+			_logger.info("Class name " + className);
+
+			// Create the object using proxy
+			if (SecurityUtils.shouldProxy(prop)) {
+				_javaObject = getObjectAsProxyUser(prop, _logger, _jobName, className);
+			} else {
+				_javaObject = getObject(_jobName, className, prop, _logger);
+			}
+			if (_javaObject == null) {
+				_logger.info("Could not create java object to run job: " + className);
+				throw new Exception("Could not create running object");
+			}
+
+			_cancelMethod = prop.getProperty(CANCEL_METHOD_PARAM, DEFAULT_CANCEL_METHOD);
+
+			final String runMethod = prop.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
+			_logger.info("Invoking method " + runMethod);
+
+			if (SecurityUtils.shouldProxy(prop)) {
+				_logger.info("Proxying enabled.");
+				runMethodAsProxyUser(prop, _javaObject, runMethod);
+			} else {
+				_logger.info("Proxy check failed, not proxying run.");
+				runMethod(_javaObject, runMethod);
+			}
+			_isFinished = true;
+
+			// Get the generated properties and store them to disk, to be read
+			// by ProcessJob.
+			try {
+				final Method generatedPropertiesMethod = _javaObject.getClass().getMethod(
+						GET_GENERATED_PROPERTIES_METHOD, new Class<?>[] {});
+				Props outputGendProps = (Props) generatedPropertiesMethod.invoke(_javaObject, new Object[] {});
+				outputGeneratedProperties(outputGendProps);
+			} catch (NoSuchMethodException e) {
+				_logger.info(String.format(
+						"Apparently there isn't a method[%s] on object[%s], using empty Props object instead.",
+						GET_GENERATED_PROPERTIES_METHOD, _javaObject));
+				outputGeneratedProperties(new Props());
+			}
+		} catch (Exception e) {
+			_isFinished = true;
+			throw e;
+		}
+	}
+
+	private void runMethodAsProxyUser(Properties prop, final Object obj, final String runMethod) throws IOException,
+			InterruptedException {
+		SecurityUtils.getProxiedUser(prop, _logger, new Configuration()).doAs(new PrivilegedExceptionAction<Void>() {
+			@Override
+			public Void run() throws Exception {
+				runMethod(obj, runMethod);
+				return null;
+			}
+		});
+	}
+
+	private void runMethod(Object obj, String runMethod) throws IllegalAccessException, InvocationTargetException,
+			NoSuchMethodException {
+		obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
+	}
+
+	private void outputGeneratedProperties(Props outputProperties) {
+		_logger.info("Outputting generated properties to " + ProcessJob.JOB_OUTPUT_PROP_FILE);
+
+		if (outputProperties == null) {
+			_logger.info("  no gend props");
+			return;
+		}
+		for (String key : outputProperties.getKeySet()) {
+			_logger.info("  gend prop " + key + " value:" + outputProperties.get(key));
+		}
+
+		String outputFileStr = System.getenv(ProcessJob.JOB_OUTPUT_PROP_FILE);
+		if (outputFileStr == null) {
+			return;
+		}
+
+		Map<String, String> properties = new LinkedHashMap<String, String>();
+		for (String key : outputProperties.getKeySet()) {
+			properties.put(key, outputProperties.get(key));
+		}
+
+		OutputStream writer = null;
+		try {
+			writer = new BufferedOutputStream(new FileOutputStream(outputFileStr));
+			// Manually serialize into JSON instead of adding org.json to
+			// external classpath
+			writer.write("{\n".getBytes());
+			for (Map.Entry<String, String> entry : properties.entrySet()) {
+				writer.write(String.format("  '%s':'%s',\n", entry.getKey().replace("'", "\\'"),
+						entry.getValue().replace("'", "\\'")).getBytes());
+			}
+			writer.write("}".getBytes());
+		} catch (Exception e) {
+			new RuntimeException("Unable to store output properties to: " + outputFileStr);
+		} finally {
+			try {
 				writer.close();
-			} catch (IOException e) {			
+			} catch (IOException e) {
+			}
+		}
+	}
+
+	public void cancelJob() {
+		if (_isFinished) {
+			return;
+		}
+		_logger.info("Attempting to call cancel on this job");
+		if (_javaObject != null) {
+			Method method = null;
+
+			try {
+				method = _javaObject.getClass().getMethod(_cancelMethod);
+			} catch (SecurityException e) {
+			} catch (NoSuchMethodException e) {
+			}
+
+			if (method != null)
+				try {
+					method.invoke(_javaObject);
+				} catch (Exception e) {
+					if (_logger != null) {
+						_logger.error("Cancel method failed! ", e);
+					}
+				}
+			else {
+				throw new RuntimeException("Job " + _jobName + " does not have cancel method " + _cancelMethod);
+			}
+		}
+	}
+
+	private static Object getObjectAsProxyUser(final Properties prop, final Logger logger, final String jobName,
+			final String className) throws Exception {
+
+		Object obj = SecurityUtils.getProxiedUser(prop, logger, new Configuration()).doAs(
+				new PrivilegedExceptionAction<Object>() {
+					@Override
+					public Object run() throws Exception {
+						return getObject(jobName, className, prop, logger);
+					}
+				});
+
+		return obj;
+	}
+
+	private static Object getObject(String jobName, String className, Properties properties, Logger logger)
+			throws Exception {
+
+		Class<?> runningClass = JavaJobRunnerMain.class.getClassLoader().loadClass(className);
+
+		if (runningClass == null) {
+			throw new Exception("Class " + className + " was not found. Cannot run job.");
+		}
+
+		Class<?> propsClass = null;
+		for (String propClassName : PROPS_CLASSES) {
+			propsClass = JavaJobRunnerMain.class.getClassLoader().loadClass(propClassName);
+			if (propsClass != null) {
+				break;
+			}
+		}
+
+		Object obj = null;
+		if (propsClass != null && getConstructor(runningClass, String.class, propsClass) != null) {
+			// Create props class
+			Constructor<?> propsCon = getConstructor(propsClass, propsClass, Properties[].class);
+			Object props = propsCon.newInstance(null, new Properties[] { properties });
+
+			Constructor<?> con = getConstructor(runningClass, String.class, propsClass);
+			logger.info("Constructor found " + con.toGenericString());
+			obj = con.newInstance(jobName, props);
+		} else if (getConstructor(runningClass, String.class, Properties.class) != null) {
+			
+			Constructor<?> con = getConstructor(runningClass, String.class, Properties.class);
+			logger.info("Constructor found " + con.toGenericString());
+			obj = con.newInstance(jobName, properties);
+		} else if (getConstructor(runningClass, String.class, Map.class) != null) {
+			Constructor<?> con = getConstructor(runningClass, String.class, Map.class);
+			logger.info("Constructor found " + con.toGenericString());
+
+			@SuppressWarnings("rawtypes")
+			HashMap map = new HashMap();
+			for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+				map.put(entry.getKey(), entry.getValue());
+			}
+			obj = con.newInstance(jobName, map);
+		} else if (getConstructor(runningClass, String.class) != null) {
+			Constructor<?> con = getConstructor(runningClass, String.class);
+			logger.info("Constructor found " + con.toGenericString());
+			obj = con.newInstance(jobName);
+		} else if (getConstructor(runningClass) != null) {
+			Constructor<?> con = getConstructor(runningClass);
+			logger.info("Constructor found " + con.toGenericString());
+			obj = con.newInstance();
+		} else {
+			logger.error("Constructor not found. Listing available Constructors.");
+			for (Constructor<?> c : runningClass.getConstructors()) {
+				logger.info(c.toGenericString());
 			}
-        }
-    }
-
-    public void cancelJob() {
-        if (_isFinished) {
-            return;
-        }
-        _logger.info("Attempting to call cancel on this job");
-        if (_javaObject != null) {
-            Method method = null;
-
-             try {
-                method = _javaObject.getClass().getMethod(_cancelMethod);
-            } catch(SecurityException e) {
-            } catch(NoSuchMethodException e) {
-            }
-
-            if (method != null)
-                try {
-                    method.invoke(_javaObject);
-                } catch(Exception e) {
-                    if (_logger != null) {
-                        _logger.error("Cancel method failed! ", e);
-                    }
-                }
-            else {
-                throw new RuntimeException("Job " + _jobName  + " does not have cancel method " + _cancelMethod);
-            }
-        }
-    }
-
-    private static Object getObjectAsProxyUser(final Properties prop, final Logger logger, final String jobName, final String className) throws Exception{
-        Object obj = SecurityUtils.getProxiedUser(prop, logger, new Configuration()).doAs(new PrivilegedExceptionAction<Object>() {
-            @Override
-            public Object run() throws Exception {
-                return getObject(jobName, className, prop);
-            }
-          });
-        
-        return obj;
-    }
-    
-    private static Object getObject(String jobName, String className, Properties properties)
-            throws Exception {
-        Class<?> runningClass = JavaJobRunnerMain.class.getClassLoader().loadClass(className);
-
-        if(runningClass == null) {
-            throw new Exception("Class " + className + " was not found. Cannot run job.");
-        }
-
-        Class<?> propsClass = JavaJobRunnerMain.class.getClassLoader().loadClass(PROPS_CLASS);
-
-        Object obj = null;
-        if(propsClass != null && getConstructor(runningClass, String.class, propsClass) != null) {
-            // This case covers the use of azkaban.common.utils.Props with the
-            Constructor<?> con = getConstructor(propsClass, propsClass, Properties[].class);
-            Object props = con.newInstance(null, new Properties[] { properties });
-            obj = getConstructor(runningClass, String.class, propsClass).newInstance(jobName, props);
-        } else if(getConstructor(runningClass, String.class, Properties.class) != null) {
-            obj = getConstructor(runningClass, String.class, Properties.class).newInstance(jobName,
-                                                                                           properties);
-        } else if(getConstructor(runningClass, String.class) != null) {
-            obj = getConstructor(runningClass, String.class).newInstance(jobName);
-        } else if(getConstructor(runningClass) != null) {
-            obj = getConstructor(runningClass).newInstance();
-        }
-        return obj;
-    }
-
-    private static Constructor<?> getConstructor(Class<?> c, Class<?>... args) {
-        try {
-            Constructor<?> cons = c.getConstructor(args);
-            return cons;
-        } catch(NoSuchMethodException e) {
-            return null;
-        }
-    }
+		}
+		return obj;
+	}
+
+	private static Constructor<?> getConstructor(Class<?> c, Class<?>... args) {
+		try {
+			Constructor<?> cons = c.getConstructor(args);
+			return cons;
+		} catch (NoSuchMethodException e) {
+			return null;
+		}
+	}
 
 }
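
The reworked getObject() now takes the job name and a logger, looks up a Props class from a list (azkaban.utils.Props, then azkaban.common.utils.Props) instead of a single hard-coded name, and tries constructors in order: (String, Props), (String, Properties), (String, Map), (String), then no-arg. A class only needs one matching constructor plus run()/cancel() methods to be launched by this wrapper; for example (class, package, and property names below are hypothetical):

    package example;

    import java.util.Properties;

    // Instantiated by JavaJobRunnerMain through its (String, Properties) constructor;
    // "run" and "cancel" are the default method names the wrapper invokes reflectively.
    public class HelloJavaJob {
        private final String name;
        private final Properties props;

        public HelloJavaJob(String name, Properties props) {
            this.name = name;
            this.props = props;
        }

        public void run() throws Exception {
            System.out.println("Hello from " + name + ": " + props.getProperty("message", "(no message)"));
        }

        public void cancel() throws Exception {
            // best-effort cancel; nothing long-running to stop here
        }
    }

Because this class has no getJobGeneratedProperties() method, the wrapper logs a note and writes an empty Props object, which is the NoSuchMethodException path handled above.
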
diff --git a/src/java/azkaban/jobExecutor/JavaProcessJob.java b/src/java/azkaban/jobExecutor/JavaProcessJob.java
index bd41935..db9b52c 100644
--- a/src/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/src/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -36,15 +36,15 @@ public class JavaProcessJob extends ProcessJob {
 	public static final String MAX_MEMORY_SIZE = "Xmx";
 	public static final String MAIN_ARGS = "main.args";
 	public static final String JVM_PARAMS = "jvm.args";
-    public static final String GLOBAL_JVM_PARAMS = "global.jvm.args";
-    
+	public static final String GLOBAL_JVM_PARAMS = "global.jvm.args";
+
 	public static final String DEFAULT_INITIAL_MEMORY_SIZE = "64M";
 	public static final String DEFAULT_MAX_MEMORY_SIZE = "256M";
 
 	public static String JAVA_COMMAND = "java";
 
-	public JavaProcessJob(Props prop, Logger logger) {
-		super(prop, logger);
+	public JavaProcessJob(String jobid, Props prop, Logger logger) {
+		super(jobid, prop, logger);
 	}
 
 	@Override
@@ -98,8 +98,8 @@ public class JavaProcessJob extends ProcessJob {
 			
 			for (File file : parent.listFiles()) {
 				if (file.getName().endsWith(".jar")) {
-	                //log.info("Adding to classpath:" + file.getName());
-	                classpathList.add(file.getName());
+					// log.info("Adding to classpath:" + file.getName());
+					classpathList.add(file.getName());
 				}
 			}
 		}
@@ -123,15 +123,15 @@ public class JavaProcessJob extends ProcessJob {
 		return getProps().getString(MAIN_ARGS, "");
 	}
 
-    protected String getJVMArguments() {
-        String globalJVMArgs = getProps().getString(GLOBAL_JVM_PARAMS, null);
-        
-        if (globalJVMArgs == null) {
-            return getProps().getString(JVM_PARAMS, "");
-        }
+	protected String getJVMArguments() {
+		String globalJVMArgs = getProps().getString(GLOBAL_JVM_PARAMS, null);
 
-        return globalJVMArgs + " " + getProps().getString(JVM_PARAMS, "");
-    }
+		if (globalJVMArgs == null) {
+			return getProps().getString(JVM_PARAMS, "");
+		}
+
+		return globalJVMArgs + " " + getProps().getString(JVM_PARAMS, "");
+	}
 
 	protected String createArguments(List<String> arguments, String separator) {
 		if (arguments != null && arguments.size() > 0) {
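
Every job constructor in this change gains a leading jobid parameter, so the id is passed down the class hierarchy instead of being read from the props (the old "unkownjob" fallback in AbstractProcessJob). A custom job type built on JavaProcessJob would now look like this minimal sketch (the example class and package are hypothetical):

    package example;

    import org.apache.log4j.Logger;

    import azkaban.jobExecutor.JavaProcessJob;
    import azkaban.utils.Props;

    // Subclasses simply forward the job id to the new three-argument constructor.
    public class MyJavaProcessJob extends JavaProcessJob {
        public MyJavaProcessJob(String jobid, Props props, Logger log) {
            super(jobid, props, log);
        }
    }
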
diff --git a/src/java/azkaban/jobExecutor/Job.java b/src/java/azkaban/jobExecutor/Job.java
index a7c59e5..3649095 100644
--- a/src/java/azkaban/jobExecutor/Job.java
+++ b/src/java/azkaban/jobExecutor/Job.java
@@ -18,8 +18,6 @@ package azkaban.jobExecutor;
 
 import azkaban.utils.Props;
 
-
-
 /**
  * This interface defines a Raw Job interface. Each job defines
  * <ul>
@@ -33,43 +31,47 @@ import azkaban.utils.Props;
 
 public interface Job {
 
-    /**
-     * Returns a unique(should be checked in xml) string name/id for the Job.
-     * 
-     * @return
-     */
-    public String getId();
+	/**
+	 * Returns a unique(should be checked in xml) string name/id for the Job.
+	 * 
+	 * @return
+	 */
+	public String getId();
+
+	/**
+	 * Run the job. In general this method can only be run once. Must either
+	 * succeed or throw an exception.
+	 */
+	public void run() throws Exception;
+
+	/**
+	 * Best effort attempt to cancel the job.
+	 * 
+	 * @throws Exception
+	 *             If cancel fails
+	 */
+	public void cancel() throws Exception;
 
-    /**
-     * Run the job. In general this method can only be run once. Must either
-     * succeed or throw an exception.
-     */
-    public void run() throws Exception;
+	/**
+	 * Returns a progress report between [0 - 1.0] to indicate the percentage
+	 * complete
+	 * 
+	 * @throws Exception
+	 *             If getting progress fails
+	 */
+	public double getProgress() throws Exception;
 
-    /**
-     * Best effort attempt to cancel the job.
-     * 
-     * @throws Exception If cancel fails
-     */
-    public void cancel() throws Exception;
+	/**
+	 * Get the generated properties from this job.
+	 * 
+	 * @return
+	 */
+	public Props getJobGeneratedProperties();
 
-    /**
-     * Returns a progress report between [0 - 1.0] to indicate the percentage
-     * complete
-     * 
-     * @throws Exception If getting progress fails
-     */
-    public double getProgress() throws Exception;
-    
-    /**
-     * Get the generated properties from this job.
-     * @return
-     */
-    public Props getJobGeneratedProperties();
-    
-    /**
-     * Determine if the job was cancelled.
-     * @return
-     */
-    public boolean isCanceled();
+	/**
+	 * Determine if the job was cancelled.
+	 * 
+	 * @return
+	 */
+	public boolean isCanceled();
 }
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
index 3bee6d5..7306a4e 100644
--- a/src/java/azkaban/jobExecutor/LongArgJob.java
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -40,13 +40,13 @@ public abstract class LongArgJob extends AbstractProcessJob {
     private final AzkabanProcessBuilder builder;
     private volatile AzkabanProcess process;
 
-    public LongArgJob(String[] command, Props prop, Logger log) {
-        this(command, prop, log, new HashSet<String>(0));
+    public LongArgJob(String jobid, String[] command, Props prop, Logger log) {
+        this(jobid, command, prop, log, new HashSet<String>(0));
     }
     
-    public LongArgJob(String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
+    public LongArgJob(String jobid, String[] command, Props prop, Logger log, Set<String> suppressedKeys) {
         //super(command, desc);
-         super(prop, log);
+         super(jobid, prop, log);
         //String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
        
         this.builder = new AzkabanProcessBuilder(command).
diff --git a/src/java/azkaban/jobExecutor/NoopJob.java b/src/java/azkaban/jobExecutor/NoopJob.java
index e364b2f..2b7b09b 100644
--- a/src/java/azkaban/jobExecutor/NoopJob.java
+++ b/src/java/azkaban/jobExecutor/NoopJob.java
@@ -22,43 +22,38 @@ import azkaban.utils.Props;
 /**
  *
  */
-public class NoopJob implements Job
-{
-    public NoopJob(Props props, Logger log)
-    {
-        
-    }
-
-    @Override
-    public String getId()
-    {
-        return "Azkaban!! -- " + getClass().getName();
-    }
-
-    @Override
-    public void run() throws Exception
-    {
-    }
-
-    @Override
-    public void cancel() throws Exception
-    {
-    }
-
-    @Override
-    public double getProgress() throws Exception
-    {
-        return 0;
-    }
-
-    @Override
-    public Props getJobGeneratedProperties()
-    {
-        return new Props();
-    }
-
-    @Override
-    public boolean isCanceled() {
-        return false;
-    }
+public class NoopJob implements Job {
+	private String jobId;
+
+	public NoopJob(String jobid, Props props, Logger log) {
+		this.jobId = jobid;
+	}
+
+	@Override
+	public String getId() {
+		return this.jobId;
+	}
+
+	@Override
+	public void run() throws Exception {
+	}
+
+	@Override
+	public void cancel() throws Exception {
+	}
+
+	@Override
+	public double getProgress() throws Exception {
+		return 0;
+	}
+
+	@Override
+	public Props getJobGeneratedProperties() {
+		return new Props();
+	}
+
+	@Override
+	public boolean isCanceled() {
+		return false;
+	}
 }
diff --git a/src/java/azkaban/jobExecutor/PigProcessJob.java b/src/java/azkaban/jobExecutor/PigProcessJob.java
index 3a5a15d..b0d8ae8 100644
--- a/src/java/azkaban/jobExecutor/PigProcessJob.java
+++ b/src/java/azkaban/jobExecutor/PigProcessJob.java
@@ -46,8 +46,8 @@ public class PigProcessJob extends JavaProcessJob {
   public static final String PIG_JAVA_CLASS = "org.apache.pig.Main";
   public static final String SECURE_PIG_WRAPPER = "azkaban.jobExecutor.SecurePigWrapper";
 
-	public PigProcessJob(Props props, Logger log) {
-		super(props, log);
+	public PigProcessJob(String jobid, Props props, Logger log) {
+		super(jobid, props, log);
 	}
 
 	@Override
@@ -160,20 +160,20 @@ public class PigProcessJob extends JavaProcessJob {
 	
 	
 	private static String getSourcePathFromClass(Class containedClass) {
-	    File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
-
-	    if (!file.isDirectory() && file.getName().endsWith(".class")) {
-	        String name = containedClass.getName();
-	        StringTokenizer tokenizer = new StringTokenizer(name, ".");
-	        while(tokenizer.hasMoreTokens()) {
-	            tokenizer.nextElement();
-	            file = file.getParentFile();
-	        }
-	            
-	        return file.getPath();  
-	    }
-	    else {
-	        return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
-	    }
+		File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+		if (!file.isDirectory() && file.getName().endsWith(".class")) {
+			String name = containedClass.getName();
+			StringTokenizer tokenizer = new StringTokenizer(name, ".");
+			while (tokenizer.hasMoreTokens()) {
+				tokenizer.nextElement();
+				file = file.getParentFile();
+			}
+
+			return file.getPath();
+		} else {
+			return containedClass.getProtectionDomain().getCodeSource()
+					.getLocation().getPath();
+		}
 	}
 }
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
index 06050c8..42a1925 100644
--- a/src/java/azkaban/jobExecutor/ProcessJob.java
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -37,311 +37,304 @@ import azkaban.utils.Props;
  */
 public class ProcessJob extends AbstractProcessJob implements Job {
 
-    public static final String COMMAND = "command";
-    public static final int CLEAN_UP_TIME_MS = 1000;
-
-    private volatile Process _process;
-    private volatile boolean _isComplete;
-    private volatile boolean _isCancelled;
-
-    public ProcessJob(final Props props, final Logger log) {
-        super(props, log);
-    }
-
-    @Override
-    public void run() {
-        synchronized (this) {
-            _isCancelled = false;
-        }
-        resolveProps();
-
-        // Sets a list of all the commands that need to be run.
-        List<String> commands = getCommandList();
-        info(commands.size() + " commands to execute.");
-
-        File[] propFiles = initPropsFiles();
-
-        // System.err.println("in process job outputFile=" +propFiles[1]);
-
-        // For each of the jobs, set up a process and run them.
-        for (String command : commands) {
-            info("Executing command: " + command);
-            String[] cmdPieces = partitionCommandLine(command);
-
-            ProcessBuilder builder = new ProcessBuilder(cmdPieces);
-
-            builder.directory(new File(getCwd()));
-            builder.environment().putAll(getEnvironmentVariables());
-
-            try {
-                _process = builder.start();
-            } catch (IOException e) {
-                for (File file : propFiles) {
-                    if (file != null && file.exists()) {
-                        file.delete();
-                    }
-                }
-                throw new RuntimeException(e);
-            }
-            LoggingGobbler outputGobbler = new LoggingGobbler(
-                    new InputStreamReader(_process.getInputStream()),
-                    Level.INFO);
-            LoggingGobbler errorGobbler = new LoggingGobbler(
-                    new InputStreamReader(_process.getErrorStream()),
-                    Level.ERROR);
-
-            int processId = getProcessId();
-            if (processId == 0) {
-                info("Spawned thread. Unknowned processId");
-            } else {
-                info("Spawned thread with processId " + processId);
-            }
-            outputGobbler.start();
-            errorGobbler.start();
-            int exitCode = -999;
-            try {
-                exitCode = _process.waitFor();
-
-                _isComplete = true;
-                if (exitCode != 0) {
-                    for (File file : propFiles) {
-                        if (file != null && file.exists()) {
-                            file.delete();
-                        }
-                    }
-                    throw new RuntimeException(
-                            "Processes ended with exit code " + exitCode + ".");
-                }
-
-                // try to wait for everything to get logged out before exiting
-                outputGobbler.join(1000);
-                errorGobbler.join(1000);
-            } catch (InterruptedException e) {
-            } finally {
-                outputGobbler.close();
-                errorGobbler.close();
-            }
-        }
-
-        // Get the output properties from this job.
-        generateProperties(propFiles[1]);
-
-        for (File file : propFiles) {
-            if (file != null && file.exists()) {
-                file.delete();
-            }
-        }
-
-    }
-
-    protected List<String> getCommandList() {
-        List<String> commands = new ArrayList<String>();
-        commands.add(_props.getString(COMMAND));
-        for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
-            commands.add(_props.getString(COMMAND + "." + i));
-        }
-
-        return commands;
-    }
-
-    @Override
-    public void cancel() throws Exception {
-        if (_process != null) {
-            int processId = getProcessId();
-            if (processId != 0) {
-                warn("Attempting to kill the process " + processId);
-                try {
-                    Runtime.getRuntime().exec("kill " + processId);
-                    synchronized (this) {
-                        wait(CLEAN_UP_TIME_MS);
-                    }
-                } catch (InterruptedException e) {
-                    // Do nothing. We don't really care.
-                }
-                if (!_isComplete) {
-                    error("After "
-                            + CLEAN_UP_TIME_MS
-                            + " ms, the job hasn't terminated. Will force terminate the job.");
-                }
-            } else {
-                info("Cound not get process id");
-            }
-
-            if (!_isComplete) {
-                warn("Force kill the process");
-                _process.destroy();
-            }
-            synchronized (this) {
-                _isCancelled = true;
-            }
-        }
-    }
-
-    public int getProcessId() {
-        int processId = 0;
-
-        try {
-            Field f = _process.getClass().getDeclaredField("pid");
-            f.setAccessible(true);
-
-            processId = f.getInt(_process);
-        } catch (Throwable e) {
-        }
-
-        return processId;
-    }
-
-    @Override
-    public double getProgress() {
-        return _isComplete ? 1.0 : 0.0;
-    }
-
-    private class LoggingGobbler extends Thread {
-
-        private final BufferedReader _inputReader;
-        private final Level _loggingLevel;
-
-        public LoggingGobbler(final InputStreamReader inputReader,
-                final Level level) {
-            _inputReader = new BufferedReader(inputReader);
-            _loggingLevel = level;
-        }
-
-        public void close() {
-            if (_inputReader != null) {
-                try {
-                    _inputReader.close();
-                } catch (IOException e) {
-                    error("Error cleaning up logging stream reader:", e);
-                }
-            }
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (!Thread.currentThread().isInterrupted()) {
-                    String line = _inputReader.readLine();
-                    if (line == null) {
-                        return;
-                    }
-
-                    logMessage(line);
-                }
-            } catch (IOException e) {
-                error("Error reading from logging stream:", e);
-            }
-        }
-
-        private void logMessage(final String message) {
-            if (message.startsWith(Level.DEBUG.toString())) {
-                String newMsg = message.substring(Level.DEBUG.toString()
-                        .length());
-                getLog().debug(newMsg);
-            } else if (message.startsWith(Level.ERROR.toString())) {
-                String newMsg = message.substring(Level.ERROR.toString()
-                        .length());
-                getLog().error(newMsg);
-            } else if (message.startsWith(Level.INFO.toString())) {
-                String newMsg = message.substring(Level.INFO.toString()
-                        .length());
-                getLog().info(newMsg);
-            } else if (message.startsWith(Level.WARN.toString())) {
-                String newMsg = message.substring(Level.WARN.toString()
-                        .length());
-                getLog().warn(newMsg);
-            } else if (message.startsWith(Level.FATAL.toString())) {
-                String newMsg = message.substring(Level.FATAL.toString()
-                        .length());
-                getLog().fatal(newMsg);
-            } else if (message.startsWith(Level.TRACE.toString())) {
-                String newMsg = message.substring(Level.TRACE.toString()
-                        .length());
-                getLog().trace(newMsg);
-            } else {
-                getLog().log(_loggingLevel, message);
-            }
-
-        }
-    }
-
-    @Override
-    public Props getProps() {
-        return _props;
-    }
-
-    public String getPath() {
-        return _jobPath;
-    }
-
-    public String getJobName() {
-        return getId();
-    }
-
-
-    /**
-     * Splits the command into a unix like command line structure. Quotes and
-     * single quotes are treated as nested strings.
-     * 
-     * @param command
-     * @return
-     */
-    public static String[] partitionCommandLine(final String command) {
-        ArrayList<String> commands = new ArrayList<String>();
-
-        int index = 0;
-
-        StringBuffer buffer = new StringBuffer(command.length());
-
-        boolean isApos = false;
-        boolean isQuote = false;
-        while (index < command.length()) {
-            char c = command.charAt(index);
-
-            switch (c) {
-            case ' ':
-                if (!isQuote && !isApos) {
-                    String arg = buffer.toString();
-                    buffer = new StringBuffer(command.length() - index);
-                    if (arg.length() > 0) {
-                        commands.add(arg);
-                    }
-                } else {
-                    buffer.append(c);
-                }
-                break;
-            case '\'':
-                if (!isQuote) {
-                    isApos = !isApos;
-                } else {
-                    buffer.append(c);
-                }
-                break;
-            case '"':
-                if (!isApos) {
-                    isQuote = !isQuote;
-                } else {
-                    buffer.append(c);
-                }
-                break;
-            default:
-                buffer.append(c);
-            }
-
-            index++;
-        }
-
-        if (buffer.length() > 0) {
-            String arg = buffer.toString();
-            commands.add(arg);
-        }
-
-        return commands.toArray(new String[commands.size()]);
-    }
-
-    @Override
-    public synchronized boolean isCanceled() {
-        return _isCancelled;
-    }
+	public static final String COMMAND = "command";
+	public static final int CLEAN_UP_TIME_MS = 1000;
+
+	private volatile Process _process;
+	private volatile boolean _isComplete;
+	private volatile boolean _isCancelled;
+
+	public ProcessJob(final String jobId, final Props props, final Logger log) {
+		super(jobId, props, log);
+	}
+
+	@Override
+	public void run() {
+		synchronized (this) {
+			_isCancelled = false;
+		}
+		resolveProps();
+
+		// Sets a list of all the commands that need to be run.
+		List<String> commands = getCommandList();
+		info(commands.size() + " commands to execute.");
+
+		File[] propFiles = initPropsFiles();
+
+		// System.err.println("in process job outputFile=" +propFiles[1]);
+
+		// For each of the jobs, set up a process and run them.
+		for (String command : commands) {
+			info("Executing command: " + command);
+			String[] cmdPieces = partitionCommandLine(command);
+
+			ProcessBuilder builder = new ProcessBuilder(cmdPieces);
+
+			builder.directory(new File(getCwd()));
+			builder.environment().putAll(getEnvironmentVariables());
+
+			try {
+				_process = builder.start();
+			} catch (IOException e) {
+				for (File file : propFiles) {
+					if (file != null && file.exists()) {
+						file.delete();
+					}
+				}
+				throw new RuntimeException(e);
+			}
+			LoggingGobbler outputGobbler = new LoggingGobbler(
+					new InputStreamReader(_process.getInputStream()),
+					Level.INFO);
+			LoggingGobbler errorGobbler = new LoggingGobbler(
+					new InputStreamReader(_process.getErrorStream()),
+					Level.ERROR);
+
+			int processId = getProcessId();
+			if (processId == 0) {
+				info("Spawned thread. Unknowned processId");
+			} else {
+				info("Spawned thread with processId " + processId);
+			}
+			outputGobbler.start();
+			errorGobbler.start();
+			int exitCode = -999;
+			try {
+				exitCode = _process.waitFor();
+
+				_isComplete = true;
+				if (exitCode != 0) {
+					for (File file : propFiles) {
+						if (file != null && file.exists()) {
+							file.delete();
+						}
+					}
+					throw new RuntimeException(
+							"Processes ended with exit code " + exitCode + ".");
+				}
+
+				// try to wait for everything to get logged out before exiting
+				outputGobbler.join(1000);
+				errorGobbler.join(1000);
+			} catch (InterruptedException e) {
+			} finally {
+				outputGobbler.close();
+				errorGobbler.close();
+			}
+		}
+
+		// Get the output properties from this job.
+		generateProperties(propFiles[1]);
+
+		for (File file : propFiles) {
+			if (file != null && file.exists()) {
+				file.delete();
+			}
+		}
+
+	}
+
+	protected List<String> getCommandList() {
+		List<String> commands = new ArrayList<String>();
+		commands.add(_props.getString(COMMAND));
+		for (int i = 1; _props.containsKey(COMMAND + "." + i); i++) {
+			commands.add(_props.getString(COMMAND + "." + i));
+		}
+
+		return commands;
+	}
+
+	@Override
+	public void cancel() throws Exception {
+		if (_process != null) {
+			int processId = getProcessId();
+			if (processId != 0) {
+				warn("Attempting to kill the process " + processId);
+				try {
+					Runtime.getRuntime().exec("kill " + processId);
+					synchronized (this) {
+						wait(CLEAN_UP_TIME_MS);
+					}
+				} catch (InterruptedException e) {
+					// Do nothing. We don't really care.
+				}
+				if (!_isComplete) {
+					error("After "
+							+ CLEAN_UP_TIME_MS
+							+ " ms, the job hasn't terminated. Will force terminate the job.");
+				}
+			} else {
+				info("Could not get process id");
+			}
+
+			if (!_isComplete) {
+				warn("Force kill the process");
+				_process.destroy();
+			}
+			synchronized (this) {
+				_isCancelled = true;
+			}
+		}
+	}
+
+	public int getProcessId() {
+		int processId = 0;
+
+		try {
+			Field f = _process.getClass().getDeclaredField("pid");
+			f.setAccessible(true);
+
+			processId = f.getInt(_process);
+		} catch (Throwable e) {
+		}
+
+		return processId;
+	}
+
+	@Override
+	public double getProgress() {
+		return _isComplete ? 1.0 : 0.0;
+	}
+
+	private class LoggingGobbler extends Thread {
+
+		private final BufferedReader _inputReader;
+		private final Level _loggingLevel;
+
+		public LoggingGobbler(final InputStreamReader inputReader,
+				final Level level) {
+			_inputReader = new BufferedReader(inputReader);
+			_loggingLevel = level;
+		}
+
+		public void close() {
+			if (_inputReader != null) {
+				try {
+					_inputReader.close();
+				} catch (IOException e) {
+					error("Error cleaning up logging stream reader:", e);
+				}
+			}
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (!Thread.currentThread().isInterrupted()) {
+					String line = _inputReader.readLine();
+					if (line == null) {
+						return;
+					}
+
+					logMessage(line);
+				}
+			} catch (IOException e) {
+				error("Error reading from logging stream:", e);
+			}
+		}
+
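+		// Lines from the child process may be prefixed with a log4j level name (DEBUG, INFO, ...);
+		// route those to the matching level, otherwise log at this gobbler's default level.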
+		private void logMessage(final String message) {
+			if (message.startsWith(Level.DEBUG.toString())) {
+				String newMsg = message.substring(Level.DEBUG.toString().length());
+				getLog().debug(newMsg);
+			} else if (message.startsWith(Level.ERROR.toString())) {
+				String newMsg = message.substring(Level.ERROR.toString().length());
+				getLog().error(newMsg);
+			} else if (message.startsWith(Level.INFO.toString())) {
+				String newMsg = message.substring(Level.INFO.toString().length());
+				getLog().info(newMsg);
+			} else if (message.startsWith(Level.WARN.toString())) {
+				String newMsg = message.substring(Level.WARN.toString().length());
+				getLog().warn(newMsg);
+			} else if (message.startsWith(Level.FATAL.toString())) {
+				String newMsg = message.substring(Level.FATAL.toString().length());
+				getLog().fatal(newMsg);
+			} else if (message.startsWith(Level.TRACE.toString())) {
+				String newMsg = message.substring(Level.TRACE.toString().length());
+				getLog().trace(newMsg);
+			} else {
+				getLog().log(_loggingLevel, message);
+			}
+
+		}
+	}
+
+	@Override
+	public Props getProps() {
+		return _props;
+	}
+
+	public String getPath() {
+		return _jobPath;
+	}
+
+	public String getJobName() {
+		return getId();
+	}
+
+	/**
+	 * Splits the command into a Unix-like argument list. Double and single
+	 * quotes are treated as nested strings (see the usage sketch after this
+	 * method).
+	 * 
+	 * @param command the raw command line to split
+	 * @return the individual command line arguments
+	 */
+	public static String[] partitionCommandLine(final String command) {
+		ArrayList<String> commands = new ArrayList<String>();
+
+		int index = 0;
+
+		StringBuffer buffer = new StringBuffer(command.length());
+
+		boolean isApos = false;
+		boolean isQuote = false;
+		while (index < command.length()) {
+			char c = command.charAt(index);
+
+			switch (c) {
+			case ' ':
+				if (!isQuote && !isApos) {
+					String arg = buffer.toString();
+					buffer = new StringBuffer(command.length() - index);
+					if (arg.length() > 0) {
+						commands.add(arg);
+					}
+				} else {
+					buffer.append(c);
+				}
+				break;
+			case '\'':
+				if (!isQuote) {
+					isApos = !isApos;
+				} else {
+					buffer.append(c);
+				}
+				break;
+			case '"':
+				if (!isApos) {
+					isQuote = !isQuote;
+				} else {
+					buffer.append(c);
+				}
+				break;
+			default:
+				buffer.append(c);
+			}
+
+			index++;
+		}
+
+		if (buffer.length() > 0) {
+			String arg = buffer.toString();
+			commands.add(arg);
+		}
+
+		return commands.toArray(new String[commands.size()]);
+	}
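
A minimal usage sketch of the splitting behavior (it assumes the enclosing class is azkaban.jobExecutor.ProcessJob, as the test changes later in this diff suggest):

	import azkaban.jobExecutor.ProcessJob;

	public class PartitionSketch {
		public static void main(String[] args) {
			// Quoted regions stay together and the quote characters themselves are stripped.
			String[] parts = ProcessJob.partitionCommandLine("echo 'hello world' \"a b\" c");
			for (String part : parts) {
				System.out.println(part); // echo / hello world / a b / c
			}
		}
	}
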
+
+	@Override
+	public synchronized boolean isCanceled() {
+		return _isCancelled;
+	}
 
 }
diff --git a/src/java/azkaban/jobExecutor/PythonJob.java b/src/java/azkaban/jobExecutor/PythonJob.java
index ad83c38..2cb44c3 100644
--- a/src/java/azkaban/jobExecutor/PythonJob.java
+++ b/src/java/azkaban/jobExecutor/PythonJob.java
@@ -21,23 +21,17 @@ import com.google.common.collect.ImmutableSet;
 
 import azkaban.utils.Props;
 
-
-
 public class PythonJob extends LongArgJob {
-    
-    private static final String PYTHON_BINARY_KEY = "python";
-    private static final String SCRIPT_KEY = "script";
-
-
-    public PythonJob(Props props, Logger log) {
-        super(new String[]{props.getString(PYTHON_BINARY_KEY, "python"), 
-                           props.getString(SCRIPT_KEY)}, 
-              props,
-              log, 
-              ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
-    }
 
+	private static final String PYTHON_BINARY_KEY = "python";
+	private static final String SCRIPT_KEY = "script";
 
+	public PythonJob(String jobid, Props props, Logger log) {
+		super(jobid, 
+				new String[] { props.getString(PYTHON_BINARY_KEY, "python"),props.getString(SCRIPT_KEY) }, 
+				props, 
+				log, 
+				ImmutableSet.of(PYTHON_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+	}
 
-    
 }
diff --git a/src/java/azkaban/jobExecutor/RubyJob.java b/src/java/azkaban/jobExecutor/RubyJob.java
index 7f7febc..a65c272 100644
--- a/src/java/azkaban/jobExecutor/RubyJob.java
+++ b/src/java/azkaban/jobExecutor/RubyJob.java
@@ -21,20 +21,17 @@ import azkaban.utils.Props;
 
 import com.google.common.collect.ImmutableSet;
 
-
 public class RubyJob extends LongArgJob {
 
-    private static final String RUBY_BINARY_KEY = "ruby";
-    private static final String SCRIPT_KEY = "script";
+	private static final String RUBY_BINARY_KEY = "ruby";
+	private static final String SCRIPT_KEY = "script";
 
-    public RubyJob(Props props, Logger log) {
-        super(new String[]{props.getString(RUBY_BINARY_KEY, "ruby"), 
-                           props.getString(SCRIPT_KEY)}, 
-              props, 
-              log, 
-              ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
-    }
+	public RubyJob(String jobid, Props props, Logger log) {
+		super(jobid, 
+				new String[] { props.getString(RUBY_BINARY_KEY, "ruby"), props.getString(SCRIPT_KEY) }, 
+				props, 
+				log, 
+				ImmutableSet.of(RUBY_BINARY_KEY, SCRIPT_KEY, JOB_TYPE));
+	}
 
-   
-    
 }
diff --git a/src/java/azkaban/jobExecutor/ScriptJob.java b/src/java/azkaban/jobExecutor/ScriptJob.java
index 1879afe..1a8f16b 100644
--- a/src/java/azkaban/jobExecutor/ScriptJob.java
+++ b/src/java/azkaban/jobExecutor/ScriptJob.java
@@ -22,25 +22,24 @@ import com.google.common.collect.ImmutableSet;
 import azkaban.utils.Props;
 
 /**
- * A script job issues a command of the form
- *    [EXECUTABLE] [SCRIPT] --key1 val1 ... --key2 val2
- *   executable -- the interpretor command to execute
- *   script -- the script to pass in (requried)
+ * A script job issues a command of the form [EXECUTABLE] [SCRIPT] --key1 val1
+ * ... --key2 val2, where "executable" is the interpreter command to run and
+ * "script" is the script to pass in (required). See the sketch after this
+ * class for an example job definition.
  * 
  * @author jkreps
- *
+ * 
  */
 public class ScriptJob extends LongArgJob {
 
-    private static final String DEFAULT_EXECUTABLE_KEY = "executable";
-    private static final String SCRIPT_KEY = "script";
-    
-    public ScriptJob(Props props, Logger log) {
-        super(new String[] {props.getString(DEFAULT_EXECUTABLE_KEY), props.getString(SCRIPT_KEY)}, 
-              props, 
-              log, 
-              ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
-    }
- 
+	private static final String DEFAULT_EXECUTABLE_KEY = "executable";
+	private static final String SCRIPT_KEY = "script";
+
+	public ScriptJob(String jobid, Props props, Logger log) {
+		super(jobid, 
+				new String[] { props.getString(DEFAULT_EXECUTABLE_KEY),props.getString(SCRIPT_KEY) }, 
+				props, 
+				log, 
+				ImmutableSet.of(DEFAULT_EXECUTABLE_KEY, SCRIPT_KEY, JOB_TYPE));
+	}
 
 }
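
A hypothetical job definition for a ScriptJob, following the command form described in the class comment above. The "type" value depends on how ScriptJob is registered in JobWrappingFactory and is an assumption here; "executable" and "script" are the keys the class defines.

	type=script
	executable=bash
	script=cleanup.sh
	# per the class comment, other job properties are appended as "--key value" arguments
	retention.days=30
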
diff --git a/src/java/azkaban/jobExecutor/SecurePigWrapper.java b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
index c210b40..0103ddb 100644
--- a/src/java/azkaban/jobExecutor/SecurePigWrapper.java
+++ b/src/java/azkaban/jobExecutor/SecurePigWrapper.java
@@ -35,62 +35,66 @@ import static azkaban.jobExecutor.utils.SecurityUtils.getProxiedUser;
 
 public class SecurePigWrapper {
 
-  public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
-  public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
+	public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
+	public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
 
-  public static void main(final String[] args) throws IOException, InterruptedException {
-    final Logger logger = Logger.getRootLogger();
-    final Properties p = System.getProperties();
-    final Configuration conf = new Configuration();
+	public static void main(final String[] args) throws IOException, InterruptedException {
+		final Logger logger = Logger.getRootLogger();
+		final Properties p = System.getProperties();
+		final Configuration conf = new Configuration();
 
-    getProxiedUser(p, logger, conf).doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        prefetchToken();
-        org.apache.pig.Main.main(args);
-        return null;
-      }
+		getProxiedUser(p, logger, conf).doAs(
+				new PrivilegedExceptionAction<Void>() {
+					@Override
+					public Void run() throws Exception {
+						prefetchToken();
+						org.apache.pig.Main.main(args);
+						return null;
+					}
 
-      // For Pig jobs that need to do extra communication with the JobTracker,
-      // it's necessary to pre-fetch a token and include it in the credentials
-      // cache
-      private void prefetchToken() throws InterruptedException, IOException {
-        String shouldPrefetch = p.getProperty(OBTAIN_BINARY_TOKEN);
-        if(shouldPrefetch != null && shouldPrefetch.equals("true") ) {
-          logger.info("Pre-fetching token");
-          Job job = new Job(conf, "totally phony, extremely fake, not real job");
+					// For Pig jobs that need to do extra communication with
+					// the JobTracker, it's necessary to pre-fetch a token and
+					// include it in the credentials cache.
+					private void prefetchToken() throws InterruptedException,
+							IOException {
+						String shouldPrefetch = p.getProperty(OBTAIN_BINARY_TOKEN);
+						if (shouldPrefetch != null && shouldPrefetch.equals("true")) {
+							logger.info("Pre-fetching token");
+							Job job = new Job(conf,"totally phony, extremely fake, not real job");
 
-          JobConf jc = new JobConf(conf);
-          JobClient jobClient = new JobClient(jc);
-          logger.info("Pre-fetching: Got new JobClient: " + jc);
-          Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("hi"));
-          job.getCredentials().addToken(new Text("howdy"), mrdt);
+							JobConf jc = new JobConf(conf);
+							JobClient jobClient = new JobClient(jc);
+							logger.info("Pre-fetching: Got new JobClient: " + jc);
+							Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(new Text("hi"));
+							job.getCredentials().addToken(new Text("howdy"), mrdt);
 
-          File temp = File.createTempFile("mr-azkaban", ".token");
-          temp.deleteOnExit();
+							File temp = File.createTempFile("mr-azkaban", ".token");
+							temp.deleteOnExit();
 
-          FileOutputStream fos = null;
-          DataOutputStream dos = null;
-          try {
-            fos = new FileOutputStream(temp);
-            dos = new DataOutputStream(fos);
-            job.getCredentials().writeTokenStorageToStream(dos);
-          } finally {
-            if(dos != null) {
-              dos.close();
-            }
-            if(fos != null) {
-              fos.close();
-            }
-          }
-          logger.info("Setting " + MAPREDUCE_JOB_CREDENTIALS_BINARY + " to " + temp.getAbsolutePath());
-          System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, temp.getAbsolutePath());
-        } else {
-          logger.info("Not pre-fetching token");
-        }
-      }
-    });
+							FileOutputStream fos = null;
+							DataOutputStream dos = null;
+							try {
+								fos = new FileOutputStream(temp);
+								dos = new DataOutputStream(fos);
+								job.getCredentials().writeTokenStorageToStream(
+										dos);
+							} finally {
+								if (dos != null) {
+									dos.close();
+								}
+								if (fos != null) {
+									fos.close();
+								}
+							}
+							logger.info("Setting " + MAPREDUCE_JOB_CREDENTIALS_BINARY + " to " + temp.getAbsolutePath());
+							System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, temp.getAbsolutePath());
+						} else {
+							logger.info("Not pre-fetching token");
+						}
+					}
+				});
 
-  }
+	}
 }
-
diff --git a/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
index b93196c..014c231 100644
--- a/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
+++ b/src/java/azkaban/jobExecutor/utils/JobWrappingFactory.java
@@ -28,7 +28,6 @@ import azkaban.jobExecutor.ScriptJob;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 import azkaban.jobExecutor.utils.JobExecutionException;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 
 import java.util.Map;
@@ -42,8 +41,7 @@ public class JobWrappingFactory
     //private String _defaultType;
     private Map<String, Class<? extends Job>> _jobToClass;
 
-    private JobWrappingFactory(final Map<String, Class<? extends Job>> jobTypeToClassMap
-    )
+    protected JobWrappingFactory(final Map<String, Class<? extends Job>> jobTypeToClassMap)
     {
         //this._defaultType = defaultType;
         this._jobToClass = jobTypeToClassMap;
@@ -71,7 +69,7 @@ public class JobWrappingFactory
     	_jobToClass = newJobExecutors;
     }
     
-    public Job buildJobExecutor(Props props, Logger logger)
+    public Job buildJobExecutor(String jobId, Props props, Logger logger)
     {
       
       Job job;
@@ -93,7 +91,7 @@ public class JobWrappingFactory
                     ));
         }
         
-        job = (Job)Utils.callConstructor(executorClass, props, logger);
+        job = (Job)Utils.callConstructor(executorClass, jobId, props, logger);
 
       }
       catch (Exception e) {
diff --git a/unit/executions/exectest1/exec1.flow b/unit/executions/exectest1/exec1.flow
new file mode 100644
index 0000000..4684cd6
--- /dev/null
+++ b/unit/executions/exectest1/exec1.flow
@@ -0,0 +1,154 @@
+{
+  "id" : "derived-member-data",
+  "success.email" : [],
+  "edges" : [ {
+    "source" : "job1",
+    "target" : "job2"
+  }, {
+    "source" : "job2",
+    "target" : "job3"
+  },{
+    "source" : "job2",
+    "target" : "job4"
+  }, {
+    "source" : "job3",
+    "target" : "job5"
+  },{
+    "source" : "job4",
+    "target" : "job5"
+  },{
+    "source" : "job5",
+    "target" : "job7"
+  },{
+    "source" : "job1",
+    "target" : "job6"
+  },{
+    "source" : "job6",
+    "target" : "job7"
+  },{
+    "source" : "job7",
+    "target" : "job8"
+  },{
+    "source" : "job7",
+    "target" : "job9"
+  },
+  {
+    "source" : "job8",
+    "target" : "job10"
+  },
+  {
+    "source" : "job9",
+    "target" : "job10"
+  }
+   ],
+  "failure.email" : [],
+  "nodes" : [ {
+    "propSource" : "prop2.properties",
+    "id" : "job1",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job1.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job2",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job2.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job3",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job3.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job4",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job4.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job5",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job5.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job6",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job6.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job7",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job7.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job8",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job8.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job9",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job9.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job10",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job10.job",
+    "expectedRuntime" : 1
+  }
+   ],
+  "layedout" : false,
+  "type" : "flow",
+  "props" : [ {
+    "inherits" : "prop1.properties",
+    "source" : "prop2.properties"
+  },{
+    "source" : "prop1.properties"
+  }]
+}
\ No newline at end of file
diff --git a/unit/executions/exectest1/job1.job b/unit/executions/exectest1/job1.job
new file mode 100644
index 0000000..0a60dc4
--- /dev/null
+++ b/unit/executions/exectest1/job1.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=1
+fail=false
diff --git a/unit/executions/exectest1/job10.job b/unit/executions/exectest1/job10.job
new file mode 100644
index 0000000..218f774
--- /dev/null
+++ b/unit/executions/exectest1/job10.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job8,job9
+seconds=5
+fail=false
diff --git a/unit/executions/exectest1/job2.job b/unit/executions/exectest1/job2.job
new file mode 100644
index 0000000..3c918c8
--- /dev/null
+++ b/unit/executions/exectest1/job2.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job1
+seconds=2
+fail=false
diff --git a/unit/executions/exectest1/job3.job b/unit/executions/exectest1/job3.job
new file mode 100644
index 0000000..b26a76c
--- /dev/null
+++ b/unit/executions/exectest1/job3.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job2
+seconds=3
+fail=false
diff --git a/unit/executions/exectest1/job4.job b/unit/executions/exectest1/job4.job
new file mode 100644
index 0000000..1cbac6f
--- /dev/null
+++ b/unit/executions/exectest1/job4.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job2
+seconds=4
+fail=false
diff --git a/unit/executions/exectest1/job5.job b/unit/executions/exectest1/job5.job
new file mode 100644
index 0000000..8dd934d
--- /dev/null
+++ b/unit/executions/exectest1/job5.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job3,job4
+seconds=5
+fail=false
diff --git a/unit/executions/exectest1/job6.job b/unit/executions/exectest1/job6.job
new file mode 100644
index 0000000..fc4474d
--- /dev/null
+++ b/unit/executions/exectest1/job6.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job1
+seconds=1
+fail=false
diff --git a/unit/executions/exectest1/job7.job b/unit/executions/exectest1/job7.job
new file mode 100644
index 0000000..d01cf79
--- /dev/null
+++ b/unit/executions/exectest1/job7.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job5,job6
+seconds=2
+fail=false
diff --git a/unit/executions/exectest1/job8.job b/unit/executions/exectest1/job8.job
new file mode 100644
index 0000000..643598c
--- /dev/null
+++ b/unit/executions/exectest1/job8.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job7
+seconds=3
+fail=false
diff --git a/unit/executions/exectest1/job9.job b/unit/executions/exectest1/job9.job
new file mode 100644
index 0000000..5d6dda9
--- /dev/null
+++ b/unit/executions/exectest1/job9.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job7
+seconds=4
+fail=false
diff --git a/unit/executions/exectest1/prop1.properties b/unit/executions/exectest1/prop1.properties
new file mode 100644
index 0000000..fb37c8b
--- /dev/null
+++ b/unit/executions/exectest1/prop1.properties
@@ -0,0 +1,2 @@
+a=0
+c=0
\ No newline at end of file
diff --git a/unit/executions/exectest1/prop2.properties b/unit/executions/exectest1/prop2.properties
new file mode 100644
index 0000000..86fbde0
--- /dev/null
+++ b/unit/executions/exectest1/prop2.properties
@@ -0,0 +1,2 @@
+a=1
+b=2
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/EventCollectorListener.java b/unit/java/azkaban/test/executor/EventCollectorListener.java
new file mode 100644
index 0000000..6e00681
--- /dev/null
+++ b/unit/java/azkaban/test/executor/EventCollectorListener.java
@@ -0,0 +1,53 @@
+package azkaban.test.executor;
+
+import java.util.ArrayList;
+
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.event.EventListener;
+
+public class EventCollectorListener implements EventListener {
+	private ArrayList<Event> eventList = new ArrayList<Event>();
+	
+	@Override
+	public void handleEvent(Event event) {
+		eventList.add(event);
+	}
+
+	public ArrayList<Event> getEventList() {
+		return eventList;
+	}
+	
+	public boolean checkOrdering() {
+		long time = 0;
+		for (Event event: eventList) {
+			if (time > event.getTime()) {
+				return false;
+			}
+			time = event.getTime();
+		}
+		
+		return true;
+	}
+
+	public void checkEventExists(Type[] types) {
+		int index = 0;
+		for (Event event: eventList) {
+			if (event.getRunner() == null) {
+				continue;
+			}
+			
+			if (index >= types.length) {
+				throw new RuntimeException("More events than expected. Got " + event.getType());
+			}
+			Type type = types[index++];
+
+			if (type != event.getType()) {
+				throw new RuntimeException("Got " + event.getType() + ", expected " + type + " index:" + index);
+			}
+		}
+		
+		if (types.length != index) {
+			throw new RuntimeException("Not enough events.");
+		}
+	}
+}
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
new file mode 100644
index 0000000..f6d6786
--- /dev/null
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -0,0 +1,69 @@
+package azkaban.test.executor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.FlowRunner;
+import azkaban.flow.Flow;
+import azkaban.utils.JSONUtils;
+
+public class FlowRunnerTest {
+	private File workingDir;
+	private Logger logger = Logger.getLogger(FlowRunnerTest.class);
+	
+	public FlowRunnerTest() {
+		
+	}
+	
+	@Before
+	public void setUp() throws Exception {
+		System.out.println("Create temp dir");
+		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+	}
+	
+	@After
+	public void tearDown() throws IOException {
+		System.out.println("Teardown temp dir");
+		if (workingDir != null) {
+			FileUtils.deleteDirectory(workingDir);
+			workingDir = null;
+		}
+	}
+	
+	@Test
+	public void exec1() throws Exception {
+		File testDir = new File("unit/executions/exectest1");
+		ExecutableFlow exFlow = prepareExecDir(testDir, "exec1");
+		
+		FlowRunner runner = new FlowRunner(exFlow);
+		
+		
+	}
+	
+	private ExecutableFlow prepareExecDir(File execDir, String execName) throws IOException {
+		FileUtils.copyDirectory(execDir, workingDir);
+		
+		File jsonFlowFile = new File(workingDir, execName + ".flow");
+		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+		
+		Flow flow = Flow.flowFromObject(flowObj);
+		ExecutableFlow execFlow = new ExecutableFlow(execName, flow);
+		execFlow.setExecutionPath(workingDir.getPath());
+		return execFlow;
+	}
+
+}
diff --git a/unit/java/azkaban/test/executor/JobRunnerTest.java b/unit/java/azkaban/test/executor/JobRunnerTest.java
new file mode 100644
index 0000000..7804d21
--- /dev/null
+++ b/unit/java/azkaban/test/executor/JobRunnerTest.java
@@ -0,0 +1,249 @@
+package azkaban.test.executor;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
+import azkaban.executor.JobRunner;
+import azkaban.jobExecutor.JavaJob;
+import azkaban.jobExecutor.ProcessJob;
+import azkaban.utils.Props;
+
+public class JobRunnerTest {
+	private File workingDir;
+	
+	public JobRunnerTest() {
+
+	}
+
+	@Before
+	public void setUp() throws Exception {
+		System.out.println("Create temp dir");
+		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+		if (workingDir.exists()) {
+			FileUtils.deleteDirectory(workingDir);
+		}
+		workingDir.mkdirs();
+	}
+
+	@After
+	public void tearDown() throws IOException {
+		System.out.println("Teardown temp dir");
+		if (workingDir != null) {
+			FileUtils.deleteDirectory(workingDir);
+			workingDir = null;
+		}
+	}
+
+	@Test
+	public void testBasicRun() {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		Props props = createProps(1, false);
+		ExecutableNode node = new ExecutableNode();
+		node.setId("myjobid");
+		
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_STARTED));
+		JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+		runner.addListener(eventCollector);
+		Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED && runner.getStatus() != Status.FAILED);
+		
+		runner.run();
+		eventCollector.handleEvent(Event.create(null, Event.Type.JOB_SUCCEEDED));
+		
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue(node.getStatus() == Status.SUCCEEDED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		Assert.assertTrue( node.getEndTime() - node.getStartTime() > 1000);
+		
+		File logFile = new File(runner.getLogFilePath());
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps != null);
+		Assert.assertTrue(outputProps.getKeySet().isEmpty());
+		Assert.assertTrue(logFile.exists());
+		
+		Assert.assertTrue(eventCollector.checkOrdering());
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_SUCCEEDED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailedRun() {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		Props props = createProps(1, true);
+		ExecutableNode node = new ExecutableNode();
+		node.setId("myjobid");
+		
+		JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+		runner.addListener(eventCollector);
+		Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED && runner.getStatus() != Status.FAILED);
+		eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+		runner.run();
+		eventCollector.handleEvent(Event.create(null, Type.JOB_FAILED));
+		
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue(node.getStatus() == Status.FAILED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
+		
+		File logFile = new File(runner.getLogFilePath());
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps == null);
+		Assert.assertTrue(logFile.exists());
+		Assert.assertTrue(eventCollector.checkOrdering());
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FAILED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDisabledRun() {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		Props props = createProps(1, true);
+		ExecutableNode node = new ExecutableNode();
+		node.setId("myjobid");
+		
+		node.setStatus(Status.DISABLED);
+		JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+		runner.addListener(eventCollector);
+		
+		// Should be disabled.
+		Assert.assertTrue(runner.getStatus() == Status.DISABLED);
+		eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+		runner.run();
+		eventCollector.handleEvent(Event.create(null, Type.JOB_SUCCEEDED));
+		
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue(node.getStatus() == Status.SKIPPED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		// Give it 10 ms to fail.
+		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
+		
+		// Log file and output files should not exist.
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps == null);
+		Assert.assertTrue(runner.getLogFilePath() == null);
+		Assert.assertTrue(eventCollector.checkOrdering());
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_SUCCEEDED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPreKilledRun() {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		Props props = createProps(1, true);
+		ExecutableNode node = new ExecutableNode();
+		node.setId("myjobid");
+		
+		node.setStatus(Status.KILLED);
+		JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+		runner.addListener(eventCollector);
+		
+		// Should be killed.
+		Assert.assertTrue(runner.getStatus() == Status.KILLED);
+		eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+		runner.run();
+		eventCollector.handleEvent(Event.create(null, Type.JOB_KILLED));
+		
+		// Should just skip the run and leave the status unchanged
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue(node.getStatus() == Status.KILLED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		// Give it 10 ms to fail.
+		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
+		
+		// Log file and output files should not exist.
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps == null);
+		Assert.assertTrue(runner.getLogFilePath() == null);
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_KILLED});
+		}
+		catch (Exception e) {
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCancelRun() {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		Props props = createProps(5, true);
+		ExecutableNode node = new ExecutableNode();
+		node.setId("myjobid");
+		
+		JobRunner runner = new JobRunner("myexecutionid", node, props, workingDir);
+		runner.addListener(eventCollector);
+		Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED && runner.getStatus() != Status.FAILED);
+		
+		eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
+		Thread thread = new Thread(runner);
+		thread.start();
+		
+		eventCollector.handleEvent(Event.create(null, Type.JOB_KILLED));
+		synchronized(this) {
+			try {
+				wait(1000);
+			} catch (InterruptedException e) {
+				// Interrupted while waiting before cancelling; not fatal for this test.
+				e.printStackTrace();
+			}
+			runner.cancel();
+		}
+		
+		Assert.assertTrue(runner.getStatus() == node.getStatus());
+		Assert.assertTrue(node.getStatus() == Status.KILLED);
+		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
+		// Give it 10 ms to fail.
+		Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
+		
+		// Log file and output files should not exist.
+		File logFile = new File(runner.getLogFilePath());
+		Props outputProps = runner.getOutputProps();
+		Assert.assertTrue(outputProps == null);
+		Assert.assertTrue(logFile.exists());
+		Assert.assertTrue(eventCollector.checkOrdering());
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FAILED});
+		}
+		catch (Exception e) {
+			System.out.println(e.getMessage());
+			
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	private Props createProps(int sleepSec, boolean fail) {
+		Props props = new Props();
+		props.put("type", "java");
+		props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
+		props.put("seconds", sleepSec);
+		props.put(ProcessJob.WORKING_DIR, workingDir.getPath());
+		props.put("fail", String.valueOf(fail));
+
+		return props;
+	}
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
new file mode 100644
index 0000000..1ff0ec8
--- /dev/null
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -0,0 +1,36 @@
+package azkaban.test.executor;
+
+import java.util.Map;
+
+import azkaban.utils.Props;
+
+public class SleepJavaJob {
+	private Props props;
+	@SuppressWarnings("unchecked")
+	public SleepJavaJob(String id, Map<String, String> parameters) {
+		props = new Props(null, parameters);
+
+		System.out.println("Properly created");
+	}
+	
+	public void run() throws Exception {
+		int sec = props.getInt("seconds");
+		boolean fail = props.getBoolean("fail", false);
+		synchronized(this) {
+			try {
+				this.wait(sec*1000);
+			} catch (InterruptedException e) {
+				System.out.println("Interrupted");
+			}
+		}
+		
+		if (fail) {
+			throw new Exception("I failed because I had to.");
+		}
+	}
+	
+	public void cancel() throws Exception {
+		System.out.println("Cancel called");
+		synchronized (this) {
+			this.notifyAll();
+		}
+	}
+}
diff --git a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
index d9edc3d..ffe68d7 100644
--- a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
@@ -95,7 +95,7 @@ public class JavaJobTest
 //    
 //    EasyMock.replay(descriptor);
     
-    job = new JavaJob(props, log);
+    job = new JavaJob("jestJava", props, log);
     
 //    EasyMock.verify(descriptor);
   }
@@ -112,6 +112,18 @@ public class JavaJobTest
   }
   
   @Test
+  public void testJavaJobHashmap() {
+    /* initialize the Props */
+    props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
+    props.put("seconds", 1);
+    props.put(ProcessJob.WORKING_DIR, ".");
+    props.put("input", inputFile);
+    props.put("output", outputFile);
+    props.put("classpath",  classPaths);
+    job.run();
+  }
+  
+  @Test
   public void testFailedJavaJob() {
     props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
     props.put(ProcessJob.WORKING_DIR, ".");
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
index 861601c..1ec8c5b 100644
--- a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -34,7 +34,7 @@ public class ProcessJobTest
 //    
 //    EasyMock.replay(props);
     
-    job = new ProcessJob(props, log);
+    job = new ProcessJob("TestProcess", props, log);
     
 
   }
diff --git a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
index f86ed90..0881eff 100644
--- a/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/PythonJobTest.java
@@ -90,7 +90,7 @@ public class PythonJobTest
 //    EasyMock.expect(descriptor.getProps()).andReturn(props).times(3);
 //    EasyMock.expect(descriptor.getFullPath()).andReturn(".").times(1);
 //    EasyMock.replay(descriptor);
-    job = new PythonJob(props, log);
+    job = new PythonJob("TestProcess", props, log);
 //    EasyMock.verify(descriptor);
     try
     {