azkaban-aplcache

Rename all references to jot attachments from 'attachment'

1/16/2014 10:21:18 PM

Details

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 102505b..75a1a08 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -99,8 +99,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 					else if (action.equals(LOG_ACTION)) { 
 						handleFetchLogEvent(execid, req, resp, respMap);
 					}
-					else if (action.equals(ATTACHMENT_ACTION)) { 
-						handleFetchAttachmentEvent(execid, req, resp, respMap);
+					else if (action.equals(ATTACHMENTS_ACTION)) { 
+						handleFetchAttachmentsEvent(execid, req, resp, respMap);
 					}
 					else if (action.equals(EXECUTE_ACTION)) {
 						handleAjaxExecute(req, respMap, execid);
@@ -207,7 +207,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		}
 	}
 
-	private void handleFetchAttachmentEvent(
+	private void handleFetchAttachmentsEvent(
 			int execId, 
 			HttpServletRequest req, 
 			HttpServletResponse resp, 
@@ -216,9 +216,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 		String jobId = getParam(req, "jobId");
 		int attempt = getIntParam(req, "attempt", 0);
 		try {
-			List<Object> result = flowRunnerManager.readJobAttachment(
+			List<Object> result = flowRunnerManager.readJobAttachments(
 					execId, jobId, attempt);
-			respMap.put("attachment", result);
+			respMap.put("attachments", result);
 		}
 		catch (Exception e) {
 			logger.error(e);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index be8a329..33ba8e2 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -576,7 +576,7 @@ public class FlowRunnerManager implements EventListener {
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
 
-	public List<Object> readJobAttachment(int execId, String jobId, int attempt)
+	public List<Object> readJobAttachments(int execId, String jobId, int attempt)
 			throws ExecutorManagerException {
 		FlowRunner runner = runningFlows.get(execId);
 		if (runner == null) {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 03ac403..211507f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -393,8 +393,7 @@ public class JobRunner extends EventHandler implements Runnable {
 						" written.");
 				return;
 			}
-			loader.uploadAttachmentFile(
-					executionId, node.getNestedId(), node.getAttempt(), file);
+			loader.uploadAttachmentFile(node, file);
 		}
 		catch (ExecutorManagerException e) {
 			flowLogger.error("Error writing out attachment for job " + 
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index f228031..c84436e 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,7 +30,7 @@ public interface ConnectorParams {
 	public static final String RESUME_ACTION = "resume";
 	public static final String PING_ACTION = "ping";
 	public static final String LOG_ACTION = "log";
-	public static final String ATTACHMENT_ACTION = "attachment";
+	public static final String ATTACHMENTS_ACTION = "attachments";
 	public static final String METADATA_ACTION = "metadata";
 	
 	public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index f9e2e49..4763ee5 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,11 +47,11 @@ public interface ExecutorLoader {
 
 	public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
 
-	public List<Object> fetchAttachment(int execId, String name, int attempt) throws ExecutorManagerException;
+	public List<Object> fetchAttachments(int execId, String name, int attempt) throws ExecutorManagerException;
 
 	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
 	
-	public void uploadAttachmentFile(int execId, String name, int attempt, File file) throws ExecutorManagerException;
+	public void uploadAttachmentFile(ExecutableNode node, File file) throws ExecutorManagerException;
 
 	public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
 
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ddf2b23..8b71e29 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -311,7 +311,7 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 		Pair<ExecutionReference, ExecutableFlow> pair = 
 				runningFlows.get(exFlow.getExecutionId());
 		if (pair == null) {
-			return executorLoader.fetchAttachment(
+			return executorLoader.fetchAttachments(
 					exFlow.getExecutionId(), jobId, attempt);
 		}
 
@@ -321,10 +321,10 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
 		@SuppressWarnings("unchecked")
 		Map<String, Object> result = callExecutorServer(
 				pair.getFirst(),
-				ConnectorParams.ATTACHMENT_ACTION,
+				ConnectorParams.ATTACHMENTS_ACTION,
 				jobIdParam,
 				attemptParam);
-		return (List<Object>) result.get("attachment");
+		return (List<Object>) result.get("attachments");
 	}
 	
 	@Override
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 702683e..fe7ce76 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -710,25 +710,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 	}
 
 	@Override
-	public List<Object> fetchAttachment(int execId, String jobId, int attempt)
+	public List<Object> fetchAttachments(int execId, String jobId, int attempt)
 			throws ExecutorManagerException {
 		QueryRunner runner = createQueryRunner();
 
 		try {
-			String attachment = runner.query(
-					FetchExecutableJobAttachmentHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
-					new FetchExecutableJobAttachmentHandler(),
+			String attachments = runner.query(
+					FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+					new FetchExecutableJobAttachmentsHandler(),
 					execId,
 					jobId);
-			return (List<Object>) JSONUtils.parseJSONFromString(attachment);
+			return (List<Object>) JSONUtils.parseJSONFromString(attachments);
 		}
 		catch (IOException e) {
 			throw new ExecutorManagerException(
-					"Error converting job attachment to JSON " + jobId, e);
+					"Error converting job attachments to JSON " + jobId, e);
 		}
 		catch (SQLException e) {
 			throw new ExecutorManagerException(
-					"Error query job attachment " + jobId, e);
+					"Error query job attachments " + jobId, e);
 		}
 	}
 	
@@ -863,20 +863,18 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 	}
 	
 	@Override
-	public void uploadAttachmentFile(
-			int execId, String name, int attempt, File file)
+	public void uploadAttachmentFile(ExecutableNode node, File file)
 			throws ExecutorManagerException {
 		Connection connection = getConnection();
 		try {
-			uploadAttachmentFile(
-					connection, execId, name, attempt, file, defaultEncodingType);
+			uploadAttachmentFile(connection, node, file, defaultEncodingType);
 			connection.commit();
 		}
 		catch (SQLException e) {
-			throw new ExecutorManagerException("Error committing attachment ", e);
+			throw new ExecutorManagerException("Error committing attachments ", e);
 		}
 		catch (IOException e) {
-			throw new ExecutorManagerException("Error uploading attachment ", e);
+			throw new ExecutorManagerException("Error uploading attachments ", e);
 		}
 		finally {
 			DbUtils.closeQuietly(connection);
@@ -885,28 +883,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 
 	private void uploadAttachmentFile(
 			Connection connection,
-			int execId,
-			String name,
-			int attempt,
+			ExecutableNode node,
 			File file,
 			EncodingType encType) throws SQLException, IOException {
 
 		String jsonString = FileUtils.readFileToString(file);
-		byte[] attachment = GZIPUtils.gzipString(jsonString, "UTF-8");
+		byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
 
-		final String UPDATE_EXECUTION_NODE_ATTACHMENT = 
+		final String UPDATE_EXECUTION_NODE_ATTACHMENTS = 
 				"UPDATE execution_jobs " +
-						"SET attachment=? " + 
+						"SET attachments=? " + 
 						"WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
 
 		QueryRunner runner = new QueryRunner();
 		runner.update(
 				connection,
-				UPDATE_EXECUTION_NODE_ATTACHMENT,
-				attachment,
-				execId,
-				name,
-				attempt);
+				UPDATE_EXECUTION_NODE_ATTACHMENTS,
+				attachments,
+				node.getExecutableFlow().getExecutionId(),
+				node.getParentFlow().getNestedId(),
+				node.getId(),
+				node.getAttempt());
 	}
 	
 	private Connection getConnection() throws ExecutorManagerException {
@@ -1049,25 +1046,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 		}
 	}
 
-	private static class FetchExecutableJobAttachmentHandler
+	private static class FetchExecutableJobAttachmentsHandler
 			implements ResultSetHandler<String> {
-		private static String FETCH_ATTACHMENT_EXECUTABLE_NODE = 
-				"SELECT attachment FROM execution_jobs WHERE exec_id=? AND job_id=?";
+		private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE = 
+				"SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
 
 		@SuppressWarnings("unchecked")
 		@Override
 		public String handle(ResultSet rs) throws SQLException {
-			String attachmentJson = "";
+			String attachmentsJson = "";
 			if (rs.next()) {
 				try {
-					byte[] attachment = rs.getBytes(1);
-					attachmentJson = GZIPUtils.unGzipString(attachment, "UTF-8");
+					byte[] attachments = rs.getBytes(1);
+					attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
 				}
 				catch (IOException e) {
-					throw new SQLException("Error decoding job attachment", e);
+					throw new SQLException("Error decoding job attachments", e);
 				}
 			}
-			return attachmentJson;
+			return attachmentsJson;
 		}
 	}