azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 00f19a2..d342f0f 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -18,6 +18,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.webapp.servlet.AzkabanServletContextListener;
@@ -78,7 +79,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					String user = getParam(req, USER_PARAM, null);
 					
 					logger.info("User " + user + " has called action " + action + " on " + execid);
-					if (action.equals(LOG_ACTION)) { 
+					if (action.equals(METADATA_ACTION)) {
+						handleFetchMetaDataEvent(execid, req, resp, respMap);
+					}
+					else if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
 					else if (action.equals(EXECUTE_ACTION)) {
@@ -182,6 +186,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 	
+	private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+		int startByte = getIntParam(req, "offset");
+		int length = getIntParam(req, "length");
+		
+		resp.setContentType("text/plain");
+		resp.setCharacterEncoding("utf-8");
+		
+		int attempt = getIntParam(req, "attempt", 0);
+		String jobId = getParam(req, "jobId");
+		try {
+			JobMetaData result = flowRunnerManager.readJobMetaData(execId, jobId, attempt, startByte, length);
+			respMap.putAll(result.toObject());
+		} catch (Exception e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+
+	}
+	
 	@SuppressWarnings("unchecked")
 	private void handleAjaxUpdateRequest(HttpServletRequest req, Map<String, Object> respMap) throws ServletException, IOException {
 		ArrayList<Object> updateTimesList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, UPDATE_TIME_LIST_PARAM));
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index b58ae29..ff2e5f0 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -717,6 +717,20 @@ public class FlowRunner extends EventHandler implements Runnable {
 		return logFile;
 	}
 	
+	public File getJobMetaDataFile(String jobId, int attempt) {
+		ExecutableNode node = flow.getExecutableNode(jobId);
+		File path = new File(execDir, node.getJobPropsSource());
+		
+		String metaDataFileName = JobRunner.createMetaDataFileName(execId, jobId, attempt);
+		File metaDataFile = new File(path.getParentFile(), metaDataFileName);
+		
+		if (!metaDataFile.exists()) {
+			return null;
+		}
+		
+		return metaDataFile;
+	}
+	
 	public boolean isRunnerThreadAlive() {
 		if (flowRunnerThread != null) {
 			return flowRunnerThread.isAlive();
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 79192c0..c64b642 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -49,6 +49,7 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
 
 import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -566,6 +567,35 @@ public class FlowRunnerManager implements EventListener {
 		
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
+	
+	public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		if (runner == null) {
+			throw new ExecutorManagerException("Running flow " + execId + " not found.");
+		}
+		
+		File dir = runner.getExecutionDir();
+		if (dir != null && dir.exists()) {
+			try {
+				synchronized(executionDirDeletionSync) {
+					if (!dir.exists()) {
+						throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has been deleted");
+					}
+					File metaDataFile = runner.getJobMetaDataFile(jobId, attempt);
+					if (metaDataFile != null && metaDataFile.exists()) {
+						return FileIOUtils.readUtf8MetaDataFile(metaDataFile, startByte, length);
+					}
+					else {
+						throw new ExecutorManagerException("Job metadata file doesn't exist.");
+					}
+				}
+			} catch (IOException e) {
+				throw new ExecutorManagerException(e);
+			}
+		}
+		
+		throw new ExecutorManagerException("Error reading metadata file. Execution dir doesn't exist.");
+	}
 
 	public long getLastCleanerThreadCheckTime() {
 		return lastCleanerThreadCheckTime;
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9ceb7c0..3d18fe2 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -289,6 +289,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
 			}
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
+			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, node.getJobId(), node.getAttempt()));
 			node.setStatus(Status.RUNNING);
 
 			// Ability to specify working directory
@@ -382,4 +383,8 @@ public class JobRunner extends EventHandler implements Runnable {
 	public static String createLogFileName(int executionId, String jobId, int attempt) {
 		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
 	}
+	
+	public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
+		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
+	}
 }
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 8ee9250..15a8bf8 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
 	public static final String RESUME_ACTION = "resume";
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
+	public static final String METADATA_ACTION = "metadata";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
 	public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 83aa3fe..b3ab60d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
@@ -244,6 +245,25 @@ public class ExecutorManager {
 		}
 	}
 	
+	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+		if (pair != null) {
+
+			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
+			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
+			Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+			Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+			
+			@SuppressWarnings("unchecked")
+			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+			return JobMetaData.createJobMetaDataFromObject(result);
+		}
+		else {
+			return null;
+		}
+	}
+	
 	public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
 		synchronized(exFlow) {
 			Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 5d71d30..35f308c 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -52,6 +52,11 @@ public class CommonJobProperties {
 	public static final String JOB_ATTEMPT = "azkaban.job.attempt";
 	
 	/**
+	 * The file name of the file that holds the executing job's metadata.
+	 */
+	public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+	
+	/**
 	 * The executing flow id
 	 */
 	public static final String FLOW_ID = "azkaban.flow.flowid";
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index d09b3a0..2b1f319 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -196,6 +196,34 @@ public class FileIOUtils {
 		return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
 	}
 	
+	public static JobMetaData readUtf8MetaDataFile(File file, int fileOffset, int length) throws IOException {
+		byte[] buffer = new byte[length];
+		FileInputStream fileStream = new FileInputStream(file);
+		
+		long skipped = fileStream.skip(fileOffset);
+		if (skipped < fileOffset) {
+			IOUtils.closeQuietly(fileStream); return new JobMetaData(fileOffset, 0, "");
+		}
+		
+		BufferedInputStream inputStream = null;
+		int read = 0;
+		try {
+			inputStream = new BufferedInputStream(fileStream);
+			read = inputStream.read(buffer);
+		}
+		finally {
+			IOUtils.closeQuietly(inputStream);
+		}
+		
+		if (read <= 0) {
+			return new JobMetaData(fileOffset, 0, "");
+		}
+		Pair<Integer, Integer> utf8Range = getUtf8Range(buffer, 0, read);
+		String outputString = new String(buffer, utf8Range.getFirst(), utf8Range.getSecond(), "UTF-8");
+		
+		return new JobMetaData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
+	}
+	
 	/**
 	 * Returns first and length.
 	 */
@@ -314,4 +342,49 @@ public class FileIOUtils {
 		}
 	}
 
+	public static class JobMetaData {
+		private int offset;
+		private int length;
+		private String data;
+		
+		public JobMetaData(int offset, int length, String data) {
+			this.offset = offset;
+			this.length = length;
+			this.data = data;
+		}
+		
+		public int getOffset() {
+			return offset;
+		}
+		
+		public int getLength() {
+			return length;
+		}
+		
+		public String getData() {
+			return data;
+		}
+		
+		public Map<String,Object> toObject() {
+			HashMap<String,Object> map = new HashMap<String,Object>();
+			map.put("offset", offset);
+			map.put("length", length);
+			map.put("data", data);
+			
+			return map;
+		}
+		
+		public static JobMetaData createJobMetaDataFromObject(Map<String,Object> map) {
+			int offset = (Integer)map.get("offset");
+			int length = (Integer)map.get("length");
+			String data = (String)map.get("data");
+			
+			return new JobMetaData(offset,length, data);
+		}
+		
+		@Override
+		public String toString() {
+			return "[offset=" + offset + ",length="+length + ",data=" + data + "]";
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 2536fbd..89f8470 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -42,6 +42,7 @@ import azkaban.scheduler.ScheduleManager;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
+import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
@@ -440,6 +441,51 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
+	/**
+	 * Gets the job metadata through ajax plain text stream to reduce memory overhead.
+	 * 
+	 * @param req
+	 * @param resp
+	 * @param user
+	 * @param exFlow
+	 * @throws ServletException
+	 */
+	private void ajaxFetchJobMetaData(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+		Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+		if (project == null) {
+			return;
+		}
+		
+		int offset = this.getIntParam(req, "offset");
+		int length = this.getIntParam(req, "length");
+		
+		String jobId = this.getParam(req, "jobId");
+		resp.setCharacterEncoding("utf-8");
+
+		try {
+			ExecutableNode node = exFlow.getExecutableNode(jobId);
+			if (node == null) {
+				ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+				return;
+			}
+			
+			int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+			JobMetaData data = executorManager.getExecutionJobMetaData(exFlow, jobId, offset, length, attempt);
+			if (data == null) {
+				ret.put("length", 0);
+				ret.put("offset", offset);
+				ret.put("data", "");
+			}
+			else {
+				ret.put("length", data.getLength());
+				ret.put("offset", data.getOffset());
+				ret.put("data", data.getData());
+			}
+		} catch (ExecutorManagerException e) {
+			throw new ServletException(e);
+		}
+	}
+	
 	private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
 		Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
 		if (project == null) {