azkaban-aplcache

Changes

Details

diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 50be4f6..a6ee307 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -110,12 +110,6 @@ public class AzkabanExecutorServer {
 		
 		configureMBeanServer();
 
-    File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
-    if (!statsDir.exists()) {
-      statsDir.mkdir();
-    }
-    props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-		
 		try {
 			server.start();
 		} 
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 94fb4c2..75a1a08 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javax.servlet.ServletConfig;
@@ -47,7 +48,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 	private AzkabanExecutorServer application;
 	private FlowRunnerManager flowRunnerManager;
 
-	
 	public ExecutorServlet() {
 		super();
 	}
@@ -63,7 +63,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		flowRunnerManager = application.getFlowRunnerManager();
 	}
 
-	
 	protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
 		resp.setContentType(JSON_MIME_TYPE);
 		ObjectMapper mapper = new ObjectMapper();
@@ -100,6 +99,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					else if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
+					else if (action.equals(ATTACHMENTS_ACTION)) { 
+						handleFetchAttachmentsEvent(execid, req, resp, respMap);
+					}
 					else if (action.equals(EXECUTE_ACTION)) {
 						handleAjaxExecute(req, respMap, execid);
 					}
@@ -170,7 +172,11 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 	
-	private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+	private void handleFetchLogEvent(
+			int execId, 
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			Map<String, Object> respMap) throws ServletException {
 		String type = getParam(req, "type");
 		int startByte = getIntParam(req, "offset");
 		int length = getIntParam(req, "length");
@@ -200,6 +206,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			}
 		}
 	}
+
+	private void handleFetchAttachmentsEvent(
+			int execId, 
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			Map<String, Object> respMap) throws ServletException {
+
+		String jobId = getParam(req, "jobId");
+		int attempt = getIntParam(req, "attempt", 0);
+		try {
+			List<Object> result = flowRunnerManager.readJobAttachments(
+					execId, jobId, attempt);
+			respMap.put("attachments", result);
+		}
+		catch (Exception e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
 	
 	private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
 		int startByte = getIntParam(req, "offset");
@@ -217,7 +242,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			logger.error(e);
 			respMap.put("error", e.getMessage());
 		}
-
 	}
 	
 	@SuppressWarnings("unchecked")
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index ba207c1..cab5f58 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -1013,9 +1013,21 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		return logFile;
 	}
+
+	public File getJobAttachmentFile(String jobId, int attempt) {
+		ExecutableNode node = flow.getExecutableNodePath(jobId);
+		File path = new File(execDir, node.getJobSource());
+
+		String attachmentFileName = JobRunner.createAttachmentFileName(node, attempt);
+		File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+		if (!attachmentFile.exists()) {
+			return null;
+		}
+		return attachmentFile;
+	}
 	
 	public File getJobMetaDataFile(String jobId, int attempt) {
-		ExecutableNode node = flow.getExecutableNode(jobId);
+		ExecutableNode node = flow.getExecutableNodePath(jobId);
 		File path = new File(execDir, node.getJobSource());
 		
 		String metaDataFileName = JobRunner.createMetaDataFileName(node, attempt);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index ecc0847..17d3ae3 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
@@ -137,7 +139,6 @@ public class FlowRunnerManager implements EventListener {
 		cleanerThread.start();
 		
 		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
-		
 	}
 
 	private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -555,7 +556,7 @@ public class FlowRunnerManager implements EventListener {
 		File dir = runner.getExecutionDir();
 		if (dir != null && dir.exists()) {
 			try {
-				synchronized(executionDirDeletionSync) {
+				synchronized (executionDirDeletionSync) {
 					if (!dir.exists()) {
 						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
 					}
@@ -574,6 +575,39 @@ public class FlowRunnerManager implements EventListener {
 		
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
+
+	public List<Object> readJobAttachments(int execId, String jobId, int attempt)
+			throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		if (runner == null) {
+			throw new ExecutorManagerException(
+					"Running flow " + execId + " not found.");
+		}
+
+		File dir = runner.getExecutionDir();
+		if (dir == null || !dir.exists()) {
+			throw new ExecutorManagerException(
+					"Error reading file. Log directory doesn't exist.");
+		}
+
+		try {
+			synchronized (executionDirDeletionSync) {
+				if (!dir.exists()) {
+					throw new ExecutorManagerException(
+							"Execution dir file doesn't exist. Probably has been deleted");
+				}
+
+				File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
+				if (attachmentFile == null || !attachmentFile.exists()) {
+					return null;
+				}
+				return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
+			}
+		}
+		catch (IOException e) {
+			throw new ExecutorManagerException(e);
+		}
+	}
 	
 	public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 4fec88f..3de3d23 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -65,6 +65,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private Appender jobAppender;
 	private File logFile;
+	private String attachmentFileName;
 	
 	private Job job;
 	private int executionId = -1;
@@ -217,12 +218,19 @@ public class JobRunner extends EventHandler implements Runnable {
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
 				logger.setAdditivity(false);
-			} catch (IOException e) {
+			}
+			catch (IOException e) {
 				flowLogger.error("Could not open log file in " + workingDir + " for job " + this.jobId, e);
 			}
 		}
 	}
 
+	private void createAttachmentFile() {
+		String fileName = createAttachmentFileName(node);
+		File file = new File(workingDir, fileName);
+		attachmentFileName = file.getAbsolutePath();
+	}
+
 	private void closeLogger() {
 		if (jobAppender != null) {
 			logger.removeAppender(jobAppender);
@@ -234,7 +242,8 @@ public class JobRunner extends EventHandler implements Runnable {
 		try {
 			node.setUpdateTime(System.currentTimeMillis());
 			loader.updateExecutableNode(node);
-		} catch (ExecutorManagerException e) {
+		}
+		catch (ExecutorManagerException e) {
 			flowLogger.error("Could not update job properties in db for " + this.jobId, e);
 		}
 	}
@@ -296,7 +305,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			if (!blockingStatus.isEmpty()) {
 				logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
 				
-				for(BlockingStatus bStatus: blockingStatus) {
+				for (BlockingStatus bStatus: blockingStatus) {
 					logger.info("Waiting on pipelined job " + bStatus.getJobId());
 					currentBlockStatus = bStatus;
 					bStatus.blockOnFinishedStatus();
@@ -323,11 +332,12 @@ public class JobRunner extends EventHandler implements Runnable {
 		long currentTime = System.currentTimeMillis();
 		if (delayStartMs > 0) {
 			logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
-			synchronized(this) {
+			synchronized (this) {
 				try {
 					this.wait(delayStartMs);
 					logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
-				} catch (InterruptedException e) {
+				}
+				catch (InterruptedException e) {
 					logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
 				}
 			}
@@ -343,26 +353,45 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private void finalizeLogFile() {
 		closeLogger();
+		if (logFile == null) {
+			flowLogger.info("Log file for job " + this.jobId + " is null");
+			return;
+		}
 		
-		if (logFile != null) {
-			try {
-				File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-					
-					@Override
-					public boolean accept(File dir, String name) {
-						return name.startsWith(logFile.getName());
-					}
-				} 
-				);
-				Arrays.sort(files, Collections.reverseOrder());
-				
-				loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
-			} catch (ExecutorManagerException e) {
-				flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+		try {
+			File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.startsWith(logFile.getName());
+				}
+			});
+			Arrays.sort(files, Collections.reverseOrder());
+			
+			loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
+		}
+		catch (ExecutorManagerException e) {
+			flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+		}
+	}
+
+	private void finalizeAttachmentFile() {
+		if (attachmentFileName == null) {
+			flowLogger.info("Attachment file for job " + this.jobId + " is null");
+			return;
+		}
+
+		try {
+			File file = new File(attachmentFileName);
+			if (!file.exists()) {
+				flowLogger.info("No attachment file for job " + this.jobId + 
+						" written.");
+				return;
 			}
+			loader.uploadAttachmentFile(node, file);
 		}
-		else {
-			flowLogger.info("Log file for job " + this.jobId + " is null");
+		catch (ExecutorManagerException e) {
+			flowLogger.error("Error writing out attachment for job " + 
+					this.node.getNestedId(), e);
 		}
 	}
 	
@@ -379,12 +408,14 @@ public class JobRunner extends EventHandler implements Runnable {
 			return;
 		}
 
+		createAttachmentFile();
 		createLogger();
 		boolean errorFound = false;
 		// Delay execution if necessary. Will return a true if something went wrong.
 		errorFound |= delayExecution();
 
-		// For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+		// For pipelining of jobs. Will watch other jobs. Will return true if
+		// something went wrong.
 		errorFound |= blockOnPipeLine();
 
 		// Start the node.
@@ -393,7 +424,8 @@ public class JobRunner extends EventHandler implements Runnable {
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
 				loader.uploadExecutableNode(node, props);
-			} catch (ExecutorManagerException e1) {
+			}
+			catch (ExecutorManagerException e1) {
 				logger.error("Error writing initial node properties");
 			}
 			
@@ -418,6 +450,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		
 		fireEvent(Event.create(this, Type.JOB_FINISHED), false);
 		finalizeLogFile();
+		finalizeAttachmentFile();
 	}
 	
 	private boolean prepareJob() throws RuntimeException {
@@ -427,7 +460,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			return false;
 		}
 		
-		synchronized(syncObject) {
+		synchronized (syncObject) {
 			if (node.getStatus() == Status.FAILED || cancelled) {
 				return false;
 			}
@@ -447,6 +480,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
 			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(node));
+			props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
 			changeStatus(Status.RUNNING);
 			
 			// Ability to specify working directory
@@ -454,9 +488,9 @@ public class JobRunner extends EventHandler implements Runnable {
 				props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 			}
 			
-			if(props.containsKey("user.to.proxy")) {
+			if (props.containsKey("user.to.proxy")) {
 				String jobProxyUser = props.getString("user.to.proxy");
-				if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
+				if (proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
 					logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.jobId + "!");
 					return false;
 				}
@@ -477,7 +511,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	private void runJob() {
 		try {
 			job.run();
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			
 			if (props.getBoolean("job.succeed.on.failure", false)) {
@@ -544,7 +579,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 			try {
 				job.cancel();
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				logError(e.getMessage());
 				logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
 			}
@@ -603,4 +639,21 @@ public class JobRunner extends EventHandler implements Runnable {
 	public static String createMetaDataFileName(ExecutableNode node) {
 		return JobRunner.createMetaDataFileName(node, node.getAttempt());
 	}
+
+	public static String createAttachmentFileName(ExecutableNode node) {
+		
+		return JobRunner.createAttachmentFileName(node, node.getAttempt());
+	}
+	
+	public static String createAttachmentFileName(ExecutableNode node, int attempt) {
+		int executionId = node.getExecutableFlow().getExecutionId();
+		String jobId = node.getId();
+		if (node.getExecutableFlow() != node.getParentFlow()) {
+			// Posix safe file delimiter
+			jobId = node.getPrintableId("._.");
+		}
+
+		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".attach" : "_job." + executionId + "." + jobId + ".attach";
+
+	}
 }
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 0c0c7e0..c84436e 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
 	public static final String RESUME_ACTION = "resume";
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
+	public static final String ATTACHMENTS_ACTION = "attachments";
 	public static final String METADATA_ACTION = "metadata";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index a6ccea5..bc33c0e 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -89,7 +89,8 @@ public class ExecutableFlow extends ExecutableFlowBase {
 	protected void setFlow(Project project, Flow flow) {
 		super.setFlow(project, flow);
 		executionOptions = new ExecutionOptions();
-
+		executionOptions.setMailCreator(flow.getMailCreator());
+		
 		if (flow.getSuccessEmails() != null) {
 			executionOptions.setSuccessEmails(flow.getSuccessEmails());
 		}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index bb896a2..bacad6c 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -61,7 +61,7 @@ public class ExecutableNode {
 	
 	private Props inputProps;
 	private Props outputProps;
-	
+
 	public static final String ATTEMPT_PARAM = "attempt";
 	public static final String PASTATTEMPTS_PARAM = "pastAttempts";
 	
@@ -204,7 +204,7 @@ public class ExecutableNode {
 	public Props getOutputProps() {
 		return outputProps;
 	}
-	
+
 	public long getDelayedExecution() {
 		return delayExecution;
 	}
@@ -438,4 +438,5 @@ public class ExecutableNode {
 	public long getRetryBackoff() {
 		return inputProps.getLong("retry.backoff", 0);
 	}
-}
\ No newline at end of file
+}
+
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 04e0e44..4763ee5 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,7 +47,11 @@ public interface ExecutorLoader {
 
 	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
 
+	public List<Object> fetchAttachments(int execId, String name, int attempt) throws ExecutorManagerException;
+
 	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
+	
+	public void uploadAttachmentFile(ExecutableNode node, File file) throws ExecutorManagerException;
 
 	public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
 
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fbeadb5..f2e979b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -200,41 +200,41 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      Project project, String flowId, int skip, int size) 
-      throws ExecutorManagerException {
+			Project project, String flowId, int skip, int size) 
+			throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        project.getId(), flowId, skip, size);
+				project.getId(), flowId, skip, size);
 		return flows;
 	}
 	
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(int skip, int size) 
-      throws ExecutorManagerException {
+			throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
 		return flows;
 	}
 	
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      String flowIdContains, int skip, int size) 
-      throws ExecutorManagerException {
+			String flowIdContains, int skip, int size) 
+			throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+				null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
 		return flows;
 	}
 
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      String projContain, 
-      String flowContain, 
-      String userContain, 
-      int status, 
-      long begin, 
-      long end, 
-      int skip, 
-      int size) throws ExecutorManagerException {
+			String projContain, 
+			String flowContain, 
+			String userContain, 
+			int status, 
+			long begin, 
+			long end, 
+			int skip, 
+			int size) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        projContain, flowContain, userContain, status, begin, end , skip, size);
+				projContain, flowContain, userContain, status, begin, end , skip, size);
 		return flows;
 	}
 	
@@ -273,8 +273,11 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	}
 	
 	@Override
-	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
-		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+	public LogData getExecutionJobLog(
+			ExecutableFlow exFlow, String jobId, int offset, int length, int attempt)
+			throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = 
+				runningFlows.get(exFlow.getExecutionId());
 		if (pair != null) {
 			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
 			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
@@ -283,14 +286,45 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
 			
 			@SuppressWarnings("unchecked")
-			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+			Map<String, Object> result = callExecutorServer(
+					pair.getFirst(), 
+					ConnectorParams.LOG_ACTION, 
+					typeParam, 
+					jobIdParam, 
+					offsetParam, 
+					lengthParam, 
+					attemptParam);
 			return LogData.createLogDataFromObject(result);
 		}
 		else {
-			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+			LogData value = executorLoader.fetchLogs(
+					exFlow.getExecutionId(), jobId, attempt, offset, length);
 			return value;
 		}
 	}
+
+	@Override
+	public List<Object> getExecutionJobStats(
+			ExecutableFlow exFlow, String jobId, int attempt)
+			throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = 
+				runningFlows.get(exFlow.getExecutionId());
+		if (pair == null) {
+			return executorLoader.fetchAttachments(
+					exFlow.getExecutionId(), jobId, attempt);
+		}
+
+		Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+		Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> result = callExecutorServer(
+				pair.getFirst(),
+				ConnectorParams.ATTACHMENTS_ACTION,
+				jobIdParam,
+				attemptParam);
+		return (List<Object>) result.get("attachments");
+	}
 	
 	@Override
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
@@ -492,7 +526,7 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 			ExecutionReference reference = new ExecutionReference(exflow.getExecutionId(), executorHost, executorPort);
 			executorLoader.addActiveExecutableReference(reference);
 			try {
-				callExecutorServer(reference,  ConnectorParams.EXECUTE_ACTION);
+				callExecutorServer(reference,	ConnectorParams.EXECUTE_ACTION);
 				runningFlows.put(exflow.getExecutionId(), new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
 				
 				message += "Execution submitted successfully with exec id " + exflow.getExecutionId();
@@ -1114,23 +1148,23 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	
 	@Override
 	public int getExecutableFlows(
-      int projectId, 
-      String flowId, 
-      int from, 
-      int length, 
-      List<ExecutableFlow> outputList) throws ExecutorManagerException {
+			int projectId, 
+			String flowId, 
+			int from, 
+			int length, 
+			List<ExecutableFlow> outputList) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        projectId, flowId, from, length);
+				projectId, flowId, from, length);
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
 
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      int projectId, String flowId, int from, int length, Status status) 
-      throws ExecutorManagerException {
+			int projectId, String flowId, int from, int length, Status status) 
+			throws ExecutorManagerException {
 		return executorLoader.fetchFlowHistory(
-        projectId, flowId, from, length, status);
+				projectId, flowId, from, length, status);
 	}
 
 	/* 
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index cc81589..af64ec4 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -90,6 +90,8 @@ public interface ExecutorManagerAdapter{
 	public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
 	
 	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+	public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId, int attempt) throws ExecutorManagerException;
 	
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
 	
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 4736421..8c7a8ed 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -35,6 +35,7 @@ import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
@@ -702,6 +703,32 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 					"Error fetching logs " + execId + " : " + name, e);
 		}
 	}
+
+	@Override
+	public List<Object> fetchAttachments(int execId, String jobId, int attempt)
+			throws ExecutorManagerException {
+		QueryRunner runner = createQueryRunner();
+
+		try {
+			String attachments = runner.query(
+					FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+					new FetchExecutableJobAttachmentsHandler(),
+					execId,
+					jobId);
+			if (attachments == null) {
+				return null;
+			}
+			return (List<Object>) JSONUtils.parseJSONFromString(attachments);
+		}
+		catch (IOException e) {
+			throw new ExecutorManagerException(
+					"Error converting job attachments to JSON " + jobId, e);
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException(
+					"Error querying job attachments " + jobId, e);
+		}
+	}
 	
 	@Override
 	public void uploadLogFile(
@@ -833,6 +860,50 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 				DateTime.now().getMillis());
 	}
 	
+	@Override
+	public void uploadAttachmentFile(ExecutableNode node, File file)
+			throws ExecutorManagerException {
+		Connection connection = getConnection();
+		try {
+			uploadAttachmentFile(connection, node, file, defaultEncodingType);
+			connection.commit();
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException("Error committing attachments ", e);
+		}
+		catch (IOException e) {
+			throw new ExecutorManagerException("Error uploading attachments ", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+
+	private void uploadAttachmentFile(
+			Connection connection,
+			ExecutableNode node,
+			File file,
+			EncodingType encType) throws SQLException, IOException {
+
+		String jsonString = FileUtils.readFileToString(file);
+		byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+		final String UPDATE_EXECUTION_NODE_ATTACHMENTS = 
+				"UPDATE execution_jobs " +
+						"SET attachments=? " + 
+						"WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+		QueryRunner runner = new QueryRunner();
+		runner.update(
+				connection,
+				UPDATE_EXECUTION_NODE_ATTACHMENTS,
+				attachments,
+				node.getExecutableFlow().getExecutionId(),
+				node.getParentFlow().getNestedId(),
+				node.getId(),
+				node.getAttempt());
+	}
+	
 	private Connection getConnection() throws ExecutorManagerException {
 		Connection connection = null;
 		try {
@@ -972,6 +1043,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 			return execNodes;
 		}
 	}
+
+	private static class FetchExecutableJobAttachmentsHandler
+			implements ResultSetHandler<String> {
+		private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE = 
+				"SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public String handle(ResultSet rs) throws SQLException {
+			String attachmentsJson = null;
+			if (rs.next()) {
+				try {
+					byte[] attachments = rs.getBytes(1);
+					if (attachments != null) {
+						attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
+					}
+				}
+				catch (IOException e) {
+					throw new SQLException("Error decoding job attachments", e);
+				}
+			}
+			return attachmentsJson;
+		}
+	}
 	
 	private static class FetchExecutableJobPropsHandler 
 			implements ResultSetHandler<Pair<Props, Props>> {
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7324205..3873a56 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -75,6 +75,11 @@ public class CommonJobProperties {
 	 * The attempt number of the executing job.
 	 */
 	public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+	/**
+	 * The file to which attachments for the executing job are written.
+	 */
+	public static final String JOB_ATTACHMENT_FILE = "azkaban.job.attachment.file";
 	
 	/**
 	 * The executing flow id
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1ef57ef..6c275af 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -338,15 +338,16 @@ public class JobTypeManager
 		
 	}
 	
-	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
-	{
+	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) 
+			throws JobTypeManagerException {
 		Job job = null;
 		try {
 			String jobType = jobProps.getString("type");
 			if (jobType == null || jobType.length() == 0) {
 				/*throw an exception when job name is null or empty*/
-				throw new JobExecutionException (
-						String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
+				throw new JobExecutionException(
+						String.format("The 'type' parameter for job[%s] is null or empty",
+								jobProps, logger));
 			}
 			
 			logger.info("Building " + jobType + " job executor. ");
@@ -361,17 +362,16 @@ public class JobTypeManager
 			Props sysConf = jobtypeSysProps.get(jobType);
 			
 			Props jobConf = jobProps;
-			if(jobtypeJobProps.containsKey(jobType)) {
+			if (jobtypeJobProps.containsKey(jobType)) {
 				Props p = jobtypeJobProps.get(jobType);
-				for(String k : p.getKeySet())
-				{
-					if(!jobConf.containsKey(k)) {
+				for (String k : p.getKeySet()) {
+					if (!jobConf.containsKey(k)) {
 						jobConf.put(k, p.get(k));
 					}
 				}
 			}
 			jobConf = PropsUtils.resolveProps(jobConf);
-
+			
 			if (sysConf != null) {
 				sysConf = PropsUtils.resolveProps(sysConf);
 			}
@@ -379,11 +379,11 @@ public class JobTypeManager
 				sysConf = new Props();
 			}
 			
-			
 //			logger.info("sysConf is " + sysConf);
 //			logger.info("jobConf is " + jobConf);
 //			
-			job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
+			job = (Job) Utils.callConstructor(
+					executorClass, jobId, sysConf, jobConf, logger);
 		}
 		catch (Exception e) {
 			//job = new InitErrorJob(jobId, e);
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 52e8d7c..21460b2 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -93,8 +93,9 @@ import azkaban.webapp.servlet.HistoryServlet;
 import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ProjectManagerServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
-import azkaban.webapp.servlet.TriggerPlugin;
-import azkaban.webapp.servlet.ViewerPlugin;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.plugin.PluginRegistry;
 import azkaban.webapp.session.SessionCache;
 
 /**
@@ -153,7 +154,6 @@ public class AzkabanWebServer extends AzkabanServer {
 	private Props props;
 	private SessionCache sessionCache;
 	private File tempDir;
-	private List<ViewerPlugin> viewerPlugins;
 	private Map<String, TriggerPlugin> triggerPlugins;
 	
 	private MBeanServer mbeanServer;
@@ -197,12 +197,6 @@ public class AzkabanWebServer extends AzkabanServer {
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
 
-		File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
-		if (!statsDir.exists()) {
-			statsDir.mkdir();
-		}
-    props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
 		// Setup time zone
 		if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
 			String timezone = props.getString(DEFAULT_TIMEZONE_ID);
@@ -215,10 +209,6 @@ public class AzkabanWebServer extends AzkabanServer {
 		configureMBeanServer();
 	}
 
-	private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
-		this.viewerPlugins = viewerPlugins;
-	}
-	
 	private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
 		this.triggerPlugins = triggerPlugins;
 	}
@@ -775,7 +765,7 @@ public class AzkabanWebServer extends AzkabanServer {
 		root.addServlet(new ServletHolder(new TriggerManagerServlet()),"/triggers");
 		
 		String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
-		app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine()));
+		loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine());
 		
 		// triggerplugin
 		String triggerPluginDir = azkabanSettings.getString("trigger.plugin.dir", "plugins/triggers");
@@ -963,13 +953,12 @@ public class AzkabanWebServer extends AzkabanServer {
 		return triggerPlugins;
 	}
 	
-	private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
+	private static void loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
 		File viewerPluginPath = new File(pluginPath);
 		if (!viewerPluginPath.exists()) {
-			return Collections.<ViewerPlugin>emptyList();
+			return;
 		}
 			
-		ArrayList<ViewerPlugin> installedViewerPlugins = new ArrayList<ViewerPlugin>();
 		ClassLoader parentLoader = AzkabanWebServer.class.getClassLoader();
 		File[] pluginDirs = viewerPluginPath.listFiles();
 		ArrayList<String> jarPaths = new ArrayList<String>();
@@ -1011,6 +1000,7 @@ public class AzkabanWebServer extends AzkabanServer {
 			
 			String pluginName = pluginProps.getString("viewer.name");
 			String pluginWebPath = pluginProps.getString("viewer.path");
+			String pluginJobType = pluginProps.getString("viewer.jobtype", null);
 			int pluginOrder = pluginProps.getInt("viewer.order", 0);
 			boolean pluginHidden = pluginProps.getBoolean("viewer.hidden", false);
 			List<String> extLibClasspath = pluginProps.getStringList("viewer.external.classpaths", (List<String>)null);
@@ -1114,27 +1104,18 @@ public class AzkabanWebServer extends AzkabanServer {
 			
 			AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet)obj;
 			root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
-			installedViewerPlugins.add(new ViewerPlugin(pluginName, pluginWebPath, pluginOrder, pluginHidden));
+			PluginRegistry.getRegistry().register(new ViewerPlugin(
+						pluginName, 
+						pluginWebPath, 
+						pluginOrder, 
+						pluginHidden,
+						pluginJobType));
 		}
 		
 		// Velocity needs the jar resource paths to be set.
 		String jarResourcePath = StringUtils.join(jarPaths, ", ");
 		logger.info("Setting jar resource path " + jarResourcePath);
 		ve.addProperty("jar.resource.loader.path", jarResourcePath);
-		
-		// Sort plugins based on order
-		Collections.sort(installedViewerPlugins, new Comparator<ViewerPlugin>() {
-			@Override
-			public int compare(ViewerPlugin o1, ViewerPlugin o2) {
-				return o1.getOrder() - o2.getOrder();
-			}
-		});
-		
-		return installedViewerPlugins;
-	}
-	
-	public List<ViewerPlugin> getViewerPlugins() {
-		return viewerPlugins;
 	}
 	
 	/**
diff --git a/src/java/azkaban/webapp/plugin/PluginRegistry.java b/src/java/azkaban/webapp/plugin/PluginRegistry.java
new file mode 100644
index 0000000..d49db54
--- /dev/null
+++ b/src/java/azkaban/webapp/plugin/PluginRegistry.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.plugin;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Set;
+
+public class PluginRegistry {
+
+	private static PluginRegistry registry;
+
+	public TreeSet<ViewerPlugin> viewerPlugins;
+
+	public Map<String, TreeSet<ViewerPlugin>> jobTypeViewerPlugins;
+
+	private PluginRegistry() {
+		viewerPlugins = new TreeSet<ViewerPlugin>(ViewerPlugin.COMPARATOR);
+		jobTypeViewerPlugins = new HashMap<String, TreeSet<ViewerPlugin>>();
+	}
+
+	public void register(ViewerPlugin plugin) {
+		viewerPlugins.add(plugin);
+		String jobType = plugin.getJobType();
+		if (jobType == null) {
+			return;
+		}
+		TreeSet<ViewerPlugin> plugins = null;
+		if (!jobTypeViewerPlugins.containsKey(jobType)) {
+			plugins = new TreeSet<ViewerPlugin>(ViewerPlugin.COMPARATOR);
+			plugins.add(plugin);
+			jobTypeViewerPlugins.put(jobType, plugins);
+		}
+		else {
+			plugins = jobTypeViewerPlugins.get(jobType);
+			plugins.add(plugin);
+		}
+	}
+
+	public List<ViewerPlugin> getViewerPlugins() {
+		return new ArrayList<ViewerPlugin>(viewerPlugins);
+	}
+
+	public List<ViewerPlugin> getViewerPluginsForJobType(String jobType) {
+		TreeSet<ViewerPlugin> plugins = jobTypeViewerPlugins.get(jobType);
+		if (plugins == null) {
+			return null;
+		}
+		return new ArrayList<ViewerPlugin>(plugins);
+	}
+
+	public static synchronized PluginRegistry getRegistry() {
+		if (registry == null) {
+			registry = new PluginRegistry();
+		}
+		return registry;
+	}
+}
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 8ac470e..2fc6ecc 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -39,6 +39,9 @@ import azkaban.utils.Props;
 import azkaban.webapp.AzkabanServer;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
+import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.plugin.TriggerPlugin;
+import azkaban.webapp.plugin.PluginRegistry;
 
 /**
  * Base Servlet for pages
@@ -91,7 +94,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 		
 		if (application instanceof AzkabanWebServer) {
 			AzkabanWebServer server = (AzkabanWebServer)application;
-			viewerPlugins = server.getViewerPlugins();
+			viewerPlugins = PluginRegistry.getRegistry().getViewerPlugins();
 			triggerPlugins = new ArrayList<TriggerPlugin>(server.getTriggerPlugins().values());
 		}
 	}
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index c875428..6f78ac8 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -17,7 +17,6 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
-import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -46,9 +45,10 @@ import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.JSONUtils;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
+import azkaban.webapp.plugin.PluginRegistry;
+import azkaban.webapp.plugin.ViewerPlugin;
 
 public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
@@ -57,8 +57,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private ScheduleManager scheduleManager;
 	private ExecutorVelocityHelper velocityHelper;
 
-  private String statsDir;
-
 	@Override
 	public void init(ServletConfig config) throws ServletException {
 		super.init(config);
@@ -67,7 +65,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		executorManager = server.getExecutorManager();
 		scheduleManager = server.getScheduleManager();
 		velocityHelper = new ExecutorVelocityHelper();
-    statsDir = server.getServerProps().getString("azkaban.stats.dir");
 	}
 
 	@Override
@@ -130,9 +127,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				else if (ajaxName.equals("fetchExecJobLogs")) {
 					ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
 				}
-        else if (ajaxName.equals("fetchExecJobStats")) {
-          ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
-        }
+				else if (ajaxName.equals("fetchExecJobStats")) {
+					ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
+				}
 				else if (ajaxName.equals("retryFailedJobs")) {
 					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
 				}
@@ -189,6 +186,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		page.add("attempt", attempt);
 		
 		ExecutableFlow flow = null;
+		ExecutableNode node = null;
 		try {
 			flow = executorManager.getExecutableFlow(execId);
 			if (flow == null) {
@@ -196,7 +194,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				page.render();
 				return;
 			}
-		} catch (ExecutorManagerException e) {
+
+			node = flow.getExecutableNodePath(jobId);
+			if (node == null) {
+				page.add("errorMsg", "Job " + jobId + " doesn't exist in " + flow.getExecutionId());
+				return;
+			}
+		
+			List<ViewerPlugin> jobViewerPlugins = PluginRegistry.getRegistry()
+					.getViewerPluginsForJobType(node.getType());
+			page.add("jobViewerPlugins", jobViewerPlugins);
+		}
+		catch (ExecutorManagerException e) {
 			page.add("errorMsg", "Error loading executing flow: " + e.getMessage());
 			page.render();
 			return;
@@ -209,7 +218,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 		
-		ExecutableNode node = flow.getExecutableNodePath(jobId);
 		page.add("projectName", project.getName());
 		page.add("flowid", flow.getId());
 		page.add("parentflowid", node.getParentFlow().getFlowId());
@@ -456,35 +464,35 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
-	private void ajaxFetchJobStats(HttpServletRequest req,
-			HttpServletResponse resp, HashMap<String, Object> ret, User user,
+	private void ajaxFetchJobStats(
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			HashMap<String, Object> ret, 
+			User user, 
 			ExecutableFlow exFlow) throws ServletException {
-		Project project = getProjectAjaxByPermission(ret,
-				exFlow.getProjectId(), user, Type.READ);
+		Project project = getProjectAjaxByPermission(
+				ret, exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
 
 		String jobId = this.getParam(req, "jobid");
 		resp.setCharacterEncoding("utf-8");
-		String statsFilePath = null;
+
 		try {
-			ExecutableNode node = exFlow.getExecutableNode(jobId);
+			ExecutableNode node = exFlow.getExecutableNodePath(jobId);
 			if (node == null) {
-				ret.put("error",
-						"Job " + jobId + " doesn't exist in "
-								+ exFlow.getExecutionId());
+				ret.put("error", "Job " + jobId + " doesn't exist in " + 
+						exFlow.getExecutionId());
 				return;
 			}
 
-			statsFilePath = statsDir + "/" + exFlow.getExecutionId() + "-"
-					+ jobId + "-stats.json";
-			File statsFile = new File(statsFilePath);
-			@SuppressWarnings("unchecked")
-			List<Object> jsonObj = (ArrayList<Object>) JSONUtils.parseJSONFromFile(statsFile);
+			List<Object> jsonObj = executorManager.getExecutionJobStats(
+					exFlow, jobId, node.getAttempt());
 			ret.put("jobStats", jsonObj);
-		} catch (IOException e) {
-			ret.put("error", "Cannot open stats file: " + statsFilePath);
+		}
+		catch (ExecutorManagerException e) {
+			ret.put("error", "Error retrieving stats for job " + jobId);
 			return;
 		}
 	}
@@ -715,7 +723,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		Map<String, Object> map = getExecutableFlowUpdateInfo(exFlow, lastUpdateTime);
 		map.put("status", exFlow.getStatus());
 		map.put("startTime", exFlow.getStartTime());
-		map.put("endTime",  exFlow.getEndTime());
+		map.put("endTime", exFlow.getEndTime());
 		map.put("updateTime", exFlow.getUpdateTime());
 		ret.putAll(map);
 	}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index d944f4c..678c288 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -122,6 +122,7 @@
 						<thead>
 							<tr>
 								<th>Name</th>
+								<th class="jobtype">Type</th>
 								<th class="timeline">Timeline</th>
 								<th class="date">Start Time</th>
 								<th class="date">End Time</th>
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
new file mode 100644
index 0000000..5b172c8
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailsheader.vm
@@ -0,0 +1,62 @@
+#*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+	## Page header.
+
+		<div class="az-page-header">
+			<div class="container-full">
+				<div class="row">
+					<div class="col-xs-6">
+						<h1><a href="${context}/executor?execid=${execid}&job=${jobid}">Job Execution <small>$jobid</small></a></h1>
+					</div>
+					<div class="col-xs-6">
+						<div class="pull-right az-page-header-form">
+							<a href="${context}/manager?project=${projectName}&flow=${parentflowid}&job=$jobname" class="btn btn-info">Job Properties</a>
+						</div>
+					</div>
+				</div>
+			</div>
+		</div>
+	
+	
+    <div class="container-full">
+
+  #parse ("azkaban/webapp/servlet/velocity/alerts.vm")
+
+  ## Breadcrumb
+
+			<ol class="breadcrumb">
+				<li><a href="${context}/manager?project=${projectName}"><strong>Project</strong> $projectName</a></li>
+				<li><a href="${context}/manager?project=${projectName}&flow=${flowid}"><strong>Flow</strong> $flowid</a></li>
+				<li><a href="${context}/executor?execid=${execid}#jobslist"><strong>Execution</strong> $execid</a></li>
+				<li class="active"><strong>Job</strong> $jobid</li>
+			</ol>
+
+  ## Tabs
+
+			<ul class="nav nav-tabs" id="headertabs">
+	#if ($current_page == "executing")
+				<li id="jobLogViewLink"><a href="#logs">Job Logs</a></li>
+				<li id="jobSummaryViewLink"><a href="#summary">Summary</a></li>
+	#else
+				<li id="jobLogViewLink"><a href="${context}/executor?execid=${execid}&job=${jobid}#logs">Job Logs</a></li>
+				<li id="jobSummaryViewLink"><a href="${context}/executor?execid=${execid}&job=${jobid}#summary">Summary</a></li>
+	#end
+	#foreach ($jobViewerPlugin in $jobViewerPlugins)
+				<li#if($current_page == $jobViewerPlugin.pluginName) class="active"#end><a href="$!context/${jobViewerPlugin.pluginPath}?execid=${execid}&jobid=${jobid}">$jobViewerPlugin.pluginName</a></li>
+	#end
+			</ul>
+    </div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
index 0d15964..2f377e5 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jobdetailspage.vm
@@ -47,8 +47,8 @@
   #parse ("azkaban/webapp/servlet/velocity/errormsg.vm")
 #else
 
-	## Page header.
 
+	## Page header.
 		<div class="az-page-header">
 			<div class="container-full">
 				<div class="row">
@@ -85,6 +85,9 @@
 				<li><a href="${context}/pigvisualizer?execid=${execid}&jobid=${jobid}">Visualization</a></li>
 			</ul>
     </div>
+
+## NOTE(review): jobdetailsheader.vm was intended to replace the inline page header above; consolidate to the #parse include in a follow-up change.
+
 
 	## Log content.
 
@@ -120,6 +123,11 @@
             </div>
           </h3>
 
+          <div id="jobType">
+            <table id="jobTypeTable" class="table table-striped table-bordered table-hover">
+            </table>
+          </div>
+
           <div id="command-summary">
             <h4>Command Summary</h4>
             <table id="commandTable" class="table table-striped table-bordered table-hover">
@@ -157,6 +165,13 @@
               </tbody>
             </table>
           </div>
+
+          <div id="jobIds">
+            <h4>Map Reduce Jobs</h4>
+            <table class="table table-striped table-bordered table-hover">
+              <tbody id="jobIdsTableBody"></tbody>
+            </table>
+          </div>
         </div>
       </div>
     </div>
diff --git a/src/less/tables.less b/src/less/tables.less
index 8cc5764..5d5aeea 100644
--- a/src/less/tables.less
+++ b/src/less/tables.less
@@ -9,10 +9,24 @@ table.table-properties {
   font-weight: bold;
 }
 
+.property-value {
+
+}
+
 .property-value-half {
   width: 25%;
 }
 
+.property-key,
+.property-value,
+.property-value-half {
+	pre {
+		background: transparent;
+		padding: 0;
+		border: 0;
+	}
+}
+
 .editable {
   .remove-btn {
     visibility: hidden;
@@ -77,6 +91,10 @@ table.table-properties {
       width: 160px;
     }
 
+    &.jobtype {
+      width: 90px;
+    }
+
     &.execid {
       width: 100px;
     }
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 585e208..b54b0f5 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,8 +8,6 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
 executor.global.properties=conf/global.properties
 azkaban.project.dir=projects
 
-azkaban.stats.dir=stats
-
 database.type=mysql
 mysql.port=3306
 mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 9ca591e..7524a14 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,9 +20,6 @@ database.type=h2
 h2.path=data/azkaban
 h2.create.tables=true
 
-# Stats
-azkaban.stats.dir=stats
-
 # Velocity dev mode
 velocity.dev.mode=false
 
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 97e40ae..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,8 +22,6 @@ mysql.user=azkaban
 mysql.password=azkaban
 mysql.numconnections=100
 
-azkaban.stats.dir=stats
-
 # Velocity dev mode
 velocity.dev.mode=false
 
diff --git a/src/web/js/azkaban/model/log-data.js b/src/web/js/azkaban/model/log-data.js
index 2fe3ac5..90a958f 100644
--- a/src/web/js/azkaban/model/log-data.js
+++ b/src/web/js/azkaban/model/log-data.js
@@ -24,6 +24,8 @@ azkaban.LogDataModel = Backbone.Model.extend({
     HIVE_MAP_REDUCE_JOBS_SUMMARY: "MapReduce Jobs Launched:",
     HIVE_MAP_REDUCE_SUMMARY_REGEX: /Job (\d+):\s+Map: (\d+)\s+Reduce: (\d+)\s+(?:Cumulative CPU: (.+?))?\s+HDFS Read: (\d+)\s+HDFS Write: (\d+)/,
 
+    JOB_ID_REGEX: /job_\d{12}_\d{4,}/,
+
     initialize: function() {
         this.set("offset", 0 );
         this.set("logData", "");
@@ -91,14 +93,19 @@ azkaban.LogDataModel = Backbone.Model.extend({
         var lines = data.split("\n");
 
         if (this.parseCommand(lines)) {
+            this.parseJobType(lines);
             this.parseJobTrackerUrls(lines);
 
-            var jobType = this.parseJobType(lines);
-            if (jobType.indexOf("pig") !== -1) {
-                this.parsePigTable(lines, "pigSummary", this.PIG_JOB_SUMMARY_START, "", 0);
-                this.parsePigTable(lines, "pigStats", this.PIG_JOB_STATS_START, "", 1);
-            } else if (jobType.indexOf("hive") !== -1) {
-                this.parseHiveQueries(lines);
+            var jobType = this.get("jobType");
+            if (jobType) {
+                if (jobType.indexOf("pig") !== -1) {
+                    this.parsePigTable(lines, "pigSummary", this.PIG_JOB_SUMMARY_START, "", 0);
+                    this.parsePigTable(lines, "pigStats", this.PIG_JOB_STATS_START, "", 1);
+                } else if (jobType.indexOf("hive") !== -1) {
+                    this.parseHiveQueries(lines);
+                } else {
+                    this.parseJobIds(lines);
+                }
             }
         }
     },
@@ -163,16 +170,32 @@ azkaban.LogDataModel = Backbone.Model.extend({
         this.set("jobTrackerUrlsOrdered", jobTrackerUrlsOrdered);
     },
 
+    parseJobIds: function(lines) {
+        var seenJobIds = {};
+        var jobIds = [];
+        var numLines = lines.length;
+        var match;
+        for (var i = 0; i < numLines; i++) {
+            if ((match = this.JOB_ID_REGEX.exec(lines[i])) && !seenJobIds[match[0]]) {
+                seenJobIds[match[0]] = true;
+                jobIds.push(match[0]);
+            }
+        }
+
+        if (jobIds.length > 0) {
+            this.set("jobIds", jobIds);
+        }
+    },
+
     parseJobType: function(lines) {
         var numLines = lines.length;
         var match;
-        for (var i = 0; numLines; i++) {
+        for (var i = 0; i < numLines; i++) {
             if (match = this.JOB_TYPE_REGEX.exec(lines[i])) {
-                return match[1];
+                this.set("jobType", match[1]);
+                break;
             }
         }
-        
-        return null;
     },
 
     parsePigTable: function(lines, tableName, startPattern, endPattern, linesToSkipAfterStart) {
@@ -262,14 +285,12 @@ azkaban.LogDataModel = Backbone.Model.extend({
                                     job.push("<a href='" + this.get("jobTrackerUrlsOrdered")[currMapReduceJob++] + "'>" + currJob + "</a>");
                                     job.push(match[2]);
                                     job.push(match[3]);
-                                    job.push(match[4]);
-                                    job.push(match[5]);
-                                    job.push(match[6]);
-
-                                    if (match[7]) {
+                                    if (match[4]) {
                                         this.set("hasCumulativeCPU", true);
-                                        job.push(match[7]);
+                                        job.push(match[4]);
                                     }
+                                    job.push(match[5]);
+                                    job.push(match[6]);
 
                                     queryJobs.push(job);
                                     previousJob = currJob;
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 0f37929..0ec15a9 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 var executionListView;
 azkaban.ExecutionListView = Backbone.View.extend({
 	events: {
@@ -244,6 +260,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		var self = this;
 		var tr = document.createElement("tr");
 		var tdName = document.createElement("td");
+		var tdType = document.createElement("td");
 		var tdTimeline = document.createElement("td");
 		var tdStart = document.createElement("td");
 		var tdEnd = document.createElement("td");
@@ -255,6 +272,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		var padding = 15*$(body)[0].level;
 		
 		$(tr).append(tdName);
+		$(tr).append(tdType);
 		$(tr).append(tdTimeline);
 		$(tr).append(tdStart);
 		$(tr).append(tdEnd);
@@ -264,6 +282,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(tr).addClass("jobListRow");
 		
 		$(tdName).addClass("jobname");
+		$(tdType).addClass("jobtype");
 		if (padding) {
 			$(tdName).css("padding-left", padding);
 		}
@@ -273,6 +292,8 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(tdElapse).addClass("elapsedTime");
 		$(tdStatus).addClass("statustd");
 		$(tdDetails).addClass("details");
+
+		$(tdType).text(node.type);
 		
 		var outerProgressBar = document.createElement("div");
 		//$(outerProgressBar).attr("id", node.id + "-outerprogressbar");
@@ -321,7 +342,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		}
 
 		$(body).append(tr);
-		if (node.type=="flow") {
+		if (node.type == "flow") {
 			var subFlowRow = document.createElement("tr");
 			var subFlowCell = document.createElement("td");
 			$(subFlowCell).addClass("subflowrow");
diff --git a/src/web/js/azkaban/view/job-details.js b/src/web/js/azkaban/view/job-details.js
index ab77045..07b823e 100644
--- a/src/web/js/azkaban/view/job-details.js
+++ b/src/web/js/azkaban/view/job-details.js
@@ -45,15 +45,19 @@ azkaban.JobSummaryView = Backbone.View.extend({
 	},
 
 	initialize: function(settings) {
+		$("#jobType").hide();
 		$("#commandSummary").hide();
 		$("#pigJobSummary").hide();
 		$("#pigJobStats").hide();
 		$("#hiveJobSummary").hide();
+		$("#jobIds").hide();
 
+		this.listenTo(this.model, "change:jobType", this.renderJobTypeTable);
 		this.listenTo(this.model, "change:commandProperties", this.renderCommandTable);
 		this.listenTo(this.model, "change:pigSummary", this.renderPigSummaryTable);
 		this.listenTo(this.model, "change:pigStats", this.renderPigStatsTable);
 		this.listenTo(this.model, "change:hiveSummary", this.renderHiveTable);
+		this.listenTo(this.model, "change:jobIds", this.renderJobIdsTable);
 	},
 
 	refresh: function() {
@@ -65,6 +69,46 @@ azkaban.JobSummaryView = Backbone.View.extend({
 		renderJobTable(jobSummary.statTableHeaders, jobSummary.statTableData, "stats");
 		renderHiveTable(jobSummary.hiveQueries, jobSummary.hiveQueryJobs);
 	},
+
+	renderJobTypeTable: function() {
+		var jobTypeTable = $("#jobTypeTable");
+		var jobType = this.model.get("jobType");
+
+		var tr = document.createElement("tr");
+		var td = document.createElement("td");
+		$(td).html("<b>Job Type</b>");
+		$(tr).append(td);
+		td = document.createElement("td");
+		$(td).html(jobType);
+		$(tr).append(td);
+
+		jobTypeTable.append(tr);
+
+		$("#jobType").show();
+	},
+
+	renderJobIdsTable: function() {
+		var oldBody = $("#jobIdsTableBody");
+		var newBody = $(document.createElement("tbody")).attr("id", "jobIdsTableBody");
+
+		var jobIds = this.model.get("jobIds");
+		var jobUrls = this.model.get("jobTrackerUrls");
+		var numJobs = jobIds.length;
+		for (var i = 0; i < numJobs; i++) {
+			var job = jobIds[i];
+			var tr = document.createElement("tr");
+			var td = document.createElement("td");
+			var html = jobUrls[job] ? "<a href='" + jobUrls[job] + "'>" + job + "</a>" : job;
+			$(td).html(html);
+			$(tr).append(td);
+			newBody.append(tr);
+		}
+
+		oldBody.replaceWith(newBody);
+
+		$("#jobIds").show();
+	},
+
 	renderCommandTable: function() {
 		var commandTable = $("#commandTable");
 		var commandProperties = this.model.get("commandProperties");
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 05641ec..6c1246d 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -202,5 +202,19 @@ public class MockExecutorLoader implements ExecutorLoader {
 		return null;
 	}
 
+	@Override
+	public List<Object> fetchAttachments(int execId, String name, int attempt)
+			throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void uploadAttachmentFile(ExecutableNode node, File file)
+			throws ExecutorManagerException {
+		// TODO Auto-generated method stub
+		
+	}
+
 
 }
\ No newline at end of file