azkaban-aplcache

Percentage-based project cache size (#2147) Currently

3/18/2019 9:41:51 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 4c4bca5..850acc4 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -233,8 +233,9 @@ public class Constants {
 
     public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
 
-    // allowed max size of shared project dir in MB
-    public static final String PROJECT_DIR_MAX_SIZE_IN_MB = "azkaban.project_cache_max_size_in_mb";
+    // allowed max size of shared project dir (percentage of partition size), e.g 0.8
+    public static final String PROJECT_CACHE_SIZE_PERCENTAGE = "azkaban"
+        + ".project_cache_size_percentage_of_disk";
 
     // how many older versions of project files are kept in DB before deleting them
     public static final String PROJECT_VERSION_RETENTION = "project.version.retention";
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index e444841..2af1ef5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -109,14 +109,9 @@ class FlowPreparer {
 
       final long flowPrepStartTime = System.currentTimeMillis();
 
-      // Download project to a temp dir if not exists in local cache.
-      final long start = System.currentTimeMillis();
 
       tempDir = downloadProjectIfNotExists(project, flow.getExecutionId());
 
-      log.info("Downloading zip file for project {} when preparing execution [execid {}] "
-              + "completed in {} second(s)", project, flow.getExecutionId(),
-          (System.currentTimeMillis() - start) / 1000);
 
       // With synchronization, only one thread is allowed to proceed to avoid complicated race
       // conditions which could arise when multiple threads are downloading/deleting/hard-linking
@@ -138,7 +133,11 @@ class FlowPreparer {
           // Rename temp dir to a proper project directory name.
           Files.move(tempDir.toPath(), project.getInstalledDir().toPath());
         }
+
+        final long start = System.currentTimeMillis();
         execDir = setupExecutionDir(project.getInstalledDir(), flow);
+        final long end = System.currentTimeMillis();
+        log.info("Setting up execution dir {} took {} sec(s)", execDir, (end - start) / 1000);
       }
 
       final long flowPrepCompletionTime = System.currentTimeMillis();
@@ -245,8 +244,17 @@ class FlowPreparer {
     }
 
     this.projectCacheHitRatio.markMiss();
+
+    final long start = System.currentTimeMillis();
+
+    // Download project to a temp dir if not exists in local cache.
     final File tempDir = createTempDir(proj);
     downloadAndUnzipProject(proj, tempDir);
+
+    log.info("Downloading zip file for project {} when preparing execution [execid {}] "
+            + "completed in {} second(s)", proj, execId,
+        (System.currentTimeMillis() - start) / 1000);
+
     return tempDir;
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index df9b74c..c610207 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -212,11 +212,11 @@ public class FlowRunnerManager implements EventListener,
             JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), this.globalProps,
             getClass().getClassLoader());
 
-    Long projectDirMaxSize = null;
     ProjectCacheCleaner cleaner = null;
     try {
-      projectDirMaxSize = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE_IN_MB);
-      cleaner = new ProjectCacheCleaner(this.projectDirectory, projectDirMaxSize);
+      final double projectCacheSizePercentage =
+          props.getDouble(ConfigurationKeys.PROJECT_CACHE_SIZE_PERCENTAGE);
+      cleaner = new ProjectCacheCleaner(this.projectDirectory, projectCacheSizePercentage);
     } catch (final UndefinedPropertyException ex) {
     }
 
@@ -387,7 +387,7 @@ public class FlowRunnerManager implements EventListener,
     this.commonMetrics.addQueueWait(System.currentTimeMillis() -
         flow.getExecutableFlow().getSubmitTime());
 
-    final Timer.Context flowPrepTimerContext = execMetrics.getFlowSetupTimerContext();
+    final Timer.Context flowPrepTimerContext = this.execMetrics.getFlowSetupTimerContext();
 
     try {
       if (this.active || isExecutorSpecified(flow)) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
index 751b9c9..3344fc4 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
@@ -19,6 +19,7 @@ package azkaban.execapp;
 import azkaban.utils.ExecutorServiceUtils;
 import azkaban.utils.FileIOUtils;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.nio.file.Files;
@@ -27,7 +28,9 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.slf4j.Logger;
@@ -40,16 +43,18 @@ import org.slf4j.LoggerFactory;
 class ProjectCacheCleaner {
 
   private final File projectCacheDir;
-  private final long projectCacheMaxSizeInMB;
+
+  // cache size in percentage of disk partition where {@link projectCacheDir} belongs to
+  private final double percentageOfDisk;
+
   private static final Logger log = LoggerFactory.getLogger(ProjectCacheCleaner.class);
 
-  ProjectCacheCleaner(final File projectCacheDir, final long projectCacheMaxSizeInMB) {
+  ProjectCacheCleaner(final File projectCacheDir, final double percentageOfDisk) {
     Preconditions.checkNotNull(projectCacheDir);
     Preconditions.checkArgument(projectCacheDir.exists());
-    Preconditions.checkArgument(projectCacheMaxSizeInMB > 0);
-
+    Preconditions.checkArgument(percentageOfDisk > 0 && percentageOfDisk <= 1);
     this.projectCacheDir = projectCacheDir;
-    this.projectCacheMaxSizeInMB = projectCacheMaxSizeInMB;
+    this.percentageOfDisk = percentageOfDisk;
   }
 
   /**
@@ -69,7 +74,7 @@ class ProjectCacheCleaner {
       if (project.exists() && project.isDirectory()) {
         projects.add(project.toPath());
       } else {
-        log.debug("project {} doesn't exist or is non-dir.", project.getName());
+        log.debug("Project {} doesn't exist or is non-dir.", project.getName());
       }
     }
     return projects;
@@ -96,7 +101,7 @@ class ProjectCacheCleaner {
                 FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)));
         allProjects.add(projectDirMetadata);
       } catch (final Exception e) {
-        log.warn("error while loading project dir metadata for project {}",
+        log.warn("Error while loading project dir metadata for project {}",
             project.getFileName(), e);
       }
     }
@@ -115,22 +120,25 @@ class ProjectCacheCleaner {
   /**
    * Delete all the files in parallel
    *
-   * @param filesToDelete a list of files to delete
+   * @param projectDirsToDelete a set of project dirs to delete
    */
   @SuppressWarnings("FutureReturnValueIgnored")
-  private void deleteFilesInParallel(final List<File> filesToDelete) {
+  private void deleteProjectDirsInParallel(final ImmutableSet<File> projectDirsToDelete) {
     final int CLEANING_SERVICE_THREAD_NUM = 8;
     final ExecutorService deletionService = Executors
         .newFixedThreadPool(CLEANING_SERVICE_THREAD_NUM);
 
-    for (final File toDelete : filesToDelete) {
-      deletionService.submit(() -> FileIOUtils.deleteDirectorySilently(toDelete));
+    for (final File toDelete : projectDirsToDelete) {
+      deletionService.submit(() -> {
+        log.info("Deleting project dir {} from project cache to free up space", toDelete);
+        FileIOUtils.deleteDirectorySilently(toDelete);
+      });
     }
 
     try {
       new ExecutorServiceUtils().gracefulShutdown(deletionService, Duration.ofDays(1));
     } catch (final InterruptedException e) {
-      log.warn("error when deleting files", e);
+      log.warn("Error when deleting files", e);
     }
   }
 
@@ -144,7 +152,7 @@ class ProjectCacheCleaner {
       final List<ProjectDirectoryMetadata> projectDirMetadataList) {
     // Sort projects by last reference time in ascending order
     projectDirMetadataList.sort(Comparator.comparing(ProjectDirectoryMetadata::getLastAccessTime));
-    final List<File> filesToDelete = new ArrayList<>();
+    final Set<File> projectDirsToDelete = new HashSet<>();
 
     for (final ProjectDirectoryMetadata proj : projectDirMetadataList) {
       if (sizeToFreeInBytes > 0) {
@@ -152,37 +160,42 @@ class ProjectCacheCleaner {
         // delete the directory since execution dir is HARD linked to project dir. Note that even
         // if project is deleted, disk space will be freed up only when all associated execution
         // dirs are deleted.
-        log.debug("deleting project {}", proj);
         if (proj.getInstalledDir() != null) {
-          filesToDelete.add(proj.getInstalledDir());
+          projectDirsToDelete.add(proj.getInstalledDir());
           sizeToFreeInBytes -= proj.getDirSizeInByte();
         }
       } else {
         break;
       }
     }
-    deleteFilesInParallel(filesToDelete);
+
+    final long start = System.currentTimeMillis();
+    deleteProjectDirsInParallel(ImmutableSet.copyOf(projectDirsToDelete));
+    final long end = System.currentTimeMillis();
+    log.info("Deleting {} project dir(s) took {} sec(s)", projectDirsToDelete.size(),
+        (end - start) / 1000);
   }
 
   /**
    * Deleting least recently accessed project dirs when there's no room to accommodate new project
    */
   void deleteProjectDirsIfNecessary(final long newProjectSizeInBytes) {
+    final long projectCacheMaxSizeInByte =
+        (long) (this.projectCacheDir.getTotalSpace() * this.percentageOfDisk);
+
     final long start = System.currentTimeMillis();
     final List<ProjectDirectoryMetadata> allProjects = loadAllProjects();
-    log.debug("loading all project dirs metadata completed in {} sec(s)",
-        Duration.ofSeconds(System.currentTimeMillis() - start).getSeconds());
+    log.info("Loading {} project dirs metadata completed in {} sec(s)",
+        allProjects.size(), (System.currentTimeMillis() - start) / 1000);
 
     final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
-    if (currentSpaceInBytes + newProjectSizeInBytes
-        >= this.projectCacheMaxSizeInMB * 1024 * 1024) {
+    if (currentSpaceInBytes + newProjectSizeInBytes >= projectCacheMaxSizeInByte) {
       log.info(
-          "project cache usage[{} MB] exceeds the limit[{} MB], start cleaning up project dirs",
+          "Project cache usage[{} MB] >= cache limit[{} MB], start cleaning up project dirs",
           (currentSpaceInBytes + newProjectSizeInBytes) / (1024 * 1024),
-          this.projectCacheMaxSizeInMB);
+          projectCacheMaxSizeInByte / (1024 * 1024));
 
-      final long freeCacheSpaceInBytes =
-          this.projectCacheMaxSizeInMB * 1024 * 1024 - currentSpaceInBytes;
+      final long freeCacheSpaceInBytes = projectCacheMaxSizeInByte - currentSpaceInBytes;
 
       deleteLeastRecentlyUsedProjects(newProjectSizeInBytes - freeCacheSpaceInBytes, allProjects);
     }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
index ba786cc..1b57470 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
@@ -18,6 +18,8 @@
 package azkaban.execapp;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import azkaban.utils.Utils;
 import java.io.File;
@@ -46,7 +48,11 @@ public class ProjectCacheCleanerTest {
 
   @Before
   public void setUp() throws Exception {
-    this.cacheDir = this.temporaryFolder.newFolder("projects");
+    this.cacheDir = spy(this.temporaryFolder.newFolder("projects"));
+
+    final long TEN_MB_IN_BYTE = 10 * 1024 * 1024;
+    when(this.cacheDir.getTotalSpace()).thenReturn(TEN_MB_IN_BYTE);
+
     final ClassLoader classLoader = getClass().getClassLoader();
 
     final long current = System.currentTimeMillis();
@@ -71,9 +77,8 @@ public class ProjectCacheCleanerTest {
    * There's still space in the cache, no deletion.
    */
   public void testNotDeleting() {
-    final Long projectDirMaxSizeInMB = 7L;
     final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir,
-        projectDirMaxSizeInMB);
+        0.7);
     cleaner.deleteProjectDirsIfNecessary(1);
 
     assertThat(this.cacheDir.list()).hasSize(3);
@@ -84,8 +89,7 @@ public class ProjectCacheCleanerTest {
    * Deleting everything in the cache to accommodate new item.
    */
   public void testDeletingAll() {
-    final Long projectDirMaxSize = 3L;
-    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, 0.3);
     cleaner.deleteProjectDirsIfNecessary(7000000);
 
     assertThat(this.cacheDir.list()).hasSize(0);
@@ -96,8 +100,7 @@ public class ProjectCacheCleanerTest {
    * Deleting two least recently used items in the cache to accommodate new item.
    */
   public void testDeletingTwoLRUItems() {
-    final Long projectDirMaxSize = 7L;
-    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, 0.7);
     cleaner.deleteProjectDirsIfNecessary(3000000);
     assertThat(this.cacheDir.list()).hasSize(1);
     assertThat(this.cacheDir.list()).contains("3.1");
@@ -108,8 +111,7 @@ public class ProjectCacheCleanerTest {
    * Deleting the least recently used item in the cache to accommodate new item.
    */
   public void testDeletingOneLRUItem() {
-    final Long projectDirMaxSize = 7L;
-    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, 0.7);
     cleaner.deleteProjectDirsIfNecessary(2000000);
     assertThat(this.cacheDir.list()).hasSize(2);
     assertThat(this.cacheDir.list()).contains("3.1");