azkaban-aplcache

Add attachment to ExecutableNode and plumb reading attachment

1/16/2014 1:21:06 AM

Details

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 94fb4c2..277374e 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -47,7 +47,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 	private AzkabanExecutorServer application;
 	private FlowRunnerManager flowRunnerManager;
 
-	
 	public ExecutorServlet() {
 		super();
 	}
@@ -63,7 +62,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		flowRunnerManager = application.getFlowRunnerManager();
 	}
 
-	
 	protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
 		resp.setContentType(JSON_MIME_TYPE);
 		ObjectMapper mapper = new ObjectMapper();
@@ -100,6 +98,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					else if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
+					else if (action.equals(STATS_ACTION)) { 
+						handleFetchStatsEvent(execid, req, resp, respMap);
+					}
 					else if (action.equals(EXECUTE_ACTION)) {
 						handleAjaxExecute(req, respMap, execid);
 					}
@@ -170,7 +171,11 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 	
-	private void handleFetchLogEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
+	private void handleFetchLogEvent(
+			int execId, 
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			Map<String, Object> respMap) throws ServletException {
 		String type = getParam(req, "type");
 		int startByte = getIntParam(req, "offset");
 		int length = getIntParam(req, "length");
@@ -200,6 +205,23 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			}
 		}
 	}
+
+	private void handleFetchStatsEvent(
+			int execId, 
+			HttpServletRequest req, 
+			HttpServletResponse resp, 
+			Map<String, Object> respMap) throws ServletException {
+
+		String jobId = getParam(req, "jobId");
+		try {
+			String result = flowRunnerManager.readJobAttachment(execId, jobId);
+			respMap.put("stats", result);
+		}
+		catch (Exception e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
 	
 	private void handleFetchMetaDataEvent(int execId, HttpServletRequest req, HttpServletResponse resp, Map<String, Object> respMap) throws ServletException {
 		int startByte = getIntParam(req, "offset");
@@ -217,7 +239,6 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 			logger.error(e);
 			respMap.put("error", e.getMessage());
 		}
-
 	}
 	
 	@SuppressWarnings("unchecked")
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 7021c01..2159f4f 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -839,6 +839,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		return logFile;
 	}
+
+	public String getJobAttachment(String jobId) {
+		ExecutableNode node = flow.getExecutableNode(jobId);
+		return node.getAttachment();
+	}
 	
 	public File getJobMetaDataFile(String jobId, int attempt) {
 		ExecutableNode node = flow.getExecutableNode(jobId);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index ecc0847..9a1c44f 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -137,7 +137,6 @@ public class FlowRunnerManager implements EventListener {
 		cleanerThread.start();
 		
 		jobtypeManager = new JobTypeManager(props.getString(AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR, JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), parentClassLoader);
-		
 	}
 
 	private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
@@ -574,6 +573,16 @@ public class FlowRunnerManager implements EventListener {
 		
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
+
+	public String readJobAttachment(int execId, String jobId)
+			throws ExecutorManagerException {
+		FlowRunner runner = runningFlows.get(execId);
+		if (runner == null) {
+			throw new ExecutorManagerException("Running flow " + execId + " not found.");
+		}
+
+		return runner.getJobAttachment(jobId);
+	}
 	
 	public JobMetaData readJobMetaData(int execId, String jobId, int attempt, int startByte, int length) throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index 9d807ff..187d969 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -61,6 +61,8 @@ public class ExecutableNode {
 	
 	private Props inputProps;
 	private Props outputProps;
+
+	private String attachment;
 	
 	public static final String ATTEMPT_PARAM = "attempt";
 	public static final String PASTATTEMPTS_PARAM = "pastAttempts";
@@ -204,6 +206,14 @@ public class ExecutableNode {
 	public Props getOutputProps() {
 		return outputProps;
 	}
+
+	public String getAttachment() {
+		return attachment;
+	}
+
+	public void setAttachment(String attachment) {
+		this.attachment = attachment;
+	}
 	
 	public long getDelayedExecution() {
 		return delayExecution;
@@ -428,4 +438,4 @@ public class ExecutableNode {
 			}
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 63264d8..7b86331 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -313,14 +313,12 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 			return executorLoader.fetchStats(exFlow.getExecutionId(), jobId);
 		}
 
-		Pair<String, String> typeParam = new Pair<String, String>("type", "job");
 		Pair<String, String> jobIdParam = new Pair<String, String>("jobId", jobId);
-
+		
 		@SuppressWarnings("unchecked")
 		Map<String, Object> result = callExecutorServer(
 				pair.getFirst(),
 				ConnectorParams.STATS_ACTION,
-				typeParam,
 				jobIdParam);
 		return (String) result.get("stats");
 	}