Details
diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 102505b..75a1a08 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -99,8 +99,8 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
else if (action.equals(LOG_ACTION)) {
handleFetchLogEvent(execid, req, resp, respMap);
}
- else if (action.equals(ATTACHMENT_ACTION)) {
- handleFetchAttachmentEvent(execid, req, resp, respMap);
+ else if (action.equals(ATTACHMENTS_ACTION)) {
+ handleFetchAttachmentsEvent(execid, req, resp, respMap);
}
else if (action.equals(EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
@@ -207,7 +207,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
}
}
- private void handleFetchAttachmentEvent(
+ private void handleFetchAttachmentsEvent(
int execId,
HttpServletRequest req,
HttpServletResponse resp,
@@ -216,9 +216,9 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
String jobId = getParam(req, "jobId");
int attempt = getIntParam(req, "attempt", 0);
try {
- List<Object> result = flowRunnerManager.readJobAttachment(
+ List<Object> result = flowRunnerManager.readJobAttachments(
execId, jobId, attempt);
- respMap.put("attachment", result);
+ respMap.put("attachments", result);
}
catch (Exception e) {
logger.error(e);
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index be8a329..33ba8e2 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -576,7 +576,7 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
- public List<Object> readJobAttachment(int execId, String jobId, int attempt)
+ public List<Object> readJobAttachments(int execId, String jobId, int attempt)
throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(execId);
if (runner == null) {
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 03ac403..211507f 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -393,8 +393,7 @@ public class JobRunner extends EventHandler implements Runnable {
" written.");
return;
}
- loader.uploadAttachmentFile(
- executionId, node.getNestedId(), node.getAttempt(), file);
+ loader.uploadAttachmentFile(node, file);
}
catch (ExecutorManagerException e) {
flowLogger.error("Error writing out attachment for job " +
diff --git a/src/java/azkaban/executor/ConnectorParams.java b/src/java/azkaban/executor/ConnectorParams.java
index f228031..c84436e 100644
--- a/src/java/azkaban/executor/ConnectorParams.java
+++ b/src/java/azkaban/executor/ConnectorParams.java
@@ -30,7 +30,7 @@ public interface ConnectorParams {
public static final String RESUME_ACTION = "resume";
public static final String PING_ACTION = "ping";
public static final String LOG_ACTION = "log";
- public static final String ATTACHMENT_ACTION = "attachment";
+ public static final String ATTACHMENTS_ACTION = "attachments";
public static final String METADATA_ACTION = "metadata";
public static final String MODIFY_EXECUTION_ACTION = "modifyExecution";
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index f9e2e49..4763ee5 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -47,11 +47,11 @@ public interface ExecutorLoader {
public LogData fetchLogs(int execId, String name, int attempt, int startByte, int endByte) throws ExecutorManagerException;
- public List<Object> fetchAttachment(int execId, String name, int attempt) throws ExecutorManagerException;
+ public List<Object> fetchAttachments(int execId, String name, int attempt) throws ExecutorManagerException;
public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
- public void uploadAttachmentFile(int execId, String name, int attempt, File file) throws ExecutorManagerException;
+ public void uploadAttachmentFile(ExecutableNode node, File file) throws ExecutorManagerException;
public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ddf2b23..8b71e29 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -311,7 +311,7 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
Pair<ExecutionReference, ExecutableFlow> pair =
runningFlows.get(exFlow.getExecutionId());
if (pair == null) {
- return executorLoader.fetchAttachment(
+ return executorLoader.fetchAttachments(
exFlow.getExecutionId(), jobId, attempt);
}
@@ -321,10 +321,10 @@ public class ExecutorManager extends EventHandler implements ExecutorManagerAdap
@SuppressWarnings("unchecked")
Map<String, Object> result = callExecutorServer(
pair.getFirst(),
- ConnectorParams.ATTACHMENT_ACTION,
+ ConnectorParams.ATTACHMENTS_ACTION,
jobIdParam,
attemptParam);
- return (List<Object>) result.get("attachment");
+ return (List<Object>) result.get("attachments");
}
@Override
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 702683e..fe7ce76 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -710,25 +710,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
}
@Override
- public List<Object> fetchAttachment(int execId, String jobId, int attempt)
+ public List<Object> fetchAttachments(int execId, String jobId, int attempt)
throws ExecutorManagerException {
QueryRunner runner = createQueryRunner();
try {
- String attachment = runner.query(
- FetchExecutableJobAttachmentHandler.FETCH_ATTACHMENT_EXECUTABLE_NODE,
- new FetchExecutableJobAttachmentHandler(),
+ String attachments = runner.query(
+ FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+ new FetchExecutableJobAttachmentsHandler(),
execId,
jobId);
- return (List<Object>) JSONUtils.parseJSONFromString(attachment);
+ return (List<Object>) JSONUtils.parseJSONFromString(attachments);
}
catch (IOException e) {
throw new ExecutorManagerException(
- "Error converting job attachment to JSON " + jobId, e);
+ "Error converting job attachments to JSON " + jobId, e);
}
catch (SQLException e) {
throw new ExecutorManagerException(
- "Error query job attachment " + jobId, e);
+ "Error query job attachments " + jobId, e);
}
}
@@ -863,20 +863,18 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
}
@Override
- public void uploadAttachmentFile(
- int execId, String name, int attempt, File file)
+ public void uploadAttachmentFile(ExecutableNode node, File file)
throws ExecutorManagerException {
Connection connection = getConnection();
try {
- uploadAttachmentFile(
- connection, execId, name, attempt, file, defaultEncodingType);
+ uploadAttachmentFile(connection, node, file, defaultEncodingType);
connection.commit();
}
catch (SQLException e) {
- throw new ExecutorManagerException("Error committing attachment ", e);
+ throw new ExecutorManagerException("Error committing attachments ", e);
}
catch (IOException e) {
- throw new ExecutorManagerException("Error uploading attachment ", e);
+ throw new ExecutorManagerException("Error uploading attachments ", e);
}
finally {
DbUtils.closeQuietly(connection);
@@ -885,28 +883,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
private void uploadAttachmentFile(
Connection connection,
- int execId,
- String name,
- int attempt,
+ ExecutableNode node,
File file,
EncodingType encType) throws SQLException, IOException {
String jsonString = FileUtils.readFileToString(file);
- byte[] attachment = GZIPUtils.gzipString(jsonString, "UTF-8");
+ byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
- final String UPDATE_EXECUTION_NODE_ATTACHMENT =
+ final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
"UPDATE execution_jobs " +
- "SET attachment=? " +
+ "SET attachments=? " +
"WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
QueryRunner runner = new QueryRunner();
runner.update(
connection,
- UPDATE_EXECUTION_NODE_ATTACHMENT,
- attachment,
- execId,
- name,
- attempt);
+ UPDATE_EXECUTION_NODE_ATTACHMENTS,
+ attachments,
+ node.getExecutableFlow().getExecutionId(),
+ node.getParentFlow().getNestedId(),
+ node.getId(),
+ node.getAttempt());
}
private Connection getConnection() throws ExecutorManagerException {
@@ -1049,25 +1046,25 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
}
}
- private static class FetchExecutableJobAttachmentHandler
+ private static class FetchExecutableJobAttachmentsHandler
implements ResultSetHandler<String> {
- private static String FETCH_ATTACHMENT_EXECUTABLE_NODE =
- "SELECT attachment FROM execution_jobs WHERE exec_id=? AND job_id=?";
+ private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
+ "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
@SuppressWarnings("unchecked")
@Override
public String handle(ResultSet rs) throws SQLException {
- String attachmentJson = "";
+ String attachmentsJson = "";
if (rs.next()) {
try {
- byte[] attachment = rs.getBytes(1);
- attachmentJson = GZIPUtils.unGzipString(attachment, "UTF-8");
+ byte[] attachments = rs.getBytes(1);
+ attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
}
catch (IOException e) {
- throw new SQLException("Error decoding job attachment", e);
+ throw new SQLException("Error decoding job attachments", e);
}
}
- return attachmentJson;
+ return attachmentsJson;
}
}