azkaban-uncached

Changes

unit/build.xml 26(+24 -2)

Details

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 5556d2a..4fd55c2 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -95,6 +95,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 						logger.info("Resume called.");
 						handleAjaxResume(respMap, execid, user);
 					}
+					else if (action.equals(MODIFY_EXECUTION_ACTION)) {
+						logger.info("Modify Execution Action");
+						handleModifyExecutionRequest(respMap, execid, user, req);
+					}
 					else {
 						respMap.put("error", "action: '" + action + "' not supported.");
 					}
@@ -107,6 +111,38 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		resp.flushBuffer();
 	}
 	
+	private void handleModifyExecutionRequest(Map<String, Object> respMap, int execId, String user, HttpServletRequest req) throws ServletException {
+		if (!hasParam(req, MODIFY_EXECUTION_ACTION_TYPE)) {
+			respMap.put(RESPONSE_ERROR, "Modification type not set.");
+			return;
+		}
+		String modificationType = getParam(req, MODIFY_EXECUTION_ACTION_TYPE);
+		String modifiedJobList = getParam(req, MODIFY_JOBS_LIST);
+		String[] jobIds = modifiedJobList.split("\\s*,\\s*");
+		
+		try {
+			if (MODIFY_RETRY_JOBS.equals(modificationType)) {
+				flowRunnerManager.retryJobs(execId, user, jobIds);
+			}
+			else if (MODIFY_CANCEL_JOBS.equals(modificationType)) {
+				
+			}
+			else if (MODIFY_DISABLE_JOBS.equals(modificationType)) {
+				
+			}
+			else if (MODIFY_ENABLE_JOBS.equals(modificationType)) {
+				
+			}
+			else if (MODIFY_PAUSE_JOBS.equals(modificationType)) {
+				
+			}
+			else if (MODIFY_RESUME_JOBS.equals(modificationType)) {
+				
+			}
+		} catch (ExecutorManagerException e) {
+			respMap.put("error", e.getMessage());
+		}
+	}
+	
 	private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
 		String type = getParam(req, "type");
 		int startByte = getIntParam(req, "offset");
@@ -121,13 +157,14 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				result = flowRunnerManager.readFlowLogs(execId, startByte, length);
 				respMap.putAll(result.toObject());
 			} catch (Exception e) {
-				respMap.put("error", e.getMessage());
+				respMap.put(RESPONSE_ERROR, e.getMessage());
 			}
 		}
 		else {
+			int attempt = getIntParam(req, "attempt", 0);
 			String jobId = getParam(req, "jobId");
 			try {
-				LogData result = flowRunnerManager.readJobLogs(execId, jobId, startByte, length);
+				LogData result = flowRunnerManager.readJobLogs(execId, jobId, attempt, startByte, length);
 				respMap.putAll(result.toObject());
 			} catch (Exception e) {
 				respMap.put("error", e.getMessage());
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index c3449d8..91ac757 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -33,6 +33,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.FlowProps;
 import azkaban.jobtype.JobTypeManager;
+import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
 public class FlowRunner extends EventHandler implements Runnable {
@@ -63,9 +64,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private BlockingQueue<JobRunner> jobsToRun = new LinkedBlockingQueue<JobRunner>();
 	private Map<String, JobRunner> runningJob = new ConcurrentHashMap<String, JobRunner>();
-	private Map<String, JobRunner> allJobs = new ConcurrentHashMap<String, JobRunner>();
+	private Map<Pair<String, Integer>, JobRunner> allJobs = new ConcurrentHashMap<Pair<String, Integer>, JobRunner>();
 	private List<JobRunner> pausedJobsToRun = Collections.synchronizedList(new ArrayList<JobRunner>());
 	
+	// Used for individual job pausing
+	private Map<String, ExecutableNode> pausedNode = new ConcurrentHashMap<String, ExecutableNode>();
+	
 	private Object actionSyncObj = new Object();
 	private boolean flowPaused = false;
 	private boolean flowFailed = false;
@@ -173,7 +177,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 			flowAppender.close();
 			
 			try {
-				executorLoader.uploadLogFile(execId, "", logFile);
+				executorLoader.uploadLogFile(execId, "", 0, logFile);
 			} catch (ExecutorManagerException e) {
 				e.printStackTrace();
 			}
@@ -238,7 +242,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 					}
 					else {
 						runningJob.put(node.getJobId(), runner);
-						allJobs.put(node.getJobId(), runner);
+						allJobs.put(new Pair<String, Integer>(node.getJobId(), node.getAttempt()), runner);
 						executorService.submit(runner);
 						logger.info("Job Started " + node.getJobId());
 					}
@@ -363,12 +367,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	public void cancel(String user) {
 		synchronized(actionSyncObj) {
+			logger.info("Flow cancelled by " + user);
 			flowPaused = false;
 			flowCancelled = true;
 			
 			for (JobRunner runner: pausedJobsToRun) {
 				ExecutableNode node = runner.getNode();
-				logger.info("Resumed flow is cancelled. Job killed " + node.getJobId());
+				logger.info("Resumed flow is cancelled. Job killed " + node.getJobId() + " by " + user);
 				node.setStatus(Status.KILLED);
 
 				jobsToRun.add(runner);
@@ -382,11 +387,192 @@ public class FlowRunner extends EventHandler implements Runnable {
 				flow.setStatus(Status.KILLED);
 			}
 
+			for (ExecutableNode node: pausedNode.values()) {
+				node.setStatus(Status.KILLED);
+				node.setPaused(false);
+				queueNextJob(node);
+			}
+			
 			updateFlow();
 			interrupt();
 		}
 	}
 	
+	public void cancelJob(String jobId, String user)  throws ExecutorManagerException {
+		synchronized(actionSyncObj) {
+			logger.info("Cancel of job " + jobId + " called by user " + user);
+			JobRunner runner = runningJob.get(jobId);
+			ExecutableNode node = flow.getExecutableNode(jobId);
+			if (runner != null) {
+				runner.cancel();
+			}
+			else {
+				Status status = node.getStatus();
+				if (status == Status.FAILED || status == Status.SUCCEEDED || status == Status.SKIPPED) {
+					throw new ExecutorManagerException("Can't cancel finished job " + jobId + " with status " + status);
+				}
+				
+				node.setStatus(Status.KILLED);
+				if (node.isPaused()) {
+					node.setPaused(false);
+					queueNextJob(node);
+				}
+			}
+		}
+	}
+	
+	public void resumeJob(String jobId, String user) throws ExecutorManagerException {
+		synchronized(actionSyncObj) {
+			if (runningJob.containsKey(jobId)) {
+				throw new ExecutorManagerException("Resume of job " + jobId + " failed since it's already running. User " + user);
+			}
+			else {
+				logger.info("Resume of job " + jobId + " requested by " + user);
+				ExecutableNode node = flow.getExecutableNode(jobId);
+				if (node == null) {
+					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
+				}
+			
+				if (node.isPaused()) {
+					node.setPaused(false);
+					if (pausedNode.containsKey(jobId)) {
+						queueNextJob(node);
+					}
+					
+					updateFlow();
+				}
+			}
+		}
+	}
+	
+	public void pauseJob(String jobId, String user) throws ExecutorManagerException {
+		synchronized(actionSyncObj) {
+			if (runningJob.containsKey(jobId)) {
+				throw new ExecutorManagerException("Pause of job " + jobId + " failed since it's already running. User " + user);
+			}
+			else {
+				logger.info("Pause of job " + jobId + " requested by " + user);
+				ExecutableNode node = flow.getExecutableNode(jobId);
+				if (node == null) {
+					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot pause.");
+				}
+			
+				long startTime = node.getStartTime();
+				if (startTime < 0) {
+					node.setPaused(true);
+					updateFlow();
+				}
+				else {
+					throw new ExecutorManagerException("Cannot pause job " + jobId + " that's started.");	
+				}
+			}
+		}
+	}
+	
+	public void disableJob(String jobId, String user) throws ExecutorManagerException {
+		// Disable and then check to see if it's set.
+		synchronized(actionSyncObj) {
+			if (runningJob.containsKey(jobId)) {
+				throw new ExecutorManagerException("Disable of job " + jobId + " failed since it's already running. User " + user);
+			}
+			else {
+				logger.info("Disable of job " + jobId + " requested by " + user);
+				ExecutableNode node = flow.getExecutableNode(jobId);
+				if (node == null) {
+					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot disable.");
+				}
+			
+				Status status = node.getStatus();
+				if (status == Status.DISABLED || status == Status.READY) {
+					node.setStatus(Status.DISABLED);
+					updateFlow();
+				}
+				else {
+					throw new ExecutorManagerException("Cannot disable job " + jobId + " with status " + status.toString());	
+				}
+			}
+		}
+	}
+	
+	public void enableJob(String jobId, String user) throws ExecutorManagerException {
+		// Enable and then check to see if it's set.
+		synchronized(actionSyncObj) {
+			if (runningJob.containsKey(jobId)) {
+				throw new ExecutorManagerException("Enable of job " + jobId + " failed since it's already running. User " + user);
+			}
+			else {
+				logger.info("Enable of job " + jobId + " requested by " + user);
+				ExecutableNode node = flow.getExecutableNode(jobId);
+				if (node == null) {
+					throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot enable.");
+				}
+			
+				Status status = node.getStatus();
+				if (status == Status.DISABLED || status == Status.READY) {
+					node.setStatus(Status.READY);
+					updateFlow();
+				}
+				else {
+					throw new ExecutorManagerException("Cannot enable job " + jobId + " with status " + status.toString());	
+				}
+			}
+		}
+	}
+	
+	public void retryJobs(String[] jobIds, String user) {
+		synchronized(actionSyncObj) {
+			for (String jobId: jobIds) {
+				if (runningJob.containsKey(jobId)) {
+					logger.error("Cannot retry job " + jobId + " since it's already running. User " + user);
+					continue;
+				}
+				else {
+					logger.info("Retry of job " + jobId + " requested by " + user);
+					ExecutableNode node = flow.getExecutableNode(jobId);
+					if (node == null) {
+						logger.error("Job " + jobId + " doesn't exist in execution " + flow.getExecutionId() + ". Cannot retry.");
+						continue;
+					}
+				
+					Status status = node.getStatus();
+					if (status == Status.FAILED || status == Status.READY || status == Status.KILLED) {
+						node.resetForRetry();
+						reEnableDependents(node);
+					}
+					else {
+						logger.error("Cannot retry a job that hasn't finished. " + jobId);
+					}
+				}
+			}
+			
+			boolean isFailureFound = false;
+			for (ExecutableNode node: flow.getExecutableNodes()) {
+				Status nodeStatus = node.getStatus();
+				if (nodeStatus == Status.FAILED || nodeStatus == Status.KILLED) {
+					isFailureFound = true;
+					break;
+				}
+			}
+			
+			if (!isFailureFound) {
+				flow.setStatus(Status.RUNNING);
+			}
+			
+			updateFlow();
+		}
+	}
+	
+	private void reEnableDependents(ExecutableNode node) {
+		for(String dependent: node.getOutNodes()) {
+			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
+			
+			if (dependentNode.getStatus() == Status.KILLED) {
+				dependentNode.setStatus(Status.READY);
+				dependentNode.setUpdateTime(System.currentTimeMillis());
+				reEnableDependents(dependentNode);
+			}
+		}
+	}
+	
 	private void interrupt() {
 		currentThread.interrupt();
 	}
@@ -435,50 +621,70 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return Status.READY;
 	}
 	
-	private synchronized void queueNextJobs(ExecutableNode node) {
-		for (String dependent : node.getOutNodes()) {
+	/**
+	 * Iterates through the finished job's dependents and queues any that are ready.
+	 * 
+	 * @param node
+	 */
+	private synchronized void queueNextJobs(ExecutableNode finishedNode) {
+		for (String dependent : finishedNode.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-			
-			Status nextStatus = getImpliedStatus(dependentNode);
-			if (nextStatus == null) {
-				// Not yet ready or not applicable
-				continue;
-			}
+			queueNextJob(dependentNode);
+		}
+	}
 
-			dependentNode.setStatus(nextStatus);
+	/**
+	 * Queues node for running if it's ready to be run.
+	 * 
+	 * @param node
+	 */
+	private void queueNextJob(ExecutableNode node) {
+		Status nextStatus = getImpliedStatus(node);
+		if (nextStatus == null) {
+			// Not yet ready or not applicable
+			return;
+		}
 
-			Props previousOutput = null;
-			// Iterate the in nodes again and create the dependencies
-			for (String dependency : dependentNode.getInNodes()) {
-				Props output = jobOutputProps.get(dependency);
-				if (output != null) {
-					output = Props.clone(output);
-					output.setParent(previousOutput);
-					previousOutput = output;
-				}
+		node.setStatus(nextStatus);
+		
+		Props previousOutput = null;
+		// Iterate the in nodes again and create the dependencies
+		for (String dependency : node.getInNodes()) {
+			Props output = jobOutputProps.get(dependency);
+			if (output != null) {
+				output = Props.clone(output);
+				output.setParent(previousOutput);
+				previousOutput = output;
 			}
+		}
 
-			JobRunner runner = this.createJobRunner(dependentNode, previousOutput);
-			synchronized(actionSyncObj) {
-				if (flowPaused) {
-					if (dependentNode.getStatus() != Status.DISABLED && dependentNode.getStatus() != Status.KILLED) {
-						dependentNode.setStatus(Status.PAUSED);
-					}
-					pausedJobsToRun.add(runner);
-					logger.info("Job Paused " + dependentNode.getJobId());
+		synchronized(actionSyncObj) {
+			// Hold individually paused nodes until resumeJob re-queues them.
+			if (node.isPaused()) {
+				pausedNode.put(node.getJobId(), node);
+				logger.info("Job Paused " + node.getJobId());
+				return;
+			}
+			
+			JobRunner runner = this.createJobRunner(node, previousOutput);
+			if (flowPaused) {
+				if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
+					node.setStatus(Status.PAUSED);
 				}
-				else {
-					logger.info("Adding " + dependentNode.getJobId() + " to run queue.");
-					if (dependentNode.getStatus() != Status.DISABLED && dependentNode.getStatus() != Status.KILLED) {
-						dependentNode.setStatus(Status.QUEUED);
-					}
-
-					jobsToRun.add(runner);
+				pausedJobsToRun.add(runner);
+				logger.info("Flow Paused. Pausing " + node.getJobId());
+			}
+			else {
+				logger.info("Adding " + node.getJobId() + " to run queue.");
+				if (node.getStatus() != Status.DISABLED && node.getStatus() != Status.KILLED) {
+					node.setStatus(Status.QUEUED);
 				}
+
+				jobsToRun.add(runner);
 			}
 		}
 	}
-
+	
 	private class JobRunnerEventListener implements EventListener {
 		public JobRunnerEventListener() {
 		}
@@ -562,8 +768,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 	
-	public File getJobLogFile(String jobId) {
-		JobRunner runner = allJobs.get(jobId);
+	public File getJobLogFile(String jobId, int attempt) {
+		JobRunner runner = allJobs.get(new Pair<String, Integer>(jobId, attempt));
 		if (runner == null) {
 			return null;
 		}
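
FlowRunner now keys allJobs by Pair<String, Integer> (jobId, attempt) in a ConcurrentHashMap, so lookups such as getJobLogFile only work if azkaban.utils.Pair implements value-based equals and hashCode. A sketch of the minimum such a key class must provide (the real azkaban.utils.Pair is assumed to be equivalent; this is illustrative, not the committed class):

	// Illustrative only: a hash-map key pairing jobId with attempt number.
	// allJobs.get(new Pair<String, Integer>(jobId, attempt)) depends on
	// equals/hashCode comparing by value, as sketched here.
	public class Pair<F, S> {
		private final F first;
		private final S second;

		public Pair(F first, S second) {
			this.first = first;
			this.second = second;
		}

		public F getFirst() { return first; }
		public S getSecond() { return second; }

		@Override
		public boolean equals(Object o) {
			if (this == o) return true;
			if (!(o instanceof Pair)) return false;
			Pair<?, ?> other = (Pair<?, ?>) o;
			return (first == null ? other.first == null : first.equals(other.first))
				&& (second == null ? other.second == null : second.equals(other.second));
		}

		@Override
		public int hashCode() {
			return 31 * (first == null ? 0 : first.hashCode())
				+ (second == null ? 0 : second.hashCode());
		}
	}
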
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index dda8674..5054c48 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -324,6 +324,66 @@ public class FlowRunnerManager implements EventListener {
 		runner.resume(user);
 	}
 	
+	public void pauseJob(int execId, String jobId, String user) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		
+		if (runner == null) {
+			throw new ExecutorManagerException("Execution " + execId + " is not running.");
+		}
+		
+		runner.pauseJob(jobId, user);
+	}
+	
+	public void resumeJob(int execId, String jobId, String user) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		
+		if (runner == null) {
+			throw new ExecutorManagerException("Execution " + execId + " is not running.");
+		}
+		
+		runner.resumeJob(jobId, user);
+	}
+	
+	public void retryJobs(int execId, String user, String ... jobId) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		
+		if (runner == null) {
+			throw new ExecutorManagerException("Execution " + execId + " is not running.");
+		}
+		
+		runner.retryJobs(jobId, user);
+	}
+	
+	public void disableJob(int execId, String user, String jobId) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		
+		if (runner == null) {
+			throw new ExecutorManagerException("Execution " + execId + " is not running.");
+		}
+		
+		runner.disableJob(jobId, user);
+	}
+	
+	public void enableJob(int execId, String user, String jobId) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		
+		if (runner == null) {
+			throw new ExecutorManagerException("Execution " + execId + " is not running.");
+		}
+		
+		runner.enableJob(jobId, user);
+	}
+	
+	public void cancelJob(int execId, String user, String jobId) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		
+		if (runner == null) {
+			throw new ExecutorManagerException("Execution " + execId + " is not running.");
+		}
+		
+		runner.cancelJob(jobId, user);
+	}
+	
 	public ExecutableFlow getExecutableFlow(int execId) {
 		FlowRunner runner = runningFlows.get(execId);
 		if (runner == null) {
@@ -384,7 +444,7 @@ public class FlowRunnerManager implements EventListener {
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
 	
-	public LogData readJobLogs(int execId, String jobId, int startByte, int length) throws ExecutorManagerException {
+	public LogData readJobLogs(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
 		if (runner == null) {
 			throw new ExecutorManagerException("Running flow " + execId + " not found.");
@@ -397,7 +457,7 @@ public class FlowRunnerManager implements EventListener {
 					if (!dir.exists()) {
 						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
 					}
-					File logFile = runner.getJobLogFile(jobId);
+					File logFile = runner.getJobLogFile(jobId, attempt);
 					if (logFile != null && logFile.exists()) {
 						return FileIOUtils.readUtf8File(logFile, startByte, length);
 					}
diff --git a/src/java/azkaban/execapp/FlowWatcher.java b/src/java/azkaban/execapp/FlowWatcher.java
new file mode 100644
index 0000000..5bcb4d0
--- /dev/null
+++ b/src/java/azkaban/execapp/FlowWatcher.java
@@ -0,0 +1,9 @@
+package azkaban.execapp;
+
+/**
+ * Class that watches and updates execution flows that are being listened to by
+ * other executing flows.
+ */
+public class FlowWatcher {
+
+}
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 5aa5a04..0b8d3c6 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -83,14 +83,13 @@ public class JobRunner extends EventHandler implements Runnable {
 			logger = Logger.getLogger(loggerName);
 
 			// Create file appender
-
-			String logName = "_job." + executionId + "." + node.getJobId() + ".log";
+			String logName = node.getAttempt() > 0
+					? "_job." + executionId + "." + node.getAttempt() + "." + node.getJobId() + ".log"
+					: "_job." + executionId + "." + node.getJobId() + ".log";
 			logFile = new File(workingDir, logName);
 			String absolutePath = logFile.getAbsolutePath();
 
 			jobAppender = null;
 			try {
-				FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, false);
+				FileAppender fileAppender = new FileAppender(loggerLayout, absolutePath, true);
 				
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
@@ -110,7 +109,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	private void writeStatus() {
 		try {
 			node.setUpdateTime(System.currentTimeMillis());
-			loader.updateExecutableNode(node, outputProps);
+			loader.updateExecutableNode(node);
 		} catch (ExecutorManagerException e) {
 			logger.error("Error writing node properties", e);
 		}
@@ -159,7 +158,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			if (logFile != null) {
 				try {
-					loader.uploadLogFile(executionId, node.getJobId(), logFile);
+					loader.uploadLogFile(executionId, node.getJobId(), node.getAttempt(), logFile);
 				} catch (ExecutorManagerException e) {
 					System.err.println("Error writing out logs for job " + node.getJobId());
 				}
@@ -192,7 +191,12 @@ public class JobRunner extends EventHandler implements Runnable {
 				return false;
 			}
 
-			logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
+			if (node.getAttempt() > 0) {
+				logInfo("Starting job " + node.getJobId() + " attempt " + node.getAttempt() + " at " + node.getStartTime());
+			}
+			else {
+				logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
+			}
 			node.setStatus(Status.RUNNING);
 
 			// Ability to specify working directory
@@ -220,6 +224,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		node.setStatus(Status.SUCCEEDED);
 		if (job != null) {
 			outputProps = job.getJobGeneratedProperties();
+			node.setOutputProps(outputProps);
 		}
 	}
 
@@ -266,5 +271,4 @@ public class JobRunner extends EventHandler implements Runnable {
 	public File getLogFile() {
 		return logFile;
 	}
-
 }
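
JobRunner now derives the log file name from the attempt number and opens the appender in append mode. A small illustration of the naming scheme the conditional above produces (the helper itself and the sample values are hypothetical):

	// Hypothetical helper reproducing the log-name scheme from this diff:
	// attempt 0 keeps the old name, retries insert the attempt number.
	private static String jobLogName(int executionId, String jobId, int attempt) {
		return attempt > 0
				? "_job." + executionId + "." + attempt + "." + jobId + ".log"
				: "_job." + executionId + "." + jobId + ".log";
	}
	// jobLogName(45, "etl-load", 0) -> "_job.45.etl-load.log"
	// jobLogName(45, "etl-load", 2) -> "_job.45.2.etl-load.log"
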
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 8c1360e..6bda5cd 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -31,6 +31,16 @@ public interface ConnectorParams {
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
 	
+	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
+	public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
+	public static final String MODIFY_RETRY_JOBS = "retryJobs";
+	public static final String MODIFY_CANCEL_JOBS = "cancelJobs";
+	public static final String MODIFY_DISABLE_JOBS = "skipJobs";
+	public static final String MODIFY_ENABLE_JOBS = "enableJobs";
+	public static final String MODIFY_PAUSE_JOBS = "pauseJobs";
+	public static final String MODIFY_RESUME_JOBS = "resumeJobs";
+	public static final String MODIFY_JOBS_LIST = "jobIds";
+	
 	public static final String START_PARAM = "start";
 	public static final String END_PARAM = "end";
 	public static final String STATUS_PARAM = "status";
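
These constants define the wire protocol for the new modify-execution action. For illustration, this is roughly how the web side assembles a retry request in modifyExecutingJobs later in this change; the comma-joined id list is what ExecutorServlet splits back apart (names like "jobA,jobB" and "admin" are invented):

	// Hypothetical parameter assembly, mirroring ExecutorManager.modifyExecutingJobs:
	Pair<String, String> type = new Pair<String, String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, ConnectorParams.MODIFY_RETRY_JOBS);
	Pair<String, String> jobs = new Pair<String, String>(ConnectorParams.MODIFY_JOBS_LIST, "jobA,jobB");
	// callExecutorServer(ref, ConnectorParams.MODIFY_EXECUTION_ACTION, "admin", type, jobs);
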
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index ec05459..de2843a 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -118,6 +118,15 @@ public class ExecutableFlow {
 		this.setFlow(flow);
 	}
 	
+	public ExecutableFlow(int executionId, Flow flow) {
+		this.projectId = flow.getProjectId();
+		this.flowId = flow.getId();
+		this.version = flow.getVersion();
+		this.executionId = executionId;
+		
+		this.setFlow(flow);
+	}
+	
 	public ExecutableFlow() {
 	}
 	
@@ -207,6 +216,15 @@ public class ExecutableFlow {
 		exNode.setStatus(status);
 		return true;
 	}
+
+	public void setProxyNodes(int externalExecutionId, String nodeId) {
+		ExecutableNode exNode = executableNodes.get(nodeId);
+		if (exNode == null) {
+			return;
+		}
+		
+		exNode.setExternalExecutionId(externalExecutionId);
+	}
 	
 	public int getExecutionId() {
 		return executionId;
@@ -214,6 +232,10 @@ public class ExecutableFlow {
 
 	public void setExecutionId(int executionId) {
 		this.executionId = executionId;
+		
+		for(ExecutableNode node: executableNodes.values()) {
+			node.setExecutionId(executionId);
+		}
 	}
 
 	public String getFlowId() {
diff --git a/src/java/azkaban/executor/ExecutableJobInfo.java b/src/java/azkaban/executor/ExecutableJobInfo.java
index 99f271e..b716b5f 100644
--- a/src/java/azkaban/executor/ExecutableJobInfo.java
+++ b/src/java/azkaban/executor/ExecutableJobInfo.java
@@ -14,8 +14,9 @@ public class ExecutableJobInfo {
 	private final long startTime;
 	private final long endTime;
 	private final Status status;
+	private final int attempt;
 	
-	public ExecutableJobInfo(int execId, int projectId, int version, String flowId, String jobId, long startTime, long endTime, Status status) {
+	public ExecutableJobInfo(int execId, int projectId, int version, String flowId, String jobId, long startTime, long endTime, Status status, int attempt) {
 		this.execId = execId;
 		this.projectId = projectId;
 		this.startTime = startTime;
@@ -24,6 +25,7 @@ public class ExecutableJobInfo {
 		this.version = version;
 		this.flowId = flowId;
 		this.jobId = jobId;
+		this.attempt = attempt;
 	}
 
 	public int getProjectId() {
@@ -58,6 +60,10 @@ public class ExecutableJobInfo {
 		return status;
 	}
 	
+	public int getAttempt() {
+		return attempt;
+	}
+	
 	public Map<String, Object> toObject() {
 		HashMap<String, Object> map = new HashMap<String, Object>();
 		map.put("execId", execId);
@@ -67,6 +73,7 @@ public class ExecutableJobInfo {
 		map.put("startTime", startTime);
 		map.put("endTime", endTime);
 		map.put("status", status.toString());
+		map.put("attempt", attempt);
 		
 		return map;
 	}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 5541bb2..66526af 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -20,11 +20,13 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Node;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
 
 public class ExecutableNode {
 	private String jobId;
@@ -32,17 +34,23 @@ public class ExecutableNode {
 	private String type;
 	private String jobPropsSource;
 	private String inheritPropsSource;
-	private String outputPropsSource;
 	private Status status = Status.READY;
 	private long startTime = -1;
 	private long endTime = -1;
 	private long updateTime = -1;
 	private int level = 0;
 	private ExecutableFlow flow;
+	private Props outputProps;
+	private int attempt = 0;
+	private boolean paused = false;
 	
 	private Set<String> inNodes = new HashSet<String>();
 	private Set<String> outNodes = new HashSet<String>();
 	
+	// Used if proxy node
+	private Integer externalExecutionId;
+	private List<Attempt> pastAttempts = null;
+	
 	public ExecutableNode(Node node, ExecutableFlow flow) {
 		jobId = node.getId();
 		executionId = flow.getExecutionId();
@@ -57,6 +65,20 @@ public class ExecutableNode {
 	public ExecutableNode() {
 	}
 	
+	public void resetForRetry() {
+		Attempt pastAttempt = new Attempt(attempt, startTime, endTime, status);
+		attempt++;
+		if (pastAttempts == null) {
+			pastAttempts = new ArrayList<Attempt>();
+		}
+		pastAttempts.add(pastAttempt);
+		
+		startTime = -1;
+		endTime = -1;
+		updateTime = System.currentTimeMillis();
+		status = Status.READY;
+	}
+	
 	public void setExecutableFlow(ExecutableFlow flow) {
 		this.flow = flow;
 	}
@@ -64,11 +86,11 @@ public class ExecutableNode {
 	public void setExecutionId(int id) {
 		executionId = id;
 	}
-	
+
 	public int getExecutionId() {
 		return executionId;
 	}
-	
+
 	public String getJobId() {
 		return jobId;
 	}
@@ -76,15 +98,15 @@ public class ExecutableNode {
 	public void setJobId(String id) {
 		this.jobId = id;
 	}
-	
+
 	public void addInNode(String exNode) {
 		inNodes.add(exNode);
 	}
-	
+
 	public void addOutNode(String exNode) {
 		outNodes.add(exNode);
 	}
-	
+
 	public Set<String> getOutNodes() {
 		return outNodes;
 	}
@@ -96,11 +118,11 @@ public class ExecutableNode {
 	public Status getStatus() {
 		return status;
 	}
-	
+
 	public void setStatus(Status status) {
 		this.status = status;
 	}
-	
+
 	public Object toObject() {
 		HashMap<String, Object> objMap = new HashMap<String, Object>();
 		objMap.put("id", jobId);
@@ -114,9 +136,15 @@ public class ExecutableNode {
 		objMap.put("endTime", endTime);
 		objMap.put("updateTime", updateTime);
 		objMap.put("level", level);
+		objMap.put("externalExecutionId", externalExecutionId);
+		objMap.put("paused", paused);
 		
-		if (outputPropsSource != null) {
-			objMap.put("outputSource", outputPropsSource);
+		if (pastAttempts != null) {
+			ArrayList<Object> attemptsList = new ArrayList<Object>(pastAttempts.size());
+			for (Attempt attempt : pastAttempts) {
+				attemptsList.add(attempt.toObject());
+			}
+			objMap.put("pastAttempts", attemptsList);
 		}
 		
 		return objMap;
@@ -131,7 +159,6 @@ public class ExecutableNode {
 		exNode.jobId = (String)objMap.get("id");
 		exNode.jobPropsSource = (String)objMap.get("jobSource");
 		exNode.inheritPropsSource = (String)objMap.get("propSource");
-		exNode.outputPropsSource = (String)objMap.get("outputSource");
 		exNode.type = (String)objMap.get("jobType");
 		exNode.status = Status.valueOf((String)objMap.get("status"));
 		
@@ -143,11 +170,28 @@ public class ExecutableNode {
 		exNode.updateTime = JSONUtils.getLongFromObject(objMap.get("updateTime"));
 		exNode.level = (Integer)objMap.get("level");
 		
+		exNode.externalExecutionId = (Integer)objMap.get("externalExecutionId");
+		
 		exNode.flow = flow;
+		Boolean paused = (Boolean)objMap.get("paused");
+		if (paused != null) {
+			exNode.paused = paused;
+		}
+		
+		List<Object> pastAttempts = (List<Object>)objMap.get("pastAttempts");
+		if (pastAttempts != null) {
+			ArrayList<Attempt> attempts = new ArrayList<Attempt>();
+			for (Object attemptObj: pastAttempts) {
+				Attempt attempt = Attempt.fromObject(attemptObj);
+				attempts.add(attempt);
+			}
+			
+			exNode.pastAttempts = attempts;
+		}
 		
 		return exNode;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public void updateNodeFromObject(Object obj) {
 		HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
@@ -156,7 +200,7 @@ public class ExecutableNode {
 		startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
 		endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
 	}
-	
+
 	public long getStartTime() {
 		return startTime;
 	}
@@ -172,7 +216,7 @@ public class ExecutableNode {
 	public void setEndTime(long endTime) {
 		this.endTime = endTime;
 	}
-	
+
 	public String getJobPropsSource() {
 		return jobPropsSource;
 	}
@@ -180,11 +224,11 @@ public class ExecutableNode {
 	public String getPropsSource() {
 		return inheritPropsSource;
 	}
-	
+
 	public int getLevel() {
 		return level;
 	}
-	
+
 	public ExecutableFlow getFlow() {
 		return flow;
 	}
@@ -196,4 +240,90 @@ public class ExecutableNode {
 	public void setUpdateTime(long updateTime) {
 		this.updateTime = updateTime;
 	}
+
+	public void setOutputProps(Props output) {
+		this.outputProps = output;
+	}
+
+	public Props getOutputProps() {
+		return outputProps;
+	}
+
+	public Integer getExternalExecutionId() {
+		return externalExecutionId;
+	}
+
+	public void setExternalExecutionId(Integer externalExecutionId) {
+		this.externalExecutionId = externalExecutionId;
+	}
+
+	public List<Attempt> getPastAttemptList() {
+		return pastAttempts;
+	}
+	
+	public int getAttempt() {
+		return attempt;
+	}
+
+	public void setAttempt(int attempt) {
+		this.attempt = attempt;
+	}
+	
+	public boolean isPaused() {
+		return paused;
+	}
+	
+	public void setPaused(boolean paused) {
+		this.paused = paused;
+	}
+	
+	public static class Attempt {
+		private int attempt = 0;
+		private long startTime = -1;
+		private long endTime = -1;
+		private Status status;
+		
+		public Attempt(int attempt, long startTime, long endTime, Status status) {
+			this.attempt = attempt;
+			this.startTime = startTime;
+			this.endTime = endTime;
+			this.status = status;
+		}
+		
+		public long getStartTime() {
+			return startTime;
+		}
+
+		public long getEndTime() {
+			return endTime;
+		}
+		
+		public Status getStatus() {
+			return status;
+		}
+		
+		public int getAttempt() {
+			return attempt;
+		}
+		
+		public static Attempt fromObject(Object obj) {
+			@SuppressWarnings("unchecked")
+			Map<String, Object> map = (Map<String, Object>)obj;
+			int attempt = (Integer)map.get("attempt");
+			long startTime = JSONUtils.getLongFromObject(map.get("startTime"));
+			long endTime = JSONUtils.getLongFromObject(map.get("endTime"));
+			Status status = Status.valueOf((String)map.get("status"));
+			
+			return new Attempt(attempt, startTime, endTime, status);
+		}
+		
+		public Map<String, Object> toObject() {
+			HashMap<String,Object> attempts = new HashMap<String,Object>();
+			attempts.put("attempt", attempt);
+			attempts.put("startTime", startTime);
+			attempts.put("endTime", endTime);
+			attempts.put("status", status.toString());
+			return attempts;
+		}
+	}
 }
\ No newline at end of file
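
ExecutableNode.Attempt round-trips through plain maps so past attempts survive JSON serialization. A short sketch of that cycle (all values invented; assumes imports of ExecutableNode.Attempt and ExecutableFlow.Status):

	// Round-trip sketch for ExecutableNode.Attempt.
	Attempt first = new Attempt(0, 1000L, 2000L, Status.FAILED);
	Map<String, Object> encoded = first.toObject();   // {attempt=0, startTime=1000, endTime=2000, status=FAILED}
	Attempt restored = Attempt.fromObject(encoded);
	// restored.getAttempt() == 0 and restored.getStatus() == Status.FAILED
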
diff --git a/src/java/azkaban/executor/ExecutionReference.java b/src/java/azkaban/executor/ExecutionReference.java
index 89c1d38..930d11d 100644
--- a/src/java/azkaban/executor/ExecutionReference.java
+++ b/src/java/azkaban/executor/ExecutionReference.java
@@ -1,7 +1,5 @@
 package azkaban.executor;
 
-import java.util.HashMap;
-
 public class ExecutionReference {
 	private final int execId;
 	private final String host;
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 0ce8016..13eea40 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -43,23 +43,32 @@ public interface ExecutorLoader {
 
 	public boolean updateExecutableReference(int execId, long updateTime) throws ExecutorManagerException;
 
-	public LogData fetchLogs(int execId, String name, int startByte, int endByte) throws ExecutorManagerException;
+	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
 
-	public void uploadLogFile(int execId, String name, File ... files) throws ExecutorManagerException;
+	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
 
 	public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
 
 	public void uploadExecutableNode(ExecutableNode node, Props inputParams) throws ExecutorManagerException; 
 
-	public ExecutableJobInfo fetchJobInfo(int execId, String jobId) throws ExecutorManagerException;
+	public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException;
 
+	public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt) throws ExecutorManagerException;
+	
 	public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException;
 	
-	public void updateExecutableNode(ExecutableNode node, Props outputParams) throws ExecutorManagerException;
+	public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException;
 
 	public int fetchNumExecutableFlows(int projectId, String flowId) throws ExecutorManagerException;
 
 	public int fetchNumExecutableFlows() throws ExecutorManagerException;
 	
 	public int fetchNumExecutableNodes(int projectId, String jobId) throws ExecutorManagerException;
+	
+	public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException;
+	
+	public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException;
+	
+	public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException;
+	
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fe807d3..a5add77 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpGet;
@@ -159,12 +160,12 @@ public class ExecutorManager {
 			return LogData.createLogDataFromObject(result);
 		}
 		else {
-			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), "", offset, length);
+			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), "", 0, offset, length);
 			return value;
 		}
 	}
 	
-	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length) throws ExecutorManagerException {
+	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
 		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
 		if (pair != null) {
 
@@ -172,13 +173,14 @@ public class ExecutorManager {
 			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
 			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
 			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
-
+			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+			
 			@SuppressWarnings("unchecked")
-			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam);
+			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
 			return LogData.createLogDataFromObject(result);
 		}
 		else {
-			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, offset, length);
+			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
 			return value;
 		}
 	}
@@ -213,6 +215,58 @@ public class ExecutorManager {
 		}
 	}
 	
+	public void pauseExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_PAUSE_JOBS, userId, jobIds);
+	}
+	
+	public void resumeExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RESUME_JOBS, userId, jobIds);
+	}
+	
+	public void retryExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_RETRY_JOBS, userId, jobIds);
+	}
+	
+	public void disableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_DISABLE_JOBS, userId, jobIds);
+	}
+	
+	public void enableExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_ENABLE_JOBS, userId, jobIds);
+	}
+	
+	public void cancelExecutingJobs(ExecutableFlow exFlow, String userId, String ... jobIds) throws ExecutorManagerException {
+		modifyExecutingJobs(exFlow, ConnectorParams.MODIFY_CANCEL_JOBS, userId, jobIds);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private Map<String, Object> modifyExecutingJobs(ExecutableFlow exFlow, String command, String userId, String ... jobIds) throws ExecutorManagerException {
+		synchronized(exFlow) {
+			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+			if (pair == null) {
+				throw new ExecutorManagerException("Execution " + exFlow.getExecutionId() + " of flow " + exFlow.getFlowId() + " isn't running.");
+			}
+			
+			for (String jobId: jobIds) {
+				if (!jobId.isEmpty()) {
+					ExecutableNode node = exFlow.getExecutableNode(jobId);
+					if (node == null) {
+						throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + exFlow.getExecutionId() + ".");
+					}
+				}
+			}
+			String ids = StringUtils.join(jobIds, ',');
+			Map<String, Object> response = callExecutorServer(
+					pair.getFirst(), 
+					ConnectorParams.MODIFY_EXECUTION_ACTION, 
+					userId, 
+					new Pair<String,String>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, command), 
+					new Pair<String,String>(ConnectorParams.MODIFY_JOBS_LIST, ids));
+			
+			return response;
+		}
+	}
+	
 	public void submitExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
 		synchronized(exflow) {
 			logger.info("Submitting execution flow " + exflow.getFlowId());
@@ -258,6 +312,14 @@ public class ExecutorManager {
 		}
 	}
 	
+	private Map<String, Object> callExecutorServer(ExecutionReference ref, String action, String user, Pair<String,String> ... params) throws ExecutorManagerException {
+		try {
+			return callExecutorServer(ref.getHost(), ref.getPort(), action, ref.getExecId(), user, params);
+		} catch (IOException e) {
+			throw new ExecutorManagerException(e);
+		}
+	}
+	
 	private Map<String, Object> callExecutorServer(String host, int port, String action, Integer executionId, String user, Pair<String,String> ... params) throws IOException {
 		URIBuilder builder = new URIBuilder();
 		builder.setScheme("http")
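
The new modifyExecutingJobs path validates the job ids against the in-memory flow before calling the executor server. A hypothetical caller (the user id and job ids are invented):

	// Retry two failed jobs in a running execution; throws ExecutorManagerException
	// if the execution isn't running or one of the job ids is unknown.
	executorManager.retryExecutingJobs(exFlow, "admin", "jobA", "jobB");
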
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index e7f7bf1..87478ac 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -152,7 +152,6 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			if (encType == EncodingType.GZIP) {
 				data = GZIPUtils.gzipBytes(stringData);
 			}
-			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
 		}
 		catch (IOException e) {
 			throw new ExecutorManagerException("Error encoding the execution flow.");
@@ -248,6 +247,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	@Override
 	public List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
+
 		FetchExecutableFlows flowHandler = new FetchExecutableFlows();
 		
 		try {
@@ -391,7 +391,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 
 	@Override
 	public void uploadExecutableNode(ExecutableNode node, Props inputProps) throws ExecutorManagerException {
-		final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params) VALUES (?,?,?,?,?,?,?,?,?)";
+		final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
 		
 		byte[] inputParam = null;
 		if (inputProps != null) {
@@ -416,17 +416,20 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 					node.getStartTime(),
 					node.getEndTime(), 
 					node.getStatus().getNumVal(),
-					inputParam);
+					inputParam,
+					node.getAttempt()
+					);
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error writing job " + node.getJobId(), e);
 		}
 	}
 	
 	@Override
-	public void updateExecutableNode(ExecutableNode node, Props outputProps) throws ExecutorManagerException {
+	public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
 		final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND job_id=?";
 		
 		byte[] outputParam = null;
+		Props outputProps = node.getOutputProps();
 		if (outputProps != null) {
 			try {
 				String jsonString = JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
@@ -444,7 +447,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 					node.getEndTime(), 
 					node.getStatus().getNumVal(), 
 					outputParam,
-					node.getExecutionId(),
+					node.getFlow().getExecutionId(),
 					node.getJobId());
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error updating job " + node.getJobId(), e);
@@ -452,7 +455,23 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	@Override
-	public ExecutableJobInfo fetchJobInfo(int execId, String jobId) throws ExecutorManagerException {
+	public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
+		QueryRunner runner = new QueryRunner(dataSource);
+		
+		try {
+			List<ExecutableJobInfo> info = runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS, new FetchExecutableJobHandler(), execId, jobId);
+			if (info == null || info.isEmpty()) {
+				return null;
+			}
+			
+			return info;
+		} catch (SQLException e) {
+			throw new ExecutorManagerException("Error querying job info " + jobId, e);
+		}
+	}
+	
+	@Override
+	public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempts) throws ExecutorManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
 		
 		try {
@@ -468,6 +487,42 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	@Override
+	public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
+		QueryRunner runner = new QueryRunner(dataSource);
+		try {
+			Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+			return props.getFirst();
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+		}
+	}
+	
+	@Override
+	public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
+		QueryRunner runner = new QueryRunner(dataSource);
+		try {
+			Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+			return props.getFirst();
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+		}
+	}
+	
+	@Override
+	public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
+		QueryRunner runner = new QueryRunner(dataSource);
+		try {
+			Pair<Props, Props> props = runner.query(FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), execId, jobId);
+			return props;
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException("Error querying job params " + execId + " " + jobId, e);
+		}
+	}
+	
+	@Override
 	public List<ExecutableJobInfo> fetchJobHistory(int projectId, String jobId, int skip, int size) throws ExecutorManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
 		
@@ -484,13 +539,13 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	@Override
-	public LogData fetchLogs(int execId, String name, int startByte, int length) throws ExecutorManagerException {
+	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int length) throws ExecutorManagerException {
 		QueryRunner runner = new QueryRunner(dataSource);
 
 		FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
 		
 		try {
-			LogData result = runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name, startByte, startByte + length);
+			LogData result = runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name, attempt, startByte, startByte + length);
 			return result;
 		} catch (SQLException e) {
 			throw new ExecutorManagerException("Error fetching logs " + execId + " : " + name, e);
@@ -498,10 +553,10 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	@Override
-	public void uploadLogFile(int execId, String name, File ... files) throws ExecutorManagerException {
+	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException {
 		Connection connection = getConnection();
 		try {
-			uploadLogFile(connection, execId, name, files, defaultEncodingType);
+			uploadLogFile(connection, execId, name, attempt, files, defaultEncodingType);
 			connection.commit();
 		}
 		catch (SQLException e) {
@@ -515,7 +570,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 		}
 	}
 	
-	private void uploadLogFile(Connection connection, int execId, String name, File[] files, EncodingType encType) throws ExecutorManagerException, IOException {
+	private void uploadLogFile(Connection connection, int execId, String name, int attempt, File[] files, EncodingType encType) throws ExecutorManagerException, IOException {
 		// 50K buffer... if logs are greater than this, we chunk.
 		// However, we better prevent large log files from being uploaded somehow
 		byte[] buffer = new byte[50*1024];
@@ -532,7 +587,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 				while (size >= 0) {
 					if (pos + size == buffer.length) {
 						// Flush here.
-						uploadLogPart(connection, execId, name, startByte, startByte + buffer.length, encType, buffer, buffer.length);
+						uploadLogPart(connection, execId, name, attempt, startByte, startByte + buffer.length, encType, buffer, buffer.length);
 						
 						pos = 0;
 						length = buffer.length;
@@ -549,7 +604,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			
 			// Final commit of buffer.
 			if (pos > 0) {
-				uploadLogPart(connection, execId, name, startByte, startByte + pos, encType, buffer, pos);
+				uploadLogPart(connection, execId, name, attempt, startByte, startByte + pos, encType, buffer, pos);
 			}
 		}
 		catch (SQLException e) {
@@ -564,8 +619,8 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 
 	}
 	
-	private void uploadLogPart(Connection connection, int execId, String name, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
-		final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?)";
+	private void uploadLogPart(Connection connection, int execId, String name, int attempt, int startByte, int endByte, EncodingType encType, byte[] buffer, int length) throws SQLException, IOException {
+		final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log) VALUES (?,?,?,?,?,?,?)";
 		QueryRunner runner = new QueryRunner();
 		
 		byte[] buf = buffer;
@@ -576,7 +631,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 			buf = Arrays.copyOf(buffer, length);
 		}
 		
-		runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, encType.getNumVal(), startByte, startByte + length, buf);
+		runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte, startByte + length, buf);
 	}
 	
 	private Connection getConnection() throws ExecutorManagerException {
@@ -608,7 +663,7 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	private static class FetchLogsHandler implements ResultSetHandler<LogData> {
-		private static String FETCH_LOGS = "SELECT exec_id, name, enc_type, start_byte, end_byte, log FROM execution_logs WHERE exec_id=? AND name=? AND end_byte > ? AND start_byte <= ? ORDER BY start_byte";
+		private static String FETCH_LOGS = "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log FROM execution_logs WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? AND start_byte <= ? ORDER BY start_byte";
 
 		private int startByte;
 		private int endByte;
@@ -657,8 +712,9 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 	}
 	
 	private static class FetchExecutableJobHandler implements ResultSetHandler<List<ExecutableJobInfo>> {
-		private static String FETCH_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status FROM execution_jobs WHERE exec_id=? AND job_id=?";
-		private static String FETCH_PROJECT_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status FROM execution_jobs WHERE project_id=? AND job_id=? ORDER BY exec_id DESC LIMIT ?, ? ";
+		private static String FETCH_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=? AND attempt=?";
+		private static String FETCH_EXECUTABLE_NODE_ATTEMPTS = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=?";
+		private static String FETCH_PROJECT_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE project_id=? AND job_id=? ORDER BY exec_id DESC LIMIT ?, ? ";
 
 		@Override
 		public List<ExecutableJobInfo> handle(ResultSet rs) throws SQLException {
@@ -676,8 +732,9 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 				long startTime = rs.getLong(6);
 				long endTime = rs.getLong(7);
 				Status status = Status.fromInteger(rs.getInt(8));
+				int attempt = rs.getInt(9);
 				
-				ExecutableJobInfo info = new ExecutableJobInfo(execId, projectId, version, flowId, jobId, startTime, endTime, status);
+				ExecutableJobInfo info = new ExecutableJobInfo(execId, projectId, version, flowId, jobId, startTime, endTime, status, attempt);
 				execNodes.add(info);
 			} while (rs.next());
 
@@ -685,6 +742,61 @@ public class JdbcExecutorLoader implements ExecutorLoader {
 		}
 	}
 	
+	private static class FetchExecutableJobPropsHandler implements ResultSetHandler<Pair<Props, Props>> {
+		private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+		private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+		private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params, output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+		
+		@SuppressWarnings("unchecked")
+		@Override
+		public Pair<Props, Props> handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return new Pair<Props, Props>(null, null);
+			}
+			
+			if (rs.getMetaData().getColumnCount() > 1) {
+				byte[] input = rs.getBytes(1);
+				byte[] output = rs.getBytes(2);
+				
+				Props inputProps = null;
+				Props outputProps = null;
+				try {
+					if (input != null) {
+						String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
+						inputProps = PropsUtils.fromHierarchicalMap(
+								(Map<String, Object>)JSONUtils.parseJSONFromString(jsonInputString));
+						
+					}
+					if (output != null) {
+						String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
+						outputProps = PropsUtils.fromHierarchicalMap(
+								(Map<String, Object>)JSONUtils.parseJSONFromString(jsonOutputString));
+					}
+				} catch (IOException e) {
+					throw new SQLException("Error decoding param data", e);
+				}
+				
+				return new Pair<Props, Props>(inputProps, outputProps);
+			}
+			else {
+				byte[] params = rs.getBytes(1);
+				Props props = null;
+				try {
+					if (params != null) {
+						String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
+
+						props = PropsUtils.fromHierarchicalMap(
+								(Map<String, Object>)JSONUtils.parseJSONFromString(jsonProps));
+					}
+				} catch (IOException e) {
+					throw new SQLException("Error decoding param data", e);
+				}
+				
+				return new Pair<Props,Props>(props, null);
+			}
+		}
+		
+	}
 
 	private static class FetchActiveExecutableFlows implements ResultSetHandler<Map<Integer, Pair<ExecutionReference,ExecutableFlow>>> {
 		private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, ax.host host, ax.port port, ax.update_time axUpdateTime FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
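
The props handler above is a plain Commons DbUtils ResultSetHandler, so each of the new ExecutorLoader lookups reduces to a single QueryRunner call. A minimal sketch of the likely call site inside JdbcExecutorLoader (the QueryRunner/DataSource wiring is an assumption; only the method name fetchExecutionJobProps is confirmed elsewhere in this patch):

// Hedged sketch: would live inside JdbcExecutorLoader, which encloses the
// private FetchExecutableJobPropsHandler and its SQL constants above.
public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
		throws ExecutorManagerException {
	QueryRunner runner = new QueryRunner(dataSource);  // assumed DataSource field
	try {
		// The handler yields Pair<input, output>; the single-column queries
		// put their lone column in the first slot of the pair.
		return runner.query(
				FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
				new FetchExecutableJobPropsHandler(), execId, jobId);
	} catch (SQLException e) {
		throw new ExecutorManagerException("Error querying job params " + jobId, e);
	}
}
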
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index a04db08..d43b0da 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -183,7 +183,6 @@ public class JobTypeManager
 	@SuppressWarnings("unchecked")
 	private void loadJob(File dir, Props commonConf, Props commonSysConf) throws JobTypeManagerException{
 		
-		
 		Props conf = null;
 		Props sysConf = null;
 		File confFile = findFilefromDir(dir, JOBTYPECONFFILE);		
@@ -250,7 +249,6 @@ public class JobTypeManager
 	
 	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
 	{
-
 		Job job;
 		try {
 			String jobType = jobProps.getString("type");
@@ -260,13 +258,13 @@ public class JobTypeManager
 						String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
 			}
 			
-			logger.info("Building " + jobType + " job executor. ");		
+			logger.info("Building " + jobType + " job executor. ");
 			
 			Class<? extends Object> executorClass = jobToClass.get(jobType);
 
 			if (executorClass == null) {
 				throw new JobExecutionException(
-						String.format("Could not construct job[%s] of type[%s].", jobProps, jobType));
+						String.format("Job type '%s' is unrecognized. Could not construct job[%s] of type[%s].", jobType, jobProps, jobType));
 			}
 			
 			Props sysConf = jobtypeSysProps.get(jobType);
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index b81d0e8..e97c221 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -233,6 +233,23 @@ public class PropsUtils {
 		}
 	}
 	
+	@SuppressWarnings("unchecked")
+	public static Props fromHierarchicalMap(Map<String, Object> propsMap) {
+		if (propsMap == null) {
+			return null;
+		}
+		
+		String source = (String)propsMap.get("source");
+		Map<String, String> propsParams = (Map<String,String>)propsMap.get("props");
+		
+		Map<String,Object> parent = (Map<String,Object>)propsMap.get("parent");
+		Props parentProps = fromHierarchicalMap(parent);
+		
+		Props props = new Props(parentProps, propsParams);
+		props.setSource(source);
+		return props;
+	}
+	
 	public static Map<String,Object> toHierarchicalMap(Props props) {
 		Map<String,Object> propsMap = new HashMap<String,Object>();
 		propsMap.put("source", props.getSource());
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 2babb79..430af3d 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -428,6 +428,7 @@ public class AzkabanWebServer implements AzkabanServer {
 			public void run() {
 				logger.info("Shutting down http server...");
 				try {
+					app.getScheduleManager().shutdown();
 					server.stop();
 					server.destroy();
 				} 
@@ -435,6 +436,7 @@ public class AzkabanWebServer implements AzkabanServer {
 					logger.error("Error while shutting down http server.", e);
 				}
 				logger.info("kk thx bye.");
+				System.exit(0);
 			}
 		});
 		logger.info("Server running on port " + sslPortNumber + ".");
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 581f5c1..df9dff9 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -32,7 +32,6 @@ import javax.servlet.http.HttpServletResponse;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutionReference;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.ExecutorManagerException;
@@ -264,6 +263,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				else if (ajaxName.equals("fetchExecJobLogs")) {
 					ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
 				}
+				else if (ajaxName.equals("retryFailedJobs")) {
+					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
+				}
 				else if (ajaxName.equals("flowInfo")) {
 					//String projectName = getParam(req, "project");
 					//Project project = projectManager.getProject(projectName);
@@ -295,6 +297,27 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
+	private void ajaxRestartFailed(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.EXECUTE);
+		if (project == null) {
+			return;
+		}
+		
+		if (exFlow.getStatus() == Status.FAILED || exFlow.getStatus() == Status.SUCCEEDED) {
+			ret.put("error", "Flow has already finished. Please re-execute.");
+			return;
+		}
+		
+		String jobs = getParam(req, "jobIds");
+		String[] jobIds = jobs.split("\\s*,\\s*");
+		
+		try {
+			executorManager.retryExecutingJobs(exFlow, user.getUserId(), jobIds);
+		} catch (ExecutorManagerException e) {
+			ret.put("error", e.getMessage());
+		}
+	}
+	
 	/**
 	 * Gets the logs through plain text stream to reduce memory overhead.
 	 * 
@@ -349,11 +372,19 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		
 		int offset = this.getIntParam(req, "offset");
 		int length = this.getIntParam(req, "length");
+		
 		String jobId = this.getParam(req, "jobId");
 		resp.setCharacterEncoding("utf-8");
 
 		try {
-			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length);
+			ExecutableNode node = exFlow.getExecutableNode(jobId);
+			if (node == null) {
+				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+				return;
+			}
+			
+			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+			LogData data = executorManager.getExecutionJobLog(exFlow, jobId, offset, length, attempt);
 			if (data == null) {
 				ret.put("length", 0);
 				ret.put("offset", offset);
@@ -552,6 +583,15 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			nodeObj.put("startTime", node.getStartTime());
 			nodeObj.put("endTime", node.getEndTime());
 			
+			// Add past attempts
+			if (node.getPastAttemptList() != null) {
+				ArrayList<Object> pastAttempts = new ArrayList<Object>();
+				for (ExecutableNode.Attempt attempt: node.getPastAttemptList()) {
+					pastAttempts.add(attempt.toObject());
+				}
+				nodeObj.put("pastAttempts", pastAttempts);
+			}
+			
 			nodeList.add(nodeObj);
 			
 			// Add edges
@@ -651,13 +691,23 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				}
 			}
 		}
-		else {
-			// Setup disabled
-			Map<String, String> paramGroup = this.getParamGroup(req, "disable");
-			for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
-				boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-	
-				exflow.setNodeStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+		else if (hasParam(req, "disabled")) {
+			String disabled = getParam(req, "disabled");
+			String[] disabledNodes = disabled.split("\\s*,\\s*");
+			
+			for (String node: disabledNodes) {
+				if (!node.isEmpty()) {
+					exflow.setNodeStatus(node, Status.DISABLED);
+				}
+			}
+		}
+		
+		if (hasParam(req, "restartExecutionId")) {
+			int externalExecutionId = getIntParam(req, "restartExecutionId");
+			String proxyJobs = getParam(req, "proxyJobs");
+			String[] proxyJobsArray = proxyJobs.split("\\s*,\\s*");
+			for (String nodeId: proxyJobsArray) {
+				exflow.setProxyNodes(externalExecutionId, nodeId);
 			}
 		}
 		
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 3a2e851..735a251 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -84,6 +84,7 @@
 							<li><div id="pausebtn" class="btn2">Pause</div></li>
 							<li><div id="resumebtn" class="btn2">Resume</div></li>
 							<li><div id="cancelbtn" class="btn6">Cancel</div></li>
+							<li><div id="retrybtn" class="btn1">Retry Failed</div></li>
 							<li><div id="executebtn" class="btn1">Prepare Execution</div></li>
 						</ul>
 					</div>
@@ -142,6 +143,7 @@
 
 #parse( "azkaban/webapp/servlet/velocity/executionoptionspanel.vm" )
 #end
+
 		<ul id="jobMenu" class="contextMenu">  
 			<li class="open"><a href="#open">Open...</a></li>
 			<li class="openwindow"><a href="#openwindow">Open in New Window...</a></li>
diff --git a/src/sql/create_execution_job.sql b/src/sql/create_execution_job.sql
index bcaf90a..b263128 100644
--- a/src/sql/create_execution_job.sql
+++ b/src/sql/create_execution_job.sql
@@ -4,13 +4,15 @@ CREATE TABLE execution_jobs (
 	version INT NOT NULL,
 	flow_id VARCHAR(128) NOT NULL,
 	job_id VARCHAR(128) NOT NULL,
+	attempt INT DEFAULT 0,
 	start_time BIGINT,
 	end_time BIGINT,
 	status TINYINT,
 	input_params LONGBLOB,
 	output_params LONGBLOB,
 	attachments LONGBLOB,
-	PRIMARY KEY (exec_id, job_id),
+	PRIMARY KEY (exec_id, job_id, attempt),
+	INDEX exec_job (exec_id, job_id),
 	INDEX exec_id (exec_id),
 	INDEX job_id (project_id, job_id)
-) ENGINE=InnoDB;
\ No newline at end of file
+) ENGINE=InnoDB;
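
With attempt folded into the primary key, every retry gets its own execution_jobs row, and the "current" state of a job is the row with the highest attempt. A hedged JDBC sketch of that lookup (connection wiring assumed; the real loader goes through its own SQL constants):

// Sketch: latest attempt number for one job of one execution.
public static int fetchLatestAttempt(java.sql.Connection conn, int execId, String jobId)
		throws java.sql.SQLException {
	java.sql.PreparedStatement stmt = conn.prepareStatement(
			"SELECT MAX(attempt) FROM execution_jobs WHERE exec_id=? AND job_id=?");
	try {
		stmt.setInt(1, execId);
		stmt.setString(2, jobId);
		java.sql.ResultSet rs = stmt.executeQuery();
		return rs.next() ? rs.getInt(1) : 0;
	} finally {
		stmt.close();
	}
}
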
diff --git a/src/sql/create_execution_logs.sql b/src/sql/create_execution_logs.sql
index 330d4b2..07e0b7b 100644
--- a/src/sql/create_execution_logs.sql
+++ b/src/sql/create_execution_logs.sql
@@ -1,10 +1,12 @@
 CREATE TABLE execution_logs (
 	exec_id INT NOT NULL,
 	name VARCHAR(128),
+	attempt INT DEFAULT 0,
 	enc_type TINYINT,
 	start_byte INT,
 	end_byte INT,
 	log LONGBLOB,
+	PRIMARY KEY (exec_id, name, attempt),
 	INDEX log_index (exec_id, name),
 	INDEX byte_log_index(exec_id, name, start_byte, end_byte)
 ) ENGINE=InnoDB;
diff --git a/src/sql/update_2.0_to_2.01.sql b/src/sql/update_2.0_to_2.01.sql
new file mode 100644
index 0000000..010f5ed
--- /dev/null
+++ b/src/sql/update_2.0_to_2.01.sql
@@ -0,0 +1,13 @@
+/*
+For 2.01 Adds the attempt column to execution_jobs
+*/
+ALTER TABLE execution_jobs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_jobs DROP PRIMARY KEY;
+ALTER TABLE execution_jobs ADD PRIMARY KEY(exec_id, job_id, attempt);
+ALTER TABLE execution_jobs ADD INDEX exec_job (exec_id, job_id);
+
+ALTER TABLE execution_logs ADD COLUMN attempt INT DEFAULT 0;
+ALTER TABLE execution_logs DROP PRIMARY KEY;
+ALTER TABLE execution_logs ADD PRIMARY KEY(exec_id, name, attempt);
+
+
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index d8e80fa..dc9547a 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -248,11 +248,19 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 			}
 		}
 	  	
+	  	var disabled = "";
+	  	var disabledMap = this.cloneModel.get('disabled');
+	  	for (var dis in disabledMap) {
+	  		if (disabledMap[dis]) {
+	  			disabled += dis + ",";
+	  		}
+	  	}
+	  	
 	  	var executingData = {
 	  		project: projectName,
 	  		ajax: "executeFlow",
 	  		flow: flowName,
-	  		disable: this.cloneModel.get('disabled'),
+	  		disabled: disabled,
 	  		failureAction: failureAction,
 	  		failureEmails: failureEmails,
 	  		successEmails: successEmails,
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 91c72aa..cd6382c 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -76,6 +76,7 @@ azkaban.FlowTabView= Backbone.View.extend({
   	"click #executebtn" : "handleRestartClick",
   	"click #pausebtn" : "handlePauseClick",
   	"click #resumebtn" : "handleResumeClick",
+  	"click #retrybtn" : "handleRetryClick"
   },
   initialize : function(settings) {
   	$("#cancelbtn").hide();
@@ -130,6 +131,7 @@ azkaban.FlowTabView= Backbone.View.extend({
   	$("#executebtn").hide();
   	$("#pausebtn").hide();
   	$("#resumebtn").hide();
+  	$("#retrybtn").hide();
 
   	if(data.status=="SUCCEEDED") {
   	  	$("#executebtn").show();
@@ -139,7 +141,8 @@ azkaban.FlowTabView= Backbone.View.extend({
   	}
   	else if (data.status=="FAILED_FINISHING") {
   		$("#cancelbtn").show();
-  		$("#executebtn").show();
+  		$("#executebtn").hide();
+  		$("#retrybtn").show();
   	}
   	else if (data.status=="RUNNING") {
   		$("#cancelbtn").show();
@@ -174,6 +177,36 @@ azkaban.FlowTabView= Backbone.View.extend({
       	}
       );
   },
+  handleRetryClick : function(evt) {
+      var graphData = graphModel.get("data");
+  
+  	  var failedJobs = [];
+  	  var failedJobStr = "";
+  	  var nodes = graphData.nodes;
+  	  for (var i = 0; i < nodes.length; ++i) {
+		var node = nodes[i];
+		if(node.status=='FAILED') {
+			failedJobs.push(node.id);
+		} 
+  	  }
+  	  failedJobStr = failedJobs.join();
+  
+      var requestURL = contextURL + "/executor";
+	  ajaxCall(
+		requestURL,
+		{"execid": execId, "ajax":"retryFailedJobs", "jobIds":failedJobStr},
+		function(data) {
+          console.log("retry clicked");
+          if (data.error) {
+          	showDialog("Error", data.error);
+          }
+          else {
+            showDialog("Retry", "Flow has been retried.");
+            setTimeout(function() {updateStatus();}, 1100);
+          }
+      	}
+      );
+  },
   handleRestartClick : function(evt) {
   	  	executeFlowView.show();
   },

unit/build.xml 26(+24 -2)

diff --git a/unit/build.xml b/unit/build.xml
index d949de5..b35b866 100644
--- a/unit/build.xml
+++ b/unit/build.xml
@@ -8,9 +8,18 @@
 	
 	<property name="java.src.dir" value="${base.dir}/unit/java" />
 	<property name="job.conf.dir" value="${base.dir}/unit/executions/exectest1" />
-
+	<property name="job.conf.dir2" value="${base.dir}/unit/executions/exectest2" />
+	
 	<property environment="env" />
 
+	<path id="main.classpath">
+		<fileset dir="../lib">
+			<include name="*.jar" />
+		</fileset>
+
+		<pathelement path="${dist.classes.dir}" />
+	</path>
+	
 	<!-- set the build number based on environment variable, otherwise blank -->
 	<property environment="env" description="System environment variables (including those set by Hudson)" />
 
@@ -29,7 +38,8 @@
 		<mkdir dir="${dist.packages.dir}" />
 		
 		<javac fork="true" destdir="${dist.classes.dir}"
-			target="1.6" debug="true" deprecation="false" failonerror="true" srcdir="${java.src.dir}/azkaban/test/executor" includes="*Job.java">
+			target="1.6" debug="true" deprecation="false" failonerror="true" srcdir="${java.src.dir}/azkaban/test/executor" includes="SleepJavaJob.java">
+			<classpath refid="main.classpath" />
 		</javac>
 	</target>
 	
@@ -54,4 +64,16 @@
 		</zip>
 	</target>
 
+	<target name="package-exectest2" depends="jars" description="Creates a test zip">
+		<delete dir="${dist.packages.dir}" />
+		<mkdir dir="${dist.packages.dir}" />
+		
+		<!-- Tarball it -->
+		<zip destfile="${dist.packages.dir}/exectest2.zip">
+			<zipfileset dir="${dist.jar.dir}" />
+			<zipfileset dir="${job.conf.dir2}" />
+		</zip>
+	</target>
+
+	
 </project>
diff --git a/unit/executions/exectest2/failflow.job b/unit/executions/exectest2/failflow.job
new file mode 100644
index 0000000..bfd7ce6
--- /dev/null
+++ b/unit/executions/exectest2/failflow.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+fail=false
+dependencies=myjob3,myjob5
\ No newline at end of file
diff --git a/unit/executions/exectest2/myjob1.job b/unit/executions/exectest2/myjob1.job
new file mode 100644
index 0000000..917929e
--- /dev/null
+++ b/unit/executions/exectest2/myjob1.job
@@ -0,0 +1,4 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=5
+fail=false
diff --git a/unit/executions/exectest2/myjob2-fail20.job b/unit/executions/exectest2/myjob2-fail20.job
new file mode 100644
index 0000000..4e02239
--- /dev/null
+++ b/unit/executions/exectest2/myjob2-fail20.job
@@ -0,0 +1,6 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=20
+fail=true
+passRetry=2
+dependencies=myjob1
\ No newline at end of file
diff --git a/unit/executions/exectest2/myjob2-fail30.job b/unit/executions/exectest2/myjob2-fail30.job
new file mode 100644
index 0000000..908bfed
--- /dev/null
+++ b/unit/executions/exectest2/myjob2-fail30.job
@@ -0,0 +1,6 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=30
+fail=true
+passRetry=2
+dependencies=myjob1
\ No newline at end of file
diff --git a/unit/executions/exectest2/myjob2-pass50.job b/unit/executions/exectest2/myjob2-pass50.job
new file mode 100644
index 0000000..d9553ca
--- /dev/null
+++ b/unit/executions/exectest2/myjob2-pass50.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=50
+fail=false
+dependencies=myjob1
\ No newline at end of file
diff --git a/unit/executions/exectest2/myjob3.job b/unit/executions/exectest2/myjob3.job
new file mode 100644
index 0000000..fe614fa
--- /dev/null
+++ b/unit/executions/exectest2/myjob3.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=240
+fail=false
+dependencies=myjob2-pass50
diff --git a/unit/executions/exectest2/myjob4.job b/unit/executions/exectest2/myjob4.job
new file mode 100644
index 0000000..d81f9f2
--- /dev/null
+++ b/unit/executions/exectest2/myjob4.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=20
+fail=false
+dependencies=myjob2-fail20
\ No newline at end of file
diff --git a/unit/executions/exectest2/myjob5.job b/unit/executions/exectest2/myjob5.job
new file mode 100644
index 0000000..9b17129
--- /dev/null
+++ b/unit/executions/exectest2/myjob5.job
@@ -0,0 +1,5 @@
+type=java
+job.class=azkaban.test.executor.SleepJavaJob
+seconds=20
+fail=false
+dependencies=myjob4,myjob2-fail30
\ No newline at end of file
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 8030d93..124c5d9 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -60,7 +60,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 	}
 
 	@Override
-	public void uploadLogFile(int execId, String name, File... files) throws ExecutorManagerException {
+	public void uploadLogFile(int execId, String name, int attempt, File... files) throws ExecutorManagerException {
 
 	}
 
@@ -80,7 +80,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 	}
 
 	@Override
-	public void updateExecutableNode(ExecutableNode node, Props outputParams) throws ExecutorManagerException {
+	public void updateExecutableNode(ExecutableNode node) throws ExecutorManagerException {
 		ExecutableNode foundNode = nodes.get(node.getJobId());
 		foundNode.setEndTime(node.getEndTime());
 		foundNode.setStartTime(node.getStartTime());
@@ -118,7 +118,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 	}
 
 	@Override
-	public ExecutableJobInfo fetchJobInfo(int execId, String jobId)
+	public ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
 			throws ExecutorManagerException {
 		// TODO Auto-generated method stub
 		return null;
@@ -131,7 +131,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 	}
 
 	@Override
-	public LogData fetchLogs(int execId, String name, int startByte, int endByte) throws ExecutorManagerException {
+	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException {
 		// TODO Auto-generated method stub
 		return null;
 	}
@@ -162,5 +162,29 @@ public class MockExecutorLoader implements ExecutorLoader {
 		return 0;
 	}
 
+	@Override
+	public Props fetchExecutionJobInputProps(int execId, String jobId) throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public Props fetchExecutionJobOutputProps(int execId, String jobId) throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId) throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId) throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
 
 }
\ No newline at end of file
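
Taken together, the mock's new overrides spell out the attempt-aware ExecutorLoader surface this patch introduces. An excerpt assembled from the overrides above (unchanged methods elided; this is not the full interface):

// (imports: java.io.File, java.util.List, azkaban.executor.*, azkaban.utils.*)
public interface ExecutorLoader {
	void uploadLogFile(int execId, String name, int attempt, File... files)
			throws ExecutorManagerException;
	LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte)
			throws ExecutorManagerException;
	ExecutableJobInfo fetchJobInfo(int execId, String jobId, int attempt)
			throws ExecutorManagerException;
	List<ExecutableJobInfo> fetchJobInfoAttempts(int execId, String jobId)
			throws ExecutorManagerException;
	Props fetchExecutionJobInputProps(int execId, String jobId)
			throws ExecutorManagerException;
	Props fetchExecutionJobOutputProps(int execId, String jobId)
			throws ExecutorManagerException;
	Pair<Props, Props> fetchExecutionJobProps(int execId, String jobId)
			throws ExecutorManagerException;
	// ... remaining methods unchanged ...
}
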
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 19175a1..ba673dd 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -221,6 +221,7 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals(new HashSet<String>(flow.getEndNodes()), new HashSet<String>(fetchFlow.getEndNodes()));
 	}
 	
+	
 	@Test
 	public void testUploadExecutableNode() throws Exception {
 		if (!isTestSetup()) {
@@ -228,16 +229,17 @@ public class JdbcExecutorLoaderTest {
 		}
 		
 		ExecutorLoader loader = createLoader();
-		ExecutableFlow flow = createExecutableFlow("exec1");
+		ExecutableFlow flow = createExecutableFlow(10, "exec1");
 		flow.setExecutionId(10);
 		
 		File jobFile = new File(flowDir, "job10.job");
 		Props props = new Props(null, jobFile);
 		props.put("test","test2");
 		ExecutableNode oldNode = flow.getExecutableNode("job10");
+		oldNode.setStartTime(System.currentTimeMillis());
 		loader.uploadExecutableNode(oldNode, props);
 		
-		ExecutableJobInfo info = loader.fetchJobInfo(10, "job10");
+		ExecutableJobInfo info = loader.fetchJobInfo(10, "job10", 0);
 		Assert.assertEquals(flow.getExecutionId(), info.getExecId());
 		Assert.assertEquals(flow.getProjectId(), info.getProjectId());
 		Assert.assertEquals(flow.getVersion(), info.getVersion());
@@ -246,6 +248,23 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals(oldNode.getStatus(), info.getStatus());
 		Assert.assertEquals(oldNode.getStartTime(), info.getStartTime());
 		Assert.assertEquals("endTime = " + oldNode.getEndTime() + " info endTime = " + info.getEndTime(), oldNode.getEndTime(), info.getEndTime());
+	
+		// Fetch props
+		Props outputProps = new Props();
+		outputProps.put("hello", "output");
+		oldNode.setOutputProps(outputProps);
+		oldNode.setEndTime(System.currentTimeMillis());
+		loader.updateExecutableNode(oldNode);
+
+		Props fInputProps = loader.fetchExecutionJobInputProps(10, "job10");
+		Props fOutputProps = loader.fetchExecutionJobOutputProps(10, "job10");
+		Pair<Props,Props> inOutProps = loader.fetchExecutionJobProps(10, "job10");
+		
+		Assert.assertEquals(fInputProps.get("test"), "test2");
+		Assert.assertEquals(fOutputProps.get("hello"), "output");
+		Assert.assertEquals(inOutProps.getFirst().get("test"), "test2");
+		Assert.assertEquals(inOutProps.getSecond().get("hello"), "output");
+		
 	}
 	
 	@Test
@@ -307,19 +326,19 @@ public class JdbcExecutorLoaderTest {
 		File[] smalllog = {new File(logDir, "log1.log"), new File(logDir, "log2.log"), new File(logDir, "log3.log")};
 
 		ExecutorLoader loader = createLoader();
-		loader.uploadLogFile(1, "smallFiles", smalllog);
+		loader.uploadLogFile(1, "smallFiles", 0, smalllog);
 		
-		LogData data = loader.fetchLogs(1, "smallFiles", 0, 50000);
+		LogData data = loader.fetchLogs(1, "smallFiles", 0, 0, 50000);
 		Assert.assertNotNull(data);
 		Assert.assertEquals("Logs length is " + data.getLength(), data.getLength(), 53);
 		
 		System.out.println(data.toString());
 	
-		LogData data2 = loader.fetchLogs(1, "smallFiles", 10, 20);
-		Assert.assertNotNull(data2);
-		Assert.assertEquals("Logs length is " + data2.getLength(), data2.getLength(), 10);
-		
+		LogData data2 = loader.fetchLogs(1, "smallFiles", 0, 10, 20);
 		System.out.println(data2.toString());
+		Assert.assertNotNull(data2);
+		Assert.assertEquals("Logs length is " + data2.getLength(), data2.getLength(), 20);
+
 	}
 	
 	@Test
@@ -330,30 +349,41 @@ public class JdbcExecutorLoaderTest {
 		File[] largelog = {new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"), new File(logDir, "largeLog3.log")};
 
 		ExecutorLoader loader = createLoader();
-		loader.uploadLogFile(1, "largeFiles", largelog);
+		loader.uploadLogFile(1, "largeFiles", 0, largelog);
 		
-		LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 64000);
+		LogData logsResult = loader.fetchLogs(1, "largeFiles", 0, 0, 64000);
 		Assert.assertNotNull(logsResult);
 		Assert.assertEquals("Logs length is " + logsResult.getLength(), logsResult.getLength(), 64000);
 		
-		LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 1000, 64000);
+		LogData logsResult2 = loader.fetchLogs(1, "largeFiles", 0, 1000, 64000);
 		Assert.assertNotNull(logsResult2);
-		Assert.assertEquals("Logs length is " + logsResult2.getLength(), logsResult2.getLength(), 63000);
+		Assert.assertEquals("Logs length is " + logsResult2.getLength(), logsResult2.getLength(), 64000);
 		
-		LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 330000, 400000);
+		LogData logsResult3 = loader.fetchLogs(1, "largeFiles", 0, 330000, 400000);
 		Assert.assertNotNull(logsResult3);
 		Assert.assertEquals("Logs length is " + logsResult3.getLength(), logsResult3.getLength(), 5493);
 		
-		LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 340000, 400000);
+		LogData logsResult4 = loader.fetchLogs(1, "largeFiles", 0, 340000, 400000);
 		Assert.assertNull(logsResult4);
 		
-		LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 153600, 204800);
+		LogData logsResult5 = loader.fetchLogs(1, "largeFiles", 0, 153600, 204800);
 		Assert.assertNotNull(logsResult5);
-		Assert.assertEquals("Logs length is " + logsResult5.getLength(), logsResult5.getLength(), 51200);
+		Assert.assertEquals("Logs length is " + logsResult5.getLength(), logsResult5.getLength(), 181893);
 		
-		LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 150000, 250000);
+		LogData logsResult6 = loader.fetchLogs(1, "largeFiles", 0, 150000, 250000);
 		Assert.assertNotNull(logsResult6);
-		Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 100000);
+		Assert.assertEquals("Logs length is " + logsResult6.getLength(), logsResult6.getLength(), 185493);
+	}
+	
+	private ExecutableFlow createExecutableFlow(int executionId, String flowName) throws IOException {
+		File jsonFlowFile = new File(flowDir, flowName + ".flow");
+		@SuppressWarnings("unchecked")
+		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
+		
+		Flow flow = Flow.flowFromObject(flowObj);
+		ExecutableFlow execFlow = new ExecutableFlow(executionId, flow);
+
+		return execFlow;
 	}
 	
 	private ExecutableFlow createExecutableFlow(String flowName) throws IOException {
diff --git a/unit/java/azkaban/test/executor/SleepJavaJob.java b/unit/java/azkaban/test/executor/SleepJavaJob.java
index 29b0bb8..0bba1ec 100644
--- a/unit/java/azkaban/test/executor/SleepJavaJob.java
+++ b/unit/java/azkaban/test/executor/SleepJavaJob.java
@@ -1,12 +1,17 @@
 package azkaban.test.executor;
 
+import java.io.File;
+import java.io.FileFilter;
 import java.util.Map;
 
 public class SleepJavaJob {
 	private boolean fail;
 	private String seconds;
+	private int attempts;
+	private String id;
 
 	public SleepJavaJob(String id, Map<String, String> parameters) {
+		this.id = id;
 		String failStr = parameters.get("fail");
 		if (failStr == null || failStr.equals("false")) {
 			fail = false;
@@ -15,6 +20,13 @@ public class SleepJavaJob {
 			fail = true;
 		}
 	
+		String attemptString = parameters.get("passRetry");
+		if (attemptString == null) {
+			attempts = -1;
+		}
+		else {
+			attempts = Integer.valueOf(attemptString);
+		}
 		seconds = parameters.get("seconds");
 		System.out.println("Properly created");
 	}
@@ -34,8 +46,19 @@ public class SleepJavaJob {
 			}
 		}
 		
+		File file = new File(".");  // new File("") has an empty path; its listFiles() is always null
+		File[] attemptFiles = file.listFiles(new FileFilter() {
+			@Override
+			public boolean accept(File pathname) {
+				return pathname.getName().startsWith(id);
+			}});
+		
 		if (fail) {
-			throw new Exception("I failed because I had to.");
+			if (attempts <= 0 || attemptFiles == null || attemptFiles.length < attempts) {  // fail until passRetry markers exist
+				File attemptFile = new File(file, id + "." + (attemptFiles == null ? 0 : attemptFiles.length));
+				attemptFile.mkdir();
+				throw new Exception("I failed because I had to.");
+			}
 		}
 	}
 	
@@ -46,4 +69,6 @@ public class SleepJavaJob {
 			this.notifyAll();
 		}
 	}
+	
+
 }
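
The passRetry knob makes SleepJavaJob fail deterministically until enough attempt markers have piled up on disk, which is what the exectest2 jobs rely on. A hedged driver sketch; the run() entry point is assumed from Azkaban's java-job convention and is not shown in this hunk:

import java.util.HashMap;
import java.util.Map;
import azkaban.test.executor.SleepJavaJob;

public class SleepJavaJobDemo {
	public static void main(String[] args) throws Exception {
		Map<String, String> params = new HashMap<String, String>();
		params.put("seconds", "5");
		params.put("fail", "true");
		params.put("passRetry", "2");  // fail until two attempt markers exist

		SleepJavaJob job = new SleepJavaJob("myjob2-fail20", params);
		job.run();  // assumed entry point: throws on early attempts,
		            // succeeds once the marker count reaches passRetry
	}
}
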