Details
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 94fb4c2..277374e 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -47,7 +47,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
private AzkabanExecutorServer application;
private FlowRunnerManager flowRunnerManager;
-
public ExecutorServlet() {
super();
}
@@ -63,7 +62,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
flowRunnerManager = application.getFlowRunnerManager();
}
-
protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
resp.setContentType(JSON_MIME_TYPE);
ObjectMapper mapper = new ObjectMapper();
@@ -100,6 +98,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
+ else if (action.equals(STATS_ACTION)) {
+ handleFetchStatsEvent(execid, req, resp, respMap);
+ }
else if (action.equals(EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
}
@@ -170,7 +171,11 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
- private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+ private void handleFetchLogEvent(
+ int execId,
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ Map<String, Object> respMap) throws ServletException {
String type = getParam(req, "type");
int startByte = getIntParam(req, "offset");
int length = getIntParam(req, "length");
@@ -200,6 +205,23 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
}
+
+ private void handleFetchStatsEvent(
+ int execId,
+ HttpServletRequest req,
+ HttpServletResponse resp,
+ Map<String, Object> respMap) throws ServletException {
+
+ String jobId = getParam(req, "jobId");
+ try {
+ String result = flowRunnerManager.readJobAttachment(execId, jobId);
+ respMap.put("stats", result);
+ }
+ catch (Exception e) {
+ logger.error(e);
+ respMap.put("error", e.getMessage());
+ }
+ }
private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
int startByte = getIntParam(req, "offset");
@@ -217,7 +239,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
logger.error(e);
respMap.put("error", e.getMessage());
}
-
}
@SuppressWarnings("unchecked")
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 7021c01..2159f4f 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -839,6 +839,11 @@ public class FlowRunner extends EventHandler implements Runnable {
return logFile;
}
+
+ public String getJobAttachment(String jobId) {
+ ExecutableNode node = flow.getExecutableNode(jobId);
+ return node.getAttachment();
+ }
public File getJobMetaDataFile(String jobId, int attempt) {
ExecutableNode node = flow.getExecutableNode(jobId);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index ecc0847..9a1c44f 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -137,7 +137,6 @@ public class FlowRunnerManager implements EventListener {
cleanerThread.start();
jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
-
}
private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -574,6 +573,16 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
+
+ public String readJobAttachment(int execId, String jobId)
+ throws ExecutorManagerException {
+ FlowRunner runner = runningFlows.get(execId);
+ if (runner == null) {
+ throw new ExecutorManagerException("Running flow " + execId + " not found.");
+ }
+
+ return runner.getJobAttachment(jobId);
+ }
public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 9d807ff..187d969 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -61,6 +61,8 @@ public class ExecutableNode {
private Props inputProps;
private Props outputProps;
+
+ private String attachment;
public static final String ATTEMPT_PARAM = "attempt";
public static final String PASTATTEMPTS_PARAM = "pastAttempts";
@@ -204,6 +206,14 @@ public class ExecutableNode {
public Props getOutputProps() {
return outputProps;
}
+
+ public String getAttachment() {
+ return attachment;
+ }
+
+ public void setAttachment(String attachment) {
+ this.attachment = attachment;
+ }
public long getDelayedExecution() {
return delayExecution;
@@ -428,4 +438,4 @@ public class ExecutableNode {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 63264d8..7b86331 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -313,14 +313,12 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
return executorLoader.fetchStats(exFlow.getExecutionId(), jobId);
}
- Pair<String, String> typeParam = new Pair<String, String>("type", "job");
Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
-
+
@SuppressWarnings("unchecked")
Map<String, Object> result = callExecutorServer(
pair.getFirst(),
ConnectorParams.STATS_ACTION,
- typeParam,
jobIdParam);
return (String) result.get("stats");
}