azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 14(+14 -0)
src/java/azkaban/utils/FileIOUtils.java 73(+73 -0)
Details
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 00f19a2..d342f0f 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -18,6 +18,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.webapp.servlet.AzkabanServletContextListener;
@@ -78,7 +79,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
String user = getParam(req, USER_PARAM, null);
logger.info("User " + user + " has called action " + action + " on " + execid);
- if (action.equals(LOG_ACTION)) {
+ if (action.equals(METADATA_ACTION)) {
+ handleFetchMetaDataEvent(execid, req, resp, respMap);
+ }
+ else if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
else if (action.equals(EXECUTE_ACTION)) {
@@ -182,6 +186,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
+ private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+ int startByte = getIntParam(req, "offset");
+ int length = getIntParam(req, "length");
+
+ resp.setContentType("text/plain");
+ resp.setCharacterEncoding("utf-8");
+
+ int attempt = getIntParam(req, "attempt", 0);
+ String jobId = getParam(req, "jobId");
+ try {
+ JobMetaData result = flowRunnerManager.readJobMetaData(execId, jobId, attempt, startByte, length);
+ respMap.putAll(result.toObject());
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+
+ }
+
@SuppressWarnings("unchecked")
private void handleAjaxUpdateRequest(HttpServletRequest req, Map<String, Object> respMap) throws ServletException, IOException {
ArrayList<Object> updateTimesList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, UPDATE_TIME_LIST_PARAM));
src/java/azkaban/execapp/FlowRunner.java 14(+14 -0)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index b58ae29..ff2e5f0 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -717,6 +717,20 @@ public class FlowRunner extends EventHandler implements Runnable {
return logFile;
}
+ public File getJobMetaDataFile(String jobId, int attempt) {
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ File path = new File(execDir, node.getJobPropsSource());
+
+ String metaDataFileName = JobRunner.createMetaDataFileName(execId, jobId, attempt);
+ File metaDataFile = new File(path.getParentFile(), metaDataFileName);
+
+ if (!metaDataFile.exists()) {
+ return null;
+ }
+
+ return metaDataFile;
+ }
+
public boolean isRunnerThreadAlive() {
if (flowRunnerThread != null) {
return flowRunnerThread.isAlive();
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 79192c0..c64b642 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -49,6 +49,7 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -566,6 +567,35 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
+
+ public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+ if (runner == null) {
+ throw new ExecutorManagerException("Running flow " + execId + " not found.");
+ }
+
+ File dir = runner.getExecutionDir();
+ if (dir != null && dir.exists()) {
+ try {
+ synchronized(executionDirDeletionSync) {
+ if (!dir.exists()) {
+ throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
+ }
+ File metaDataFile = runner.getJobMetaDataFile(jobId, attempt);
+ if (metaDataFile != null && metaDataFile.exists()) {
+ return FileIOUtils.readUtf8MetaDataFile(metaDataFile, startByte, length);
+ }
+ else {
+ throw new ExecutorManagerException("Job log file doesn't exist.");
+ }
+ }
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
+ }
public long getLastCleanerThreadCheckTime() {
return lastCleanerThreadCheckTime;
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9ceb7c0..3d18fe2 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -289,6 +289,7 @@ public class JobRunner extends EventHandler implements Runnable {
logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
}
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
+ props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, node.getJobId(), node.getAttempt()));
node.setStatus(Status.RUNNING);
// Ability to specify working directory
@@ -382,4 +383,8 @@ public class JobRunner extends EventHandler implements Runnable {
/**
 * Builds the log file name for a job attempt. Retries (attempt > 0) embed
 * the attempt number so earlier attempts' logs are not overwritten.
 *
 * @param executionId id of the flow execution
 * @param jobId       id of the job
 * @param attempt     attempt number (0 for the first run)
 * @return the log file name, e.g. "_job.12.myjob.log" or "_job.12.2.myjob.log"
 */
public static String createLogFileName(int executionId, String jobId, int attempt) {
	StringBuilder name = new StringBuilder("_job.").append(executionId).append('.');
	if (attempt > 0) {
		name.append(attempt).append('.');
	}
	return name.append(jobId).append(".log").toString();
}
+
+ public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
+ return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
+ }
}
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 8ee9250..15a8bf8 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
public static final String RESUME_ACTION = "resume";
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
+ public static final String METADATA_ACTION = "metadata";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 83aa3fe..b3ab60d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
@@ -244,6 +245,25 @@ public class ExecutorManager {
}
}
+ public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ if (pair != null) {
+
+ Pair<String,String> typeParam = new Pair<String,String>("type", "job");
+ Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
+ Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+ Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ return JobMetaData.createJobMetaDataFromObject(result);
+ }
+ else {
+ return null;
+ }
+ }
+
public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
synchronized(exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 5d71d30..35f308c 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -52,6 +52,11 @@ public class CommonJobProperties {
public static final String JOB_ATTEMPT = "azkaban.job.attempt";
/**
+ * The name of the file the executing job should write its metadata to.
+ */
+ public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+ /**
* The executing flow id
*/
public static final String FLOW_ID = "azkaban.flow.flowid";
src/java/azkaban/utils/FileIOUtils.java 73(+73 -0)
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index d09b3a0..2b1f319 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -196,6 +196,34 @@ public class FileIOUtils {
return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
}
+ public static JobMetaData readUtf8MetaDataFile(File file, int fileOffset, int length) throws IOException {
+ byte[] buffer = new byte[length];
+ FileInputStream fileStream = new FileInputStream(file);
+
+ long skipped = fileStream.skip(fileOffset);
+ if (skipped < fileOffset) {
+ return new JobMetaData(fileOffset, 0, "");
+ }
+
+ BufferedInputStream inputStream = null;
+ int read = 0;
+ try {
+ inputStream = new BufferedInputStream(fileStream);
+ read = inputStream.read(buffer);
+ }
+ finally {
+ IOUtils.closeQuietly(inputStream);
+ }
+
+ if (read <= 0) {
+ return new JobMetaData(fileOffset, 0, "");
+ }
+ Pair<Integer, Integer> utf8Range = getUtf8Range(buffer, 0, read);
+ String outputString = new String(buffer, utf8Range.getFirst(), utf8Range.getSecond());
+
+ return new JobMetaData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
+ }
+
/**
* Returns first and length.
*/
@@ -314,4 +342,49 @@ public class FileIOUtils {
}
}
+ public static class JobMetaData {
+ private int offset;
+ private int length;
+ private String data;
+
+ public JobMetaData(int offset, int length, String data) {
+ this.offset = offset;
+ this.length = length;
+ this.data = data;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public Map<String,Object> toObject() {
+ HashMap<String,Object> map = new HashMap<String,Object>();
+ map.put("offset", offset);
+ map.put("length", length);
+ map.put("data", data);
+
+ return map;
+ }
+
+ public static JobMetaData createJobMetaDataFromObject(Map<String,Object> map) {
+ int offset = (Integer)map.get("offset");
+ int length = (Integer)map.get("length");
+ String data = (String)map.get("data");
+
+ return new JobMetaData(offset,length, data);
+ }
+
+ @Override
+ public String toString() {
+ return "[offset=" + offset + ",length="+length + ",data=" + data + "]";
+ }
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 2536fbd..89f8470 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -42,6 +42,7 @@ import azkaban.scheduler.ScheduleManager;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
@@ -440,6 +441,51 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+ /**
+ * Gets the job metadata through ajax plain text stream to reduce memory overhead.
+ *
+ * @param req
+ * @param resp
+ * @param user
+ * @param exFlow
+ * @throws ServletException
+ */
+ private void ajaxFetchJobMetaData(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ if (project == null) {
+ return;
+ }
+
+ int offset = this.getIntParam(req, "offset");
+ int length = this.getIntParam(req, "length");
+
+ String jobId = this.getParam(req, "jobId");
+ resp.setCharacterEncoding("utf-8");
+
+ try {
+ ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+ return;
+ }
+
+ int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+ JobMetaData data = executorManager.getExecutionJobMetaData(exFlow, jobId, offset, length, attempt);
+ if (data == null) {
+ ret.put("length", 0);
+ ret.put("offset", offset);
+ ret.put("data", "");
+ }
+ else {
+ ret.put("length", data.getLength());
+ ret.put("offset", data.getOffset());
+ ret.put("data", data.getData());
+ }
+ } catch (ExecutorManagerException e) {
+ throw new ServletException(e);
+ }
+ }
+
private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
if (project == null) {