Details
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 3a7aec8..a2e496e 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -19,8 +19,10 @@ package azkaban.project;
import static java.util.Objects.requireNonNull;
+import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.flow.Flow;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.ValidationStatus;
@@ -102,18 +104,19 @@ class AzkabanProjectLoader {
loader = this.flowLoaderFactory.createFlowLoader(file);
reports.put(DIRECTORY_FLOW_REPORT_KEY, loader.loadProjectFlow(project, file));
+ // Check the validation report.
+ if (!isReportStatusValid(reports, project)) {
+ cleanUpProjectTempDir(file);
+ return reports;
+ }
+
+ // Upload the project to DB and storage.
+ persistProject(project, loader, archive, file, uploader);
+
} finally {
cleanUpProjectTempDir(file);
}
- // Check the validation report.
- if (!isReportStatusValid(reports, project)) {
- return reports;
- }
-
- // Upload the project to DB and storage.
- persistProject(project, loader, archive, uploader);
-
// Clean up project old installations after new project is uploaded successfully.
cleanUpProjectOldInstallations(project);
@@ -182,21 +185,21 @@ class AzkabanProjectLoader {
}
private void persistProject(final Project project, final FlowLoader loader, final File archive,
- final User uploader) throws ProjectManagerException {
+ final File projectDir, final User uploader) throws ProjectManagerException {
synchronized (project) {
- final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+ final int newProjectVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
final Map<String, Flow> flows = loader.getFlowMap();
for (final Flow flow : flows.values()) {
flow.setProjectId(project.getId());
- flow.setVersion(newVersion);
+ flow.setVersion(newProjectVersion);
}
- this.storageManager.uploadProject(project, newVersion, archive, uploader);
+ this.storageManager.uploadProject(project, newProjectVersion, archive, uploader);
log.info("Uploading flow to db for project " + archive.getName());
- this.projectLoader.uploadFlows(project, newVersion, flows.values());
+ this.projectLoader.uploadFlows(project, newProjectVersion, flows.values());
log.info("Changing project versions for project " + archive.getName());
- this.projectLoader.changeProjectVersion(project, newVersion,
+ this.projectLoader.changeProjectVersion(project, newProjectVersion,
uploader.getUserId());
project.setFlows(flows);
@@ -209,8 +212,13 @@ class AzkabanProjectLoader {
this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
} else if (loader instanceof DirectoryYamlFlowLoader) {
- // Todo jamiesjc: upload yaml file to DB as a blob
-
+ final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
+ for (final File file : flowFiles) {
+ final int newFlowVersion = this.projectLoader
+ .getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
+ this.projectLoader
+ .uploadFlowFile(project.getId(), newProjectVersion, newFlowVersion, file);
+ }
} else {
throw new ProjectManagerException("Invalid type of flow loader.");
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index 12a0e8f..2cc55d5 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -323,6 +323,8 @@ class JdbcProjectHandlerSet {
public static class IntHandler implements ResultSetHandler<Integer> {
public static String SELECT_LATEST_VERSION = "SELECT MAX(version) FROM project_versions WHERE project_id=?";
+ public static String SELECT_LATEST_FLOW_VERSION = "SELECT MAX(flow_version) FROM "
+ + "project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?";
@Override
public Integer handle(final ResultSet rs) throws SQLException {
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index 6154769..a69ca0f 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -1043,4 +1043,19 @@ public class JdbcProjectImpl implements ProjectLoader {
}
return file;
}
+
+ @Override
+ public int getLatestFlowVersion(final int projectId, final int projectVersion,
+ final String flowName) throws ProjectManagerException {
+ final IntHandler handler = new IntHandler();
+ try {
+ return this.dbOperator.query(IntHandler.SELECT_LATEST_FLOW_VERSION, handler, projectId,
+ projectVersion, flowName);
+ } catch (final SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException(
+ "Error selecting latest flow version from project " + projectId + ", version " +
+ projectVersion + ", flow " + flowName + ".", e);
+ }
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index c6ace1b..42062b6 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -213,4 +213,10 @@ public interface ProjectLoader {
File getUploadedFlowFile(int projectId, int projectVersion, int flowVersion, String flowName)
throws ProjectManagerException;
+ /**
+ * Gets the latest flow version.
+ */
+ int getLatestFlowVersion(int projectId, int projectVersion, String flowName)
+ throws ProjectManagerException;
+
}
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index 61b020f..ad91a23 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -19,6 +19,9 @@ package azkaban.project;
import static azkaban.Constants.ConfigurationKeys.PROJECT_TEMP_DIR;
import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -26,12 +29,12 @@ import static org.mockito.Mockito.when;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.ValidationStatus;
import azkaban.storage.StorageManager;
+import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.user.User;
import azkaban.utils.Props;
import java.io.File;
import java.net.URL;
import java.util.Map;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -40,6 +43,9 @@ import org.junit.rules.TemporaryFolder;
public class AzkabanProjectLoaderTest {
private static final String DIRECTORY_FLOW_REPORT_KEY = "Directory Flow";
+ private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
+ private static final String BASIC_FLOW_FILE = "basic_flow.flow";
+ private static final String PROJECT_ZIP = "Archive.zip";
@Rule
public final TemporaryFolder TEMP_DIR = new TemporaryFolder();
@@ -73,14 +79,8 @@ public class AzkabanProjectLoaderTest {
final File projectZipFile = new File(resource.getPath());
final User uploader = new User("test_user");
- final Map<String, ValidationReport> validationReportMap =
- this.azkabanProjectLoader
- .uploadProject(this.project, projectZipFile, "zip", uploader, null);
-
- Assert.assertEquals(1, validationReportMap.size());
- Assert.assertTrue(validationReportMap.containsKey(DIRECTORY_FLOW_REPORT_KEY));
- Assert.assertEquals(ValidationStatus.PASS,
- validationReportMap.get(DIRECTORY_FLOW_REPORT_KEY).getStatus());
+ checkValidationReport(this.azkabanProjectLoader
+ .uploadProject(this.project, projectZipFile, "zip", uploader, null));
verify(this.storageManager)
.uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
@@ -97,4 +97,30 @@ public class AzkabanProjectLoaderTest {
verify(this.storageManager).getProjectFile(this.ID, this.VERSION);
}
+ @Test
+ public void uploadProjectWithYamlFiles() throws Exception {
+ final File projectZipFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, PROJECT_ZIP);
+ final int flowVersion = 0;
+ final User uploader = new User("test_user");
+
+ when(this.projectLoader.getLatestProjectVersion(this.project)).thenReturn(this.VERSION);
+ when(this.projectLoader.getLatestFlowVersion(this.ID, this.VERSION, BASIC_FLOW_FILE))
+ .thenReturn(flowVersion);
+
+ checkValidationReport(this.azkabanProjectLoader
+ .uploadProject(this.project, projectZipFile, "zip", uploader, null));
+
+ verify(this.storageManager)
+ .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
+ verify(this.projectLoader)
+ .uploadFlowFile(eq(this.ID), eq(this.VERSION + 1), eq(flowVersion + 1), any(File.class));
+
+ }
+
+ private void checkValidationReport(final Map<String, ValidationReport> validationReportMap) {
+ assertThat(validationReportMap.size()).isEqualTo(1);
+ assertThat(validationReportMap.containsKey(DIRECTORY_FLOW_REPORT_KEY)).isTrue();
+ assertThat(validationReportMap.get(DIRECTORY_FLOW_REPORT_KEY).getStatus()).isEqualTo
+ (ValidationStatus.PASS);
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 325979a..c4d3e00 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -15,6 +15,7 @@
*/
package azkaban.project;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import azkaban.db.DatabaseOperator;
@@ -360,7 +361,7 @@ public class JdbcProjectImplTest {
final File file = this.loader
.getUploadedFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, BASIC_FLOW_FILE);
- Assert.assertEquals(BASIC_FLOW_FILE, file.getName());
+ assertThat(file.getName()).isEqualTo(BASIC_FLOW_FILE);
FileUtils.contentEquals(testYamlFile, file);
}
@@ -387,6 +388,19 @@ public class JdbcProjectImplTest {
"Flow file length exceeds 10 MB limit.");
}
+ @Test
+ public void testGetLatestFlowVersion() throws Exception {
+ final File testYamlFile = ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YAML_DIR, BASIC_FLOW_FILE);
+
+ assertThat(
+ this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
+ .isEqualTo(0);
+ this.loader.uploadFlowFile(PROJECT_ID, PROJECT_VERSION, FLOW_VERSION, testYamlFile);
+ assertThat(
+ this.loader.getLatestFlowVersion(PROJECT_ID, PROJECT_VERSION, testYamlFile.getName()))
+ .isEqualTo(FLOW_VERSION);
+ }
+
@After
public void clearDB() {
try {
diff --git a/test/execution-test-data/basicflowyamltest/Archive.zip b/test/execution-test-data/basicflowyamltest/Archive.zip
new file mode 100644
index 0000000..7bc2077
Binary files /dev/null and b/test/execution-test-data/basicflowyamltest/Archive.zip differ