Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 00f0135..e05090e 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -227,6 +227,9 @@ public class Constants {
// allowed max size of shared project dir in MB
public static final String PROJECT_DIR_MAX_SIZE_IN_MB = "azkaban.project_cache_max_size_in_mb";
+ // how many older versions of project files are kept in DB before deleting them
+ public static final String PROJECT_VERSION_RETENTION = "project.version.retention";
+
// number of rows to be displayed on the executions page.
public static final String DISPLAY_EXECUTION_PAGE_SIZE = "azkaban.display.execution_page_size";
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 78588c5..1ccf1eb 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -75,7 +75,7 @@ class AzkabanProjectLoader {
} else {
log.info("Using temp dir: " + this.tempDir.getAbsolutePath());
}
- this.projectVersionRetention = props.getInt("project.version.retention", 3);
+ this.projectVersionRetention = props.getInt(ConfigurationKeys.PROJECT_VERSION_RETENTION, 3);
log.info("Project version retention is set to " + this.projectVersionRetention);
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index eaa607a..f229b51 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -24,6 +24,7 @@ import static azkaban.project.JdbcProjectHandlerSet.ProjectPropertiesResultsHand
import static azkaban.project.JdbcProjectHandlerSet.ProjectResultHandler;
import static azkaban.project.JdbcProjectHandlerSet.ProjectVersionResultHandler;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.db.DatabaseOperator;
import azkaban.db.DatabaseTransOperator;
import azkaban.db.EncodingType;
@@ -469,6 +470,13 @@ public class JdbcProjectImpl implements ProjectLoader {
return null;
}
final int numChunks = projHandler.getNumChunks();
+ if (numChunks <= 0) {
+ throw new ProjectManagerException(String.format("Got numChunks=%s for version %s of project "
+ + "%s - seems like this version has been cleaned up already, because enough newer "
+ + "versions have been uploaded. To increase the retention of project versions, set "
+ + "%s", numChunks, version, projectId,
+ ConfigurationKeys.PROJECT_VERSION_RETENTION));
+ }
BufferedOutputStream bStream = null;
File file;
try {
@@ -514,7 +522,7 @@ public class JdbcProjectImpl implements ProjectLoader {
}
// Check md5.
- byte[] md5 = null;
+ byte[] md5;
try {
md5 = Md5Hasher.md5Hash(file);
} catch (final IOException e) {
@@ -524,7 +532,11 @@ public class JdbcProjectImpl implements ProjectLoader {
if (Arrays.equals(projHandler.getMd5Hash(), md5)) {
logger.info("Md5 Hash is valid");
} else {
- throw new ProjectManagerException("Md5 Hash failed on retrieval of file");
+ throw new ProjectManagerException(
+ String.format("Md5 Hash failed on project %s version %s retrieval of file %s. "
+ + "Expected hash: %s , got hash: %s",
+ projHandler.getProjectId(), projHandler.getVersion(), file.getAbsolutePath(),
+ Arrays.toString(projHandler.getMd5Hash()), Arrays.toString(md5)));
}
projHandler.setLocalFile(file);
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 06be4bb..7f73e57 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -17,6 +17,7 @@ package azkaban.project;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
import azkaban.db.DatabaseOperator;
import azkaban.flow.Flow;
@@ -353,7 +354,7 @@ public class JdbcProjectImplTest {
}
@Test
- public void cleanOlderProjectVersion() throws Exception {
+ public void cleanOlderProjectVersion() {
createThreeProjects();
final Project project = this.loader.fetchProjectByName("mytestProject");
final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
@@ -362,12 +363,25 @@ public class JdbcProjectImplTest {
final ProjectFileHandler fileHandler = this.loader.getUploadedFile(project.getId(), newVersion);
Assert.assertEquals(fileHandler.getNumChunks(), 1);
+ assertNumChunks(project, newVersion, 1);
this.loader.cleanOlderProjectVersion(project.getId(), newVersion + 1);
- final ProjectFileHandler fileHandler2 = this.loader
- .fetchProjectMetaData(project.getId(), newVersion);
- Assert.assertEquals(fileHandler2.getNumChunks(), 0);
+ assertNumChunks(project, newVersion, 0);
+ assertGetUploadedFileOfCleanedVersion(project.getId(), newVersion);
+ }
+
+ private void assertNumChunks(final Project project, final int version, final int expectedChunks) {
+ final ProjectFileHandler fileHandler = this.loader
+ .fetchProjectMetaData(project.getId(), version);
+ Assert.assertEquals(expectedChunks, fileHandler.getNumChunks());
+ }
+
+ private void assertGetUploadedFileOfCleanedVersion(final int project, final int version) {
+ final Throwable thrown = catchThrowable(() -> this.loader.getUploadedFile(project, version));
+ assertThat(thrown).isInstanceOf(ProjectManagerException.class);
+ assertThat(thrown).hasMessageStartingWith(String.format("Got numChunks=0 for version %s of "
+ + "project %s - seems like this version has been cleaned up", version, project));
}
@Test