azkaban-developers
Changes
src/java/azkaban/execapp/JobRunner.java 39(+36 -3)
Details
src/java/azkaban/execapp/JobRunner.java 39(+36 -3)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 1e18b32..e6d1e4e 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -65,6 +65,7 @@ public class JobRunner extends EventHandler implements Runnable {
private Appender jobAppender;
private File logFile;
+ private String attachmentFileName;
private Job job;
private int executionId = -1;
@@ -229,6 +230,13 @@ public class JobRunner extends EventHandler implements Runnable {
}
}
+ private void createAttachmentFile() {
+ String fileName = createAttachmentFileName(
+ this.executionId, this.jobId, node.getAttempt());
+ File file = new File(workingDir, fileName);
+ attachmentFileName = file.getAbsolutePath();
+ }
+
private void closeLogger() {
if (jobAppender != null) {
logger.removeAppender(jobAppender);
@@ -358,7 +366,6 @@ public class JobRunner extends EventHandler implements Runnable {
try {
File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-
@Override
public boolean accept(File dir, String name) {
return name.startsWith(logFile.getName());
@@ -372,6 +379,28 @@ public class JobRunner extends EventHandler implements Runnable {
flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
}
}
+
+ private void finalizeAttachmentFile() {
+ if (attachmentFileName == null) {
+ flowLogger.info("Attachment file for job " + this.jobId + " is null");
+ return;
+ }
+
+ try {
+ File file = new File(attachmentFileName);
+ if (!file.exists()) {
+ flowLogger.info("Attachment file for job " + this.jobId +
+ " does not exist.");
+ return;
+ }
+ loader.uploadAttachmentFile(
+ executionId, node.getNestedId(), node.getAttempt(), file);
+ }
+ catch (ExecutorManagerException e) {
+ flowLogger.error("Error writing out attachment for job " +
+ this.node.getNestedId(), e);
+ }
+ }
/**
* The main run thread.
@@ -391,7 +420,8 @@ public class JobRunner extends EventHandler implements Runnable {
// Delay execution if necessary. Will return a true if something went wrong.
errorFound |= delayExecution();
- // For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+ // For pipelining of jobs. Will watch other jobs. Will return true if
+ // something went wrong.
errorFound |= blockOnPipeLine();
// Start the node.
@@ -400,7 +430,8 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
try {
loader.uploadExecutableNode(node, props);
- } catch (ExecutorManagerException e1) {
+ }
+ catch (ExecutorManagerException e1) {
logger.error("Error writing initial node properties");
}
@@ -425,6 +456,7 @@ public class JobRunner extends EventHandler implements Runnable {
fireEvent(Event.create(this, Type.JOB_FINISHED), false);
finalizeLogFile();
+ finalizeAttachmentFile();
}
private boolean prepareJob() throws RuntimeException {
@@ -455,6 +487,7 @@ public class JobRunner extends EventHandler implements Runnable {
props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
props.put(CommonJobProperties.JOB_METADATA_FILE,
createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+ props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
changeStatus(Status.RUNNING);
// Ability to specify working directory
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index c59def3..f9e2e49 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -50,6 +50,8 @@ public interface ExecutorLoader {
public List<Object> fetchAttachment(int execId, String name, int attempt) throws ExecutorManagerException;
public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
+
+ public void uploadAttachmentFile(int execId, String name, int attempt, File file) throws ExecutorManagerException;
public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 988d64e..702683e 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -35,6 +35,7 @@ import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -861,6 +862,53 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
DateTime.now().getMillis());
}
+ @Override
+ public void uploadAttachmentFile(
+ int execId, String name, int attempt, File file)
+ throws ExecutorManagerException {
+ Connection connection = getConnection();
+ try {
+ uploadAttachmentFile(
+ connection, execId, name, attempt, file, defaultEncodingType);
+ connection.commit();
+ }
+ catch (SQLException e) {
+ throw new ExecutorManagerException("Error committing attachment ", e);
+ }
+ catch (IOException e) {
+ throw new ExecutorManagerException("Error uploading attachment ", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void uploadAttachmentFile(
+ Connection connection,
+ int execId,
+ String name,
+ int attempt,
+ File file,
+ EncodingType encType) throws SQLException, IOException {
+
+ String jsonString = FileUtils.readFileToString(file);
+ byte[] attachment = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+ final String UPDATE_EXECUTION_NODE_ATTACHMENT =
+ "UPDATE execution_jobs " +
+ "SET attachment=? " +
+ "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+ QueryRunner runner = new QueryRunner();
+ runner.update(
+ connection,
+ UPDATE_EXECUTION_NODE_ATTACHMENT,
+ attachment,
+ execId,
+ name,
+ attempt);
+ }
+
private Connection getConnection() throws ExecutorManagerException {
Connection connection = null;
try {
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7324205..3873a56 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -75,6 +75,11 @@ public class CommonJobProperties {
* The attempt number of the executing job.
*/
public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+ /**
+ * The attempt number of the executing job.
+ */
+ public static final String JOB_ATTACHMENT_FILE = "azkaban.job.attachment.file";
/**
* The executing flow id