azkaban-developers

Merge pull request #110 from davidzchen/job-stats Store

1/17/2014 8:55:13 PM

Details

diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 50be4f6..a6ee307 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -110,12 +110,6 @@ public class AzkabanExecutorServer {
 		
 		configureMBeanServer();
 
-    File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
-    if (!statsDir.exists()) {
-      statsDir.mkdir();
-    }
-    props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-		
 		try {
 			server.start();
 		} 
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 94fb4c2..75a1a08 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javax.servlet.ServletConfig;
@@ -47,7 +48,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 	private AzkabanExecutorServer application;
 	private FlowRunnerManager flowRunnerManager;
 
-	
 	public ExecutorServlet() {
 		super();
 	}
@@ -63,7 +63,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		flowRunnerManager = application.getFlowRunnerManager();
 	}
 
-	
 	protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
 		resp.setContentType(JSON_MIME_TYPE);
 		ObjectMapper mapper = new ObjectMapper();
@@ -100,6 +99,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					else if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
+					else if (action.equals(ATTACHMENTS_ACTION)) { 
+						handleFetchAttachmentsEvent(execid, req, resp, respMap);
+					}
 					else if (action.equals(EXECUTE_ACTION)) {
 						handleAjaxExecute(req, respMap, execid);
 					}
@@ -170,7 +172,11 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 	
-	private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+	private void handleFetchLogEvent(
+			int execId, 
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			Map<String, Object> respMap) throws ServletException {
 		String type = getParam(req, "type");
 		int startByte = getIntParam(req, "offset");
 		int length = getIntParam(req, "length");
@@ -200,6 +206,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			}
 		}
 	}
+
+	private void handleFetchAttachmentsEvent(
+			int execId, 
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			Map<String, Object> respMap) throws ServletException {
+
+		String jobId = getParam(req, "jobId");
+		int attempt = getIntParam(req, "attempt", 0);
+		try {
+			List<Object> result = flowRunnerManager.readJobAttachments(
+					execId, jobId, attempt);
+			respMap.put("attachments", result);
+		}
+		catch (Exception e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
 	
 	private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
 		int startByte = getIntParam(req, "offset");
@@ -217,7 +242,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			logger.error(e);
 			respMap.put("error", e.getMessage());
 		}
-
 	}
 	
 	@SuppressWarnings("unchecked")
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5f38882..db33b46 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -838,6 +838,19 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		return logFile;
 	}
+
+	public File getJobAttachmentFile(String jobId, int attempt) {
+		ExecutableNode node = flow.getExecutableNode(jobId);
+    File path = new File(execDir, node.getJobSource());
+
+    String attachmentFileName =
+        JobRunner.createAttachmentFileName(execId, jobId, attempt);
+    File attachmentFile = new File(path.getParentFile(), attachmentFileName);
+    if (!attachmentFile.exists()) {
+      return null;
+    }
+    return attachmentFile;
+	}
 	
 	public File getJobMetaDataFile(String jobId, int attempt) {
 		ExecutableNode node = flow.getExecutableNode(jobId);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index ecc0847..17d3ae3 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,7 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
@@ -137,7 +139,6 @@ public class FlowRunnerManager implements EventListener {
 		cleanerThread.start();
 		
 		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
-		
 	}
 
 	private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -555,7 +556,7 @@ public class FlowRunnerManager implements EventListener {
 		File dir = runner.getExecutionDir();
 		if (dir != null && dir.exists()) {
 			try {
-				synchronized(executionDirDeletionSync) {
+				synchronized (executionDirDeletionSync) {
 					if (!dir.exists()) {
 						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
 					}
@@ -574,6 +575,39 @@ public class FlowRunnerManager implements EventListener {
 		
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
+
+	public List<Object> readJobAttachments(int execId, String jobId, int attempt)
+			throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		if (runner == null) {
+			throw new ExecutorManagerException(
+					"Running flow " + execId + " not found.");
+		}
+
+		File dir = runner.getExecutionDir();
+		if (dir == null || !dir.exists()) {
+			throw new ExecutorManagerException(
+					"Error reading file. Log directory doesn't exist.");
+		}
+
+		try {
+			synchronized (executionDirDeletionSync) {
+				if (!dir.exists()) {
+					throw new ExecutorManagerException(
+							"Execution dir file doesn't exist. Probably has beend deleted");
+				}
+
+				File attachmentFile = runner.getJobAttachmentFile(jobId, attempt);
+				if (attachmentFile == null || !attachmentFile.exists()) {
+					return null;
+				}
+				return (ArrayList<Object>) JSONUtils.parseJSONFromFile(attachmentFile);
+			}
+		}
+		catch (IOException e) {
+			throw new ExecutorManagerException(e);
+		}
+	}
 	
 	public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 99bd8ab..211507f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -65,6 +65,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private Appender jobAppender;
 	private File logFile;
+	private String attachmentFileName;
 	
 	private Job job;
 	private int executionId = -1;
@@ -222,12 +223,20 @@ public class JobRunner extends EventHandler implements Runnable {
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
 				logger.setAdditivity(false);
-			} catch (IOException e) {
+			}
+			catch (IOException e) {
 				flowLogger.error("Could not open log file in " + workingDir + " for job " + this.jobId, e);
 			}
 		}
 	}
 
+	private void createAttachmentFile() {
+		String fileName = createAttachmentFileName(
+				this.executionId, this.jobId, node.getAttempt());
+		File file = new File(workingDir, fileName);
+		attachmentFileName = file.getAbsolutePath();
+	}
+
 	private void closeLogger() {
 		if (jobAppender != null) {
 			logger.removeAppender(jobAppender);
@@ -239,7 +248,8 @@ public class JobRunner extends EventHandler implements Runnable {
 		try {
 			node.setUpdateTime(System.currentTimeMillis());
 			loader.updateExecutableNode(node);
-		} catch (ExecutorManagerException e) {
+		}
+		catch (ExecutorManagerException e) {
 			flowLogger.error("Could not update job properties in db for " + this.jobId, e);
 		}
 	}
@@ -301,7 +311,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			if (!blockingStatus.isEmpty()) {
 				logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
 				
-				for(BlockingStatus bStatus: blockingStatus) {
+				for (BlockingStatus bStatus: blockingStatus) {
 					logger.info("Waiting on pipelined job " + bStatus.getJobId());
 					currentBlockStatus = bStatus;
 					bStatus.blockOnFinishedStatus();
@@ -328,11 +338,12 @@ public class JobRunner extends EventHandler implements Runnable {
 		long currentTime = System.currentTimeMillis();
 		if (delayStartMs > 0) {
 			logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
-			synchronized(this) {
+			synchronized (this) {
 				try {
 					this.wait(delayStartMs);
 					logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
-				} catch (InterruptedException e) {
+				}
+				catch (InterruptedException e) {
 					logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
 				}
 			}
@@ -348,26 +359,45 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private void finalizeLogFile() {
 		closeLogger();
+		if (logFile == null) {
+			flowLogger.info("Log file for job " + this.jobId + " is null");
+			return;
+		}
 		
-		if (logFile != null) {
-			try {
-				File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-					
-					@Override
-					public boolean accept(File dir, String name) {
-						return name.startsWith(logFile.getName());
-					}
-				} 
-				);
-				Arrays.sort(files, Collections.reverseOrder());
-				
-				loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
-			} catch (ExecutorManagerException e) {
-				flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+		try {
+			File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.startsWith(logFile.getName());
+				}
+			});
+			Arrays.sort(files, Collections.reverseOrder());
+			
+			loader.uploadLogFile(executionId, this.node.getNestedId(), node.getAttempt(), files);
+		}
+		catch (ExecutorManagerException e) {
+			flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
+		}
+	}
+
+	private void finalizeAttachmentFile() {
+		if (attachmentFileName == null) {
+			flowLogger.info("Attachment file for job " + this.jobId + " is null");
+			return;
+		}
+
+		try {
+			File file = new File(attachmentFileName);
+			if (!file.exists()) {
+				flowLogger.info("No attachment file for job " + this.jobId + 
+						" written.");
+				return;
 			}
+			loader.uploadAttachmentFile(node, file);
 		}
-		else {
-			flowLogger.info("Log file for job " + this.jobId + " is null");
+		catch (ExecutorManagerException e) {
+			flowLogger.error("Error writing out attachment for job " + 
+					this.node.getNestedId(), e);
 		}
 	}
 	
@@ -384,12 +414,14 @@ public class JobRunner extends EventHandler implements Runnable {
 			return;
 		}
 
+		createAttachmentFile();
 		createLogger();
 		boolean errorFound = false;
 		// Delay execution if necessary. Will return a true if something went wrong.
 		errorFound |= delayExecution();
 
-		// For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+		// For pipelining of jobs. Will watch other jobs. Will return true if
+		// something went wrong.
 		errorFound |= blockOnPipeLine();
 
 		// Start the node.
@@ -398,7 +430,8 @@ public class JobRunner extends EventHandler implements Runnable {
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
 				loader.uploadExecutableNode(node, props);
-			} catch (ExecutorManagerException e1) {
+			}
+			catch (ExecutorManagerException e1) {
 				logger.error("Error writing initial node properties");
 			}
 			
@@ -423,6 +456,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		
 		fireEvent(Event.create(this, Type.JOB_FINISHED), false);
 		finalizeLogFile();
+		finalizeAttachmentFile();
 	}
 	
 	private boolean prepareJob() throws RuntimeException {
@@ -432,7 +466,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			return false;
 		}
 		
-		synchronized(syncObject) {
+		synchronized (syncObject) {
 			if (node.getStatus() == Status.FAILED || cancelled) {
 				return false;
 			}
@@ -451,7 +485,9 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 			
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
-			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+			props.put(CommonJobProperties.JOB_METADATA_FILE,
+					createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+			props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
 			changeStatus(Status.RUNNING);
 			
 			// Ability to specify working directory
@@ -459,9 +495,9 @@ public class JobRunner extends EventHandler implements Runnable {
 				props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
 			}
 			
-			if(props.containsKey("user.to.proxy")) {
+			if (props.containsKey("user.to.proxy")) {
 				String jobProxyUser = props.getString("user.to.proxy");
-				if(proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
+				if (proxyUsers != null && !proxyUsers.contains(jobProxyUser)) {
 					logger.error("User " + jobProxyUser + " has no permission to execute this job " + this.jobId + "!");
 					return false;
 				}
@@ -482,7 +518,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	private void runJob() {
 		try {
 			job.run();
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			
 			if (props.getBoolean("job.succeed.on.failure", false)) {
@@ -549,7 +586,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 			try {
 				job.cancel();
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				logError(e.getMessage());
 				logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
 			}
@@ -588,11 +626,24 @@ public class JobRunner extends EventHandler implements Runnable {
 		return props.getLong("retry.backoff", 0);
 	}
 	
-	public static String createLogFileName(int executionId, String jobId, int attempt) {
-		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
+	public static String createAttachmentFileName(
+			int executionId, String jobId, int attempt) {
+		return attempt > 0 
+				? "_job." + executionId + "." + attempt + "." + jobId + ".attach" 
+				: "_job." + executionId + "." + jobId + ".attach";
 	}
-	
-	public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
-		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
+
+	public static String createLogFileName(
+			int executionId, String jobId, int attempt) {
+		return attempt > 0 
+				? "_job." + executionId + "." + attempt + "." + jobId + ".log" 
+				: "_job." + executionId + "." + jobId + ".log";
+	}
+	
+	public static String createMetaDataFileName(
+			int executionId, String jobId, int attempt) {
+		return attempt > 0 
+				? "_job." + executionId + "." + attempt + "." + jobId + ".meta" 
+				: "_job." + executionId + "." + jobId + ".meta";
 	}
 }
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 0c0c7e0..c84436e 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
 	public static final String RESUME_ACTION = "resume";
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
+	public static final String ATTACHMENTS_ACTION = "attachments";
 	public static final String METADATA_ACTION = "metadata";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 9d807ff..5fbeed2 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -61,7 +61,7 @@ public class ExecutableNode {
 	
 	private Props inputProps;
 	private Props outputProps;
-	
+
 	public static final String ATTEMPT_PARAM = "attempt";
 	public static final String PASTATTEMPTS_PARAM = "pastAttempts";
 	
@@ -204,7 +204,7 @@ public class ExecutableNode {
 	public Props getOutputProps() {
 		return outputProps;
 	}
-	
+
 	public long getDelayedExecution() {
 		return delayExecution;
 	}
@@ -428,4 +428,4 @@ public class ExecutableNode {
 			}
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 04e0e44..4763ee5 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,7 +47,11 @@ public interface ExecutorLoader {
 
 	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
 
+	public List<Object> fetchAttachments(int execId, String name, int attempt) throws ExecutorManagerException;
+
 	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
+	
+	public void uploadAttachmentFile(ExecutableNode node, File file) throws ExecutorManagerException;
 
 	public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
 
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index cb763f5..8b71e29 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -201,41 +201,41 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      Project project, String flowId, int skip, int size) 
-      throws ExecutorManagerException {
+			Project project, String flowId, int skip, int size) 
+			throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        project.getId(), flowId, skip, size);
+				project.getId(), flowId, skip, size);
 		return flows;
 	}
 	
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(int skip, int size) 
-      throws ExecutorManagerException {
+			throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(skip, size);
 		return flows;
 	}
 	
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      String flowIdContains, int skip, int size) 
-      throws ExecutorManagerException {
+			String flowIdContains, int skip, int size) 
+			throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
+				null, '%'+flowIdContains+'%', null, 0, -1, -1 , skip, size);
 		return flows;
 	}
 
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      String projContain, 
-      String flowContain, 
-      String userContain, 
-      int status, 
-      long begin, 
-      long end, 
-      int skip, 
-      int size) throws ExecutorManagerException {
+			String projContain, 
+			String flowContain, 
+			String userContain, 
+			int status, 
+			long begin, 
+			long end, 
+			int skip, 
+			int size) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        projContain, flowContain, userContain, status, begin, end , skip, size);
+				projContain, flowContain, userContain, status, begin, end , skip, size);
 		return flows;
 	}
 	
@@ -274,8 +274,11 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	}
 	
 	@Override
-	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
-		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+	public LogData getExecutionJobLog(
+			ExecutableFlow exFlow, String jobId, int offset, int length, int attempt)
+			throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = 
+				runningFlows.get(exFlow.getExecutionId());
 		if (pair != null) {
 			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
 			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
@@ -284,14 +287,45 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
 			
 			@SuppressWarnings("unchecked")
-			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+			Map<String, Object> result = callExecutorServer(
+					pair.getFirst(), 
+					ConnectorParams.LOG_ACTION, 
+					typeParam, 
+					jobIdParam, 
+					offsetParam, 
+					lengthParam, 
+					attemptParam);
 			return LogData.createLogDataFromObject(result);
 		}
 		else {
-			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+			LogData value = executorLoader.fetchLogs(
+					exFlow.getExecutionId(), jobId, attempt, offset, length);
 			return value;
 		}
 	}
+
+	@Override
+	public List<Object> getExecutionJobStats(
+			ExecutableFlow exFlow, String jobId, int attempt)
+			throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = 
+				runningFlows.get(exFlow.getExecutionId());
+		if (pair == null) {
+			return executorLoader.fetchAttachments(
+					exFlow.getExecutionId(), jobId, attempt);
+		}
+
+		Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+		Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> result = callExecutorServer(
+				pair.getFirst(),
+				ConnectorParams.ATTACHMENTS_ACTION,
+				jobIdParam,
+				attemptParam);
+		return (List<Object>) result.get("attachments");
+	}
 	
 	@Override
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
@@ -493,7 +527,7 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 			ExecutionReference reference = new ExecutionReference(exflow.getExecutionId(), executorHost, executorPort);
 			executorLoader.addActiveExecutableReference(reference);
 			try {
-				callExecutorServer(reference,  ConnectorParams.EXECUTE_ACTION);
+				callExecutorServer(reference,	ConnectorParams.EXECUTE_ACTION);
 				runningFlows.put(exflow.getExecutionId(), new Pair<ExecutionReference, ExecutableFlow>(reference, exflow));
 				
 				message += "Execution submitted successfully with exec id " + exflow.getExecutionId();
@@ -1115,23 +1149,23 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	
 	@Override
 	public int getExecutableFlows(
-      int projectId, 
-      String flowId, 
-      int from, 
-      int length, 
-      List<ExecutableFlow> outputList) throws ExecutorManagerException {
+			int projectId, 
+			String flowId, 
+			int from, 
+			int length, 
+			List<ExecutableFlow> outputList) throws ExecutorManagerException {
 		List<ExecutableFlow> flows = executorLoader.fetchFlowHistory(
-        projectId, flowId, from, length);
+				projectId, flowId, from, length);
 		outputList.addAll(flows);
 		return executorLoader.fetchNumExecutableFlows(projectId, flowId);
 	}
 
 	@Override
 	public List<ExecutableFlow> getExecutableFlows(
-      int projectId, String flowId, int from, int length, Status status) 
-      throws ExecutorManagerException {
+			int projectId, String flowId, int from, int length, Status status) 
+			throws ExecutorManagerException {
 		return executorLoader.fetchFlowHistory(
-        projectId, flowId, from, length, status);
+				projectId, flowId, from, length, status);
 	}
 
 	/* 
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index cc81589..af64ec4 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -90,6 +90,8 @@ public interface ExecutorManagerAdapter{
 	public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
 	
 	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+	public List<Object> getExecutionJobStats(ExecutableFlow exflow, String jobId, int attempt) throws ExecutorManagerException;
 	
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
 	
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 70b73da..e756cfb 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -35,6 +35,7 @@ import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
@@ -707,6 +708,32 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 					"Error fetching logs " + execId + " : " + name, e);
 		}
 	}
+
+	@Override
+	public List<Object> fetchAttachments(int execId, String jobId, int attempt)
+			throws ExecutorManagerException {
+		QueryRunner runner = createQueryRunner();
+
+		try {
+			String attachments = runner.query(
+					FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+					new FetchExecutableJobAttachmentsHandler(),
+					execId,
+					jobId);
+			if (attachments == null) {
+				return null;
+			}
+			return (List<Object>) JSONUtils.parseJSONFromString(attachments);
+		}
+		catch (IOException e) {
+			throw new ExecutorManagerException(
+					"Error converting job attachments to JSON " + jobId, e);
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException(
+					"Error query job attachments " + jobId, e);
+		}
+	}
 	
 	@Override
 	public void uploadLogFile(
@@ -838,6 +865,50 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 				DateTime.now().getMillis());
 	}
 	
+	@Override
+	public void uploadAttachmentFile(ExecutableNode node, File file)
+			throws ExecutorManagerException {
+		Connection connection = getConnection();
+		try {
+			uploadAttachmentFile(connection, node, file, defaultEncodingType);
+			connection.commit();
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException("Error committing attachments ", e);
+		}
+		catch (IOException e) {
+			throw new ExecutorManagerException("Error uploading attachments ", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+
+	private void uploadAttachmentFile(
+			Connection connection,
+			ExecutableNode node,
+			File file,
+			EncodingType encType) throws SQLException, IOException {
+
+		String jsonString = FileUtils.readFileToString(file);
+		byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+		final String UPDATE_EXECUTION_NODE_ATTACHMENTS = 
+				"UPDATE execution_jobs " +
+						"SET attachments=? " + 
+						"WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+		QueryRunner runner = new QueryRunner();
+		runner.update(
+				connection,
+				UPDATE_EXECUTION_NODE_ATTACHMENTS,
+				attachments,
+				node.getExecutableFlow().getExecutionId(),
+				node.getParentFlow().getNestedId(),
+				node.getId(),
+				node.getAttempt());
+	}
+	
 	private Connection getConnection() throws ExecutorManagerException {
 		Connection connection = null;
 		try {
@@ -977,6 +1048,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 			return execNodes;
 		}
 	}
+
+	private static class FetchExecutableJobAttachmentsHandler
+			implements ResultSetHandler<String> {
+		private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE = 
+				"SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public String handle(ResultSet rs) throws SQLException {
+			String attachmentsJson = null;
+			if (rs.next()) {
+				try {
+					byte[] attachments = rs.getBytes(1);
+					if (attachments != null) {
+						attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
+					}
+				}
+				catch (IOException e) {
+					throw new SQLException("Error decoding job attachments", e);
+				}
+			}
+			return attachmentsJson;
+		}
+	}
 	
 	private static class FetchExecutableJobPropsHandler 
 			implements ResultSetHandler<Pair<Props, Props>> {
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7324205..3873a56 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -75,6 +75,11 @@ public class CommonJobProperties {
 	 * The attempt number of the executing job.
 	 */
 	public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+	/**
+	 * The attempt number of the executing job.
+	 */
+	public static final String JOB_ATTACHMENT_FILE = "azkaban.job.attachment.file";
 	
 	/**
 	 * The executing flow id
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1ef57ef..6c275af 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -338,15 +338,16 @@ public class JobTypeManager
 		
 	}
 	
-	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) throws JobTypeManagerException
-	{
+	public Job buildJobExecutor(String jobId, Props jobProps, Logger logger) 
+			throws JobTypeManagerException {
 		Job job = null;
 		try {
 			String jobType = jobProps.getString("type");
 			if (jobType == null || jobType.length() == 0) {
 				/*throw an exception when job name is null or empty*/
-				throw new JobExecutionException (
-						String.format("The 'type' parameter for job[%s] is null or empty", jobProps, logger));
+				throw new JobExecutionException(
+						String.format("The 'type' parameter for job[%s] is null or empty",
+								jobProps, logger));
 			}
 			
 			logger.info("Building " + jobType + " job executor. ");
@@ -361,17 +362,16 @@ public class JobTypeManager
 			Props sysConf = jobtypeSysProps.get(jobType);
 			
 			Props jobConf = jobProps;
-			if(jobtypeJobProps.containsKey(jobType)) {
+			if (jobtypeJobProps.containsKey(jobType)) {
 				Props p = jobtypeJobProps.get(jobType);
-				for(String k : p.getKeySet())
-				{
-					if(!jobConf.containsKey(k)) {
+				for (String k : p.getKeySet()) {
+					if (!jobConf.containsKey(k)) {
 						jobConf.put(k, p.get(k));
 					}
 				}
 			}
 			jobConf = PropsUtils.resolveProps(jobConf);
-
+			
 			if (sysConf != null) {
 				sysConf = PropsUtils.resolveProps(sysConf);
 			}
@@ -379,11 +379,11 @@ public class JobTypeManager
 				sysConf = new Props();
 			}
 			
-			
 //			logger.info("sysConf is " + sysConf);
 //			logger.info("jobConf is " + jobConf);
 //			
-			job = (Job)Utils.callConstructor(executorClass, jobId, sysConf, jobConf, logger);
+			job = (Job) Utils.callConstructor(
+					executorClass, jobId, sysConf, jobConf, logger);
 		}
 		catch (Exception e) {
 			//job = new InitErrorJob(jobId, e);
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index b6d961b..21460b2 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -197,12 +197,6 @@ public class AzkabanWebServer extends AzkabanServer {
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
 
-		File statsDir = new File(props.getString("azkaban.stats.dir", "stats"));
-		if (!statsDir.exists()) {
-			statsDir.mkdir();
-		}
-    props.put("azkaban.stats.dir", statsDir.getCanonicalPath());
-
 		// Setup time zone
 		if (props.containsKey(DEFAULT_TIMEZONE_ID)) {
 			String timezone = props.getString(DEFAULT_TIMEZONE_ID);
diff --git a/src/java/azkaban/webapp/plugin/ViewerPlugin.java b/src/java/azkaban/webapp/plugin/ViewerPlugin.java
index 1ea864b..9e167ff 100644
--- a/src/java/azkaban/webapp/plugin/ViewerPlugin.java
+++ b/src/java/azkaban/webapp/plugin/ViewerPlugin.java
@@ -29,7 +29,10 @@ public class ViewerPlugin {
 			new Comparator<ViewerPlugin>() {
 		@Override
 		public int compare(ViewerPlugin o1, ViewerPlugin o2) {
-			return o1.getOrder() - o2.getOrder();
+			if (o1.getOrder() != o2.getOrder()) {
+				return o1.getOrder() - o2.getOrder();
+			}
+			return o1.getPluginName().compareTo(o2.getPluginName());
 		}
 	};
 
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index e154480..bcab22d 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -60,8 +60,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 	private ScheduleManager scheduleManager;
 	private ExecutorVelocityHelper velocityHelper;
 
-  private String statsDir;
-
 	@Override
 	public void init(ServletConfig config) throws ServletException {
 		super.init(config);
@@ -70,7 +68,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		executorManager = server.getExecutorManager();
 		scheduleManager = server.getScheduleManager();
 		velocityHelper = new ExecutorVelocityHelper();
-    statsDir = server.getServerProps().getString("azkaban.stats.dir");
 	}
 
 	@Override
@@ -133,9 +130,9 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 				else if (ajaxName.equals("fetchExecJobLogs")) {
 					ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow);
 				}
-        else if (ajaxName.equals("fetchExecJobStats")) {
-          ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
-        }
+				else if (ajaxName.equals("fetchExecJobStats")) {
+					ajaxFetchJobStats(req, resp, ret, session.getUser(), exFlow);
+				}
 				else if (ajaxName.equals("retryFailedJobs")) {
 					ajaxRestartFailed(req, resp, ret, session.getUser(), exFlow);
 				}
@@ -467,41 +464,37 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 	
-  private void ajaxFetchJobStats(
-      HttpServletRequest req, 
-      HttpServletResponse resp, 
-      HashMap<String, Object> ret, 
-      User user, 
-      ExecutableFlow exFlow) throws ServletException {
+	private void ajaxFetchJobStats(
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			HashMap<String, Object> ret, 
+			User user, 
+			ExecutableFlow exFlow) throws ServletException {
 		Project project = getProjectAjaxByPermission(
-        ret, exFlow.getProjectId(), user, Type.READ);
+				ret, exFlow.getProjectId(), user, Type.READ);
 		if (project == null) {
 			return;
 		}
 		
 		String jobId = this.getParam(req, "jobid");
 		resp.setCharacterEncoding("utf-8");
-    String statsFilePath = null;
 		try {
 			ExecutableNode node = exFlow.getExecutableNode(jobId);
 			if (node == null) {
 				ret.put("error", "Job " + jobId + " doesn't exist in " + 
-            exFlow.getExecutionId());
+						exFlow.getExecutionId());
 				return;
 			}
-	
-      statsFilePath = statsDir + "/" + exFlow.getExecutionId() + "-" + 
-          jobId + "-stats.json";
-      File statsFile = new File(statsFilePath);
-      List<Object> jsonObj = 
-          (ArrayList<Object>) JSONUtils.parseJSONFromFile(statsFile);
-      ret.put("jobStats", jsonObj);
-    }
-    catch (IOException e) {
-      ret.put("error", "Cannot open stats file: " + statsFilePath);
-      return;
-		}
-  }
+
+			List<Object> jsonObj = executorManager.getExecutionJobStats(
+					exFlow, jobId, node.getAttempt());
+			ret.put("jobStats", jsonObj);
+		}
+		catch (ExecutorManagerException e) {
+			ret.put("error", "Error retrieving stats for job " + jobId);
+			return;
+		}
+	}
 
 	private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
 		Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
@@ -729,7 +722,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		Map<String, Object> map = getExecutableFlowUpdateInfo(exFlow, lastUpdateTime);
 		map.put("status", exFlow.getStatus());
 		map.put("startTime", exFlow.getStartTime());
-		map.put("endTime",  exFlow.getEndTime());
+		map.put("endTime",	exFlow.getEndTime());
 		map.put("updateTime", exFlow.getUpdateTime());
 		ret.putAll(map);
 	}
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index d944f4c..678c288 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -122,6 +122,7 @@
 						<thead>
 							<tr>
 								<th>Name</th>
+								<th class="jobtype">Type</th>
 								<th class="timeline">Timeline</th>
 								<th class="date">Start Time</th>
 								<th class="date">End Time</th>
diff --git a/src/less/tables.less b/src/less/tables.less
index d96815f..ba34cb4 100644
--- a/src/less/tables.less
+++ b/src/less/tables.less
@@ -9,10 +9,24 @@ table.table-properties {
   font-weight: bold;
 }
 
+.property-value {
+
+}
+
 .property-value-half {
   width: 25%;
 }
 
+.property-key,
+.property-value,
+.property-value-half {
+	pre {
+		background: transparent;
+		padding: 0;
+		border: 0;
+	}
+}
+
 .editable {
   .remove-btn {
     visibility: hidden;
@@ -77,6 +91,10 @@ table.table-properties {
       width: 160px;
     }
 
+    &.jobtype {
+      width: 90px;
+    }
+
     &.execid {
       width: 100px;
     }
diff --git a/src/package/execserver/conf/azkaban.properties b/src/package/execserver/conf/azkaban.properties
index 585e208..b54b0f5 100644
--- a/src/package/execserver/conf/azkaban.properties
+++ b/src/package/execserver/conf/azkaban.properties
@@ -8,8 +8,6 @@ azkaban.jobtype.plugin.dir=plugins/jobtypes
 executor.global.properties=conf/global.properties
 azkaban.project.dir=projects
 
-azkaban.stats.dir=stats
-
 database.type=mysql
 mysql.port=3306
 mysql.host=localhost
diff --git a/src/package/soloserver/conf/azkaban.properties b/src/package/soloserver/conf/azkaban.properties
index 9ca591e..7524a14 100644
--- a/src/package/soloserver/conf/azkaban.properties
+++ b/src/package/soloserver/conf/azkaban.properties
@@ -20,9 +20,6 @@ database.type=h2
 h2.path=data/azkaban
 h2.create.tables=true
 
-# Stats
-azkaban.stats.dir=stats
-
 # Velocity dev mode
 velocity.dev.mode=false
 
diff --git a/src/package/webserver/conf/azkaban.properties b/src/package/webserver/conf/azkaban.properties
index 97e40ae..3ccb2f3 100644
--- a/src/package/webserver/conf/azkaban.properties
+++ b/src/package/webserver/conf/azkaban.properties
@@ -22,8 +22,6 @@ mysql.user=azkaban
 mysql.password=azkaban
 mysql.numconnections=100
 
-azkaban.stats.dir=stats
-
 # Velocity dev mode
 velocity.dev.mode=false
 
diff --git a/src/web/js/azkaban/view/flow-execution-list.js b/src/web/js/azkaban/view/flow-execution-list.js
index 69621a3..2a0ce22 100644
--- a/src/web/js/azkaban/view/flow-execution-list.js
+++ b/src/web/js/azkaban/view/flow-execution-list.js
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2014 LinkedIn Corp.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 var executionListView;
 azkaban.ExecutionListView = Backbone.View.extend({
 	events: {
@@ -241,6 +257,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		var self = this;
 		var tr = document.createElement("tr");
 		var tdName = document.createElement("td");
+		var tdType = document.createElement("td");
 		var tdTimeline = document.createElement("td");
 		var tdStart = document.createElement("td");
 		var tdEnd = document.createElement("td");
@@ -252,6 +269,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		var padding = 15*$(body)[0].level;
 		
 		$(tr).append(tdName);
+		$(tr).append(tdType);
 		$(tr).append(tdTimeline);
 		$(tr).append(tdStart);
 		$(tr).append(tdEnd);
@@ -261,6 +279,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(tr).addClass("jobListRow");
 		
 		$(tdName).addClass("jobname");
+		$(tdType).addClass("jobtype");
 		if (padding) {
 			$(tdName).css("padding-left", padding);
 		}
@@ -270,6 +289,8 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		$(tdElapse).addClass("elapsedTime");
 		$(tdStatus).addClass("statustd");
 		$(tdDetails).addClass("details");
+
+		$(tdType).text(node.type);
 		
 		var outerProgressBar = document.createElement("div");
 		//$(outerProgressBar).attr("id", node.id + "-outerprogressbar");
@@ -318,7 +339,7 @@ azkaban.ExecutionListView = Backbone.View.extend({
 		}
 
 		$(body).append(tr);
-		if (node.type=="flow") {
+		if (node.type == "flow") {
 			var subFlowRow = document.createElement("tr");
 			var subFlowCell = document.createElement("td");
 			$(subFlowCell).addClass("subflowrow");