azkaban-aplcache

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 997d5fc..45ad983 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -18,21 +18,22 @@ public class ExecutableFlow {
 	private String flowId;
 	private String projectId;
 	private String executionPath;
-
+	
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();;
-	private ArrayList<String> startNodes = new ArrayList<String>();
+	private ArrayList<String> startNodes;
+	private ArrayList<String> endNodes;
 	
 	private long submitTime = -1;
 	private long startTime = -1;
 	private long endTime = -1;
 	
+	private int updateNumber = 0;
 	private Status flowStatus = Status.UNKNOWN;
-
 	private String submitUser;
 	
 	public enum Status {
-		FAILED, SUCCEEDED, RUNNING, WAITING, KILLED, IGNORED, READY, UNKNOWN
+		FAILED, FAILED_FINISHING, SUCCEEDED, RUNNING, WAITING, KILLED, DISABLED, READY, UNKNOWN
 	}
 	
 	public ExecutableFlow(String id, Flow flow) {
@@ -58,6 +59,14 @@ public class ExecutableFlow {
 		return flowProps.values();
 	}
 	
+	public int getUpdateNumber() {
+		return updateNumber;
+	}
+	
+	public void setUpdateNumber(int number) {
+		updateNumber = number;
+	}
+	
 	private void setFlow(Flow flow) {
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
@@ -73,15 +82,35 @@ public class ExecutableFlow {
 			targetNode.addInNode(edge.getSourceId());
 		}
 		
-		for (ExecutableNode node : executableNodes.values()) {
-			if (node.getInNodes().size()==0) {
-				startNodes.add(node.id);
+		flowProps.putAll(flow.getAllFlowProps());
+	}
+
+	public List<String> getStartNodes() {
+		if (startNodes == null) {
+			startNodes = new ArrayList<String>();
+			for (ExecutableNode node: executableNodes.values()) {
+				if (node.getInNodes().isEmpty()) {
+					startNodes.add(node.id);
+				}
 			}
 		}
 		
-		flowProps.putAll(flow.getAllFlowProps());
+		return startNodes;
 	}
-
+	
+	public List<String> getEndNodes() {
+		if (endNodes == null) {
+			endNodes = new ArrayList<String>();
+			for (ExecutableNode node: executableNodes.values()) {
+				if (node.getOutNodes().isEmpty()) {
+					endNodes.add(node.id);
+				}
+			}
+		}
+		
+		return endNodes;
+	}
+	
 	public void setStatus(String nodeId, Status status) {
 		ExecutableNode exNode = executableNodes.get(nodeId);
 		exNode.setStatus(status);
@@ -151,10 +180,6 @@ public class ExecutableFlow {
 		this.flowStatus = flowStatus;
 	}
 	
-	public List<String> getStartNodes() {
-		return new ArrayList<String>(startNodes);
-	}
-	
 	public Map<String,Object> toObject() {
 		HashMap<String, Object> flowObj = new HashMap<String, Object>();
 		flowObj.put("type", "executableflow");
@@ -167,7 +192,6 @@ public class ExecutableFlow {
 		flowObj.put("endTime", endTime);
 		flowObj.put("status", flowStatus.toString());
 		flowObj.put("submitUser", submitUser);
-		flowObj.put("startNodes", startNodes);
 		
 		ArrayList<Object> props = new ArrayList<Object>();
 		for (FlowProps fprop: flowProps.values()) {
@@ -222,8 +246,6 @@ public class ExecutableFlow {
 			FlowProps flowProps = new FlowProps(inheritedSource, source);
 			exFlow.flowProps.put(source, flowProps);
 		}
-
-		exFlow.startNodes.addAll((List<String>)flowObj.get("startNodes"));
 		
 		return exFlow;
 	}
@@ -240,9 +262,9 @@ public class ExecutableFlow {
 	public void updateExecutableFlowFromObject(Object obj) {
 		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
 
-		submitTime = (Long)flowObj.get("submitTime");
-		startTime = (Long)flowObj.get("startTime");
-		endTime = (Long)flowObj.get("endTime");
+		submitTime = getLongFromObject(flowObj.get("submitTime"));
+		startTime = getLongFromObject(flowObj.get("startTime"));
+		endTime = getLongFromObject(flowObj.get("endTime"));
 		flowStatus = Status.valueOf((String)flowObj.get("status"));
 		
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -385,8 +407,8 @@ public class ExecutableFlow {
 			HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
 			status = Status.valueOf((String)objMap.get("status"));
 
-			startTime = (Long)objMap.get("startTime");
-			endTime = (Long)objMap.get("endTime");
+			startTime = getLongFromObject(objMap.get("startTime"));
+			endTime = getLongFromObject(objMap.get("endTime"));
 		}
 		
 		public long getStartTime() {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index a38f5c9..5893857 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -12,18 +12,23 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 
+import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Flow;
 import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.JSONUtils;
@@ -43,9 +48,11 @@ public class ExecutorManager {
 	private int portNumber;
 	private String url = "localhost";
 	
-	private HashMap<String, ExecutableFlow> runningFlows = new HashMap<String, ExecutableFlow>();
+	private ConcurrentHashMap<String, ExecutableFlow> runningFlows = new ConcurrentHashMap<String, ExecutableFlow>();
+	private LinkedList<ExecutableFlow> recentlyFinished = new LinkedList<ExecutableFlow>();
+	private int recentlyFinishedSize = 10;
 	
-	public ExecutorManager(Props props) {
+	public ExecutorManager(Props props) throws IOException, ExecutorManagerException {
 		basePath = new File(props.getString("execution.directory"));
 		if (!basePath.exists()) {
 			logger.info("Execution directory " + basePath + " not found.");
@@ -57,134 +64,93 @@ public class ExecutorManager {
 			}
 		}
 		
-		portNumber = props.getInt("executor.port", AzkabanExecutorServer.DEFAULT_PORT_NUMBER);
-		token = props.getString("executor.shared.token", "");
-		counter.set(0);
-		loadActiveExecutions();
-	}
-	
-	public List<ExecutableFlow> getExecutableFlowByProject(String projectId, int from, int maxResults) {
-		File activeFlows = new File(basePath, projectId + File.separatorChar + "active");
-		
-		if (!activeFlows.exists()) {
-			return Collections.emptyList();
+		File activePath = new File(basePath, "active");
+		if(!activePath.exists() && !activePath.mkdirs()) {
+			throw new RuntimeException("Execution directory " + activePath + " does not exist and cannot be created.");
 		}
 		
-		File[] executionFiles = activeFlows.listFiles();
-		if (executionFiles.length == 0 || from >= executionFiles.length) {
-			return Collections.emptyList();
+		File archivePath = new File(basePath, "archive");
+		if(!archivePath.exists() && !archivePath.mkdirs()) {
+			throw new RuntimeException("Execution directory " + archivePath + " does not exist and cannot be created.");
 		}
-
-		Arrays.sort(executionFiles);
-
-		ArrayList<ExecutableFlow> executionFlows = new ArrayList<ExecutableFlow>();
 		
-		int index = (executionFiles.length - from - 1);
-		for (int count = 0; count < maxResults && index >= 0; ++count, --index) {
-			File exDir = executionFiles[index];
-			ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
-			
-			if (flow != null) {
-				executionFlows.add(flow);
-			}
-			else {
-				logger.info("Skipping loading " + exDir + ". Couldn't load execution.");
-			}
-		}
+		portNumber = props.getInt("executor.port", AzkabanExecutorServer.DEFAULT_PORT_NUMBER);
+		token = props.getString("executor.shared.token", "");
+		counter.set(0);
+		loadActiveExecutions();
 		
-		return executionFlows;
+		ExecutingManagerUpdaterThread executingManager = new ExecutingManagerUpdaterThread();
+		executingManager.start();
 	}
 	
-	public int getExecutableFlowByProjectFlow(String projectId, String flowName, int from, int maxResults, List<ExecutableFlow> results) {
-		File activeFlows = new File(basePath, projectId + File.separatorChar + "active");
-		
-		if (!activeFlows.exists()) {
+	public int getExecutableFlows(String projectId, String flowId, int from, int maxResults, List<ExecutableFlow> output) {
+		String projectPath = projectId + File.separator + flowId;
+		File flowProjectPath = new File(basePath, projectPath);
+
+		if (!flowProjectPath.exists()) {
 			return 0;
 		}
 		
-		File[] executionFiles = activeFlows.listFiles(new SuffixFilter(flowName, false));
-		//File[] executionFiles = activeFlows.listFiles();
+		File[] executionFiles = flowProjectPath.listFiles();
 		if (executionFiles.length == 0 || from >= executionFiles.length) {
 			return 0;
 		}
+		
+		// Sorts the file in ascending order, so we read the list backwards.
 		Arrays.sort(executionFiles);
-
-		int count = 0;
-		for (int index = executionFiles.length - from - 1; count < maxResults && index>=0; --index ) {
+		int index = (executionFiles.length - from - 1);
+		
+		for (int count = 0; count < maxResults && index >= 0; ++count, --index) {
 			File exDir = executionFiles[index];
-			ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
-			
-			if (flow != null) {
-				results.add(flow);
-				count++;
+			try {
+				ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exDir);
+				output.add(flow);
 			}
-			else {
-				logger.info("Skipping loading " + exDir + ". Couldn't load execution.");
+			catch (ExecutorManagerException e) {
+				logger.error("Skipping loading " + exDir + ". Couldn't load execution.", e);
 			}
 		}
 		
 		return executionFiles.length;
 	}
-//	
-//	private ExecutableFlow loadExecutableFlowFromDir(File exDir) {
-//		logger.info("Loading execution " + exDir.getName());
-//		String exFlowName = exDir.getName();
-//		
-//		String flowFileName = "_" + exFlowName + ".flow";
-//		File[] exFlowFiles = exDir.listFiles(new PrefixFilter(flowFileName));
-//		Arrays.sort(exFlowFiles);
-//		
-//		if (exFlowFiles.length <= 0) {
-//			logger.error("Execution flow " + exFlowName + " missing flow file.");
-//			return null;
-//		}
-//		File lastExFlow = exFlowFiles[exFlowFiles.length-1];
-//		
-//		Object exFlowObj = null;
-//		try {
-//			exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
-//		} catch (IOException e) {
-//			logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
-//			return null;
-//		}
-//		
-//		ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
-//		return flow;
-//	}
-//	
-	private void loadActiveExecutions() {
-		File[] executingProjects = basePath.listFiles();
-		for (File project: executingProjects) {
-			File activeFlows = new File(project, "active");
-			if (!activeFlows.exists()) {
-				continue;
-			}
-			
-			for (File exflow: activeFlows.listFiles()) {
-				logger.info("Loading execution " + exflow.getName());
-				ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(exflow);
+
+	
+	private void loadActiveExecutions() throws IOException, ExecutorManagerException {
+		File activeFlows = new File(basePath, "active");
+		File[] activeFlowDirs = activeFlows.listFiles();
+		
+		for (File activeFlowDir: activeFlowDirs) {
+			if (activeFlowDir.isDirectory()) {
+				ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
 				
+				ExecutableFlow flow = this.getFlowFromReference(reference);
 				if (flow != null) {
-					logger.info("Adding active execution flow " + flow.getExecutionId());
 					runningFlows.put(flow.getExecutionId(), flow);
 				}
+				else {
+					logger.error("Flow " + reference.getExecId() + " not found.");
+				}
+			}
+			else {
+				logger.info("Path " + activeFlowDir + " not a directory.");
 			}
 		}
 	}
 	
 	public synchronized ExecutableFlow createExecutableFlow(Flow flow) {
 		String projectId = flow.getProjectId();
+		String flowId = flow.getId();
 		
-		File projectExecutionDir = new File(basePath, projectId);
-		String id = flow.getId();
-		
+		String flowExecutionDir = projectId + File.separator + flowId;
+		File projectExecutionDir = new File(basePath, flowExecutionDir);
+
 		// Find execution
 		File executionDir;
 		String executionId;
 		int count = counter.getAndIncrement();
 		String countString = String.format("%05d", count);
 		do {
-			executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + id;
+			executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + flowId;
 			executionDir = new File(projectExecutionDir, executionId);
 		}
 		while(executionDir.exists());
@@ -195,7 +161,8 @@ public class ExecutorManager {
 	
 	public synchronized void setupExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
 		String executionId = exflow.getExecutionId();
-		String projectFlowDir = exflow.getProjectId() + File.separator + "active" + File.separator + executionId;
+
+		String projectFlowDir = exflow.getProjectId() + File.separator + exflow.getFlowId() + File.separator + executionId;
 		File executionPath = new File(basePath, projectFlowDir);
 		if (executionPath.exists()) {
 			throw new ExecutorManagerException("Execution path " + executionPath + " exists. Probably a simultaneous execution.");
@@ -203,13 +170,63 @@ public class ExecutorManager {
 		
 		executionPath.mkdirs();
 		exflow.setExecutionPath(executionPath.getPath());
-		runningFlows.put(executionId, exflow);
 	}
-	
-	public synchronized ExecutableFlow getExecutableFlow(String flowId) throws ExecutorManagerException {
-		ExecutableFlow flow = runningFlows.get(flowId);
+
+	public synchronized ExecutableFlow getExecutableFlow(String executionId) throws ExecutorManagerException {
+		ExecutableFlow flow = runningFlows.get(executionId);
+		if (flow != null) {
+			return flow;
+		}
 		
-		return flow;
+		// Check active
+		File baseActiveDir = new File(basePath, "active");
+		File referenceDir = new File(baseActiveDir, executionId);
+		
+		if (!referenceDir.exists()) {
+			File baseArchiveDir = new File(basePath, "archive");
+			referenceDir = new File(baseArchiveDir, executionId);
+			if (!referenceDir.exists()) {
+				throw new ExecutorManagerException("Execution id '" + executionId + "' not found. Searching " + referenceDir);
+			}
+		}
+
+		ExecutionReference reference = null;
+		try {
+			reference = ExecutionReference.readFromDirectory(referenceDir);
+		} catch (IOException e) {
+			throw new ExecutorManagerException(e.getMessage(), e);
+		}
+
+		
+		return getFlowFromReference(reference);
+	}
+	
+	private ExecutableFlow getFlowFromReference(ExecutionReference reference) throws ExecutorManagerException {
+		File executionPath = new File(reference.getExecPath());
+		if (executionPath.exists()) {
+			return ExecutableFlowLoader.loadExecutableFlowFromDir(executionPath);
+		}
+		return null;
+	}
+	
+	private synchronized void addActiveExecutionReference(ExecutableFlow flow) throws ExecutorManagerException {
+		File activeDirectory = new File(basePath, "active");
+		if (!activeDirectory.exists()) {
+			activeDirectory.mkdirs();
+		}
+
+		// Create execution reference directory
+		File referenceDir = new File(activeDirectory, flow.getExecutionId());
+		referenceDir.mkdirs();
+
+		// We don't really need to save the reference, 
+		ExecutionReference reference = new ExecutionReference(flow);
+		try {
+			reference.writeToDirectory(referenceDir);
+		} catch (IOException e) {
+			throw new ExecutorManagerException("Couldn't write execution to directory.", e);
+		}
+		runningFlows.put(flow.getExecutionId(), flow);
 	}
 	
 	public void executeFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -217,16 +234,20 @@ public class ExecutorManager {
 		File executionDir = new File(executionPath);
 		flow.setSubmitTime(System.currentTimeMillis());
 		
-		File resourceFile = writeResourceFile(executionDir, flow);
-		File executableFlowFile = writeExecutableFlowFile(executionDir, flow);
+		writeResourceFile(executionDir, flow);
+		ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
+		addActiveExecutionReference(flow);
+		runningFlows.put(flow.getExecutionId(), flow);
+		
 		logger.info("Setting up " + flow.getExecutionId() + " for execution.");
 		
 		URIBuilder builder = new URIBuilder();
 		builder.setScheme("http")
 			.setHost(url)
 			.setPort(portNumber)
-			.setPath("/submit")
+			.setPath("/executor")
 			.setParameter("sharedToken", token)
+			.setParameter("action", "execute")
 			.setParameter("execid", flow.getExecutionId())
 			.setParameter("execpath", flow.getExecutionPath());
 		
@@ -239,17 +260,24 @@ public class ExecutorManager {
 			return;
 		}
 
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
 		logger.info("Submitting flow " + flow.getExecutionId() + " for execution.");
 		HttpClient httpclient = new DefaultHttpClient();
 		HttpGet httpget = new HttpGet(uri);
-		HttpResponse response = null;
+		String response = null;
 		try {
-			response = httpclient.execute(httpget);
+			response = httpclient.execute(httpget, responseHandler);
 		} catch (IOException e) {
 			flow.setStatus(ExecutableFlow.Status.FAILED);
 			e.printStackTrace();
 			return;
 		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		logger.debug("Submitted Response: " + response);
 	}
 	
 	public void cleanupAll(ExecutableFlow exflow) throws ExecutorManagerException{
@@ -298,36 +326,6 @@ public class ExecutorManager {
 		return resourceFile;
 	}
 	
-	private File writeExecutableFlowFile(File executionDir, ExecutableFlow flow) throws ExecutorManagerException {
-		// Write out the execution file
-		String flowFileName = "_" + flow.getExecutionId() + ".flow";
-		File flowFile = new File(executionDir, flowFileName);
-		if (flowFile.exists()) {
-			throw new ExecutorManagerException("The flow file " + flowFileName + " already exists. Race condition?");
-		}
-
-		BufferedOutputStream out = null;
-		try {
-			logger.info("Writing executable file " + flowFile);
-			out = new BufferedOutputStream(new FileOutputStream(flowFile));
-			JSONUtils.toJSON(flow.toObject(), out, true);
-		} catch (IOException e) {
-			throw new ExecutorManagerException(e.getMessage(), e);
-		}
-		finally {
-			if (out != null) {
-				try {
-					out.close();
-				} catch (IOException e) {
-					// TODO Auto-generated catch block
-					e.printStackTrace();
-				}
-			}
-		}
-		
-		return flowFile;
-	}
-	
 	private HashMap<String, Object> createResourcesList(File baseDir, File dir, Set<String> sourceFiles) {
 		boolean containsSource = false;
 		
@@ -430,47 +428,230 @@ public class ExecutorManager {
 		}
 	}
 
-	private class ExecutingFlow implements Runnable {
-		public void run() {
-			
+	private String getFlowStatusInExecutor(ExecutableFlow flow) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		builder.setScheme("http")
+			.setHost(url)
+			.setPort(portNumber)
+			.setPath("/executor")
+			.setParameter("sharedToken", token)
+			.setParameter("action", "status")
+			.setParameter("execid", flow.getExecutionId());
+		
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			e.printStackTrace();
+			throw new IOException("Bad URI", e);
 		}
-	}
-	
-	private void updateRunningJobs() {
 		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			e.printStackTrace();
+			throw new IOException("Connection problem", e);
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		return response;
 	}
 	
-	private String createUniqueId(String projectId, String flowId) {
-		return null;
+	private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException {
+		
+		// Write final file
+		int updateNum = exFlow.getUpdateNumber();
+		updateNum++;
+		ExecutableFlowLoader.writeExecutableFlowFile(new File(exFlow.getExecutionPath()), exFlow, updateNum);
+		
+		String activeReferencePath = "active" + File.separator + exFlow.getExecutionId(); 
+		File activeDirectory = new File(basePath, activeReferencePath);
+		if (!activeDirectory.exists()) {
+			logger.error("WTF!! Active reference " + activeDirectory + " directory doesn't exist.");
+			throw new ExecutorManagerException("Active reference " + activeDirectory + " doesn't exists.");
+		}
+		
+		String archiveReferencePath = "archive" + File.separator + exFlow.getExecutionId(); 
+		File archiveDirectory = new File(basePath, archiveReferencePath);
+		if (archiveDirectory.exists()) {
+			logger.error("WTF!! Archive reference already exists!");
+			throw new ExecutorManagerException("Active reference " + archiveDirectory + " already exists.");
+		}
+		
+		// Move file.
+		if (!activeDirectory.renameTo(archiveDirectory)) {
+			throw new ExecutorManagerException("Cannot move " + activeDirectory + " to " + archiveDirectory);
+		}
+		
+		runningFlows.remove(exFlow.getExecutionId());
+		cleanupUnusedFiles(exFlow);
 	}
 	
-	private static class PrefixFilter implements FileFilter {
-		private String prefix;
-
-		public PrefixFilter(String prefix) {
-			this.prefix = prefix;
-		}
+	private class ExecutingManagerUpdaterThread extends Thread {
+		private boolean shutdown = false;
+		private int updateTimeMs = 100;
+		public void run() {
+			while (!shutdown) {
+				ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>(runningFlows.values());
+				for(ExecutableFlow exFlow : flows) {
+					File executionDir = new File(exFlow.getExecutionPath());
+					
+					if (!executionDir.exists()) {
+						logger.error("WTF!! Execution dir " + executionDir + " doesn't exist!");
+						// @TODO probably should handle this error case somehow. Cleanup?
+						continue;
+					}
 
-		@Override
-		public boolean accept(File pathname) {
-			String name = pathname.getName();
+					// Query the executor service to see if the item is running.
+					String responseString = null;
+					try {
+						responseString = getFlowStatusInExecutor(exFlow);
+					} catch (IOException e) {
+						// Connection issue. Backoff 1 sec.
+						synchronized(this) {
+							try {
+								wait(1000);
+							} catch (InterruptedException ie) {
+							}
+						}
+						continue;
+					}
+					
+					Object executorResponseObj;
+					try {
+						executorResponseObj = JSONUtils.parseJSONFromString(responseString);
+					} catch (IOException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+						continue;
+					}
+					
+					@SuppressWarnings("unchecked")
+					HashMap<String, Object> response = (HashMap<String, Object>)executorResponseObj;
+					String status = (String)response.get("status");
+					
+					try {
+						ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow);
+					} catch (ExecutorManagerException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+						continue;
+					}
 
-			return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
+					// If it's completed, and not running, we clean up.
+					if (exFlow.getStatus() == Status.SUCCEEDED || exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.KILLED) {
+						if (status.equals("notfound")) {
+							// Cleanup
+							logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
+							try {
+								cleanFinishedJob(exFlow);
+							} catch (ExecutorManagerException e) {
+								e.printStackTrace();
+								continue;
+							}
+						}
+						else {
+							logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running");
+						}
+					}
+					else {
+						// If it's not finished, and not running, we will fail it and clean up.
+						if (status.equals("notfound")) {
+							logger.error("Flow " + exFlow.getExecutionId() + " is running, but the Executor can't find it.");
+							exFlow.setEndTime(System.currentTimeMillis());
+							exFlow.setStatus(Status.FAILED);
+						}
+					}
+					
+					synchronized(this) {
+						try {
+							wait(updateTimeMs);
+						} catch (InterruptedException e) {
+							// TODO Auto-generated catch block
+							e.printStackTrace();
+						}
+					}
+				}
+			}
 		}
 	}
 	
-	private static class SuffixFilter implements FileFilter {
-		private String suffix;
-		private boolean filesOnly = false;
-
-		public SuffixFilter(String suffix, boolean filesOnly) {
-			this.suffix = suffix;
+	private static class ExecutionReference {
+		private String execId;
+		private String projectId;
+		private String flowId;
+		private String userId;
+		private String execPath;
+		
+		public ExecutionReference() {
+		}
+		
+		public ExecutionReference(ExecutableFlow flow) {
+			this.execId = flow.getExecutionId();
+			this.projectId = flow.getProjectId();
+			this.flowId = flow.getFlowId();
+			this.userId = flow.getSubmitUser();
+			this.execPath = flow.getExecutionPath();
 		}
+		
+		private Object toObject() {
+			HashMap<String, Object> obj = new HashMap<String, Object>();
+			obj.put("execId", execId);
+			obj.put("projectId", projectId);
+			obj.put("flowId", flowId);
+			obj.put("userId", userId);
+			obj.put("execPath", execPath);
+			return obj;
+		}
+		
+		public void writeToDirectory(File directory) throws IOException {
+			File file = new File(directory, "execution.json");
+			if (!file.exists()) {
+				JSONUtils.toJSON(this.toObject(), file);
+			}
+		}
+		
+		public static ExecutionReference readFromDirectory(File directory) throws IOException {
+			File file = new File(directory, "execution.json");
+			if (!file.exists()) {
+				throw new IOException("Execution file execution.json does not exist.");
+			}
+			
+			HashMap<String, Object> obj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(file);
+			ExecutionReference reference = new ExecutionReference();
+			reference.execId = (String)obj.get("execId");
+			reference.projectId = (String)obj.get("projectId");
+			reference.flowId = (String)obj.get("flowId");
+			reference.userId = (String)obj.get("userId");
+			reference.execPath = (String)obj.get("execPath");
 
-		@Override
-		public boolean accept(File pathname) {
-			String name = pathname.getName();
-			return (pathname.isFile() || !filesOnly) && !pathname.isHidden() && name.length() >= suffix.length() && name.endsWith(suffix);
+			return reference;
+		}
+		
+		public String getExecId() {
+			return execId;
+		}
+		
+		public String getProjectId() {
+			return projectId;
+		}
+		
+		public String getFlowId() {
+			return flowId;
+		}
+		
+		public String getUserId() {
+			return userId;
+		}
+		
+		public String getExecPath() {
+			return execPath;
 		}
 	}
 }
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 26a9ff2..43dd743 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -3,11 +3,15 @@ package azkaban.executor;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
@@ -16,6 +20,7 @@ import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventHandler;
 import azkaban.executor.event.EventListener;
 import azkaban.flow.FlowProps;
+import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.Props;
 
 public class FlowRunner extends EventHandler implements Runnable {
@@ -26,18 +31,18 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
 	private int numThreads = NUM_CONCURRENT_THREADS;
 	private boolean cancelled = true;
-	private boolean done = false;
 	
 	private Map<String, JobRunner> jobRunnersMap;
 	private JobRunnerEventListener listener;
 	private Map<String, Props> sharedProps = new HashMap<String, Props>();
 	private Map<String, Props> outputProps = new HashMap<String, Props>();
 	private File basePath;
-	
+	private AtomicInteger commitCount = new AtomicInteger(0);
+	private HashSet<String> finalNodes = new HashSet<String>();
+
 	public enum FailedFlowOptions {
 		FINISH_RUNNING_JOBS,
-		COMPLETE_ALL_DEPENDENCIES,
-		CANCEL_ALL
+		KILL_ALL
 	}
 	
 	private FailedFlowOptions failedOptions = FailedFlowOptions.FINISH_RUNNING_JOBS;
@@ -55,7 +60,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public void cancel() {
-		done = true;
+		finalNodes.clear();
 		cancelled = true;
 		
 		executorService.shutdownNow();
@@ -74,8 +79,21 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return cancelled;
 	}
 	
+	private synchronized void commitFlow() {
+		int count = commitCount.getAndIncrement();
+
+		try {
+			ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow, count);
+		} catch (ExecutorManagerException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+	
 	@Override
 	public void run() {
+		flow.setStatus(Status.RUNNING);
+		flow.setStartTime(System.currentTimeMillis());
 		this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
 		
 		// Load all shared props
@@ -104,18 +122,48 @@ public class FlowRunner extends EventHandler implements Runnable {
 			return;
 		}
 		
-		while(!done) {
+		// When this is empty, we will stop.
+		finalNodes.addAll(flow.getEndNodes());
+		
+		// Main loop
+		while(!finalNodes.isEmpty()) {
 			JobRunner runner = null;
 			try {
 				runner = jobsToRun.take();
 			} catch (InterruptedException e) {
 			}
 			
-			if (!done && runner != null) {
-				executorService.submit(runner);
+			if (!finalNodes.isEmpty() && runner != null) {
+				try {
+					ExecutableNode node = runner.getNode();
+					node.setStatus(Status.WAITING);
+					executorService.submit(runner);
+					finalNodes.remove(node.getId());
+				} catch (RejectedExecutionException e) {
+					// Should reject if I shutdown executor.
+					break;
+				}
 			}
 		}
 		
+		executorService.shutdown();
+		while (executorService.isTerminated()) {
+			try {
+				executorService.awaitTermination(1, TimeUnit.SECONDS);
+			} catch (InterruptedException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+
+		if (flow.getStatus() == Status.RUNNING) {
+			flow.setStatus(Status.SUCCEEDED);
+		}
+		else {
+			flow.setStatus(Status.FAILED);
+		}
+		flow.setEndTime(System.currentTimeMillis());
+		commitFlow();
 		this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
 	}
 	
@@ -136,7 +184,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		File propsFile = new File(basePath, source);
 		Props jobProps = new Props(parentProps, propsFile);
 		
-		JobRunner jobRunner = new JobRunner(node, jobProps);
+		JobRunner jobRunner = new JobRunner(node, jobProps, new File(flow.getExecutionPath()));
 		jobRunner.addListener(listener);
 		jobRunnersMap.put(node.getId(), jobRunner);
 		
@@ -176,7 +224,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			for (String dependency: dependentNode.getInNodes()) {
 				ExecutableNode dependencyNode = flow.getExecutableNode(dependency); 
 				if (dependencyNode.getStatus() != Status.SUCCEEDED &&
-					dependencyNode.getStatus() != Status.IGNORED) {
+					dependencyNode.getStatus() != Status.DISABLED) {
 					ready = false;
 					break;
 				}
@@ -213,14 +261,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private void handleFailedJob(ExecutableNode node) {
 		System.err.println("Job " + node.getId() + " failed.");
 		this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
-		if (failedOptions == FailedFlowOptions.FINISH_RUNNING_JOBS) {
-			done = true;
-		}
-		else if (failedOptions == FailedFlowOptions.CANCEL_ALL) {
-			this.cancel();
-		}
-		else if (failedOptions == FailedFlowOptions.COMPLETE_ALL_DEPENDENCIES) {
+		
+		switch (failedOptions) {
+			// We finish running current jobs and then fail. Do not accept new jobs.
+			case FINISH_RUNNING_JOBS:
+				finalNodes.clear();
+				executorService.shutdown();
+				this.notify();
+			break;
+			// We kill all running jobs and fail immediately
+			case KILL_ALL:
+				this.cancel();
+			break;
 		}
+
 	}
 	
 	private class JobRunnerEventListener implements EventListener {
@@ -236,16 +290,17 @@ public class FlowRunner extends EventHandler implements Runnable {
 			String jobID = runner.getNode().getId();
 			System.out.println("Event " + jobID + " " + event.getType().toString());
 
+			// On Job success, we add the output props and then set up the next run.
 			if (event.getType() == Type.JOB_SUCCEEDED) {
-				// Continue adding items.
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
-				
 				flowRunner.handleSucceededJob(runner.getNode());
 			}
 			else if (event.getType() == Type.JOB_FAILED) {
 				flowRunner.handleFailedJob(runner.getNode());
 			}
+			
+			flowRunner.commitFlow();
 		}
 	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index f07af9d..b4879cd 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -10,6 +10,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.log4j.Logger;
 
 import azkaban.executor.event.Event;
+import azkaban.executor.event.Event.Type;
 import azkaban.executor.event.EventListener;
 import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.Props;
@@ -65,6 +66,15 @@ public class FlowRunnerManager {
 		return runningFlows.get(id);
 	}
 	
+	public ExecutableFlow getExecutableFlow(String id) {
+		FlowRunner runner = runningFlows.get(id);
+		if (runner == null) {
+			return null;
+		}
+		
+		return runner.getFlow();
+	}
+	
 	private class SubmitterThread extends Thread {
 		private BlockingQueue<FlowRunner> queue;
 		private boolean shutdown = false;
@@ -102,7 +112,13 @@ public class FlowRunnerManager {
 		public synchronized void handleEvent(Event event) {
 			FlowRunner runner = (FlowRunner)event.getRunner();
 			ExecutableFlow flow = runner.getFlow();
+			
 			System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
+			if (event.getType() == Type.FLOW_FINISHED) {
+				logger.info("Flow " + flow.getExecutionId() + " has finished.");
+				runningFlows.remove(flow.getExecutionId());
+			}
+
 		}
 	}
 }
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 7e2d45a..24615a5 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -1,5 +1,7 @@
 package azkaban.executor;
 
+import java.io.File;
+
 import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
@@ -11,11 +13,13 @@ public class JobRunner  extends EventHandler implements Runnable {
 	private Props props;
 	private Props outputProps;
 	private ExecutableNode node;
+	private File workingDir;
 	
-	public JobRunner(ExecutableNode node, Props props) {
+	public JobRunner(ExecutableNode node, Props props, File workingDir) {
 		this.props = props;
 		this.node = node;
 		this.node.setStatus(Status.WAITING);
+		this.workingDir = workingDir;
 	}
 	
 	public ExecutableNode getNode() {
@@ -24,17 +28,23 @@ public class JobRunner  extends EventHandler implements Runnable {
 	
 	@Override
 	public void run() {
-		if (node.getStatus() == Status.KILLED) {
+		if (node.getStatus() == Status.DISABLED) {
+			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
+			return;
+		}
+		else if (node.getStatus() == Status.KILLED) {
 			this.fireEventListeners(Event.create(this, Type.JOB_KILLED));
 			return;
 		}
-		
-		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+		node.setStartTime(System.currentTimeMillis());
 		node.setStatus(Status.RUNNING);
+		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
+		
 		
 		// Run Job
 		boolean succeeded = true;
 		
+		node.setEndTime(System.currentTimeMillis());
 		if (succeeded) {
 			node.setStatus(Status.SUCCEEDED);
 			this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
@@ -44,7 +54,7 @@ public class JobRunner  extends EventHandler implements Runnable {
 			this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
 		}
 	}
-
+	
 	public void cancel() {
 		// Cancel code here
 		
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 613f3d4..3f862b4 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -1,18 +1,83 @@
 package azkaban.utils;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
 
 public class ExecutableFlowLoader {
 	private static final Logger logger = Logger.getLogger(ExecutableFlowLoader.class.getName());
 	
-	public static ExecutableFlow loadExecutableFlowFromDir(File exDir) {
+	/**
+	 * Loads and create ExecutableFlow from the latest execution file.
+	 * 
+	 * @param exDir
+	 * @return
+	 * @throws ExecutorManagerException
+	 */
+	public static ExecutableFlow loadExecutableFlowFromDir(File exDir) throws ExecutorManagerException {
+		File flowFile = getLatestExecutableFlowDir(exDir);
+		Object exFlowObj = getFlowObjectFromFile(flowFile);
+
+		int updateNumber = getFlowUpdateNumber(flowFile);
+		ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
+		flow.setUpdateNumber(updateNumber);
+		flow.setExecutionPath(exDir.getPath());
+		return flow;
+	}
+	
+	/**
+	 * Get the latest update number from file.
+	 * @param file
+	 * @return
+	 */
+	private static int getFlowUpdateNumber(File file) {
+		String[] namesplit = file.getName().split("\\.");
+		
+		Integer number = 0;
+		try {
+			number = Integer.parseInt(namesplit[namesplit.length - 1]);
+		}
+		catch(NumberFormatException e) {
+		}
+		
+		return number;
+	}
+	
+	/**
+	 * Get Flow object from file
+	 * 
+	 * @param file
+	 * @return
+	 * @throws ExecutorManagerException
+	 */
+	private static Object getFlowObjectFromFile(File file) throws ExecutorManagerException {
+		Object exFlowObj = null;
+		try {
+			exFlowObj = JSONUtils.parseJSONFromFile(file);
+		} catch (IOException e) {
+			logger.error("Error loading execution flow " + file.getName() + ". Problems parsing json file.");
+			throw new ExecutorManagerException(e.getMessage(), e);
+		}
+		
+		return exFlowObj;
+	}
+	
+	/**
+	 * Get the latest executable flow dir
+	 * 
+	 * @param exDir
+	 * @return
+	 * @throws ExecutorManagerException
+	 */
+	private static File getLatestExecutableFlowDir(File exDir) throws ExecutorManagerException {
 		String exFlowName = exDir.getName();
 		
 		String flowFileName = "_" + exFlowName + ".flow";
@@ -21,23 +86,83 @@ public class ExecutableFlowLoader {
 		
 		if (exFlowFiles.length <= 0) {
 			logger.error("Execution flow " + exFlowName + " missing flow file.");
-			return null;
+			throw new ExecutorManagerException("Execution flow " + exFlowName + " missing flow file.");
 		}
 		File lastExFlow = exFlowFiles[exFlowFiles.length-1];
+		return lastExFlow;
+	}
+	
+	/**
+	 * Update Flow status
+	 * 
+	 * @param exDir
+	 * @param flow
+	 * @return
+	 * @throws ExecutorManagerException
+	 */
+	public static boolean updateFlowStatusFromFile(File exDir, ExecutableFlow flow) throws ExecutorManagerException {
+		File file = getLatestExecutableFlowDir(exDir);
+		int number =  getFlowUpdateNumber(file);
+		if (flow.getUpdateNumber() >= number) {
+			return false;
+		}
 		
-		Object exFlowObj = null;
+		Object exFlowObj = getFlowObjectFromFile(file);
+		flow.updateExecutableFlowFromObject(exFlowObj);
+		flow.setUpdateNumber(number);
+		
+		return true;
+	}
+	
+	/**
+	 * Write executable flow file
+	 * 
+	 * @param executionDir
+	 * @param flow
+	 * @param commitValue
+	 * @return
+	 * @throws ExecutorManagerException
+	 */
+	public static File writeExecutableFlowFile(File executionDir, ExecutableFlow flow, Integer commitValue) throws ExecutorManagerException {
+		// Write out the execution file
+		String flowFileName =  "_" + flow.getExecutionId() + ".flow";
+		if (commitValue != null) {
+			String countString = String.format("%05d", commitValue);
+			flowFileName += "." + countString;
+		}
+		
+		File flowFile = new File(executionDir, flowFileName);
+		if (flowFile.exists()) {
+			throw new ExecutorManagerException("The flow file " + flowFileName + " already exists. Race condition?");
+		}
+
+		File tempFlowFile = new File(executionDir, "_tmp" + flowFileName);
+		BufferedOutputStream out = null;
 		try {
-			exFlowObj = JSONUtils.parseJSONFromFile(lastExFlow);
+			logger.debug("Writing executable file " + flowFile);
+			out = new BufferedOutputStream(new FileOutputStream(tempFlowFile));
+			JSONUtils.toJSON(flow.toObject(), out, true);
 		} catch (IOException e) {
-			logger.error("Error loading execution flow " + exFlowName + ". Problems parsing json file.");
-			return null;
+			throw new ExecutorManagerException(e.getMessage(), e);
+		}
+		finally {
+			if (out != null) {
+				try {
+					out.close();
+				} catch (IOException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+			}
 		}
 		
-		ExecutableFlow flow = ExecutableFlow.createExecutableFlowFromObject(exFlowObj);
-		flow.setExecutionPath(exDir.getPath());
-		return flow;
+		tempFlowFile.renameTo(flowFile);
+		return flowFile;
 	}
 	
+	/**
+	 *
+	 */
 	private static class PrefixFilter implements FileFilter {
 		private String prefix;
 
@@ -52,20 +177,5 @@ public class ExecutableFlowLoader {
 			return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
 		}
 	}
-	
-	
-	private static class SuffixFilter implements FileFilter {
-		private String suffix;
-		private boolean filesOnly = false;
-
-		public SuffixFilter(String suffix, boolean filesOnly) {
-			this.suffix = suffix;
-		}
 
-		@Override
-		public boolean accept(File pathname) {
-			String name = pathname.getName();
-			return (pathname.isFile() || !filesOnly) && !pathname.isHidden() && name.length() >= suffix.length() && name.endsWith(suffix);
-		}
-	}
 }
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index 93e0cd2..d682dbe 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -1,6 +1,10 @@
 package azkaban.utils;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -56,6 +60,16 @@ public class JSONUtils {
 		}
 	}
 	
+	public static void toJSON(Object obj, File file) throws IOException {
+		toJSON(obj, file, false);
+	}
+	
+	public static void toJSON(Object obj, File file, boolean prettyPrint) throws IOException {
+		BufferedOutputStream stream = new BufferedOutputStream(new FileOutputStream(file));
+		toJSON(obj, stream, prettyPrint);
+		stream.close();
+	}
+	
 	public static Object parseJSONFromString(String json) throws IOException {
 		ObjectMapper mapper = new ObjectMapper();
 		JsonFactory factory = new JsonFactory();
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index 5fa9611..0c189b1 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -77,7 +77,7 @@ public class AzkabanExecutorServer {
 		String sharedToken = props.getString("executor.shared.token", "");
 
 		ServletHolder executorHolder = new ServletHolder(new ExecutorServlet(sharedToken));
-		root.addServlet(executorHolder, "/submit");
+		root.addServlet(executorHolder, "/executor");
 		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
 		runnerManager = new FlowRunnerManager(props);
 		
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index d14ed46..63deab0 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -103,14 +103,14 @@ public class AzkabanWebServer {
 	 * Constructor usually called by tomcat AzkabanServletContext to create the
 	 * initial server
 	 */
-	public AzkabanWebServer() {
+	public AzkabanWebServer() throws Exception {
 		this(loadConfigurationFromAzkabanHome());
 	}
 
 	/**
 	 * Constructor
 	 */
-	public AzkabanWebServer(Props props) {
+	public AzkabanWebServer(Props props) throws Exception {
 		this.props = props;
 		velocityEngine = configureVelocityEngine(props.getBoolean( VELOCITY_DEV_MODE_PARAM, false));
 		sessionCache = new SessionCache(props);
@@ -179,7 +179,7 @@ public class AzkabanWebServer {
 		return manager;
 	}
 
-	private ExecutorManager loadExecutorManager(Props props) {
+	private ExecutorManager loadExecutorManager(Props props) throws Exception {
 		ExecutorManager execManager = new ExecutorManager(props);
 		return execManager;
 	}
@@ -268,7 +268,7 @@ public class AzkabanWebServer {
 	 * 
 	 * @param args
 	 */
-	public static void main(String[] args) {
+	public static void main(String[] args) throws Exception {
 		OptionParser parser = new OptionParser();
 
 		OptionSpec<String> configDirectory = parser
diff --git a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
index 7c5745a..b8fad67 100644
--- a/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
+++ b/src/java/azkaban/webapp/servlet/AzkabanServletContextListener.java
@@ -37,10 +37,15 @@ public class AzkabanServletContextListener implements ServletContextListener {
 	}
 
 	/**
-	 * Load the app
+	 * Load the app for use in non jetty containers.
 	 */
 	public void contextInitialized(ServletContextEvent event) {
-		this.app = new AzkabanWebServer();
+		try {
+			this.app = new AzkabanWebServer();
+		} catch (Exception e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
 
 		event.getServletContext().setAttribute(AZKABAN_SERVLET_CONTEXT_KEY, this.app);
 	}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 260d8fb..75b1ebc 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -1,6 +1,7 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 
 import javax.servlet.ServletConfig;
@@ -12,6 +13,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.FlowRunnerManager;
 import azkaban.utils.Props;
@@ -50,7 +52,8 @@ public class ExecutorServlet extends HttpServlet {
 	protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
 		resp.setContentType(JSON_MIME_TYPE);
 		ObjectMapper mapper = new ObjectMapper();
-		mapper.writeValue(resp.getOutputStream(), obj);
+		OutputStream stream = resp.getOutputStream();
+		mapper.writeValue(stream, obj);
 	}
 
 	@Override
@@ -61,26 +64,43 @@ public class ExecutorServlet extends HttpServlet {
 		if (!token.equals(sharedToken)) {
 			respMap.put("error", "Mismatched token. Will not run.");
 		}
+		else if (!hasParam(req, "action")) {
+			respMap.put("error", "Parameter action not set");
+		}
 		else if (!hasParam(req, "execid")) {
 			respMap.put("error", "Parameter execid not set.");
 		}
-		else if (!hasParam(req, "execpath")) {
-			respMap.put("error", "Parameter execpath not set.");
-		}
 		else {
+			String action = getParam(req, "action");
 			String execid = getParam(req, "execid");
-			String execpath = getParam(req, "execpath");
 			
-			logger.info("Submitted " + execid + " with " + execpath);
-			try {
-				flowRunnerManager.submitFlow(execid, execpath);
-			} catch (ExecutorManagerException e) {
-				e.printStackTrace();
-				respMap.put("error", e.getMessage());
+			// Handle execute
+			if (action.equals("execute")) {
+				String execpath = getParam(req, "execpath");
+				
+				logger.info("Submitted " + execid + " with " + execpath);
+				try {
+					flowRunnerManager.submitFlow(execid, execpath);
+					respMap.put("status", "success");
+				} catch (ExecutorManagerException e) {
+					e.printStackTrace();
+					respMap.put("error", e.getMessage());
+				}
+			}
+			// Handle Status
+			else if (action.equals("status")) {
+				ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
+				if (flow == null) {
+					respMap.put("status", "notfound");
+				}
+				else {
+					respMap.put("status", flow.getStatus().toString());
+				}
 			}
 		}
 
 		writeJSON(resp, respMap);
+		resp.flushBuffer();
 	}
 	
 	@Override
diff --git a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
index 038ee96..4703822 100644
--- a/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/FlowExecutorServlet.java
@@ -207,7 +207,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 		Map<String, String> paramGroup = this.getParamGroup(req, "disabled");
 		for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
 			boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-			exflow.setStatus(entry.getKey(), nodeDisabled ? Status.IGNORED : Status.READY);
+			exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
 		}
 		
 		// Create directory
@@ -237,6 +237,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 		
+
 		try {
 			executorManager.executeFlow(exflow);
 		} catch (ExecutorManagerException e) {
@@ -249,6 +250,7 @@ public class FlowExecutorServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 			return;
 		}
+
 		String execId = exflow.getExecutionId();
 		
 		// The following is just a test for cleanup
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index c6affe7..480af43 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1,6 +1,5 @@
 package azkaban.webapp.servlet;
 
-import java.awt.geom.Point2D;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -197,7 +196,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		int length = Integer.valueOf(getParam(req, "length"));
 		
 		ArrayList<ExecutableFlow> exFlows = new ArrayList<ExecutableFlow>();
-		int total = executorManager.getExecutableFlowByProjectFlow(project.getName(), flowId, from, length, exFlows);
+		int total = executorManager.getExecutableFlows(project.getName(),  flowId, from, length, exFlows);
 		
 		ret.put("flow", flowId);
 		ret.put("total", total);