azkaban-aplcache

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index b7a47f6..77806bf 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -342,30 +342,33 @@ public class ExecutorManager {
 		runningFlows.put(flow.getExecutionId(), flow);
 	}
 	
-	public void cancelFlow(ExecutableFlow flow) throws ExecutorManagerException {
+	public void cancelFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+		logger.info("Calling cancel");
 		String response = null;
 		try {
-			response = callExecutionServer("cancel", flow);
+			response = callExecutionServer("cancel", flow, user);
 		} catch (IOException e) {
 			e.printStackTrace();
 			throw new ExecutorManagerException("Error cancelling flow.", e);
 		}
 	}
 	
-	public void pauseFlow(ExecutableFlow flow) throws ExecutorManagerException {
+	public void pauseFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+		logger.info("Calling pause");
 		String response = null;
 		try {
-			response = callExecutionServer("pause", flow);
+			response = callExecutionServer("pause", flow, user);
 		} catch (IOException e) {
 			e.printStackTrace();
 			throw new ExecutorManagerException("Error cancelling flow.", e);
 		}
 	}
 	
-	public void resumeFlow(ExecutableFlow flow) throws ExecutorManagerException {
+	public void resumeFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+		logger.info("Calling resume");
 		String response = null;
 		try {
-			response = callExecutionServer("resume", flow);
+			response = callExecutionServer("resume", flow, user);
 		} catch (IOException e) {
 			e.printStackTrace();
 			throw new ExecutorManagerException("Error cancelling flow.", e);
@@ -374,16 +377,24 @@ public class ExecutorManager {
 	}
 	
 	private String callExecutionServer(String action, ExecutableFlow flow) throws IOException{
+		return callExecutionServer(action, flow, null);
+	}
+	
+	private String callExecutionServer(String action, ExecutableFlow flow, String user) throws IOException{
 		URIBuilder builder = new URIBuilder();
 		builder.setScheme("http")
 			.setHost(url)
 			.setPort(portNumber)
 			.setPath("/executor")
 			.setParameter("sharedToken", token)
-			.setParameter("action", "resume")
+			.setParameter("action", action)
 			.setParameter("execid", flow.getExecutionId())
 			.setParameter("execpath", flow.getExecutionPath());
 		
+		if (user != null) {
+			builder.setParameter("user", user);
+		}
+		
 		URI uri = null;
 		try {
 			uri = builder.build();
@@ -393,7 +404,7 @@ public class ExecutorManager {
 		
 		ResponseHandler<String> responseHandler = new BasicResponseHandler();
 		
-		logger.info("Submitting flow " + flow.getExecutionId() + " for execution.");
+		logger.info("Remotely querying " + flow.getExecutionId() + " for status.");
 		HttpClient httpclient = new DefaultHttpClient();
 		HttpGet httpget = new HttpGet(uri);
 		String response = null;
@@ -590,7 +601,7 @@ public class ExecutorManager {
 		// Then we're taking a substring of length - 6 to lop off the bottom 5 digits effectively partitioning
 		// by 100000 millisec. We do this to have quicker searchs by pulling partitions, not full directories.
 		int index = execID.indexOf('.');
-		return execID.substring(0, index - 5);
+		return execID.substring(0, index - 6);
 	}
 	
 	private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException {
@@ -690,11 +701,14 @@ public class ExecutorManager {
 						}
 						continue;
 					}
+					catch (Exception e) {
+						e.printStackTrace();
+					}
 					
 					Object executorResponseObj;
 					try {
 						executorResponseObj = JSONUtils.parseJSONFromString(responseString);
-					} catch (IOException e) {
+					} catch (Exception e) {
 						// TODO Auto-generated catch block
 						e.printStackTrace();
 						continue;
@@ -705,7 +719,7 @@ public class ExecutorManager {
 					String status = (String)response.get("status");
 					
 					try {
-						ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+						ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
 					} catch (ExecutorManagerException e) {
 						// TODO Auto-generated catch block
 						e.printStackTrace();
@@ -718,7 +732,7 @@ public class ExecutorManager {
 							// Cleanup
 							logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
 							try {
-								ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+								ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
 								cleanFinishedJob(exFlow);						
 							} catch (ExecutorManagerException e) {
 								e.printStackTrace();
@@ -826,8 +840,8 @@ public class ExecutorManager {
 			reference.flowId = (String)obj.get("flowId");
 			reference.userId = (String)obj.get("userId");
 			reference.execPath = (String)obj.get("execPath");
-			reference.startTime = (Long)obj.get("startTime");
-			reference.endTime = (Long)obj.get("endTime");
+			reference.startTime = getLongFromObject(obj.get("startTime"));
+			reference.endTime = getLongFromObject(obj.get("endTime"));
 			reference.status = Status.valueOf((String)obj.get("status"));
 			return reference;
 		}
@@ -881,4 +895,12 @@ public class ExecutorManager {
 			this.status = status;
 		}
 	}
+	
+	private static long getLongFromObject(Object obj) {
+		if (obj instanceof Integer) {
+			return Long.valueOf((Integer)obj);
+		}
+		
+		return (Long)obj;
+	}
 }
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index f450b80..d0c0dc6 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -33,17 +33,19 @@ import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.Props;
 
 public class FlowRunner extends EventHandler implements Runnable {
-	private static final Layout DEFAULT_LAYOUT = new PatternLayout( "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+	private static final Layout DEFAULT_LAYOUT = new PatternLayout(
+			"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 
 	public static final int NUM_CONCURRENT_THREADS = 10;
 
 	private ExecutableFlow flow;
 	private ExecutorService executorService;
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
-	private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
+	private List<JobRunner> pausedJobsToRun = Collections
+			.synchronizedList(new ArrayList<JobRunner>());
 	private int numThreads = NUM_CONCURRENT_THREADS;
-	private boolean cancelled = true;
-	private boolean paused = true;
+	private boolean cancelled = false;
+	private boolean paused = false;
 
 	private Map<String, JobRunner> runningJobs;
 	private JobRunnerEventListener listener;
@@ -55,33 +57,33 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Logger logger;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
 	private Appender flowAppender;
-	
+
 	private Thread currentThread;
-	
+
 	public enum FailedFlowOptions {
-		FINISH_RUNNING_JOBS,
-		KILL_ALL
+		FINISH_RUNNING_JOBS, KILL_ALL
 	}
-	
+
 	private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
-	
+
 	public FlowRunner(ExecutableFlow flow) {
 		this.flow = flow;
 		this.basePath = new File(flow.getExecutionPath());
 		this.executorService = Executors.newFixedThreadPool(numThreads);
 		this.runningJobs = new ConcurrentHashMap<String, JobRunner>();
 		this.listener = new JobRunnerEventListener(this);
-		
+
 		createLogger();
 	}
-	
+
 	public ExecutableFlow getFlow() {
 		return flow;
 	}
-	
+
 	private void createLogger() {
 		// Create logger
-		String loggerName = System.currentTimeMillis() + "." + flow.getExecutionId();
+		String loggerName = System.currentTimeMillis() + "."
+				+ flow.getExecutionId();
 		logger = Logger.getLogger(loggerName);
 
 		// Create file appender
@@ -97,74 +99,97 @@ public class FlowRunner extends EventHandler implements Runnable {
 			logger.error("Could not open log file in " + basePath, e);
 		}
 	}
-	
+
 	private void closeLogger() {
 		logger.removeAppender(flowAppender);
 		flowAppender.close();
 	}
-	
-	public void cancel() {
-		logger.info("Cancel Invoked");
+
+	public synchronized void cancel(String user) {
+		logger.info("Cancel called by " + user);
 		cancelled = true;
-		
+
 		executorService.shutdownNow();
-		
+
+		if (pausedJobsToRun.size() > 0) {
+			logger.info("Cancelling... Clearing paused jobs queue of size "
+					+ pausedJobsToRun.size());
+			pausedJobsToRun.clear();
+		}
+
 		// Loop through job runners
-		for (JobRunner runner: runningJobs.values()) {
-			if (runner.getStatus() == Status.WAITING || runner.getStatus() == Status.RUNNING) {
+		for (JobRunner runner : runningJobs.values()) {
+			if (runner.getStatus() == Status.WAITING
+					|| runner.getStatus() == Status.RUNNING
+					|| runner.getStatus() == Status.PAUSED) {
+				logger.info("Cancelling... Killing job "
+						+ runner.getNode().getId() + " with status "
+						+ runner.getStatus());
 				runner.cancel();
 			}
 		}
 
-		flow.setStatus(Status.KILLED);
+		logger.info("Flow cancelled.");
+		if (flow.getStatus() != Status.FAILED) {
+			flow.setStatus(Status.KILLED);
+		}
+		flow.setEndTime(System.currentTimeMillis());
+		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 	}
-	
-	public synchronized void pause() {
-		logger.info("Pause flow");
-		if (flow.getStatus() == Status.RUNNING || flow.getStatus() == Status.WAITING) {
+
+	public synchronized void pause(String user) {
+		if (flow.getStatus() == Status.RUNNING
+				|| flow.getStatus() == Status.WAITING) {
+			logger.info("Flow paused by " + user);
 			paused = true;
 			flow.setStatus(Status.PAUSED);
 		}
 	}
-	
-	public synchronized void resume() {
-		logger.info("Resume flow");
+
+	public synchronized void resume(String user) {
+		if (isCancelled()) {
+			logger.info("Cannot resume cancelled flow.");
+			return;
+		}
+
 		if (flow.getStatus() == Status.PAUSED) {
-			flow.setStatus(Status.RUNNING);
+			paused = false;
+			logger.info("Flow resumed by " + user);
 			jobsToRun.addAll(pausedJobsToRun);
+			flow.setStatus(Status.RUNNING);
 		}
 	}
-	
+
 	public boolean isCancelled() {
 		return cancelled;
 	}
-	
+
 	private synchronized void commitFlow() {
 		int count = commitCount.getAndIncrement();
 
 		try {
-			ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow, count);
+			ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow,
+					count);
 		} catch (ExecutorManagerException e) {
 			// TODO Auto-generated catch block
 			e.printStackTrace();
 		}
 	}
-	
+
 	@Override
 	public void run() {
 		currentThread = Thread.currentThread();
-		
+
 		flow.setStatus(Status.RUNNING);
 		flow.setStartTime(System.currentTimeMillis());
 		logger.info("Starting Flow");
 		this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
-		
+
 		// Load all shared props
 		try {
 			logger.info("Loading all shared properties");
 			loadAllProperties(flow);
-		}
-		catch (IOException e) {
+		} catch (IOException e) {
 			flow.setStatus(Status.FAILED);
 			logger.error("Property loading failed due to " + e.getMessage());
 			logger.error("Exiting Prematurely.");
@@ -175,14 +200,15 @@ public class FlowRunner extends EventHandler implements Runnable {
 		// Set up starting nodes
 		try {
 			logger.info("Queuing starting jobs.");
-			for (String startNode: flow.getStartNodes()) {
+			for (String startNode : flow.getStartNodes()) {
 				ExecutableNode node = flow.getExecutableNode(startNode);
 				JobRunner jobRunner = createJobRunner(node, null);
 				jobsToRun.add(jobRunner);
 				runningJobs.put(startNode, jobRunner);
 			}
 		} catch (IOException e) {
-			logger.error("Starting job queueing failed due to " + e.getMessage());
+			logger.error("Starting job queueing failed due to "
+					+ e.getMessage());
 			flow.setStatus(Status.FAILED);
 			jobsToRun.clear();
 			runningJobs.clear();
@@ -190,9 +216,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 			return;
 		}
-		
+
 		// Main loop
-		while(!runningJobs.isEmpty()) {
+		while (!runningJobs.isEmpty()) {
 			JobRunner runner = null;
 			try {
 				runner = jobsToRun.poll(5, TimeUnit.MINUTES);
@@ -200,12 +226,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 				logger.info("FlowRunner thread has been interrupted.");
 				if (runningJobs.isEmpty()) {
 					break;
-				}
-				else {
+				} else {
 					continue;
 				}
 			}
-			
+
 			if (runner != null) {
 				try {
 					ExecutableNode node = runner.getNode();
@@ -216,21 +241,21 @@ public class FlowRunner extends EventHandler implements Runnable {
 					// Should reject if I shutdown executor.
 					break;
 				}
-				
+
 				// Just to make sure we back off so we don't flood.
 				synchronized (this) {
 					try {
 						wait(5);
 					} catch (InterruptedException e) {
-						
+
 					}
 				}
 			}
 		}
-		
+
 		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
-		
+
 		while (executorService.isTerminated()) {
 			try {
 				executorService.awaitTermination(1, TimeUnit.SECONDS);
@@ -242,15 +267,16 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 		flow.setEndTime(System.currentTimeMillis());
 		if (flow.getStatus() == Status.RUNNING) {
-			logger.info("Flow finished successfully in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+			logger.info("Flow finished successfully in "
+					+ (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.SUCCEEDED);
-		}
-		else if (flow.getStatus() == Status.KILLED) {
-			logger.info("Flow was killed in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+		} else if (flow.getStatus() == Status.KILLED) {
+			logger.info("Flow was killed in "
+					+ (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.KILLED);
-		}
-		else {
-			logger.info("Flow finished with failures in " + (flow.getEndTime() - flow.getStartTime()) + " ms.");
+		} else {
+			logger.info("Flow finished with failures in "
+					+ (flow.getEndTime() - flow.getStartTime()) + " ms.");
 			flow.setStatus(Status.FAILED);
 		}
 
@@ -258,102 +284,114 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 		closeLogger();
 	}
-	
-	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) throws IOException {
+
+	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput)
+			throws IOException {
 		String source = node.getJobPropsSource();
 		String propsSource = node.getPropsSource();
 
-		Props parentProps = propsSource == null ? null : sharedProps.get(propsSource);
-		
+		Props parentProps = propsSource == null ? null : sharedProps
+				.get(propsSource);
+
 		// We add the previous job output and put into this props.
 		if (previousOutput != null) {
 			Props earliestParent = previousOutput.getEarliestAncestor();
 			earliestParent.setParent(parentProps);
-			
+
 			parentProps = earliestParent;
 		}
-		
+
 		File propsFile = new File(basePath, source);
 		Props jobProps = new Props(parentProps, propsFile);
-		
+
 		JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
 		jobRunner.addListener(listener);
-		
+
 		return jobRunner;
 	}
-	
+
 	private void loadAllProperties(ExecutableFlow flow) throws IOException {
 		// First load all the properties
-		for (FlowProps fprops: flow.getFlowProps()) {
+		for (FlowProps fprops : flow.getFlowProps()) {
 			String source = fprops.getSource();
 			File propsFile = new File(basePath, source);
-			
+
 			Props props = new Props(null, propsFile);
 			sharedProps.put(source, props);
 		}
 
 		// Resolve parents
-		for (FlowProps fprops: flow.getFlowProps()) {
+		for (FlowProps fprops : flow.getFlowProps()) {
 			if (fprops.getInheritedSource() != null) {
 				String source = fprops.getSource();
 				String inherit = fprops.getInheritedSource();
-				
+
 				Props props = sharedProps.get(source);
 				Props inherits = sharedProps.get(inherit);
-				
+
 				props.setParent(inherits);
 			}
 		}
 	}
-	
+
 	private void interrupt() {
 		currentThread.interrupt();
 	}
-	
+
 	private void handleSucceededJob(ExecutableNode node) {
-		for(String dependent: node.getOutNodes()) {
+		if (this.isCancelled()) {
+			return;
+		}
+
+		for (String dependent : node.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-			
+
 			// Check all dependencies
 			boolean ready = true;
-			for (String dependency: dependentNode.getInNodes()) {
-				ExecutableNode dependencyNode = flow.getExecutableNode(dependency); 
-				if (dependencyNode.getStatus() != Status.SUCCEEDED &&
-					dependencyNode.getStatus() != Status.DISABLED) {
+			for (String dependency : dependentNode.getInNodes()) {
+				ExecutableNode dependencyNode = flow
+						.getExecutableNode(dependency);
+				if (dependencyNode.getStatus() != Status.SUCCEEDED
+						&& dependencyNode.getStatus() != Status.DISABLED) {
 					ready = false;
 					break;
 				}
 			}
-			
+
 			if (ready) {
 				Props previousOutput = null;
 				// Iterate the in nodes again and create the dependencies
-				for (String dependency: node.getInNodes()) {
+				for (String dependency : node.getInNodes()) {
 					Props output = outputProps.get(dependency);
 					if (output != null) {
 						output = Props.clone(output);
-						
+
 						output.setParent(previousOutput);
 						previousOutput = output;
 					}
 				}
-				
+
 				JobRunner runner = null;
 				try {
-					runner = this.createJobRunner(dependentNode, previousOutput);
+					runner = this
+							.createJobRunner(dependentNode, previousOutput);
 				} catch (IOException e) {
-					logger.error("JobRunner creation failed due to " + e.getMessage());
+					logger.error("JobRunner creation failed due to "
+							+ e.getMessage());
 					dependentNode.setStatus(Status.FAILED);
 					handleFailedJob(dependentNode);
 					return;
 				}
-			
+
 				runningJobs.put(dependentNode.getId(), runner);
 				if (paused) {
 					dependentNode.setStatus(Status.PAUSED);
 					pausedJobsToRun.add(runner);
-				}
-				else {
+					logger.info("Flow is paused so adding "
+							+ dependentNode.getId() + " to paused list.");
+				} else {
+					logger.info("Flow is not paused so adding "
+							+ dependentNode.getId() + " to paused list.");
 					jobsToRun.add(runner);
 				}
 			}
@@ -361,53 +399,56 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 		runningJobs.remove(node.getId());
 	}
-	
+
 	private void handleFailedJob(ExecutableNode node) {
 		System.err.println("Job " + node.getId() + " failed.");
 		this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
-		
+
 		switch (failedOptions) {
-			// We finish running current jobs and then fail. Do not accept new jobs.
-			case FINISH_RUNNING_JOBS:
-				runningJobs.clear();
-				executorService.shutdown();
+		// We finish running current jobs and then fail. Do not accept new jobs.
+		case FINISH_RUNNING_JOBS:
+			runningJobs.clear();
+			executorService.shutdown();
 			break;
-			// We kill all running jobs and fail immediately
-			case KILL_ALL:
-				this.cancel();
+		// We kill all running jobs and fail immediately
+		case KILL_ALL:
+			this.cancel("azkaban");
 			break;
 		}
-		
+
 		runningJobs.remove(node.getId());
 	}
-	
+
 	private class JobRunnerEventListener implements EventListener {
 		private FlowRunner flowRunner;
-		
+
 		public JobRunnerEventListener(FlowRunner flowRunner) {
 			this.flowRunner = flowRunner;
 		}
 
 		@Override
 		public synchronized void handleEvent(Event event) {
-			JobRunner runner = (JobRunner)event.getRunner();
+			JobRunner runner = (JobRunner) event.getRunner();
 			ExecutableNode node = runner.getNode();
 			String jobID = node.getId();
-			System.out.println("Event " + jobID + " " + event.getType().toString());
+			System.out.println("Event " + jobID + " "
+					+ event.getType().toString());
 
-			// On Job success, we add the output props and then set up the next run.
+			// On Job success, we add the output props and then set up the next
+			// run.
 			if (event.getType() == Type.JOB_SUCCEEDED) {
-				logger.info("Job Succeeded " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+				logger.info("Job Succeeded " + jobID + " in "
+						+ (node.getEndTime() - node.getStartTime()) + " ms");
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
 				flowRunner.handleSucceededJob(runner.getNode());
-			}
-			else if (event.getType() == Type.JOB_FAILED) {
-				logger.info("Job Failed " + jobID + " in " + (node.getEndTime() - node.getStartTime()) + " ms");
+			} else if (event.getType() == Type.JOB_FAILED) {
+				logger.info("Job Failed " + jobID + " in "
+						+ (node.getEndTime() - node.getStartTime()) + " ms");
 				logger.info(jobID + " FAILED");
 				flowRunner.handleFailedJob(runner.getNode());
 			}
-			
+
 			flowRunner.commitFlow();
 			if (runningJobs.isEmpty()) {
 				System.out.println("There are no more running jobs.");
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 509f64a..d334083 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -56,24 +56,24 @@ public class FlowRunnerManager {
 		executorService.submit(runner);
 	}
 	
-	public void cancelFlow(String id) throws ExecutorManagerException {
+	public void cancelFlow(String id, String user) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner != null) {
-			runner.cancel();
+			runner.cancel(user);
 		}
 	}
 	
-	public void pauseFlow(String id) throws ExecutorManagerException {
+	public void pauseFlow(String id, String user) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner != null) {
-			runner.pause();
+			runner.pause(user);
 		}
 	}
 	
-	public void resumeFlow(String id) throws ExecutorManagerException {
+	public void resumeFlow(String id, String user) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner != null) {
-			runner.resume();
+			runner.resume(user);
 		}
 	}
 	
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 64bea74..ca0ddb6 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -91,7 +91,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				wait(5000);
 			}
 			catch (InterruptedException e) {
-				
+				logger.info("Job cancelled.");
 			}
 		}
 		// Run Job
@@ -109,9 +109,11 @@ public class JobRunner extends EventHandler implements Runnable {
 		closeLogger();
 	}
 
-	public void cancel() {
+	public synchronized void cancel() {
 		// Cancel code here
-
+		// will just interrupt, I guess, until the code is finished.
+		this.notifyAll();
+		
 		node.setStatus(Status.KILLED);
 	}
 
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 3130c55..8ff1785 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -23,7 +23,7 @@ public class ExecutableFlowLoader {
 	 * @throws ExecutorManagerException
 	 */
 	public static ExecutableFlow loadExecutableFlowFromDir(File exDir) throws ExecutorManagerException {
-		File flowFile = getLatestExecutableFlowDir(exDir);
+		File flowFile = getLatestExecutableFlowDir(exDir, false);
 		Object exFlowObj = getFlowObjectFromFile(flowFile);
 
 		int updateNumber = getFlowUpdateNumber(flowFile);
@@ -77,7 +77,7 @@ public class ExecutableFlowLoader {
 	 * @return
 	 * @throws ExecutorManagerException
 	 */
-	private static File getLatestExecutableFlowDir(File exDir) throws ExecutorManagerException {
+	private static File getLatestExecutableFlowDir(File exDir, boolean cleanOldUpdates) throws ExecutorManagerException {
 		String exFlowName = exDir.getName();
 		
 		String flowFileName = "_" + exFlowName + ".flow";
@@ -88,6 +88,17 @@ public class ExecutableFlowLoader {
 			logger.error("Execution flow " + exFlowName + " missing flow file.");
 			throw new ExecutorManagerException("Execution flow " + exFlowName + " missing flow file.");
 		}
+		
+		// Remove updates between first and last index.
+		if (cleanOldUpdates) {
+			if (exFlowFiles.length > 3) {
+				for (int i=1; i < exFlowFiles.length - 1; ++i) {
+					File file = exFlowFiles[i];
+					file.delete();
+				}
+			}
+		}
+		
 		File lastExFlow = exFlowFiles[exFlowFiles.length-1];
 		return lastExFlow;
 	}
@@ -100,14 +111,14 @@ public class ExecutableFlowLoader {
 	 * @return
 	 * @throws ExecutorManagerException
 	 */
-	public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow) throws ExecutorManagerException {
-		File file = getLatestExecutableFlowDir(exDir);
-		System.out.println("Loading from: " + file);
+	public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow, boolean cleanOldUpdates) throws ExecutorManagerException {
+		File file = getLatestExecutableFlowDir(exDir, cleanOldUpdates);
 		int number =  getFlowUpdateNumber(file);
 		if (flow.getUpdateNumber() >= number) {
 			return false;
 		}
 		
+		System.out.println("Loading from: " + file);
 		Object exFlowObj = getFlowObjectFromFile(file);
 		flow.updateExecutableFlowFromObject(exFlowObj);
 		flow.setUpdateNumber(number);
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index 54293d9..e850dd8 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -314,13 +314,19 @@ public class AzkabanExecutorServer {
 					handleAjaxFlowStatus(respMap, execid);
 				}
 				else if (action.equals("cancel")) {
-					
+					String user = getParam(req, "user");
+					logger.info("Cancel called.");
+					handleAjaxCancel(respMap, execid, user);
 				}
 				else if (action.equals("pause")) {
-					
+					String user = getParam(req, "user");
+					logger.info("Paused called.");
+					handleAjaxPause(respMap, execid, user);
 				}
 				else if (action.equals("resume")) {
-					
+					String user = getParam(req, "user");
+					logger.info("Resume called.");
+					handleAjaxResume(respMap, execid, user);
 				}
 			}
 
@@ -350,9 +356,30 @@ public class AzkabanExecutorServer {
 			}
 		}
 		
-		private void handleAjaxPause(Map<String, Object> respMap, String execid) throws ServletException {
+		private void handleAjaxPause(Map<String, Object> respMap, String execid, String user) throws ServletException {
+
 			try {
-				flowRunnerManager.submitFlow(execid, execpath);
+				flowRunnerManager.pauseFlow(execid, user);
+				respMap.put("status", "success");
+			} catch (ExecutorManagerException e) {
+				e.printStackTrace();
+				respMap.put("error", e.getMessage());
+			}
+		}
+		
+		private void handleAjaxResume(Map<String, Object> respMap, String execid, String user) throws ServletException {
+			try {
+				flowRunnerManager.resumeFlow(execid, user);
+				respMap.put("status", "success");
+			} catch (ExecutorManagerException e) {
+				e.printStackTrace();
+				respMap.put("error", e.getMessage());
+			}
+		}
+		
+		private void handleAjaxCancel(Map<String, Object> respMap, String execid, String user) throws ServletException {
+			try {
+				flowRunnerManager.cancelFlow(execid, user);
 				respMap.put("status", "success");
 			} catch (ExecutorManagerException e) {
 				e.printStackTrace();
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 16497b9..c8c5f5b 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -173,7 +173,12 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 		if (project == null) {
 			return;
 		}
-
+		
+		try {
+			executorManager.cancelFlow(exFlow, user.getUserId());
+		} catch (ExecutorManagerException e) {
+			ret.put("error", e.getMessage());
+		}
 	}
 
 	private void ajaxRestartFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -182,6 +187,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 
+		
 	}
 
 	private void ajaxPauseFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -190,6 +196,11 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 
+		try {
+			executorManager.pauseFlow(exFlow, user.getUserId());
+		} catch (ExecutorManagerException e) {
+			ret.put("error", e.getMessage());
+		}
 	}
 
 	private void ajaxResumeFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
@@ -198,6 +209,11 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 
+		try {
+			executorManager.resumeFlow(exFlow, user.getUserId());
+		} catch (ExecutorManagerException e) {
+			ret.put("resume", e.getMessage());
+		}
 	}
 	
 	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 4492bca..5b31f73 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -2,6 +2,7 @@ package azkaban.webapp.servlet;
 
 import java.io.IOException;
 import java.io.Writer;
+import java.util.HashMap;
 import java.util.UUID;
 
 import javax.servlet.ServletException;
@@ -57,7 +58,14 @@ public abstract class LoginAbstractAzkabanServlet extends
 			logger.info("Found session " + session.getUser());
 			handleGet(req, resp, session);
 		} else {
-			handleLogin(req, resp);
+			if (hasParam(req, "ajax")) {
+				HashMap<String, String> retVal = new HashMap<String, String>();
+				retVal.put("error", "session");
+				this.writeJSON(resp, retVal);
+			}
+			else {
+				handleLogin(req, resp);
+			}
 		}
 	}
 
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index a49c36f..e81170d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -10,6 +10,7 @@
 		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
 		<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
 		<script type="text/javascript" src="${context}/js/jquery.contextMenu.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.exflow.view.js"></script>
@@ -116,6 +117,21 @@
 				<tr><td class="first">Duration</td><td id="duration">-</td></tr>
 			</table>
 		</div>
+		
+		<div id="messageDialog" class="modal">
+			<h3 id="messageTitle">Error</h3>
+			<div class="messageDiv">
+				<p id="messageBox"></p>
+			</div>
+		</div>
+		
+		<div id="invalid-session" class="modal">
+			<h3>Invalid Session</h3>
+			<p>Session has expired. Please re-login.</p>
+			<div class="actions">
+				<a class="yes btn2" id="login-btn" href="#">Re-login</a>
+			</div>
+		</div>
 	</body>
 </html>
 
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 9f0ceca..83e720c 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -10,6 +10,7 @@
 		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
 		<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
 		<script type="text/javascript" src="${context}/js/jquery.contextMenu.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.ajax.utils.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.layout.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 3aeda0c..aea3d0b 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1250,6 +1250,10 @@ tr:hover td {
 	background-position: 0px 0px;
 }
 
+#list ul li.KILLED .icon {
+	background-position: 0px 0px;
+}
+
 #list ul li a {
 	font-size: 10pt;
 	margin-left: 5px;
@@ -1268,6 +1272,10 @@ tr:hover td {
 	background-position: 16px 0px;
 }
 
+#messageDialog .messageDiv {
+	margin: 20px;
+}
+
 table.parameters tr td.first {
 	font-weight: bold;
 }
@@ -1373,6 +1381,10 @@ svg .FAILED circle {
 	fill: #CC0000;
 }
 
+svg .KILLED circle {
+	fill: #CC0000;
+}
+
 svg .SUCCEEDED circle {
 	fill: #00CC33;
 }
@@ -1488,7 +1500,11 @@ span.sublabel {
 }
 
 #flow-status table td.FAILED {
-	color:  #CC0000;
+	color: #CC0000;
+}
+
+#flow-status table td.PAUSED {
+	color: #FF6600;
 }
 
 #flow-status table td.FAILED_FINISHING {
@@ -1586,7 +1602,7 @@ td .status.DISABLED {
 }
 
 td .status.KILLED {
-	background-color: #000;
+	background-color: #CC0000;
 }
 
 td .status.UNKNOWN {
diff --git a/src/web/js/azkaban.ajax.utils.js b/src/web/js/azkaban.ajax.utils.js
new file mode 100644
index 0000000..c087086
--- /dev/null
+++ b/src/web/js/azkaban.ajax.utils.js
@@ -0,0 +1,30 @@
+function ajaxCall(requestURL, data, callback) {
+	$.get(
+		requestURL,
+		data,
+		function(data) {
+			if (data.error == "session") {
+				// We need to relogin.
+				var errorDialog = document.getElementById("invalid-session");
+				if (errorDialog) {
+					  $(errorDialog).modal({
+					      closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+					      position: ["20%",],
+					      containerId: 'confirm-container',
+					      containerCss: {
+					        'height': '220px',
+					        'width': '565px'
+					      },
+					      onClose: function (dialog) {
+					      	window.location.reload();
+					      }
+					    });
+				}
+			}
+			else {
+				callback.call(this,data);
+			}
+		},
+		"json"
+	);
+}
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 92bcf19..9c55b1e 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -182,19 +182,75 @@ azkaban.FlowTabView= Backbone.View.extend({
   	}
   },
   handleCancelClick : function(evt) {
-  	
+    var requestURL = contextURL + "/executor";
+	ajaxCall(
+		requestURL,
+		{"execid": execId, "ajax":"cancelFlow"},
+		function(data) {
+          console.log("cancel clicked");
+          if (data.error) {
+          	showDialog("Error", data.error);
+          }
+          else {
+            showDialog("Cancelled", "Flow has been cancelled.");
+          }
+      	}
+      );
   },
   handleRestartClick : function(evt) {
-  	
   },
   handlePauseClick : function(evt) {
-  	
+  	  var requestURL = contextURL + "/executor";
+		ajaxCall(
+	      requestURL,
+	      {"execid": execId, "ajax":"pauseFlow"},
+	      function(data) {
+	          console.log("pause clicked");
+	          if (data.error) {
+	          	showDialog("Error", data.error);
+	          }
+	          else {
+	            showDialog("Paused", "Flow has been paused.");
+	          }
+	      }
+      );
   },
   handleResumeClick : function(evt) {
-  	
+     var requestURL = contextURL + "/executor";
+     ajaxCall(
+          requestURL,
+	      {"execid": execId, "ajax":"resumeFlow"},
+	      function(data) {
+	          console.log("pause clicked");
+	          if (data.error) {
+	          	showDialog("Error", data.error);
+	          }
+	          else {
+	          	showDialog("Resumed", "Flow has been resumed.");
+	          }
+	      }
+	  );
   }
 });
 
+var showDialog = function(title, message) {
+  $('#messageTitle').text(title);
+
+  $('#messageBox').text(message);
+
+  $('#messageDialog').modal({
+      closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+      position: ["20%",],
+      containerId: 'confirm-container',
+      containerCss: {
+        'height': '220px',
+        'width': '565px'
+      },
+      onShow: function (dialog) {
+      }
+    });
+}
+
 var jobListView;
 azkaban.JobListView = Backbone.View.extend({
 	events: {
@@ -765,10 +821,10 @@ var updaterFunction = function() {
 	var requestURL = contextURL + "/executor";
 	var oldData = graphModel.get("data");
 	var nodeMap = graphModel.get("nodeMap");
-	var keepRunning = oldData.status != "SUCCEEDED" && oldData.status != "FAILED";
-	
+	var keepRunning = oldData.status != "SUCCEEDED" && oldData.status != "FAILED" && oldData.status != "KILLED";
+
 	if (keepRunning) {
-		$.get(
+	     ajaxCall(
 	      requestURL,
 	      {"execid": execId, "ajax":"fetchexecflowupdate", "lastUpdateTime": updateTime},
 	      function(data) {
@@ -792,14 +848,13 @@ var updaterFunction = function() {
 	          }
 
 	          graphModel.set({"update": data});
-	      },
-	      "json"
-	    );
+	      }
+		);
 		
 		var data = graphModel.get("data");
 		if (data.status != "SUCCEEDED" && data.status != "FAILED" ) {
 			// 10 sec updates
-			setTimeout(function() {updaterFunction();}, 10000);
+			setTimeout(function() {updaterFunction();}, 5000);
 		}
 		else {
 			console.log("Flow finished, so no more updates");
@@ -821,7 +876,7 @@ $(function() {
 	executionListView = new azkaban.ExecutionListView({el: $('#jobListView'), model:graphModel});
 	var requestURL = contextURL + "/executor";
 
-	$.get(
+	ajaxCall(
 	      requestURL,
 	      {"execid": execId, "ajax":"fetchexecflow"},
 	      function(data) {
@@ -850,8 +905,7 @@ $(function() {
 					}
 			 }
 	          
-	      	  setTimeout(function() {updaterFunction()}, 5000);
-	      },
-	      "json"
+	      	  setTimeout(function() {updaterFunction()}, 2500);
+	      }
 	    );
 });