Details
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 0c0c7e0..40dd21a 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,7 @@ public interface ConnectorParams {
public static final String RESUME_ACTION = "resume";
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
+ public static final String STATS_ACTION = "stats";
public static final String METADATA_ACTION = "metadata";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 04e0e44..bb39046 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,6 +47,8 @@ public interface ExecutorLoader {
public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
+ public String fetchStats(int execId, String name) throws ExecutorManagerException;
+
public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index cb763f5..63264d8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -274,8 +274,11 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
}
@Override
- public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
- Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+ public LogData getExecutionJobLog(
+ ExecutableFlow exFlow, String jobId, int offset, int length, int attempt)
+ throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
if (pair != null) {
Pair<String,String> typeParam = new Pair<String,String>("type", "job");
Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
@@ -284,14 +287,43 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
@SuppressWarnings("unchecked")
- Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+ Map<String, Object> result = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.LOG_ACTION,
+ typeParam,
+ jobIdParam,
+ offsetParam,
+ lengthParam,
+ attemptParam);
return LogData.createLogDataFromObject(result);
}
else {
- LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+ LogData value = executorLoader.fetchLogs(
+ exFlow.getExecutionId(), jobId, attempt, offset, length);
return value;
}
}
+
+ @Override
+ public String getExecutionJobStats(ExecutableFlow exFlow, String jobId)
+ throws ExecutorManagerException {
+ Pair<ExecutionReference, ExecutableFlow> pair =
+ runningFlows.get(exFlow.getExecutionId());
+ if (pair == null) {
+ return executorLoader.fetchStats(exFlow.getExecutionId(), jobId);
+ }
+
+ Pair<String, String> typeParam = new Pair<String, String>("type", "job");
+ Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> result = callExecutorServer(
+ pair.getFirst(),
+ ConnectorParams.STATS_ACTION,
+ typeParam,
+ jobIdParam);
+ return (String) result.get("stats");
+ }
@Override
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index cc81589..e40c97b 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -90,6 +90,8 @@ public interface ExecutorManagerAdapter{
public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+ public String getExecutionJobStats(ExecutableFlow exflow, String jobId) throws ExecutorManagerException;
public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 70b73da..3fc12db 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -707,6 +707,24 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
"Error fetching logs " + execId + " : " + name, e);
}
}
+
+ @Override
+ public String fetchStats(int execId, String jobId)
+ throws ExecutorManagerException {
+ QueryRunner runner = createQueryRunner();
+
+ try {
+ String stats = runner.query(
+ FetchExecutableJobStatsHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
+ new FetchExecutableJobStatsHandler(),
+ execId,
+ jobId);
+ return stats;
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException("Error query job stats " + jobId, e);
+ }
+ }
@Override
public void uploadLogFile(
@@ -977,6 +995,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
return execNodes;
}
}
+
+ private static class FetchExecutableJobStatsHandler
+ implements ResultSetHandler<String> {
+ private static String FETCH_ATTACHMENT_EXECUTABLE_NODE =
+ "SELECT attachment FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public String handle(ResultSet rs) throws SQLException {
+ String statsJson = "";
+ if (rs.next()) {
+ try {
+ byte[] stats = rs.getBytes(1);
+ statsJson = GZIPUtils.unGzipString(stats, "UTF-8");
+ }
+ catch (IOException e) {
+ throw new SQLException("Error decoding job stats", e);
+ }
+ }
+ return statsJson;
+ }
+ }
private static class FetchExecutableJobPropsHandler
implements ResultSetHandler<Pair<Props, Props>> {
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 309ffc8..9781043 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -468,7 +468,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
String jobId = this.getParam(req, "jobid");
resp.setCharacterEncoding("utf-8");
- String statsFilePath = null;
try {
ExecutableNode node = exFlow.getExecutableNode(jobId);
if (node == null) {
@@ -476,16 +475,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
exFlow.getExecutionId());
return;
}
-
- statsFilePath = statsDir + "/" + exFlow.getExecutionId() + "-" +
- jobId + "-stats.json";
- File statsFile = new File(statsFilePath);
+
+ String statsJson = executorManager.getExecutionJobStats(exFlow, jobId);
List<Object> jsonObj =
- (ArrayList<Object>) JSONUtils.parseJSONFromFile(statsFile);
+ (ArrayList<Object>) JSONUtils.parseJSONFromString(statsJson);
ret.put("jobStats", jsonObj);
}
+ catch (ExecutorManagerException e) {
+ ret.put("error", "Error retrieving stats for job " + jobId);
+ return;
+ }
catch (IOException e) {
- ret.put("error", "Cannot open stats file: " + statsFilePath);
+ ret.put("error", "Cannot write JSON for stats for job " + jobId);
return;
}
}