azkaban-aplcache

Upload flow YAML files to DB. (#1546)

10/31/2017 10:01:46 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 3a7aec8..a2e496e 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -19,8 +19,10 @@ package azkaban.project;
 
 import static java.util.Objects.requireNonNull;
 
+import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.flow.Flow;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.validator.ValidationReport;
 import azkaban.project.validator.ValidationStatus;
@@ -102,18 +104,19 @@ class AzkabanProjectLoader {
       loader = this.flowLoaderFactory.createFlowLoader(file);
       reports.put(DIRECTORY_FLOW_REPORT_KEY, loader.loadProjectFlow(project, file));
 
+      // Check the validation report.
+      if (!isReportStatusValid(reports, project)) {
+        cleanUpProjectTempDir(file);
+        return reports;
+      }
+
+      // Upload the project to DB and storage.
+      persistProject(project, loader, archive, file, uploader);
+
     } finally {
       cleanUpProjectTempDir(file);
     }
 
-    // Check the validation report.
-    if (!isReportStatusValid(reports, project)) {
-      return reports;
-    }
-
-    // Upload the project to DB and storage.
-    persistProject(project, loader, archive, uploader);
-
     // Clean up project old installations after new project is uploaded successfully.
     cleanUpProjectOldInstallations(project);
 
@@ -182,21 +185,21 @@ class AzkabanProjectLoader {
   }
 
   private void persistProject(final Project project, final FlowLoader loader, final File archive,
-      final User uploader) throws ProjectManagerException {
+      final File projectDir, final User uploader) throws ProjectManagerException {
     synchronized (project) {
-      final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+      final int newProjectVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
       final Map<String, Flow> flows = loader.getFlowMap();
       for (final Flow flow : flows.values()) {
         flow.setProjectId(project.getId());
-        flow.setVersion(newVersion);
+        flow.setVersion(newProjectVersion);
       }
 
-      this.storageManager.uploadProject(project, newVersion, archive, uploader);
+      this.storageManager.uploadProject(project, newProjectVersion, archive, uploader);
 
       log.info("Uploading flow to db for project " + archive.getName());
-      this.projectLoader.uploadFlows(project, newVersion, flows.values());
+      this.projectLoader.uploadFlows(project, newProjectVersion, flows.values());
       log.info("Changing project versions for project " + archive.getName());
-      this.projectLoader.changeProjectVersion(project, newVersion,
+      this.projectLoader.changeProjectVersion(project, newProjectVersion,
           uploader.getUserId());
       project.setFlows(flows);
 
@@ -209,8 +212,13 @@ class AzkabanProjectLoader {
         this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
 
       } else if (loader instanceof DirectoryYamlFlowLoader) {
-        // Todo jamiesjc: upload yaml file to DB as a blob
-
+        final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
+        for (final File file : flowFiles) {
+          final int newFlowVersion = this.projectLoader
+              .getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
+          this.projectLoader
+              .uploadFlowFile(project.getId(), newProjectVersion, newFlowVersion, file);
+        }
       } else {
         throw new ProjectManagerException("Invalid type of flow loader.");
       }
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index 12a0e8f..2cc55d5 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -323,6 +323,8 @@ class JdbcProjectHandlerSet {
   public static class IntHandler implements ResultSetHandler<Integer> {
 
     public static String SELECT_LATEST_VERSION = "SELECT MAX(version) FROM project_versions WHERE project_id=?";
+    public static String SELECT_LATEST_FLOW_VERSION = "SELECT MAX(flow_version) FROM "
+        + "project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?";
 
     @Override
     public Integer handle(final ResultSet rs) throws SQLException {
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 6154769..a69ca0f 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -1043,4 +1043,19 @@ public class JdbcProjectImpl implements ProjectLoader {
     }
     return file;
   }
+
+  @Override
+  public int getLatestFlowVersion(final int projectId, final int projectVersion,
+      final String flowName) throws ProjectManagerException {
+    final IntHandler handler = new IntHandler();
+    try {
+      return this.dbOperator.query(IntHandler.SELECT_LATEST_FLOW_VERSION, handler, projectId,
+          projectVersion, flowName);
+    } catch (final SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException(
+          "Error selecting latest flow version from project " + projectId + ", version " +
+              projectVersion + ", flow " + flowName + ".", e);
+    }
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index c6ace1b..42062b6 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -213,4 +213,10 @@ public interface ProjectLoader {
   File getUploadedFlowFile(int projectId, int projectVersion, int flowVersion, String flowName)
       throws ProjectManagerException;
 
+  /**
+   * Gets the latest flow version.
+   */
+  int getLatestFlowVersion(int projectId, int projectVersion, String flowName)
+      throws ProjectManagerException;
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index 61b020f..ad91a23 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -19,6 +19,9 @@ package azkaban.project;
 
 import static azkaban.Constants.ConfigurationKeys.PROJECT_TEMP_DIR;
 import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -26,12 +29,12 @@ import static org.mockito.Mockito.when;
 import azkaban.project.validator.ValidationReport;
 import azkaban.project.validator.ValidationStatus;
 import azkaban.storage.StorageManager;
+import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.user.User;
 import azkaban.utils.Props;
 import java.io.File;
 import java.net.URL;
 import java.util.Map;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -40,6 +43,9 @@ import org.junit.rules.TemporaryFolder;
 public class AzkabanProjectLoaderTest {
 
   private static final String DIRECTORY_FLOW_REPORT_KEY = "Directory Flow";
+  private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
+  private static final String BASIC_FLOW_FILE = "basic_flow.flow";
+  private static final String PROJECT_ZIP = "Archive.zip";
 
   @Rule
   public final TemporaryFolder TEMP_DIR = new TemporaryFolder();
@@ -73,14 +79,8 @@ public class AzkabanProjectLoaderTest {
     final File projectZipFile = new File(resource.getPath());
     final User uploader = new User("test_user");
 
-    final Map<String, ValidationReport> validationReportMap =
-        this.azkabanProjectLoader
-            .uploadProject(this.project, projectZipFile, "zip", uploader, null);
-
-    Assert.assertEquals(1, validationReportMap.size());
-    Assert.assertTrue(validationReportMap.containsKey(DIRECTORY_FLOW_REPORT_KEY));
-    Assert.assertEquals(ValidationStatus.PASS,
-        validationReportMap.get(DIRECTORY_FLOW_REPORT_KEY).getStatus());
+    checkValidationReport(this.azkabanProjectLoader
+        .uploadProject(this.project, projectZipFile, "zip", uploader, null));
 
     verify(this.storageManager)
         .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
@@ -97,4 +97,30 @@ public class AzkabanProjectLoaderTest {
     verify(this.storageManager).getProjectFile(this.ID, this.VERSION);
   }
 
+  @Test
+  public void uploadProjectWithYamlFiles() throws Exception {
+    final File projectZipFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, PROJECT_ZIP);
+    final int flowVersion = 0;
+    final User uploader = new User("test_user");
+
+    when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
+    when(this.projectLoader.getLatestFlowVersion(this.ID, this.VERSION, BASIC_FLOW_FILE))
+        .thenReturn(flowVersion);
+
+    checkValidationReport(this.azkabanProjectLoader
+        .uploadProject(this.project, projectZipFile, "zip", uploader, null));
+
+    verify(this.storageManager)
+        .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
+    verify(this.projectLoader)
+        .uploadFlowFile(eq(this.ID), eq(this.VERSION + 1), eq(flowVersion + 1), any(File.class));
+
+  }
+
+  private void checkValidationReport(final Map<String, ValidationReport> validationReportMap) {
+    assertThat(validationReportMap.size()).isEqualTo(1);
+    assertThat(validationReportMap.containsKey(DIRECTORY_FLOW_REPORT_KEY)).isTrue();
+    assertThat(validationReportMap.get(DIRECTORY_FLOW_REPORT_KEY).getStatus()).isEqualTo
+        (ValidationStatus.PASS);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 325979a..c4d3e00 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -15,6 +15,7 @@
  */
 package azkaban.project;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import azkaban.db.DatabaseOperator;
@@ -360,7 +361,7 @@ public class JdbcProjectImplTest {
 
     final File file = this.loader
         .getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, BASIC_FLOW_FILE);
-    Assert.assertEquals(BASIC_FLOW_FILE, file.getName());
+    assertThat(file.getName()).isEqualTo(BASIC_FLOW_FILE);
     FileUtils.contentEquals(testYamlFile, file);
   }
 
@@ -387,6 +388,19 @@ public class JdbcProjectImplTest {
             "Flow file length exceeds 10 MB limit.");
   }
 
+  @Test
+  public void testGetLatestFlowVersion() throws Exception {
+    final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
+
+    assertThat(
+        this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
+        .isEqualTo(0);
+    this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+    assertThat(
+        this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
+        .isEqualTo(FLOW_VERSION);
+  }
+
   @After
   public void clearDB() {
     try {
diff --git a/test/execution-test-data/basicflowyamltest/Archive.zip b/test/execution-test-data/basicflowyamltest/Archive.zip
new file mode 100644
index 0000000..7bc2077
Binary files /dev/null and b/test/execution-test-data/basicflowyamltest/Archive.zip differ