azkaban-memoizeit

diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index afe9248..4ba5889 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -43,14 +43,14 @@ public class LocalFlowWatcher extends FlowWatcher {
 					Object data = event.getData();
 					if (data instanceof ExecutableNode) {
 						ExecutableNode node = (ExecutableNode)data;
-						handleJobFinished(node.getJobId(), node.getStatus());
+						handleJobFinished(node.getId(), node.getStatus());
 					}
 				}
 				else if (event.getRunner() instanceof JobRunner) {
 					JobRunner runner = (JobRunner)event.getRunner();
 					ExecutableNode node = runner.getNode();
 					
-					handleJobFinished(node.getJobId(), node.getStatus());
+					handleJobFinished(node.getId(), node.getStatus());
 				}
 			}
 			else if (event.getType() == Type.FLOW_FINISHED) {
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index ec60025..cecf1bc 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -1,6 +1,6 @@
 package azkaban.execapp.event;
 
-import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
@@ -11,7 +11,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
 	
 	private int execId;
 	private ExecutorLoader loader;
-	private ExecutableFlow flow;
+	private ExecutableFlowBase flow;
 	private RemoteUpdaterThread thread;
 	private boolean isShutdown = false;
 	
@@ -46,7 +46,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
 		@Override
 		public void run() {
 			do {
-				ExecutableFlow updateFlow = null;
+				ExecutableFlowBase updateFlow = null;
 				try {
 					updateFlow = loader.fetchExecutableFlow(execId);
 				} catch (ExecutorManagerException e) {
@@ -63,7 +63,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
 					flow.setUpdateTime(updateFlow.getUpdateTime());
 					
 					for (ExecutableNode node : flow.getExecutableNodes()) {
-						String jobId = node.getJobId();
+						String jobId = node.getId();
 						ExecutableNode newNode = updateFlow.getExecutableNode(jobId);
 						long updateTime = node.getUpdateTime();
 						node.setUpdateTime(newNode.getUpdateTime());
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index d342f0f..f73b88c 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -16,7 +16,7 @@ import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import azkaban.executor.ConnectorParams;
-import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
@@ -215,7 +215,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			long updateTime = JSONUtils.getLongFromObject(updateTimesList.get(i));
 			int execId = (Integer)execIDList.get(i);
 			
-			ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execId);
+			ExecutableFlowBase flow = flowRunnerManager.getExecutableFlow(execId);
 			if (flow == null) {
 				Map<String, Object> errorResponse = new HashMap<String,Object>();
 				errorResponse.put(RESPONSE_ERROR, "Flow does not exist");
@@ -243,7 +243,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 	}
 	
 	private void handleAjaxFlowStatus(Map<String, Object> respMap, int execid) {
-		ExecutableFlow flow = flowRunnerManager.getExecutableFlow(execid);
+		ExecutableFlowBase flow = flowRunnerManager.getExecutableFlow(execid);
 		if (flow == null) {
 			respMap.put(STATUS_PARAM, RESPONSE_NOTFOUND);
 		}
diff --git a/src/java/azkaban/execapp/FlowBlocker.java b/src/java/azkaban/execapp/FlowBlocker.java
new file mode 100644
index 0000000..2d64073
--- /dev/null
+++ b/src/java/azkaban/execapp/FlowBlocker.java
@@ -0,0 +1,7 @@
+package azkaban.execapp;
+
+import azkaban.executor.ExecutableNode;
+
+public interface FlowBlocker {
+	public boolean readyToRun(ExecutableNode node);
+}
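
Note (not part of the patch): FlowBlocker is introduced above without any implementation. As a rough, hypothetical sketch of how the interface could be satisfied, an implementation might report a node as runnable once all of its upstream dependencies have finished. The class below is an assumption for illustration; only the methods it calls (getParentFlow, getInNodes, getExecutableNode, Status.isStatusFinished) appear elsewhere in this diff.

package azkaban.execapp;

import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;

// Hypothetical example only; this class is not part of the change.
public class DependencyFlowBlocker implements FlowBlocker {
	@Override
	public boolean readyToRun(ExecutableNode node) {
		ExecutableFlowBase parent = node.getParentFlow();
		// With the reworked ExecutableNode, getInNodes() may be null when
		// the node has no dependencies.
		if (parent == null || node.getInNodes() == null) {
			return true;
		}
		for (String dependency : node.getInNodes()) {
			ExecutableNode dependencyNode = parent.getExecutableNode(dependency);
			if (!Status.isStatusFinished(dependencyNode.getStatus())) {
				return false;
			}
		}
		return true;
	}
}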
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index d38aa47..38ece4a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -24,6 +24,7 @@ import azkaban.execapp.event.EventHandler;
 import azkaban.execapp.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
@@ -71,7 +72,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
 	private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
-	
+
 	// Used for pipelining
 	private Integer pipelineLevel = null;
 	private Integer pipelineExecId = null;
@@ -314,24 +315,24 @@ public class FlowRunner extends EventHandler implements Runnable {
 								Props outputProps = collectOutputProps(node);
 								node.setStatus(Status.QUEUED);
 								JobRunner runner = createJobRunner(node, outputProps);
-								logger.info("Submitting job " + node.getJobId() + " to run.");
+								logger.info("Submitting job " + node.getId() + " to run.");
 								try {
 									executorService.submit(runner);
-									jobRunners.put(node.getJobId(), runner);
-									activeJobRunners.put(node.getJobId(), runner);
+									jobRunners.put(node.getId(), runner);
+									activeJobRunners.put(node.getId(), runner);
 								} catch (RejectedExecutionException e) {
 									logger.error(e);
 								};
 								
 							} // If killed, then auto complete and KILL
 							else if (node.getStatus() == Status.KILLED) {
-								logger.info("Killing " + node.getJobId() + " due to prior errors.");
+								logger.info("Killing " + node.getId() + " due to prior errors.");
 								node.setStartTime(currentTime);
 								node.setEndTime(currentTime);
 								fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
 							} // If disabled, then we auto skip
 							else if (node.getStatus() == Status.DISABLED) {
-								logger.info("Skipping disabled job " + node.getJobId() + ".");
+								logger.info("Skipping disabled job " + node.getId() + ".");
 								node.setStartTime(currentTime);
 								node.setEndTime(currentTime);
 								node.setStatus(Status.SKIPPED);
@@ -422,6 +423,26 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return jobsToRun;
 	}
 
+	private List<ExecutableNode> findReadyJobsToRun(ExecutableFlowBase flow) {
+		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+		for (ExecutableNode node : flow.getExecutableNodes()) {
+			if (Status.isStatusFinished(node.getStatus())) {
+				continue;
+			}
+			else {
+				// Check the dependencies to see if execution conditions are met,
+				// and what the status should be set to.
+				Status impliedStatus = getImpliedStatus(node);
+				if (impliedStatus != null) {
+					node.setStatus(impliedStatus);
+					jobsToRun.add(node);
+				}
+			}
+		}
+		
+		return jobsToRun;
+	}
+	
 	private boolean isFlowFinished() {
 		if (!activeJobRunners.isEmpty()) {
 			return false;
@@ -453,7 +474,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	private JobRunner createJobRunner(ExecutableNode node, Props previousOutput) {
-		String source = node.getJobPropsSource();
+		String source = node.getJobSource();
 		String propsSource = node.getPropsSource();
 
 		// If no properties are set, we just set the global properties.
@@ -481,11 +502,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		// load the override props if any
 		try {
-			prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getJobId()+".jor");
+			prop = projectLoader.fetchProjectProperty(flow.getProjectId(), flow.getVersion(), node.getId()+".jor");
 		}
 		catch(ProjectManagerException e) {
 			e.printStackTrace();
-			logger.error("Error loading job override property for job " + node.getJobId());
+			logger.error("Error loading job override property for job " + node.getId());
 		}
 		if(prop == null) {
 			// if no override prop, load the original one on disk
@@ -493,7 +514,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				prop = new Props(null, path);				
 			} catch (IOException e) {
 				e.printStackTrace();
-				logger.error("Error loading job file " + source + " for job " + node.getJobId());
+				logger.error("Error loading job file " + source + " for job " + node.getId());
 			}
 		}
 		// setting this fake source as this will be used to determine the location of log files.
@@ -594,7 +615,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			ArrayList<String> failures = new ArrayList<String>();
 			for (ExecutableNode node: flow.getExecutableNodes()) {
 				if (node.getStatus() == Status.FAILED) {
-					failures.add(node.getJobId());
+					failures.add(node.getId());
 				}
 				else if (node.getStatus() == Status.KILLED) {
 					node.setStartTime(-1);
@@ -620,7 +641,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 					// Resets the status and increments the attempt number
 					node.resetForRetry();
 					reEnableDependents(node);
-					logger.info("Re-enabling job " + node.getJobId() + " attempt " + node.getAttempt());
+					logger.info("Re-enabling job " + node.getId() + " attempt " + node.getAttempt());
 				}
 				else {
 					logger.error("Cannot retry job " + jobId + " since it hasn't run yet. User " + user);
@@ -734,12 +755,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 			else if (event.getType() == Type.JOB_FINISHED) {
 				synchronized(mainSyncObj) {
 					ExecutableNode node = runner.getNode();
-					activeJobRunners.remove(node.getJobId());
+					activeJobRunners.remove(node.getId());
 					
-					logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
+					logger.info("Job Finished " + node.getId() + " with status " + node.getStatus());
 					if (runner.getOutputProps() != null) {
-						logger.info("Job " + node.getJobId() + " had output props.");
-						jobOutputProps.put(node.getJobId(), runner.getOutputProps());
+						logger.info("Job " + node.getId() + " had output props.");
+						jobOutputProps.put(node.getId(), runner.getOutputProps());
 					}
 					
 					updateFlow();
@@ -747,14 +768,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 					if (node.getStatus() == Status.FAILED) {
 						// Retry failure if conditions are met.
 						if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
-							logger.info("Job " + node.getJobId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
+							logger.info("Job " + node.getId() + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
 							node.setDelayedExecution(runner.getRetryBackoff());
 							node.resetForRetry();
 						}
 						else {
 							if (!runner.isCancelled() && runner.getRetries() > 0) {
 					
-								logger.info("Job " + node.getJobId() + " has run out of retry attempts");
+								logger.info("Job " + node.getId() + " has run out of retry attempts");
 								// Setting delayed execution to 0 in case this is manually re-tried.
 								node.setDelayedExecution(0);
 							}
@@ -796,7 +817,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	public File getJobLogFile(String jobId, int attempt) {
 		ExecutableNode node = flow.getExecutableNode(jobId);
-		File path = new File(execDir, node.getJobPropsSource());
+		File path = new File(execDir, node.getJobSource());
 		
 		String logFileName = JobRunner.createLogFileName(execId, jobId, attempt);
 		File logFile = new File(path.getParentFile(), logFileName);
@@ -810,7 +831,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	public File getJobMetaDataFile(String jobId, int attempt) {
 		ExecutableNode node = flow.getExecutableNode(jobId);
-		File path = new File(execDir, node.getJobPropsSource());
+		File path = new File(execDir, node.getJobSource());
 		
 		String metaDataFileName = JobRunner.createMetaDataFileName(execId, jobId, attempt);
 		File metaDataFile = new File(path.getParentFile(), metaDataFileName);
diff --git a/src/java/azkaban/execapp/FlowRunnerHelper.java b/src/java/azkaban/execapp/FlowRunnerHelper.java
new file mode 100644
index 0000000..7c8be47
--- /dev/null
+++ b/src/java/azkaban/execapp/FlowRunnerHelper.java
@@ -0,0 +1,97 @@
+package azkaban.execapp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+public class FlowRunnerHelper {
+	public static boolean isFlowFinished(ExecutableFlow flow) {
+		for (String end: flow.getEndNodes()) {
+			ExecutableNode node = flow.getExecutableNode(end);
+			if (!Status.isStatusFinished(node.getStatus()) ) {
+				return false;
+			}
+		}
+		
+		return true;
+	}
+	
+	public static List<ExecutableNode> findReadyJobsToRun(ExecutableFlowBase flow) {
+		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
+		for (ExecutableNode node : flow.getExecutableNodes()) {
+			if (Status.isStatusFinished(node.getStatus())) {
+				continue;
+			}
+			else {
+				// Check the dependencies to see if execution conditions are met,
+				// and what the status should be set to.
+				Status impliedStatus = getImpliedStatus(node);
+				if (impliedStatus != null) {
+					node.setStatus(impliedStatus);
+					jobsToRun.add(node);
+				}
+				else if (node instanceof ExecutableFlowBase && node.getStatus() == Status.RUNNING) {
+					// We want to seek into a running flow
+				}
+			}
+		}
+		
+		return jobsToRun;
+	}
+	
+	public static Status getImpliedStatus(ExecutableNode node) {
+		switch(node.getStatus()) {
+			case FAILED:
+			case KILLED:
+			case SKIPPED:
+			case SUCCEEDED:
+			case FAILED_SUCCEEDED:
+			case QUEUED:
+			case RUNNING:
+				return null;
+			default:
+				break;
+		}
+		
+		ExecutableFlowBase flow = node.getParentFlow();
+		
+		boolean shouldKill = false;
+		for (String dependency : node.getInNodes()) {
+			ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
+			
+			Status depStatus = dependencyNode.getStatus();
+			switch (depStatus) {
+			case FAILED:
+			case KILLED:
+				shouldKill = true;
+			case SKIPPED:
+			case SUCCEEDED:
+			case FAILED_SUCCEEDED:
+				continue;
+			case RUNNING:
+			case QUEUED:
+			case DISABLED:
+				return null;
+			default:
+				// Returning null means the node is not ready to run.
+				return null;
+			}
+		}
+		
+		if (shouldKill || flow.getStatus() == Status.KILLED || flow.getStatus() == Status.FAILED) {
+			return Status.KILLED;
+		}
+		
+		// If it's disabled but ready to run, we want to make sure it continues being disabled.
+		if (node.getStatus() == Status.DISABLED) {
+			return Status.DISABLED;
+		}
+		
+		// All good to go, ready to run.
+		return Status.READY;
+	}
+}
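
Note (not part of the patch): FlowRunnerHelper mirrors the dependency checks that FlowRunner still performs privately. The sketch below shows one assumed way a runner loop could use it to pick up the next batch of runnable nodes; the wrapper class and the submit() hook are illustrative assumptions, not code from this change.

package azkaban.execapp;

import java.util.List;

import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;

// Illustrative sketch only; FlowRunnerHelperExample and submit() are assumptions.
public class FlowRunnerHelperExample {
	public static void submitReadyJobs(ExecutableFlowBase flow) {
		// findReadyJobsToRun applies each node's implied status (READY,
		// DISABLED or KILLED) before returning it.
		List<ExecutableNode> ready = FlowRunnerHelper.findReadyJobsToRun(flow);
		for (ExecutableNode node : ready) {
			if (node.getStatus() == Status.READY) {
				submit(node);
			}
		}
	}

	private static void submit(ExecutableNode node) {
		// Placeholder: in FlowRunner this is where a JobRunner would be
		// created and handed to the executor service.
	}
}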
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index a363284..c4ea32f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -67,6 +67,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private Job job;
 	private int executionId = -1;
+	private String jobId;
 	
 	private static final Object logCreatorLock = new Object();
 	private Object syncObject = new Object();
@@ -91,7 +92,9 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.props = props;
 		this.node = node;
 		this.workingDir = workingDir;
-		this.executionId = node.getExecutionId();
+		
+		this.executionId = node.getParentFlow().getExecutionId();
+		this.jobId = node.getId();
 		this.loader = loader;
 		this.jobtypeManager = jobtypeManager;
 	}
@@ -115,10 +118,10 @@ public class JobRunner extends EventHandler implements Runnable {
 		this.pipelineLevel = pipelineLevel;
 
 		if (this.pipelineLevel == 1) {
-			pipelineJobs.add(node.getJobId());
+			pipelineJobs.add(node.getId());
 		}
 		else if (this.pipelineLevel == 2) {
-			pipelineJobs.add(node.getJobId());
+			pipelineJobs.add(node.getId());
 			pipelineJobs.addAll(node.getOutNodes());
 		}
 	}
@@ -142,11 +145,11 @@ public class JobRunner extends EventHandler implements Runnable {
 	private void createLogger() {
 		// Create logger
 		synchronized (logCreatorLock) {
-			String loggerName = System.currentTimeMillis() + "." + executionId + "." + node.getJobId();
+			String loggerName = System.currentTimeMillis() + "." + this.executionId + "." + this.jobId;
 			logger = Logger.getLogger(loggerName);
 
 			// Create file appender
-			String logName = createLogFileName(node.getExecutionId(), node.getJobId(), node.getAttempt());
+			String logName = createLogFileName(this.executionId, this.jobId, node.getAttempt());
 			logFile = new File(workingDir, logName);
 			String absolutePath = logFile.getAbsolutePath();
 
@@ -158,7 +161,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
 			} catch (IOException e) {
-				flowLogger.error("Could not open log file in " + workingDir + " for job " + node.getJobId(), e);
+				flowLogger.error("Could not open log file in " + workingDir + " for job " + this.jobId, e);
 			}
 		}
 	}
@@ -175,13 +178,13 @@ public class JobRunner extends EventHandler implements Runnable {
 			node.setUpdateTime(System.currentTimeMillis());
 			loader.updateExecutableNode(node);
 		} catch (ExecutorManagerException e) {
-			flowLogger.error("Could not update job properties in db for " + node.getJobId(), e);
+			flowLogger.error("Could not update job properties in db for " + this.jobId, e);
 		}
 	}
 	
 	@Override
 	public void run() {
-		Thread.currentThread().setName("JobRunner-" + node.getJobId() + "-" + executionId);
+		Thread.currentThread().setName("JobRunner-" + this.jobId + "-" + executionId);
 		
 		if (node.getStatus() == Status.DISABLED) {
 			node.setStartTime(System.currentTimeMillis());
@@ -220,7 +223,7 @@ public class JobRunner extends EventHandler implements Runnable {
 					}
 				}
 				if (!blockingStatus.isEmpty()) {
-					logger.info("Pipeline job " + node.getJobId() + " waiting on " + blockedList + " in execution " + watcher.getExecId());
+					logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
 					
 					for(BlockingStatus bStatus: blockingStatus) {
 						logger.info("Waiting on pipelined job " + bStatus.getJobId());
@@ -253,7 +256,7 @@ public class JobRunner extends EventHandler implements Runnable {
 						this.wait(delayStartMs);
 						logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
 					} catch (InterruptedException e) {
-						logger.error("Job " + node.getJobId() + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+						logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
 					}
 				}
 				
@@ -286,7 +289,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			node.setEndTime(System.currentTimeMillis());
 
-			logInfo("Finishing job " + node.getJobId() + " at " + node.getEndTime());
+			logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
 
 			closeLogger();
 			writeStatus();
@@ -304,13 +307,13 @@ public class JobRunner extends EventHandler implements Runnable {
 					Arrays.sort(files, Collections.reverseOrder());
 					
 					
-					loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), files);
+					loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
 				} catch (ExecutorManagerException e) {
-					flowLogger.error("Error writing out logs for job " + node.getJobId(), e);
+					flowLogger.error("Error writing out logs for job " + this.jobId, e);
 				}
 			}
 			else {
-				flowLogger.info("Log file for job " + node.getJobId() + " is null");
+				flowLogger.info("Log file for job " + this.jobId + " is null");
 			}
 		}
 		fireEvent(Event.create(this, Type.JOB_FINISHED));
@@ -340,13 +343,13 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 
 			if (node.getAttempt() > 0) {
-				logInfo("Starting job " + node.getJobId() + " attempt " + node.getAttempt() + " at " + node.getStartTime());
+				logInfo("Starting job " + this.jobId + " attempt " + node.getAttempt() + " at " + node.getStartTime());
 			}
 			else {
-				logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
+				logInfo("Starting job " + this.jobId + " at " + node.getStartTime());
 			}
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
-			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, node.getJobId(), node.getAttempt()));
+			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
 			node.setStatus(Status.RUNNING);
 
 			// Ability to specify working directory
@@ -357,13 +360,13 @@ public class JobRunner extends EventHandler implements Runnable {
 			if(props.containsKey("user.to.proxy")) {
 				String jobProxyUser = props.getString("user.to.proxy");
 				if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
-					logger.error("User " + jobProxyUser + " has no permission to execute this job " + node.getJobId() + "!");
+					logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.jobId + "!");
 					return false;
 				}
 			}
 			
 			try {
-				job = jobtypeManager.buildJobExecutor(node.getJobId(), props, logger);
+				job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
 			}
 			catch (JobTypeManagerException e) {
 				logger.error("Failed to build job type, skipping this job");
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 152a546..6141959 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -13,84 +13,66 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package azkaban.executor;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import azkaban.executor.ExecutableNode.Attempt;
-import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
-import azkaban.flow.Node;
+import azkaban.project.Project;
 import azkaban.utils.JSONUtils;
 
-public class ExecutableFlow {
+public class ExecutableFlow extends ExecutableFlowBase {
+	public static final String EXECUTIONID_PARAM = "executionId";
+	public static final String EXECUTIONPATH_PARAM ="executionPath";
+	public static final String EXECUTIONOPTIONS_PARAM ="executionOptions";
+	public static final String PROJECTID_PARAM ="projectId";
+	public static final String SCHEDULEID_PARAM ="scheduleId";
+	public static final String SUBMITUSER_PARAM = "submitUser";
+	public static final String SUBMITTIME_PARAM = "submitTime";
+	public static final String VERSION_PARAM = "version";
+	public static final String PROXYUSERS_PARAM = "proxyUsers";
+	
 	private int executionId = -1;
-	private String flowId;
 	private int scheduleId = -1;
 	private int projectId;
 	private int version;
-
-	private String executionPath;
-	
-	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
-	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
-	private ArrayList<String> startNodes;
-	private ArrayList<String> endNodes;
-
 	private long submitTime = -1;
-	private long startTime = -1;
-	private long endTime = -1;
-	private long updateTime = -1;
-
-	private Status flowStatus = Status.READY;
 	private String submitUser;
+	private String executionPath;
+	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
 	
 	private HashSet<String> proxyUsers = new HashSet<String>();
 	private ExecutionOptions executionOptions;
 	
-	public ExecutableFlow(Flow flow) {
-		this.projectId = flow.getProjectId();
+	public ExecutableFlow(Project project, Flow flow) {
+		this.projectId = project.getId();
+		this.version = project.getVersion();
 		this.scheduleId = -1;
-		this.flowId = flow.getId();
-		this.version = flow.getVersion();
-		this.setFlow(flow);
+
+		this.setFlow(project, flow);
 	}
 	
-	public ExecutableFlow(int executionId, Flow flow) {
-		this.projectId = flow.getProjectId();
-		this.scheduleId = -1;
-		this.flowId = flow.getId();
-		this.version = flow.getVersion();
-		this.executionId = executionId;
-		
-		this.setFlow(flow);
+	public ExecutableFlow(Flow flow) {
+		this.setFlow(null, flow);
 	}
 	
 	public ExecutableFlow() {
 	}
-	
-	public long getUpdateTime() {
-		return updateTime;
-	}
-	
-	public void setUpdateTime(long updateTime) {
-		this.updateTime = updateTime;
-	}
-	
-	public List<ExecutableNode> getExecutableNodes() {
-		return new ArrayList<ExecutableNode>(executableNodes.values());
+
+	@Override
+	public String getId() {
+		return getFlowId();
 	}
 	
-	public ExecutableNode getExecutableNode(String id) {
-		return executableNodes.get(id);
+	@Override
+	public ExecutableFlow getExecutableFlow() {
+		return this;
 	}
 	
 	public Collection<FlowProps> getFlowProps() {
@@ -113,74 +95,16 @@ public class ExecutableFlow {
 		return executionOptions;
 	}
 	
-	private void setFlow(Flow flow) {
+	private void setFlow(Project project, Flow flow) {
+		super.setFlow(project, flow, null);
 		executionOptions = new ExecutionOptions();
-		
-		for (Node node: flow.getNodes()) {
-			String id = node.getId();
-			ExecutableNode exNode = new ExecutableNode(node, this);
-			executableNodes.put(id, exNode);
-		}
-		
-		for (Edge edge: flow.getEdges()) {
-			ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
-			ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
-			
-			sourceNode.addOutNode(edge.getTargetId());
-			targetNode.addInNode(edge.getSourceId());
-		}
-		
+
 		if (flow.getSuccessEmails() != null) {
 			executionOptions.setSuccessEmails(flow.getSuccessEmails());
 		}
 		if (flow.getFailureEmails() != null) {
 			executionOptions.setFailureEmails(flow.getFailureEmails());
 		}
-		flowProps.putAll(flow.getAllFlowProps());
-	}
-
-	public List<String> getStartNodes() {
-		if (startNodes == null) {
-			startNodes = new ArrayList<String>();
-			for (ExecutableNode node: executableNodes.values()) {
-				if (node.getInNodes().isEmpty()) {
-					startNodes.add(node.getJobId());
-				}
-			}
-		}
-		
-		return startNodes;
-	}
-	
-	public List<String> getEndNodes() {
-		if (endNodes == null) {
-			endNodes = new ArrayList<String>();
-			for (ExecutableNode node: executableNodes.values()) {
-				if (node.getOutNodes().isEmpty()) {
-					endNodes.add(node.getJobId());
-				}
-			}
-		}
-		
-		return endNodes;
-	}
-	
-	public boolean setNodeStatus(String nodeId, Status status) {
-		ExecutableNode exNode = executableNodes.get(nodeId);
-		if (exNode == null) {
-			return false;
-		}
-		exNode.setStatus(status);
-		return true;
-	}
-
-	public void setProxyNodes(int externalExecutionId, String nodeId) {
-		ExecutableNode exNode = executableNodes.get(nodeId);
-		if (exNode == null) {
-			return;
-		}
-		
-		exNode.setExternalExecutionId(externalExecutionId);
 	}
 	
 	public int getExecutionId() {
@@ -189,20 +113,9 @@ public class ExecutableFlow {
 
 	public void setExecutionId(int executionId) {
 		this.executionId = executionId;
-		
-		for(ExecutableNode node: executableNodes.values()) {
-			node.setExecutionId(executionId);
-		}
-	}
-
-	public String getFlowId() {
-		return flowId;
-	}
-
-	public void setFlowId(String flowId) {
-		this.flowId = flowId;
 	}
 
+	@Override
 	public int getProjectId() {
 		return projectId;
 	}
@@ -227,254 +140,94 @@ public class ExecutableFlow {
 		this.executionPath = executionPath;
 	}
 	
-	public long getStartTime() {
-		return startTime;
+	public String getSubmitUser() {
+		return submitUser;
 	}
-	
-	public void setStartTime(long time) {
-		this.startTime = time;
+
+	public void setSubmitUser(String submitUser) {
+		this.submitUser = submitUser;
 	}
 	
-	public long getEndTime() {
-		return endTime;
+	@Override
+	public int getVersion() {
+		return version;
 	}
-	
-	public void setEndTime(long time) {
-		this.endTime = time;
+
+	public void setVersion(int version) {
+		this.version = version;
 	}
 	
 	public long getSubmitTime() {
 		return submitTime;
 	}
 	
-	public void setSubmitTime(long time) {
-		this.submitTime = time;
-	}
-	
-	public Status getStatus() {
-		return flowStatus;
-	}
-
-	public void setStatus(Status flowStatus) {
-		this.flowStatus = flowStatus;
+	public void setSubmitTime(long submitTime) {
+		this.submitTime = submitTime;
 	}
 	
 	public Map<String,Object> toObject() {
 		HashMap<String, Object> flowObj = new HashMap<String, Object>();
-		flowObj.put("type", "executableflow");
-		flowObj.put("executionId", executionId);
-		flowObj.put("executionPath", executionPath);
-		flowObj.put("flowId", flowId);
-		flowObj.put("projectId", projectId);
-		
-		if(scheduleId >= 0) {
-			flowObj.put("scheduleId", scheduleId);
-		}
-		flowObj.put("submitTime", submitTime);
-		flowObj.put("startTime", startTime);
-		flowObj.put("endTime", endTime);
-		flowObj.put("status", flowStatus.toString());
-		flowObj.put("submitUser", submitUser);
-		flowObj.put("version", version);
+		fillMapFromExecutable(flowObj);
 		
-		flowObj.put("executionOptions", this.executionOptions.toObject());
-		flowObj.put("version", version);
+		flowObj.put(EXECUTIONID_PARAM, executionId);
+		flowObj.put(EXECUTIONPATH_PARAM, executionPath);		
+		flowObj.put(PROJECTID_PARAM, projectId);
 		
-		ArrayList<Object> props = new ArrayList<Object>();
-		for (FlowProps fprop: flowProps.values()) {
-			HashMap<String, Object> propObj = new HashMap<String, Object>();
-			String source = fprop.getSource();
-			String inheritedSource = fprop.getInheritedSource();
-			
-			propObj.put("source", source);
-			if (inheritedSource != null) {
-				propObj.put("inherited", inheritedSource);
-			}
-			props.add(propObj);
+		if(scheduleId >= 0) {
+			flowObj.put(SCHEDULEID_PARAM, scheduleId);
 		}
-		flowObj.put("properties", props);
+
+		flowObj.put(SUBMITUSER_PARAM, submitUser);
+		flowObj.put(VERSION_PARAM, version);
 		
-		ArrayList<Object> nodes = new ArrayList<Object>();
-		for (ExecutableNode node: executableNodes.values()) {
-			nodes.add(node.toObject());
-		}
-		flowObj.put("nodes", nodes);
+		flowObj.put(EXECUTIONOPTIONS_PARAM, this.executionOptions.toObject());
+		flowObj.put(VERSION_PARAM, version);
 		
 		ArrayList<String> proxyUserList = new ArrayList<String>(proxyUsers);
-		flowObj.put("proxyUsers", proxyUserList);
+		flowObj.put(PROXYUSERS_PARAM, proxyUserList);
 
-		return flowObj;
-	}
-
-	public Object toUpdateObject(long lastUpdateTime) {
-		Map<String, Object> updateData = new HashMap<String,Object>();
-		updateData.put("execId", this.executionId);
-		updateData.put("status", this.flowStatus.getNumVal());
-		updateData.put("startTime", this.startTime);
-		updateData.put("endTime", this.endTime);
-		updateData.put("updateTime", this.updateTime);
-		
-		List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
-		for (ExecutableNode node: executableNodes.values()) {
-			
-			if (node.getUpdateTime() > lastUpdateTime) {
-				Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
-				updatedNodeMap.put("jobId", node.getJobId());
-				updatedNodeMap.put("status", node.getStatus().getNumVal());
-				updatedNodeMap.put("startTime", node.getStartTime());
-				updatedNodeMap.put("endTime", node.getEndTime());
-				updatedNodeMap.put("updateTime", node.getUpdateTime());
-				updatedNodeMap.put("attempt", node.getAttempt());
-				
-				if (node.getAttempt() > 0) {
-					ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
-					for (Attempt attempt: node.getPastAttemptList()) {
-						pastAttempts.add(attempt.toObject());
-					}
-					updatedNodeMap.put("pastAttempts", pastAttempts);
-				}
-				
-				updatedNodes.add(updatedNodeMap);
-			}
-		}
-		
-		updateData.put("nodes", updatedNodes);
-		return updateData;
-	}
-	
-	@SuppressWarnings("unchecked")
-	public void applyUpdateObject(Map<String, Object> updateData) {
-		List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get("nodes");
-		for (Map<String,Object> node: updatedNodes) {
-			String jobId = (String)node.get("jobId");
-			Status status = Status.fromInteger((Integer)node.get("status"));
-			long startTime = JSONUtils.getLongFromObject(node.get("startTime"));
-			long endTime = JSONUtils.getLongFromObject(node.get("endTime"));
-			long updateTime = JSONUtils.getLongFromObject(node.get("updateTime"));
-			
-			ExecutableNode exNode = executableNodes.get(jobId);
-			exNode.setEndTime(endTime);
-			exNode.setStartTime(startTime);
-			exNode.setUpdateTime(updateTime);
-			exNode.setStatus(status);
-			
-			int attempt = 0;
-			if (node.containsKey("attempt")) {
-				attempt = (Integer)node.get("attempt");
-				if (attempt > 0) {
-					exNode.updatePastAttempts((List<Object>)node.get("pastAttempts"));
-				}
-			}
-			
-			exNode.setAttempt(attempt);
-		}
-		
-		this.flowStatus = Status.fromInteger((Integer)updateData.get("status"));
+		flowObj.put(SUBMITTIME_PARAM, submitTime);
 		
-		this.startTime = JSONUtils.getLongFromObject(updateData.get("startTime"));
-		this.endTime = JSONUtils.getLongFromObject(updateData.get("endTime"));
-		this.updateTime = JSONUtils.getLongFromObject(updateData.get("updateTime"));
+		return flowObj;
 	}
 	
 	@SuppressWarnings("unchecked")
 	public static ExecutableFlow createExecutableFlowFromObject(Object obj) {
 		ExecutableFlow exFlow = new ExecutableFlow();
-		
 		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
-		exFlow.executionId = (Integer)flowObj.get("executionId");
-		exFlow.executionPath = (String)flowObj.get("executionPath");
-		exFlow.flowId = (String)flowObj.get("flowId");
-		exFlow.projectId = (Integer)flowObj.get("projectId");
-		if (flowObj.containsKey("scheduleId")) {
-			exFlow.scheduleId = (Integer)flowObj.get("scheduleId");
+		
+		exFlow.fillExecutableFromMapObject(flowObj);
+		exFlow.executionId = (Integer)flowObj.get(EXECUTIONID_PARAM);
+		exFlow.executionPath = (String)flowObj.get(EXECUTIONPATH_PARAM);
+
+		exFlow.projectId = (Integer)flowObj.get(PROJECTID_PARAM);
+		if (flowObj.containsKey(SCHEDULEID_PARAM)) {
+			exFlow.scheduleId = (Integer)flowObj.get(SCHEDULEID_PARAM);
 		}
-		exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
-		exFlow.startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
-		exFlow.endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
-		exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
-		exFlow.submitUser = (String)flowObj.get("submitUser");
-		exFlow.version = (Integer)flowObj.get("version");
+		exFlow.submitUser = (String)flowObj.get(SUBMITUSER_PARAM);
+		exFlow.version = (Integer)flowObj.get(VERSION_PARAM);
 		
-		if (flowObj.containsKey("executionOptions")) {
-			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
+		exFlow.submitTime = JSONUtils.getLongFromObject(flowObj.get(SUBMITTIME_PARAM));
+		
+		if (flowObj.containsKey(EXECUTIONOPTIONS_PARAM)) {
+			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get(EXECUTIONOPTIONS_PARAM));
 		}
 		else {
-			// for backawards compatibility should remove in a few versions.
+			// for backwards compatibility should remove in a few versions.
 			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
 		}
 		
-		// Copy nodes
-		List<Object> nodes = (List<Object>)flowObj.get("nodes");
-		for (Object nodeObj: nodes) {
-			ExecutableNode node = ExecutableNode.createNodeFromObject(nodeObj, exFlow);
-			exFlow.executableNodes.put(node.getJobId(), node);
-		}
-
-		List<Object> properties = (List<Object>)flowObj.get("properties");
-		for (Object propNode : properties) {
-			HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
-			String source = (String)fprop.get("source");
-			String inheritedSource = (String)fprop.get("inherited");
-			
-			FlowProps flowProps = new FlowProps(inheritedSource, source);
-			exFlow.flowProps.put(source, flowProps);
-		}
-		
-		if(flowObj.containsKey("proxyUsers")) {
-			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
+		if(flowObj.containsKey(PROXYUSERS_PARAM)) {
+			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get(PROXYUSERS_PARAM);
 			exFlow.addAllProxyUsers(proxyUserList);
 		}
 		
 		return exFlow;
 	}
 	
-	@SuppressWarnings("unchecked")
-	public void updateExecutableFlowFromObject(Object obj) {
-		HashMap<String, Object> flowObj = (HashMap<String,Object>)obj;
-
-		submitTime = JSONUtils.getLongFromObject(flowObj.get("submitTime"));
-		startTime = JSONUtils.getLongFromObject(flowObj.get("startTime"));
-		endTime = JSONUtils.getLongFromObject(flowObj.get("endTime"));
-		flowStatus = Status.valueOf((String)flowObj.get("status"));
-		
-		List<Object> nodes = (List<Object>)flowObj.get("nodes");
-		for (Object nodeObj: nodes) {
-			HashMap<String, Object> nodeHash= (HashMap<String, Object>)nodeObj;
-			String nodeId = (String)nodeHash.get("id");
-			ExecutableNode node = executableNodes.get(nodeId);
-			if (nodeId == null) {
-				throw new RuntimeException("Node " + nodeId + " doesn't exist in flow.");
-			}
-			
-			node.updateNodeFromObject(nodeObj);
-		}
-	}
-	
-	public Set<String> getSources() {
-		HashSet<String> set = new HashSet<String>();
-		for (ExecutableNode exNode: executableNodes.values()) {
-			set.add(exNode.getJobPropsSource());
-		}
-		
-		for (FlowProps props: flowProps.values()) {
-			set.add(props.getSource());
-		}
-		return set;
-	}
-	
-	public String getSubmitUser() {
-		return submitUser;
-	}
-
-	public void setSubmitUser(String submitUser) {
-		this.submitUser = submitUser;
-	}
-	
-	public int getVersion() {
-		return version;
-	}
-
-	public void setVersion(int version) {
-		this.version = version;
+	public Map<String, Object> toUpdateObject(long lastUpdateTime) {
+		Map<String, Object> updateData = super.toUpdateObject(lastUpdateTime);
+		updateData.put(EXECUTIONID_PARAM, this.executionId);
+		return updateData;
 	}
-}
+}
\ No newline at end of file
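
Note (not part of the patch): with this refactoring an ExecutableFlow is built from a Project plus a Flow, and the project supplies the id and version. The sketch below builds one and round-trips it through its map representation; the example class, the execution id, and the submit user are illustrative values, and loading the Project and Flow is assumed to happen elsewhere.

package azkaban.executor;

import java.util.Map;

import azkaban.flow.Flow;
import azkaban.project.Project;

// Minimal sketch; obtaining the Project and Flow is assumed to happen elsewhere.
public class ExecutableFlowExample {
	public static ExecutableFlow roundTrip(Project project, Flow flow) {
		ExecutableFlow exFlow = new ExecutableFlow(project, flow);
		exFlow.setExecutionId(123);        // normally assigned when the execution is stored
		exFlow.setSubmitUser("someUser");  // example value

		// Serialize to the map representation and rebuild a copy from it.
		Map<String, Object> obj = exFlow.toObject();
		return ExecutableFlow.createExecutableFlowFromObject(obj);
	}
}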
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
new file mode 100644
index 0000000..311c33b
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.flow.Edge;
+import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
+import azkaban.flow.Node;
+import azkaban.flow.SpecialJobTypes;
+import azkaban.project.Project;
+
+public class ExecutableFlowBase extends ExecutableNode {
+	public static final String FLOW_ID_PARAM = "flowId";
+	public static final String NODES_PARAM = "nodes";
+	public static final String PROPERTIES_PARAM = "properties";
+	public static final String SOURCE_PARAM = "source";
+	public static final String INHERITED_PARAM = "inherited";
+	
+	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
+	private ArrayList<String> startNodes;
+	private ArrayList<String> endNodes;
+	
+	private HashMap<String, FlowProps> flowProps = new HashMap<String, FlowProps>();
+	private String flowId;
+	
+	public ExecutableFlowBase(Project project, Node node, Flow flow, ExecutableFlowBase parent) {
+		super(node, parent);
+
+		setFlow(project, flow, parent);
+	}
+	
+	public ExecutableFlowBase() {
+	}
+	
+	public int getExecutionId() {
+		if (this.getParentFlow() != null) {
+			return this.getParentFlow().getExecutionId();
+		}
+		
+		return -1;
+ 	}
+	
+	public int getProjectId() {
+		if (this.getParentFlow() != null) {
+			return this.getParentFlow().getProjectId();
+		}
+		
+		return -1;
+	}
+	
+	public int getVersion() {
+		if (this.getParentFlow() != null) {
+			return this.getParentFlow().getVersion();
+		}
+		
+		return -1;
+	}
+	
+	public String getFlowId() {
+		return flowId;
+	}
+	
+	public String getNestedId() {
+		if (this.getParentFlow() != null) {
+			return this.getParentFlow().getNestedId() + ":" + getId();
+		}
+		
+		return getId();
+	}
+	
+	protected void setFlow(Project project, Flow flow, ExecutableFlowBase parent) {
+		this.flowId = flow.getId();
+		
+		for (Node node: flow.getNodes()) {
+			String id = node.getId();
+			if (node.getType().equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
+				String embeddedFlowId = node.getEmbeddedFlowId();
+				Flow subFlow = project.getFlow(embeddedFlowId);
+				
+				ExecutableFlowBase embeddedFlow = new ExecutableFlowBase(project, node, subFlow, parent);
+				executableNodes.put(id, embeddedFlow);
+			}
+			else {
+				ExecutableNode exNode = new ExecutableNode(node, parent);
+				executableNodes.put(id, exNode);
+			}
+		}
+		
+		for (Edge edge: flow.getEdges()) {
+			ExecutableNode sourceNode = executableNodes.get(edge.getSourceId());
+			ExecutableNode targetNode = executableNodes.get(edge.getTargetId());
+			
+			sourceNode.addOutNode(edge.getTargetId());
+			targetNode.addInNode(edge.getSourceId());
+		}
+	}
+	
+	public List<ExecutableNode> getExecutableNodes() {
+		return new ArrayList<ExecutableNode>(executableNodes.values());
+	}
+	
+	public ExecutableNode getExecutableNode(String id) {
+		return executableNodes.get(id);
+	}
+	
+	public List<String> getStartNodes() {
+		if (startNodes == null) {
+			startNodes = new ArrayList<String>();
+			for (ExecutableNode node: executableNodes.values()) {
+				if (node.getInNodes().isEmpty()) {
+					startNodes.add(node.getId());
+				}
+			}
+		}
+		
+		return startNodes;
+	}
+	
+	public List<String> getEndNodes() {
+		if (endNodes == null) {
+			endNodes = new ArrayList<String>();
+			for (ExecutableNode node: executableNodes.values()) {
+				if (node.getOutNodes().isEmpty()) {
+					endNodes.add(node.getId());
+				}
+			}
+		}
+		
+		return endNodes;
+	}
+	
+	public Map<String,Object> toObject() {
+		Map<String,Object> mapObj = new HashMap<String,Object>();
+		fillMapFromExecutable(mapObj);
+		
+		return mapObj;
+	}
+	
+	protected void fillMapFromExecutable(Map<String,Object> flowObjMap) {
+		super.fillMapFromExecutable(flowObjMap);
+		
+		flowObjMap.put(FLOW_ID_PARAM, flowId);
+		
+		ArrayList<Object> nodes = new ArrayList<Object>();
+		for (ExecutableNode node: executableNodes.values()) {
+			nodes.add(node.toObject());
+		}
+		flowObjMap.put(NODES_PARAM, nodes);
+		
+		// Flow properties
+		ArrayList<Object> props = new ArrayList<Object>();
+		for (FlowProps fprop: flowProps.values()) {
+			HashMap<String, Object> propObj = new HashMap<String, Object>();
+			String source = fprop.getSource();
+			String inheritedSource = fprop.getInheritedSource();
+			
+			propObj.put(SOURCE_PARAM, source);
+			if (inheritedSource != null) {
+				propObj.put(INHERITED_PARAM, inheritedSource);
+			}
+			props.add(propObj);
+		}
+		flowObjMap.put(PROPERTIES_PARAM, props);
+	}
+
+	/**
+	 * Using the parameters in the given map (for example, one parsed back from JSON), fill in this node's fields.
+	 */
+	@SuppressWarnings("unchecked")
+	public void fillExecutableFromMapObject(Map<String,Object> flowObjMap) {
+		super.fillExecutableFromMapObject(flowObjMap);
+		
+		this.flowId = (String)flowObjMap.get(FLOW_ID_PARAM);
+		
+		List<Object> nodes = (List<Object>)flowObjMap.get(NODES_PARAM);
+		if (nodes != null) {
+			for (Object nodeObj: nodes) {
+				Map<String,Object> nodeObjMap = (Map<String,Object>)nodeObj;
+				
+				String type = (String)nodeObjMap.get(TYPE_PARAM);
+				if (type.equals(SpecialJobTypes.EMBEDDED_FLOW_TYPE)) {
+					ExecutableFlowBase exFlow = new ExecutableFlowBase();
+					exFlow.fillExecutableFromMapObject(nodeObjMap);
+					exFlow.setParentFlow(this);
+					
+					executableNodes.put(exFlow.getId(), exFlow);
+				}
+				else {
+					ExecutableNode exJob = new ExecutableNode();
+					exJob.fillExecutableFromMapObject(nodeObjMap);
+					exJob.setParentFlow(this);
+					
+					executableNodes.put(exJob.getId(), exJob);
+				}
+			}
+		}
+		
+		List<Object> properties = (List<Object>)flowObjMap.get(PROPERTIES_PARAM);
+		for (Object propNode : properties) {
+			HashMap<String, Object> fprop = (HashMap<String, Object>)propNode;
+			String source = (String)fprop.get("source");
+			String inheritedSource = (String)fprop.get("inherited");
+			
+			FlowProps flowProps = new FlowProps(inheritedSource, source);
+			this.flowProps.put(source, flowProps);
+		}
+	}
+	
+	public Map<String, Object> toUpdateObject(long lastUpdateTime) {
+		Map<String, Object> updateData = super.toUpdateObject();
+		
+		List<Map<String,Object>> updatedNodes = new ArrayList<Map<String,Object>>();
+		for (ExecutableNode node: executableNodes.values()) {
+			if (node instanceof ExecutableFlowBase) {
+				Map<String, Object> updatedNodeMap = ((ExecutableFlowBase)node).toUpdateObject(lastUpdateTime);
+				// We only add flows to the list that either have a fresh update time or have updated descendants.
+				if (node.getUpdateTime() > lastUpdateTime || updatedNodeMap.containsKey(NODES_PARAM)) {
+					updatedNodes.add(updatedNodeMap);
+				}
+			} 
+			else {
+				if (node.getUpdateTime() > lastUpdateTime) {
+					Map<String, Object> updatedNodeMap = node.toUpdateObject();
+					updatedNodes.add(updatedNodeMap);
+				}
+			}
+		}
+		
+		// if there are no updated nodes, we just won't add it to the list. This is good
+		// since if this is a nested flow, the parent is given the option to include or
+		// discard these subflows.
+		if (!updatedNodes.isEmpty()) {
+			updateData.put(NODES_PARAM, updatedNodes);
+		}
+		return updateData;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public void applyUpdateObject(Map<String, Object> updateData) {
+		super.applyUpdateObject(updateData);
+
+		List<Map<String,Object>> updatedNodes = (List<Map<String,Object>>)updateData.get(NODES_PARAM);
+		if (updatedNodes == null) { return; }
+		for (Map<String,Object> node: updatedNodes) {
+			String id = (String)node.get(ID_PARAM);
+			if (id == null) {
+				// Legacy case
+				id = (String)node.get("jobId");				
+			}
+
+			ExecutableNode exNode = executableNodes.get(id);
+			exNode.applyUpdateObject(node);
+		}
+	}
+}
\ No newline at end of file
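
Note (not part of the patch): the toUpdateObject(lastUpdateTime)/applyUpdateObject pair added above carries incremental status updates between two copies of the same flow, recursing into nested sub-flows. The sketch below shows the intended round trip; the copy names and the transport between them are assumptions rather than anything defined in this change.

package azkaban.executor;

import java.util.Map;

// Sketch of the sync these two methods support; names and transport are assumed.
public class FlowUpdateSyncExample {
	public static void sync(ExecutableFlowBase executorCopy,
	                        ExecutableFlowBase webServerCopy,
	                        long lastUpdateTime) {
		// Only nodes (and nested sub-flows) whose updateTime is newer than
		// lastUpdateTime are included under the "nodes" key, which keeps the
		// payload small for large flows.
		Map<String, Object> delta = executorCopy.toUpdateObject(lastUpdateTime);

		// applyUpdateObject walks the same "nodes" list and applies each entry
		// to the matching node by id (falling back to "jobId" for legacy data).
		webServerCopy.applyUpdateObject(delta);
	}
}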
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 32eac5d..64a0a35 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -1,19 +1,3 @@
-/*
- * Copyright 2012 LinkedIn, Inc
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
 package azkaban.executor;
 
 import java.util.ArrayList;
@@ -22,197 +6,114 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import azkaban.flow.Node;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
 
+/**
+ * Base executable unit that both job nodes and flows extend.
+ */
 public class ExecutableNode {
-	private String jobId;
-	private int executionId;
-	private String type;
-	private String jobPropsSource;
-	private String inheritPropsSource;
+	public static final String ID_PARAM = "id";
+	public static final String STATUS_PARAM = "status";
+	public static final String STARTTIME_PARAM = "startTime";
+	public static final String ENDTIME_PARAM = "endTime";
+	public static final String UPDATETIME_PARAM = "updateTime";
+	public static final String INNODES_PARAM = "inNodes";
+	public static final String OUTNODES_PARAM = "outNodes";
+	public static final String TYPE_PARAM = "type";
+	public static final String PROPS_SOURCE_PARAM = "propSource";
+	public static final String JOB_SOURCE_PARAM = "jobSource";
+	public static final String OUTPUT_PROPS_PARAM = "outputProps";
+	
+	private String id;
+	private String type = null;
 	private Status status = Status.READY;
 	private long startTime = -1;
 	private long endTime = -1;
 	private long updateTime = -1;
-	private int level = 0;
-	private ExecutableFlow flow;
+	
+	// Path to Job File
+	private String jobSource; 
+	// Path to top level props file
+	private String propsSource;
+	private Set<String> inNodes = null;
+	private Set<String> outNodes = null;
+	
 	private Props outputProps;
-	private int attempt = 0;
-	private boolean paused = false;
 	
+	public static final String ATTEMPT_PARAM = "attempt";
+	public static final String PASTATTEMPTS_PARAM = "pastAttempts";
+	
+	private int attempt = 0;
 	private long delayExecution = 0;
-
-	private Set<String> inNodes = new HashSet<String>();
-	private Set<String> outNodes = new HashSet<String>();
+	private ArrayList<ExecutionAttempt> pastAttempts = null;
 	
-	// Used if proxy node
-	private Integer externalExecutionId;
-	private ArrayList<Attempt> pastAttempts = null;
+	// Transient. These values aren't saved, but rediscovered.
+	private ExecutableFlowBase parentFlow; 
 	
-	public ExecutableNode(Node node, ExecutableFlow flow) {
-		jobId = node.getId();
-		executionId = flow.getExecutionId();
-		type = node.getType();
-		jobPropsSource = node.getJobSource();
-		inheritPropsSource = node.getPropsSource();
-		status = Status.READY;
-		level = node.getLevel();
-		this.flow = flow;
+	public ExecutableNode(Node node) {
+		this.id = node.getId();
+		this.jobSource = node.getJobSource();
+		this.propsSource = node.getPropsSource();
 	}
 	
-	public ExecutableNode() {
+	public ExecutableNode(Node node, ExecutableFlowBase parent) {
+		this(node.getId(), node.getJobSource(), node.getPropsSource(), parent);
 	}
-	
-	public void resetForRetry() {
-		Attempt pastAttempt = new Attempt(attempt, startTime, endTime, status);
-		attempt++;
+
+	public ExecutableNode(String id, String jobSource, String propsSource, ExecutableFlowBase parent) {
+		this.id = id;
+		this.jobSource = jobSource;
+		this.propsSource = propsSource;
 		
-		synchronized (this) {
-			if (pastAttempts == null) {
-				pastAttempts = new ArrayList<Attempt>();
-			}
-			
-			pastAttempts.add(pastAttempt);
-		}
-		startTime = -1;
-		endTime = -1;
-		updateTime = System.currentTimeMillis();
-		status = Status.READY;
+		setParentFlow(parent);
 	}
 	
-	public void setExecutableFlow(ExecutableFlow flow) {
-		this.flow = flow;
+	public ExecutableNode() {
 	}
 	
-	public void setExecutionId(int id) {
-		executionId = id;
-	}
-
-	public int getExecutionId() {
-		return executionId;
-	}
-
-	public String getJobId() {
-		return jobId;
-	}
-
-	public void setJobId(String id) {
-		this.jobId = id;
+	public ExecutableFlow getExecutableFlow() {
+		if (parentFlow == null) {
+			return null;
+		}
+		
+		return parentFlow.getExecutableFlow();
 	}
-
-	public void addInNode(String exNode) {
-		inNodes.add(exNode);
+	
+	public void setParentFlow(ExecutableFlowBase flow) {
+		this.parentFlow = flow;
 	}
-
-	public void addOutNode(String exNode) {
-		outNodes.add(exNode);
+	
+	public ExecutableFlowBase getParentFlow() {
+		return parentFlow;
 	}
-
-	public Set<String> getOutNodes() {
-		return outNodes;
+	
+	public String getId() {
+		return id;
 	}
 	
-	public Set<String> getInNodes() {
-		return inNodes;
+	public void setId(String id) {
+		this.id = id;
 	}
 	
 	public Status getStatus() {
 		return status;
 	}
 
-	public void setStatus(Status status) {
-		this.status = status;
+	public String getType() {
+		return type;
 	}
-	
-	public long getDelayedExecution() {
-		return delayExecution;
+
+	public void setType(String type) {
+		this.type = type;
 	}
 	
-	public void setDelayedExecution(long delayMs) {
-		delayExecution = delayMs;
+	public void setStatus(Status status) {
+		this.status = status;
 	}
 	
-	public Object toObject() {
-		HashMap<String, Object> objMap = new HashMap<String, Object>();
-		objMap.put("id", jobId);
-		objMap.put("jobSource", jobPropsSource);
-		objMap.put("propSource", inheritPropsSource);
-		objMap.put("jobType", type);
-		objMap.put("status", status.toString());
-		objMap.put("inNodes", new ArrayList<String>(inNodes));
-		objMap.put("outNodes", new ArrayList<String>(outNodes));
-		objMap.put("startTime", startTime);
-		objMap.put("endTime", endTime);
-		objMap.put("updateTime", updateTime);
-		objMap.put("level", level);
-		objMap.put("externalExecutionId", externalExecutionId);
-		objMap.put("paused", paused);
-		
-		if (pastAttempts != null) {
-			ArrayList<Object> attemptsList = new ArrayList<Object>(pastAttempts.size());
-			for (Attempt attempts : pastAttempts) {
-				attemptsList.add(attempts.toObject());
-			}
-			objMap.put("pastAttempts", attemptsList);
-		}
-		
-		return objMap;
-	}
-
-	@SuppressWarnings("unchecked")
-	public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
-		ExecutableNode exNode = new ExecutableNode();
-		
-		HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
-		exNode.executionId = flow == null ? 0 : flow.getExecutionId();
-		exNode.jobId = (String)objMap.get("id");
-		exNode.jobPropsSource = (String)objMap.get("jobSource");
-		exNode.inheritPropsSource = (String)objMap.get("propSource");
-		exNode.type = (String)objMap.get("jobType");
-		exNode.status = Status.valueOf((String)objMap.get("status"));
-		
-		exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
-		exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
-		
-		exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
-		exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
-		exNode.updateTime = JSONUtils.getLongFromObject(objMap.get("updateTime"));
-		exNode.level = (Integer)objMap.get("level");
-		
-		exNode.externalExecutionId = (Integer)objMap.get("externalExecutionId");
-		
-		exNode.flow = flow;
-		Boolean paused = (Boolean)objMap.get("paused");
-		if (paused!=null) {
-			exNode.paused = paused;
-		}
-		
-		List<Object> pastAttempts = (List<Object>)objMap.get("pastAttempts");
-		if (pastAttempts!=null) {
-			ArrayList<Attempt> attempts = new ArrayList<Attempt>();
-			for (Object attemptObj: pastAttempts) {
-				Attempt attempt = Attempt.fromObject(attemptObj);
-				attempts.add(attempt);
-			}
-			
-			exNode.pastAttempts = attempts;
-		}
-		
-		return exNode;
-	}
-
-	@SuppressWarnings("unchecked")
-	public void updateNodeFromObject(Object obj) {
-		HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
-		status = Status.valueOf((String)objMap.get("status"));
-
-		startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
-		endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
-	}
-
 	public long getStartTime() {
 		return startTime;
 	}
@@ -228,23 +129,7 @@ public class ExecutableNode {
 	public void setEndTime(long endTime) {
 		this.endTime = endTime;
 	}
-
-	public String getJobPropsSource() {
-		return jobPropsSource;
-	}
-
-	public String getPropsSource() {
-		return inheritPropsSource;
-	}
-
-	public int getLevel() {
-		return level;
-	}
-
-	public ExecutableFlow getFlow() {
-		return flow;
-	}
-
+	
 	public long getUpdateTime() {
 		return updateTime;
 	}
@@ -252,7 +137,45 @@ public class ExecutableNode {
 	public void setUpdateTime(long updateTime) {
 		this.updateTime = updateTime;
 	}
+	
+	public void addOutNode(String exNode) {
+		if (outNodes == null) {
+			outNodes = new HashSet<String>();
+		}
+		outNodes.add(exNode);
+	}
+	
+	public void addInNode(String exNode) {
+		if (inNodes == null) {
+			inNodes = new HashSet<String>();
+		}
+		inNodes.add(exNode);
+	}
 
+	public Set<String> getOutNodes() {
+		return outNodes;
+	}
+	
+	public Set<String> getInNodes() {
+		return inNodes;
+	}
+	
+	public boolean hasJobSource() {
+		return jobSource != null;
+	}
+	
+	public boolean hasPropsSource() {
+		return propsSource != null;
+	}
+	
+	public String getJobSource() {
+		return jobSource;
+	}
+	
+	public String getPropsSource() {
+		return propsSource;
+	}
+	
 	public void setOutputProps(Props output) {
 		this.outputProps = output;
 	}
@@ -260,16 +183,16 @@ public class ExecutableNode {
 	public Props getOutputProps() {
 		return outputProps;
 	}
-
-	public Integer getExternalExecutionId() {
-		return externalExecutionId;
+	
+	public long getDelayedExecution() {
+		return delayExecution;
 	}
-
-	public void setExternalExecutionId(Integer externalExecutionId) {
-		this.externalExecutionId = externalExecutionId;
+	
+	public void setDelayedExecution(long delayMs) {
+		delayExecution = delayMs;
 	}
-
-	public List<Attempt> getPastAttemptList() {
+	
+	public List<ExecutionAttempt> getPastAttemptList() {
 		return pastAttempts;
 	}
 	
@@ -281,96 +204,184 @@ public class ExecutableNode {
 		this.attempt = attempt;
 	}
 	
-	public boolean isPaused() {
-		return paused;
+	public void resetForRetry() {
+		ExecutionAttempt pastAttempt = new ExecutionAttempt(attempt, this);
+		attempt++;
+		
+		synchronized (this) {
+			if (pastAttempts == null) {
+				pastAttempts = new ArrayList<ExecutionAttempt>();
+			}
+			
+			pastAttempts.add(pastAttempt);
+		}
+		
+		this.setStartTime(-1);
+		this.setEndTime(-1);
+		this.setUpdateTime(System.currentTimeMillis());
+		this.setStatus(Status.READY);
 	}
 	
-	public void setPaused(boolean paused) {
-		this.paused = paused;
-	}
 	
 	public List<Object> getAttemptObjects() {
 		ArrayList<Object> array = new ArrayList<Object>();
 		
-		for (Attempt attempt: pastAttempts) {
+		for (ExecutionAttempt attempt: pastAttempts) {
 			array.add(attempt.toObject());
 		}
 		
 		return array;
 	}
 	
+	public Map<String,Object> toObject() {
+		Map<String,Object> mapObj = new HashMap<String,Object>();
+		fillMapFromExecutable(mapObj);
+		
+		return mapObj;
+	}
 	
-	public void updatePastAttempts(List<Object> pastAttemptsList) {
-		if (pastAttemptsList == null) {
-			return;
+	protected void fillMapFromExecutable(Map<String,Object> objMap) {
+		objMap.put(ID_PARAM, this.id);
+		objMap.put(STATUS_PARAM, status.toString());
+		objMap.put(STARTTIME_PARAM, startTime);
+		objMap.put(ENDTIME_PARAM, endTime);
+		objMap.put(UPDATETIME_PARAM, updateTime);
+		objMap.put(TYPE_PARAM, type);
+		
+		if (inNodes != null) {
+			objMap.put(INNODES_PARAM, inNodes);
+		}
+		if (outNodes != null) {
+			objMap.put(OUTNODES_PARAM, outNodes);
 		}
 		
-		synchronized (this) {
-			if (this.pastAttempts == null) {
-				this.pastAttempts = new ArrayList<Attempt>();
-			}
-
-			// We just check size because past attempts don't change
-			if (pastAttemptsList.size() <= this.pastAttempts.size()) {
-				return;
-			}
-
-			Object[] pastAttemptArray = pastAttemptsList.toArray();
-			for (int i = this.pastAttempts.size(); i < pastAttemptArray.length; ++i) {
-				Attempt attempt = Attempt.fromObject(pastAttemptArray[i]);
-				this.pastAttempts.add(attempt);
+//		if (hasPropsSource()) {
+//			objMap.put(PROPS_SOURCE_PARAM, this.propsSource);
+//		}
+		if (hasJobSource()) {
+			objMap.put(JOB_SOURCE_PARAM, this.jobSource);
+		}
+		
+		if (outputProps != null) {
+			objMap.put(OUTPUT_PROPS_PARAM, PropsUtils.toStringMap(outputProps, true));
+		}
+		
+		if (pastAttempts != null) {
+			ArrayList<Object> attemptsList = new ArrayList<Object>(pastAttempts.size());
+			for (ExecutionAttempt attempts : pastAttempts) {
+				attemptsList.add(attempts.toObject());
 			}
+			objMap.put(PASTATTEMPTS_PARAM, attemptsList);
 		}
-
 	}
-
-	public static class Attempt {
-		private int attempt = 0;
-		private long startTime = -1;
-		private long endTime = -1;
-		private Status status;
+	
+	@SuppressWarnings("unchecked")
+	public void fillExecutableFromMapObject(Map<String,Object> objMap) {
+		this.id = (String)objMap.get(ID_PARAM);
+		this.status = Status.valueOf((String)objMap.get(STATUS_PARAM));
+		this.startTime = JSONUtils.getLongFromObject(objMap.get(STARTTIME_PARAM));
+		this.endTime = JSONUtils.getLongFromObject(objMap.get(ENDTIME_PARAM));
+		this.updateTime = JSONUtils.getLongFromObject(objMap.get(UPDATETIME_PARAM));
+		this.type = (String)objMap.get(TYPE_PARAM);
 		
-		public Attempt(int attempt, long startTime, long endTime, Status status) {
-			this.attempt = attempt;
-			this.startTime = startTime;
-			this.endTime = endTime;
-			this.status = status;
+		if (objMap.containsKey(INNODES_PARAM)) {
+			this.inNodes = new HashSet<String>();
+			this.inNodes.addAll((List<String>)objMap.get(INNODES_PARAM));
 		}
 		
-		public long getStartTime() {
-			return startTime;
-		}
-
-		public long getEndTime() {
-			return endTime;
+		if (objMap.containsKey(OUTNODES_PARAM)) {
+			this.outNodes = new HashSet<String>();
+			this.outNodes.addAll((List<String>)objMap.get(OUTNODES_PARAM));
 		}
+//	
+//		if (objMap.containsKey(PROPS_SOURCE_PARAM)) {
+//			this.propsSource = (String)objMap.get(PROPS_SOURCE_PARAM);
+//		}
 		
-		public Status getStatus() {
-			return status;
+		if (objMap.containsKey(JOB_SOURCE_PARAM)) {
+			this.jobSource = (String)objMap.get(JOB_SOURCE_PARAM);
 		}
 		
-		public int getAttempt() {
-			return attempt;
+		if (objMap.containsKey(OUTPUT_PROPS_PARAM)) {
+			this.outputProps = new Props(null, (Map<String,String>)objMap.get(OUTPUT_PROPS_PARAM));
 		}
 		
-		public static Attempt fromObject(Object obj) {
-			@SuppressWarnings("unchecked")
-			Map<String, Object> map = (Map<String, Object>)obj;
-			int attempt = (Integer)map.get("attempt");
-			long startTime = JSONUtils.getLongFromObject(map.get("startTime"));
-			long endTime = JSONUtils.getLongFromObject(map.get("endTime"));
-			Status status = Status.valueOf((String)map.get("status"));
+		List<Object> pastAttempts = (List<Object>)objMap.get(PASTATTEMPTS_PARAM);
+		if (pastAttempts!=null) {
+			ArrayList<ExecutionAttempt> attempts = new ArrayList<ExecutionAttempt>();
+			for (Object attemptObj: pastAttempts) {
+				ExecutionAttempt attempt = ExecutionAttempt.fromObject(attemptObj);
+				attempts.add(attempt);
+			}
 			
-			return new Attempt(attempt, startTime, endTime, status);
+			this.pastAttempts = attempts;
 		}
+	}
+
+	public Map<String, Object> toUpdateObject() {
+		Map<String, Object> updatedNodeMap = new HashMap<String,Object>();
+		updatedNodeMap.put(ID_PARAM, getId());
+		updatedNodeMap.put(STATUS_PARAM, getStatus().getNumVal());
+		updatedNodeMap.put(STARTTIME_PARAM, getStartTime());
+		updatedNodeMap.put(ENDTIME_PARAM, getEndTime());
+		updatedNodeMap.put(UPDATETIME_PARAM, getUpdateTime());
 		
-		public Map<String, Object> toObject() {
-			HashMap<String,Object> attempts = new HashMap<String,Object>();
-			attempts.put("attempt", attempt);
-			attempts.put("startTime", startTime);
-			attempts.put("endTime", endTime);
-			attempts.put("status", status.toString());
-			return attempts;
+		updatedNodeMap.put(ATTEMPT_PARAM, getAttempt());
+
+		if (getAttempt() > 0) {
+			ArrayList<Map<String,Object>> pastAttempts = new ArrayList<Map<String,Object>>();
+			for (ExecutionAttempt attempt: getPastAttemptList()) {
+				pastAttempts.add(attempt.toObject());
+			}
+			updatedNodeMap.put(PASTATTEMPTS_PARAM, pastAttempts);
+		}
+		
+		return updatedNodeMap;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public void applyUpdateObject(Map<String, Object> updateData) {
+		if (updateData.containsKey(STATUS_PARAM)) {
+			this.status = Status.fromInteger((Integer)updateData.get(STATUS_PARAM));
+		}
+		if (updateData.containsKey(STARTTIME_PARAM)) {
+			this.startTime = JSONUtils.getLongFromObject(updateData.get(STARTTIME_PARAM));
+		}
+		if (updateData.containsKey(UPDATETIME_PARAM)) {
+			this.updateTime = JSONUtils.getLongFromObject(updateData.get(UPDATETIME_PARAM));
+		}
+		if (updateData.containsKey(ENDTIME_PARAM)) {
+			this.endTime = JSONUtils.getLongFromObject(updateData.get(ENDTIME_PARAM));
+		}
+		
+		if (updateData.containsKey(ATTEMPT_PARAM)) {
+			attempt = (Integer)updateData.get(ATTEMPT_PARAM);
+			if (attempt > 0) {
+				updatePastAttempts((List<Object>)updateData.get(PASTATTEMPTS_PARAM));
+			}
+		}
+	}
+	
+	private void updatePastAttempts(List<Object> pastAttemptsList) {
+		if (pastAttemptsList == null) {
+			return;
+		}
+		
+		synchronized (this) {
+			if (this.pastAttempts == null) {
+				this.pastAttempts = new ArrayList<ExecutionAttempt>();
+			}
+
+			// We just check size because past attempts don't change
+			if (pastAttemptsList.size() <= this.pastAttempts.size()) {
+				return;
+			}
+
+			Object[] pastAttemptArray = pastAttemptsList.toArray();
+			for (int i = this.pastAttempts.size(); i < pastAttemptArray.length; ++i) {
+				ExecutionAttempt attempt = ExecutionAttempt.fromObject(pastAttemptArray[i]);
+				this.pastAttempts.add(attempt);
+			}
 		}
 	}
-}
\ No newline at end of file
+}
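The ExecutableNode refactor above folds the old static createNodeFromObject factory and the nested Attempt class into instance-level fill methods plus a compact update path (toUpdateObject / applyUpdateObject). A minimal round-trip sketch of the new API follows; it assumes ExecutableNode still exposes a public no-argument constructor, and the job id and type used here are illustrative only:

import java.util.Map;

import azkaban.executor.ExecutableNode;
import azkaban.executor.Status;

public class ExecutableNodeRoundTripSketch {
	public static void main(String[] args) {
		// Assumed: ExecutableNode keeps a public no-arg constructor.
		ExecutableNode node = new ExecutableNode();
		node.setId("word-count");    // illustrative job id
		node.setType("command");     // illustrative job type
		node.setStatus(Status.READY);

		// Full serialization, as used when the node is persisted.
		Map<String, Object> full = node.toObject();
		ExecutableNode copy = new ExecutableNode();
		copy.fillExecutableFromMapObject(full);

		// Compact update path: only id, status, times and attempts travel.
		node.setStatus(Status.SUCCEEDED);
		node.setEndTime(System.currentTimeMillis());
		copy.applyUpdateObject(node.toUpdateObject());
	}
}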
diff --git a/src/java/azkaban/executor/ExecutionAttempt.java b/src/java/azkaban/executor/ExecutionAttempt.java
new file mode 100644
index 0000000..7da0623
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutionAttempt.java
@@ -0,0 +1,68 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.utils.JSONUtils;
+
+public class ExecutionAttempt {
+	public static final String ATTEMPT_PARAM = "attempt";
+	public static final String STATUS_PARAM = "status";
+	public static final String STARTTIME_PARAM = "startTime";
+	public static final String ENDTIME_PARAM = "endTime";
+	
+	private int attempt = 0;
+	private long startTime = -1;
+	private long endTime = -1;
+	private Status status;
+	
+	public ExecutionAttempt(int attempt, ExecutableNode executable) {
+		this.attempt = attempt;
+		this.startTime = executable.getStartTime();
+		this.endTime = executable.getEndTime();
+		this.status = executable.getStatus();
+	}
+	
+	public ExecutionAttempt(int attempt, long startTime, long endTime, Status status) {
+		this.attempt = attempt;
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this.status = status;
+	}
+	
+	public long getStartTime() {
+		return startTime;
+	}
+
+	public long getEndTime() {
+		return endTime;
+	}
+	
+	public Status getStatus() {
+		return status;
+	}
+	
+	public int getAttempt() {
+		return attempt;
+	}
+	
+	public static ExecutionAttempt fromObject(Object obj) {
+		@SuppressWarnings("unchecked")
+		Map<String, Object> map = (Map<String, Object>)obj;
+		int attempt = (Integer)map.get(ATTEMPT_PARAM);
+		long startTime = JSONUtils.getLongFromObject(map.get(STARTTIME_PARAM));
+		long endTime = JSONUtils.getLongFromObject(map.get(ENDTIME_PARAM));
+		Status status = Status.valueOf((String)map.get(STATUS_PARAM));
+		
+		return new ExecutionAttempt(attempt, startTime, endTime, status);
+	}
+	
+	public Map<String, Object> toObject() {
+		HashMap<String,Object> attempts = new HashMap<String,Object>();
+		attempts.put(ATTEMPT_PARAM, attempt);
+		attempts.put(STARTTIME_PARAM, startTime);
+		attempts.put(ENDTIME_PARAM, endTime);
+		attempts.put(STATUS_PARAM, status.toString());
+		return attempts;
+	}
+}
\ No newline at end of file
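ExecutionAttempt carries the per-attempt snapshot (attempt number, start/end time, status) that previously lived in the nested ExecutableNode.Attempt class. A small illustrative round trip through its map form, not part of the change itself:

import java.util.Map;

import azkaban.executor.ExecutionAttempt;
import azkaban.executor.Status;

public class ExecutionAttemptSketch {
	public static void main(String[] args) {
		long end = System.currentTimeMillis();
		long start = end - 60000L;

		// First attempt (index 0) that failed after a minute.
		ExecutionAttempt first = new ExecutionAttempt(0, start, end, Status.FAILED);

		// toObject() produces the JSON-friendly map; fromObject() reverses it.
		Map<String, Object> serialized = first.toObject();
		ExecutionAttempt restored = ExecutionAttempt.fromObject(serialized);

		System.out.println(restored.getAttempt() + " " + restored.getStatus());
	}
}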
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index c6c6f41..17ac116 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -154,11 +154,11 @@ public class ExecutorMailer extends AbstractMailer {
 		}
 	}
 	
-	private List<String> findFailedJobs(ExecutableFlow flow) {
+	private List<String> findFailedJobs(ExecutableFlowBase flow) {
 		ArrayList<String> failedJobs = new ArrayList<String>();
 		for (ExecutableNode node: flow.getExecutableNodes()) {
 			if (node.getStatus() == Status.FAILED) {
-				failedJobs.add(node.getJobId());
+				failedJobs.add(node.getId());
 			}
 		}
 		
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..74bba73 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -361,12 +361,14 @@ public class ExecutorManager {
 		}
 	}
 	
-	public String submitExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
+	public String submitExecutableFlow(ExecutableFlow exflow, String userId) throws ExecutorManagerException {
 		synchronized(exflow) {
 			logger.info("Submitting execution flow " + exflow.getFlowId());
 
 			int projectId = exflow.getProjectId();
 			String flowId = exflow.getFlowId();
+			exflow.setSubmitUser(userId);
+			exflow.setSubmitTime(System.currentTimeMillis());
 			
 			List<Integer> running = getRunningFlows(projectId, flowId);
 
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 3c7332d..a231c41 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -374,7 +374,14 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 			}
 		}
 		
-		ExecutableFlow flow = node.getFlow();
+		ExecutableFlow flow = node.getExecutableFlow();
+		String flowId = flow.getFlowId();
+		
+		// if the main flow is not the parent, then we'll create a composite key for flowID
+		if (flow != node.getParentFlow()) {
+			flowId = node.getParentFlow().getNestedId();
+		}
+		
 		QueryRunner runner = createQueryRunner();
 		try {
 			runner.update(
@@ -382,8 +389,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 					flow.getExecutionId(), 
 					flow.getProjectId(), 
 					flow.getVersion(), 
-					flow.getFlowId(), 
-					node.getJobId(),
+					flowId, 
+					node.getId(),
 					node.getStartTime(),
 					node.getEndTime(), 
 					node.getStatus().getNumVal(),
@@ -391,7 +398,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 					node.getAttempt()
 					);
 		} catch (SQLException e) {
-			throw new ExecutorManagerException("Error writing job " + node.getJobId(), e);
+			throw new ExecutorManagerException("Error writing job " + node.getId(), e);
 		}
 	}
 	
@@ -418,11 +425,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLo
 					node.getEndTime(), 
 					node.getStatus().getNumVal(), 
 					outputParam,
-					node.getFlow().getExecutionId(),
-					node.getJobId(),
+					node.getExecutableFlow().getExecutionId(),
+					node.getId(),
 					node.getAttempt());
 		} catch (SQLException e) {
-			throw new ExecutorManagerException("Error updating job " + node.getJobId(), e);
+			throw new ExecutorManagerException("Error updating job " + node.getId(), e);
 		}
 	}
 	
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..e447067 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -57,7 +57,8 @@ import azkaban.utils.Pair;
 public class ScheduleManager {
 	private static Logger logger = Logger.getLogger(ScheduleManager.class);
 
-	private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+	private final DateTimeFormatter _dateFormat = DateTimeFormat
+			.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
 	private ScheduleLoader loader;
 
 	private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
@@ -66,7 +67,7 @@ public class ScheduleManager {
 	private final ExecutorManager executorManager;
 	private final ProjectManager projectManager;
 	private final SLAManager slaManager;
-	
+
 	// Used for mbeans to query Scheduler status
 	private long lastCheckTime = -1;
 	private long nextWakupTime = -1;
@@ -78,10 +79,8 @@ public class ScheduleManager {
 	 * @param loader
 	 */
 	public ScheduleManager(ExecutorManager executorManager,
-							ProjectManager projectManager, 
-							SLAManager slaManager,
-							ScheduleLoader loader) 
-	{
+			ProjectManager projectManager, SLAManager slaManager,
+			ScheduleLoader loader) {
 		this.executorManager = executorManager;
 		this.projectManager = projectManager;
 		this.slaManager = slaManager;
@@ -93,7 +92,8 @@ public class ScheduleManager {
 			scheduleList = loader.loadSchedules();
 		} catch (ScheduleManagerException e) {
 			// TODO Auto-generated catch block
-			logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
+			logger.error("Failed to load schedules" + e.getCause()
+					+ e.getMessage());
 			e.printStackTrace();
 		}
 
@@ -126,9 +126,10 @@ public class ScheduleManager {
 	 * 
 	 * @param id
 	 * @return
-	*/
+	 */
 	public Set<Schedule> getSchedules(int projectId, String flowId) {
-		return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+		return scheduleIdentityPairMap.get(new Pair<Integer, String>(projectId,
+				flowId));
 	}
 
 	/**
@@ -136,12 +137,11 @@ public class ScheduleManager {
 	 * 
 	 * @param id
 	 * @return
-	*/
+	 */
 	public Schedule getSchedule(int scheduleId) {
 		return scheduleIDMap.get(scheduleId);
 	}
 
-
 	/**
 	 * Removes the flow from the schedule if it exists.
 	 * 
@@ -149,10 +149,11 @@ public class ScheduleManager {
 	 */
 	public synchronized void removeSchedules(int projectId, String flowId) {
 		Set<Schedule> schedules = getSchedules(projectId, flowId);
-		for(Schedule sched : schedules) {
+		for (Schedule sched : schedules) {
 			removeSchedule(sched);
 		}
 	}
+
 	/**
 	 * Removes the flow from the schedule if it exists.
 	 * 
@@ -160,16 +161,16 @@ public class ScheduleManager {
 	 */
 	public synchronized void removeSchedule(Schedule sched) {
 
-		Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
+		Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
 		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
-		if(schedules != null) {
+		if (schedules != null) {
 			schedules.remove(sched);
-			if(schedules.size() == 0) {
+			if (schedules.size() == 0) {
 				scheduleIdentityPairMap.remove(identityPairMap);
 			}
 		}
 		scheduleIDMap.remove(sched.getScheduleId());
-		
+
 		runner.removeRunnerSchedule(sched);
 		try {
 			loader.removeSchedule(sched);
@@ -201,40 +202,28 @@ public class ScheduleManager {
 	// }
 	// }
 
-	public Schedule scheduleFlow(
-			final int scheduleId,
-			final int projectId,
-			final String projectName,
-			final String flowName,
-			final String status,
-			final long firstSchedTime,
-			final DateTimeZone timezone,
-			final ReadablePeriod period,
-			final long lastModifyTime,
-			final long nextExecTime,
-			final long submitTime,
-			final String submitUser
-			) {
-		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+	public Schedule scheduleFlow(final int scheduleId, final int projectId,
+			final String projectName, final String flowName,
+			final String status, final long firstSchedTime,
+			final DateTimeZone timezone, final ReadablePeriod period,
+			final long lastModifyTime, final long nextExecTime,
+			final long submitTime, final String submitUser) {
+		return scheduleFlow(scheduleId, projectId, projectName, flowName,
+				status, firstSchedTime, timezone, period, lastModifyTime,
+				nextExecTime, submitTime, submitUser, null, null);
 	}
-	
-	public Schedule scheduleFlow(
-			final int scheduleId,
-			final int projectId,
-			final String projectName,
-			final String flowName,
-			final String status,
-			final long firstSchedTime,
-			final DateTimeZone timezone,
-			final ReadablePeriod period,
-			final long lastModifyTime,
-			final long nextExecTime,
-			final long submitTime,
-			final String submitUser,
-			ExecutionOptions execOptions,
-			SlaOptions slaOptions
-			) {
-		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+
+	public Schedule scheduleFlow(final int scheduleId, final int projectId,
+			final String projectName, final String flowName,
+			final String status, final long firstSchedTime,
+			final DateTimeZone timezone, final ReadablePeriod period,
+			final long lastModifyTime, final long nextExecTime,
+			final long submitTime, final String submitUser,
+			ExecutionOptions execOptions, SlaOptions slaOptions) {
+		Schedule sched = new Schedule(scheduleId, projectId, projectName,
+				flowName, status, firstSchedTime, timezone, period,
+				lastModifyTime, nextExecTime, submitTime, submitUser,
+				execOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -256,8 +245,9 @@ public class ScheduleManager {
 		s.updateTime();
 		this.runner.addRunnerSchedule(s);
 		scheduleIDMap.put(s.getScheduleId(), s);
-		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
-		if(schedules == null) {
+		Set<Schedule> schedules = scheduleIdentityPairMap.get(s
+				.getScheduleIdentityPair());
+		if (schedules == null) {
 			schedules = new HashSet<Schedule>();
 			scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
 		}
@@ -271,13 +261,12 @@ public class ScheduleManager {
 	 */
 	public synchronized void insertSchedule(Schedule s) {
 		boolean exist = s.getScheduleId() != -1;
-		if(s.updateTime()) {
+		if (s.updateTime()) {
 			try {
-				if(!exist) {
+				if (!exist) {
 					loader.insertSchedule(s);
 					internalSchedule(s);
-				}
-				else{
+				} else {
 					loader.updateSchedule(s);
 					internalSchedule(s);
 				}
@@ -285,19 +274,19 @@ public class ScheduleManager {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
 			}
-		}
-		else {
-			logger.error("The provided schedule is non-recurring and the scheduled time already passed. " + s.getScheduleName());
+		} else {
+			logger.error("The provided schedule is non-recurring and the scheduled time already passed. "
+					+ s.getScheduleName());
 		}
 	}
 
-//	/**
-//	 * Save the schedule
-//	 */
-//	private void saveSchedule() {
-//		loader.saveSchedule(getSchedule());
-//	}
-	
+	// /**
+	// * Save the schedule
+	// */
+	// private void saveSchedule() {
+	// loader.saveSchedule(getSchedule());
+	// }
+
 	/**
 	 * Thread that simply invokes the running of flows when the schedule is
 	 * ready.
@@ -313,7 +302,8 @@ public class ScheduleManager {
 		private static final int TIMEOUT_MS = 300000;
 
 		public ScheduleRunner() {
-			schedules = new PriorityBlockingQueue<Schedule>(1,new ScheduleComparator());
+			schedules = new PriorityBlockingQueue<Schedule>(1,
+					new ScheduleComparator());
 		}
 
 		public void shutdown() {
@@ -377,99 +367,158 @@ public class ScheduleManager {
 
 						if (s == null) {
 							// If null, wake up every minute or so to see if
-							// there's something to do. Most likely there will not be.
+							// there's something to do. Most likely there will
+							// not be.
 							try {
 								logger.info("Nothing scheduled to run. Checking again soon.");
-								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
+								nextWakupTime = System.currentTimeMillis()
+										+ TIMEOUT_MS;
 								this.wait(TIMEOUT_MS);
 							} catch (InterruptedException e) {
-								// interruption should occur when items are added or removed from the queue.
+								// interruption should occur when items are
+								// added or removed from the queue.
 							}
 						} else {
-							// We've passed the flow execution time, so we will run.
-							if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
-								// Run flow. The invocation of flows should be quick.
+							// We've passed the flow execution time, so we will
+							// run.
+							if (!(new DateTime(s.getNextExecTime()))
+									.isAfterNow()) {
+								// Run flow. The invocation of flows should be
+								// quick.
 								Schedule runningSched = schedules.poll();
 
-								logger.info("Scheduler ready to run " + runningSched.toString());
+								logger.info("Scheduler ready to run "
+										+ runningSched.toString());
 								// Execute the flow here
 								try {
-									Project project = projectManager.getProject(runningSched.getProjectId());
+									Project project = projectManager
+											.getProject(runningSched
+													.getProjectId());
 									if (project == null) {
-										logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
-										throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
-									}	
-									//TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
+										logger.error("Scheduled Project "
+												+ runningSched.getProjectId()
+												+ " does not exist!");
+										throw new RuntimeException(
+												"Error finding the scheduled project. "
+														+ runningSched
+																.getProjectId());
+									}
+									// TODO It is possible that the project is
+									// there, but the flow doesn't exist because
+									// upload a version that changes flow
+									// structure
 
-									Flow flow = project.getFlow(runningSched.getFlowName());
+									Flow flow = project.getFlow(runningSched
+											.getFlowName());
 									if (flow == null) {
-										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
-										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
+										logger.error("Flow "
+												+ runningSched
+														.getScheduleName()
+												+ " cannot be found in project "
+												+ project.getName());
+										throw new RuntimeException(
+												"Error finding the scheduled flow. "
+														+ runningSched
+																.getScheduleName());
 									}
 
 									// Create ExecutableFlow
-									ExecutableFlow exflow = new ExecutableFlow(flow);
-									System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
-									exflow.setScheduleId(runningSched.getScheduleId());
-									exflow.setSubmitUser(runningSched.getSubmitUser());
-									exflow.addAllProxyUsers(project.getProxyUsers());
-									
-									ExecutionOptions flowOptions = runningSched.getExecutionOptions();
-									if(flowOptions == null) {
+									ExecutableFlow exflow = new ExecutableFlow(
+											project, flow);
+									System.out
+											.println("ScheduleManager: creating schedule: "
+													+ runningSched
+															.getScheduleId());
+									exflow.setScheduleId(runningSched
+											.getScheduleId());
+									exflow.addAllProxyUsers(project
+											.getProxyUsers());
+
+									ExecutionOptions flowOptions = runningSched
+											.getExecutionOptions();
+									if (flowOptions == null) {
 										flowOptions = new ExecutionOptions();
-										flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+										flowOptions
+												.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
 									}
 									exflow.setExecutionOptions(flowOptions);
-									
-									if (!flowOptions.isFailureEmailsOverridden()) {
-										flowOptions.setFailureEmails(flow.getFailureEmails());
+
+									if (!flowOptions
+											.isFailureEmailsOverridden()) {
+										flowOptions.setFailureEmails(flow
+												.getFailureEmails());
 									}
-									if (!flowOptions.isSuccessEmailsOverridden()) {
-										flowOptions.setSuccessEmails(flow.getSuccessEmails());
+									if (!flowOptions
+											.isSuccessEmailsOverridden()) {
+										flowOptions.setSuccessEmails(flow
+												.getSuccessEmails());
 									}
-									
+
 									try {
-										executorManager.submitExecutableFlow(exflow);
-										logger.info("Scheduler has invoked " + exflow.getExecutionId());
-									} 
-									catch (ExecutorManagerException e) {
+										executorManager
+												.submitExecutableFlow(exflow, runningSched
+														.getSubmitUser());
+										logger.info("Scheduler has invoked "
+												+ exflow.getExecutionId());
+									} catch (ExecutorManagerException e) {
 										throw e;
-									}
-									catch (Exception e) {	
+									} catch (Exception e) {
 										e.printStackTrace();
-										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
+										throw new ScheduleManagerException(
+												"Scheduler invoked flow "
+														+ exflow.getExecutionId()
+														+ " has failed.", e);
 									}
-									
-									SlaOptions slaOptions = runningSched.getSlaOptions();
-									if(slaOptions != null) {
-										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+
+									SlaOptions slaOptions = runningSched
+											.getSlaOptions();
+									if (slaOptions != null) {
+										logger.info("Submitting SLA checkings for "
+												+ runningSched.getFlowName());
 										// submit flow slas
 										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-										for(SlaSetting set : slaOptions.getSettings()) {
-											if(set.getId().equals("")) {
-												DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-												slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-											}
-											else {
+										for (SlaSetting set : slaOptions
+												.getSettings()) {
+											if (set.getId().equals("")) {
+												DateTime checkTime = new DateTime(
+														runningSched
+																.getNextExecTime())
+														.plus(set.getDuration());
+												slaManager
+														.submitSla(
+																exflow.getExecutionId(),
+																"",
+																checkTime,
+																slaOptions
+																		.getSlaEmails(),
+																set.getActions(),
+																null,
+																set.getRule());
+											} else {
 												jobsettings.add(set);
 											}
 										}
-										if(jobsettings.size() > 0) {
-											slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+										if (jobsettings.size() > 0) {
+											slaManager.submitSla(
+													exflow.getExecutionId(),
+													"", DateTime.now(),
+													slaOptions.getSlaEmails(),
+													new ArrayList<SlaAction>(),
+													jobsettings,
+													SlaRule.WAITANDCHECKJOB);
 										}
 									}
-									
-								} 
-								catch (ExecutorManagerException e) {
-									if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
+
+								} catch (ExecutorManagerException e) {
+									if (e.getReason() != null
+											&& e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
 										logger.info(e.getMessage());
-									}
-									else {
+									} else {
 										e.printStackTrace();
 									}
-								}
-								catch (Exception e) {
-									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
+								} catch (Exception e) {
+									logger.info("Scheduler failed to run job. "
+											+ e.getMessage() + e.getCause());
 								}
 
 								removeRunnerSchedule(runningSched);
@@ -480,15 +529,18 @@ public class ScheduleManager {
 								if (runningSched.updateTime()) {
 									addRunnerSchedule(runningSched);
 									loader.updateSchedule(runningSched);
-								}
-								else {
+								} else {
 									removeSchedule(runningSched);
-								}								
+								}
 							} else {
 								// wait until flow run
-								long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
+								long millisWait = Math.max(
+										0,
+										s.getNextExecTime()
+												- (new DateTime()).getMillis());
 								try {
-									nextWakupTime = System.currentTimeMillis() + millisWait;
+									nextWakupTime = System.currentTimeMillis()
+											+ millisWait;
 									this.wait(Math.min(millisWait, TIMEOUT_MS));
 								} catch (InterruptedException e) {
 									// interruption should occur when items are
@@ -497,9 +549,13 @@ public class ScheduleManager {
 							}
 						}
 					} catch (Exception e) {
-						logger.error("Unexpected exception has been thrown in scheduler", e);
+						logger.error(
+								"Unexpected exception has been thrown in scheduler",
+								e);
 					} catch (Throwable e) {
-						logger.error("Unexpected throwable has been thrown in scheduler", e);
+						logger.error(
+								"Unexpected throwable has been thrown in scheduler",
+								e);
 					}
 				}
 			}
@@ -526,19 +582,19 @@ public class ScheduleManager {
 			}
 		}
 	}
-	
+
 	public long getLastCheckTime() {
 		return lastCheckTime;
 	}
-	
+
 	public long getNextUpdateTime() {
 		return nextWakupTime;
 	}
-	
+
 	public State getThreadState() {
 		return runner.getState();
 	}
-	
+
 	public boolean isThreadActive() {
 		return runner.isAlive();
 	}
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 5324cb7..14da916 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -28,7 +28,7 @@ import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.flow.CommonJobProperties;
 
 import org.apache.commons.lang.StringUtils;
@@ -208,7 +208,7 @@ public class PropsUtils {
 		return buffer.toString();
 	}
 	
-	public static Props addCommonFlowProperties(final ExecutableFlow flow) {
+	public static Props addCommonFlowProperties(final ExecutableFlowBase flow) {
 		Props parentProps = new Props();
 
 		parentProps.put(CommonJobProperties.FLOW_ID, flow.getFlowId());
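addCommonFlowProperties now accepts any ExecutableFlowBase, so the common flow properties (for example CommonJobProperties.FLOW_ID seen above) can also be generated for embedded flows. A short illustrative usage, assuming a flow instance is already in hand and using the PropsUtils.toStringMap helper that appears elsewhere in this patch:

// Sketch only: 'flow' may be any ExecutableFlowBase, top-level or embedded.
Props common = PropsUtils.addCommonFlowProperties(flow);

// Hypothetical inspection of the generated values.
Map<String, String> asMap = PropsUtils.toStringMap(common, true);
System.out.println(asMap.get(CommonJobProperties.FLOW_ID));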
diff --git a/src/java/azkaban/utils/StringUtils.java b/src/java/azkaban/utils/StringUtils.java
index 6684b00..a51f812 100644
--- a/src/java/azkaban/utils/StringUtils.java
+++ b/src/java/azkaban/utils/StringUtils.java
@@ -16,7 +16,6 @@
 package azkaban.utils;
 
 import java.util.Collection;
-import java.util.List;
 
 public class StringUtils {
 	public static final char SINGLE_QUOTE = '\'';
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index a542a82..e0d5e85 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionAttempt;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorManager;
@@ -57,7 +58,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	@Override
 	public void init(ServletConfig config) throws ServletException {
 		super.init(config);
-		AzkabanWebServer server = (AzkabanWebServer)getApplication();
+		AzkabanWebServer server = (AzkabanWebServer) getApplication();
 		projectManager = server.getProjectManager();
 		executorManager = server.getExecutorManager();
 		scheduleManager = server.getScheduleManager();
@@ -65,25 +66,26 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	@Override
-	protected void handleGet(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+	protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
+			Session session) throws ServletException, IOException {
 		if (hasParam(req, "ajax")) {
 			handleAJAXAction(req, resp, session);
-		}
-		else if (hasParam(req, "execid")) {
+		} else if (hasParam(req, "execid")) {
 			if (hasParam(req, "job")) {
 				handleExecutionJobPage(req, resp, session);
-			}
-			else {
+			} else {
 				handleExecutionFlowPage(req, resp, session);
 			}
-		}
-		else {
+		} else {
 			handleExecutionsPage(req, resp, session);
 		}
 	}
-	
-	private void handleExecutionJobPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/joblogpage.vm");
+
+	private void handleExecutionJobPage(HttpServletRequest req,
+			HttpServletResponse resp, Session session) throws ServletException,
+			IOException {
+		Page page = newPage(req, resp, session,
+				"azkaban/webapp/servlet/velocity/joblogpage.vm");
 		User user = session.getUser();
 		int execId = getIntParam(req, "execid");
 		String jobId = getParam(req, "job");
@@ -91,48 +93,59 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		page.add("execid", execId);
 		page.add("jobid", jobId);
 		page.add("attempt", attempt);
-		
+
 		ExecutableFlow flow = null;
 		try {
 			flow = executorManager.getExecutableFlow(execId);
 			if (flow == null) {
-				page.add("errorMsg", "Error loading executing flow " + execId + " not found.");
+				page.add("errorMsg", "Error loading executing flow " + execId
+						+ " not found.");
 				page.render();
 				return;
 			}
 		} catch (ExecutorManagerException e) {
-			page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
+			page.add("errorMsg",
+					"Error loading executing flow: " + e.getMessage());
 			page.render();
 			return;
 		}
-		
+
 		int projectId = flow.getProjectId();
-		Project project = getProjectPageByPermission(page, projectId, user, Type.READ);
+		Project project = getProjectPageByPermission(page, projectId, user,
+				Type.READ);
 		if (project == null) {
 			page.render();
 			return;
 		}
-		
+
 		page.add("projectName", project.getName());
 		page.add("flowid", flow.getFlowId());
-		
+
 		page.render();
 	}
-	
-	private void handleExecutionsPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/executionspage.vm");
+
+	private void handleExecutionsPage(HttpServletRequest req,
+			HttpServletResponse resp, Session session) throws ServletException,
+			IOException {
+		Page page = newPage(req, resp, session,
+				"azkaban/webapp/servlet/velocity/executionspage.vm");
 
 		List<ExecutableFlow> runningFlows = executorManager.getRunningFlows();
 		page.add("runningFlows", runningFlows.isEmpty() ? null : runningFlows);
-		
-		List<ExecutableFlow> finishedFlows = executorManager.getRecentlyFinishedFlows();
-		page.add("recentlyFinished", finishedFlows.isEmpty() ? null : finishedFlows);
+
+		List<ExecutableFlow> finishedFlows = executorManager
+				.getRecentlyFinishedFlows();
+		page.add("recentlyFinished", finishedFlows.isEmpty() ? null
+				: finishedFlows);
 		page.add("vmutils", velocityHelper);
 		page.render();
 	}
-	
-	private void handleExecutionFlowPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/executingflowpage.vm");
+
+	private void handleExecutionFlowPage(HttpServletRequest req,
+			HttpServletResponse resp, Session session) throws ServletException,
+			IOException {
+		Page page = newPage(req, resp, session,
+				"azkaban/webapp/servlet/velocity/executingflowpage.vm");
 		User user = session.getUser();
 		int execId = getIntParam(req, "execid");
 		page.add("execid", execId);
@@ -141,89 +154,95 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		try {
 			flow = executorManager.getExecutableFlow(execId);
 			if (flow == null) {
-				page.add("errorMsg", "Error loading executing flow " + execId + " not found.");
+				page.add("errorMsg", "Error loading executing flow " + execId
+						+ " not found.");
 				page.render();
 				return;
 			}
 		} catch (ExecutorManagerException e) {
-			page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
+			page.add("errorMsg",
+					"Error loading executing flow: " + e.getMessage());
 			page.render();
 			return;
 		}
-		
+
 		int projectId = flow.getProjectId();
-		Project project = getProjectPageByPermission(page, projectId, user, Type.READ);
-		if(project == null) {
+		Project project = getProjectPageByPermission(page, projectId, user,
+				Type.READ);
+		if (project == null) {
 			page.render();
 			return;
 		}
-		
+
 		page.add("projectId", project.getId());
 		page.add("projectName", project.getName());
 		page.add("flowid", flow.getFlowId());
-		
+
 		page.render();
 	}
-	
-	protected Project getProjectPageByPermission(Page page, int projectId, User user, Permission.Type type) {
+
+	protected Project getProjectPageByPermission(Page page, int projectId,
+			User user, Permission.Type type) {
 		Project project = projectManager.getProject(projectId);
-		
+
 		if (project == null) {
 			page.add("errorMsg", "Project " + project + " not found.");
-		}
-		else if (!hasPermission(project, user, type)) {
-			page.add("errorMsg", "User " + user.getUserId() + " doesn't have " + type.name() + " permissions on " + project.getName());
-		}
-		else {
+		} else if (!hasPermission(project, user, type)) {
+			page.add("errorMsg", "User " + user.getUserId() + " doesn't have "
+					+ type.name() + " permissions on " + project.getName());
+		} else {
 			return project;
 		}
-		
+
 		return null;
 	}
 
-	protected Project getProjectAjaxByPermission(Map<String, Object> ret, String projectName, User user, Permission.Type type) {
+	protected Project getProjectAjaxByPermission(Map<String, Object> ret,
+			String projectName, User user, Permission.Type type) {
 		Project project = projectManager.getProject(projectName);
-		
+
 		if (project == null) {
 			ret.put("error", "Project '" + project + "' not found.");
-		}
-		else if (!hasPermission(project, user, type)) {
-			ret.put("error", "User '" + user.getUserId() + "' doesn't have " + type.name() + " permissions on " + project.getName());
-		}
-		else {
+		} else if (!hasPermission(project, user, type)) {
+			ret.put("error", "User '" + user.getUserId() + "' doesn't have "
+					+ type.name() + " permissions on " + project.getName());
+		} else {
 			return project;
 		}
-		
+
 		return null;
 	}
-	
-	protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
+
+	protected Project getProjectAjaxByPermission(Map<String, Object> ret,
+			int projectId, User user, Permission.Type type) {
 		Project project = projectManager.getProject(projectId);
-		
+
 		if (project == null) {
 			ret.put("error", "Project '" + project + "' not found.");
-		}
-		else if (!hasPermission(project, user, type)) {
-			ret.put("error", "User '" + user.getUserId() + "' doesn't have " + type.name() + " permissions on " + project.getName());
-		}
-		else {
+		} else if (!hasPermission(project, user, type)) {
+			ret.put("error", "User '" + user.getUserId() + "' doesn't have "
+					+ type.name() + " permissions on " + project.getName());
+		} else {
 			return project;
 		}
-		
+
 		return null;
 	}
-	
+
 	@Override
-	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+	protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
+			Session session) throws ServletException, IOException {
 		if (hasParam(req, "ajax")) {
 			handleAJAXAction(req, resp, session);
 		}
 	}
 
-	private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+	private void handleAJAXAction(HttpServletRequest req,
+			HttpServletResponse resp, Session session) throws ServletException,
+			IOException {
 		HashMap<String, Object> ret = new HashMap<String, Object>();
 		String ajaxName = getParam(req, "ajax");
-		
+
 		if (hasParam(req, "execid")) {
 			int execid = getIntParam(req, "execid");
 			ExecutableFlow exFlow = null;
@@ -231,64 +250,60 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			try {
 				exFlow = executorManager.getExecutableFlow(execid);
 			} catch (ExecutorManagerException e) {
-				ret.put("error", "Error fetching execution '" + execid + "': " + e.getMessage());
+				ret.put("error", "Error fetching execution '" + execid + "': "
+						+ e.getMessage());
 			}
 
 			if (exFlow == null) {
 				ret.put("error", "Cannot find execution '" + execid + "'");
-			}
-			else {
+			} else {
 				if (ajaxName.equals("fetchexecflow")) {
-					ajaxFetchExecutableFlow(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("fetchexecflowupdate")) {
-					ajaxFetchExecutableFlowUpdate(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("cancelFlow")) {
+					ajaxFetchExecutableFlow(req, resp, ret, session.getUser(),
+							exFlow);
+				} else if (ajaxName.equals("fetchexecflowupdate")) {
+					ajaxFetchExecutableFlowUpdate(req, resp, ret,
+							session.getUser(), exFlow);
+				} else if (ajaxName.equals("cancelFlow")) {
 					ajaxCancelFlow(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("restartFlow")) {
+				} else if (ajaxName.equals("restartFlow")) {
 					ajaxRestartFlow(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("pauseFlow")) {
+				} else if (ajaxName.equals("pauseFlow")) {
 					ajaxPauseFlow(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("resumeFlow")) {
+				} else if (ajaxName.equals("resumeFlow")) {
 					ajaxResumeFlow(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("fetchExecFlowLogs")) {
-					ajaxFetchExecFlowLogs(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("fetchExecJobLogs")) {
+				} else if (ajaxName.equals("fetchExecFlowLogs")) {
+					ajaxFetchExecFlowLogs(req, resp, ret, session.getUser(),
+							exFlow);
+				} else if (ajaxName.equals("fetchExecJobLogs")) {
 					ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
-				}
-				else if (ajaxName.equals("retryFailedJobs")) {
+				} else if (ajaxName.equals("retryFailedJobs")) {
 					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
 				}
-//				else if (ajaxName.equals("fetchLatestJobStatus")) {
-//					ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(), exFlow);
-//				}
+				// else if (ajaxName.equals("fetchLatestJobStatus")) {
+				// ajaxFetchLatestJobStatus(req, resp, ret, session.getUser(),
+				// exFlow);
+				// }
 				else if (ajaxName.equals("flowInfo")) {
-					//String projectName = getParam(req, "project");
-					//Project project = projectManager.getProject(projectName);
-					//String flowName = getParam(req, "flow");
-					ajaxFetchExecutableFlowInfo(req, resp, ret, session.getUser(), exFlow);
+					// String projectName = getParam(req, "project");
+					// Project project = projectManager.getProject(projectName);
+					// String flowName = getParam(req, "flow");
+					ajaxFetchExecutableFlowInfo(req, resp, ret,
+							session.getUser(), exFlow);
 				}
 			}
-		}
-		else if (ajaxName.equals("getRunning")) {
+		} else if (ajaxName.equals("getRunning")) {
 			String projectName = getParam(req, "project");
 			String flowName = getParam(req, "flow");
-			ajaxGetFlowRunning(req, resp, ret, session.getUser(), projectName, flowName);
-		}
-		else if (ajaxName.equals("flowInfo")) {
+			ajaxGetFlowRunning(req, resp, ret, session.getUser(), projectName,
+					flowName);
+		} else if (ajaxName.equals("flowInfo")) {
 			String projectName = getParam(req, "project");
 			String flowName = getParam(req, "flow");
-			ajaxFetchFlowInfo(req, resp, ret, session.getUser(), projectName, flowName);
-		}
-		else {
+			ajaxFetchFlowInfo(req, resp, ret, session.getUser(), projectName,
+					flowName);
+		} else {
 			String projectName = getParam(req, "project");
-			
+
 			ret.put("project", projectName);
 			if (ajaxName.equals("executeFlow")) {
 				ajaxAttemptExecuteFlow(req, resp, ret, session.getUser());
@@ -299,68 +314,78 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-//	private void ajaxFetchLatestJobStatus(HttpServletRequest req,HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) {
-//		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
-//		if (project == null) {
-//			ret.put("error", "Project doesn't exist or incorrect access permission.");
-//			return;
-//		}
-//		
-//		String projectName;
-//		String flowName;
-//		String jobName;
-//		try {
-//			projectName = getParam(req, "projectName");
-//			flowName = getParam(req, "flowName");
-//			jobName = getParam(req, "jobName");
-//		} catch (Exception e) {
-//			ret.put("error", e.getMessage());
-//			return;
-//		}
-//		
-//		try {
-//			ExecutableNode node = exFlow.getExecutableNode(jobId);
-//			if (node == null) {
-//				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
-//				return;
-//			}
-//			
-//			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
-//			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
-//			if (data == null) {
-//				ret.put("length", 0);
-//				ret.put("offset", offset);
-//				ret.put("data", "");
-//			}
-//			else {
-//				ret.put("length", data.getLength());
-//				ret.put("offset", data.getOffset());
-//				ret.put("data", data.getData());
-//			}
-//		} catch (ExecutorManagerException e) {
-//			throw new ServletException(e);
-//		}
-//		
-//	}
-
-	private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+	// private void ajaxFetchLatestJobStatus(HttpServletRequest
+	// req,HttpServletResponse resp, HashMap<String, Object> ret, User user,
+	// ExecutableFlow exFlow) {
+	// Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(),
+	// user, Type.READ);
+	// if (project == null) {
+	// ret.put("error",
+	// "Project doesn't exist or incorrect access permission.");
+	// return;
+	// }
+	//
+	// String projectName;
+	// String flowName;
+	// String jobName;
+	// try {
+	// projectName = getParam(req, "projectName");
+	// flowName = getParam(req, "flowName");
+	// jobName = getParam(req, "jobName");
+	// } catch (Exception e) {
+	// ret.put("error", e.getMessage());
+	// return;
+	// }
+	//
+	// try {
+	// ExecutableNode node = exFlow.getExecutableNode(jobId);
+	// if (node == null) {
+	// ret.put("error", "Job " + jobId + " doesn't exist in " +
+	// exFlow.getExecutionId());
+	// return;
+	// }
+	//
+	// int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+	// LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset,
+	// length, attempt);
+	// if (data == null) {
+	// ret.put("length", 0);
+	// ret.put("offset", offset);
+	// ret.put("data", "");
+	// }
+	// else {
+	// ret.put("length", data.getLength());
+	// ret.put("offset", data.getOffset());
+	// ret.put("data", data.getData());
+	// }
+	// } catch (ExecutorManagerException e) {
+	// throw new ServletException(e);
+	// }
+	//
+	// }
+
+	private void ajaxRestartFailed(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.EXECUTE);
 		if (project == null) {
 			return;
 		}
-		
-		if (exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.SUCCEEDED) {
+
+		if (exFlow.getStatus() == Status.FAILED
+				|| exFlow.getStatus() == Status.SUCCEEDED) {
 			ret.put("error", "Flow has already finished. Please re-execute.");
 			return;
 		}
-		
+
 		try {
 			executorManager.retryFailures(exFlow, user.getUserId());
 		} catch (ExecutorManagerException e) {
 			ret.put("error", e.getMessage());
 		}
 	}
-	
+
 	/**
 	 * Gets the logs through plain text stream to reduce memory overhead.
 	 * 
@@ -370,25 +395,28 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	 * @param exFlow
 	 * @throws ServletException
 	 */
-	private void ajaxFetchExecFlowLogs(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+	private void ajaxFetchExecFlowLogs(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
-		
+
 		int offset = this.getIntParam(req, "offset");
 		int length = this.getIntParam(req, "length");
-		
+
 		resp.setCharacterEncoding("utf-8");
 
 		try {
-			LogData data = executorManager.getExecutableFlowLog(exFlow, offset, length);
+			LogData data = executorManager.getExecutableFlowLog(exFlow, offset,
+					length);
 			if (data == null) {
 				ret.put("length", 0);
 				ret.put("offset", offset);
 				ret.put("data", "");
-			}
-			else {
+			} else {
 				ret.put("length", data.getLength());
 				ret.put("offset", data.getOffset());
 				ret.put("data", data.getData());
@@ -397,7 +425,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			throw new ServletException(e);
 		}
 	}
-	
+
 	/**
 	 * Gets the logs through ajax plain text stream to reduce memory overhead.
 	 * 
@@ -407,33 +435,38 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	 * @param exFlow
 	 * @throws ServletException
 	 */
-	private void ajaxFetchJobLogs(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+	private void ajaxFetchJobLogs(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
-		
+
 		int offset = this.getIntParam(req, "offset");
 		int length = this.getIntParam(req, "length");
-		
+
 		String jobId = this.getParam(req, "jobId");
 		resp.setCharacterEncoding("utf-8");
 
 		try {
 			ExecutableNode node = exFlow.getExecutableNode(jobId);
 			if (node == null) {
-				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+				ret.put("error",
+						"Job " + jobId + " doesn't exist in "
+								+ exFlow.getExecutionId());
 				return;
 			}
-			
+
 			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
-			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
+			LogData data = executorManager.getExecutionJobLog(exFlow, jobId,
+					offset, length, attempt);
 			if (data == null) {
 				ret.put("length", 0);
 				ret.put("offset", offset);
 				ret.put("data", "");
-			}
-			else {
+			} else {
 				ret.put("length", data.getLength());
 				ret.put("offset", data.getOffset());
 				ret.put("data", data.getData());
@@ -444,7 +477,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	/**
-	 * Gets the job metadata through ajax plain text stream to reduce memory overhead.
+	 * Gets the job metadata through ajax plain text stream to reduce memory
+	 * overhead.
 	 * 
 	 * @param req
 	 * @param resp
@@ -452,33 +486,38 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	 * @param exFlow
 	 * @throws ServletException
 	 */
-	private void ajaxFetchJobMetaData(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+	private void ajaxFetchJobMetaData(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
-		
+
 		int offset = this.getIntParam(req, "offset");
 		int length = this.getIntParam(req, "length");
-		
+
 		String jobId = this.getParam(req, "jobId");
 		resp.setCharacterEncoding("utf-8");
 
 		try {
 			ExecutableNode node = exFlow.getExecutableNode(jobId);
 			if (node == null) {
-				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+				ret.put("error",
+						"Job " + jobId + " doesn't exist in "
+								+ exFlow.getExecutionId());
 				return;
 			}
-			
+
 			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
-			JobMetaData data = executorManager.getExecutionJobMetaData(exFlow, jobId, offset, length, attempt);
+			JobMetaData data = executorManager.getExecutionJobMetaData(exFlow,
+					jobId, offset, length, attempt);
 			if (data == null) {
 				ret.put("length", 0);
 				ret.put("offset", offset);
 				ret.put("data", "");
-			}
-			else {
+			} else {
 				ret.put("length", data.getLength());
 				ret.put("offset", data.getOffset());
 				ret.put("data", data.getData());
@@ -487,93 +526,105 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			throw new ServletException(e);
 		}
 	}
-	
-	private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
+
+	private void ajaxFetchFlowInfo(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			String projectName, String flowId) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret, projectName, user,
+				Type.READ);
 		if (project == null) {
 			return;
 		}
-		
+
 		Flow flow = project.getFlow(flowId);
 		if (flow == null) {
-			ret.put("error", "Error loading flow. Flow " + flowId + " doesn't exist in " + projectName);
+			ret.put("error", "Error loading flow. Flow " + flowId
+					+ " doesn't exist in " + projectName);
 			return;
 		}
-		
+
 		ret.put("successEmails", flow.getSuccessEmails());
 		ret.put("failureEmails", flow.getFailureEmails());
-		
+
 		Schedule sflow = null;
-		for (Schedule sched: scheduleManager.getSchedules()) {
-			if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(flowId)) {
+		for (Schedule sched : scheduleManager.getSchedules()) {
+			if (sched.getProjectId() == project.getId()
+					&& sched.getFlowName().equals(flowId)) {
 				sflow = sched;
 				break;
 			}
 		}
-		
+
 		if (sflow != null) {
 			ret.put("scheduled", sflow.getNextExecTime());
 		}
 	}
 
-	private void ajaxFetchExecutableFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exflow) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret, exflow.getProjectId(), user, Type.READ);
+	private void ajaxFetchExecutableFlowInfo(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exflow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exflow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
-		
+
 		Flow flow = project.getFlow(exflow.getFlowId());
 		if (flow == null) {
-			ret.put("error", "Error loading flow. Flow " + exflow.getFlowId() + " doesn't exist in " + exflow.getProjectId());
+			ret.put("error", "Error loading flow. Flow " + exflow.getFlowId()
+					+ " doesn't exist in " + exflow.getProjectId());
 			return;
 		}
-		
+
 		ExecutionOptions options = exflow.getExecutionOptions();
-		
+
 		ret.put("successEmails", options.getSuccessEmails());
 		ret.put("failureEmails", options.getFailureEmails());
 		ret.put("flowParam", options.getFlowParameters());
-		
+
 		FailureAction action = options.getFailureAction();
 		String failureAction = null;
 		switch (action) {
-			case FINISH_CURRENTLY_RUNNING:
-				failureAction = "finishCurrent";
-				break;
-			case CANCEL_ALL:
-				failureAction = "cancelImmediately";
-				break;
-			case FINISH_ALL_POSSIBLE:
-				failureAction = "finishPossible";
-				break;
+		case FINISH_CURRENTLY_RUNNING:
+			failureAction = "finishCurrent";
+			break;
+		case CANCEL_ALL:
+			failureAction = "cancelImmediately";
+			break;
+		case FINISH_ALL_POSSIBLE:
+			failureAction = "finishPossible";
+			break;
 		}
 		ret.put("failureAction", failureAction);
-		
+
 		ret.put("notifyFailureFirst", options.getNotifyOnFirstFailure());
 		ret.put("notifyFailureLast", options.getNotifyOnLastFailure());
-		
+
 		ret.put("failureEmailsOverride", options.isFailureEmailsOverridden());
 		ret.put("successEmailsOverride", options.isSuccessEmailsOverridden());
-		
+
 		ret.put("concurrentOptions", options.getConcurrentOption());
 		ret.put("pipelineLevel", options.getPipelineLevel());
 		ret.put("pipelineExecution", options.getPipelineExecutionId());
 		ret.put("queueLevel", options.getQueueLevel());
-		
-		HashMap<String, String> nodeStatus = new HashMap<String,String>();
-		for(ExecutableNode node : exflow.getExecutableNodes()) {
-			nodeStatus.put(node.getJobId(), node.getStatus().toString());
+
+		HashMap<String, String> nodeStatus = new HashMap<String, String>();
+		for (ExecutableNode node : exflow.getExecutableNodes()) {
+			nodeStatus.put(node.getId(), node.getStatus().toString());
 		}
 		ret.put("nodeStatus", nodeStatus);
 		ret.put("disabled", options.getDisabledJobs());
 	}
-	
-	private void ajaxCancelFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+
+	private void ajaxCancelFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.EXECUTE);
 		if (project == null) {
 			return;
 		}
-		
+
 		try {
 			executorManager.cancelFlow(exFlow, user.getUserId());
 		} catch (ExecutorManagerException e) {
@@ -581,27 +632,37 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void ajaxGetFlowRunning(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectId, String flowId) throws ServletException{
-		Project project = getProjectAjaxByPermission(ret, projectId, user, Type.EXECUTE);
+	private void ajaxGetFlowRunning(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			String projectId, String flowId) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret, projectId, user,
+				Type.EXECUTE);
 		if (project == null) {
 			return;
 		}
-		
-		List<Integer> refs = executorManager.getRunningFlows(project.getId(), flowId);
+
+		List<Integer> refs = executorManager.getRunningFlows(project.getId(),
+				flowId);
 		if (!refs.isEmpty()) {
 			ret.put("execIds", refs);
 		}
 	}
-	
-	private void ajaxRestartFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+
+	private void ajaxRestartFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.EXECUTE);
 		if (project == null) {
 			return;
 		}
 	}
 
-	private void ajaxPauseFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+	private void ajaxPauseFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.EXECUTE);
 		if (project == null) {
 			return;
 		}
@@ -613,8 +674,11 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void ajaxResumeFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+	private void ajaxResumeFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.EXECUTE);
 		if (project == null) {
 			return;
 		}
@@ -625,12 +689,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			ret.put("resume", e.getMessage());
 		}
 	}
-	
-	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException{
+
+	private void ajaxFetchExecutableFlowUpdate(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
 		Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
 		System.out.println("Fetching " + exFlow.getExecutionId());
-		
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
@@ -641,18 +708,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			if (node.getUpdateTime() <= lastUpdateTime) {
 				continue;
 			}
-			
-			HashMap<String, Object> nodeObj = new HashMap<String,Object>();
-			nodeObj.put("id", node.getJobId());
+
+			HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+			nodeObj.put("id", node.getId());
 			nodeObj.put("status", node.getStatus());
 			nodeObj.put("startTime", node.getStartTime());
 			nodeObj.put("endTime", node.getEndTime());
 			nodeObj.put("attempt", node.getAttempt());
-			
+
 			if (node.getAttempt() > 0) {
 				nodeObj.put("pastAttempts", node.getAttemptObjects());
 			}
-			
+
 			nodeList.add(nodeObj);
 		}
 
@@ -663,40 +730,42 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("submitTime", exFlow.getSubmitTime());
 		ret.put("updateTime", exFlow.getUpdateTime());
 	}
-	
-	private void ajaxFetchExecutableFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+
+	private void ajaxFetchExecutableFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+			ExecutableFlow exFlow) throws ServletException {
 		System.out.println("Fetching " + exFlow.getExecutionId());
 
-		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+		Project project = getProjectAjaxByPermission(ret,
+				exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
-		
+
 		ArrayList<Map<String, Object>> nodeList = new ArrayList<Map<String, Object>>();
-		ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String,Object>>();
+		ArrayList<Map<String, Object>> edgeList = new ArrayList<Map<String, Object>>();
 		for (ExecutableNode node : exFlow.getExecutableNodes()) {
-			HashMap<String, Object> nodeObj = new HashMap<String,Object>();
-			nodeObj.put("id", node.getJobId());
-			nodeObj.put("level", node.getLevel());
+			HashMap<String, Object> nodeObj = new HashMap<String, Object>();
+			nodeObj.put("id", node.getId());
 			nodeObj.put("status", node.getStatus());
 			nodeObj.put("startTime", node.getStartTime());
 			nodeObj.put("endTime", node.getEndTime());
-			
+
 			// Add past attempts
 			if (node.getPastAttemptList() != null) {
 				ArrayList<Object> pastAttempts = new ArrayList<Object>();
-				for (ExecutableNode.Attempt attempt: node.getPastAttemptList()) {
+				for (ExecutionAttempt attempt : node.getPastAttemptList()) {
 					pastAttempts.add(attempt.toObject());
 				}
 				nodeObj.put("pastAttempts", pastAttempts);
 			}
-			
+
 			nodeList.add(nodeObj);
-			
+
 			// Add edges
-			for (String out: node.getOutNodes()) {
-				HashMap<String, Object> edgeObj = new HashMap<String,Object>();
-				edgeObj.put("from", node.getJobId());
+			for (String out : node.getOutNodes()) {
+				HashMap<String, Object> edgeObj = new HashMap<String, Object>();
+				edgeObj.put("from", node.getId());
 				edgeObj.put("target", out);
 				edgeList.add(edgeObj);
 			}
@@ -710,46 +779,53 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("submitTime", exFlow.getSubmitTime());
 		ret.put("submitUser", exFlow.getSubmitUser());
 	}
-	
-	private void ajaxAttemptExecuteFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+
+	private void ajaxAttemptExecuteFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user)
+			throws ServletException {
 		String projectName = getParam(req, "project");
 		String flowId = getParam(req, "flow");
-		
-		Project project = getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
+
+		Project project = getProjectAjaxByPermission(ret, projectName, user,
+				Type.EXECUTE);
 		if (project == null) {
 			ret.put("error", "Project '" + projectName + "' doesn't exist.");
 			return;
 		}
-		
-		ret.put("flow",  flowId);
+
+		ret.put("flow", flowId);
 		Flow flow = project.getFlow(flowId);
 		if (flow == null) {
-			ret.put("error", "Flow '" + flowId + "' cannot be found in project " + project);
+			ret.put("error", "Flow '" + flowId
+					+ "' cannot be found in project " + project);
 			return;
 		}
-		
+
 		ajaxExecuteFlow(req, resp, ret, user);
 	}
-	
-	private void ajaxExecuteFlow(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) throws ServletException {
+
+	private void ajaxExecuteFlow(HttpServletRequest req,
+			HttpServletResponse resp, HashMap<String, Object> ret, User user)
+			throws ServletException {
 		String projectName = getParam(req, "project");
 		String flowId = getParam(req, "flow");
-		
-		Project project = getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
+
+		Project project = getProjectAjaxByPermission(ret, projectName, user,
+				Type.EXECUTE);
 		if (project == null) {
 			ret.put("error", "Project '" + projectName + "' doesn't exist.");
 			return;
 		}
-		
-		ret.put("flow",  flowId);
+
+		ret.put("flow", flowId);
 		Flow flow = project.getFlow(flowId);
 		if (flow == null) {
-			ret.put("error", "Flow '" + flowId + "' cannot be found in project " + project);
+			ret.put("error", "Flow '" + flowId
+					+ "' cannot be found in project " + project);
 			return;
 		}
-		
-		ExecutableFlow exflow = new ExecutableFlow(flow);
-		exflow.setSubmitUser(user.getUserId());
+
+		ExecutableFlow exflow = new ExecutableFlow(project, flow);
 		exflow.addAllProxyUsers(project.getProxyUsers());
 
 		ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
@@ -760,26 +836,26 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		if (!options.isSuccessEmailsOverridden()) {
 			options.setSuccessEmails(flow.getSuccessEmails());
 		}
-		
+
 		try {
-			String message = executorManager.submitExecutableFlow(exflow);
+			String message = executorManager.submitExecutableFlow(exflow, user.getUserId());
 			ret.put("message", message);
-		}
-		catch (ExecutorManagerException e) {
+		} catch (ExecutorManagerException e) {
 			e.printStackTrace();
-			ret.put("error", "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
+			ret.put("error", "Error submitting flow " + exflow.getFlowId()
+					+ ". " + e.getMessage());
 		}
 
 		ret.put("execid", exflow.getExecutionId());
 	}
-	
+
 	public class ExecutorVelocityHelper {
 		public String getProjectName(int id) {
 			Project project = projectManager.getProject(id);
 			if (project == null) {
 				return String.valueOf(id);
 			}
-			
+
 			return project.getName();
 		}
 	}
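
Note on the ExecutorServlet changes above: besides the blanket rename of ExecutableNode.getJobId() to getId(), flow submission now builds the ExecutableFlow from both the owning Project and the Flow, and the submitting user is passed to ExecutorManager.submitExecutableFlow() rather than being stamped on the flow with setSubmitUser(). A minimal sketch of the new submission path, using only calls visible in the diff (project, flow, req, user and ret are assumed to be resolved exactly as in ajaxExecuteFlow):

	ExecutableFlow exflow = new ExecutableFlow(project, flow); // project is now a constructor argument
	exflow.addAllProxyUsers(project.getProxyUsers());

	ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
	if (!options.isSuccessEmailsOverridden()) {
		options.setSuccessEmails(flow.getSuccessEmails());
	}
	// (attaching the options to the flow is elided here)

	try {
		// The submitter travels with the call, not with the flow object.
		String message = executorManager.submitExecutableFlow(exflow, user.getUserId());
		ret.put("message", message);
	} catch (ExecutorManagerException e) {
		ret.put("error", "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
	}
	ret.put("execid", exflow.getExecutionId());
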
diff --git a/src/java/azkaban/webapp/session/SessionCache.java b/src/java/azkaban/webapp/session/SessionCache.java
index 09d0b36..a21248b 100644
--- a/src/java/azkaban/webapp/session/SessionCache.java
+++ b/src/java/azkaban/webapp/session/SessionCache.java
@@ -20,7 +20,6 @@ import azkaban.utils.Props;
 import azkaban.utils.cache.Cache;
 import azkaban.utils.cache.CacheManager;
 import azkaban.utils.cache.Cache.EjectionPolicy;
-import azkaban.utils.cache.Element;
 
 
 /**
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index 2bdfc7a..aac1fb1 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -20,6 +20,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.test.execapp.EventCollectorListener;
 import azkaban.test.execapp.MockExecutorLoader;
@@ -136,15 +137,15 @@ public class LocalFlowWatcherTest {
 			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 			
 			// check it's start time is after the first's children.
-			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			ExecutableNode watchedNode = first.getExecutableNode(node.getId());
 			if (watchedNode == null) {
 				continue;
 			}
 			Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
 			
-			System.out.println("Node " + node.getJobId() + 
+			System.out.println("Node " + node.getId() + 
 					" start: " + node.getStartTime() + 
-					" dependent on " + watchedNode.getJobId() + 
+					" dependent on " + watchedNode.getId() + 
 					" " + watchedNode.getEndTime() + 
 					" diff: " + (node.getStartTime() - watchedNode.getEndTime()));
 
@@ -170,7 +171,7 @@ public class LocalFlowWatcherTest {
 			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 			
 			// check it's start time is after the first's children.
-			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			ExecutableNode watchedNode = first.getExecutableNode(node.getId());
 			if (watchedNode == null) {
 				continue;
 			}
@@ -185,7 +186,7 @@ public class LocalFlowWatcherTest {
 				Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
 				long diff = node.getStartTime() - child.getEndTime();
 				minDiff = Math.min(minDiff, diff);
-				System.out.println("Node " + node.getJobId() + 
+				System.out.println("Node " + node.getId() + 
 						" start: " + node.getStartTime() + 
 						" dependent on " + watchedChild + " " + child.getEndTime() +
 						" diff: " + diff);
@@ -228,8 +229,9 @@ public class LocalFlowWatcherTest {
 		@SuppressWarnings("unchecked")
 		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
 		
+		Project project = new Project(1, "test");
 		Flow flow = Flow.flowFromObject(flowObj);
-		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		ExecutableFlow execFlow = new ExecutableFlow(project, flow);
 		execFlow.setExecutionId(execId);
 		execFlow.setExecutionPath(workingDir.getPath());
 		return execFlow;
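
The same constructor change runs through the test helpers: LocalFlowWatcherTest (above), RemoteFlowWatcherTest and FlowRunnerTest (below) now build their ExecutableFlow from a Project plus a Flow. A minimal sketch of the updated helper, assuming flowObj, execId and workingDir are prepared as in the tests (FlowRunnerTest additionally calls project.setVersion(2) on its project):

	Project project = new Project(1, "test");
	Flow flow = Flow.flowFromObject(flowObj);                    // flowObj parsed from the flow's JSON file
	ExecutableFlow execFlow = new ExecutableFlow(project, flow); // was: new ExecutableFlow(flow)
	execFlow.setExecutionId(execId);
	execFlow.setExecutionPath(workingDir.getPath());
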
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index edf520c..fdff895 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -20,6 +20,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.test.execapp.EventCollectorListener;
 import azkaban.test.execapp.MockExecutorLoader;
@@ -137,15 +138,15 @@ public class RemoteFlowWatcherTest {
 			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 			
 			// check it's start time is after the first's children.
-			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			ExecutableNode watchedNode = first.getExecutableNode(node.getId());
 			if (watchedNode == null) {
 				continue;
 			}
 			Assert.assertEquals(watchedNode.getStatus(), Status.SUCCEEDED);
 			
-			System.out.println("Node " + node.getJobId() + 
+			System.out.println("Node " + node.getId() + 
 					" start: " + node.getStartTime() + 
-					" dependent on " + watchedNode.getJobId() + 
+					" dependent on " + watchedNode.getId() + 
 					" " + watchedNode.getEndTime() + 
 					" diff: " + (node.getStartTime() - watchedNode.getEndTime()));
 
@@ -170,7 +171,7 @@ public class RemoteFlowWatcherTest {
 			Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 			
 			// check it's start time is after the first's children.
-			ExecutableNode watchedNode = first.getExecutableNode(node.getJobId());
+			ExecutableNode watchedNode = first.getExecutableNode(node.getId());
 			if (watchedNode == null) {
 				continue;
 			}
@@ -185,7 +186,7 @@ public class RemoteFlowWatcherTest {
 				Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
 				long diff = node.getStartTime() - child.getEndTime();
 				minDiff = Math.min(minDiff, diff);
-				System.out.println("Node " + node.getJobId() + 
+				System.out.println("Node " + node.getId() + 
 						" start: " + node.getStartTime() + 
 						" dependent on " + watchedChild + " " + child.getEndTime() +
 						" diff: " + diff);
@@ -228,8 +229,9 @@ public class RemoteFlowWatcherTest {
 		@SuppressWarnings("unchecked")
 		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
 		
+		Project project = new Project(1, "test");
 		Flow flow = Flow.flowFromObject(flowObj);
-		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		ExecutableFlow execFlow = new ExecutableFlow(project, flow);
 		execFlow.setExecutionId(execId);
 		execFlow.setExecutionPath(workingDir.getPath());
 		return execFlow;
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index da1ed27..18513a3 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -22,6 +22,7 @@ import azkaban.executor.Status;
 
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.test.executor.JavaJob;
 import azkaban.utils.JSONUtils;
@@ -327,7 +328,7 @@ public class FlowRunnerTest {
 		ExecutableNode node = flow.getExecutableNode(name);
 		
 		if (node.getStatus() != status) {
-			Assert.fail("Status of job " + node.getJobId() + " is " + node.getStatus() + " not " + status + " as expected.");
+			Assert.fail("Status of job " + node.getId() + " is " + node.getStatus() + " not " + status + " as expected.");
 		}
 	}
 	
@@ -346,8 +347,11 @@ public class FlowRunnerTest {
 		@SuppressWarnings("unchecked")
 		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
 		
+		Project project = new Project(1, "myproject");
+		project.setVersion(2);
+
 		Flow flow = Flow.flowFromObject(flowObj);
-		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		ExecutableFlow execFlow = new ExecutableFlow(project, flow);
 		execFlow.setExecutionId(execId);
 		execFlow.setExecutionPath(workingDir.getPath());
 		return execFlow;
@@ -373,7 +377,7 @@ public class FlowRunnerTest {
 		
 		//System.out.println("Node " + node.getJobId() + " start:" + startTime + " end:" + endTime + " previous:" + previousEndTime);
 		Assert.assertTrue("Checking start and end times", startTime > 0 && endTime >= startTime);
-		Assert.assertTrue("Start time for " + node.getJobId() + " is " + startTime +" and less than " + previousEndTime, startTime >= previousEndTime);
+		Assert.assertTrue("Start time for " + node.getId() + " is " + startTime +" and less than " + previousEndTime, startTime >= previousEndTime);
 		
 		for (String outNode : node.getOutNodes()) {
 			ExecutableNode childNode = flow.getExecutableNode(outNode);
diff --git a/unit/java/azkaban/test/execapp/JobRunnerTest.java b/unit/java/azkaban/test/execapp/JobRunnerTest.java
index 1f8edc2..0698483 100644
--- a/unit/java/azkaban/test/execapp/JobRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/JobRunnerTest.java
@@ -78,7 +78,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps != null);
 		Assert.assertTrue(logFile.exists());
 		
-		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		Assert.assertTrue(eventCollector.checkOrdering());
 		try {
@@ -110,7 +110,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(logFile.exists());
 		Assert.assertTrue(eventCollector.checkOrdering());
 		Assert.assertTrue(!runner.isCancelled());
-		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_STATUS_CHANGED, Type.JOB_FINISHED});
@@ -145,7 +145,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(runner.getLogFilePath() == null);
 		Assert.assertTrue(eventCollector.checkOrdering());
 		
-		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == null);
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
 		
 		try {
 			eventCollector.checkEventExists(new Type[] {Type.JOB_STARTED, Type.JOB_FINISHED});
@@ -175,7 +175,7 @@ public class JobRunnerTest {
 		// Give it 10 ms to fail.
 		Assert.assertTrue( node.getEndTime() - node.getStartTime() < 10);
 		
-		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == null);
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
 		
 		// Log file and output files should not exist.
 		Props outputProps = runner.getOutputProps();
@@ -223,7 +223,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
 		// Give it 10 ms to fail.
 		Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
-		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		// Log file and output files should not exist.
 		File logFile = new File(runner.getLogFilePath());
@@ -268,7 +268,7 @@ public class JobRunnerTest {
 		Assert.assertTrue(outputProps != null);
 		Assert.assertTrue(logFile.exists());
 		Assert.assertFalse(runner.isCancelled());
-		Assert.assertTrue(loader.getNodeUpdateCount(node.getJobId()) == 3);
+		Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 		
 		Assert.assertTrue(eventCollector.checkOrdering());
 		try {
@@ -350,8 +350,8 @@ public class JobRunnerTest {
 		ExecutableFlow flow = new ExecutableFlow();
 		flow.setExecutionId(execId);
 		ExecutableNode node = new ExecutableNode();
-		node.setJobId(name);
-		node.setExecutableFlow(flow);
+		node.setId(name);
+		node.setParentFlow(flow);
 		
 		Props props = createProps(time, fail);
 		HashSet<String> proxyUsers = new HashSet<String>();
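
JobRunnerTest follows the renamed ExecutableNode setters: setJobId()/setExecutableFlow() become setId()/setParentFlow(). A minimal sketch of the updated node setup, assuming name and execId are the same test parameters used above:

	ExecutableFlow flow = new ExecutableFlow();
	flow.setExecutionId(execId);

	ExecutableNode node = new ExecutableNode();
	node.setId(name);          // was: node.setJobId(name)
	node.setParentFlow(flow);  // was: node.setExecutableFlow(flow)
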
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 2b81f82..05641ec 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -65,7 +65,6 @@ public class MockExecutorLoader implements ExecutorLoader {
 
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
 		ExecutableFlow toUpdate = flows.get(flow.getExecutionId());
@@ -76,24 +75,27 @@ public class MockExecutorLoader implements ExecutorLoader {
 
 	@Override
 	public void uploadExecutableNode(ExecutableNode node, Props inputParams) throws ExecutorManagerException {
-		nodes.put(node.getJobId(), ExecutableNode.createNodeFromObject(node.toObject(), null));
-		jobUpdateCount.put(node.getJobId(), 1);
+		ExecutableNode exNode = new ExecutableNode();
+		exNode.fillExecutableFromMapObject(node.toObject());
+		
+		nodes.put(node.getId(), exNode);
+		jobUpdateCount.put(node.getId(), 1);
 	}
 
 	@Override
 	public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
-		ExecutableNode foundNode = nodes.get(node.getJobId());
+		ExecutableNode foundNode = nodes.get(node.getId());
 		foundNode.setEndTime(node.getEndTime());
 		foundNode.setStartTime(node.getStartTime());
 		foundNode.setStatus(node.getStatus());
 		foundNode.setUpdateTime(node.getUpdateTime());
 		
-		Integer value = jobUpdateCount.get(node.getJobId());
+		Integer value = jobUpdateCount.get(node.getId());
 		if (value == null) {
 			throw new ExecutorManagerException("The node has not been uploaded");
 		}
 		else {
-			jobUpdateCount.put(node.getJobId(), ++value);
+			jobUpdateCount.put(node.getId(), ++value);
 		}
 		
 		flowUpdateCount++;
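
In MockExecutorLoader, the removed static factory ExecutableNode.createNodeFromObject(...) gives way to constructing an empty node and filling it from the serialized map. A minimal sketch of that copy pattern, with names taken from the diff:

	// Copy the uploaded node by round-tripping it through its map form.
	ExecutableNode exNode = new ExecutableNode();
	exNode.fillExecutableFromMapObject(node.toObject());

	nodes.put(node.getId(), exNode);
	jobUpdateCount.put(node.getId(), 1);
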
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index ba89a20..86995f5 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -245,7 +245,7 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals(flow.getProjectId(), info.getProjectId());
 		Assert.assertEquals(flow.getVersion(), info.getVersion());
 		Assert.assertEquals(flow.getFlowId(), info.getFlowId());
-		Assert.assertEquals(oldNode.getJobId(), info.getJobId());
+		Assert.assertEquals(oldNode.getId(), info.getJobId());
 		Assert.assertEquals(oldNode.getStatus(), info.getStatus());
 		Assert.assertEquals(oldNode.getStartTime(), info.getStartTime());
 		Assert.assertEquals("endTime = " + oldNode.getEndTime() + " info endTime = " + info.getEndTime(), oldNode.getEndTime(), info.getEndTime());
@@ -409,7 +409,8 @@ public class JdbcExecutorLoaderTest {
 		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
 		
 		Flow flow = Flow.flowFromObject(flowObj);
-		ExecutableFlow execFlow = new ExecutableFlow(executionId, flow);
+		ExecutableFlow execFlow = new ExecutableFlow(flow);
+		execFlow.setExecutionId(executionId);
 
 		return execFlow;
 	}
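
JdbcExecutorLoaderTest goes the other way on constructors: the (executionId, flow) form is dropped, so the execution id is set explicitly after construction. A minimal sketch, assuming flow and executionId are in scope as in the test helper above:

	ExecutableFlow execFlow = new ExecutableFlow(flow); // was: new ExecutableFlow(executionId, flow)
	execFlow.setExecutionId(executionId);
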
diff --git a/unit/java/azkaban/test/utils/PropsUtilsTest.java b/unit/java/azkaban/test/utils/PropsUtilsTest.java
index e2c914f..2f1ba02 100644
--- a/unit/java/azkaban/test/utils/PropsUtilsTest.java
+++ b/unit/java/azkaban/test/utils/PropsUtilsTest.java
@@ -88,7 +88,7 @@ public class PropsUtilsTest {
 	
 	private void failIfNotException(Props props) {
 		try {
-			Props resolved = PropsUtils.resolveProps(props);
+			PropsUtils.resolveProps(props);
 			Assert.fail();
 		}
 		catch (UndefinedPropertyException e) {