azkaban-aplcache

Details

diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index 0c0c7e0..40dd21a 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,6 +30,8 @@ public interface ConnectorParams {
 	public static final String RESUME_ACTION = "resume";
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
+	// Action name for requesting per-job runtime stats from the executor server.
+	public static final String STATS_ACTION = "stats";
 	public static final String METADATA_ACTION = "metadata";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index 04e0e44..bb39046 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,6 +47,12 @@ public interface ExecutorLoader {
 
 	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
 
+	/**
+	 * Fetches the persisted stats JSON string for the given job of an
+	 * execution. The JDBC implementation returns "" when none is stored.
+	 */
+	public String fetchStats(int execId, String name) throws ExecutorManagerException;
+
 	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
 
 	public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index cb763f5..63264d8 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -274,8 +274,11 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 	}
 	
 	@Override
-	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
-		Pair<ExecutionReference, ExecutableFlow> pair = runningFlows.get(exFlow.getExecutionId());
+	public LogData getExecutionJobLog(
+			ExecutableFlow exFlow, String jobId, int offset, int length, int attempt)
+			throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = 
+				runningFlows.get(exFlow.getExecutionId());
 		if (pair != null) {
 			Pair<String,String> typeParam = new Pair<String,String>("type", "job");
 			Pair<String,String> jobIdParam = new Pair<String,String>("jobId", jobId);
@@ -284,14 +287,48 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 			Pair<String,String> attemptParam = new Pair<String,String>("attempt", String.valueOf(attempt));
 			
 			@SuppressWarnings("unchecked")
-			Map<String, Object> result = callExecutorServer(pair.getFirst(), ConnectorParams.LOG_ACTION, typeParam, jobIdParam, offsetParam, lengthParam, attemptParam);
+			Map<String, Object> result = callExecutorServer(
+					pair.getFirst(), 
+					ConnectorParams.LOG_ACTION, 
+					typeParam, 
+					jobIdParam, 
+					offsetParam, 
+					lengthParam, 
+					attemptParam);
 			return LogData.createLogDataFromObject(result);
 		}
 		else {
-			LogData value = executorLoader.fetchLogs(exFlow.getExecutionId(), jobId, attempt, offset, length);
+			LogData value = executorLoader.fetchLogs(
+					exFlow.getExecutionId(), jobId, attempt, offset, length);
 			return value;
 		}
 	}
+
+	/**
+	 * Returns the stats JSON for a job of the given execution: queried live
+	 * from the executor server while the flow is running, otherwise read
+	 * back from storage via the executor loader.
+	 */
+	@Override
+	public String getExecutionJobStats(ExecutableFlow exFlow, String jobId)
+			throws ExecutorManagerException {
+		Pair<ExecutionReference, ExecutableFlow> pair = 
+				runningFlows.get(exFlow.getExecutionId());
+		if (pair == null) {
+			return executorLoader.fetchStats(exFlow.getExecutionId(), jobId);
+		}
+
+		Pair<String, String> typeParam = new Pair<String, String>("type", "job");
+		Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Object> result = callExecutorServer(
+				pair.getFirst(),
+				ConnectorParams.STATS_ACTION,
+				typeParam,
+				jobIdParam);
+		return (String) result.get("stats");
+	}
 	
 	@Override
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException {
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index cc81589..e40c97b 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -90,6 +90,12 @@ public interface ExecutorManagerAdapter{
 	public LogData getExecutableFlowLog(ExecutableFlow exFlow, int offset, int length) throws ExecutorManagerException;
 	
 	public LogData getExecutionJobLog(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
+
+	/**
+	 * Returns the stats JSON for the given job: live from the executor
+	 * while the flow is running, otherwise from the persisted record.
+	 */
+	public String getExecutionJobStats(ExecutableFlow exFlow, String jobId) throws ExecutorManagerException;
 	
 	public JobMetaData getExecutionJobMetaData(ExecutableFlow exFlow, String jobId, int offset, int length, int attempt) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 70b73da..3fc12db 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -707,6 +707,28 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 					"Error fetching logs " + execId + " : " + name, e);
 		}
 	}
+
+	/**
+	 * Loads the stats attachment for (execId, jobId) from the
+	 * execution_jobs table; "" when no matching row exists.
+	 */
+	@Override
+	public String fetchStats(int execId, String jobId)
+			throws ExecutorManagerException {
+		QueryRunner runner = createQueryRunner();
+
+		try {
+			String stats = runner.query(
+					FetchExecutableJobStatsHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
+					new FetchExecutableJobStatsHandler(),
+					execId,
+					jobId);
+			return stats;
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException("Error querying job stats " + jobId, e);
+		}
+	}
 	
 	@Override
 	public void uploadLogFile(
@@ -977,6 +995,31 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 			return execNodes;
 		}
 	}
+
+	/**
+	 * Reads the gzipped 'attachment' column and inflates it into a UTF-8
+	 * JSON string; yields "" when the result set is empty.
+	 */
+	private static class FetchExecutableJobStatsHandler
+			implements ResultSetHandler<String> {
+		private static final String FETCH_ATTACHMENT_EXECUTABLE_NODE = 
+				"SELECT attachment FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+		@Override
+		public String handle(ResultSet rs) throws SQLException {
+			String statsJson = "";
+			if (rs.next()) {
+				try {
+					byte[] stats = rs.getBytes(1);
+					statsJson = GZIPUtils.unGzipString(stats, "UTF-8");
+				}
+				catch (IOException e) {
+					throw new SQLException("Error decoding job stats", e);
+				}
+			}
+			return statsJson;
+		}
+	}
 	
 	private static class FetchExecutableJobPropsHandler 
 			implements ResultSetHandler<Pair<Props, Props>> {
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 309ffc8..9781043 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -468,7 +468,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		
 		String jobId = this.getParam(req, "jobid");
 		resp.setCharacterEncoding("utf-8");
-    String statsFilePath = null;
 		try {
 			ExecutableNode node = exFlow.getExecutableNode(jobId);
 			if (node == null) {
@@ -476,16 +475,18 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
             exFlow.getExecutionId());
 				return;
 			}
-	
-      statsFilePath = statsDir + "/" + exFlow.getExecutionId() + "-" + 
-          jobId + "-stats.json";
-      File statsFile = new File(statsFilePath);
+
+      String statsJson = executorManager.getExecutionJobStats(exFlow, jobId);
       List<Object> jsonObj = 
-          (ArrayList<Object>) JSONUtils.parseJSONFromFile(statsFile);
+          (ArrayList<Object>) JSONUtils.parseJSONFromString(statsJson);
       ret.put("jobStats", jsonObj);
     }
+    catch (ExecutorManagerException e) {
+      ret.put("error", "Error retrieving stats for job " + jobId);
+      return;
+    }
     catch (IOException e) {
-      ret.put("error", "Cannot open stats file: " + statsFilePath);
+      ret.put("error", "Cannot parse stats for job " + jobId);
       return;
 		}
   }