diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index e79278e..4f68c10 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -89,10 +89,14 @@ public class FlowPreparer {
this.storageManager = storageManager;
this.executionsDir = executionsDir;
this.projectsDir = projectsDir;
- this.projectDirCleaner = new ProjectCacheDirCleaner(projectDirMaxSizeInMb);
+ this.projectDirCleaner =
+ projectDirMaxSizeInMb != null ? new ProjectCacheDirCleaner(projectDirMaxSizeInMb) : null;
this.cacheMetrics = new ProjectsDirCacheMetrics();
}
+ private boolean isProjectCacheSizeLimitEnabled() {
+ return this.projectDirCleaner != null;
+ }
public double getProjectDirCacheHitRatio() {
return this.cacheMetrics.getHitRatio();
@@ -142,9 +146,11 @@ public class FlowPreparer {
/**
* check if number of files inside the project dir equals to target
*/
- private boolean isFileCountEqual(final ProjectVersion pv, final int target) {
+ @VisibleForTesting
+ boolean isFileCountEqual(final ProjectVersion pv, final int target) {
final int fileCount;
try {
+ updateFileCount(pv.getInstalledDir(), pv);
final Path path = Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_COUNT_FILE_NAME);
fileCount = (int) FileIOUtils.readNumberFromFile(path);
return fileCount == target;
@@ -176,7 +182,7 @@ public class FlowPreparer {
final int linkCount = FileIOUtils
.createDeepHardlink(projectVersion.getInstalledDir(), execDir);
- if (!isFileCountEqual(projectVersion, linkCount)) {
+ if (isProjectCacheSizeLimitEnabled() && !isFileCountEqual(projectVersion, linkCount)) {
throw new Exception(String.format("File count check failed for execid: %d, project dir %s"
+ " are being deleted when setting this execution up",
flow.getExecutionId(), projectVersion.getInstalledDir()));
@@ -262,7 +268,9 @@ public class FlowPreparer {
updateDirSize(tempDir, pv);
updateFileCount(tempDir, pv);
log.info(String.format("Downloading zip file for Project Version {%s} completes", pv));
- this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
+ if (this.projectDirCleaner != null) {
+ this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
+ }
Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
log.warn(String.format("Project preparation completes. [%s]", pv));
} finally {
@@ -288,7 +296,7 @@ public class FlowPreparer {
private class ProjectCacheDirCleaner {
- private final Long projectDirMaxSizeInMb;
+ private final long projectDirMaxSizeInMb;
/*
* Delete the project dir associated with {@code version}.
@@ -300,7 +308,7 @@ public class FlowPreparer {
}
}
- ProjectCacheDirCleaner(final Long projectDirMaxSizeInMb) {
+ ProjectCacheDirCleaner(final long projectDirMaxSizeInMb) {
this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
}
@@ -384,7 +392,6 @@ public class FlowPreparer {
}
synchronized void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) {
- if (this.projectDirMaxSizeInMb != null) {
final long start = System.currentTimeMillis();
final List<ProjectVersion> allProjects = loadAllProjects();
FlowPreparer.log
@@ -396,10 +403,9 @@ public class FlowPreparer {
>= this.projectDirMaxSizeInMb * 1024 * 1024) {
FlowPreparer.log.info(String.format("Project dir disk usage[%s bytes] exceeds the "
+ "limit[%s mb], start cleaning up project dirs",
- currentSpaceInBytes + spaceToDeleteInBytes, this.projectDirMaxSizeInMb.longValue()));
+ currentSpaceInBytes + spaceToDeleteInBytes, this.projectDirMaxSizeInMb));
deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes, allProjects);
}
- }
}
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index ca47350..b8fd5f8 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -117,7 +117,7 @@ public class FlowPreparerTest {
}
@Test
- public void testSetupFlow() throws Exception {
+ public void testSetupFlow() {
final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
when(executableFlow.getExecutionId()).thenReturn(12345);
when(executableFlow.getProjectId()).thenReturn(12);
@@ -129,6 +129,33 @@ public class FlowPreparerTest {
assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
}
+ @Test
+ public void testFileCountCheckNotCalled() {
+ //given
+ final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
+ when(executableFlow.getExecutionId()).thenReturn(12345);
+ when(executableFlow.getProjectId()).thenReturn(12);
+ when(executableFlow.getVersion()).thenReturn(34);
+
+ //when
+ this.instance.setup(executableFlow);
+
+ //then
+ verify(this.instance, never()).isFileCountEqual(any(), anyInt());
+ }
+
+ @Test
+ public void testIsFileCountEqual() {
+ //given
+ final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
+ this.executionsDir, this.projectsDir, 1L);
+ final File projectDir = new File(this.projectsDir, "sample_project_01");
+ projectDir.mkdir();
+ final ProjectVersion pv = new ProjectVersion(1, 1, projectDir);
+
+ //then
+ assertThat(flowPreparer.isFileCountEqual(pv, 1)).isEqualTo(true);
+ }
@Test
public void testProjectCacheDirCleanerNotEnabled() throws IOException {