azkaban-developers

Details

diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index dd21207..04f0991 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -114,12 +114,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 		flowAppender.close();
 	}
 
-	public synchronized void cancel(String user) {
-		logger.info("Cancel called by " + user);
+	private synchronized void cancel() {
 		cancelled = true;
 
-		executorService.shutdownNow();
-
+		executorService.shutdown();
+		jobsToRun.clear();
+		
 		if (pausedJobsToRun.size() > 0) {
 			logger.info("Cancelling... Clearing paused jobs queue of size " + pausedJobsToRun.size());
 			pausedJobsToRun.clear();
@@ -149,17 +149,17 @@ public class FlowRunner extends EventHandler implements Runnable {
 			default:
 			}
 		}
-		
-		logger.info("Flow cancelled.");
-		if (flow.getStatus() == Status.FAILED_FINISHING) {
-			flow.setStatus(Status.FAILED);
-		}
-		else if (flow.getStatus() != Status.FAILED) {
-			flow.setStatus(Status.KILLED);
-		}
+
 		long time = System.currentTimeMillis();
-		flow.setStartTime(time);
 		flow.setEndTime(time);
+		setStatus(flow, Status.FAILED);
+
+	}
+	
+	public synchronized void cancel(String user) {
+		logger.info("Cancel called by " + user);
+		setStatus(flow, Status.KILLED);
+		cancel();
 	}
 
 	public synchronized void pause(String user) {
@@ -192,8 +192,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		int count = commitCount.getAndIncrement();
 
 		try {
-			ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow,
-					count);
+			ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow, count);
 		} catch (ExecutorManagerException e) {
 			// TODO Auto-generated catch block
 			e.printStackTrace();
@@ -298,6 +297,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 
 		commitFlow();
+		System.out.println("Reached flow finished");
 		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 		closeLogger();
 	}
@@ -443,7 +443,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 	private void handleFailedJob(ExecutableNode node) {
 		System.err.println("Job " + node.getId() + " failed.");
-		if (flow.getStatus() != Status.FAILED_FINISHING && flow.getStatus() != Status.FAILED) {
+		if (flow.getStatus() != Status.FAILED_FINISHING && flow.getStatus() != Status.FAILED && flow.getStatus() != Status.KILLED) {
 			this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
 		}
 		
@@ -451,10 +451,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// We finish running current jobs and then fail. Do not accept new jobs.
 		case FINISH_CURRENTLY_RUNNING:
 			logger.info("Failure Action: Finish up remaining running jobs.");
-			flow.setStatus(Status.FAILED_FINISHING);
+			setStatus(flow, Status.FAILED_FINISHING);
 
 			runningJobs.clear();
-			executorService.shutdown();
 			
 			// Go through and mark everything else killed.
 			long endTime = System.currentTimeMillis();
@@ -473,18 +472,40 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// We kill all running jobs and fail immediately
 		case CANCEL_ALL:
 			logger.info("Failure Action: Kill flow immediately.");
-			flow.setStatus(Status.FAILED);
-			this.cancel("azkaban");
+			setStatus(flow, Status.FAILED);
+			this.cancel();
 			break;
 		default:
 			logger.info("Failure Action: Finishing accessible jobs.");
-			flow.setStatus(Status.FAILED_FINISHING);
+			setStatus(flow, Status.FAILED_FINISHING);
 			queueNextJobs(node);
 		}
 
 		runningJobs.remove(node.getId());
 	}
 	
+	// We use this so we can have status priority.
+	private void setStatus(ExecutableFlow flow, Status status) {
+		// Here's the order we can go with the flow: 
+		if (flow.getStatus() == Status.KILLED) {
+			// Killed overrides everything.
+			return;
+		}
+		else if (flow.getStatus() == Status.FAILED_FINISHING ) {
+			if (status == Status.KILLED || status == status.FAILED) {
+				// Only override if it's KILLED or FAILED.
+				flow.setStatus(status);
+			}
+		}
+		else if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.SUCCEEDED) {
+			// Will not override a finished flow
+			return;
+		}
+		else {
+			flow.setStatus(status);
+		}
+	}
+	
 	private class JobRunnerEventListener implements EventListener {
 		private FlowRunner flowRunner;
 
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 3107491..164479c 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -24,12 +24,12 @@ import azkaban.utils.Props;
 
 /**
  * Execution manager for the server side execution.
- *
+ * 
  */
 public class FlowRunnerManager {
 	private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
 	private File basePath;
-	
+
 	private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
 	private ConcurrentHashMap<String, FlowRunner> runningFlows = new ConcurrentHashMap<String, FlowRunner>();
 	private LinkedBlockingQueue<FlowRunner> queue = new LinkedBlockingQueue<FlowRunner>();
@@ -43,206 +43,194 @@ public class FlowRunnerManager {
 	private String senderAddress;
 	private String clientHostname;
 	private String clientPortNumber;
-	
+
 	private Props globalProps;
-	
+
 	public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
 		this.mailer = mailer;
-		
+
 		this.senderAddress = props.getString("mail.sender");
 		this.clientHostname = props.getString("jetty.hostname", "localhost");
 		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-		
+
 		basePath = new File(props.getString("execution.directory"));
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
 		executorService = Executors.newFixedThreadPool(numThreads);
 		eventListener = new FlowRunnerEventListener(this);
-		
+
 		submitterThread = new SubmitterThread(queue);
 		submitterThread.start();
 	}
-	
+
 	public void submitFlow(String id, String path) throws ExecutorManagerException {
 		// Load file and submit
 		logger.info("Flow " + id + " submitted with path " + path);
-		
+
 		File dir = new File(path);
 		ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
 		flow.setExecutionPath(path);
-		
+
 		FlowRunner runner = new FlowRunner(flow);
 		runningFlows.put(id, runner);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(eventListener);
 		executorService.submit(runner);
 	}
-	
+
 	public void cancelFlow(String id, String user) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner != null) {
 			runner.cancel(user);
 		}
 	}
-	
+
 	public void pauseFlow(String id, String user) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner != null) {
 			runner.pause(user);
 		}
 	}
-	
+
 	public void resumeFlow(String id, String user) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner != null) {
 			runner.resume(user);
 		}
 	}
-	
+
 	public FlowRunner getFlowRunner(String id) {
 		return runningFlows.get(id);
 	}
-	
+
 	public ExecutableFlow getExecutableFlow(String id) {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner == null) {
 			return null;
 		}
-		
+
 		return runner.getFlow();
 	}
-	
+
 	private class SubmitterThread extends Thread {
 		private BlockingQueue<FlowRunner> queue;
 		private boolean shutdown = false;
-		
+
 		public SubmitterThread(BlockingQueue<FlowRunner> queue) {
 			this.queue = queue;
 		}
-		
+
 		public void shutdown() {
 			shutdown = true;
 			this.interrupt();
 		}
-		
+
 		public void run() {
-			while(!shutdown) {
+			while (!shutdown) {
 				try {
 					FlowRunner flowRunner = queue.take();
 					executorService.submit(flowRunner);
-				} 
-				catch (InterruptedException e) {
+				} catch (InterruptedException e) {
 					logger.info("Interrupted. Probably to shut down.");
 				}
 			}
 		}
 	}
-	
+
 	private class FlowRunnerEventListener implements EventListener {
 		private FlowRunnerManager manager;
-		
+
 		public FlowRunnerEventListener(FlowRunnerManager manager) {
 			this.manager = manager;
 		}
 
 		@Override
 		public synchronized void handleEvent(Event event) {
-			FlowRunner runner = (FlowRunner)event.getRunner();
+			FlowRunner runner = (FlowRunner) event.getRunner();
 			ExecutableFlow flow = runner.getFlow();
-			
-			
-			
+
 			System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
 			if (event.getType() == Type.FLOW_FINISHED) {
-				if(flow.getStatus() == Status.SUCCEEDED)
+				if (flow.getStatus() == Status.SUCCEEDED)
 					sendSuccessEmail(runner);
-				else sendErrorEmail(runner);
-				
+				else
+					sendErrorEmail(runner);
+
 				logger.info("Flow " + flow.getExecutionId() + " has finished.");
 				runningFlows.remove(flow.getExecutionId());
-				
 			}
 		}
 	}
-	
-	private List<String> getLogURLs(FlowRunner runner)
-	{
+
+	private List<String> getLogURLs(FlowRunner runner) {
 		List<String> logURLs = new ArrayList<String>();
-		
+
 		String flowID = runner.getFlow().getFlowId();
 		String execID = runner.getFlow().getExecutionId();
 		List<String> jobIDs = runner.getJobsFinished();
-		
-		//first construct log URL;
-		String logURL = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "#log";
+
+		// first construct log URL;
+		String logURL = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID
+				+ "#log";
 		logURLs.add(logURL);
-		//then the individual jobs log URL that actually ran
-		for(String jobID : jobIDs) {
-			String jobLog = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "&flow=" + flowID + "&job=" + jobID;
+		// then the individual jobs log URL that actually ran
+		for (String jobID : jobIDs) {
+			String jobLog = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid="
+					+ execID + "&flow=" + flowID + "&job=" + jobID;
 			logURLs.add(jobLog);
 		}
-		
+
 		return logURLs;
 	}
-	
-    /*
-     * Wrap a single exception with the name of the scheduled job
-     */
-    private void sendErrorEmail(FlowRunner runner) {
-    	ExecutableFlow flow = runner.getFlow();
-    	List<String> emailList = flow.getFailureEmails();
-        if(emailList != null && !emailList.isEmpty() && mailer != null) {
-        	
-        	
-        	
-        	
-            try {
-            	
-            	String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
-            	String body = "The Flow '" + flow.getFlowId() + "' failed. \n See logs below: \n" ;
-            	for(String URL : getLogURLs(runner)) {
-            		body += (URL + "\n");
-            	}
-            	
-                mailer.sendEmailIfPossible(senderAddress,
-                                             emailList,
-                                             subject,
-                                             body);
-            } catch(UnknownHostException uhe) {
-                logger.error(uhe);
-            }
-            catch (Exception e) {
-                logger.error(e);
-            }
-        }
-    }
-    
-
-    private void sendSuccessEmail(FlowRunner runner) {
-    	
-    	ExecutableFlow flow = runner.getFlow();
-
-    	List<String> emailList = flow.getSuccessEmails();
-        
-        if(emailList != null && !emailList.isEmpty() && mailer != null) {
-            try {
-            	
-            	String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
-            	String body = "The Flow '" + flow.getFlowId() + "' succeeded. \n See logs below: \n" ;
-            	for(String URL : getLogURLs(runner)) {
-            		body += (URL + "\n");
-            	}
-            	
-                mailer.sendEmailIfPossible(senderAddress,
-                                             emailList,
-                                             subject,
-                                             body);
-            } catch(UnknownHostException uhe) {
-                logger.error(uhe);
-            }
-            catch (Exception e) {
-                logger.error(e);
-            }
-        }
-    }
-    
+
+	/*
+	 * Wrap a single exception with the name of the scheduled job
+	 */
+	private void sendErrorEmail(FlowRunner runner) {
+		ExecutableFlow flow = runner.getFlow();
+		List<String> emailList = flow.getFailureEmails();
+		if (emailList != null && !emailList.isEmpty() && mailer != null) {
+
+			try {
+
+				String subject = "Flow '" + flow.getFlowId() + "' has completed on "
+						+ InetAddress.getLocalHost().getHostName() + "!";
+				String body = "The Flow '" + flow.getFlowId() + "' failed. \n See logs below: \n";
+				for (String URL : getLogURLs(runner)) {
+					body += (URL + "\n");
+				}
+
+				mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+			} catch (UnknownHostException uhe) {
+				logger.error(uhe);
+			} catch (Exception e) {
+				logger.error(e);
+			}
+		}
+	}
+
+	private void sendSuccessEmail(FlowRunner runner) {
+
+		ExecutableFlow flow = runner.getFlow();
+
+		List<String> emailList = flow.getSuccessEmails();
+
+		if (emailList != null && !emailList.isEmpty() && mailer != null) {
+			try {
+
+				String subject = "Flow '" + flow.getFlowId() + "' has completed on "
+						+ InetAddress.getLocalHost().getHostName() + "!";
+				String body = "The Flow '" + flow.getFlowId() + "' succeeded. \n See logs below: \n";
+				for (String URL : getLogURLs(runner)) {
+					body += (URL + "\n");
+				}
+
+				mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+			} catch (UnknownHostException uhe) {
+				logger.error(uhe);
+			} catch (Exception e) {
+				logger.error(e);
+			}
+		}
+	}
+
 }
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 6af4d3d..a1bdef0 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -22,8 +22,7 @@ import azkaban.jobExecutor.utils.JobWrappingFactory;
 import azkaban.utils.Props;
 
 public class JobRunner extends EventHandler implements Runnable {
-	private static final Layout DEFAULT_LAYOUT = new PatternLayout(
-			"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 
 	private Props props;
 	private Props outputProps;
@@ -120,6 +119,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			job.run();
 		} catch (Throwable e) {
 			succeeded = false;
+			node.setStatus(Status.FAILED);
 			logError("Job run failed!");
 			e.printStackTrace();
 		}
@@ -132,7 +132,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
 		} else {
-			node.setStatus(Status.FAILED);
+			System.out.println("Setting FAILED to " + node.getId());
 			this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
 		}
 		logInfo("Finishing job " + node.getId() + " at " + node.getEndTime());
@@ -150,14 +150,8 @@ public class JobRunner extends EventHandler implements Runnable {
 		try {
 			job.cancel();
 		} catch (Exception e) {
-			logError("Failed trying to cancel job!");
-			e.printStackTrace();
-		}
-
-		// will just interrupt, I guess, until the code is finished.
-		this.notifyAll();
-		if (node.getStatus() != Status.FAILED) {
-			node.setStatus(Status.KILLED);
+			logError(e.getMessage());
+			logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
 		}
 	}
 
diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index f22923d..a4ac15d 100644
--- a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -44,236 +44,230 @@ import com.google.common.base.Joiner;
  */
 public class AzkabanProcess {
 
-    private final String workingDir;
-    private final List<String> cmd;
-    private final Map<String, String> env;
-    private final Logger logger;
-    private final CountDownLatch startupLatch;
-    private final CountDownLatch completeLatch;
-    private volatile int processId;
-    private volatile Process process;
-
-    public AzkabanProcess(final List<String> cmd,
-            final Map<String, String> env, final String workingDir,
-            final Logger logger) {
-        this.cmd = cmd;
-        this.env = env;
-        this.workingDir = workingDir;
-        this.processId = -1;
-        this.startupLatch = new CountDownLatch(1);
-        this.completeLatch = new CountDownLatch(1);
-        this.logger = logger;
-    }
-
-    /**
-     * Execute this process, blocking until it has completed.
-     */
-    public void run() throws IOException {
-        if (this.isStarted() || this.isComplete()) {
-            throw new IllegalStateException(
-                    "The process can only be used once.");
-        }
-
-        ProcessBuilder builder = new ProcessBuilder(cmd);
-        builder.directory(new File(workingDir));
-        builder.environment().putAll(env);
-        this.process = builder.start();
-        this.processId = processId(process);
-        if (processId == 0) {
-            logger.debug("Spawned thread with unknown process id");
-        } else {
-            logger.debug("Spawned thread with process id " + processId);
-        }
-
-        this.startupLatch.countDown();
-
-        LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(
-                process.getInputStream()), logger, Level.INFO, 30);
-        LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(
-                process.getErrorStream()), logger, Level.ERROR, 30);
-
-        outputGobbler.start();
-        errorGobbler.start();
-        int exitCode = -1;
-        try {
-            exitCode = process.waitFor();
-        } catch (InterruptedException e) {
-            logger.info("Process interrupted.", e);
-        }
-
-        completeLatch.countDown();
-        if (exitCode != 0) {
-            throw new ProcessFailureException(exitCode,
-                    errorGobbler.getRecentLog());
-        }
-
-        // try to wait for everything to get logged out before exiting
-        outputGobbler.awaitCompletion(5000);
-        errorGobbler.awaitCompletion(5000);
-    }
-
-    /**
-     * Await the completion of this process
-     * 
-     * @throws InterruptedException
-     *             if the thread is interrupted while waiting.
-     */
-    public void awaitCompletion() throws InterruptedException {
-        this.completeLatch.await();
-    }
-
-    /**
-     * Await the start of this process
-     * 
-     * @throws InterruptedException
-     *             if the thread is interrupted while waiting.
-     */
-    public void awaitStartup() throws InterruptedException {
-        this.startupLatch.await();
-    }
-
-    /**
-     * Get the process id for this process, if it has started.
-     * 
-     * @return The process id or -1 if it cannot be fetched
-     */
-    public int getProcessId() {
-        checkStarted();
-        return this.processId;
-    }
-
-    /**
-     * Attempt to kill the process, waiting up to the given time for it to die
-     * 
-     * @param time
-     *            The amount of time to wait
-     * @param unit
-     *            The time unit
-     * @return true iff this soft kill kills the process in the given wait time.
-     */
-    public boolean softKill(final long time, final TimeUnit unit)
-            throws InterruptedException {
-        checkStarted();
-        if (processId != 0 && isStarted()) {
-            try {
-                Runtime.getRuntime().exec("kill " + processId);
-                return completeLatch.await(time, unit);
-            } catch (IOException e) {
-                logger.error("Kill attempt failed.", e);
-            }
-            return false;
-        }
-        return false;
-    }
-
-    /**
-     * Force kill this process
-     */
-    public void hardKill() {
-        checkStarted();
-        if (isRunning()) {
-            process.destroy();
-        }
-    }
-
-    /**
-     * Attempt to get the process id for this process
-     * 
-     * @param process
-     *            The process to get the id from
-     * @return The id of the process
-     */
-    private int processId(final java.lang.Process process) {
-        int processId = 0;
-        try {
-            Field f = process.getClass().getDeclaredField("pid");
-            f.setAccessible(true);
-
-            processId = f.getInt(process);
-        } catch (Throwable e) {
-            e.printStackTrace();
-        }
-
-        return processId;
-    }
-
-    /**
-     * @return true iff the process has been started
-     */
-    public boolean isStarted() {
-        return startupLatch.getCount() == 0L;
-    }
-
-    /**
-     * @return true iff the process has completed
-     */
-    public boolean isComplete() {
-        return completeLatch.getCount() == 0L;
-    }
-
-    /**
-     * @return true iff the process is currently running
-     */
-    public boolean isRunning() {
-        return isStarted() && !isComplete();
-    }
-
-    public void checkStarted() {
-        if (!isStarted()) {
-            throw new IllegalStateException("Process has not yet started.");
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
-                + ", cwd = " + workingDir + ")";
-    }
-
-    private static class LogGobbler extends Thread {
-
-        private final BufferedReader inputReader;
-        private final Logger logger;
-        private final Level loggingLevel;
-        private final CircularBuffer<String> buffer;
-
-        public LogGobbler(final Reader inputReader, final Logger logger,
-                final Level level, final int bufferLines) {
-            this.inputReader = new BufferedReader(inputReader);
-            this.logger = logger;
-            this.loggingLevel = level;
-            buffer = new CircularBuffer<String>(bufferLines);
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (!Thread.currentThread().isInterrupted()) {
-                    String line = inputReader.readLine();
-                    if (line == null) {
-                        return;
-                    }
-
-                    buffer.append(line);
-                    logger.log(loggingLevel, line);
-                }
-            } catch (IOException e) {
-                logger.error("Error reading from logging stream:", e);
-            }
-        }
-
-        public void awaitCompletion(final long waitMs) {
-            try {
-                join(waitMs);
-            } catch (InterruptedException e) {
-                logger.info("I/O thread interrupted.", e);
-            }
-        }
-
-        public String getRecentLog() {
-            return Joiner.on(System.getProperty("line.separator")).join(buffer);
-        }
-
-    }
+	private final String workingDir;
+	private final List<String> cmd;
+	private final Map<String, String> env;
+	private final Logger logger;
+	private final CountDownLatch startupLatch;
+	private final CountDownLatch completeLatch;
+	private volatile int processId;
+	private volatile Process process;
+
+	public AzkabanProcess(final List<String> cmd, final Map<String, String> env, final String workingDir,
+			final Logger logger) {
+		this.cmd = cmd;
+		this.env = env;
+		this.workingDir = workingDir;
+		this.processId = -1;
+		this.startupLatch = new CountDownLatch(1);
+		this.completeLatch = new CountDownLatch(1);
+		this.logger = logger;
+	}
+
+	/**
+	 * Execute this process, blocking until it has completed.
+	 */
+	public void run() throws IOException {
+		if (this.isStarted() || this.isComplete()) {
+			throw new IllegalStateException("The process can only be used once.");
+		}
+
+		ProcessBuilder builder = new ProcessBuilder(cmd);
+		builder.directory(new File(workingDir));
+		builder.environment().putAll(env);
+		this.process = builder.start();
+		this.processId = processId(process);
+		if (processId == 0) {
+			logger.debug("Spawned thread with unknown process id");
+		} else {
+			logger.debug("Spawned thread with process id " + processId);
+		}
+
+		this.startupLatch.countDown();
+
+		LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO,
+				30);
+		LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR,
+				30);
+
+		outputGobbler.start();
+		errorGobbler.start();
+		int exitCode = -1;
+		try {
+			exitCode = process.waitFor();
+		} catch (InterruptedException e) {
+			logger.info("Process interrupted. Exit code is " + exitCode, e);
+		}
+
+		completeLatch.countDown();
+		if (exitCode != 0) {
+			throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+		}
+
+		// try to wait for everything to get logged out before exiting
+		outputGobbler.awaitCompletion(5000);
+		errorGobbler.awaitCompletion(5000);
+	}
+
+	/**
+	 * Await the completion of this process
+	 * 
+	 * @throws InterruptedException
+	 *             if the thread is interrupted while waiting.
+	 */
+	public void awaitCompletion() throws InterruptedException {
+		this.completeLatch.await();
+	}
+
+	/**
+	 * Await the start of this process
+	 * 
+	 * @throws InterruptedException
+	 *             if the thread is interrupted while waiting.
+	 */
+	public void awaitStartup() throws InterruptedException {
+		this.startupLatch.await();
+	}
+
+	/**
+	 * Get the process id for this process, if it has started.
+	 * 
+	 * @return The process id or -1 if it cannot be fetched
+	 */
+	public int getProcessId() {
+		checkStarted();
+		return this.processId;
+	}
+
+	/**
+	 * Attempt to kill the process, waiting up to the given time for it to die
+	 * 
+	 * @param time
+	 *            The amount of time to wait
+	 * @param unit
+	 *            The time unit
+	 * @return true iff this soft kill kills the process in the given wait time.
+	 */
+	public boolean softKill(final long time, final TimeUnit unit) throws InterruptedException {
+		checkStarted();
+		if (processId != 0 && isStarted()) {
+			try {
+				Runtime.getRuntime().exec("kill " + processId);
+				return completeLatch.await(time, unit);
+			} catch (IOException e) {
+				logger.error("Kill attempt failed.", e);
+			}
+			return false;
+		}
+		return false;
+	}
+
+	/**
+	 * Force kill this process
+	 */
+	public void hardKill() {
+		checkStarted();
+		if (isRunning()) {
+			process.destroy();
+		}
+	}
+
+	/**
+	 * Attempt to get the process id for this process
+	 * 
+	 * @param process
+	 *            The process to get the id from
+	 * @return The id of the process
+	 */
+	private int processId(final java.lang.Process process) {
+		int processId = 0;
+		try {
+			Field f = process.getClass().getDeclaredField("pid");
+			f.setAccessible(true);
+
+			processId = f.getInt(process);
+		} catch (Throwable e) {
+			e.printStackTrace();
+		}
+
+		return processId;
+	}
+
+	/**
+	 * @return true iff the process has been started
+	 */
+	public boolean isStarted() {
+		return startupLatch.getCount() == 0L;
+	}
+
+	/**
+	 * @return true iff the process has completed
+	 */
+	public boolean isComplete() {
+		return completeLatch.getCount() == 0L;
+	}
+
+	/**
+	 * @return true iff the process is currently running
+	 */
+	public boolean isRunning() {
+		return isStarted() && !isComplete();
+	}
+
+	public void checkStarted() {
+		if (!isStarted()) {
+			throw new IllegalStateException("Process has not yet started.");
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env + ", cwd = " + workingDir + ")";
+	}
+
+	private static class LogGobbler extends Thread {
+
+		private final BufferedReader inputReader;
+		private final Logger logger;
+		private final Level loggingLevel;
+		private final CircularBuffer<String> buffer;
+
+		public LogGobbler(final Reader inputReader, final Logger logger, final Level level, final int bufferLines) {
+			this.inputReader = new BufferedReader(inputReader);
+			this.logger = logger;
+			this.loggingLevel = level;
+			buffer = new CircularBuffer<String>(bufferLines);
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (!Thread.currentThread().isInterrupted()) {
+					String line = inputReader.readLine();
+					if (line == null) {
+						return;
+					}
+
+					buffer.append(line);
+					logger.log(loggingLevel, line);
+				}
+			} catch (IOException e) {
+				logger.error("Error reading from logging stream:", e);
+			}
+		}
+
+		public void awaitCompletion(final long waitMs) {
+			try {
+				join(waitMs);
+			} catch (InterruptedException e) {
+				logger.info("I/O thread interrupted.", e);
+			}
+		}
+
+		public String getRecentLog() {
+			return Joiner.on(System.getProperty("line.separator")).join(buffer);
+		}
+
+	}
 
 }
diff --git a/unit/executions/exectest1/job4.job b/unit/executions/exectest1/job4.job
index 1cbac6f..1eccb73 100644
--- a/unit/executions/exectest1/job4.job
+++ b/unit/executions/exectest1/job4.job
@@ -1,5 +1,5 @@
 type=java
 job.class=azkaban.test.executor.SleepJavaJob
 dependencies=job2
-seconds=4
+seconds=8
 fail=false
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
index ab0970f..7bdc413 100644
--- a/unit/java/azkaban/test/executor/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -195,6 +195,62 @@ public class FlowRunnerTest {
 		}
 	}
 	
+	@Test
+	public void execAndCancel() throws Exception {
+		File testDir = new File("unit/executions/exectest1");
+		ExecutableFlow exFlow = prepareExecDir(testDir, "exec1");
+		
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = new FlowRunner(exFlow);
+		runner.addListener(eventCollector);
+		
+		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(exFlow.getStatus() == Status.READY);
+		Thread thread = new Thread(runner);
+		thread.start();
+		
+		synchronized(this) {
+			try {
+				wait(4500);
+			} catch (InterruptedException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			runner.cancel("me");
+		}
+		synchronized(this) {
+			// Wait for cleanup.
+			try {
+				wait(1000);
+			} catch (InterruptedException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+
+		testStatus(exFlow, "job5", Status.KILLED);
+		testStatus(exFlow, "job7", Status.KILLED);
+		testStatus(exFlow, "job8", Status.KILLED);
+		testStatus(exFlow, "job9", Status.KILLED);
+		testStatus(exFlow, "job10", Status.KILLED);
+		testStatus(exFlow, "job1", Status.SUCCEEDED);
+		testStatus(exFlow, "job2", Status.SUCCEEDED);
+		testStatus(exFlow, "job3", Status.FAILED);
+		testStatus(exFlow, "job4", Status.FAILED);
+		testStatus(exFlow, "job6", Status.FAILED);
+		
+		Assert.assertTrue("Expected KILLED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.KILLED);
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
+		}
+		catch (Exception e) {
+			System.out.println(e.getMessage());
+			eventCollector.writeAllEvents();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
 	private void testStatus(ExecutableFlow flow, String name, Status status) {
 		ExecutableNode node = flow.getExecutableNode(name);
 		
diff --git a/unit/java/azkaban/test/executor/JobRunnerTest.java b/unit/java/azkaban/test/executor/JobRunnerTest.java
index 7804d21..cb2d432 100644
--- a/unit/java/azkaban/test/executor/JobRunnerTest.java
+++ b/unit/java/azkaban/test/executor/JobRunnerTest.java
@@ -200,12 +200,12 @@ public class JobRunnerTest {
 		
 		eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
 		Thread thread = new Thread(runner);
-		thread.run();
+		thread.start();
 		
 		eventCollector.handleEvent(Event.create(null, Type.JOB_KILLED));
 		synchronized(this) {
 			try {
-				wait(1000);
+				wait(2000);
 			} catch (InterruptedException e) {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
@@ -214,7 +214,7 @@ public class JobRunnerTest {
 		}
 		
 		Assert.assertTrue(runner.getStatus() == node.getStatus());
-		Assert.assertTrue(node.getStatus() == Status.KILLED);
+		Assert.assertTrue(node.getStatus() == Status.FAILED);
 		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
 		// Give it 10 ms to fail.
 		Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);