azkaban-aplcache

FlowPreparer refactor (#2130) This PR refactors FlowPreparer

2/25/2019 9:52:42 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 7290bad..5a86e7d 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -36,8 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -45,7 +47,7 @@ import org.apache.log4j.Logger;
  */
 public class FileIOUtils {
 
-  private final static Logger logger = Logger.getLogger(FileIOUtils.class);
+  private static final Logger log = LoggerFactory.getLogger(FileIOUtils.class);
 
   /**
    * Check if a directory is writable
@@ -72,17 +74,17 @@ public class FileIOUtils {
     return true;
   }
 
-  public static int getFileCount(final File file) {
-    final File[] files = file.listFiles();
-    int count = 0;
-    for (final File f : files) {
-      if (f.isDirectory()) {
-        count += getFileCount(f);
-      } else {
-        count++;
+  /**
+   * Delete a directory, log the error if deletion fails.
+   */
+  public static void deleteDirectorySilently(final File dir) {
+    if (dir != null) {
+      try {
+        FileUtils.deleteDirectory(dir);
+      } catch (final IOException e) {
+        log.error("error when deleting dir {}", dir, e);
       }
     }
-    return count;
   }
 
 
@@ -98,7 +100,7 @@ public class FileIOUtils {
         .newBufferedWriter(filePath, StandardCharsets.UTF_8)) {
       writer.write(String.valueOf(num));
     } catch (final IOException e) {
-      logger.error(String.format("Failed to write the number %s to the file %s", num, filePath), e);
+      log.error("Failed to write the number {} to the file {}", num, filePath, e);
       throw e;
     }
   }
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index 9887004..f5cfeb7 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -135,11 +135,6 @@ public class FileIOUtilsTest {
   }
 
   @Test
-  public void testFileCount() {
-    assertThat(FileIOUtils.getFileCount(this.baseDir)).isEqualTo(5);
-  }
-
-  @Test
   public void testHardlinkCopy() throws IOException {
     final int hardLinkCount = FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
     assertThat(areDirsEqual(this.sourceDir, this.destDir, true)).isTrue();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 4f68c10..66f6b8d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -21,49 +21,44 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.project.ProjectFileHandler;
-import azkaban.project.ProjectManagerException;
 import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Utils;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.FileTime;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
+import java.util.Optional;
 import java.util.zip.ZipFile;
 import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class FlowPreparer {
+class FlowPreparer {
 
   // Name of the file which keeps project directory size
   static final String PROJECT_DIR_SIZE_FILE_NAME = "___azkaban_project_dir_size_in_bytes___";
 
-  // Name of the file which keeps count of files inside the directory
-  static final String PROJECT_DIR_COUNT_FILE_NAME = "___azkaban_project_dir_count___";
+  private static final Logger log = LoggerFactory.getLogger(FlowPreparer.class);
 
-  private static final Logger log = Logger.getLogger(FlowPreparer.class);
   // TODO spyne: move to config class
   private final File executionsDir;
   // TODO spyne: move to config class
-  private final File projectsDir;
+  private final File projectCacheDir;
   private final StorageManager storageManager;
-  private final ProjectCacheDirCleaner projectDirCleaner;
-  private final ProjectsDirCacheMetrics cacheMetrics;
+  // Null if cache clean-up is disabled
+  private final Optional<ProjectCacheCleaner> projectCacheCleaner;
+  private final ProjectCacheMetrics cacheMetrics;
 
   @VisibleForTesting
-  static class ProjectsDirCacheMetrics {
-
+  static class ProjectCacheMetrics {
     private long cacheHit;
     private long cacheMiss;
 
@@ -84,328 +79,205 @@ public class FlowPreparer {
     }
   }
 
-  public FlowPreparer(final StorageManager storageManager, final File executionsDir,
-      final File projectsDir, final Long projectDirMaxSizeInMb) {
+  FlowPreparer(final StorageManager storageManager, final File executionsDir,
+      final File projectsDir, final ProjectCacheCleaner cleaner) {
+    Preconditions.checkNotNull(storageManager);
+    Preconditions.checkNotNull(executionsDir);
+    Preconditions.checkNotNull(projectsDir);
+
+    Preconditions.checkArgument(projectsDir.exists());
+    Preconditions.checkArgument(executionsDir.exists());
+
     this.storageManager = storageManager;
     this.executionsDir = executionsDir;
-    this.projectsDir = projectsDir;
-    this.projectDirCleaner =
-        projectDirMaxSizeInMb != null ? new ProjectCacheDirCleaner(projectDirMaxSizeInMb) : null;
-    this.cacheMetrics = new ProjectsDirCacheMetrics();
+    this.projectCacheDir = projectsDir;
+    this.projectCacheCleaner = Optional.ofNullable(cleaner);
+    this.cacheMetrics = new ProjectCacheMetrics();
   }
 
-  private boolean isProjectCacheSizeLimitEnabled() {
-    return this.projectDirCleaner != null;
-  }
-
-  public double getProjectDirCacheHitRatio() {
+  double getProjectDirCacheHitRatio() {
     return this.cacheMetrics.getHitRatio();
   }
 
   /**
-   * Creates a file which keeps the size of {@param dir} in bytes inside the {@param dir} and sets
-   * the dirSize for {@param pv}.
-   *
-   * @param dir the directory whose size needs to be kept in the file to be created.
-   * @param pv the projectVersion whose size needs to updated.
-   */
-  static void updateDirSize(final File dir, final ProjectVersion pv) {
-    try {
-      final Path path = Paths.get(dir.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
-      if (!Files.exists(path)) {
-        final long sizeInByte = FileUtils.sizeOfDirectory(dir);
-        FileIOUtils.dumpNumberToFile(path, sizeInByte);
-      }
-      pv.setDirSizeInBytes(FileIOUtils.readNumberFromFile(path));
-    } catch (final IOException e) {
-      log.error("error when dumping dir size to file", e);
-    }
-  }
-
-  /**
-   * Creates a file which keeps the count of files inside {@param dir}
+   * Calculate the directory size and save it to a file.
    *
-   * @param dir the directory whose size needs to be kept in the file to be created.
-   * @param pv the projectVersion whose size needs to updated.
+   * @param dir the directory whose size needs to be saved.
+   * @return the size of the dir.
    */
-  static void updateFileCount(final File dir, final ProjectVersion pv) {
-    try {
-      final Path path = Paths.get(dir.getPath(), PROJECT_DIR_COUNT_FILE_NAME);
-      if (!Files.exists(path)) {
-        // count itself
-        final int fileCount = FileIOUtils.getFileCount(dir) + 1;
-        FileIOUtils.dumpNumberToFile(path, fileCount);
-      }
-      pv.setFileCount((int) FileIOUtils.readNumberFromFile(path));
-    } catch (final IOException e) {
-      log.error("error when updating file count", e);
+  static long calculateDirSizeAndSave(final File dir) throws IOException {
+    final Path path = Paths.get(dir.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
+    if (!Files.exists(path)) {
+      final long sizeInByte = FileUtils.sizeOfDirectory(dir);
+      FileIOUtils.dumpNumberToFile(path, sizeInByte);
+      return sizeInByte;
+    } else {
+      return FileIOUtils.readNumberFromFile(path);
     }
   }
 
 
   /**
-   * check if number of files inside the project dir equals to target
-   */
-  @VisibleForTesting
-  boolean isFileCountEqual(final ProjectVersion pv, final int target) {
-    final int fileCount;
-    try {
-      updateFileCount(pv.getInstalledDir(), pv);
-      final Path path = Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_COUNT_FILE_NAME);
-      fileCount = (int) FileIOUtils.readNumberFromFile(path);
-      return fileCount == target;
-    } catch (final IOException e) {
-      log.error(e);
-      return false;
-    }
-  }
-
-  /**
    * Prepare the flow directory for execution.
    *
    * @param flow Executable Flow instance.
    */
-  void setup(final ExecutableFlow flow) {
-    File execDir = null;
+  void setup(final ExecutableFlow flow) throws ExecutorManagerException {
+    final ProjectFileHandler projectFileHandler = null;
+    File tempDir = null;
     try {
-      // First get the ProjectVersion
-      final ProjectVersion projectVersion = new ProjectVersion(flow.getProjectId(),
+      final ProjectDirectoryMetadata project = new ProjectDirectoryMetadata(
+          flow.getProjectId(),
           flow.getVersion());
 
-      // Setup the project
-      setupProject(projectVersion);
-
-      // Create the execution directory
-      execDir = createExecDir(flow);
-
-      // Create the symlinks from the project
-      final int linkCount = FileIOUtils
-          .createDeepHardlink(projectVersion.getInstalledDir(), execDir);
-
-      if (isProjectCacheSizeLimitEnabled() && !isFileCountEqual(projectVersion, linkCount)) {
-        throw new Exception(String.format("File count check failed for execid: %d, project dir %s"
-                + " are being deleted when setting this execution up",
-            flow.getExecutionId(), projectVersion.getInstalledDir()));
+      final long flowPrepStartTime = System.currentTimeMillis();
+
+      // Download project to a temp dir if not exists in local cache.
+      final long start = System.currentTimeMillis();
+
+      tempDir = downloadProjectIfNotExists(project);
+
+      log.info("Downloading zip file for project {} when preparing execution [execid {}] "
+              + "completed in {} second(s)", project, flow.getExecutionId(),
+          (System.currentTimeMillis() - start) / 1000);
+
+      // With synchronization, only one thread is allowed to proceed to avoid complicated race
+      // conditions which could arise when multiple threads are downloading/deleting/hard-linking
+      // the same project. But it doesn't prevent multiple executor processes interfering with each
+      // other triggering race conditions. So it's important to operationally make sure that only
+      // one executor process is setting up flow execution against the shared project directory.
+      long criticalSectionStartTime = -1;
+      File execDir = null;
+
+      synchronized (this) {
+        criticalSectionStartTime = System.currentTimeMillis();
+        if (!project.getInstalledDir().exists() && tempDir != null) {
+          // If new project is downloaded and project dir cache clean-up feature is enabled, then
+          // perform clean-up if size of all project dirs exceeds the cache size.
+          if (this.projectCacheCleaner.isPresent()) {
+            this.projectCacheCleaner.get()
+                .deleteProjectDirsIfNecessary(project.getDirSizeInByte());
+          }
+          // Rename temp dir to a proper project directory name.
+          Files.move(tempDir.toPath(), project.getInstalledDir().toPath());
+        }
+        execDir = setupExecutionDir(project.getInstalledDir(), flow);
       }
 
-      log.info(String
-          .format("Flow Preparation complete. [execid: %d, path: %s]", flow.getExecutionId(),
-              execDir.getPath()));
-    } catch (final Exception e) {
-      log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
-      cleanup(execDir);
-      throw new RuntimeException(e);
+      final long flowPrepCompletionTime = System.currentTimeMillis();
+      log.info("Flow preparation completed in {} sec(s), out ot which {} sec(s) was spent inside "
+              + "critical section. [execid: {}, path: {}]",
+          (flowPrepCompletionTime - flowPrepStartTime) / 1000,
+          (flowPrepCompletionTime - criticalSectionStartTime) / 1000,
+          flow.getExecutionId(), execDir.getPath());
+    } catch (final Exception ex) {
+      FileIOUtils.deleteDirectorySilently(tempDir);
+      log.error("Error in preparing flow execution {}", flow.getExecutionId(), ex);
+      throw new ExecutorManagerException(ex);
+    } finally {
+      if (projectFileHandler != null) {
+        projectFileHandler.deleteLocalFile();
+      }
     }
   }
 
-  private void cleanup(final File execDir) {
-    if (execDir != null) {
-      try {
-        FileUtils.deleteDirectory(execDir);
-      } catch (final IOException e) {
-        throw new RuntimeException(e);
-      }
+  private File setupExecutionDir(final File installedDir, final ExecutableFlow flow)
+      throws IOException {
+    File execDir = null;
+    try {
+      execDir = createExecDir(flow);
+      // Create hardlinks from the project
+      FileIOUtils.createDeepHardlink(installedDir, execDir);
+      return execDir;
+    } catch (final Exception ex) {
+      FileIOUtils.deleteDirectorySilently(execDir);
+      throw ex;
     }
   }
 
   /**
-   * Touch the file if it exists.
+   * Update last modified time of the file if it exists.
    *
    * @param path path to the target file
    */
   @VisibleForTesting
-  void touchIfExists(final Path path) {
+  void updateLastModifiedTime(final Path path) {
     try {
       Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
     } catch (final IOException ex) {
-      log.error(ex);
+      log.warn("Error when updating last modified time for {}", path, ex);
     }
   }
 
   /**
-   * Prepare the project directory.
-   *
-   * @param pv ProjectVersion object
+   * @return the project directory name of a project
    */
-  @VisibleForTesting
-  void setupProject(final ProjectVersion pv)
-      throws ProjectManagerException, IOException {
-    final int projectId = pv.getProjectId();
-    final int version = pv.getVersion();
-
-    final String projectDir = String.valueOf(projectId) + "." + String.valueOf(version);
-    if (pv.getInstalledDir() == null) {
-      pv.setInstalledDir(new File(this.projectsDir, projectDir));
-    }
-
-    // If directory exists. Assume its prepared and skip.
-    if (pv.getInstalledDir().exists()) {
-      log.info("Project already cached. Skipping download. " + pv);
-      this.cacheMetrics.incrementCacheHit();
-      touchIfExists(
-          Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_SIZE_FILE_NAME));
-      return;
-    }
-
-    log.info("Preparing Project: " + pv);
+  private String generateProjectDirName(final ProjectDirectoryMetadata proj) {
+    return String.valueOf(proj.getProjectId()) + "." + String.valueOf(proj.getVersion());
+  }
 
-    final File tempDir = new File(this.projectsDir,
+  private File createTempDir(final ProjectDirectoryMetadata proj) {
+    final String projectDir = generateProjectDirName(proj);
+    final File tempDir = new File(this.projectCacheDir,
         "_temp." + projectDir + "." + System.currentTimeMillis());
-
-    // TODO spyne: Why mkdirs? This path should be already set up.
     tempDir.mkdirs();
+    return tempDir;
+  }
 
-    ProjectFileHandler projectFileHandler = null;
+  private void downloadAndUnzipProject(final ProjectDirectoryMetadata proj, final File dest)
+      throws IOException {
+    final ProjectFileHandler projectFileHandler = requireNonNull(this.storageManager
+        .getProjectFile(proj.getProjectId(), proj.getVersion()));
     try {
-      log.info(String.format("Downloading zip file for Project Version {%s}", pv));
-      this.cacheMetrics.incrementCacheMiss();
-      projectFileHandler = requireNonNull(
-          this.storageManager.getProjectFile(pv.getProjectId(), pv.getVersion()));
       checkState("zip".equals(projectFileHandler.getFileType()));
       final File zipFile = requireNonNull(projectFileHandler.getLocalFile());
       final ZipFile zip = new ZipFile(zipFile);
-      Utils.unzip(zip, tempDir);
-      updateDirSize(tempDir, pv);
-      updateFileCount(tempDir, pv);
-      log.info(String.format("Downloading zip file for Project Version {%s} completes", pv));
-      if (this.projectDirCleaner != null) {
-        this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
-      }
-      Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
-      log.warn(String.format("Project preparation completes. [%s]", pv));
+      Utils.unzip(zip, dest);
+      proj.setDirSizeInByte(calculateDirSizeAndSave(dest));
     } finally {
-      if (projectFileHandler != null) {
-        projectFileHandler.deleteLocalFile();
-      }
-      // Clean up: Remove tempDir if exists
-      FileUtils.deleteDirectory(tempDir);
+      projectFileHandler.deleteLocalFile();
     }
   }
 
+  /**
+   * Download project zip and unzip it if not exists locally.
+   *
+   * @param proj project to download
+   * @return the temp dir where the new project is downloaded to, null if no project is downloaded.
+   * @throws IOException if downloading or unzipping fails.
+   */
+  @VisibleForTesting
+  File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj)
+      throws IOException {
+    final String projectDir = generateProjectDirName(proj);
+    if (proj.getInstalledDir() == null) {
+      proj.setInstalledDir(new File(this.projectCacheDir, projectDir));
+    }
+
+    // If directory exists, assume it's prepared and skip.
+    if (proj.getInstalledDir().exists()) {
+      log.info("Project {} already cached. Skipping download.", proj);
+      // Hit the local cache.
+      this.cacheMetrics.incrementCacheHit();
+      // Update last modified time of the file keeping project dir size when the project is
+      // accessed. This last modified time will be used to determined least recently used
+      // projects when performing project directory clean-up.
+      updateLastModifiedTime(
+          Paths.get(proj.getInstalledDir().getPath(), PROJECT_DIR_SIZE_FILE_NAME));
+      return null;
+    }
+
+    this.cacheMetrics.incrementCacheMiss();
+    final File tempDir = createTempDir(proj);
+    downloadAndUnzipProject(proj, tempDir);
+    return tempDir;
+  }
 
   private File createExecDir(final ExecutableFlow flow) {
     final int execId = flow.getExecutionId();
     final File execDir = new File(this.executionsDir, String.valueOf(execId));
     flow.setExecutionPath(execDir.getPath());
-
-    // TODO spyne: Why mkdirs? This path should be already set up.
     execDir.mkdirs();
     return execDir;
   }
-
-
-  private class ProjectCacheDirCleaner {
-
-    private final long projectDirMaxSizeInMb;
-
-    /*
-     * Delete the project dir associated with {@code version}.
-     */
-    private void deleteDirectory(final ProjectVersion pv) throws IOException {
-      final File installedDir = pv.getInstalledDir();
-      if (installedDir != null && installedDir.exists()) {
-        FileUtils.deleteDirectory(installedDir);
-      }
-    }
-
-    ProjectCacheDirCleaner(final long projectDirMaxSizeInMb) {
-      this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
-    }
-
-    private List<Path> loadAllProjectDirs() {
-      final List<Path> projects = new ArrayList<>();
-      for (final File project : FlowPreparer.this.projectsDir.listFiles(new FilenameFilter() {
-
-        String pattern = "[0-9]+\\.[0-9]+";
-
-        @Override
-        public boolean accept(final File dir, final String name) {
-          return name.matches(this.pattern);
-        }
-      })) {
-        if (project.exists() && project.isDirectory()) {
-          projects.add(project.toPath());
-        } else {
-          FlowPreparer.log
-              .debug(String.format("project %s doesn't exist or is non-dir.", project.getName()));
-        }
-      }
-      return projects;
-    }
-
-    private List<ProjectVersion> loadAllProjects() {
-      final List<ProjectVersion> allProjects = new ArrayList<>();
-      for (final Path project : this.loadAllProjectDirs()) {
-        try {
-          final String fileName = project.getFileName().toString();
-          final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
-          final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
-
-          final ProjectVersion projVersion =
-              new ProjectVersion(projectId, versionNum, project.toFile());
-
-          FlowPreparer.updateDirSize(projVersion.getInstalledDir(), projVersion);
-          FlowPreparer.updateFileCount(projVersion.getInstalledDir(), projVersion);
-
-          final Path projectDirFileCount = Paths.get(projVersion.getInstalledDir().toString(),
-              FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
-
-          projVersion.setLastAccessTime(Files.getLastModifiedTime(projectDirFileCount));
-          allProjects.add(projVersion);
-        } catch (final Exception e) {
-          FlowPreparer.log
-              .error(String.format("error while loading project dir metadata for project %s",
-                  project.getFileName()), e);
-        }
-      }
-      return allProjects;
-    }
-
-    /**
-     * @return sum of the size of all project dirs
-     */
-    private long getProjectDirsTotalSizeInBytes(final List<ProjectVersion> allProjects) {
-      long totalSizeInBytes = 0;
-      for (final ProjectVersion version : allProjects) {
-        totalSizeInBytes += version.getDirSizeInBytes();
-      }
-      return totalSizeInBytes;
-    }
-
-    private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
-        final List<ProjectVersion> projectVersions) {
-      // sort project version by last reference time in ascending order
-      projectVersions.sort(Comparator.comparing(ProjectVersion::getLastAccessTime));
-      for (final ProjectVersion version : projectVersions) {
-        if (sizeToFreeInBytes > 0) {
-          try {
-            // delete the project directory even if flow within is running. It's OK to
-            // delete the directory since execution dir is HARD linked to project dir.
-            FlowPreparer.log.info(String.format("deleting project version %s", version));
-            deleteDirectory(version);
-            sizeToFreeInBytes -= version.getDirSizeInBytes();
-          } catch (final IOException ex) {
-            FlowPreparer.log.error(ex);
-          }
-        }
-      }
-    }
-
-    synchronized void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) {
-        final long start = System.currentTimeMillis();
-        final List<ProjectVersion> allProjects = loadAllProjects();
-        FlowPreparer.log
-            .debug(String.format("loading all project dirs metadata completes in %s sec(s)",
-                Duration.ofSeconds(System.currentTimeMillis() - start).getSeconds()));
-
-        final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
-        if (currentSpaceInBytes + spaceToDeleteInBytes
-            >= this.projectDirMaxSizeInMb * 1024 * 1024) {
-          FlowPreparer.log.info(String.format("Project dir disk usage[%s bytes] exceeds the "
-                  + "limit[%s mb], start cleaning up project dirs",
-              currentSpaceInBytes + spaceToDeleteInBytes, this.projectDirMaxSizeInMb));
-          deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes, allProjects);
-        }
-    }
-  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 4dee864..f0b2122 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -201,14 +201,16 @@ public class FlowRunnerManager implements EventListener,
             getClass().getClassLoader());
 
     Long projectDirMaxSize = null;
+    ProjectCacheCleaner cleaner = null;
     try {
       projectDirMaxSize = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE_IN_MB);
+      cleaner = new ProjectCacheCleaner(this.projectDirectory, projectDirMaxSize);
     } catch (final UndefinedPropertyException ex) {
     }
 
     // Create a flow preparer
     this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
-        this.projectDirectory, projectDirMaxSize);
+        this.projectDirectory, cleaner);
 
     this.cleanerThread = new CleanerThread();
     this.cleanerThread.start();
@@ -960,7 +962,7 @@ public class FlowRunnerManager implements EventListener,
                     AzkabanExecutorServer.getApp().getPort()), "The executor can not be null");
             this.executorId = executor.getId();
           } catch (final Exception e) {
-            logger.error("Failed to fetch executor ", e);
+            FlowRunnerManager.logger.error("Failed to fetch executor ", e);
           }
         }
       } else if (FlowRunnerManager.this.active) {
@@ -969,11 +971,11 @@ public class FlowRunnerManager implements EventListener,
           final int execId = FlowRunnerManager.this.executorLoader
               .selectAndUpdateExecution(this.executorId);
           if (execId != -1) {
-            logger.info("Submitting flow " + execId);
+            FlowRunnerManager.logger.info("Submitting flow " + execId);
             submitFlow(execId);
           }
         } catch (final Exception e) {
-          logger.error("Failed to submit flow ", e);
+          FlowRunnerManager.logger.error("Failed to submit flow ", e);
         }
       }
     }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
new file mode 100644
index 0000000..751b9c9
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2019 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.utils.ExecutorServiceUtils;
+import azkaban.utils.FileIOUtils;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for deleting least recently accessed projects in the shared project
+ * cache when there's no room to accommodate a new project.
+ */
+class ProjectCacheCleaner {
+
+  private final File projectCacheDir;
+  private final long projectCacheMaxSizeInMB;
+  private static final Logger log = LoggerFactory.getLogger(ProjectCacheCleaner.class);
+
+  ProjectCacheCleaner(final File projectCacheDir, final long projectCacheMaxSizeInMB) {
+    Preconditions.checkNotNull(projectCacheDir);
+    Preconditions.checkArgument(projectCacheDir.exists());
+    Preconditions.checkArgument(projectCacheMaxSizeInMB > 0);
+
+    this.projectCacheDir = projectCacheDir;
+    this.projectCacheMaxSizeInMB = projectCacheMaxSizeInMB;
+  }
+
+  /**
+   * @return a list of project directories.
+   */
+  private List<Path> loadAllProjectDirs() {
+    final List<Path> projects = new ArrayList<>();
+    for (final File project : this.projectCacheDir.listFiles(new FilenameFilter() {
+
+      String pattern = "[0-9]+\\.[0-9]+";
+
+      @Override
+      public boolean accept(final File dir, final String name) {
+        return name.matches(this.pattern);
+      }
+    })) {
+      if (project.exists() && project.isDirectory()) {
+        projects.add(project.toPath());
+      } else {
+        log.debug("project {} doesn't exist or is non-dir.", project.getName());
+      }
+    }
+    return projects;
+  }
+
+  /**
+   * @return a list of {@link ProjectDirectoryMetadata} for all project directories
+   */
+  private List<ProjectDirectoryMetadata> loadAllProjects() {
+    final List<ProjectDirectoryMetadata> allProjects = new ArrayList<>();
+    for (final Path project : this.loadAllProjectDirs()) {
+      try {
+        final String fileName = project.getFileName().toString();
+        final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
+        final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
+
+        final ProjectDirectoryMetadata projectDirMetadata =
+            new ProjectDirectoryMetadata(projectId, versionNum, project.toFile());
+
+        projectDirMetadata.setDirSizeInByte(
+            FlowPreparer.calculateDirSizeAndSave(projectDirMetadata.getInstalledDir()));
+        projectDirMetadata.setLastAccessTime(
+            Files.getLastModifiedTime(Paths.get(projectDirMetadata.getInstalledDir().toString(),
+                FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)));
+        allProjects.add(projectDirMetadata);
+      } catch (final Exception e) {
+        log.warn("error while loading project dir metadata for project {}",
+            project.getFileName(), e);
+      }
+    }
+    return allProjects;
+  }
+
+  /**
+   * @return sum of the size of all project dirs
+   */
+  private long getProjectDirsTotalSizeInBytes(final List<ProjectDirectoryMetadata> allProjects) {
+    final long totalSizeInBytes = allProjects.stream()
+        .mapToLong(ProjectDirectoryMetadata::getDirSizeInByte).sum();
+    return totalSizeInBytes;
+  }
+
+  /**
+   * Delete all the files in parallel
+   *
+   * @param filesToDelete a list of files to delete
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void deleteFilesInParallel(final List<File> filesToDelete) {
+    final int CLEANING_SERVICE_THREAD_NUM = 8;
+    final ExecutorService deletionService = Executors
+        .newFixedThreadPool(CLEANING_SERVICE_THREAD_NUM);
+
+    for (final File toDelete : filesToDelete) {
+      deletionService.submit(() -> FileIOUtils.deleteDirectorySilently(toDelete));
+    }
+
+    try {
+      new ExecutorServiceUtils().gracefulShutdown(deletionService, Duration.ofDays(1));
+    } catch (final InterruptedException e) {
+      log.warn("error when deleting files", e);
+    }
+  }
+
+  /**
+   * Delete least recently used projects to free up space
+   *
+   * @param sizeToFreeInBytes space to free up
+   * @param projectDirMetadataList a list of candidate files to delete
+   */
+  private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
+      final List<ProjectDirectoryMetadata> projectDirMetadataList) {
+    // Sort projects by last reference time in ascending order
+    projectDirMetadataList.sort(Comparator.comparing(ProjectDirectoryMetadata::getLastAccessTime));
+    final List<File> filesToDelete = new ArrayList<>();
+
+    for (final ProjectDirectoryMetadata proj : projectDirMetadataList) {
+      if (sizeToFreeInBytes > 0) {
+        // Delete the project directory even if flow within is running. It's OK to
+        // delete the directory since execution dir is HARD linked to project dir. Note that even
+        // if project is deleted, disk space will be freed up only when all associated execution
+        // dirs are deleted.
+        log.debug("deleting project {}", proj);
+        if (proj.getInstalledDir() != null) {
+          filesToDelete.add(proj.getInstalledDir());
+          sizeToFreeInBytes -= proj.getDirSizeInByte();
+        }
+      } else {
+        break;
+      }
+    }
+    deleteFilesInParallel(filesToDelete);
+  }
+
+  /**
+   * Deleting least recently accessed project dirs when there's no room to accommodate new project
+   */
+  void deleteProjectDirsIfNecessary(final long newProjectSizeInBytes) {
+    final long start = System.currentTimeMillis();
+    final List<ProjectDirectoryMetadata> allProjects = loadAllProjects();
+    log.debug("loading all project dirs metadata completed in {} sec(s)",
+        Duration.ofSeconds(System.currentTimeMillis() - start).getSeconds());
+
+    final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
+    if (currentSpaceInBytes + newProjectSizeInBytes
+        >= this.projectCacheMaxSizeInMB * 1024 * 1024) {
+      log.info(
+          "project cache usage[{} MB] exceeds the limit[{} MB], start cleaning up project dirs",
+          (currentSpaceInBytes + newProjectSizeInBytes) / (1024 * 1024),
+          this.projectCacheMaxSizeInMB);
+
+      final long freeCacheSpaceInBytes =
+          this.projectCacheMaxSizeInMB * 1024 * 1024 - currentSpaceInBytes;
+
+      deleteLeastRecentlyUsedProjects(newProjectSizeInBytes - freeCacheSpaceInBytes, allProjects);
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index b8fd5f8..dcb4f6c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -18,6 +18,7 @@
 package azkaban.execapp;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -28,19 +29,21 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import azkaban.execapp.FlowPreparer.ProjectsDirCacheMetrics;
+import azkaban.execapp.FlowPreparer.ProjectCacheMetrics;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
 import azkaban.project.ProjectFileHandler;
 import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
-import azkaban.utils.Pair;
 import java.io.File;
-import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -50,6 +53,7 @@ import org.junit.rules.TemporaryFolder;
 public class FlowPreparerTest {
 
   public static final String SAMPLE_FLOW_01 = "sample_flow_01";
+
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File executionsDir;
@@ -69,6 +73,15 @@ public class FlowPreparerTest {
     return storageManager;
   }
 
+  private ExecutableFlow mockExecutableFlow(final int execId, final int projectId,
+      final int version) {
+    final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
+    when(executableFlow.getExecutionId()).thenReturn(execId);
+    when(executableFlow.getProjectId()).thenReturn(projectId);
+    when(executableFlow.getVersion()).thenReturn(version);
+    return executableFlow;
+  }
+
   @Before
   public void setUp() throws Exception {
     this.executionsDir = this.temporaryFolder.newFolder("executions");
@@ -76,146 +89,107 @@ public class FlowPreparerTest {
 
     this.instance = spy(
         new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir, null));
-    doNothing().when(this.instance).touchIfExists(any());
+    doNothing().when(this.instance).updateLastModifiedTime(any());
   }
 
   @Test
-  public void testSetupProject() throws Exception {
-    final ProjectVersion pv = new ProjectVersion(12, 34,
-        new File(this.projectsDir, "sample_project_01"));
-    this.instance.setupProject(pv);
+  public void testProjectDirSizeIsSet() throws Exception {
+    final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
+        new File(this.projectsDir, SAMPLE_FLOW_01));
+
+    final File tmp = this.instance.downloadProjectIfNotExists(proj);
 
     final long actualDirSize = 1048835;
 
-    assertThat(pv.getDirSizeInBytes()).isEqualTo(actualDirSize);
+    assertThat(proj.getDirSizeInByte()).isEqualTo(actualDirSize);
     assertThat(FileIOUtils.readNumberFromFile(
-        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
+        Paths.get(tmp.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
         .isEqualTo(actualDirSize);
-
-    assertThat(FileIOUtils.readNumberFromFile(
-        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_COUNT_FILE_NAME)))
-        .isEqualTo(8);
-
-    assertTrue(pv.getInstalledDir().exists());
-    assertTrue(new File(pv.getInstalledDir(), "sample_flow_01").exists());
   }
 
   @Test
-  public void testSetupProjectTouchesTheDirSizeFile() throws Exception {
-    //verifies setup project touches project dir size file.
-    final ProjectVersion pv = new ProjectVersion(12, 34,
-        new File(this.projectsDir, "sample_project_01"));
-
-    //setup project 1st time will not do touch
-    this.instance.setupProject(pv);
-    verify(this.instance, never()).touchIfExists(
-        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
-
-    this.instance.setupProject(pv);
-    verify(this.instance).touchIfExists(
-        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
-  }
+  public void testDownloadingProjectIfNotExists() throws Exception {
+    final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
+        new File(this.projectsDir, SAMPLE_FLOW_01));
+    final File tmp = this.instance.downloadProjectIfNotExists(proj);
 
-  @Test
-  public void testSetupFlow() {
-    final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
-    when(executableFlow.getExecutionId()).thenReturn(12345);
-    when(executableFlow.getProjectId()).thenReturn(12);
-    when(executableFlow.getVersion()).thenReturn(34);
+    final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
+        FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
 
-    this.instance.setup(executableFlow);
-    final File execDir = new File(this.executionsDir, "12345");
-    assertTrue(execDir.exists());
-    assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
+    verify(this.instance, never()).updateLastModifiedTime(projectDirSizeFile);
+    assertThat(tmp).isNotNull();
+    assertThat(tmp.list()).contains(SAMPLE_FLOW_01);
   }
 
   @Test
-  public void testFileCountCheckNotCalled() {
-    //given
-    final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
-    when(executableFlow.getExecutionId()).thenReturn(12345);
-    when(executableFlow.getProjectId()).thenReturn(12);
-    when(executableFlow.getVersion()).thenReturn(34);
-
-    //when
-    this.instance.setup(executableFlow);
+  public void testNotDownloadingProjectIfExists() throws Exception {
+    final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
+        new File(this.projectsDir, SAMPLE_FLOW_01));
+    File tmp = this.instance.downloadProjectIfNotExists(proj);
+    Files.move(tmp.toPath(), proj.getInstalledDir().toPath());
 
-    //then
-    verify(this.instance, never()).isFileCountEqual(any(), anyInt());
-  }
+    // Try downloading the same project again
+    tmp = this.instance.downloadProjectIfNotExists(proj);
 
-  @Test
-  public void testIsFileCountEqual() {
-    //given
-    final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
-        this.executionsDir, this.projectsDir, 1L);
-    final File projectDir = new File(this.projectsDir, "sample_project_01");
-    projectDir.mkdir();
-    final ProjectVersion pv = new ProjectVersion(1, 1, projectDir);
+    final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
+        FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
 
-    //then
-    assertThat(flowPreparer.isFileCountEqual(pv, 1)).isEqualTo(true);
+    verify(this.instance).updateLastModifiedTime(projectDirSizeFile);
+    assertThat(tmp).isNull();
   }
 
   @Test
-  public void testProjectCacheDirCleanerNotEnabled() throws IOException {
-    final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
-
-    //given
-    final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
-        this.executionsDir, this.projectsDir, null);
+  public void testSetupFlowByMultipleThreads() {
+    final int threadNum = 4;
+
+    final ExecutableFlow[] executableFlows = new ExecutableFlow[]{
+        mockExecutableFlow(1, 12, 34),
+        mockExecutableFlow(2, 12, 34),
+        mockExecutableFlow(3, 12, 34),
+        mockExecutableFlow(4, 12, 34)
+    };
+
+    final ExecutorService service = Executors.newFixedThreadPool(threadNum);
+
+    final List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < threadNum; i++) {
+      final int finalI = i;
+      futures.add(service.submit(() -> {
+        assertThatCode(() -> this.instance.setup(executableFlows[finalI])
+        ).doesNotThrowAnyException();
+      }));
+    }
 
-    //when
-    final List<File> expectedRemainingFiles = new ArrayList<>();
-    for (int i = 1; i <= 3; i++) {
-      final int projectId = i;
-      final int version = 1;
-      final ProjectVersion pv = new ProjectVersion(projectId, version, null);
-      installedProjects.put(new Pair<>(projectId, version), pv);
-      flowPreparer.setupProject(pv);
-      expectedRemainingFiles.add(pv.getInstalledDir());
+    for (final Future future : futures) {
+      assertThatCode(() -> future.get()).doesNotThrowAnyException();
     }
 
-    //then
-    assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
-        .toArray(new File[expectedRemainingFiles.size()]));
+    service.shutdownNow();
+    for (final ExecutableFlow flow : executableFlows) {
+      final File execDir = new File(this.executionsDir, String.valueOf(flow.getExecutionId()));
+      assertTrue(execDir.exists());
+      assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
+    }
   }
 
   @Test
-  public void testProjectCacheDirCleaner() throws IOException, InterruptedException {
-    final Long projectDirMaxSize = 3L;
-
-    //given
-    final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
-        this.executionsDir, this.projectsDir, projectDirMaxSize);
-
-    //when
-    final List<File> expectedRemainingFiles = new ArrayList<>();
-    for (int i = 1; i <= 3; i++) {
-      final int projectId = i;
-      final int version = 1;
-      final ProjectVersion pv = new ProjectVersion(projectId, version, null);
-      flowPreparer.setupProject(pv);
-
-      if (i >= 2) {
-        //the first file will be deleted
-        expectedRemainingFiles.add(pv.getInstalledDir());
-      }
-      // last modified time of millis second granularity of a file is not supported by all file
-      // systems, so sleep for 1 second between creation of each project dir to make their last
-      // modified time different.
-      Thread.sleep(1000);
-    }
+  public void testSetupFlow() throws ExecutorManagerException {
+    final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
+    when(executableFlow.getExecutionId()).thenReturn(12345);
+    when(executableFlow.getProjectId()).thenReturn(12);
+    when(executableFlow.getVersion()).thenReturn(34);
 
-    //then
-    assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
-        .toArray(new File[expectedRemainingFiles.size()]));
+    this.instance.setup(executableFlow);
+    final File execDir = new File(this.executionsDir, "12345");
+    assertTrue(execDir.exists());
+    assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
   }
 
+
   @Test
   public void testProjectsCacheMetricsZeroHit() {
     //given
-    final FlowPreparer.ProjectsDirCacheMetrics cacheMetrics = new ProjectsDirCacheMetrics();
+    final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
 
     //when zero hit and zero miss then
     assertThat(cacheMetrics.getHitRatio()).isEqualTo(0);
@@ -229,7 +203,7 @@ public class FlowPreparerTest {
   @Test
   public void testProjectsCacheMetricsHit() {
     //given
-    final FlowPreparer.ProjectsDirCacheMetrics cacheMetrics = new ProjectsDirCacheMetrics();
+    final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
 
     //when one hit
     cacheMetrics.incrementCacheHit();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
new file mode 100644
index 0000000..ba786cc
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.utils.Utils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+import java.util.zip.ZipFile;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+public class ProjectCacheCleanerTest {
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private File cacheDir;
+
+  private void unzip(final Path srcZipFile, final Path dest) throws IOException {
+    final ZipFile zip = new ZipFile(srcZipFile.toFile());
+    Utils.unzip(zip, dest.toFile());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.cacheDir = this.temporaryFolder.newFolder("projects");
+    final ClassLoader classLoader = getClass().getClassLoader();
+
+    final long current = System.currentTimeMillis();
+    unzip(Paths.get(classLoader.getResource("1.1.zip").getPath()),
+        this.cacheDir.toPath());
+    Files.setLastModifiedTime(Paths.get(this.cacheDir.toString(), "1.1",
+        FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME), FileTime.fromMillis(current - 2000));
+
+    unzip(Paths.get(classLoader.getResource("2.1.zip").getPath()),
+        this.cacheDir.toPath());
+    Files.setLastModifiedTime(Paths.get(this.cacheDir.toString(), "2.1",
+        FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME), FileTime.fromMillis(current - 1000));
+
+    unzip(Paths.get(classLoader.getResource("3.1.zip").getPath()),
+        this.cacheDir.toPath());
+    Files.setLastModifiedTime(Paths.get(this.cacheDir.toString(), "3.1",
+        FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME), FileTime.fromMillis(current));
+  }
+
+  @Test
+  /**
+   * There's still space in the cache, no deletion.
+   */
+  public void testNotDeleting() {
+    final Long projectDirMaxSizeInMB = 7L;
+    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir,
+        projectDirMaxSizeInMB);
+    cleaner.deleteProjectDirsIfNecessary(1);
+
+    assertThat(this.cacheDir.list()).hasSize(3);
+  }
+
+  @Test
+  /**
+   * Deleting everything in the cache to accommodate new item.
+   */
+  public void testDeletingAll() {
+    final Long projectDirMaxSize = 3L;
+    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+    cleaner.deleteProjectDirsIfNecessary(7000000);
+
+    assertThat(this.cacheDir.list()).hasSize(0);
+  }
+
+  @Test
+  /**
+   * Deleting two least recently used items in the cache to accommodate new item.
+   */
+  public void testDeletingTwoLRUItems() {
+    final Long projectDirMaxSize = 7L;
+    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+    cleaner.deleteProjectDirsIfNecessary(3000000);
+    assertThat(this.cacheDir.list()).hasSize(1);
+    assertThat(this.cacheDir.list()).contains("3.1");
+  }
+
+  @Test
+  /**
+   * Deleting the least recently used item in the cache to accommodate new item.
+   */
+  public void testDeletingOneLRUItem() {
+    final Long projectDirMaxSize = 7L;
+    final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+    cleaner.deleteProjectDirsIfNecessary(2000000);
+    assertThat(this.cacheDir.list()).hasSize(2);
+    assertThat(this.cacheDir.list()).contains("3.1");
+    assertThat(this.cacheDir.list()).contains("2.1");
+  }
+}
diff --git a/azkaban-exec-server/src/test/resources/1.1.zip b/azkaban-exec-server/src/test/resources/1.1.zip
new file mode 100644
index 0000000..98f1a2a
Binary files /dev/null and b/azkaban-exec-server/src/test/resources/1.1.zip differ
diff --git a/azkaban-exec-server/src/test/resources/2.1.zip b/azkaban-exec-server/src/test/resources/2.1.zip
new file mode 100644
index 0000000..4b574e8
Binary files /dev/null and b/azkaban-exec-server/src/test/resources/2.1.zip differ
diff --git a/azkaban-exec-server/src/test/resources/3.1.zip b/azkaban-exec-server/src/test/resources/3.1.zip
new file mode 100644
index 0000000..430f87b
Binary files /dev/null and b/azkaban-exec-server/src/test/resources/3.1.zip differ
diff --git a/azkaban-exec-server/src/test/resources/sample_flow_01.zip b/azkaban-exec-server/src/test/resources/sample_flow_01.zip
index 1ac01a4..53ae5bd 100644
Binary files a/azkaban-exec-server/src/test/resources/sample_flow_01.zip and b/azkaban-exec-server/src/test/resources/sample_flow_01.zip differ