azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 00f19a2..d342f0f 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -18,6 +18,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.webapp.servlet.AzkabanServletContextListener;
@@ -78,7 +79,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					String user = getParam(req, USER_PARAM, null);
 					
 					logger.info("User " + user + " has called action " + action + " on " + execid);
-					if (action.equals(LOG_ACTION)) { 
+					if (action.equals(METADATA_ACTION)) {
+						handleFetchMetaDataEvent(execid, req, resp, respMap);
+					}
+					else if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
 					else if (action.equals(EXECUTE_ACTION)) {
@@ -182,6 +186,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 	
+	/** Handles the "metadata" action: copies the requested window of a job's metadata file into respMap, or an "error" entry on failure. */
+	private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+		int startByte = getIntParam(req, "offset");
+		int length = getIntParam(req, "length");
+		resp.setContentType("text/plain");
+		resp.setCharacterEncoding("utf-8");
+		
+		int attempt = getIntParam(req, "attempt", 0);
+		String jobId = getParam(req, "jobId");
+		try {
+			JobMetaData result = flowRunnerManager.readJobMetaData(execId, jobId, attempt, startByte, length);
+			respMap.putAll(result.toObject());
+		} catch (Exception e) {
+			// Pass the throwable as the second argument so the stack trace is recorded, not only e.toString().
+			logger.error(e.getMessage(), e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+	
 	@SuppressWarnings("unchecked")
 	private void handleAjaxUpdateRequest(HttpServletRequest req, Map<String, Object> respMap) throws ServletException, IOException {
 		ArrayList<Object> updateTimesList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, UPDATE_TIME_LIST_PARAM));
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index da34dff..fb9caf6 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -805,6 +805,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 	
+	/** Returns the metadata file of the given job attempt, or null when the job is unknown or the file does not exist. */
+	public File getJobMetaDataFile(String jobId, int attempt) {
+		ExecutableNode node = flow.getExecutableNode(jobId);
+		if (node == null) {
+			// Unknown job id: report "no file" instead of failing with a NullPointerException below.
+			return null;
+		}
+		// The file is resolved in the same directory as the job's properties source under execDir.
+		File path = new File(execDir, node.getJobPropsSource());
+		File metaDataFile = new File(path.getParentFile(), JobRunner.createMetaDataFileName(execId, jobId, attempt));
+		
+		return metaDataFile.exists() ? metaDataFile : null;
+	}
+	
 	public boolean isRunnerThreadAlive() {
 		if (flowRunnerThread != null) {
 			return flowRunnerThread.isAlive();
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b9aa2ec..d707c1f 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -49,6 +49,7 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
 
 import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -572,6 +573,35 @@ public class FlowRunnerManager implements EventListener {
 		
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
+	
+	/** Reads a window of a running flow's job metadata file; throws ExecutorManagerException when the flow, directory or file is missing. */
+	public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		if (runner == null) {
+			throw new ExecutorManagerException("Running flow " + execId + " not found.");
+		}
+		
+		File dir = runner.getExecutionDir();
+		if (dir == null || !dir.exists()) {
+			throw new ExecutorManagerException("Error reading file. Execution directory doesn't exist.");
+		}
+		
+		try {
+			// NOTE(review): presumably the same lock the execution-dir cleanup takes, hence the re-check of dir inside it - confirm.
+			synchronized(executionDirDeletionSync) {
+				if (!dir.exists()) {
+					throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has been deleted");
+				}
+				File metaDataFile = runner.getJobMetaDataFile(jobId, attempt);
+				if (metaDataFile == null || !metaDataFile.exists()) {
+					throw new ExecutorManagerException("Job metadata file doesn't exist.");
+				}
+				return FileIOUtils.readUtf8MetaDataFile(metaDataFile, startByte, length);
+			}
+		} catch (IOException e) {
+			throw new ExecutorManagerException(e);
+		}
+	}
 
 	public long getLastCleanerThreadCheckTime() {
 		return lastCleanerThreadCheckTime;
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 143a861..9096b2f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -346,6 +346,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
 			}
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
+			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, node.getJobId(), node.getAttempt()));
 			node.setStatus(Status.RUNNING);
 
 			// Ability to specify working directory
@@ -461,4 +462,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	public static String createLogFileName(int executionId, String jobId, int attempt) {
 		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
 	}
+	
+	public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
+		return "_job." + executionId + "." + (attempt > 0 ? attempt + "." : "") + jobId + ".meta";
+	}
 }
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 8ee9250..15a8bf8 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
 	public static final String RESUME_ACTION = "resume";
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
+	public static final String METADATA_ACTION = "metadata";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
 	public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5ca87eb..07b81a8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
@@ -244,6 +245,25 @@ public class ExecutorManager {
 		}
 	}
 	
+	/** Asks the executor server for a window of a job's metadata; returns null when the flow is not in runningFlows. */
+	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> running = runningFlows.get(exFlow.getExecutionId());
+		if (running == null) {
+			return null;
+		}
+		
+		// Every request parameter is forwarded to the executor as a (name, string value) pair.
+		@SuppressWarnings("unchecked")
+		Map<String, Object> result = callExecutorServer(running.getFirst(), ConnectorParams.METADATA_ACTION,
+				new Pair<String,String>("type", "job"),
+				new Pair<String,String>("jobId", jobId),
+				new Pair<String,String>("offset", String.valueOf(offset)),
+				new Pair<String,String>("length", String.valueOf(length)),
+				new Pair<String,String>("attempt", String.valueOf(attempt)));
+		
+		return JobMetaData.createJobMetaDataFromObject(result);
+	}
+	
 	public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
 		synchronized(exFlow) {
 			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 5d71d30..35f308c 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -52,6 +52,11 @@ public class CommonJobProperties {
 	public static final String JOB_ATTEMPT = "azkaban.job.attempt";
 	
 	/**
+	 * The name of the metadata file of the executing job.
+	 */
+	public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+	
+	/**
 	 * The executing flow id
 	 */
 	public static final String FLOW_ID = "azkaban.flow.flowid";
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 3656ff6..7934a19 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -198,6 +198,34 @@ public class FileIOUtils {
 		return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
 	}
 	
+	/** Reads up to length bytes of file from fileOffset and decodes the sub-range chosen by getUtf8Range as UTF-8; empty result if nothing is readable. */
+	public static JobMetaData readUtf8MetaDataFile(File file, int fileOffset, int length) throws IOException {
+		byte[] buffer = new byte[length];
+		int read = 0;
+		
+		// Closing the raw stream in finally covers every exit, including the early return after a short skip.
+		FileInputStream fileStream = new FileInputStream(file);
+		try {
+			long skipped = fileStream.skip(fileOffset);
+			if (skipped < fileOffset) {
+				return new JobMetaData(fileOffset, 0, "");
+			}
+			read = new BufferedInputStream(fileStream).read(buffer);
+		}
+		finally {
+			IOUtils.closeQuietly(fileStream);
+		}
+		
+		if (read <= 0) {
+			return new JobMetaData(fileOffset, 0, "");
+		}
+		Pair<Integer, Integer> utf8Range = getUtf8Range(buffer, 0, read);
+		// Decode explicitly as UTF-8; the charset-less String constructor uses the platform default charset.
+		String outputString = new String(buffer, utf8Range.getFirst(), utf8Range.getSecond(), "UTF-8");
+		
+		return new JobMetaData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
+	}
+	
 	/**
 	 * Returns first and length.
 	 */
@@ -316,4 +344,49 @@ public class FileIOUtils {
 		}
 	}
 
+	/** Immutable holder for a piece of job metadata text together with the offset and length it was created with. */
+	public static class JobMetaData {
+		private final int offset;
+		private final int length;
+		private final String data;
+		
+		public JobMetaData(int offset, int length, String data) {
+			this.offset = offset;
+			this.length = length;
+			this.data = data;
+		}
+		
+		public int getOffset() {
+			return offset;
+		}
+		
+		public int getLength() {
+			return length;
+		}
+		
+		public String getData() {
+			return data;
+		}
+		
+		/** Returns a new map with the entries "offset", "length" and "data". */
+		public Map<String,Object> toObject() {
+			Map<String,Object> map = new HashMap<String,Object>();
+			map.put("offset", offset);
+			map.put("length", length);
+			map.put("data", data);
+			return map;
+		}
+		
+		/** Rebuilds a JobMetaData from a map shaped like toObject()'s result; any Number is accepted for the numeric entries. */
+		public static JobMetaData createJobMetaDataFromObject(Map<String,Object> map) {
+			int offset = ((Number)map.get("offset")).intValue();
+			int length = ((Number)map.get("length")).intValue();
+			return new JobMetaData(offset, length, (String)map.get("data"));
+		}
+		
+		@Override
+		public String toString() {
+			return "[offset=" + offset + ",length="+length + ",data=" + data + "]";
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 50d0f89..ea19307 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -42,6 +42,7 @@ import azkaban.scheduler.ScheduleManager;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
@@ -442,6 +443,51 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
+	/**
+	 * Fetches a window of a job's metadata for the given execution and stores it in the ajax result map.
+	 * On success ret receives "length", "offset" and "data"; if the executor manager returns nothing,
+	 * an empty window at the requested offset is stored instead. If the job is not part of the flow,
+	 * ret receives an "error" entry.
+	 * 
+	 * @param req request supplying "offset", "length", "jobId" and, optionally, "attempt"
+	 * @param resp response whose character encoding is set to utf-8
+	 * @param ret map that receives the result entries
+	 * @param user user whose READ permission on the flow's project is checked
+	 * @param exFlow execution that owns the job
+	 * @throws ServletException if the executor manager fails with an ExecutorManagerException
+	 */
+	private void ajaxFetchJobMetaData(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+		// Nothing is fetched when the permission lookup yields no project.
+		if (getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ) == null) {
+			return;
+		}
+		
+		int offset = this.getIntParam(req, "offset");
+		int length = this.getIntParam(req, "length");
+		String jobId = this.getParam(req, "jobId");
+		resp.setCharacterEncoding("utf-8");
+
+		try {
+			ExecutableNode node = exFlow.getExecutableNode(jobId);
+			if (node == null) {
+				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+				return;
+			}
+			
+			// The node's own attempt number is handed to getIntParam as the fallback value for "attempt".
+			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+			JobMetaData data = executorManager.getExecutionJobMetaData(exFlow, jobId, offset, length, attempt);
+			
+			// A null result is reported as an empty window at the requested offset.
+			boolean missing = (data == null);
+			ret.put("length", missing ? 0 : data.getLength());
+			ret.put("offset", missing ? offset : data.getOffset());
+			ret.put("data", missing ? "" : data.getData());
+		} catch (ExecutorManagerException e) {
+			throw new ServletException(e);
+		}
+	}
+	
 	private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
 		Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
 		if (project == null) {
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 4869710..bde19da 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -519,57 +519,33 @@ azkaban.FlowLogView = Backbone.View.extend({
 		var requestURL = contextURL + "/executor"; 
 		var model = this.model;
 		console.log("fetchLogs offset is " + offset)
-//		$.get(
-//			requestURL,
-//			{"execid": execId, "ajax":"fetchExecFlowLogs", "offset": offset, "length": 50000},
-//			function(data) {
-//	          console.log("fetchLogs");
-//	          if (data.error) {
-//	          	console.log(data.error);
-//	          }
-//	          else {
-//	          	var log = $("#logSection").text();
-//	          	if (!log) {
-//	          		log = data.data;
-//	          	}
-//	          	else {
-//	          		log += data.data;
-//	          	}
-//	          	
-//	          	var newOffset = data.offset + data.length;
-//	          	
-//	          	$("#logSection").text(log);
-//	          	model.set({"offset": newOffset, "log": log});
-//	          	$(".logViewer").scrollTop(9999);
-//	          }
-//	      }
-//	    );
+
 		$.ajax({async:false, 
 			url:requestURL,
 			data:{"execid": execId, "ajax":"fetchExecFlowLogs", "offset": offset, "length": 50000},
 			success:
 				function(data) {
-				console.log("fetchLogs");
-				if (data.error) {
-					console.log(data.error);
-				}
-				else {
-					var log = $("#logSection").text();
-					if (!log) {
-						log = data.data;
+					console.log("fetchLogs");
+					if (data.error) {
+						console.log(data.error);
 					}
 					else {
-						log += data.data;
+						var log = $("#logSection").text();
+						if (!log) {
+							log = data.data;
+						}
+						else {
+							log += data.data;
+						}
+	
+						var newOffset = data.offset + data.length;
+	
+						$("#logSection").text(log);
+						model.set({"offset": newOffset, "log": log});
+						$(".logViewer").scrollTop(9999);
 					}
-
-					var newOffset = data.offset + data.length;
-
-					$("#logSection").text(log);
-					model.set({"offset": newOffset, "log": log});
-					$(".logViewer").scrollTop(9999);
 				}
-			}
-		})
+		});
 	}
 });
 
@@ -714,14 +690,16 @@ var attemptRightClick = function(event) {
 
 $(function() {
 	var selected;
-
+	
 	graphModel = new azkaban.GraphModel();
 	logModel = new azkaban.LogModel();
+	
 	flowTabView = new azkaban.FlowTabView({el:$( '#headertabs'), model: graphModel});
 	mainSvgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick:  { "node": exNodeClickCallback, "edge": exEdgeClickCallback, "graph": exGraphClickCallback }});
 	jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, contextMenuCallback: exJobClickCallback});
-	statusView = new azkaban.StatusView({el:$('#flow-status'), model: graphModel});
 	flowLogView = new azkaban.FlowLogView({el:$('#flowLogView'), model: logModel});
+	statusView = new azkaban.StatusView({el:$('#flow-status'), model: graphModel});
+	
 	executionListView = new azkaban.ExecutionListView({el: $('#jobListView'), model:graphModel});
 	
 	var requestURL = contextURL + "/executor";