azkaban-memoizeit
Changes
src/java/azkaban/execapp/FlowRunner.java 14(+14 -0)
src/java/azkaban/utils/FileIOUtils.java 73(+73 -0)
src/web/js/azkaban.exflow.view.js 66(+22 -44)
Details
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 00f19a2..d342f0f 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -18,6 +18,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import azkaban.executor.ConnectorParams;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerException;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.webapp.servlet.AzkabanServletContextListener;
@@ -78,7 +79,10 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
String user = getParam(req, USER_PARAM, null);
logger.info("User " + user + " has called action " + action + " on " + execid);
- if (action.equals(LOG_ACTION)) {
+ if (action.equals(METADATA_ACTION)) {
+ handleFetchMetaDataEvent(execid, req, resp, respMap);
+ }
+ else if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
else if (action.equals(EXECUTE_ACTION)) {
@@ -182,6 +186,25 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
+ private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+ int startByte = getIntParam(req, "offset");
+ int length = getIntParam(req, "length");
+
+ resp.setContentType("text/plain");
+ resp.setCharacterEncoding("utf-8");
+
+ int attempt = getIntParam(req, "attempt", 0);
+ String jobId = getParam(req, "jobId");
+ try {
+ JobMetaData result = flowRunnerManager.readJobMetaData(execId, jobId, attempt, startByte, length);
+ respMap.putAll(result.toObject());
+ } catch (Exception e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+
+ }
+
@SuppressWarnings("unchecked")
private void handleAjaxUpdateRequest(HttpServletRequest req, Map<String, Object> respMap) throws ServletException, IOException {
ArrayList<Object> updateTimesList = (ArrayList<Object>)JSONUtils.parseJSONFromString(getParam(req, UPDATE_TIME_LIST_PARAM));
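
For reference, a minimal sketch of the request the new metadata branch handles, assuming the executor servlet is reachable at /executor on the executor host. The host, port, servlet path, and the string values behind the action/execid/user parameters are assumptions here; jobId, attempt, offset, and length are the parameters read by the handler above.

    // Sketch only: fetch one slice of job metadata from the executor servlet.
    // Host, port and servlet path are assumed, not taken from this change.
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.net.URLEncoder;

    public class FetchJobMetaDataSketch {
        public static void main(String[] args) throws Exception {
            String query = "action=metadata"
                    + "&execid=45"
                    + "&user=azkaban"
                    + "&jobId=" + URLEncoder.encode("myJob", "UTF-8")
                    + "&attempt=0"
                    + "&offset=0"
                    + "&length=50000";
            URL url = new URL("http://executor-host:12321/executor?" + query);
            BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);   // JSON map with offset, length, data (or an error entry)
            }
            in.close();
        }
    }
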
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index da34dff..fb9caf6 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -805,6 +805,20 @@ public class FlowRunner extends EventHandler implements Runnable {
return logFile;
}
+ public File getJobMetaDataFile(String jobId, int attempt) {
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ File path = new File(execDir, node.getJobPropsSource());
+
+ String metaDataFileName = JobRunner.createMetaDataFileName(execId, jobId, attempt);
+ File metaDataFile = new File(path.getParentFile(), metaDataFileName);
+
+ if (!metaDataFile.exists()) {
+ return null;
+ }
+
+ return metaDataFile;
+ }
+
public boolean isRunnerThreadAlive() {
if (flowRunnerThread != null) {
return flowRunnerThread.isAlive();
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index b9aa2ec..d707c1f 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -49,6 +49,7 @@ import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -572,6 +573,35 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
+
+ public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+ if (runner == null) {
+ throw new ExecutorManagerException("Running flow " + execId + " not found.");
+ }
+
+ File dir = runner.getExecutionDir();
+ if (dir != null && dir.exists()) {
+ try {
+ synchronized(executionDirDeletionSync) {
+ if (!dir.exists()) {
+ throw new ExecutorManagerException("Execution dir file doesn't exist. Probably has beend deleted");
+ }
+ File metaDataFile = runner.getJobMetaDataFile(jobId, attempt);
+ if (metaDataFile != null && metaDataFile.exists()) {
+ return FileIOUtils.readUtf8MetaDataFile(metaDataFile, startByte, length);
+ }
+ else {
+ throw new ExecutorManagerException("Job log file doesn't exist.");
+ }
+ }
+ } catch (IOException e) {
+ throw new ExecutorManagerException(e);
+ }
+ }
+
+ throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
+ }
public long getLastCleanerThreadCheckTime() {
return lastCleanerThreadCheckTime;
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 143a861..9096b2f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -346,6 +346,7 @@ public class JobRunner extends EventHandler implements Runnable {
logInfo("Starting job " + node.getJobId() + " at " + node.getStartTime());
}
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
+ props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, node.getJobId(), node.getAttempt()));
node.setStatus(Status.RUNNING);
// Ability to specify working directory
@@ -461,4 +462,8 @@ public class JobRunner extends EventHandler implements Runnable {
public static String createLogFileName(int executionId, String jobId, int attempt) {
return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".log" : "_job." + executionId + "." + jobId + ".log";
}
+
+ public static String createMetaDataFileName(int executionId, String jobId, int attempt) {
+ return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".meta" : "_job." + executionId + "." + jobId + ".meta";
+ }
}
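
The naming mirrors createLogFileName, with .meta instead of .log. For illustration, with made-up ids, both name forms look like this:

    // Illustration only, made-up ids; shows the first-attempt and retry name forms.
    public class MetaDataFileNameDemo {
        public static void main(String[] args) {
            System.out.println(azkaban.execapp.JobRunner.createMetaDataFileName(45, "myJob", 0)); // _job.45.myJob.meta
            System.out.println(azkaban.execapp.JobRunner.createMetaDataFileName(45, "myJob", 2)); // _job.45.2.myJob.meta
        }
    }
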
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 8ee9250..15a8bf8 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
public static final String RESUME_ACTION = "resume";
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
+ public static final String METADATA_ACTION = "metadata";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
public static final String MODIFY_EXECUTION_ACTION_TYPE = "modifyType";
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 5ca87eb..07b81a8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import azkaban.project.Project;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
@@ -244,6 +245,25 @@ public class ExecutorManager {
}
}
+ public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ if (pair != null) {
+
+ Pair<String,String> typeParam = new Pair<String,String>("type", "job");
+ Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
+ Pair<String,String> offsetParam = new Pair<String,String>("offset", String.valueOf(offset));
+ Pair<String,String> lengthParam = new Pair<String,String>("length", String.valueOf(length));
+ Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.METADATA_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ return JobMetaData.createJobMetaDataFromObject(result);
+ }
+ else {
+ return null;
+ }
+ }
+
public void cancelFlow(ExecutableFlow exFlow, String userId) throws ExecutorManagerException {
synchronized(exFlow) {
Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
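
A minimal sketch of how a caller could page through a running job's metadata with getExecutionJobMetaData, reading fixed-size chunks until nothing new comes back. The chunk size and the helper method itself are assumptions, not part of this change.

    // Sketch only; assumes imports from azkaban.executor and azkaban.utils.FileIOUtils.JobMetaData.
    public static String readAllJobMetaData(ExecutorManager executorManager, ExecutableFlow exFlow,
            String jobId, int attempt) throws ExecutorManagerException {
        StringBuilder all = new StringBuilder();
        int offset = 0;
        final int chunk = 50000;   // arbitrary page size
        while (true) {
            JobMetaData part = executorManager.getExecutionJobMetaData(exFlow, jobId, offset, chunk, attempt);
            if (part == null || part.getLength() == 0) {
                break;             // flow no longer running, or nothing new yet
            }
            all.append(part.getData());
            offset = part.getOffset() + part.getLength();
        }
        return all.toString();
    }
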
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 5d71d30..35f308c 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -52,6 +52,11 @@ public class CommonJobProperties {
public static final String JOB_ATTEMPT = "azkaban.job.attempt";
/**
+ * The metadata file for the executing job.
+ */
+ public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+ /**
* The executing flow id
*/
public static final String FLOW_ID = "azkaban.flow.flowid";
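
The diff does not show how job types consume azkaban.job.metadata.file. Purely as an illustration of the intended direction (hypothetical job-side code, assumed key=value line format), a job could resolve the property and append lines to that file, which the executor then serves back in pages:

    // Hypothetical job-side sketch; not part of this change.
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Properties;

    public class MetaDataWriterSketch {
        public static void writeMetaData(Properties jobProps, String key, String value) throws IOException {
            String fileName = jobProps.getProperty("azkaban.job.metadata.file");
            if (fileName == null) {
                return;   // property not set, nothing to record
            }
            FileWriter writer = new FileWriter(fileName, true);   // append
            try {
                writer.write(key + "=" + value + "\n");
            } finally {
                writer.close();
            }
        }
    }
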
diff --git a/src/java/azkaban/utils/FileIOUtils.java b/src/java/azkaban/utils/FileIOUtils.java
index 3656ff6..7934a19 100644
--- a/src/java/azkaban/utils/FileIOUtils.java
+++ b/src/java/azkaban/utils/FileIOUtils.java
@@ -198,6 +198,34 @@ public class FileIOUtils {
return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
}
+ public static JobMetaData readUtf8MetaDataFile(File file, int fileOffset, int length) throws IOException {
+ byte[] buffer = new byte[length];
+ FileInputStream fileStream = new FileInputStream(file);
+
+ long skipped = fileStream.skip(fileOffset);
+ if (skipped < fileOffset) {
+ // close the stream before the early return so the file handle is not leaked
+ IOUtils.closeQuietly(fileStream);
+ return new JobMetaData(fileOffset, 0, "");
+ }
+
+ BufferedInputStream inputStream = null;
+ int read = 0;
+ try {
+ inputStream = new BufferedInputStream(fileStream);
+ read = inputStream.read(buffer);
+ }
+ finally {
+ IOUtils.closeQuietly(inputStream);
+ }
+
+ if (read <= 0) {
+ return new JobMetaData(fileOffset, 0, "");
+ }
+ Pair<Integer, Integer> utf8Range = getUtf8Range(buffer, 0, read);
+ String outputString = new String(buffer, utf8Range.getFirst(), utf8Range.getSecond());
+
+ return new JobMetaData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
+ }
+
/**
* Returns first and length.
*/
@@ -316,4 +344,49 @@ public class FileIOUtils {
}
}
+ public static class JobMetaData {
+ private int offset;
+ private int length;
+ private String data;
+
+ public JobMetaData(int offset, int length, String data) {
+ this.offset = offset;
+ this.length = length;
+ this.data = data;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public Map<String,Object> toObject() {
+ HashMap<String,Object> map = new HashMap<String,Object>();
+ map.put("offset", offset);
+ map.put("length", length);
+ map.put("data", data);
+
+ return map;
+ }
+
+ public static JobMetaData createJobMetaDataFromObject(Map<String,Object> map) {
+ int offset = (Integer)map.get("offset");
+ int length = (Integer)map.get("length");
+ String data = (String)map.get("data");
+
+ return new JobMetaData(offset, length, data);
+ }
+
+ @Override
+ public String toString() {
+ return "[offset=" + offset + ",length="+length + ",data=" + data + "]";
+ }
+ }
}
\ No newline at end of file
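
A small sketch of the new read path and the map round trip used when a JobMetaData is sent between servers; the metadata file path here is made up.

    // Sketch only; the metadata file path is made up.
    import java.io.File;
    import java.util.Map;
    import azkaban.utils.FileIOUtils;
    import azkaban.utils.FileIOUtils.JobMetaData;

    public class JobMetaDataReadSketch {
        public static void main(String[] args) throws Exception {
            File metaFile = new File("/tmp/executions/45/_job.45.myJob.meta");
            JobMetaData page = FileIOUtils.readUtf8MetaDataFile(metaFile, 0, 50000);

            Map<String, Object> wire = page.toObject();                     // {offset, length, data}
            JobMetaData copy = JobMetaData.createJobMetaDataFromObject(wire);
            System.out.println(copy);                                        // [offset=...,length=...,data=...]
        }
    }
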
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 50d0f89..ea19307 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -42,6 +42,7 @@ import azkaban.scheduler.ScheduleManager;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
+import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.webapp.AzkabanWebServer;
import azkaban.webapp.session.Session;
@@ -442,6 +443,51 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
}
}
+ /**
+ * Gets the job metadata through an ajax plain-text stream to reduce memory overhead.
+ *
+ * @param req
+ * @param resp
+ * @param user
+ * @param exFlow
+ * @throws ServletException
+ */
+ private void ajaxFetchJobMetaData(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, ExecutableFlow exFlow) throws ServletException {
+ Project project = getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
+ if (project == null) {
+ return;
+ }
+
+ int offset = this.getIntParam(req, "offset");
+ int length = this.getIntParam(req, "length");
+
+ String jobId = this.getParam(req, "jobId");
+ resp.setCharacterEncoding("utf-8");
+
+ try {
+ ExecutableNode node = exFlow.getExecutableNode(jobId);
+ if (node == null) {
+ ret.put("error", "Job " + jobId + " doesn't exist in " + exFlow.getExecutionId());
+ return;
+ }
+
+ int attempt = this.getIntParam(req, "attempt", node.getAttempt());
+ JobMetaData data = executorManager.getExecutionJobMetaData(exFlow, jobId, offset, length, attempt);
+ if (data == null) {
+ ret.put("length", 0);
+ ret.put("offset", offset);
+ ret.put("data", "");
+ }
+ else {
+ ret.put("length", data.getLength());
+ ret.put("offset", data.getOffset());
+ ret.put("data", data.getData());
+ }
+ } catch (ExecutorManagerException e) {
+ throw new ServletException(e);
+ }
+ }
+
private void ajaxFetchFlowInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user, String projectName, String flowId) throws ServletException {
Project project = getProjectAjaxByPermission(ret, projectName, user, Type.READ);
if (project == null) {
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index 4869710..bde19da 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -519,57 +519,33 @@ azkaban.FlowLogView = Backbone.View.extend({
var requestURL = contextURL + "/executor";
var model = this.model;
console.log("fetchLogs offset is " + offset)
-// $.get(
-// requestURL,
-// {"execid": execId, "ajax":"fetchExecFlowLogs", "offset": offset, "length": 50000},
-// function(data) {
-// console.log("fetchLogs");
-// if (data.error) {
-// console.log(data.error);
-// }
-// else {
-// var log = $("#logSection").text();
-// if (!log) {
-// log = data.data;
-// }
-// else {
-// log += data.data;
-// }
-//
-// var newOffset = data.offset + data.length;
-//
-// $("#logSection").text(log);
-// model.set({"offset": newOffset, "log": log});
-// $(".logViewer").scrollTop(9999);
-// }
-// }
-// );
+
$.ajax({async:false,
url:requestURL,
data:{"execid": execId, "ajax":"fetchExecFlowLogs", "offset": offset, "length": 50000},
success:
function(data) {
- console.log("fetchLogs");
- if (data.error) {
- console.log(data.error);
- }
- else {
- var log = $("#logSection").text();
- if (!log) {
- log = data.data;
+ console.log("fetchLogs");
+ if (data.error) {
+ console.log(data.error);
}
else {
- log += data.data;
+ var log = $("#logSection").text();
+ if (!log) {
+ log = data.data;
+ }
+ else {
+ log += data.data;
+ }
+
+ var newOffset = data.offset + data.length;
+
+ $("#logSection").text(log);
+ model.set({"offset": newOffset, "log": log});
+ $(".logViewer").scrollTop(9999);
}
-
- var newOffset = data.offset + data.length;
-
- $("#logSection").text(log);
- model.set({"offset": newOffset, "log": log});
- $(".logViewer").scrollTop(9999);
}
- }
- })
+ });
}
});
@@ -714,14 +690,16 @@ var attemptRightClick = function(event) {
$(function() {
var selected;
-
+
graphModel = new azkaban.GraphModel();
logModel = new azkaban.LogModel();
+
flowTabView = new azkaban.FlowTabView({el:$( '#headertabs'), model: graphModel});
mainSvgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick: { "node": exNodeClickCallback, "edge": exEdgeClickCallback, "graph": exGraphClickCallback }});
jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, contextMenuCallback: exJobClickCallback});
- statusView = new azkaban.StatusView({el:$('#flow-status'), model: graphModel});
flowLogView = new azkaban.FlowLogView({el:$('#flowLogView'), model: logModel});
+ statusView = new azkaban.StatusView({el:$('#flow-status'), model: graphModel});
+
executionListView = new azkaban.ExecutionListView({el: $('#jobListView'), model:graphModel});
var requestURL = contextURL + "/executor";