Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 4c4bca5..850acc4 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -233,8 +233,9 @@ public class Constants {
public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
- // allowed max size of shared project dir in MB
- public static final String PROJECT_DIR_MAX_SIZE_IN_MB = "azkaban.project_cache_max_size_in_mb";
+ // allowed max size of shared project dir as a fraction (0, 1] of the partition size, e.g. 0.8
+ public static final String PROJECT_CACHE_SIZE_PERCENTAGE = "azkaban"
+ + ".project_cache_size_percentage_of_disk";
// how many older versions of project files are kept in DB before deleting them
public static final String PROJECT_VERSION_RETENTION = "project.version.retention";
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index e444841..2af1ef5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -109,14 +109,9 @@ class FlowPreparer {
final long flowPrepStartTime = System.currentTimeMillis();
- // Download project to a temp dir if not exists in local cache.
- final long start = System.currentTimeMillis();
tempDir = downloadProjectIfNotExists(project, flow.getExecutionId());
- log.info("Downloading zip file for project {} when preparing execution [execid {}] "
- + "completed in {} second(s)", project, flow.getExecutionId(),
- (System.currentTimeMillis() - start) / 1000);
// With synchronization, only one thread is allowed to proceed to avoid complicated race
// conditions which could arise when multiple threads are downloading/deleting/hard-linking
@@ -138,7 +133,11 @@ class FlowPreparer {
// Rename temp dir to a proper project directory name.
Files.move(tempDir.toPath(), project.getInstalledDir().toPath());
}
+
+ final long start = System.currentTimeMillis();
execDir = setupExecutionDir(project.getInstalledDir(), flow);
+ final long end = System.currentTimeMillis();
+ log.info("Setting up execution dir {} took {} sec(s)", execDir, (end - start) / 1000);
}
final long flowPrepCompletionTime = System.currentTimeMillis();
@@ -245,8 +244,17 @@ class FlowPreparer {
}
this.projectCacheHitRatio.markMiss();
+
+ final long start = System.currentTimeMillis();
+
+ // Download project to a temp dir if not exists in local cache.
final File tempDir = createTempDir(proj);
downloadAndUnzipProject(proj, tempDir);
+
+ log.info("Downloading zip file for project {} when preparing execution [execid {}] "
+ + "completed in {} second(s)", proj, execId,
+ (System.currentTimeMillis() - start) / 1000);
+
return tempDir;
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index df9b74c..c610207 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -212,11 +212,11 @@ public class FlowRunnerManager implements EventListener,
JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), this.globalProps,
getClass().getClassLoader());
- Long projectDirMaxSize = null;
ProjectCacheCleaner cleaner = null;
try {
- projectDirMaxSize = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE_IN_MB);
- cleaner = new ProjectCacheCleaner(this.projectDirectory, projectDirMaxSize);
+ final double projectCacheSizePercentage =
+ props.getDouble(ConfigurationKeys.PROJECT_CACHE_SIZE_PERCENTAGE);
+ cleaner = new ProjectCacheCleaner(this.projectDirectory, projectCacheSizePercentage);
} catch (final UndefinedPropertyException ex) {
}
@@ -387,7 +387,7 @@ public class FlowRunnerManager implements EventListener,
this.commonMetrics.addQueueWait(System.currentTimeMillis() -
flow.getExecutableFlow().getSubmitTime());
- final Timer.Context flowPrepTimerContext = execMetrics.getFlowSetupTimerContext();
+ final Timer.Context flowPrepTimerContext = this.execMetrics.getFlowSetupTimerContext();
try {
if (this.active || isExecutorSpecified(flow)) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
index 751b9c9..3344fc4 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
@@ -19,6 +19,7 @@ package azkaban.execapp;
import azkaban.utils.ExecutorServiceUtils;
import azkaban.utils.FileIOUtils;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.FilenameFilter;
import java.nio.file.Files;
@@ -27,7 +28,9 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
@@ -40,16 +43,18 @@ import org.slf4j.LoggerFactory;
class ProjectCacheCleaner {
private final File projectCacheDir;
- private final long projectCacheMaxSizeInMB;
+
+ // cache size as a fraction (0, 1] of the disk partition that projectCacheDir belongs to
+ private final double percentageOfDisk;
+
private static final Logger log = LoggerFactory.getLogger(ProjectCacheCleaner.class);
- ProjectCacheCleaner(final File projectCacheDir, final long projectCacheMaxSizeInMB) {
+ ProjectCacheCleaner(final File projectCacheDir, final double percentageOfDisk) {
Preconditions.checkNotNull(projectCacheDir);
Preconditions.checkArgument(projectCacheDir.exists());
- Preconditions.checkArgument(projectCacheMaxSizeInMB > 0);
-
+ Preconditions.checkArgument(percentageOfDisk > 0 && percentageOfDisk <= 1);
this.projectCacheDir = projectCacheDir;
- this.projectCacheMaxSizeInMB = projectCacheMaxSizeInMB;
+ this.percentageOfDisk = percentageOfDisk;
}
/**
@@ -69,7 +74,7 @@ class ProjectCacheCleaner {
if (project.exists() && project.isDirectory()) {
projects.add(project.toPath());
} else {
- log.debug("project {} doesn't exist or is non-dir.", project.getName());
+ log.debug("Project {} doesn't exist or is non-dir.", project.getName());
}
}
return projects;
@@ -96,7 +101,7 @@ class ProjectCacheCleaner {
FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)));
allProjects.add(projectDirMetadata);
} catch (final Exception e) {
- log.warn("error while loading project dir metadata for project {}",
+ log.warn("Error while loading project dir metadata for project {}",
project.getFileName(), e);
}
}
@@ -115,22 +120,25 @@ class ProjectCacheCleaner {
/**
* Delete all the files in parallel
*
- * @param filesToDelete a list of files to delete
+ * @param projectDirsToDelete a set of project dirs to delete
*/
@SuppressWarnings("FutureReturnValueIgnored")
- private void deleteFilesInParallel(final List<File> filesToDelete) {
+ private void deleteProjectDirsInParallel(final ImmutableSet<File> projectDirsToDelete) {
final int CLEANING_SERVICE_THREAD_NUM = 8;
final ExecutorService deletionService = Executors
.newFixedThreadPool(CLEANING_SERVICE_THREAD_NUM);
- for (final File toDelete : filesToDelete) {
- deletionService.submit(() -> FileIOUtils.deleteDirectorySilently(toDelete));
+ for (final File toDelete : projectDirsToDelete) {
+ deletionService.submit(() -> {
+ log.info("Deleting project dir {} from project cache to free up space", toDelete);
+ FileIOUtils.deleteDirectorySilently(toDelete);
+ });
}
try {
new ExecutorServiceUtils().gracefulShutdown(deletionService, Duration.ofDays(1));
} catch (final InterruptedException e) {
- log.warn("error when deleting files", e);
+ log.warn("Error when deleting files", e);
}
}
@@ -144,7 +152,7 @@ class ProjectCacheCleaner {
final List<ProjectDirectoryMetadata> projectDirMetadataList) {
// Sort projects by last reference time in ascending order
projectDirMetadataList.sort(Comparator.comparing(ProjectDirectoryMetadata::getLastAccessTime));
- final List<File> filesToDelete = new ArrayList<>();
+ final Set<File> projectDirsToDelete = new HashSet<>();
for (final ProjectDirectoryMetadata proj : projectDirMetadataList) {
if (sizeToFreeInBytes > 0) {
@@ -152,37 +160,42 @@ class ProjectCacheCleaner {
// delete the directory since execution dir is HARD linked to project dir. Note that even
// if project is deleted, disk space will be freed up only when all associated execution
// dirs are deleted.
- log.debug("deleting project {}", proj);
if (proj.getInstalledDir() != null) {
- filesToDelete.add(proj.getInstalledDir());
+ projectDirsToDelete.add(proj.getInstalledDir());
sizeToFreeInBytes -= proj.getDirSizeInByte();
}
} else {
break;
}
}
- deleteFilesInParallel(filesToDelete);
+
+ final long start = System.currentTimeMillis();
+ deleteProjectDirsInParallel(ImmutableSet.copyOf(projectDirsToDelete));
+ final long end = System.currentTimeMillis();
+ log.info("Deleting {} project dir(s) took {} sec(s)", projectDirsToDelete.size(),
+ (end - start) / 1000);
}
/**
* Deleting least recently accessed project dirs when there's no room to accommodate new project
*/
void deleteProjectDirsIfNecessary(final long newProjectSizeInBytes) {
+ final long projectCacheMaxSizeInByte =
+ (long) (this.projectCacheDir.getTotalSpace() * this.percentageOfDisk);
+
final long start = System.currentTimeMillis();
final List<ProjectDirectoryMetadata> allProjects = loadAllProjects();
- log.debug("loading all project dirs metadata completed in {} sec(s)",
- Duration.ofSeconds(System.currentTimeMillis() - start).getSeconds());
+ log.info("Loading {} project dirs metadata completed in {} sec(s)",
+ allProjects.size(), (System.currentTimeMillis() - start) / 1000);
final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
- if (currentSpaceInBytes + newProjectSizeInBytes
- >= this.projectCacheMaxSizeInMB * 1024 * 1024) {
+ if (currentSpaceInBytes + newProjectSizeInBytes >= projectCacheMaxSizeInByte) {
log.info(
- "project cache usage[{} MB] exceeds the limit[{} MB], start cleaning up project dirs",
+ "Project cache usage[{} MB] >= cache limit[{} MB], start cleaning up project dirs",
(currentSpaceInBytes + newProjectSizeInBytes) / (1024 * 1024),
- this.projectCacheMaxSizeInMB);
+ projectCacheMaxSizeInByte / (1024 * 1024));
- final long freeCacheSpaceInBytes =
- this.projectCacheMaxSizeInMB * 1024 * 1024 - currentSpaceInBytes;
+ final long freeCacheSpaceInBytes = projectCacheMaxSizeInByte - currentSpaceInBytes;
deleteLeastRecentlyUsedProjects(newProjectSizeInBytes - freeCacheSpaceInBytes, allProjects);
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
index ba786cc..1b57470 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
@@ -18,6 +18,8 @@
package azkaban.execapp;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import azkaban.utils.Utils;
import java.io.File;
@@ -46,7 +48,11 @@ public class ProjectCacheCleanerTest {
@Before
public void setUp() throws Exception {
- this.cacheDir = this.temporaryFolder.newFolder("projects");
+ this.cacheDir = spy(this.temporaryFolder.newFolder("projects"));
+
+ final long TEN_MB_IN_BYTE = 10 * 1024 * 1024;
+ when(this.cacheDir.getTotalSpace()).thenReturn(TEN_MB_IN_BYTE);
+
final ClassLoader classLoader = getClass().getClassLoader();
final long current = System.currentTimeMillis();
@@ -71,9 +77,8 @@ public class ProjectCacheCleanerTest {
* There's still space in the cache, no deletion.
*/
public void testNotDeleting() {
- final Long projectDirMaxSizeInMB = 7L;
final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir,
- projectDirMaxSizeInMB);
+ 0.7);
cleaner.deleteProjectDirsIfNecessary(1);
assertThat(this.cacheDir.list()).hasSize(3);
@@ -84,8 +89,7 @@ public class ProjectCacheCleanerTest {
* Deleting everything in the cache to accommodate new item.
*/
public void testDeletingAll() {
- final Long projectDirMaxSize = 3L;
- final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+ final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, 0.3);
cleaner.deleteProjectDirsIfNecessary(7000000);
assertThat(this.cacheDir.list()).hasSize(0);
@@ -96,8 +100,7 @@ public class ProjectCacheCleanerTest {
* Deleting two least recently used items in the cache to accommodate new item.
*/
public void testDeletingTwoLRUItems() {
- final Long projectDirMaxSize = 7L;
- final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+ final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, 0.7);
cleaner.deleteProjectDirsIfNecessary(3000000);
assertThat(this.cacheDir.list()).hasSize(1);
assertThat(this.cacheDir.list()).contains("3.1");
@@ -108,8 +111,7 @@ public class ProjectCacheCleanerTest {
* Deleting the least recently used item in the cache to accommodate new item.
*/
public void testDeletingOneLRUItem() {
- final Long projectDirMaxSize = 7L;
- final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+ final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, 0.7);
cleaner.deleteProjectDirsIfNecessary(2000000);
assertThat(this.cacheDir.list()).hasSize(2);
assertThat(this.cacheDir.list()).contains("3.1");