azkaban-uncached

Adding job log page.

10/16/2012 12:40:42 AM

Details

.classpath 3(+3 -0)

diff --git a/.classpath b/.classpath
index 7b8b139..12ee55d 100644
--- a/.classpath
+++ b/.classpath
@@ -29,5 +29,8 @@
 	<classpathentry kind="lib" path="plugins/ldap/li-azkaban-ldap.jar"/>
 	<classpathentry kind="lib" path="lib/commons-email-1.2.jar"/>
 	<classpathentry kind="lib" path="lib/mail-1.4.5.jar"/>
+	<classpathentry kind="lib" path="lib/avro-1.4.1.jar"/>
+	<classpathentry kind="lib" path="lib/commons-configuration-1.8.jar"/>
+	<classpathentry kind="lib" path="lib/voldemort-0.96.jar"/>
 	<classpathentry kind="output" path="dist/classes"/>
 </classpath>
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 791df64..b3e3a81 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -131,7 +131,7 @@ public class ExecutableFlow {
 			startNodes = new ArrayList<String>();
 			for (ExecutableNode node: executableNodes.values()) {
 				if (node.getInNodes().isEmpty()) {
-					startNodes.add(node.id);
+					startNodes.add(node.getId());
 				}
 			}
 		}
@@ -144,7 +144,7 @@ public class ExecutableFlow {
 			endNodes = new ArrayList<String>();
 			for (ExecutableNode node: executableNodes.values()) {
 				if (node.getOutNodes().isEmpty()) {
-					endNodes.add(node.id);
+					endNodes.add(node.getId());
 				}
 			}
 		}
@@ -405,151 +405,4 @@ public class ExecutableFlow {
 	public boolean getNotifyOnLastFailure() {
 		return this.notifyOnLastFailure;
 	}
-	
-	public static class ExecutableNode {
-		private String id;
-
-		private String type;
-		private String jobPropsSource;
-		private String inheritPropsSource;
-		private String outputPropsSource;
-		private Status status = Status.READY;
-		private long startTime = -1;
-		private long endTime = -1;
-		private int level = 0;
-		private ExecutableFlow flow;
-		
-		private Set<String> inNodes = new HashSet<String>();
-		private Set<String> outNodes = new HashSet<String>();
-		
-		private ExecutableNode(Node node, ExecutableFlow flow) {
-			id = node.getId();
-			type = node.getType();
-			jobPropsSource = node.getJobSource();
-			inheritPropsSource = node.getPropsSource();
-			status = Status.READY;
-			level = node.getLevel();
-			this.flow = flow;
-		}
-		
-		public ExecutableNode() {
-		}
-		
-		public String getId() {
-			return id;
-		}
-
-		public void setId(String id) {
-			this.id = id;
-		}
-		
-		public void addInNode(String exNode) {
-			inNodes.add(exNode);
-		}
-		
-		public void addOutNode(String exNode) {
-			outNodes.add(exNode);
-		}
-		
-		public Set<String> getOutNodes() {
-			return outNodes;
-		}
-		
-		public Set<String> getInNodes() {
-			return inNodes;
-		}
-		
-		public Status getStatus() {
-			return status;
-		}
-		
-		public void setStatus(Status status) {
-			this.status = status;
-		}
-		
-		public Object toObject() {
-			HashMap<String, Object> objMap = new HashMap<String, Object>();
-			objMap.put("id", id);
-			objMap.put("jobSource", jobPropsSource);
-			objMap.put("propSource", inheritPropsSource);
-			objMap.put("jobType", type);
-			objMap.put("status", status.toString());
-			objMap.put("inNodes", inNodes);
-			objMap.put("outNodes", outNodes);
-			objMap.put("startTime", startTime);
-			objMap.put("endTime", endTime);
-			objMap.put("level", level);
-			
-			if (outputPropsSource != null) {
-				objMap.put("outputSource", outputPropsSource);
-			}
-			
-			return objMap;
-		}
-
-		@SuppressWarnings("unchecked")
-		public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
-			ExecutableNode exNode = new ExecutableNode();
-			
-			HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
-			exNode.id = (String)objMap.get("id");
-			exNode.jobPropsSource = (String)objMap.get("jobSource");
-			exNode.inheritPropsSource = (String)objMap.get("propSource");
-			exNode.outputPropsSource = (String)objMap.get("outputSource");
-			exNode.type = (String)objMap.get("jobType");
-			exNode.status = Status.valueOf((String)objMap.get("status"));
-			
-			exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
-			exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
-			
-			exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
-			exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
-			exNode.level = (Integer)objMap.get("level");
-			
-			exNode.flow = flow;
-			
-			return exNode;
-		}
-		
-		@SuppressWarnings("unchecked")
-		public void updateNodeFromObject(Object obj) {
-			HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
-			status = Status.valueOf((String)objMap.get("status"));
-
-			startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
-			endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
-		}
-		
-		public long getStartTime() {
-			return startTime;
-		}
-
-		public void setStartTime(long startTime) {
-			this.startTime = startTime;
-		}
-
-		public long getEndTime() {
-			return endTime;
-		}
-
-		public void setEndTime(long endTime) {
-			this.endTime = endTime;
-		}
-		
-		public String getJobPropsSource() {
-			return jobPropsSource;
-		}
-
-		public String getPropsSource() {
-			return inheritPropsSource;
-		}
-		
-		public int getLevel() {
-			return level;
-		}
-		
-		public ExecutableFlow getFlow() {
-			return flow;
-		}
-	}
 }
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
new file mode 100644
index 0000000..cfb4387
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -0,0 +1,157 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.flow.Node;
+import azkaban.utils.JSONUtils;
+
+public class ExecutableNode {
+	private String id;
+
+	private String type;
+	private String jobPropsSource;
+	private String inheritPropsSource;
+	private String outputPropsSource;
+	private Status status = Status.READY;
+	private long startTime = -1;
+	private long endTime = -1;
+	private int level = 0;
+	private ExecutableFlow flow;
+	
+	private Set<String> inNodes = new HashSet<String>();
+	private Set<String> outNodes = new HashSet<String>();
+	
+	public ExecutableNode(Node node, ExecutableFlow flow) {
+		id = node.getId();
+		type = node.getType();
+		jobPropsSource = node.getJobSource();
+		inheritPropsSource = node.getPropsSource();
+		status = Status.READY;
+		level = node.getLevel();
+		this.flow = flow;
+	}
+	
+	public ExecutableNode() {
+	}
+	
+	public String getId() {
+		return id;
+	}
+
+	public void setId(String id) {
+		this.id = id;
+	}
+	
+	public void addInNode(String exNode) {
+		inNodes.add(exNode);
+	}
+	
+	public void addOutNode(String exNode) {
+		outNodes.add(exNode);
+	}
+	
+	public Set<String> getOutNodes() {
+		return outNodes;
+	}
+	
+	public Set<String> getInNodes() {
+		return inNodes;
+	}
+	
+	public Status getStatus() {
+		return status;
+	}
+	
+	public void setStatus(Status status) {
+		this.status = status;
+	}
+	
+	public Object toObject() {
+		HashMap<String, Object> objMap = new HashMap<String, Object>();
+		objMap.put("id", id);
+		objMap.put("jobSource", jobPropsSource);
+		objMap.put("propSource", inheritPropsSource);
+		objMap.put("jobType", type);
+		objMap.put("status", status.toString());
+		objMap.put("inNodes", inNodes);
+		objMap.put("outNodes", outNodes);
+		objMap.put("startTime", startTime);
+		objMap.put("endTime", endTime);
+		objMap.put("level", level);
+		
+		if (outputPropsSource != null) {
+			objMap.put("outputSource", outputPropsSource);
+		}
+		
+		return objMap;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static ExecutableNode createNodeFromObject(Object obj, ExecutableFlow flow) {
+		ExecutableNode exNode = new ExecutableNode();
+		
+		HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
+		exNode.id = (String)objMap.get("id");
+		exNode.jobPropsSource = (String)objMap.get("jobSource");
+		exNode.inheritPropsSource = (String)objMap.get("propSource");
+		exNode.outputPropsSource = (String)objMap.get("outputSource");
+		exNode.type = (String)objMap.get("jobType");
+		exNode.status = Status.valueOf((String)objMap.get("status"));
+		
+		exNode.inNodes.addAll( (List<String>)objMap.get("inNodes") );
+		exNode.outNodes.addAll( (List<String>)objMap.get("outNodes") );
+		
+		exNode.startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+		exNode.endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
+		exNode.level = (Integer)objMap.get("level");
+		
+		exNode.flow = flow;
+		
+		return exNode;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public void updateNodeFromObject(Object obj) {
+		HashMap<String, Object> objMap = (HashMap<String,Object>)obj;
+		status = Status.valueOf((String)objMap.get("status"));
+
+		startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+		endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
+	}
+	
+	public long getStartTime() {
+		return startTime;
+	}
+
+	public void setStartTime(long startTime) {
+		this.startTime = startTime;
+	}
+
+	public long getEndTime() {
+		return endTime;
+	}
+
+	public void setEndTime(long endTime) {
+		this.endTime = endTime;
+	}
+	
+	public String getJobPropsSource() {
+		return jobPropsSource;
+	}
+
+	public String getPropsSource() {
+		return inheritPropsSource;
+	}
+	
+	public int getLevel() {
+		return level;
+	}
+	
+	public ExecutableFlow getFlow() {
+		return flow;
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 03dace2..57dcf13 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,7 +40,6 @@ import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 
-import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Flow;
 import azkaban.utils.ExecutableFlowLoader;
@@ -55,6 +54,8 @@ import azkaban.webapp.AzkabanExecutorServer;
 public class ExecutorManager {
 	private static final String ACTIVE_DIR = ".active";
 	private static final String ARCHIVE_DIR = ".archive";
+	private static final String JOB_EXECUTION_DIR = ".jobexecutions";
+	
 	private static Logger logger = Logger.getLogger(ExecutorManager.class);
 	// 30 seconds of retry before failure.
 	private static final long ACCESS_ERROR_THRESHOLD_MS = 30000;
@@ -124,6 +125,40 @@ public class ExecutorManager {
 		manager.addCache(recentFlowsCache);
 	}
 	
+	public int getJobHistory(String projectId, String jobId, int numResults, int skip, List<NodeStatus> nodes) throws ExecutorManagerException{
+		File flowProjectPath = new File(basePath, projectId);
+		if (!flowProjectPath.exists()) {
+			throw new ExecutorManagerException("Project " + projectId + " directory doesn't exist.");
+		}
+
+		File jobStatusDir = new File(flowProjectPath, JOB_EXECUTION_DIR + File.separator + jobId);
+		
+		File[] jobsStatusFiles = jobStatusDir.listFiles();
+		
+		if (jobsStatusFiles.length == 0 || skip >= jobsStatusFiles.length) {
+			return 0;
+		}
+		
+		Arrays.sort(jobsStatusFiles);
+		int index = (jobsStatusFiles.length - skip - 1);
+		
+		for (int count = 0; count < numResults && index >= 0; ++count, --index) {
+			File exDir = jobsStatusFiles[index];
+
+			NodeStatus status;
+			try {
+				status = NodeStatus.createNodeFromObject(JSONUtils.parseJSONFromFile(exDir));
+				if (status != null) {
+					nodes.add(status);
+				}
+			} catch (IOException e) {
+				throw new ExecutorManagerException(e.getMessage());
+			}
+		}
+
+		return jobsStatusFiles.length;
+	}
+	
 	public int getExecutableFlows(String projectId, String flowId, int from, int maxResults, List<ExecutableFlow> output) {
 		String projectPath = projectId + File.separator + flowId;
 		File flowProjectPath = new File(basePath, projectPath);
@@ -183,14 +218,14 @@ public class ExecutorManager {
 		return execFlows;
 	}
 
-	public List<ExecutionReference> getFlowHistory(String rePattern, int numResults, int skip) {
+	public List<ExecutionReference> getFlowHistory(String regexPattern, int numResults, int skip) {
 		ArrayList<ExecutionReference> searchFlows = new ArrayList<ExecutionReference>();
 
 		Pattern pattern;
 		try {
-			pattern = Pattern.compile(rePattern, Pattern.CASE_INSENSITIVE);
+			pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE);
 		} catch (PatternSyntaxException e) {
-			logger.error("Bad regex pattern " + rePattern);
+			logger.error("Bad regex pattern " + regexPattern);
 			return searchFlows;
 		}
 		
@@ -316,13 +351,19 @@ public class ExecutorManager {
 	public synchronized void setupExecutableFlow(ExecutableFlow exflow) throws ExecutorManagerException {
 		String executionId = exflow.getExecutionId();
 
-		String projectFlowDir = exflow.getProjectId() + File.separator + exflow.getFlowId() + File.separator + executionId;
-		File executionPath = new File(basePath, projectFlowDir);
+		File projectDir = new File(basePath, exflow.getProjectId());
+		String executionDir = exflow.getFlowId() + File.separator + executionId;
+		File executionPath = new File(projectDir, executionDir);
 		if (executionPath.exists()) {
 			throw new ExecutorManagerException("Execution path " + executionPath + " exists. Probably a simultaneous execution.");
 		}
 		
 		executionPath.mkdirs();
+		
+		// create job reference dir
+		File jobExecutionDir = new File(projectDir, JOB_EXECUTION_DIR);
+		jobExecutionDir.mkdirs();
+		
 		exflow.setExecutionPath(executionPath.getPath());
 	}
 
@@ -420,7 +461,7 @@ public class ExecutorManager {
 		}
 		
 		// Load last update
-		ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true);
+		updateFlowFromFile(exFlow);
 
 		// Return if already finished.
 		if (exFlow.getStatus() == Status.FAILED || 
@@ -1020,7 +1061,7 @@ public class ExecutorManager {
 					if (statusStr.equals(ConnectorParams.RESPONSE_NOTFOUND)) {
 						logger.info("Server status response for " + reference.toRefString() + " was 'notfound'. Cleaning up");
 						try {
-							ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+							updateFlowFromFile(flow);
 							forceFail = true;
 						} catch (ExecutorManagerException e) {
 							logger.error("Error updating flow status " + flow.getExecutionId() + " from file.", e);
@@ -1033,7 +1074,7 @@ public class ExecutorManager {
 						
 						if (time > flow.getUpdateTime()) {
 							try {
-								ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, flow, true);
+								updateFlowFromFile(flow);
 								// Update reference
 								reference.setStartTime(flow.getStartTime());
 								reference.setEndTime(flow.getEndTime());
@@ -1087,6 +1128,14 @@ public class ExecutorManager {
 		}
 	}
 	
+	private void updateFlowFromFile(ExecutableFlow exFlow) throws ExecutorManagerException {
+		File executionDir = new File(exFlow.getExecutionPath());
+		if (ExecutableFlowLoader.updateFlowStatusFromFile(executionDir, exFlow, true)) {
+			// Move all the static directories if the update has changed.
+			File statusDir = new File(basePath, exFlow.getProjectId() + File.separator + JOB_EXECUTION_DIR);
+			ExecutableFlowLoader.moveJobStatusFiles(executionDir, statusDir);
+		}
+	}
 	
 	/**
 	 * Reference to a Flow Execution.
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index 51d8431..970045e 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -22,7 +22,6 @@ import org.apache.log4j.FileAppender;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
-import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 5c79ea7..d04c878 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -11,7 +11,6 @@ import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
-import azkaban.executor.ExecutableFlow.ExecutableNode;
 import azkaban.executor.ExecutableFlow.Status;
 import azkaban.executor.event.Event;
 import azkaban.executor.event.Event.Type;
@@ -19,6 +18,7 @@ import azkaban.executor.event.EventHandler;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.jobExecutor.Job;
 import azkaban.jobExecutor.utils.JobWrappingFactory;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
 public class JobRunner extends EventHandler implements Runnable {
@@ -90,6 +90,17 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 
+	private void writeStatus() {
+		NodeStatus status = new NodeStatus(this.node);
+		String statusName = "_job." + executionId + "." + node.getId() + ".status";
+		File statusFile = new File(workingDir, statusName);
+		try {
+			JSONUtils.toJSON(status.toObject(), statusFile);
+		} catch (IOException e) {
+			logger.error("Couldn't write status file.");
+		}
+	}
+	
 	@Override
 	public void run() {
 		node.setStartTime(System.currentTimeMillis());
@@ -106,11 +117,12 @@ public class JobRunner extends EventHandler implements Runnable {
 
 		createLogger();
 		this.node.setStatus(Status.WAITING);
-
+		
 		logInfo("Starting job " + node.getId() + " at " + node.getStartTime());
 		node.setStatus(Status.RUNNING);
 		this.fireEventListeners(Event.create(this, Type.JOB_STARTED));
-
+		writeStatus();
+		
 		boolean succeeded = true;
 
 		props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
@@ -149,6 +161,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 		logInfo("Finishing job " + node.getId() + " at " + node.getEndTime());
 		closeLogger();
+		writeStatus();
 	}
 
 	public synchronized void cancel() {
diff --git a/src/java/azkaban/executor/NodeStatus.java b/src/java/azkaban/executor/NodeStatus.java
new file mode 100644
index 0000000..5719740
--- /dev/null
+++ b/src/java/azkaban/executor/NodeStatus.java
@@ -0,0 +1,105 @@
+package azkaban.executor;
+
+import java.util.HashMap;
+
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.utils.JSONUtils;
+
+// Will need to remove these as we roll out database instead
+public class NodeStatus {
+	private final String execId;
+	private final String projectId;
+	private final String jobId;
+	private final String flowId;
+	private Status status;
+	private long startTime;
+	private long endTime;
+	
+	private NodeStatus(String execId, String projectId, String jobId, String flowId) {
+		this.execId = execId;
+		this.projectId = projectId;
+		this.jobId = jobId;
+		this.flowId = flowId;
+	}
+	
+	public NodeStatus(ExecutableNode node) {
+		this.execId = node.getFlow().getExecutionId();
+		this.projectId = node.getFlow().getProjectId();
+		this.jobId = node.getId();
+		this.status = node.getStatus();
+		this.startTime = node.getStartTime();
+		this.endTime = node.getEndTime();
+		this.flowId = node.getFlow().getFlowId();
+	}
+
+	public Status getStatus() {
+		return status;
+	}
+
+	public void setStatus(Status status) {
+		this.status = status;
+	}
+
+	public String getExecId() {
+		return execId;
+	}
+
+	public String getProjectId() {
+		return projectId;
+	}
+
+	public String getJobId() {
+		return jobId;
+	}
+	
+	public String getFlowId() {
+		return flowId;
+	}
+
+	public long getStartTime() {
+		return startTime;
+	}
+
+	public void setStartTime(long startTime) {
+		this.startTime = startTime;
+	}
+
+	public long getEndTime() {
+		return endTime;
+	}
+
+	public void setEndTime(long endTime) {
+		this.endTime = endTime;
+	}
+	
+	public Object toObject() {
+		HashMap<String, Object> objMap = new HashMap<String, Object>();
+		objMap.put("execId", execId);
+		objMap.put("jobId", jobId);
+		objMap.put("status", status.toString());
+		objMap.put("startTime", startTime);
+		objMap.put("endTime", endTime);
+		objMap.put("flowId", flowId);
+		return objMap;
+	}
+	
+	public static NodeStatus createNodeFromObject(Object obj) {
+		@SuppressWarnings("unchecked")
+		HashMap<String, Object> objMap = (HashMap<String, Object>)obj;
+		String execId = (String)objMap.get("execId");
+		String projectId = (String)objMap.get("projectId");
+		String jobId = (String)objMap.get("jobId");
+		String flowId = (String)objMap.get("flowId");
+		
+		NodeStatus nodeStatus = new NodeStatus(execId, projectId, jobId, flowId);
+		Status status = Status.valueOf((String)objMap.get("status"));
+		long startTime = JSONUtils.getLongFromObject(objMap.get("startTime"));
+		long endTime = JSONUtils.getLongFromObject(objMap.get("endTime"));
+		
+		nodeStatus.status = status;
+		nodeStatus.startTime = startTime;
+		nodeStatus.endTime = endTime;
+		
+		return nodeStatus;
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/utils/ExecutableFlowLoader.java b/src/java/azkaban/utils/ExecutableFlowLoader.java
index e7571d7..4e31562 100644
--- a/src/java/azkaban/utils/ExecutableFlowLoader.java
+++ b/src/java/azkaban/utils/ExecutableFlowLoader.java
@@ -11,6 +11,7 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.NodeStatus;
 
 public class ExecutableFlowLoader {
 	private static final Logger logger = Logger.getLogger(ExecutableFlowLoader.class.getName());
@@ -126,6 +127,30 @@ public class ExecutableFlowLoader {
 		return true;
 	}
 	
+	public static void moveJobStatusFiles(File exDir, File statusFileDir) {
+		File[] statusFiles = exDir.listFiles(new PrefixSuffixFilter("_job.", ".status"));
+		for (File file: statusFiles) {
+			try {
+				NodeStatus status = NodeStatus.createNodeFromObject(JSONUtils.parseJSONFromFile(file));
+				String jobId = status.getJobId();
+				
+				File jobStatusDir = new File(statusFileDir, jobId);
+				if (!jobStatusDir.exists()) {
+					jobStatusDir.mkdirs();
+				}
+				
+				File destFile = new File(jobStatusDir, file.getName());
+				if (destFile.exists()) {
+					destFile.delete();
+				}
+				
+				file.renameTo(destFile);
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+	
 	/**
 	 * Write executable flow file
 	 * 
@@ -177,7 +202,7 @@ public class ExecutableFlowLoader {
 	 *
 	 */
 	private static class PrefixFilter implements FileFilter {
-		private String prefix;
+		private final String prefix;
 
 		public PrefixFilter(String prefix) {
 			this.prefix = prefix;
@@ -190,5 +215,24 @@ public class ExecutableFlowLoader {
 			return pathname.isFile() && !pathname.isHidden() && name.length() >= prefix.length() && name.startsWith(prefix);
 		}
 	}
+	
+	private static class PrefixSuffixFilter implements FileFilter {
+		private final String suffix;
+		private final String prefix;
+		private final int presuflength;
+		
+		public PrefixSuffixFilter(String prefix, String suffix) {
+			this.suffix = suffix;
+			this.prefix = prefix;
+			presuflength = suffix.length() + prefix.length();
+		}
+
+		@Override
+		public boolean accept(File pathname) {
+			String name = pathname.getName();
+
+			return pathname.isFile() && !pathname.isHidden() && name.length() >= presuflength && name.startsWith(prefix) && name.endsWith(suffix);
+		}
+	}
 
 }
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index f9bb3f6..66679f8 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -15,7 +15,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.ExecutableNode;
+import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutableFlow.Status;
diff --git a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index d2b0df5..ab84381 100644
--- a/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/src/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -31,7 +31,10 @@ import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.NodeStatus;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
@@ -50,6 +53,7 @@ import azkaban.utils.Props;
 import azkaban.utils.Utils;
 import azkaban.webapp.session.Session;
 import azkaban.webapp.servlet.MultipartParser;
+import azkaban.webapp.servlet.HistoryServlet.PageSelection;
 
 public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1;
@@ -98,6 +102,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 			else if (hasParam(req, "prop")) {
 				handlePropertyPage(req, resp, session);
 			}
+			else if (hasParam(req, "history")) {
+				handleJobHistoryPage(req, resp, session);
+			}
 			else if (hasParam(req, "job")) {
 				handleJobPage(req, resp, session);
 			}
@@ -553,6 +560,58 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 		page.render();
 	}
 	
+	private void handleJobHistoryPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/jobhistorypage.vm");
+		String projectId = getParam(req, "project");
+		String jobId = getParam(req, "job");
+		int pageNum = getIntParam(req, "page", 1);
+		int pageSize = getIntParam(req, "size", 25);
+		
+		page.add("projectId", projectId);
+		page.add("jobid", jobId);
+		page.add("page", pageNum);
+		
+		int skipPage = (pageNum - 1)*pageSize;
+
+		ArrayList<NodeStatus> statuses = new ArrayList<NodeStatus>();
+		int numResults = 0;
+		try {
+			numResults  = executorManager.getJobHistory(projectId, jobId, pageSize, skipPage, statuses);
+			if (statuses.isEmpty()) {
+				statuses = null;
+			}
+			page.add("history", statuses);
+			
+			if (pageNum == 1) {
+				page.add("previous", new PageSelection(1, pageSize, true, false));
+			}
+			page.add("next", new PageSelection(pageNum + 1, pageSize, false, false));
+
+		} catch (ExecutorManagerException e) {
+			page.add("errorMsg", e.getMessage());
+		}
+
+		// Now for the 5 other values.
+		int pageStartValue = 1;
+		if (pageNum > 3) {
+			pageStartValue = pageNum - 2;
+		}
+		int maxPage = (numResults / pageSize) + 1;
+		
+		page.add("page1", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page2", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page3", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page4", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+		pageStartValue++;
+		page.add("page5", new PageSelection(pageStartValue, pageSize, pageStartValue > maxPage, pageStartValue == pageNum));
+		pageStartValue++;
+		
+		page.render();
+	}
+	
 	private void handlePermissionPage(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException {
 		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/permissionspage.vm");
 		String projectName = getParam(req, "project");
@@ -988,4 +1047,38 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
 			return node1.getLevel() - node2.getLevel();
 		}
 	}
+	
+	public class PageSelection {
+		private int page;
+		private int size;
+		private boolean disabled;
+		private boolean selected;
+		
+		public PageSelection(int page, int size, boolean disabled, boolean selected) {
+			this.page = page;
+			this.size = size;
+			this.disabled = disabled;
+			this.setSelected(selected);
+		}
+		
+		public int getPage() {
+			return page;
+		}
+		
+		public int getSize() {
+			return size;
+		}
+		
+		public boolean getDisabled() {
+			return disabled;
+		}
+
+		public boolean isSelected() {
+			return selected;
+		}
+
+		public void setSelected(boolean selected) {
+			this.selected = selected;
+		}
+	}
 }
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index 5bfa771..7d0aea7 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -68,7 +68,7 @@
 							<li><div id="pausebtn" class="btn2">Pause</div></li>
 							<li><div id="resumebtn" class="btn2">Resume</div></li>
 							<li><div id="cancelbtn" class="btn6">Cancel</div></li>
-							<li><div id="executebtn" class="btn1">Execute</div></li>
+							<li><div id="executebtn" class="btn1">Prepare Execution</div></li>
 						</ul>
 					</div>
 					<div id="graphView">
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 5655c00..edb47c6 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -54,7 +54,7 @@
 							<h4><a href="${context}/manager?project=${project.name}">Project <span>$project.name</span></a></h4>
 						</div>
 						
-						<div id="executebtn" class="btn1">Execute</div>
+						<div id="executebtn" class="btn1">Prepare Execution</div>
 						<div id="scheduleflowbtn" class="btn2 scheduleflow">Schedule Flow</div>
 					</div>
 					
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
new file mode 100644
index 0000000..059d5ce
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/jobhistorypage.vm
@@ -0,0 +1,103 @@
+<!DOCTYPE html> 
+<html>
+	<head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+		<script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>    
+		<script type="text/javascript" src="${context}/js/jqueryui/jquery-ui.custom.min.js"></script>   
+		<script type="text/javascript" src="${context}/js/namespace.js"></script>
+		<script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
+		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
+		<script type="text/javascript" src="${context}/js/jquery.simplemodal.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+		<script type="text/javascript">
+			var contextURL = "${context}";
+			var currentTime = ${currentTime};
+			var timezone = "${timezone}";
+			var errorMessage = null;
+			var successMessage = null;
+			
+			var projectId = "$projectId";
+			var jobName = "$jobid";
+		</script>
+	</head>
+	<body>
+#set($current_page="all")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+		<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>  
+
+#if($errorMsg)
+			<div class="box-error-message">$errorMsg</div>
+#else
+	<div class="content">
+	#if($error_message != "null")
+			<div class="box-error-message">$error_message</div>
+	#elseif($success_message != "null")
+			<div class="box-success-message">$success_message</div>
+	#end
+			<div id="all-jobs-content">
+				<div class="section-hd">
+					<h2><a href="${context}/manager?project=${projectId}&job=${jobid}&history">Job <span>$jobid</span> History</a></h2>
+					<div class="section-sub-hd">
+						<h4><a href="${context}/manager?project=${projectId}">Project <span>$projectId</span></a></h4>
+					</div>
+					
+				</div>
+			</div>
+			
+			<div class="executionInfo">
+				<table id="all-jobs" class="all-jobs job-table">
+					<thead>
+						<tr>
+							<th class="execid">Execution Id</th>
+							<th class="jobid">Job</th>
+							<th class="flowid">Flow</th>
+							<th class="date">Start Time</th>
+							<th class="date">End Time</th>
+							<th class="elapse">Elapse</th>		
+							<th class="status">Status</th>
+							<th class="logs">Logs</th>
+						</tr>
+					</thead>
+					<tbody>
+	#foreach($job in $history)
+						<tr>
+							<td class="first">
+								<a href="${context}/executor?execid=${job.execId}">$utils.extractNumericalId(${job.execId})</a>
+							</td>
+							<td>
+								<a href="${context}/manager?project=${projectId}&flow=${job.flowId}&job=${jobid}">$jobid</a>
+							</td>
+							<td>
+								<a href="${context}/manager?project=${projectId}&flow=${job.flowId}">${job.flowId}</a>
+							</td>
+							<td>$utils.formatDate(${job.startTime})</td>
+							<td>$utils.formatDate(${job.endTime})</td>
+							<td>$utils.formatDuration(${job.startTime}, ${job.endTime})</td>
+							<td><div class="status ${job.status}">$utils.formatStatus(${job.status})</div></td>
+							<td class="logLink">
+								<a href="${context}/executor?execid=${job.execId}&job=${jobid}">Logs</a>
+							</td>
+						</tr>
+	#end
+					</tbody>
+				</table>
+				
+				<div id="pageSelection" class="nonjavascript">
+					<ul>
+						<li id="previous" class="first"><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${previous.page}&size=${previous.size}"><span class="arrow">&larr;</span>Previous</a></li>
+				
+						<li id="page1" #if($page1.selected) class="selected" #elseif ($page1.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page1.page}&size=${page1.size}">${page1.page}</a></li>
+						<li id="page2" #if($page2.selected) class="selected" #elseif ($page2.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page2.page}&size=${page2.size}">${page2.page}</a></li>
+						<li id="page3" #if($page3.selected) class="selected" #elseif ($page3.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page3.page}&size=${page3.size}">${page3.page}</a></li>
+						<li id="page4" #if($page4.selected) class="selected" #elseif ($page4.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page4.page}&size=${page4.size}">${page4.page}</a></li>
+						<li id="page5" #if($page5.selected) class="selected" #elseif ($page5.disabled) class="disabled" #end><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${page5.page}&size=${page5.size}">${page5.page}</a></li>
+	
+						<li id="next"><a href="${context}/manager?project=${projectId}&job=${jobid}&history&page=${next.page}&size=${next.size}">Next<span class="arrow">&rarr;</span></a></li>
+					</ul>
+				</div>
+			</div>
+	</div>
+
+#end
+	</body>
+</html>
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
index 8b35553..9f47d4b 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobpage.vm
@@ -41,6 +41,8 @@
 						<h4 class="separator">&gt;</h4>
 						<h4><a href="${context}/manager?project=${project.name}&flow=${flowid}">Flow <span>$flowid</span></a></h4>
 					</div>
+					
+					<a id="jobs-logs-btn" class="btn2" href="${context}/manager?project=${project.name}&job=$jobid&history">Project Logs</a>
 				</div>
 			</div>
 			
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 207b6c1..aeeb316 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -365,7 +365,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(tdTimeline).append(outerProgressBar);
 		$(tdTimeline).addClass("timeline");
 
-		var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + node.id;
+		var requestURL = contextURL + "/manager?project=" + projectName + "&job=" + node.id + "&history";
 		var a = document.createElement("a");
 		$(a).attr("href", requestURL);
 		$(a).text(node.id);
@@ -376,7 +376,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(status).attr("id", node.id + "-status-div");
 		tdStatus.appendChild(status);
 
-		var logURL = contextURL + "/executor?execid=" + execId + "&flow=" + flowName + "&job=" + node.id;
+		var logURL = contextURL + "/executor?execid=" + execId + "&job=" + node.id;
 		var a = document.createElement("a");
 		$(a).attr("href", logURL);
 		$(a).text("Log");