azkaban-developers

Details

diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 1e18b32..e6d1e4e 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -65,6 +65,7 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private Appender jobAppender;
 	private File logFile;
+	private String attachmentFileName;
 	
 	private Job job;
 	private int executionId = -1;
@@ -229,6 +230,13 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 
+	private void createAttachmentFile() {
+		String fileName = createAttachmentFileName(
+				this.executionId, this.jobId, node.getAttempt());
+		File file = new File(workingDir, fileName);
+		attachmentFileName = file.getAbsolutePath();
+	}
+
 	private void closeLogger() {
 		if (jobAppender != null) {
 			logger.removeAppender(jobAppender);
@@ -358,7 +366,6 @@ public class JobRunner extends EventHandler implements Runnable {
 		
 		try {
 			File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-				
 				@Override
 				public boolean accept(File dir, String name) {
 					return name.startsWith(logFile.getName());
@@ -372,6 +379,28 @@ public class JobRunner extends EventHandler implements Runnable {
 			flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
 		}
 	}
+
+	private void finalizeAttachmentFile() {
+		if (attachmentFileName == null) {
+			flowLogger.info("Attachment file for job " + this.jobId + " is null");
+			return;
+		}
+
+		try {
+			File file = new File(attachmentFileName);
+			if (!file.exists()) {
+				flowLogger.info("Attachment file for job " + this.jobId + 
+						" does not exist.");
+				return;
+			}
+			loader.uploadAttachmentFile(
+					executionId, node.getNestedId(), node.getAttempt(), file);
+		}
+		catch (ExecutorManagerException e) {
+			flowLogger.error("Error writing out attachment for job " + 
+					this.node.getNestedId(), e);
+		}
+	}
 	
 	/**
 	 * The main run thread.
@@ -391,7 +420,8 @@ public class JobRunner extends EventHandler implements Runnable {
 		// Delay execution if necessary. Will return a true if something went wrong.
 		errorFound |= delayExecution();
 
-		// For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+		// For pipelining of jobs. Will watch other jobs. Will return true if
+		// something went wrong.
 		errorFound |= blockOnPipeLine();
 
 		// Start the node.
@@ -400,7 +430,8 @@ public class JobRunner extends EventHandler implements Runnable {
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
 				loader.uploadExecutableNode(node, props);
-			} catch (ExecutorManagerException e1) {
+			}
+			catch (ExecutorManagerException e1) {
 				logger.error("Error writing initial node properties");
 			}
 			
@@ -425,6 +456,7 @@ public class JobRunner extends EventHandler implements Runnable {
 		
 		fireEvent(Event.create(this, Type.JOB_FINISHED), false);
 		finalizeLogFile();
+		finalizeAttachmentFile();
 	}
 	
 	private boolean prepareJob() throws RuntimeException {
@@ -455,6 +487,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
 			props.put(CommonJobProperties.JOB_METADATA_FILE,
 					createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
+			props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, attachmentFileName);
 			changeStatus(Status.RUNNING);
 			
 			// Ability to specify working directory
diff --git a/src/java/azkaban/executor/ExecutorLoader.java b/src/java/azkaban/executor/ExecutorLoader.java
index c59def3..f9e2e49 100644
--- a/src/java/azkaban/executor/ExecutorLoader.java
+++ b/src/java/azkaban/executor/ExecutorLoader.java
@@ -50,6 +50,8 @@ public interface ExecutorLoader {
 	public List<Object> fetchAttachment(int execId, String name, int attempt) throws ExecutorManagerException;
 
 	public void uploadLogFile(int execId, String name, int attempt, File ... files) throws ExecutorManagerException;
+	
+	public void uploadAttachmentFile(int execId, String name, int attempt, File file) throws ExecutorManagerException;
 
 	public void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException;
 
diff --git a/src/java/azkaban/executor/JdbcExecutorLoader.java b/src/java/azkaban/executor/JdbcExecutorLoader.java
index 988d64e..702683e 100644
--- a/src/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/src/java/azkaban/executor/JdbcExecutorLoader.java
@@ -35,6 +35,7 @@ import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
@@ -861,6 +862,53 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader
 				DateTime.now().getMillis());
 	}
 	
+	@Override
+	public void uploadAttachmentFile(
+			int execId, String name, int attempt, File file)
+			throws ExecutorManagerException {
+		Connection connection = getConnection();
+		try {
+			uploadAttachmentFile(
+					connection, execId, name, attempt, file, defaultEncodingType);
+			connection.commit();
+		}
+		catch (SQLException e) {
+			throw new ExecutorManagerException("Error committing attachment ", e);
+		}
+		catch (IOException e) {
+			throw new ExecutorManagerException("Error uploading attachment ", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+
+	private void uploadAttachmentFile(
+			Connection connection,
+			int execId,
+			String name,
+			int attempt,
+			File file,
+			EncodingType encType) throws SQLException, IOException {
+
+		String jsonString = FileUtils.readFileToString(file);
+		byte[] attachment = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+		final String UPDATE_EXECUTION_NODE_ATTACHMENT = 
+				"UPDATE execution_jobs " +
+						"SET attachment=? " + 
+						"WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+		QueryRunner runner = new QueryRunner();
+		runner.update(
+				connection,
+				UPDATE_EXECUTION_NODE_ATTACHMENT,
+				attachment,
+				execId,
+				name,
+				attempt);
+	}
+	
 	private Connection getConnection() throws ExecutorManagerException {
 		Connection connection = null;
 		try {
diff --git a/src/java/azkaban/flow/CommonJobProperties.java b/src/java/azkaban/flow/CommonJobProperties.java
index 7324205..3873a56 100644
--- a/src/java/azkaban/flow/CommonJobProperties.java
+++ b/src/java/azkaban/flow/CommonJobProperties.java
@@ -75,6 +75,11 @@ public class CommonJobProperties {
 	 * The attempt number of the executing job.
 	 */
 	public static final String JOB_METADATA_FILE = "azkaban.job.metadata.file";
+
+	/**
+	 * The attempt number of the executing job.
+	 */
+	public static final String JOB_ATTACHMENT_FILE = "azkaban.job.attachment.file";
 	
 	/**
 	 * The executing flow id