azkaban-uncached

Refactored the connection between the Client and server to

10/13/2012 2:04:51 AM

Details

diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
new file mode 100644
index 0000000..233128e
--- /dev/null
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -0,0 +1,34 @@
+package azkaban.executor;
+
+public interface ConnectorParams { // Shared constants for requests sent to the executor server and for the responses it returns.
+	public static final String ACTION_PARAM = "action"; // Request parameter naming the requested action (one of the *_ACTION values).
+	public static final String EXECID_PARAM = "execid"; // Request parameter carrying the execution id.
+	public static final String SHAREDTOKEN_PARAM = "token"; // Request parameter carrying the shared token.
+	public static final String USER_PARAM = "user"; // Request parameter carrying the user id.
+	
+	public static final String STATUS_ACTION = "status"; // Action value: query the status of an execution.
+	public static final String EXECUTE_ACTION = "execute"; // Action value: submit an execution.
+	public static final String CANCEL_ACTION = "cancel"; // Action value: cancel a running execution.
+	public static final String PAUSE_ACTION = "pause"; // Action value: pause a running execution.
+	public static final String RESUME_ACTION = "resume"; // Action value: resume a paused execution.
+	public static final String PING_ACTION = "ping"; // Action value; presumably a liveness check answered with RESPONSE_ALIVE -- TODO confirm against the server side.
+	
+	public static final String START_PARAM = "start"; // Presumably a start-time field -- TODO confirm.
+	public static final String END_PARAM = "end"; // Presumably an end-time field -- TODO confirm.
+	public static final String STATUS_PARAM = "status"; // Key of the status string in a status response.
+	public static final String NODES_PARAM = "nodes"; // Presumably the key of a per-node list -- TODO confirm.
+	public static final String EXECPATH_PARAM = "execpath"; // Request parameter carrying the execution path.
+	
+	public static final String RESPONSE_NOTFOUND = "notfound"; // Status value reported when the executor does not know the execution.
+	public static final String RESPONSE_ERROR = "error"; // Response key whose value is an error message, when present.
+	public static final String RESPONSE_SUCCESS = "success"; // Presumably marks a successful response -- TODO confirm.
+	public static final String RESPONSE_ALIVE = "alive"; // Presumably the reply to PING_ACTION -- TODO confirm.
+	public static final String RESPONSE_UPDATETIME = "lasttime"; // Response key holding the executor-side last update time, read as a long.
+	
+	public static final int NODE_NAME_INDEX = 0; // NODE_*_INDEX: presumably positions within a per-node tuple -- TODO confirm.
+	public static final int NODE_STATUS_INDEX = 1;
+	public static final int NODE_START_INDEX = 2;
+	public static final int NODE_END_INDEX = 3;
+
+	public static final String FORCED_FAILED_MARKER = ".failed"; // Name of the marker file created in an execution directory when a flow is force-failed.
+}
diff --git a/src/java/azkaban/executor/event/Event.java b/src/java/azkaban/executor/event/Event.java
index 555fd51..2b52f72 100644
--- a/src/java/azkaban/executor/event/Event.java
+++ b/src/java/azkaban/executor/event/Event.java
@@ -9,7 +9,8 @@ public class Event {
 		JOB_SUCCEEDED,
 		JOB_FAILED,
 		JOB_KILLED,
-		JOB_SKIPPED
+		JOB_SKIPPED,
+		ERROR
 	}
 	
 	private final Object runner;
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 73129d2..791df64 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -12,6 +12,7 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
 public class ExecutableFlow {
@@ -19,7 +20,6 @@ public class ExecutableFlow {
 	private String flowId;
 	private String projectId;
 	private String executionPath;
-	private long lastCheckedTime;
 	
 	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
@@ -32,11 +32,11 @@ public class ExecutableFlow {
 	private long submitTime = -1;
 	private long startTime = -1;
 	private long endTime = -1;
+	private long updateTime = -1;
 	
 	private int updateNumber = 0;
 	private Status flowStatus = Status.READY;
 	private String submitUser;
-	private boolean submitted = false;
 	private boolean notifyOnFirstFailure = true;
 	private boolean notifyOnLastFailure = false;
 	
@@ -66,12 +66,12 @@ public class ExecutableFlow {
 	public ExecutableFlow() {
 	}
 	
-	public long getLastCheckedTime() {
-		return lastCheckedTime;
+	public long getUpdateTime() {
+		return updateTime;
 	}
 	
-	public void setLastCheckedTime(long lastCheckedTime) {
-		this.lastCheckedTime = lastCheckedTime;
+	public void setUpdateTime(long updateTime) {
+		this.updateTime = updateTime;
 	}
 	
 	public List<ExecutableNode> getExecutableNodes() {
@@ -117,8 +117,12 @@ public class ExecutableFlow {
 			targetNode.addInNode(edge.getSourceId());
 		}
 		
-		successEmails = new ArrayList<String>(flow.getSuccessEmails());
-		failureEmails = new ArrayList<String>(flow.getFailureEmails());
+		if (flow.getSuccessEmails() != null) {
+			successEmails = new ArrayList<String>(flow.getSuccessEmails());
+		}
+		if (flow.getFailureEmails() != null) {
+			failureEmails = new ArrayList<String>(flow.getFailureEmails());
+		}
 		flowProps.putAll(flow.getAllFlowProps());
 	}
 
@@ -293,9 +297,9 @@ public class ExecutableFlow {
 		exFlow.executionPath = (String)flowObj.get("executionPath");
 		exFlow.flowId = (String)flowObj.get("flowId");
 		exFlow.projectId = (String)flowObj.get("projectId");
-		exFlow.submitTime = getLongFromObject(flowObj.get("submitTime"));
-		exFlow.startTime = getLongFromObject(flowObj.get("startTime"));
-		exFlow.endTime = getLongFromObject(flowObj.get("endTime"));
+		exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
+		exFlow.startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
+		exFlow.endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
 		exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
 		exFlow.submitUser = (String)flowObj.get("submitUser");
 		if (flowObj.containsKey("flowParameters")) {
@@ -340,21 +344,13 @@ public class ExecutableFlow {
 		return exFlow;
 	}
 	
-	private static long getLongFromObject(Object obj) {
-		if (obj instanceof Integer) {
-			return Long.valueOf((Integer)obj);
-		}
-		
-		return (Long)obj;
-	}
-	
 	@SuppressWarnings("unchecked")
 	public void updateExecutableFlowFromObject(Object obj) {
 		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
 
-		submitTime = getLongFromObject(flowObj.get("submitTime"));
-		startTime = getLongFromObject(flowObj.get("startTime"));
-		endTime = getLongFromObject(flowObj.get("endTime"));
+		submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
+		startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
+		endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
 		flowStatus = Status.valueOf((String)flowObj.get("status"));
 		
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -390,14 +386,6 @@ public class ExecutableFlow {
 		this.submitUser = submitUser;
 	}
 
-	public boolean isSubmitted() {
-		return submitted;
-	}
-
-	public void setSubmitted(boolean submitted) {
-		this.submitted = submitted;
-	}
-
 	public void setPipelineLevel(int level) {
 		pipelineLevel = level;
 	}
@@ -514,8 +502,8 @@ public class ExecutableFlow {
 			exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
 			exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
 			
-			exNode.startTime = getLongFromObject(objMap.get("startTime"));
-			exNode.endTime = getLongFromObject(objMap.get("endTime"));
+			exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+			exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
 			exNode.level = (Integer)objMap.get("level");
 			
 			exNode.flow = flow;
@@ -528,8 +516,8 @@ public class ExecutableFlow {
 			HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
 			status = Status.valueOf((String)objMap.get("status"));
 
-			startTime = getLongFromObject(objMap.get("startTime"));
-			endTime = getLongFromObject(objMap.get("endTime"));
+			startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+			endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
 		}
 		
 		public long getStartTime() {
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 819d17b..03dace2 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -6,6 +6,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.URI;
@@ -17,6 +18,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,8 +40,8 @@ import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 
+import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
-import azkaban.executor.ExecutorManager.ExecutionReference;
 import azkaban.flow.Flow;
 import azkaban.utils.ExecutableFlowLoader;
 import azkaban.utils.JSONUtils;
@@ -55,9 +57,8 @@ public class ExecutorManager {
 	private static final String ARCHIVE_DIR = ".archive";
 	private static Logger logger = Logger.getLogger(ExecutorManager.class);
 	// 30 seconds of retry before failure.
-	private static final long ACCESS_ERROR_THRESHOLD = 30000;
-	private static final int UPDATE_THREAD_MS = 1000;
-
+	private static final long ACCESS_ERROR_THRESHOLD_MS = 30000;
+	
 	// log read buffer.
 	private static final int LOG_BUFFER_READ_SIZE = 10*1024;
 	
@@ -69,10 +70,14 @@ public class ExecutorManager {
 	private String url = "localhost";
 	
 	private ConcurrentHashMap<String, ExecutableFlow> runningFlows = new ConcurrentHashMap<String, ExecutableFlow>();
+	private ConcurrentHashMap<String, ExecutionReference> runningReference = new ConcurrentHashMap<String, ExecutionReference>();
 	
 	private CacheManager manager = CacheManager.create();
 	private Cache recentFlowsCache;
 	private static final int LIVE_SECONDS = 600;
+	private Object BlockObj = new Object();
+	
+	ExecutingManagerUpdaterThread executingManager;
 	
 	public ExecutorManager(Props props) throws IOException, ExecutorManagerException {
 		basePath = new File(props.getString("execution.directory"));
@@ -102,7 +107,7 @@ public class ExecutorManager {
 		counter.set(0);
 		loadActiveExecutions();
 		
-		ExecutingManagerUpdaterThread executingManager = new ExecutingManagerUpdaterThread();
+		executingManager = new ExecutingManagerUpdaterThread();
 		executingManager.start();
 	}
 	
@@ -214,7 +219,6 @@ public class ExecutorManager {
 		Arrays.sort(archivePartitionsDir, new Comparator<File>() {
 			@Override
 			public int compare(File arg0, File arg1) {
-				// TODO Auto-generated method stub
 				return arg1.getName().compareTo(arg0.getName());
 			}});
 
@@ -237,7 +241,6 @@ public class ExecutorManager {
 							searchFlows.add(ref);
 						}
 					} catch (IOException e) {
-						// TODO Auto-generated catch block
 						e.printStackTrace();
 					}
 					
@@ -263,14 +266,19 @@ public class ExecutorManager {
 		for (File activeFlowDir: activeFlowDirs) {
 			if (activeFlowDir.isDirectory()) {
 				ExecutionReference reference = ExecutionReference.readFromDirectory(activeFlowDir);
+				if (reference.getExecutorUrl() == null) {
+					reference.setExecutorPort(portNumber);
+					reference.setExecutorUrl(url);
+				}
 				
 				ExecutableFlow flow = this.getFlowFromReference(reference);
 				if (flow == null) {
 					logger.error("Flow " + reference.getExecId() + " not found.");
 				}
-				flow.setLastCheckedTime(System.currentTimeMillis());
-				flow.setSubmitted(true);
+				reference.setLastCheckedTime(System.currentTimeMillis());
+
 				if (flow != null) {
+					runningReference.put(reference.getExecId(), reference);
 					runningFlows.put(flow.getExecutionId(), flow);
 				}
 			}
@@ -290,11 +298,14 @@ public class ExecutorManager {
 		// Find execution
 		File executionDir;
 		String executionId;
-		int count = counter.getAndIncrement() % 100000;
-		String countString = String.format("%05d", count);
+
+		int count = 0;
+
 		do {
-			executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + flowId;
+			String countString = String.format("%05d", count);
+			executionId = String.valueOf(System.currentTimeMillis()) + "." + countString + "." + projectId + "." + flowId;
 			executionDir = new File(projectExecutionDir, executionId);
+			count++;
 		}
 		while(executionDir.exists());
 		
@@ -321,6 +332,22 @@ public class ExecutorManager {
 			return flow;
 		}
 		
+		String[] split = executionId.split("\\.");
+		// get project file from split.
+		String projectId = split[2];
+		File projectPath = new File(basePath, projectId);
+		if (projectPath.exists()) {
+			// The execution id sometimes looks like timestamp.count.projectId.flowId, but flowId may itself contain '.', so it is taken by offset instead of from split.
+			String flowId = executionId.substring(split[0].length() + split[1].length() + projectId.length() + 3);
+			File flowPath = new File(projectPath, flowId);
+			if (flowPath.exists()) {
+				File executionPath = new File(flowPath, executionId);
+				if (executionPath.exists()) {
+					return ExecutableFlowLoader.loadExecutableFlowFromDir(executionPath);
+				}
+			}
+		}
+		
 		// Check active
 		File baseActiveDir = new File(basePath, ACTIVE_DIR);
 		File referenceDir = new File(baseActiveDir, executionId);
@@ -355,7 +382,7 @@ public class ExecutorManager {
 		return null;
 	}
 	
-	private synchronized void addActiveExecutionReference(ExecutableFlow flow) throws ExecutorManagerException {
+	private synchronized ExecutionReference addActiveExecutionReference(ExecutableFlow flow) throws ExecutorManagerException {
 		File activeDirectory = new File(basePath, ACTIVE_DIR);
 		if (!activeDirectory.exists()) {
 			activeDirectory.mkdirs();
@@ -367,67 +394,191 @@ public class ExecutorManager {
 
 		// We don't really need to save the reference, 
 		ExecutionReference reference = new ExecutionReference(flow);
+		reference.setExecutorUrl(url);
+		reference.setExecutorPort(portNumber);
 		try {
 			reference.writeToDirectory(referenceDir);
 		} catch (IOException e) {
 			throw new ExecutorManagerException("Couldn't write execution to directory.", e);
 		}
-		runningFlows.put(flow.getExecutionId(), flow);
+
+		return reference;
 	}
 	
-	public void cancelFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+	private void forceFlowFailure(ExecutableFlow exFlow) throws ExecutorManagerException { // Marks a flow whose executor cannot be reached as FAILED and closes out its unfinished nodes; no-op (beyond the marker file) if the flow already finished.
+		String logFileName = "_flow." + exFlow.getExecutionId() + ".log";
+		File executionDir = new File(exFlow.getExecutionPath());
+		
+		// Add a marker to the directory as an indicator to zombie processes that this is off limits.
+		File forcedFailed = new File(executionDir, ConnectorParams.FORCED_FAILED_MARKER);
+		if (!forcedFailed.exists()) {
+			try {
+				forcedFailed.createNewFile();
+			} catch (IOException e) {
+				logger.error("Error creating failed marker in execution directory",e);
+			}
+		}
+		
+		// Load the last update written to the execution directory, so a flow that finished on its own is not overwritten.
+		ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
+
+		// Return if already finished.
+		if (exFlow.getStatus() == Status.FAILED || 
+			exFlow.getStatus() == Status.SUCCEEDED || 
+			exFlow.getStatus() == Status.KILLED) {
+			return;
+		}
+
+		// Finish log file
+		File logFile = new File(executionDir, logFileName);
+		if (logFile.exists()) {
+			// Append the failure notice; the writer is closed in finally so the buffered text is flushed and the handle is released on every path.
+			FileWriter writer = null;
+			try {
+				writer = new FileWriter(logFile, true);
+				writer.append("\n" + System.currentTimeMillis() + " ERROR: Can't reach executor. Killing Flow!!!!");
+			} catch (IOException e) {
+				logger.error("Error appending forced-failure message to " + logFile, e); // Best effort: the flow is still failed below.
+			} finally {
+				try {
+					if (writer != null) { writer.close(); }
+				} catch (IOException e1) {
+					logger.error("Error closing " + logFile, e1);
+				}
+			}
+		}
+
+		// We mark every untouched job with KILLED, and running jobs with FAILED.
+		long time = System.currentTimeMillis();
+		for (ExecutableNode node: exFlow.getExecutableNodes()) {
+			switch(node.getStatus()) {
+				case SUCCEEDED:
+				case FAILED:
+				case KILLED:
+				case SKIPPED:
+				case DISABLED:
+					continue; // Already finished or never meant to run: leave status and times untouched (continues the for loop).
+				case UNKNOWN:
+				case READY:
+					node.setStatus(Status.KILLED);
+					break;
+				default:
+					node.setStatus(Status.FAILED);
+					break;
+			}
+
+			if (node.getStartTime() == -1) { // -1 means the time was never set; stamp it with the forced-failure time.
+				node.setStartTime(time);
+			}
+			if (node.getEndTime() == -1) {
+				node.setEndTime(time);
+			}
+		}
+
+		if (exFlow.getEndTime() == -1) {
+			exFlow.setEndTime(time);
+		}
+		
+		exFlow.setStatus(Status.FAILED);
+	}
+
+	@SuppressWarnings("unchecked")
+	public void cancelFlow(String execId, String user) throws ExecutorManagerException {
 		logger.info("Calling cancel");
-		String response = null;
+		ExecutionReference reference = runningReference.get(execId);
+		if (reference == null) {
+			throw new ExecutorManagerException("Execution " + execId + " not running.");
+		}
+		
+		Map<String, Object> respObj = null;
 		try {
-			response = callExecutionServer("cancel", flow, user);
+			String response = callExecutorServer(reference, ConnectorParams.CANCEL_ACTION);
+			respObj = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
 		} catch (IOException e) {
 			e.printStackTrace();
 			throw new ExecutorManagerException("Error cancelling flow.", e);
 		}
+		
+		if (respObj.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+			throw new ExecutorManagerException((String)respObj.get(ConnectorParams.RESPONSE_ERROR));
+		}
 	}
-	
-	public void pauseFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+
+	@SuppressWarnings("unchecked")
+	public void pauseFlow(String execId, String user) throws ExecutorManagerException {
 		logger.info("Calling pause");
-		String response = null;
+		ExecutionReference reference = runningReference.get(execId);
+		if (reference == null) {
+			throw new ExecutorManagerException("Execution " + execId + " not running.");
+		}
+		
+		Map<String, Object> respObj = null;
 		try {
-			response = callExecutionServer("pause", flow, user);
+			String response = callExecutorServer(reference, ConnectorParams.PAUSE_ACTION);
+			respObj = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
 		} catch (IOException e) {
 			e.printStackTrace();
-			throw new ExecutorManagerException("Error cancelling flow.", e);
+			throw new ExecutorManagerException("Error pausing flow.", e);
+		}
+
+		if (respObj.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+			throw new ExecutorManagerException((String)respObj.get(ConnectorParams.RESPONSE_ERROR));
 		}
 	}
-	
-	public void resumeFlow(ExecutableFlow flow, String user) throws ExecutorManagerException {
+
+	@SuppressWarnings("unchecked")
+	public void resumeFlow(String execId, String user) throws ExecutorManagerException {
 		logger.info("Calling resume");
-		String response = null;
+		ExecutionReference reference = runningReference.get(execId);
+		if (reference == null) {
+			throw new ExecutorManagerException("Execution " + execId + " not running.");
+		}
+		
+		Map<String, Object> respObj = null;
 		try {
-			response = callExecutionServer("resume", flow, user);
+			String response = callExecutorServer(reference, ConnectorParams.RESUME_ACTION);
+			respObj = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
 		} catch (IOException e) {
 			e.printStackTrace();
-			throw new ExecutorManagerException("Error cancelling flow.", e);
+			throw new ExecutorManagerException("Error resuming flow.", e);
 		}
 
+		if (respObj.containsKey(ConnectorParams.RESPONSE_ERROR)) {
+			throw new ExecutorManagerException((String)respObj.get(ConnectorParams.RESPONSE_ERROR));
+		}
 	}
 	
-	private String callExecutionServer(String action, ExecutableFlow flow) throws IOException{
-		return callExecutionServer(action, flow, null);
+	private synchronized String callExecutorServer(ExecutionReference reference, String action) throws IOException { // Convenience overload: sends the given action to the executor recorded on the reference and returns the raw response string.
+		return callExecutorServer(
+				reference.getExecutorUrl(),
+				reference.getExecutorPort(),
+				action,
+				reference.getExecId(),
+				reference.getExecPath(),
+				reference.userId); // The user sent is always the one stored on the reference.
 	}
 	
-	private String callExecutionServer(String action, ExecutableFlow flow, String user) throws IOException{
+	private synchronized String callExecutorServer(String url, int port, String action, String execid, String execPath, String user) throws IOException {
 		URIBuilder builder = new URIBuilder();
 		builder.setScheme("http")
 			.setHost(url)
-			.setPort(portNumber)
+			.setPort(port)
 			.setPath("/executor")
-			.setParameter("sharedToken", token)
-			.setParameter("action", action)
-			.setParameter("execid", flow.getExecutionId())
-			.setParameter("execpath", flow.getExecutionPath());
+			.setParameter(ConnectorParams.SHAREDTOKEN_PARAM, token)
+			.setParameter(ConnectorParams.ACTION_PARAM, action);
+
+		if (execid != null) {
+			builder.setParameter(ConnectorParams.EXECID_PARAM, execid);
+		}
 		
 		if (user != null) {
-			builder.setParameter("user", user);
+			builder.setParameter(ConnectorParams.USER_PARAM, user);
 		}
 		
+		if (execPath != null) {
+			builder.setParameter(ConnectorParams.EXECPATH_PARAM, execPath);
+		}
+
 		URI uri = null;
 		try {
 			uri = builder.build();
@@ -437,14 +588,12 @@ public class ExecutorManager {
 		
 		ResponseHandler<String> responseHandler = new BasicResponseHandler();
 		
-		logger.info("Remotely querying " + flow.getExecutionId() + " for status.");
 		HttpClient httpclient = new DefaultHttpClient();
 		HttpGet httpget = new HttpGet(uri);
 		String response = null;
 		try {
 			response = httpclient.execute(httpget, responseHandler);
 		} catch (IOException e) {
-			flow.setStatus(ExecutableFlow.Status.FAILED);
 			e.printStackTrace();
 			return response;
 		}
@@ -462,24 +611,30 @@ public class ExecutorManager {
 		
 		writeResourceFile(executionDir, flow);
 		ExecutableFlowLoader.writeExecutableFlowFile(executionDir, flow, null);
-		addActiveExecutionReference(flow);
-		flow.setLastCheckedTime(System.currentTimeMillis());
-		runningFlows.put(flow.getExecutionId(), flow);
+		ExecutionReference reference = addActiveExecutionReference(flow);
 		
 		logger.info("Setting up " + flow.getExecutionId() + " for execution.");
 		
 		String response;
 		try {
-			response = callExecutionServer("execute", flow);
+			response = callExecutorServer(reference, ConnectorParams.EXECUTE_ACTION);
+			reference.setSubmitted(true);
+			runningReference.put(flow.getExecutionId(), reference);
+			runningFlows.put(flow.getExecutionId(), flow);
 		} catch (IOException e) {
 			e.printStackTrace();
-			flow.setStatus(ExecutableFlow.Status.FAILED);
+			// Clean up.
+			forceFlowFailure(flow);
+			cleanFinishedJob(flow);
 			return;
 		}
 		
 		logger.debug("Submitted Response: " + response);
-		flow.setLastCheckedTime(System.currentTimeMillis());
-		flow.setSubmitted(true);
+
+		reference.setStartTime(System.currentTimeMillis());
+		synchronized(BlockObj) {
+			BlockObj.notify();
+		}
 	}
 	
 	private long readLog(File logFile, Writer writer, long startChar, long maxSize) {
@@ -608,7 +763,6 @@ public class ExecutorManager {
 				try {
 					out.close();
 				} catch (IOException e) {
-					// TODO Auto-generated catch block
 					e.printStackTrace();
 				}
 			}
@@ -725,22 +879,17 @@ public class ExecutorManager {
 		int index = execID.indexOf('.');
 		return execID.substring(0, index - 8);
 	}
-	
-	private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException {
-		
+
+	private void cleanExecutionReferenceJob(ExecutionReference reference) throws ExecutorManagerException {
 		// Write final file
-		int updateNum = exFlow.getUpdateNumber();
-		updateNum++;
-		ExecutableFlowLoader.writeExecutableFlowFile(new File(exFlow.getExecutionPath()), exFlow, updateNum);
-		
-		String activeReferencePath = ACTIVE_DIR + File.separator + exFlow.getExecutionId(); 
+		String exId = reference.getExecId();
+		String activeReferencePath = ACTIVE_DIR + File.separator + exId; 
 		File activeDirectory = new File(basePath, activeReferencePath);
 		if (!activeDirectory.exists()) {
 			logger.error("WTF!! Active reference " + activeDirectory + " directory doesn't exist.");
 			throw new ExecutorManagerException("Active reference " + activeDirectory + " doesn't exists.");
 		}
 		
-		String exId = exFlow.getExecutionId();
 		String partitionVal = getExecutableReferencePartition(exId);
 		
 		String archiveDatePartition = ARCHIVE_DIR + File.separator + partitionVal;
@@ -749,7 +898,7 @@ public class ExecutorManager {
 			archivePartitionDir.mkdirs();
 		}
 
-		File archiveDirectory = new File(archivePartitionDir, exFlow.getExecutionId());
+		File archiveDirectory = new File(archivePartitionDir, exId);
 		if (archiveDirectory.exists()) {
 			logger.error("Archive reference already exists. Cleaning up.");
 			try {
@@ -766,7 +915,6 @@ public class ExecutorManager {
 			throw new ExecutorManagerException("Cannot create " + archiveDirectory);
 		}
 		
-		ExecutionReference reference = new ExecutionReference(exFlow);
 		try {
 			reference.writeToDirectory(archiveDirectory);
 		} catch (IOException e) {
@@ -779,126 +927,167 @@ public class ExecutorManager {
 		} catch (IOException e) {
 			throw new ExecutorManagerException("Cannot cleanup active directory " + activeDirectory);
 		}
+	
+		runningReference.remove(exId);
+	}
+	
+	private void cleanFinishedJob(ExecutableFlow exFlow) throws ExecutorManagerException { // Persists the final flow state, retires its running reference if one is registered, and moves the flow from runningFlows to the recent-flows cache.
+		// Write final file
+		int updateNum = exFlow.getUpdateNumber();
+		updateNum++; // The final file is written with the next update number.
+		String exId = exFlow.getExecutionId();
+		ExecutableFlowLoader.writeExecutableFlowFile(new File(exFlow.getExecutionPath()), exFlow, updateNum);
+
+		// Clean up reference
+		ExecutionReference reference = runningReference.get(exId);
+		if (reference != null) { // May be null, e.g. when submission failed before the reference was registered.
+			reference.setStartTime(exFlow.getStartTime()); // Copy the flow's final times and status onto the reference before it is archived.
+			reference.setEndTime(exFlow.getEndTime());
+			reference.setStatus(exFlow.getStatus());
+			cleanExecutionReferenceJob(reference);
+		}
 		
 		runningFlows.remove(exFlow.getExecutionId());
 		recentFlowsCache.put(new Element(exFlow.getExecutionId(), exFlow));
 		cleanupUnusedFiles(exFlow);
 	}
 	
-	/**
-	 * Thread that polls the executor for executing jobs.
-	 * It is also cleans up the flow execution files after it's done.
-	 */
 	private class ExecutingManagerUpdaterThread extends Thread {
 		private boolean shutdown = false;
-		private int updateTimeMs = UPDATE_THREAD_MS;
+		private int waitTimeIdleMs = 1000;
+		private int waitTimeMs = 100;
+		
+		@SuppressWarnings("unchecked")
 		public void run() {
-			while (!shutdown) {
-				ArrayList<ExecutableFlow> flows = new ArrayList<ExecutableFlow>(runningFlows.values());
-				for(ExecutableFlow exFlow : flows) {
-					if (!exFlow.isSubmitted()) {
+			while(!shutdown) {
+				for (ExecutionReference reference: runningReference.values()) {
+					// Don't do anything if not submitted
+					if (!reference.isSubmitted()) {
 						continue;
 					}
 					
-					File executionDir = new File(exFlow.getExecutionPath());
+					String execId = reference.getExecId();
+					ExecutableFlow flow = runningFlows.get(execId);
+					if (flow != null) {
+						// NOTE(review): this check looks inverted; a null flow is not handled here and is dereferenced just below.
+					}
 					
+					File executionDir = new File(flow.getExecutionPath());
+					// The execution dir doesn't exist. So we clean up.
 					if (!executionDir.exists()) {
 						logger.error("WTF!! Execution dir " + executionDir + " doesn't exist!");
-						// @TODO probably should handle this error case somehow. Cleanup?
+						// Removing reference.
+						reference.setStatus(Status.FAILED);
+						try {
+							cleanExecutionReferenceJob(reference);
+						} catch (ExecutorManagerException e) {
+							logger.error("The execution dir " + executionDir.getPath() + " doesn't exist for " + reference.toRefString(), e);
+						}
 						continue;
 					}
-
-					// Query the executor service to see if the item is running.
-					String responseString = null;
+					
+					// Get status from the server. If the server keeps returning errors, then we clean up after 30 seconds of errors.
+					HashMap<String,Object> map = null;
 					try {
-						responseString = callExecutionServer("status", exFlow);
+						String string = callExecutorServer(reference, ConnectorParams.STATUS_ACTION);
+						map = (HashMap<String,Object>)JSONUtils.parseJSONFromString(string);
+						reference.setLastCheckedTime(System.currentTimeMillis());
 					} catch (IOException e) {
-						e.printStackTrace();
-						// Connection issue. Backoff 1 sec.
-						synchronized(this) {
+						if (System.currentTimeMillis() - reference.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD_MS) {
+							logger.error("Error: Can't connect to server." + reference.toRefString() + ". Might be dead. Cleaning up project.", e);
+							// Cleanup. Since we haven't seen anyone.
 							try {
-								wait(1000);
-							} catch (InterruptedException ie) {
+								forceFlowFailure(flow);
+								cleanFinishedJob(flow);
+							} catch (ExecutorManagerException e1) {
+								logger.error("Foreced Fail: Error while cleaning up flow and job. " + reference.toRefString(), e1);
 							}
+							
+							continue;
 						}
-						continue;
-					}
-					catch (Exception e) {
-						e.printStackTrace();
 					}
 					
-					Object executorResponseObj;
-					try {
-						executorResponseObj = JSONUtils.parseJSONFromString(responseString);
-					} catch (Exception e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
+					// If the response from the server is an error, we print out the response and continue.
+					String error = (String)map.get(ConnectorParams.RESPONSE_ERROR);
+					if (error != null) {
+						logger.error("Server status response for " + reference.toRefString() + " was an error: " + error);
 						continue;
 					}
 					
-					@SuppressWarnings("unchecked")
-					HashMap<String, Object> response = (HashMap<String, Object>)executorResponseObj;
-					String status = (String)response.get("status");
-					
-					try {
-						ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
-					} catch (ExecutorManagerException e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
-						continue;
+					// If not found, then we clean up.
+					String statusStr = (String)map.get(ConnectorParams.STATUS_PARAM);
+					boolean forceFail = false;
+					if (statusStr.equals(ConnectorParams.RESPONSE_NOTFOUND)) {
+						logger.info("Server status response for " + reference.toRefString() + " was 'notfound'. Cleaning up");
+						try {
+							ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+							forceFail = true;
+						} catch (ExecutorManagerException e) {
+							logger.error("Error updating flow status " + flow.getExecutionId() + " from file.", e);
+							continue;
+						}
 					}
-
-					// If it's completed, and not running, we clean up.
-					if (exFlow.getStatus() == Status.SUCCEEDED || exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.KILLED) {
-						if (status.equals("notfound")) {
-							// Cleanup
-							logger.info("Flow " + exFlow.getExecutionId() + " has succeeded. Cleaning Up.");
+					else {
+						// It's found, so we check the status.
+						long time = JSONUtils.getLongFromObject(map.get(ConnectorParams.RESPONSE_UPDATETIME));
+						
+						if (time > flow.getUpdateTime()) {
 							try {
-								ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
-								cleanFinishedJob(exFlow);						
+								ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+								// Update reference
+								reference.setStartTime(flow.getStartTime());
+								reference.setEndTime(flow.getEndTime());
+								reference.setStatus(flow.getStatus());
 							} catch (ExecutorManagerException e) {
-								e.printStackTrace();
-								continue;
+								logger.error("Error updating flow status " + flow.getExecutionId() + " from file.", e);
 							}
-							exFlow.setLastCheckedTime(System.currentTimeMillis());
+							
+							flow.setUpdateTime(time);
 						}
-						else {
-							logger.error("Flow " + exFlow.getExecutionId() + " has succeeded, but the Executor says its still running with msg: " + status);
-							if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
-								exFlow.setStatus(Status.FAILED);
-								exFlow.setEndTime(System.currentTimeMillis());
-								logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
+					}
+
+					switch(flow.getStatus()) {
+						case SUCCEEDED:
+						case FAILED:
+						case KILLED:
+							try {
+								cleanFinishedJob(flow);
+							} catch (ExecutorManagerException e) {
+								logger.error("Error while cleaning up flow and job. " + reference.toRefString(), e);
+							}
+							
+							break;
+						default:{
+							// We force the failure.
+							if (forceFail) {
+								try {
+									forceFlowFailure(flow);
+									cleanFinishedJob(flow);
+								} catch (ExecutorManagerException e) {
+									logger.error("Foreced Fail: Error while cleaning up flow and job. " + reference.toRefString(), e);
+								}
 							}
 						}
 					}
-					else {
-						// If it's not finished, and not running, we will fail it and clean up.
-						if (status.equals("notfound")) {
-							logger.error("Flow " + exFlow.getExecutionId() + " is running, but the Executor can't find it.");
-							if (System.currentTimeMillis() - exFlow.getLastCheckedTime() > ACCESS_ERROR_THRESHOLD) {
-								exFlow.setStatus(Status.FAILED);
-								exFlow.setEndTime(System.currentTimeMillis());
-								logger.error("It's been " + ACCESS_ERROR_THRESHOLD + " ms since last update. Auto-failing the job.");
-							}
+				}
+
+				// Change to rotating queue?
+				synchronized(BlockObj) {
+					try {
+						if (runningReference.isEmpty()) {
+							BlockObj.wait(waitTimeIdleMs);
 						}
 						else {
-							exFlow.setLastCheckedTime(System.currentTimeMillis());
-						}
-					}
-					
-					synchronized(this) {
-						try {
-							wait(updateTimeMs);
-						} catch (InterruptedException e) {
-							// TODO Auto-generated catch block
-							e.printStackTrace();
+							BlockObj.wait(waitTimeMs);
 						}
+					} catch (InterruptedException e) {
 					}
 				}
 			}
 		}
 	}
 	
+	
 	/**
 	 * Reference to a Flow Execution.
 	 * It allows us to search for Flow and Project with only the Execution Id, it references
@@ -913,11 +1102,18 @@ public class ExecutorManager {
 		private long startTime;
 		private long endTime;
 		private Status status;
-
+		private String executorUrl;
+		
+		private int executorPort;
+		private boolean isSubmitted = true;
+		private long lastCheckedTime = -1;
+		
 		public ExecutionReference() {
+			this.lastCheckedTime = System.currentTimeMillis();
 		}
 		
 		public ExecutionReference(ExecutableFlow flow) {
+			this();
 			this.execId = flow.getExecutionId();
 			this.projectId = flow.getProjectId();
 			this.flowId = flow.getFlowId();
@@ -927,6 +1123,7 @@ public class ExecutorManager {
 			this.startTime = flow.getStartTime();
 			this.endTime = flow.getEndTime();
 			this.status = flow.getStatus();
+			this.isSubmitted = false;
 		}
 		
 		private Object toObject() {
@@ -939,6 +1136,8 @@ public class ExecutorManager {
 			obj.put("startTime", startTime);
 			obj.put("endTime", endTime);
 			obj.put("status", status);
+			obj.put("executorUrl", executorUrl);
+			obj.put("executorPort", executorPort);
 			return obj;
 		}
 		
@@ -955,6 +1154,7 @@ public class ExecutorManager {
 				throw new IOException("Execution file execution.json does not exist.");
 			}
 			
+			@SuppressWarnings("unchecked")
 			HashMap<String, Object> obj = (HashMap<String, Object>)JSONUtils.parseJSONFromFile(file);
 			ExecutionReference reference = new ExecutionReference();
 			reference.execId = (String)obj.get("execId");
@@ -962,12 +1162,22 @@ public class ExecutorManager {
 			reference.flowId = (String)obj.get("flowId");
 			reference.userId = (String)obj.get("userId");
 			reference.execPath = (String)obj.get("execPath");
-			reference.startTime = getLongFromObject(obj.get("startTime"));
-			reference.endTime = getLongFromObject(obj.get("endTime"));
+			reference.startTime = JSONUtils.getLongFromObject(obj.get("startTime"));
+			reference.endTime = JSONUtils.getLongFromObject(obj.get("endTime"));
 			reference.status = Status.valueOf((String)obj.get("status"));
+			
+			if (obj.containsKey("executorUrl")) {
+				reference.executorUrl = (String)obj.get("executorUrl");
+				reference.executorPort = (Integer)obj.get("executorPort");
+			}
+
 			return reference;
 		}
 		
+		public String toRefString() {
+			return new StringBuilder().append(execId).append(':').append(executorUrl).append(':').append(executorPort).toString();
+		}
+		
 		public String getExecId() {
 			return execId;
 		}
@@ -1016,15 +1226,38 @@ public class ExecutorManager {
 		public void setStatus(Status status) {
 			this.status = status;
 		}
-	}
-	
-	private static long getLongFromObject(Object obj) {
-		if (obj instanceof Integer) {
-			return Long.valueOf((Integer)obj);
+
+		public String getExecutorUrl() {
+			return executorUrl;
 		}
-		
-		return (Long)obj;
-	}
 
+		public void setExecutorUrl(String executorUrl) {
+			this.executorUrl = executorUrl;
+		}
+		/** Returns the executor port stored in this reference. */
+		public int getExecutorPort() {
+			return executorPort;
+		}
+		/** Stores the executor port; toObject() writes it under the "executorPort" key. */
+		public void setExecutorPort(int port) {
+			this.executorPort = port;
+		}
+		/** Returns the submitted flag: true by default, set to false by the ExecutableFlow constructor. */
+		public boolean isSubmitted() {
+			return isSubmitted;
+		}
+		/** Overwrites the submitted flag. */
+		public void setSubmitted(boolean isSubmitted) {
+			this.isSubmitted = isSubmitted;
+		}
+		/** Returns the last-checked timestamp; the no-arg constructor initializes it to System.currentTimeMillis(). */
+		public long getLastCheckedTime() {
+			return lastCheckedTime;
+		}
+		/** Overwrites the last-checked timestamp (presumably epoch milliseconds, matching the initial value). */
+		public void setLastCheckedTime(long lastCheckedTime) {
+			this.lastCheckedTime = lastCheckedTime;
+		}
+	}
 
 }
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 04f0991..51d8431 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -66,6 +66,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private Props flowOverrideProps = null;
 	
 	private FailureAction failedAction;
+	private boolean testMode = false;
+	private File failedMarker;
 	
 	public FlowRunner(ExecutableFlow flow) {
 		this.flow = flow;
@@ -79,6 +81,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 			flowOverrideProps = new Props(null, flow.getFlowParameters()); 
 		}
 		failedAction = flow.getFailureAction();
+		failedMarker = new File(basePath, ConnectorParams.FORCED_FAILED_MARKER);
+
 		createLogger();
 	}
 
@@ -201,13 +205,17 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 	@Override
 	public void run() {
+		if (testMode) {
+			logger.info("Running in testmode");
+		}
 		currentThread = Thread.currentThread();
 
 		flow.setStatus(Status.RUNNING);
 		flow.setStartTime(System.currentTimeMillis());
 		logger.info("Starting Flow");
 		this.fireEventListeners(Event.create(this, Type.FLOW_STARTED));
-
+		boolean forceFailed = false;
+		
 		// Load all shared props
 		try {
 			logger.info("Loading all shared properties");
@@ -252,7 +260,16 @@ public class FlowRunner extends EventHandler implements Runnable {
 					continue;
 				}
 			}
-
+			
+			if (failedMarker.exists()) {
+				logger.error("Looks like this job will be forced failed due to error.");
+				flow.setStatus(Status.FAILED);
+				forceFailed = true;
+				executorService.shutdownNow();
+				this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
+				return;
+			}
+			
 			if (runner != null) {
 				try {
 					ExecutableNode node = runner.getNode();
@@ -273,7 +290,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				}
 			}
 		}
-
+		
 		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
 
@@ -329,6 +346,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 		Props jobProps = new Props(parentProps, propsFile);
 
 		JobRunner jobRunner = new JobRunner(node, jobProps, basePath);
+		jobRunner.setTestMode(testMode);
+		
 		jobRunner.addListener(listener);
 
 		return jobRunner;
@@ -506,6 +525,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 	}
 	
+	public boolean isTestMode() {
+		return testMode;
+	}
+	/** Sets the test-mode flag; run() logs when it is set and it is handed to each JobRunner this runner creates. */
+	public void setTestMode(boolean testMode) {
+		this.testMode = testMode;
+	}
+
 	private class JobRunnerEventListener implements EventListener {
 		private FlowRunner flowRunner;
 
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 164479c..53b4b67 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -11,6 +11,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
 import org.apache.log4j.Logger;
 
 import azkaban.utils.Utils;
@@ -46,6 +52,14 @@ public class FlowRunnerManager {
 
 	private Props globalProps;
 
+	// Keep each flow for only one minute after it has finished.
+	private CacheManager manager = CacheManager.create();
+	private Cache recentFlowsCache;
+	private static final int LIVE_SECONDS = 60;
+	private static final int RECENT_FLOWS_CACHE_SIZE = 100;
+	
+	private boolean testMode = false;
+	
 	public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
 		this.mailer = mailer;
 
@@ -53,6 +67,13 @@ public class FlowRunnerManager {
 		this.clientHostname = props.getString("jetty.hostname", "localhost");
 		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
 
+		setupCache();
+		
+		testMode = props.getBoolean("test.mode", false);
+		if (testMode) {
+			logger.info("Running in testMode.");
+		}
+		
 		basePath = new File(props.getString("execution.directory"));
 		numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
 		executorService = Executors.newFixedThreadPool(numThreads);
@@ -62,6 +83,19 @@ public class FlowRunnerManager {
 		submitterThread.start();
 	}
 
+	/** Builds the memory-only, time-limited cache of recently finished flows and registers it with the manager. */
+	private void setupCache() {
+		CacheConfiguration config = new CacheConfiguration("recentFlowsCache", RECENT_FLOWS_CACHE_SIZE);
+		config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.FIFO);
+		config.overflowToDisk(false);
+		config.eternal(false);
+		config.timeToLiveSeconds(LIVE_SECONDS);
+		config.diskPersistent(false);
+		config.diskExpiryThreadIntervalSeconds(0);
+		recentFlowsCache = new Cache(config);
+		manager.addCache(recentFlowsCache);
+	}
+	
 	public void submitFlow(String id, String path) throws ExecutorManagerException {
 		// Load file and submit
 		logger.info("Flow " + id + " submitted with path " + path);
@@ -71,6 +105,8 @@ public class FlowRunnerManager {
 		flow.setExecutionPath(path);
 
 		FlowRunner runner = new FlowRunner(flow);
+		runner.setTestMode(testMode);
+
 		runningFlows.put(id, runner);
 		runner.setGlobalProps(globalProps);
 		runner.addListener(eventListener);
@@ -105,7 +141,11 @@ public class FlowRunnerManager {
 	public ExecutableFlow getExecutableFlow(String id) {
 		FlowRunner runner = runningFlows.get(id);
 		if (runner == null) {
-			return null;
+			Element elem = recentFlowsCache.get(id);
+			if (elem == null) {
+				return null;
+			}
+			return (ExecutableFlow)elem.getObjectValue();
 		}
 
 		return runner.getFlow();
@@ -157,6 +197,7 @@ public class FlowRunnerManager {
 
 				logger.info("Flow " + flow.getExecutionId() + " has finished.");
 				runningFlows.remove(flow.getExecutionId());
+				recentFlowsCache.put(new Element(flow.getExecutionId(), flow));
 			}
 		}
 	}
@@ -199,7 +240,9 @@ public class FlowRunnerManager {
 					body += (URL + "\n");
 				}
 
-				mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+				if (!testMode) {
+					mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+				}
 			} catch (UnknownHostException uhe) {
 				logger.error(uhe);
 			} catch (Exception e) {
@@ -224,7 +267,9 @@ public class FlowRunnerManager {
 					body += (URL + "\n");
 				}
 
-				mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+				if (!testMode) {
+					mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+				}
 			} catch (UnknownHostException uhe) {
 				logger.error(uhe);
 			} catch (Exception e) {
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index a1bdef0..5c79ea7 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -36,6 +36,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private Job job;
 	private String executionId = null;
+	private boolean testMode = false;
 	
 	private static final Object logCreatorLock = new Object();
 	
@@ -115,13 +116,24 @@ public class JobRunner extends EventHandler implements Runnable {
 		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 		job = JobWrappingFactory.getJobWrappingFactory().buildJobExecutor(node.getId(), props, logger);
 
-		try {
-			job.run();
-		} catch (Throwable e) {
-			succeeded = false;
-			node.setStatus(Status.FAILED);
-			logError("Job run failed!");
-			e.printStackTrace();
+		if (testMode) {
+			logInfo("Test Mode. Skipping.");
+			synchronized(this) {
+				try {
+					wait(5000);
+				} catch (InterruptedException e) {
+				}
+			}
+		}
+		else {
+			try {
+				job.run();
+			} catch (Throwable e) {
+				succeeded = false;
+				node.setStatus(Status.FAILED);
+				logError("Job run failed!");
+				e.printStackTrace();
+			}
 		}
 
 		node.setEndTime(System.currentTimeMillis());
@@ -175,4 +187,12 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 
+	public boolean isTestMode() {
+		return testMode;
+	}
+	/** Sets the test-mode flag; when set, the job is not run: "Test Mode. Skipping." is logged and the runner waits up to 5000 ms. */
+	public void setTestMode(boolean testMode) {
+		this.testMode = testMode;
+	}
+
 }
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index 8ff1785..e7571d7 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -169,6 +169,7 @@ public class ExecutableFlowLoader {
 		}
 		
 		tempFlowFile.renameTo(flowFile);
+		flow.setUpdateTime(System.currentTimeMillis());
 		return flowFile;
 	}
 	
diff --git a/src/java/azkaban/utils/JSONUtils.java b/src/java/azkaban/utils/JSONUtils.java
index 5d2c199..1207a21 100644
--- a/src/java/azkaban/utils/JSONUtils.java
+++ b/src/java/azkaban/utils/JSONUtils.java
@@ -138,4 +138,12 @@ public class JSONUtils {
 			return null;
 		}
 	}
+	
+	/**
+	 * Converts a parsed JSON number (Integer, Long or any other Number) to a long.
+	 * Throws NullPointerException for null and ClassCastException for non-numbers.
+	 */
+	public static long getLongFromObject(Object obj) {
+		return ((Number)obj).longValue();
+	}
 }
diff --git a/src/java/azkaban/webapp/AzkabanExecutorServer.java b/src/java/azkaban/webapp/AzkabanExecutorServer.java
index c7b1d6f..6afc9cc 100644
--- a/src/java/azkaban/webapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/webapp/AzkabanExecutorServer.java
@@ -39,6 +39,7 @@ import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.FlowRunnerManager;
@@ -71,7 +72,6 @@ public class AzkabanExecutorServer {
 	private File tempDir;
 	private Server server;
 	
-
 	/**
 	 * Constructor
 	 * 
@@ -263,13 +263,11 @@ public class AzkabanExecutorServer {
 		return null;
 	}
 	
-	public static class ExecutorServlet extends HttpServlet {
+	public static class ExecutorServlet extends HttpServlet implements ConnectorParams {
+		private static final long serialVersionUID = 1L;
 		private static final Logger logger = Logger.getLogger(ExecutorServlet.class.getName());
 		public static final String JSON_MIME_TYPE = "application/json";
-		
-		public enum State {
-			FAILED, SUCCEEDED, RUNNING, WAITING, IGNORED, READY
-		}
+
 		private String sharedToken;
 		private AzkabanExecutorServer application;
 		private FlowRunnerManager flowRunnerManager;
@@ -303,43 +301,42 @@ public class AzkabanExecutorServer {
 		public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 			HashMap<String,Object> respMap= new HashMap<String,Object>();
 			
-			String token = getParam(req, "sharedToken");
+			String token = getParam(req, SHAREDTOKEN_PARAM);
 			if (!token.equals(sharedToken)) {
 				respMap.put("error", "Mismatched token. Will not run.");
 			}
-			else if (!hasParam(req, "action")) {
+			else if (!hasParam(req, ACTION_PARAM)) {
 				respMap.put("error", "Parameter action not set");
 			}
-			else if (!hasParam(req, "execid")) {
-				respMap.put("error", "Parameter execid not set.");
-			}
 			else {
-				String action = getParam(req, "action");
-				String execid = getParam(req, "execid");
+				String action = getParam(req, ACTION_PARAM);
+				String execid = getParam(req, EXECID_PARAM, null);
+				String user = getParam(req, USER_PARAM, null);
 				
-				// Handle execute
-				if (action.equals("execute")) {
+				if (action.equals(PING_ACTION)) {
+					respMap.put("status", "alive");
+				}
+				else if (action.equals(EXECUTE_ACTION)) {
 					handleAjaxExecute(req, respMap, execid);
 				}
-				// Handle Status
-				else if (action.equals("status")) {
+				else if (action.equals(STATUS_ACTION)) {
 					handleAjaxFlowStatus(respMap, execid);
 				}
-				else if (action.equals("cancel")) {
-					String user = getParam(req, "user");
+				else if (action.equals(CANCEL_ACTION)) {
 					logger.info("Cancel called.");
 					handleAjaxCancel(respMap, execid, user);
 				}
-				else if (action.equals("pause")) {
-					String user = getParam(req, "user");
+				else if (action.equals(PAUSE_ACTION)) {
 					logger.info("Paused called.");
 					handleAjaxPause(respMap, execid, user);
 				}
-				else if (action.equals("resume")) {
-					String user = getParam(req, "user");
+				else if (action.equals(RESUME_ACTION)) {
 					logger.info("Resume called.");
 					handleAjaxResume(respMap, execid, user);
 				}
+				else {
+					respMap.put("error", "action: '" + action + "' not supported.");
+				}
 			}
 
 			writeJSON(resp, respMap);
@@ -347,55 +344,80 @@ public class AzkabanExecutorServer {
 		}
 		
 		private void handleAjaxExecute(HttpServletRequest req, Map<String, Object> respMap, String execid) throws ServletException {
-			String execpath = getParam(req, "execpath");
+			if (execid == null) {
+				respMap.put(RESPONSE_ERROR, EXECID_PARAM + " has not been set");
+				return;
+			}
+			
+			String execpath = getParam(req, EXECPATH_PARAM);
 			logger.info("Submitted " + execid + " with " + execpath);
 			try {
 				flowRunnerManager.submitFlow(execid, execpath);
-				respMap.put("status", "success");
+				respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
 			} catch (ExecutorManagerException e) {
 				e.printStackTrace();
-				respMap.put("error", e.getMessage());
+				respMap.put(RESPONSE_ERROR, e.getMessage());
 			}
 		}
 		
 		private void handleAjaxFlowStatus(Map<String, Object> respMap, String execid) {
+			if (execid == null) {
+				respMap.put(RESPONSE_ERROR, EXECID_PARAM + " has not been set");
+				return;
+			}
+			
 			ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
 			if (flow == null) {
-				respMap.put("status", "notfound");
+				respMap.put(STATUS_PARAM, RESPONSE_NOTFOUND);
 			}
 			else {
-				respMap.put("status", flow.getStatus().toString());
+				respMap.put(STATUS_PARAM, flow.getStatus().toString());
+				respMap.put(RESPONSE_UPDATETIME, flow.getUpdateTime());
 			}
 		}
 		
 		private void handleAjaxPause(Map<String, Object> respMap, String execid, String user) throws ServletException {
-
+			if (execid == null || user == null) {
+				respMap.put(RESPONSE_ERROR, "execid or user has not been set");
+				return;
+			}
+			
 			try {
 				flowRunnerManager.pauseFlow(execid, user);
-				respMap.put("status", "success");
+				respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
 			} catch (ExecutorManagerException e) {
 				e.printStackTrace();
-				respMap.put("error", e.getMessage());
+				respMap.put(RESPONSE_ERROR, e.getMessage());
 			}
 		}
 		
 		private void handleAjaxResume(Map<String, Object> respMap, String execid, String user) throws ServletException {
+			if (execid == null || user == null) {
+				respMap.put(RESPONSE_ERROR, "execid or user has not been set");
+				return;
+			}
+			
 			try {
 				flowRunnerManager.resumeFlow(execid, user);
-				respMap.put("status", "success");
+				respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
 			} catch (ExecutorManagerException e) {
 				e.printStackTrace();
-				respMap.put("error", e.getMessage());
+				respMap.put(RESPONSE_ERROR, e.getMessage());
 			}
 		}
 		
 		private void handleAjaxCancel(Map<String, Object> respMap, String execid, String user) throws ServletException {
+			if (execid == null || user == null) {
+				respMap.put(RESPONSE_ERROR, "execid or user has not been set");
+				return;
+			}
+			
 			try {
 				flowRunnerManager.cancelFlow(execid, user);
-				respMap.put("status", "success");
+				respMap.put(STATUS_PARAM, RESPONSE_SUCCESS);
 			} catch (ExecutorManagerException e) {
 				e.printStackTrace();
-				respMap.put("error", e.getMessage());
+				respMap.put(RESPONSE_ERROR, e.getMessage());
 			}
 		}
 		
@@ -419,6 +441,15 @@ public class AzkabanExecutorServer {
 			else
 				return p;
 		}
+		
+		/** Returns the named request parameter, or defaultVal when the request does not carry it. */
+		public String getParam(HttpServletRequest request, String name, String defaultVal ) {
+			String value = request.getParameter(name);
+			if (value != null) {
+				return value;
+			}
+			return defaultVal;
+		}
 	}
 
 }
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index ac000b5..751cea4 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -118,6 +118,24 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	}
 
 	/**
+	 * Looks up a request parameter, falling back to a caller-supplied default.
+	 * 
+	 * @param request the servlet request to read the parameter from
+	 * @param name the name of the parameter to look up
+	 * @param defaultVal the value to return when the parameter is absent
+	 * 
+	 * @return the parameter value, or defaultVal if the request has none
+	 */
+	public String getParam(HttpServletRequest request, String name, String defaultVal){
+		String value = request.getParameter(name);
+		if (value != null) {
+			return value;
+		}
+		return defaultVal;
+	}
+
+	
+	/**
 	 * Returns the param and parses it into an int. Will throw an exception if
 	 * not found, or a parse error if the type is incorrect.
 	 * 
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 86ec0ba..f9bb3f6 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -415,7 +415,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 		
 		try {
-			executorManager.cancelFlow(exFlow, user.getUserId());
+			executorManager.cancelFlow(exFlow.getExecutionId(), user.getUserId());
 		} catch (ExecutorManagerException e) {
 			ret.put("error", e.getMessage());
 		}
@@ -444,7 +444,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 
 		try {
-			executorManager.pauseFlow(exFlow, user.getUserId());
+			executorManager.pauseFlow(exFlow.getExecutionId(), user.getUserId());
 		} catch (ExecutorManagerException e) {
 			ret.put("error", e.getMessage());
 		}
@@ -457,7 +457,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 
 		try {
-			executorManager.resumeFlow(exFlow, user.getUserId());
+			executorManager.resumeFlow(exFlow.getExecutionId(), user.getUserId());
 		} catch (ExecutorManagerException e) {
 			ret.put("resume", e.getMessage());
 		}
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 89486ec..62f5f70 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -21,19 +21,15 @@ import azkaban.webapp.session.Session;
  * Abstract Servlet that handles auto login when the session hasn't been
  * verified.
  */
-public abstract class LoginAbstractAzkabanServlet extends
-		AbstractAzkabanServlet {
+public abstract class LoginAbstractAzkabanServlet extends AbstractAzkabanServlet {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger logger = Logger
-			.getLogger(LoginAbstractAzkabanServlet.class.getName());
+	private static final Logger logger = Logger.getLogger(LoginAbstractAzkabanServlet.class.getName());
 	private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
 
 	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException, IOException {
-
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 		// Set session id
 		Session session = getSessionFromRequest(req);
 		if (hasParam(req, "logout")) {
@@ -89,7 +85,6 @@ public abstract class LoginAbstractAzkabanServlet extends
 	}
 
 	private void handleLogin(HttpServletRequest req, HttpServletResponse resp, String errorMsg) throws ServletException, IOException {
-
 		Page page = newPage(req, resp,
 				"azkaban/webapp/servlet/velocity/login.vm");
 		if (errorMsg != null) {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 211ea49..cb917f1 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -23,14 +23,11 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.Seconds;
 import org.joda.time.format.DateTimeFormat;
 
-import azkaban.executor.ExecutorManager.ExecutionReference;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
-import azkaban.project.ProjectManagerException;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
-import azkaban.webapp.servlet.HistoryServlet.PageSelection;
 import azkaban.webapp.session.Session;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.ScheduledFlow;
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 90e59be..9d88d40 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1929,7 +1929,7 @@ td .status.FAILED {
 }
 
 td .status.READY {
-	background-color: #C82123;
+	background-color: #CCC;
 }
 
 td .status.RUNNING {
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index f18211f..7ba5aff 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -131,8 +131,12 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 					alert(data.error);
 				}
 				else {
-					$('#successEmails').val(data.successEmails.join());
-					$('#failureEmails').val(data.failureEmails.join());
+					if (data.successEmails) {
+						$('#successEmails').val(data.successEmails.join());
+					}
+					if (data.failureEmails) {
+						$('#failureEmails').val(data.failureEmails.join());
+					}
 					
 					if (data.failureAction) {
 						$('#failureAction').val(data.failureAction);