azkaban-aplcache

Flow 2.0 design - Create project_flow_files DB table. (#1544) *

10/25/2017 9:30:38 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index df658a8..12a0e8f 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -333,4 +333,26 @@ class JdbcProjectHandlerSet {
       return rs.getInt(1);
     }
   }
+
+  /** Collects the flow_file column of every row returned by SELECT_FLOW_FILE. */
+  public static class FlowFileResultHandler implements ResultSetHandler<List<byte[]>> {
+
+    public static final String SELECT_FLOW_FILE =
+        "SELECT flow_file FROM project_flow_files WHERE "
+            + "project_id=? AND project_version=? AND flow_name=? AND flow_version=?";
+
+    /** Returns one byte array per row in result-set order, or an empty list when no rows. */
+    @Override
+    public List<byte[]> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+      final List<byte[]> data = new ArrayList<>();
+      do {
+        // Column 1 is flow_file, the only column selected.
+        data.add(rs.getBytes(1));
+      } while (rs.next());
+      return data;
+    }
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 890b44c..6154769 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -24,11 +24,12 @@ import static azkaban.project.JdbcProjectHandlerSet.ProjectPropertiesResultsHand
 import static azkaban.project.JdbcProjectHandlerSet.ProjectResultHandler;
 import static azkaban.project.JdbcProjectHandlerSet.ProjectVersionResultHandler;
 
-import azkaban.db.EncodingType;
 import azkaban.db.DatabaseOperator;
 import azkaban.db.DatabaseTransOperator;
+import azkaban.db.EncodingType;
 import azkaban.db.SQLTransaction;
 import azkaban.flow.Flow;
+import azkaban.project.JdbcProjectHandlerSet.FlowFileResultHandler;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.user.Permission;
 import azkaban.user.User;
@@ -40,8 +41,6 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Triple;
 import com.google.common.io.Files;
-import javax.inject.Inject;
-import javax.inject.Singleton;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -54,6 +53,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
@@ -69,6 +70,8 @@ public class JdbcProjectImpl implements ProjectLoader {
   private static final Logger logger = Logger.getLogger(JdbcProjectImpl.class);
 
   private static final int CHUCK_SIZE = 1024 * 1024 * 10;
+  // Flow yaml files are usually small, so a 10 MB size limit should be sufficient for now.
+  private static final int MAX_FLOW_FILE_SIZE_IN_BYTES = 1024 * 1024 * 10;
   private final DatabaseOperator dbOperator;
   private final File tempDir;
   private final EncodingType defaultEncodingType = EncodingType.GZIP;
@@ -950,6 +953,7 @@ public class JdbcProjectImpl implements ProjectLoader {
     final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
     final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
     final String UPDATE_PROJECT_VERSIONS = "UPDATE project_versions SET num_chunks=0 WHERE project_id=? AND version<?";
+    // Todo jamiesjc: delete flow files
 
     final SQLTransaction<Integer> cleanOlderProjectTransaction = transOperator -> {
       transOperator.update(DELETE_FLOW, projectId, version);
@@ -968,4 +972,75 @@ public class JdbcProjectImpl implements ProjectLoader {
       throw new ProjectManagerException("clean older project transaction failed", e);
     }
   }
+
+  /**
+   * Reads all of flowFile and inserts it into project_flow_files under the given project id,
+   * project version, file name and flow version; rejects files above the 10 MB limit.
+   */
+  @Override
+  public void uploadFlowFile(final int projectId, final int projectVersion, final int flowVersion,
+      final File flowFile) throws ProjectManagerException {
+    logger.info(String.format(
+        "Uploading flow file %s, version %d for project %d, version %d, file length is [%d bytes]",
+        flowFile.getName(), flowVersion, projectId, projectVersion, flowFile.length()));
+
+    if (flowFile.length() > MAX_FLOW_FILE_SIZE_IN_BYTES) {
+      throw new ProjectManagerException("Flow file length exceeds 10 MB limit.");
+    }
+
+    final String INSERT_FLOW_FILES =
+        "INSERT INTO project_flow_files (project_id, project_version, flow_name, flow_version, "
+            + "modified_time, flow_file) values (?,?,?,?,?,?)";
+
+    final byte[] buf;
+    // Read to EOF: a single read() may return fewer bytes than the file holds, or -1 when empty.
+    try (final FileInputStream input = new FileInputStream(flowFile);
+        final BufferedInputStream bufferedStream = new BufferedInputStream(input)) {
+      buf = IOUtils.toByteArray(bufferedStream);
+      logger.info("Read bytes for " + flowFile.getName() + ", size:" + buf.length);
+    } catch (final IOException e) {
+      throw new ProjectManagerException(String.format(
+          "Error reading flow file %s, version: %d, length: [%d bytes].",
+          flowFile.getName(), flowVersion, flowFile.length()), e);
+    }
+
+    try {
+      this.dbOperator.update(INSERT_FLOW_FILES, projectId, projectVersion, flowFile.getName(),
+          flowVersion, System.currentTimeMillis(), buf);
+    } catch (final SQLException e) {
+      throw new ProjectManagerException(
+          "Error uploading flow file " + flowFile.getName() + ", version " + flowVersion + ".", e);
+    }
+  }
+
+  /** Writes the stored flow file for the given keys to tempDir/flowName and returns it. */
+  @Override
+  public File getUploadedFlowFile(final int projectId, final int projectVersion,
+      final int flowVersion, final String flowName) throws ProjectManagerException {
+    final List<byte[]> data;
+    // Query before opening the output stream so a failed lookup leaves no empty file behind.
+    try {
+      data = this.dbOperator.query(FlowFileResultHandler.SELECT_FLOW_FILE,
+          new FlowFileResultHandler(), projectId, projectVersion, flowName, flowVersion);
+    } catch (final SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException("Failed to query uploaded flow file " + flowName + ".", e);
+    }
+    if (data.isEmpty() || data.get(0) == null) {
+      throw new ProjectManagerException("Flow file " + flowName + " not found.");
+    }
+    // Todo jamiesjc: delete the flow file after used.
+    final File file = new File(this.tempDir, flowName);
+    try (final FileOutputStream output = new FileOutputStream(file);
+        final BufferedOutputStream bufferedStream = new BufferedOutputStream(output)) {
+      try {
+        bufferedStream.write(data.get(0));
+      } catch (final IOException e) {
+        throw new ProjectManagerException("Error writing flow file" + flowName, e);
+      }
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Error creating output stream for flow file" + flowName, e);
+    }
+    return file;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 59806c1..c6ace1b 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -200,4 +200,17 @@ public interface ProjectLoader {
       throws ProjectManagerException;
 
   void updateProjectSettings(Project project) throws ProjectManagerException;
+
+  /**
+   * Uploads the given flow file for the given project id, project version and flow version.
+   */
+  void uploadFlowFile(int projectId, int projectVersion, int flowVersion, File flowFile)
+      throws ProjectManagerException;
+
+  /**
+   * Gets the uploaded flow file identified by project id, project version, flow version and name.
+   */
+  File getUploadedFlowFile(int projectId, int projectVersion, int flowVersion, String flowName)
+      throws ProjectManagerException;
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 2478868..325979a 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -15,9 +15,12 @@
  */
 package azkaban.project;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 import azkaban.db.DatabaseOperator;
 import azkaban.flow.Flow;
 import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.utils.Md5Hasher;
@@ -30,6 +33,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -41,6 +45,13 @@ import org.junit.Test;
 public class JdbcProjectImplTest {
 
   private static final String SAMPLE_FILE = "sample_flow_01.zip";
+  private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
+  private static final String LARGE_FLOW_YAML_DIR = "largeflowyamltest";
+  private static final String BASIC_FLOW_FILE = "basic_flow.flow";
+  private static final String LARGE_FLOW_FILE = "large_file.flow";
+  private static final int PROJECT_ID = 123;
+  private static final int PROJECT_VERSION = 3;
+  private static final int FLOW_VERSION = 1;
   private static final Props props = new Props();
   private static DatabaseOperator dbOperator;
   private ProjectLoader loader;
@@ -342,16 +353,51 @@ public class JdbcProjectImplTest {
     Assert.assertEquals(fileHandler2.getNumChunks(), 0);
   }
 
+  @Test
+  public void testUploadFlowFile() throws Exception {
+    final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
+    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+    // Round trip: the file read back must have the same name and content as the one uploaded.
+    final File file = this.loader
+        .getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, BASIC_FLOW_FILE);
+    Assert.assertEquals(BASIC_FLOW_FILE, file.getName());
+    Assert.assertTrue(FileUtils.contentEquals(testYamlFile, file));
+  }
+
+  @Test
+  public void testDuplicateUploadFlowFileException() throws Exception {
+    final File yaml = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
+    final String expected =
+        "Error uploading flow file " + BASIC_FLOW_FILE + ", version " + FLOW_VERSION + ".";
+    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, yaml);
+    // A second upload under the same key is expected to be rejected.
+    assertThatThrownBy(
+        () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, yaml))
+        .isInstanceOf(ProjectManagerException.class).hasMessageContaining(expected);
+  }
+
+  @Test
+  public void testUploadLargeFlowFileException() throws Exception {
+    // The fixture is expected to be larger than the loader's flow file size limit.
+    final File oversized = ExecutionsTestUtil.getFlowFile(LARGE_FLOW_YAML_DIR, LARGE_FLOW_FILE);
+    final String expected = "Flow file length exceeds 10 MB limit.";
+
+    assertThatThrownBy(
+        () -> this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, oversized))
+        .isInstanceOf(ProjectManagerException.class).hasMessageContaining(expected);
+  }
+
   @After
   public void clearDB() {
     try {
-      dbOperator.update("DELETE FROM projects");
-      dbOperator.update("DELETE FROM project_versions");
-      dbOperator.update("DELETE FROM project_properties");
-      dbOperator.update("DELETE FROM project_permissions");
-      dbOperator.update("DELETE FROM project_flows");
-      dbOperator.update("DELETE FROM project_files");
-      dbOperator.update("DELETE FROM project_events");
+      dbOperator.update("TRUNCATE TABLE projects");
+      dbOperator.update("TRUNCATE TABLE project_versions");
+      dbOperator.update("TRUNCATE TABLE project_properties");
+      dbOperator.update("TRUNCATE TABLE project_permissions");
+      dbOperator.update("TRUNCATE TABLE project_flows");
+      dbOperator.update("TRUNCATE TABLE project_files");
+      dbOperator.update("TRUNCATE TABLE project_events");
+      dbOperator.update("TRUNCATE TABLE project_flow_files");
     } catch (final SQLException e) {
       e.printStackTrace();
     }
diff --git a/azkaban-db/src/main/sql/create.project_flow_files.sql b/azkaban-db/src/main/sql/create.project_flow_files.sql
new file mode 100644
index 0000000..40e45f2
--- /dev/null
+++ b/azkaban-db/src/main/sql/create.project_flow_files.sql
@@ -0,0 +1,9 @@
+CREATE TABLE project_flow_files (
+  project_id        INT          NOT NULL,
+  project_version   INT          NOT NULL,
+  flow_name         VARCHAR(128) NOT NULL, -- JdbcProjectImpl stores the flow file's name here
+  flow_version      INT          NOT NULL,
+  modified_time     BIGINT       NOT NULL, -- JdbcProjectImpl writes System.currentTimeMillis()
+  flow_file         LONGBLOB,              -- raw bytes of the flow file
+  PRIMARY KEY (project_id, project_version, flow_name, flow_version)
+);
diff --git a/test/execution-test-data/largeflowyamltest/large_file.flow b/test/execution-test-data/largeflowyamltest/large_file.flow
new file mode 100644
index 0000000..95a797f
Binary files /dev/null and b/test/execution-test-data/largeflowyamltest/large_file.flow differ