azkaban-uncached

Details

diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
index 21b1ec7..555fd51 100644
--- a/src/java/azkaban/executor/event/Event.java
+++ b/src/java/azkaban/executor/event/Event.java
@@ -8,7 +8,8 @@ public class Event {
 		JOB_STARTED,
 		JOB_SUCCEEDED,
 		JOB_FAILED,
-		JOB_KILLED
+		JOB_KILLED,
+		JOB_SKIPPED
 	}
 	
 	private final Object runner;
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 7d0efba..5591f87 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -42,7 +42,6 @@ public class ExecutableFlow {
 	
 	private Integer pipelineLevel = null;
 	private Map<String, String> flowParameters = new HashMap<String, String>();
-	private Props globalProps;
 	
 	public enum FailureAction {
 		FINISH_CURRENTLY_RUNNING,
@@ -64,14 +63,6 @@ public class ExecutableFlow {
 		this.setFlow(flow);
 	}
 	
-	public void setGlobalProps(Props props) {
-		globalProps = props;
-	}
-	
-	public Props getGlobalProps() {
-		return globalProps;
-	}
-	
 	public ExecutableFlow() {
 	}
 	
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index d479d83..a47f4a2 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -5,10 +5,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -25,19 +23,18 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
 import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
 import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventHandler;
 import azkaban.executor.event.EventListener;
 import azkaban.flow.FlowProps;
-import azkaban.jobExecutor.utils.JobWrappingFactory;
 import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.Props;
 
 public class FlowRunner extends EventHandler implements Runnable {
-	private static final Layout DEFAULT_LAYOUT = new PatternLayout(
-			"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 
 	public static final int NUM_CONCURRENT_THREADS = 10;
 
@@ -63,12 +60,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Thread currentThread;
 	
 	private List<String> jobsFinished;
-
-	public enum FailedFlowOptions {
-		FINISH_RUNNING_JOBS, KILL_ALL
-	}
-
-	private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
+	// Underlying global properties shared by every job in this flow.
+	private Props globalProps = null;	
+	// Used to override all regular properties.
+	private Props flowOverrideProps = null;
+	
+	private FailureAction failedAction;
 	
 	public FlowRunner(ExecutableFlow flow) {
 		this.flow = flow;
@@ -78,6 +75,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.listener = new JobRunnerEventListener(this);
 		this.jobsFinished = new ArrayList<String>();
 
+		if (flow.getFlowParameters() != null && !flow.getFlowParameters().isEmpty()) {
+			flowOverrideProps = new Props(null, flow.getFlowParameters()); 
+		}
+		failedAction = flow.getFailureAction();
 		createLogger();
 	}
 
@@ -120,34 +121,49 @@ public class FlowRunner extends EventHandler implements Runnable {
 		executorService.shutdownNow();
 
 		if (pausedJobsToRun.size() > 0) {
-			logger.info("Cancelling... Clearing paused jobs queue of size "
-					+ pausedJobsToRun.size());
+			logger.info("Cancelling... Clearing paused jobs queue of size " + pausedJobsToRun.size());
 			pausedJobsToRun.clear();
 		}
 
-		// Loop through job runners
 		for (JobRunner runner : runningJobs.values()) {
-			if (runner.getStatus() == Status.WAITING
-					|| runner.getStatus() == Status.RUNNING
-					|| runner.getStatus() == Status.PAUSED) {
+			if (runner.getStatus() == Status.WAITING || 
+					runner.getStatus() == Status.RUNNING || 
+					runner.getStatus() == Status.PAUSED ) {
+				
 				logger.info("Cancelling... Killing job "
 						+ runner.getNode().getId() + " with status "
 						+ runner.getStatus());
 				runner.cancel();
 			}
 		}
-
+		
+		runningJobs.clear();
+		long endTime = System.currentTimeMillis();
+		for (ExecutableNode fnode : flow.getExecutableNodes()) {
+			switch (fnode.getStatus()) {
+			case UNKNOWN:
+			case READY:
+				fnode.setStatus(Status.KILLED);
+				fnode.setStartTime(endTime);
+				fnode.setEndTime(endTime);
+			default:
+			}
+		}
+		
 		logger.info("Flow cancelled.");
-		if (flow.getStatus() != Status.FAILED) {
+		if (flow.getStatus() == Status.FAILED_FINISHING) {
+			flow.setStatus(Status.FAILED);
+		}
+		else if (flow.getStatus() != Status.FAILED) {
 			flow.setStatus(Status.KILLED);
 		}
-		flow.setEndTime(System.currentTimeMillis());
-		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+		long time = System.currentTimeMillis();
+		flow.setStartTime(time);
+		flow.setEndTime(time);
 	}
 
 	public synchronized void pause(String user) {
-		if (flow.getStatus() == Status.RUNNING
-				|| flow.getStatus() == Status.WAITING) {
+		if (flow.getStatus() == Status.RUNNING || flow.getStatus() == Status.WAITING) {
 			logger.info("Flow paused by " + user);
 			paused = true;
 			flow.setStatus(Status.PAUSED);
@@ -215,8 +231,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				runningJobs.put(startNode, jobRunner);
 			}
 		} catch (IOException e) {
-			logger.error("Starting job queueing failed due to "
-					+ e.getMessage());
+			logger.error("Starting job queueing failed due to " + e.getMessage());
 			flow.setStatus(Status.FAILED);
 			jobsToRun.clear();
 			runningJobs.clear();
@@ -242,7 +257,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (runner != null) {
 				try {
 					ExecutableNode node = runner.getNode();
-					node.setStatus(Status.WAITING);
 					executorService.submit(runner);
 					logger.info("Job Started " + node.getId());
 				} catch (RejectedExecutionException e) {
@@ -268,23 +282,18 @@ public class FlowRunner extends EventHandler implements Runnable {
 			try {
 				executorService.awaitTermination(1, TimeUnit.SECONDS);
 			} catch (InterruptedException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
 			}
 		}
 
 		flow.setEndTime(System.currentTimeMillis());
 		if (flow.getStatus() == Status.RUNNING) {
-			logger.info("Flow finished successfully in "
-					+ (flow.getEndTime() - flow.getStartTime()) + " ms.");
+			logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.SUCCEEDED);
 		} else if (flow.getStatus() == Status.KILLED) {
-			logger.info("Flow was killed in "
-					+ (flow.getEndTime() - flow.getStartTime()) + " ms.");
+			logger.info("Flow was killed in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.KILLED);
 		} else {
-			logger.info("Flow finished with failures in "
-					+ (flow.getEndTime() - flow.getStartTime()) + " ms.");
+			logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.FAILED);
 		}
 
@@ -293,14 +302,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 		closeLogger();
 	}
 
-	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput)
-			throws IOException {
+	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
 		String source = node.getJobPropsSource();
 		String propsSource = node.getPropsSource();
 
-		Props parentProps = propsSource == null ? flow.getGlobalProps() : sharedProps
-				.get(propsSource);
+		// If this node has no shared props source, fall back to the global properties.
+		Props parentProps = propsSource == null ? globalProps : sharedProps.get(propsSource);
 
+		// Set up overrides
+		if (flowOverrideProps != null) {
+			Props clonedOverride = Props.clone(flowOverrideProps);
+			clonedOverride.setParent(parentProps);
+			parentProps = clonedOverride;
+		}
+		
 		// We add the previous job output and put into this props.
 		if (previousOutput != null) {
 			Props earliestParent = previousOutput.getEarliestAncestor();
@@ -308,7 +323,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 			parentProps = earliestParent;
 		}
-
+		
+		// Load job file.
 		File propsFile = new File(basePath, source);
 		Props jobProps = new Props(parentProps, propsFile);
 
@@ -342,11 +358,19 @@ public class FlowRunner extends EventHandler implements Runnable {
 			else {
 				String source = fprops.getSource();
 				Props props = sharedProps.get(source);
-				props.setParent(flow.getGlobalProps());
+				props.setParent(globalProps);
 			}
 		}
 	}
 
+	public void setGlobalProps(Props props) {
+		globalProps = props;
+	}
+	
+	public Props getGlobalProps() {
+		return globalProps;
+	}
+	
 	private void interrupt() {
 		currentThread.interrupt();
 	}
@@ -356,21 +380,31 @@ public class FlowRunner extends EventHandler implements Runnable {
 			return;
 		}
 
+		// Check each dependent node's readiness, trickling kills down from failed or killed dependencies.
 		for (String dependent : node.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-
+			
 			// Check all dependencies
 			boolean ready = true;
 			for (String dependency : dependentNode.getInNodes()) {
-				ExecutableNode dependencyNode = flow
-						.getExecutableNode(dependency);
-				if (dependencyNode.getStatus() != Status.SUCCEEDED
-						&& dependencyNode.getStatus() != Status.DISABLED) {
+				ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
+				Status depStatus = dependencyNode.getStatus();
+				if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
+					// We trickle failures down the graph.
+					dependencyNode.setStatus(Status.KILLED);
+				}
+				else if (depStatus == Status.SUCCEEDED || depStatus == Status.SKIPPED) {
+					// Dependency finished successfully or was skipped; it does not block this node.
+				}
+				else {
+					// The dependency is still running or waiting, so this node is not ready yet.
 					ready = false;
 					break;
 				}
+
 			}
 
+			// All dependencies have been met; this dependent node is ready to run.
 			if (ready) {
 				Props previousOutput = null;
 				// Iterate the in nodes again and create the dependencies
@@ -386,8 +420,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 				JobRunner runner = null;
 				try {
-					runner = this
-							.createJobRunner(dependentNode, previousOutput);
+					runner = this.createJobRunner(dependentNode, previousOutput);
 				} catch (IOException e) {
 					logger.error("JobRunner creation failed due to "
 							+ e.getMessage());
@@ -414,24 +447,46 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 	private void handleFailedJob(ExecutableNode node) {
 		System.err.println("Job " + node.getId() + " failed.");
-		this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
-
-		switch (failedOptions) {
+		if (flow.getStatus() != Status.FAILED_FINISHING && flow.getStatus() != Status.FAILED) {
+			this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
+		}
+		
+		switch (failedAction) {
 		// We finish running current jobs and then fail. Do not accept new jobs.
-		case FINISH_RUNNING_JOBS:
+		case FINISH_CURRENTLY_RUNNING:
+			logger.info("Failure Action: Finish up remaining running jobs.");
 			flow.setStatus(Status.FAILED_FINISHING);
 			runningJobs.clear();
 			executorService.shutdown();
+			
+			// Go through and mark everything else killed.
+			long endTime = System.currentTimeMillis();
+			for (ExecutableNode fnode : flow.getExecutableNodes()) {
+				switch (fnode.getStatus()) {
+				case UNKNOWN:
+				case READY:
+					fnode.setStatus(Status.KILLED);
+					fnode.setStartTime(endTime);
+					fnode.setEndTime(endTime);
+				default:
+				}
+			}
+			
 			break;
 		// We kill all running jobs and fail immediately
-		case KILL_ALL:
+		case CANCEL_ALL:
+			logger.info("Failure Action: Kill flow immediately.");
+			flow.setStatus(Status.FAILED);
 			this.cancel("azkaban");
 			break;
+		default:
+			logger.info("Failure Action: Finishing accessible jobs.");
+			flow.setStatus(Status.FAILED_FINISHING);
 		}
 
 		runningJobs.remove(node.getId());
 	}
-
+	
 	private class JobRunnerEventListener implements EventListener {
 		private FlowRunner flowRunner;
 
@@ -447,26 +502,37 @@ public class FlowRunner extends EventHandler implements Runnable {
 			System.out.println("Event " + jobID + " "
 					+ event.getType().toString());
 
-			
-			
 			// On Job success, we add the output props and then set up the next
 			// run.
-			if (event.getType() == Type.JOB_SUCCEEDED) {
-				logger.info("Job Succeeded " + jobID + " in "
-						+ (node.getEndTime() - node.getStartTime()) + " ms");
-
+			boolean handleFailure = false;
+			switch(event.getType()) {
+				case JOB_SUCCEEDED:
+					logger.info("Job Succeeded " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+					break;
+				case JOB_FAILED:
+					logger.info("Job Failed " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+					handleFailure = true;
+					break;
+				case JOB_KILLED:
+					logger.info("Job Killed " + jobID + " at " + node.getEndTime() + " ms");
+					break;
+				case JOB_SKIPPED:
+					logger.info("Job Disabled and skipped " + jobID + " at " + node.getEndTime() + " ms");
+					break;
+				default:
+					return;
+			}
+			
+			if (handleFailure) {
+				jobsFinished.add(jobID);
+				logger.info(jobID + " FAILED");
+				flowRunner.handleFailedJob(runner.getNode());
+			}
+			else {
 				jobsFinished.add(jobID);
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
 				flowRunner.handleSucceededJob(runner.getNode());
-			} else if (event.getType() == Type.JOB_FAILED) {
-
-				logger.info("Job Failed " + jobID + " in "
-						+ (node.getEndTime() - node.getStartTime()) + " ms");
-
-				jobsFinished.add(jobID);
-				logger.info(jobID + " FAILED");
-				flowRunner.handleFailedJob(runner.getNode());
 			}
 
 			flowRunner.commitFlow();
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 2e41f11..3107491 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -69,10 +69,10 @@ public class FlowRunnerManager {
 		File dir = new File(path);
 		ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
 		flow.setExecutionPath(path);
-		flow.setGlobalProps(globalProps);
 		
 		FlowRunner runner = new FlowRunner(flow);
 		runningFlows.put(id, runner);
+		runner.setGlobalProps(globalProps);
 		runner.addListener(eventListener);
 		executorService.submit(runner);
 	}
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 71e4e87..1e454c1 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -140,6 +140,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	}
 
 	public synchronized void cancel() {
+		logError("Cancel has been called.");
 		// Cancel code here
 		if (job == null) {
 			logError("Job doesn't exist!");
diff --git a/src/java/azkaban/jobExecutor/LongArgJob.java b/src/java/azkaban/jobExecutor/LongArgJob.java
index 7306a4e..70a35d6 100644
--- a/src/java/azkaban/jobExecutor/LongArgJob.java
+++ b/src/java/azkaban/jobExecutor/LongArgJob.java
@@ -66,7 +66,7 @@ public abstract class LongArgJob extends AbstractProcessJob {
             info("Environment variables: " + builder.getEnv());
         info("Working directory: " + builder.getWorkingDir());
         
-       File [] propFiles = initPropsFiles( );
+       File [] propFiles = initPropsFiles();
        //System.err.println("outputfile=" + propFiles[1]);
         
         boolean success = false;
@@ -124,6 +124,4 @@ public abstract class LongArgJob extends AbstractProcessJob {
             if(!suppressed.contains(key))
                 builder.addArg("--" + key, props.get(key));
     }
-   
-
 }
diff --git a/src/java/azkaban/jobExecutor/ProcessJob.java b/src/java/azkaban/jobExecutor/ProcessJob.java
index 42a1925..8024fdd 100644
--- a/src/java/azkaban/jobExecutor/ProcessJob.java
+++ b/src/java/azkaban/jobExecutor/ProcessJob.java
@@ -22,11 +22,17 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import azkaban.jobExecutor.utils.process.AzkabanProcess;
+import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
 import azkaban.utils.Props;
 
 /*
@@ -35,105 +41,66 @@ import azkaban.utils.Props;
  * @author jkreps
  * 
  */
-public class ProcessJob extends AbstractProcessJob implements Job {
+public class ProcessJob extends AbstractProcessJob {
 
 	public static final String COMMAND = "command";
-	public static final int CLEAN_UP_TIME_MS = 1000;
-
-	private volatile Process _process;
-	private volatile boolean _isComplete;
-	private volatile boolean _isCancelled;
+    private static final long KILL_TIME_MS = 5000;
+    private volatile AzkabanProcess process;
 
 	public ProcessJob(final String jobId, final Props props, final Logger log) {
 		super(jobId, props, log);
 	}
 
 	@Override
-	public void run() {
-		synchronized (this) {
-			_isCancelled = false;
-		}
+	public void run() throws Exception {
 		resolveProps();
-
-		// Sets a list of all the commands that need to be run.
 		List<String> commands = getCommandList();
-		info(commands.size() + " commands to execute.");
 
-		File[] propFiles = initPropsFiles();
+		long startMs = System.currentTimeMillis();
 
-		// System.err.println("in process job outputFile=" +propFiles[1]);
+		info(commands.size() + " commands to execute.");
+		File[] propFiles = initPropsFiles();
+		Map<String, String> envVars = getEnvironmentVariables();
 
-		// For each of the jobs, set up a process and run them.
 		for (String command : commands) {
-			info("Executing command: " + command);
-			String[] cmdPieces = partitionCommandLine(command);
-
-			ProcessBuilder builder = new ProcessBuilder(cmdPieces);
+			AzkabanProcessBuilder builder = new AzkabanProcessBuilder(partitionCommandLine(command))
+					.setEnv(envVars)
+					.setWorkingDir(getCwd())
+					.setLogger(getLog());
+
+			info("Command: " + builder.getCommandString());
+			if (builder.getEnv().size() > 0) {
+				info("Environment variables: " + builder.getEnv());
+			}
+			info("Working directory: " + builder.getWorkingDir());
 
-			builder.directory(new File(getCwd()));
-			builder.environment().putAll(getEnvironmentVariables());
+			boolean success = false;
+			this.process = builder.build();
 
 			try {
-				_process = builder.start();
-			} catch (IOException e) {
-				for (File file : propFiles) {
-					if (file != null && file.exists()) {
+				this.process.run();
+				success = true;
+			} catch (Exception e) {
+				for (File file : propFiles)
+					if (file != null && file.exists())
 						file.delete();
-					}
-				}
 				throw new RuntimeException(e);
-			}
-			LoggingGobbler outputGobbler = new LoggingGobbler(
-					new InputStreamReader(_process.getInputStream()),
-					Level.INFO);
-			LoggingGobbler errorGobbler = new LoggingGobbler(
-					new InputStreamReader(_process.getErrorStream()),
-					Level.ERROR);
-
-			int processId = getProcessId();
-			if (processId == 0) {
-				info("Spawned thread. Unknowned processId");
-			} else {
-				info("Spawned thread with processId " + processId);
-			}
-			outputGobbler.start();
-			errorGobbler.start();
-			int exitCode = -999;
-			try {
-				exitCode = _process.waitFor();
-
-				_isComplete = true;
-				if (exitCode != 0) {
-					for (File file : propFiles) {
-						if (file != null && file.exists()) {
-							file.delete();
-						}
-					}
-					throw new RuntimeException(
-							"Processes ended with exit code " + exitCode + ".");
-				}
-
-				// try to wait for everything to get logged out before exiting
-				outputGobbler.join(1000);
-				errorGobbler.join(1000);
-			} catch (InterruptedException e) {
 			} finally {
-				outputGobbler.close();
-				errorGobbler.close();
+				this.process = null;
+				info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in "
+						+ ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
 			}
 		}
 
 		// Get the output properties from this job.
 		generateProperties(propFiles[1]);
 
-		for (File file : propFiles) {
-			if (file != null && file.exists()) {
+		for (File file : propFiles)
+			if (file != null && file.exists())
 				file.delete();
-			}
-		}
-
 	}
 
+
 	protected List<String> getCommandList() {
 		List<String> commands = new ArrayList<String>();
 		commands.add(_props.getString(COMMAND));
@@ -144,119 +111,24 @@ public class ProcessJob extends AbstractProcessJob implements Job {
 		return commands;
 	}
 
-	@Override
-	public void cancel() throws Exception {
-		if (_process != null) {
-			int processId = getProcessId();
-			if (processId != 0) {
-				warn("Attempting to kill the process " + processId);
-				try {
-					Runtime.getRuntime().exec("kill " + processId);
-					synchronized (this) {
-						wait(CLEAN_UP_TIME_MS);
-					}
-				} catch (InterruptedException e) {
-					// Do nothing. We don't really care.
-				}
-				if (!_isComplete) {
-					error("After "
-							+ CLEAN_UP_TIME_MS
-							+ " ms, the job hasn't terminated. Will force terminate the job.");
-				}
-			} else {
-				info("Could not get process id");
-			}
-
-			if (!_isComplete) {
-				warn("Force kill the process");
-				_process.destroy();
-			}
-			synchronized (this) {
-				_isCancelled = true;
-			}
-		}
-	}
-
+    @Override
+    public void cancel() throws InterruptedException {
+        if(process == null)
+            throw new IllegalStateException("Not started.");
+        boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+        if(!killed) {
+            warn("Kill with signal TERM failed. Killing with KILL signal.");
+            process.hardKill();
+        }
+    }
+
+    @Override
+    public double getProgress() {
+        return process != null && process.isComplete()? 1.0 : 0.0;
+    }
+    
 	public int getProcessId() {
-		int processId = 0;
-
-		try {
-			Field f = _process.getClass().getDeclaredField("pid");
-			f.setAccessible(true);
-
-			processId = f.getInt(_process);
-		} catch (Throwable e) {
-		}
-
-		return processId;
-	}
-
-	@Override
-	public double getProgress() {
-		return _isComplete ? 1.0 : 0.0;
-	}
-
-	private class LoggingGobbler extends Thread {
-
-		private final BufferedReader _inputReader;
-		private final Level _loggingLevel;
-
-		public LoggingGobbler(final InputStreamReader inputReader,
-				final Level level) {
-			_inputReader = new BufferedReader(inputReader);
-			_loggingLevel = level;
-		}
-
-		public void close() {
-			if (_inputReader != null) {
-				try {
-					_inputReader.close();
-				} catch (IOException e) {
-					error("Error cleaning up logging stream reader:", e);
-				}
-			}
-		}
-
-		@Override
-		public void run() {
-			try {
-				while (!Thread.currentThread().isInterrupted()) {
-					String line = _inputReader.readLine();
-					if (line == null) {
-						return;
-					}
-
-					logMessage(line);
-				}
-			} catch (IOException e) {
-				error("Error reading from logging stream:", e);
-			}
-		}
-
-		private void logMessage(final String message) {
-			if (message.startsWith(Level.DEBUG.toString())) {
-				String newMsg = message.substring(Level.DEBUG.toString().length());
-				getLog().debug(newMsg);
-			} else if (message.startsWith(Level.ERROR.toString())) {
-				String newMsg = message.substring(Level.ERROR.toString().length());
-				getLog().error(newMsg);
-			} else if (message.startsWith(Level.INFO.toString())) {
-				String newMsg = message.substring(Level.INFO.toString().length());
-				getLog().info(newMsg);
-			} else if (message.startsWith(Level.WARN.toString())) {
-				String newMsg = message.substring(Level.WARN.toString().length());
-				getLog().warn(newMsg);
-			} else if (message.startsWith(Level.FATAL.toString())) {
-				String newMsg = message.substring(Level.FATAL.toString().length());
-				getLog().fatal(newMsg);
-			} else if (message.startsWith(Level.TRACE.toString())) {
-				String newMsg = message.substring(Level.TRACE.toString().length());
-				getLog().trace(newMsg);
-			} else {
-				getLog().log(_loggingLevel, message);
-			}
-
-		}
+		return process.getProcessId();
 	}
 
 	@Override
@@ -331,10 +203,4 @@ public class ProcessJob extends AbstractProcessJob implements Job {
 
 		return commands.toArray(new String[commands.size()]);
 	}
-
-	@Override
-	public synchronized boolean isCanceled() {
-		return _isCancelled;
-	}
-
 }
diff --git a/src/web/js/azkaban.project.view.js b/src/web/js/azkaban.project.view.js
index e416a6a..6e0613f 100644
--- a/src/web/js/azkaban.project.view.js
+++ b/src/web/js/azkaban.project.view.js
@@ -30,8 +30,8 @@ azkaban.ProjectView= Backbone.View.extend({
         position: ["20%",],
         containerId: 'confirm-container',
         containerCss: {
-          'height': '220px',
-          'width': '565px'
+          'height': '240px',
+          'width': '640px'
         },
         onShow: function (dialog) {
           var modal = this;
diff --git a/unit/executions/exectest1/exec2.flow b/unit/executions/exectest1/exec2.flow
new file mode 100644
index 0000000..f31c99e
--- /dev/null
+++ b/unit/executions/exectest1/exec2.flow
@@ -0,0 +1,154 @@
+{
+  "id" : "derived-member-data",
+  "success.email" : [],
+  "edges" : [ {
+    "source" : "job1",
+    "target" : "job2d"
+  }, {
+    "source" : "job2d",
+    "target" : "job3"
+  },{
+    "source" : "job2d",
+    "target" : "job4"
+  }, {
+    "source" : "job3",
+    "target" : "job5"
+  },{
+    "source" : "job4",
+    "target" : "job5"
+  },{
+    "source" : "job5",
+    "target" : "job7"
+  },{
+    "source" : "job1",
+    "target" : "job6"
+  },{
+    "source" : "job6",
+    "target" : "job7"
+  },{
+    "source" : "job7",
+    "target" : "job8"
+  },{
+    "source" : "job7",
+    "target" : "job9"
+  },
+  {
+    "source" : "job8",
+    "target" : "job10"
+  },
+  {
+    "source" : "job9",
+    "target" : "job10"
+  }
+   ],
+  "failure.email" : [],
+  "nodes" : [ {
+    "propSource" : "prop2.properties",
+    "id" : "job1",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job1.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job2d",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job2d.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job3",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job3.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job4",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job4.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job5",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job5.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job6",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job6.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job7",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job7.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job8",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job8.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job9",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job9.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job10",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job10.job",
+    "expectedRuntime" : 1
+  }
+   ],
+  "layedout" : false,
+  "type" : "flow",
+  "props" : [ {
+    "inherits" : "prop1.properties",
+    "source" : "prop2.properties"
+  },{
+    "source" : "prop1.properties"
+  }]
+}
\ No newline at end of file
diff --git a/unit/executions/exectest1/job2d.job b/unit/executions/exectest1/job2d.job
new file mode 100644
index 0000000..b4216ba
--- /dev/null
+++ b/unit/executions/exectest1/job2d.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+dependencies=job1
+seconds=1
+fail=true
diff --git a/unit/executions/exectest1/job6.job b/unit/executions/exectest1/job6.job
index fc4474d..b5df29c 100644
--- a/unit/executions/exectest1/job6.job
+++ b/unit/executions/exectest1/job6.job
@@ -1,5 +1,5 @@
 type=java
 job.class=azkaban.test.executor.SleepJavaJob
 dependencies=job1
-seconds=1
+seconds=4
 fail=false
diff --git a/unit/java/azkaban/test/executor/EventCollectorListener.java b/unit/java/azkaban/test/executor/EventCollectorListener.java
index 6e00681..e648c2a 100644
--- a/unit/java/azkaban/test/executor/EventCollectorListener.java
+++ b/unit/java/azkaban/test/executor/EventCollectorListener.java
@@ -18,6 +18,13 @@ public class EventCollectorListener implements EventListener {
 		return eventList;
 	}
 	
+	public void writeAllEvents() {
+		for (Event event: eventList) {
+			System.out.print(event.getType());
+			System.out.print(",");
+		}
+	}
+	
 	public boolean checkOrdering() {
 		long time = 0;
 		for (Event event: eventList) {
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
index f6d6786..87a593c 100644
--- a/unit/java/azkaban/test/executor/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -3,6 +3,7 @@ package azkaban.test.executor;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 
 import junit.framework.Assert;
 
@@ -13,7 +14,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.FlowRunner;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.event.Event.Type;
 import azkaban.flow.Flow;
 import azkaban.utils.JSONUtils;
 
@@ -45,13 +50,121 @@ public class FlowRunnerTest {
 	}
 	
 	@Test
-	public void exec1() throws Exception {
+	public void exec1Normal() throws Exception {
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow exFlow = prepareExecDir(testDir, "exec1");
 		
+		EventCollectorListener eventCollector = new EventCollectorListener();
 		FlowRunner runner = new FlowRunner(exFlow);
+		runner.addListener(eventCollector);
 		
+		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+		runner.run();
+		Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
+		Assert.assertTrue(runner.getJobsFinished().size() == exFlow.getExecutableNodes().size());
+		compareFinishedRuntime(runner);
 		
+		testStatus(exFlow, "job1", Status.SUCCEEDED);
+		testStatus(exFlow, "job2", Status.SUCCEEDED);
+		testStatus(exFlow, "job3", Status.SUCCEEDED);
+		testStatus(exFlow, "job4", Status.SUCCEEDED);
+		testStatus(exFlow, "job5", Status.SUCCEEDED);
+		testStatus(exFlow, "job6", Status.SUCCEEDED);
+		testStatus(exFlow, "job7", Status.SUCCEEDED);
+		testStatus(exFlow, "job8", Status.SUCCEEDED);
+		testStatus(exFlow, "job9", Status.SUCCEEDED);
+		testStatus(exFlow, "job10", Status.SUCCEEDED);
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
+		}
+		catch (Exception e) {
+			System.out.println(e.getMessage());
+			
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void exec1Failed() throws Exception {
+		File testDir = new File("unit/executions/exectest1");
+		ExecutableFlow exFlow = prepareExecDir(testDir, "exec2");
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = new FlowRunner(exFlow);
+		runner.addListener(eventCollector);
+		
+		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+		runner.run();
+		
+		Assert.assertTrue(exFlow.getStatus() == Status.FAILED);
+		
+		testStatus(exFlow, "job1", Status.SUCCEEDED);
+		testStatus(exFlow, "job2d", Status.FAILED);
+		testStatus(exFlow, "job3", Status.KILLED);
+		testStatus(exFlow, "job4", Status.KILLED);
+		testStatus(exFlow, "job5", Status.KILLED);
+		testStatus(exFlow, "job6", Status.SUCCEEDED);
+		testStatus(exFlow, "job7", Status.KILLED);
+		testStatus(exFlow, "job8", Status.KILLED);
+		testStatus(exFlow, "job9", Status.KILLED);
+		testStatus(exFlow, "job10", Status.KILLED);
+
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FAILED_FINISHING, Type.FLOW_FINISHED});
+		}
+		catch (Exception e) {
+			System.out.println(e.getMessage());
+			
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void exec1FailedKillAll() throws Exception {
+		File testDir = new File("unit/executions/exectest1");
+		ExecutableFlow exFlow = prepareExecDir(testDir, "exec2");
+		exFlow.setFailureAction(FailureAction.CANCEL_ALL);
+
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = new FlowRunner(exFlow);
+		runner.addListener(eventCollector);
+		
+		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+		runner.run();
+		
+		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+		
+		testStatus(exFlow, "job1", Status.SUCCEEDED);
+		testStatus(exFlow, "job2d", Status.FAILED);
+		testStatus(exFlow, "job3", Status.KILLED);
+		testStatus(exFlow, "job4", Status.KILLED);
+		testStatus(exFlow, "job5", Status.KILLED);
+		testStatus(exFlow, "job6", Status.FAILED);
+		testStatus(exFlow, "job7", Status.KILLED);
+		testStatus(exFlow, "job8", Status.KILLED);
+		testStatus(exFlow, "job9", Status.KILLED);
+		testStatus(exFlow, "job10", Status.KILLED);
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FAILED_FINISHING, Type.FLOW_FINISHED});
+		}
+		catch (Exception e) {
+			System.out.println(e.getMessage());
+			eventCollector.writeAllEvents();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	private void testStatus(ExecutableFlow flow, String name, Status status) {
+		ExecutableNode node = flow.getExecutableNode(name);
+		
+		if (node.getStatus() != status) {
+			Assert.fail("Status of job " + node.getId() + " is " + node.getStatus() + " not " + status + " as expected.");
+		}
 	}
 	
 	private ExecutableFlow prepareExecDir(File execDir, String execName) throws IOException {
@@ -66,4 +179,31 @@ public class FlowRunnerTest {
 		return execFlow;
 	}
 
+	private void compareFinishedRuntime(FlowRunner runner) throws Exception {
+		ExecutableFlow flow = runner.getFlow();
+		for (String flowName: flow.getStartNodes()) {
+			ExecutableNode node = flow.getExecutableNode(flowName);
+			compareStartFinishTimes(flow, node, 0);
+		}
+	}
+	
+	private void compareStartFinishTimes(ExecutableFlow flow, ExecutableNode node, long previousEndTime) throws Exception {
+		long startTime = node.getStartTime();
+		long endTime = node.getEndTime();
+		
+		// If the start time is <= 0 (node never ran), the end time should be too.
+		if (startTime <= 0) {
+			Assert.assertTrue(endTime <=0);
+			return;
+		}
+		
+		System.out.println("Node " + node.getId() + " start:" + startTime + " end:" + endTime + " previous:" + previousEndTime);
+		Assert.assertTrue("Checking start and end times", startTime > 0 && endTime >= startTime);
+		Assert.assertTrue("Start time for " + node.getId() + " is " + startTime +" and less than " + previousEndTime, startTime >= previousEndTime);
+		
+		for (String outNode : node.getOutNodes()) {
+			ExecutableNode childNode = flow.getExecutableNode(outNode);
+			compareStartFinishTimes(flow, childNode, endTime);
+		}
+	}
 }
diff --git a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
index ffe68d7..b770624 100644
--- a/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/JavaJobTest.java
@@ -101,7 +101,7 @@ public class JavaJobTest
   }
   
   @Test
-  public void testJavaJob() {
+  public void testJavaJob() throws Exception {
     /* initialize the Props */
     props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
     props.put(ProcessJob.WORKING_DIR, ".");
@@ -112,7 +112,7 @@ public class JavaJobTest
   }
   
   @Test
-  public void testJavaJobHashmap() {
+  public void testJavaJobHashmap() throws Exception {
     /* initialize the Props */
     props.put(JavaJob.JOB_CLASS, "azkaban.test.executor.SleepJavaJob");
     props.put("seconds", 1);
@@ -124,7 +124,7 @@ public class JavaJobTest
   }
   
   @Test
-  public void testFailedJavaJob() {
+  public void testFailedJavaJob() throws Exception {
     props.put(JavaJob.JOB_CLASS, "azkaban.test.jobExecutor.WordCountLocal");
     props.put(ProcessJob.WORKING_DIR, ".");
     props.put("input", errorInputFile);
diff --git a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
index 1ec8c5b..48b1b1b 100644
--- a/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
+++ b/unit/java/azkaban/test/jobExecutor/ProcessJobTest.java
@@ -40,7 +40,7 @@ public class ProcessJobTest
   }
   
   @Test
-  public void testOneUnixCommand() {
+  public void testOneUnixCommand() throws Exception {
     /* initialize the Props */
     props.put(ProcessJob.COMMAND, "ls -al");
     props.put(ProcessJob.WORKING_DIR, ".");
@@ -50,7 +50,7 @@ public class ProcessJobTest
   }
 
   @Test
-  public void testFailedUnixCommand() {
+  public void testFailedUnixCommand() throws Exception  {
     /* initialize the Props */
     props.put(ProcessJob.COMMAND, "xls -al");
     props.put(ProcessJob.WORKING_DIR, ".");
@@ -64,7 +64,7 @@ public class ProcessJobTest
   }
     
     @Test
-    public void testMultipleUnixCommands( ) {
+    public void testMultipleUnixCommands( ) throws Exception  {
       /* initialize the Props */
       props.put(ProcessJob.WORKING_DIR, ".");
       props.put(ProcessJob.COMMAND, "pwd");