Details
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index df658a8..12a0e8f 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -333,4 +333,26 @@ class JdbcProjectHandlerSet {
return rs.getInt(1);
}
}
+
+ public static class FlowFileResultHandler implements ResultSetHandler<List<byte[]>> {
+
+ public static String SELECT_FLOW_FILE =
+ "SELECT flow_file FROM project_flow_files WHERE "
+ + "project_id=? AND project_version=? AND flow_name=? AND flow_version=?";
+
+ @Override
+ public List<byte[]> handle(final ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.emptyList();
+ }
+
+ final List<byte[]> data = new ArrayList<>();
+ do {
+ final byte[] bytes = rs.getBytes(1);
+ data.add(bytes);
+ } while (rs.next());
+
+ return data;
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 890b44c..6154769 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -24,11 +24,12 @@ import static azkaban.project.JdbcProjectHandlerSet.ProjectPropertiesResultsHand
import static azkaban.project.JdbcProjectHandlerSet.ProjectResultHandler;
import static azkaban.project.JdbcProjectHandlerSet.ProjectVersionResultHandler;
-import azkaban.db.EncodingType;
import azkaban.db.DatabaseOperator;
import azkaban.db.DatabaseTransOperator;
+import azkaban.db.EncodingType;
import azkaban.db.SQLTransaction;
import azkaban.flow.Flow;
+import azkaban.project.JdbcProjectHandlerSet.FlowFileResultHandler;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.user.Permission;
import azkaban.user.User;
@@ -40,8 +41,6 @@ import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Triple;
import com.google.common.io.Files;
-import javax.inject.Inject;
-import javax.inject.Singleton;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -54,6 +53,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
@@ -69,6 +70,8 @@ public class JdbcProjectImpl implements ProjectLoader {
private static final Logger logger = Logger.getLogger(JdbcProjectImpl.class);
private static final int CHUCK_SIZE = 1024 * 1024 * 10;
+ // Flow yaml files are usually small, set size limitation to 10 MB should be sufficient for now.
+ private static final int MAX_FLOW_FILE_SIZE_IN_BYTES = 1024 * 1024 * 10;
private final DatabaseOperator dbOperator;
private final File tempDir;
private final EncodingType defaultEncodingType = EncodingType.GZIP;
@@ -950,6 +953,7 @@ public class JdbcProjectImpl implements ProjectLoader {
final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
final String UPDATE_PROJECT_VERSIONS = "UPDATE project_versions SET num_chunks=0 WHERE project_id=? AND version<?";
+ // Todo jamiesjc: delete flow files
final SQLTransaction<Integer> cleanOlderProjectTransaction = transOperator -> {
transOperator.update(DELETE_FLOW, projectId, version);
@@ -968,4 +972,75 @@ public class JdbcProjectImpl implements ProjectLoader {
throw new ProjectManagerException("clean older project transaction failed", e);
}
}
+
+ @Override
+ public void uploadFlowFile(final int projectId, final int projectVersion, final int flowVersion,
+ final File flowFile) throws ProjectManagerException {
+ logger.info(String
+ .format(
+ "Uploading flow file %s, version %d for project %d, version %d, file length is [%d bytes]",
+ flowFile.getName(), flowVersion, projectId, projectVersion, flowFile.length()));
+
+ if (flowFile.length() > MAX_FLOW_FILE_SIZE_IN_BYTES) {
+ throw new ProjectManagerException("Flow file length exceeds 10 MB limit.");
+ }
+
+ final byte[] buffer = new byte[MAX_FLOW_FILE_SIZE_IN_BYTES];
+ final String INSERT_FLOW_FILES =
+ "INSERT INTO project_flow_files (project_id, project_version, flow_name, flow_version, "
+ + "modified_time, "
+ + "flow_file) values (?,?,?,?,?,?)";
+
+ try (final FileInputStream input = new FileInputStream(flowFile);
+ final BufferedInputStream bufferedStream = new BufferedInputStream(input)) {
+ final int size = bufferedStream.read(buffer);
+ logger.info("Read bytes for " + flowFile.getName() + ", size:" + size);
+ final byte[] buf = Arrays.copyOfRange(buffer, 0, size);
+ try {
+ this.dbOperator
+ .update(INSERT_FLOW_FILES, projectId, projectVersion, flowFile.getName(), flowVersion,
+ System.currentTimeMillis(), buf);
+ } catch (final SQLException e) {
+ throw new ProjectManagerException(
+ "Error uploading flow file " + flowFile.getName() + ", version " + flowVersion + ".",
+ e);
+ }
+ } catch (final IOException e) {
+ throw new ProjectManagerException(
+ String.format(
+ "Error reading flow file %s, version: %d, length: [%d bytes].",
+ flowFile.getName(), flowVersion, flowFile.length()));
+ }
+ }
+
+ @Override
+ public File getUploadedFlowFile(final int projectId, final int projectVersion,
+ final int flowVersion, final String flowName) throws ProjectManagerException {
+ final FlowFileResultHandler handler = new FlowFileResultHandler();
+
+ final List<byte[]> data;
+ // Todo jamiesjc: delete the flow file after used.
+ final File file = new File(this.tempDir, flowName);
+ try (final FileOutputStream output = new FileOutputStream(file);
+ final BufferedOutputStream bufferedStream = new BufferedOutputStream(output)) {
+ try {
+ data = this.dbOperator
+ .query(FlowFileResultHandler.SELECT_FLOW_FILE, handler,
+ projectId, projectVersion, flowName, flowVersion);
+ } catch (final SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Failed to query uploaded flow file " + flowName + ".",
+ e);
+ }
+
+ try {
+ bufferedStream.write(data.get(0));
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error writing flow file" + flowName, e);
+ }
+ } catch (final IOException e) {
+ throw new ProjectManagerException("Error creating output stream for flow file" + flowName, e);
+ }
+ return file;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 59806c1..c6ace1b 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -200,4 +200,17 @@ public interface ProjectLoader {
throws ProjectManagerException;
void updateProjectSettings(Project project) throws ProjectManagerException;
+
+ /**
+ * Uploads flow file.
+ */
+ void uploadFlowFile(int projectId, int projectVersion, int flowVersion, File flowFile)
+ throws ProjectManagerException;
+
+ /**
+ * Gets flow file that's uploaded.
+ */
+ File getUploadedFlowFile(int projectId, int projectVersion, int flowVersion, String flowName)
+ throws ProjectManagerException;
+
}
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 2478868..325979a 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -15,9 +15,12 @@
*/
package azkaban.project;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
import azkaban.db.DatabaseOperator;
import azkaban.flow.Flow;
import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.Md5Hasher;
@@ -30,6 +33,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -41,6 +45,13 @@ import org.junit.Test;
public class JdbcProjectImplTest {
private static final String SAMPLE_FILE = "sample_flow_01.zip";
+ private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
+ private static final String LARGE_FLOW_YAML_DIR = "largeflowyamltest";
+ private static final String BASIC_FLOW_FILE = "basic_flow.flow";
+ private static final String LARGE_FLOW_FILE = "large_file.flow";
+ private static final int PROJECT_ID = 123;
+ private static final int PROJECT_VERSION = 3;
+ private static final int FLOW_VERSION = 1;
private static final Props props = new Props();
private static DatabaseOperator dbOperator;
private ProjectLoader loader;
@@ -342,16 +353,51 @@ public class JdbcProjectImplTest {
Assert.assertEquals(fileHandler2.getNumChunks(), 0);
}
+ @Test
+ public void testUploadFlowFile() throws Exception {
+ final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
+ this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+
+ final File file = this.loader
+ .getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, BASIC_FLOW_FILE);
+ Assert.assertEquals(BASIC_FLOW_FILE, file.getName());
+ FileUtils.contentEquals(testYamlFile, file);
+ }
+
+ @Test
+ public void testDuplicateUploadFlowFileException() throws Exception {
+ final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
+ this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+
+ assertThatThrownBy(
+ () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile))
+ .isInstanceOf(ProjectManagerException.class)
+ .hasMessageContaining(
+ "Error uploading flow file " + BASIC_FLOW_FILE + ", version " + FLOW_VERSION + ".");
+ }
+
+ @Test
+ public void testUploadLargeFlowFileException() throws Exception {
+ final File testYamlFile = ExecutionsTestUtil.getFlowFile(LARGE_FLOW_YAML_DIR, LARGE_FLOW_FILE);
+
+ assertThatThrownBy(
+ () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile))
+ .isInstanceOf(ProjectManagerException.class)
+ .hasMessageContaining(
+ "Flow file length exceeds 10 MB limit.");
+ }
+
@After
public void clearDB() {
try {
- dbOperator.update("DELETE FROM projects");
- dbOperator.update("DELETE FROM project_versions");
- dbOperator.update("DELETE FROM project_properties");
- dbOperator.update("DELETE FROM project_permissions");
- dbOperator.update("DELETE FROM project_flows");
- dbOperator.update("DELETE FROM project_files");
- dbOperator.update("DELETE FROM project_events");
+ dbOperator.update("TRUNCATE TABLE projects");
+ dbOperator.update("TRUNCATE TABLE project_versions");
+ dbOperator.update("TRUNCATE TABLE project_properties");
+ dbOperator.update("TRUNCATE TABLE project_permissions");
+ dbOperator.update("TRUNCATE TABLE project_flows");
+ dbOperator.update("TRUNCATE TABLE project_files");
+ dbOperator.update("TRUNCATE TABLE project_events");
+ dbOperator.update("TRUNCATE TABLE project_flow_files");
} catch (final SQLException e) {
e.printStackTrace();
}
diff --git a/azkaban-db/src/main/sql/create.project_flow_files.sql b/azkaban-db/src/main/sql/create.project_flow_files.sql
new file mode 100644
index 0000000..40e45f2
--- /dev/null
+++ b/azkaban-db/src/main/sql/create.project_flow_files.sql
@@ -0,0 +1,9 @@
+CREATE TABLE project_flow_files (
+ project_id INT NOT NULL,
+ project_version INT NOT NULL,
+ flow_name VARCHAR(128) NOT NULL,
+ flow_version INT NOT NULL,
+ modified_time BIGINT NOT NULL,
+ flow_file LONGBLOB,
+ PRIMARY KEY (project_id, project_version, flow_name, flow_version)
+);
diff --git a/test/execution-test-data/largeflowyamltest/large_file.flow b/test/execution-test-data/largeflowyamltest/large_file.flow
new file mode 100644
index 0000000..95a797f
Binary files /dev/null and b/test/execution-test-data/largeflowyamltest/large_file.flow differ