azkaban-aplcache

project LRU cache part 2 (#1865) The problem this PR targets

7/25/2018 8:25:28 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index e781732..b3fd0f4 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -226,6 +226,9 @@ public class Constants {
     public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
 
     public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
+
+    // allowed max size of shared project dir in MB
+    public static final String PROJECT_DIR_MAX_SIZE_IN_MB = "azkaban.project_cache_max_size_in_mb";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 47010d5..5d93d4d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -35,6 +35,8 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.FileTime;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.zip.ZipFile;
 import org.apache.commons.io.FileUtils;
@@ -52,16 +54,21 @@ public class FlowPreparer {
   private final File projectsDir;
   private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
   private final StorageManager storageManager;
+  private final ProjectCacheDirCleaner projectDirCleaner;
 
   public FlowPreparer(final StorageManager storageManager, final File executionsDir,
       final File projectsDir,
-      final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects) {
+      final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects,
+      final Long projectDirMaxSizeInMb) {
     this.storageManager = storageManager;
     this.executionsDir = executionsDir;
     this.projectsDir = projectsDir;
     this.installedProjects = installedProjects;
+    this.projectDirCleaner = new ProjectCacheDirCleaner(projectDirMaxSizeInMb);
+
   }
 
+
   /**
    * Prepare the flow directory for execution.
    *
@@ -79,11 +86,14 @@ public class FlowPreparer {
       // Create the execution directory
       execDir = createExecDir(flow);
 
-      // Create the symlinks from the project
-      copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
-
-      log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
-          flow.getExecutionId(), execDir.getPath()));
+      // Synchronize on {@code projectVersion} so that no other thread can delete the project dir
+      // (see {@link FlowRunnerManager#deleteDirectory}) while hardlinks are created from it.
+      synchronized (projectVersion) {
+        // Create the hardlinks from the project
+        copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+        log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
+            flow.getExecutionId(), execDir.getPath()));
+      }
     } catch (final Exception e) {
       log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
       cleanup(execDir);
@@ -91,6 +101,16 @@ public class FlowPreparer {
     }
   }
 
+  private void cleanup(final File execDir) {
+    if (execDir != null) {
+      try {
+        FileUtils.deleteDirectory(execDir);
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   /**
    * Touch the file if it exists.
    *
@@ -105,7 +125,6 @@ public class FlowPreparer {
     }
   }
 
-
   /**
    * Prepare the project directory.
    *
@@ -148,7 +167,9 @@ public class FlowPreparer {
       final ZipFile zip = new ZipFile(zipFile);
       Utils.unzip(zip, tempDir);
       updateDirSize(tempDir, pv);
+      this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
       Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
+      this.installedProjects.put(new Pair<>(pv.getProjectId(), pv.getVersion()), pv);
       log.warn(String.format("Project preparation completes. [%s]", pv));
     } finally {
       if (projectFileHandler != null) {
@@ -168,7 +189,7 @@ public class FlowPreparer {
    */
   private void updateDirSize(final File dir, final ProjectVersion pv) {
     final long sizeInByte = FileUtils.sizeOfDirectory(dir);
-    pv.setDirSize(sizeInByte);
+    pv.setDirSizeInBytes(sizeInByte);
     try {
       FileIOUtils.dumpNumberToFile(Paths.get(dir.getPath(), PROJECT_DIR_SIZE_FILE_NAME),
           sizeInByte);
@@ -197,19 +218,79 @@ public class FlowPreparer {
     // to set up.
     final ProjectVersion projectVersion;
     synchronized (this.installedProjects) {
-      projectVersion = this.installedProjects
-          .computeIfAbsent(new Pair<>(flow.getProjectId(), flow.getVersion()),
-              k -> new ProjectVersion(flow.getProjectId(), flow.getVersion()));
+      final Pair<Integer, Integer> pair = new Pair<>(flow.getProjectId(), flow.getVersion());
+      projectVersion = this.installedProjects.getOrDefault(pair, new ProjectVersion(flow
+          .getProjectId(), flow.getVersion()));
     }
     return projectVersion;
   }
 
-  private void cleanup(final File execDir) {
-    if (execDir != null) {
+  private class ProjectCacheDirCleaner {
+
+    private final Long projectDirMaxSizeInMb;
+
+    ProjectCacheDirCleaner(final Long projectDirMaxSizeInMb) {
+      this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
+    }
+
+    /**
+     * @return sum of the size of all project dirs
+     */
+    private long getProjectDirsTotalSizeInBytes() throws IOException {
+      long totalSizeInBytes = 0;
+      for (final ProjectVersion version : FlowPreparer.this.installedProjects.values()) {
+        totalSizeInBytes += version.getDirSizeInBytes();
+      }
+      return totalSizeInBytes;
+    }
+
+    private FileTime getLastReferenceTime(final ProjectVersion pv) throws IOException {
+      final Path dirSizeFile = Paths
+          .get(pv.getInstalledDir().toPath().toString(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
+      return Files.getLastModifiedTime(dirSizeFile);
+    }
+
+    private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
+        final List<ProjectVersion>
+            projectVersions) throws IOException {
+      // sort project version by last reference time in ascending order
       try {
-        FileUtils.deleteDirectory(execDir);
-      } catch (final IOException e) {
-        throw new RuntimeException(e);
+        projectVersions.sort((o1, o2) -> {
+          try {
+            final FileTime lastReferenceTime1 = getLastReferenceTime(o1);
+            final FileTime lastReferenceTime2 = getLastReferenceTime(o2);
+            return lastReferenceTime1.compareTo(lastReferenceTime2);
+          } catch (final IOException ex) {
+            throw new RuntimeException(ex);
+          }
+        });
+      } catch (final RuntimeException ex) {
+        throw new IOException(ex);
+      }
+
+      for (final ProjectVersion version : projectVersions) {
+        if (sizeToFreeInBytes > 0) {
+          try {
+            // Delete the project directory even if a flow within it is running. This is safe
+            // because the execution dir is hard-linked to the project dir.
+            FlowRunnerManager.deleteDirectory(version);
+            FlowPreparer.this.installedProjects.remove(new Pair<>(version.getProjectId(), version
+                .getVersion()));
+            sizeToFreeInBytes -= version.getDirSizeInBytes();
+          } catch (final IOException ex) {
+            log.error(ex);
+          }
+        }
+      }
+    }
+
+    void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) throws IOException {
+      final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes();
+      if (this.projectDirMaxSizeInMb != null
+          && (currentSpaceInBytes + spaceToDeleteInBytes) >= this
+          .projectDirMaxSizeInMb * 1024 * 1024) {
+        deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes,
+            new ArrayList<>(FlowPreparer.this.installedProjects.values()));
       }
     }
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index e7d17da..405766b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -17,6 +17,7 @@
 package azkaban.execapp;
 
 import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.event.Event;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
@@ -46,10 +47,14 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.ThreadPoolExecutingListener;
 import azkaban.utils.TrackingThreadPool;
+import azkaban.utils.UndefinedPropertyException;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.lang.Thread.State;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -121,28 +126,22 @@ public class FlowRunnerManager implements EventListener,
   private final Props azkabanProps;
   private final File executionDirectory;
   private final File projectDirectory;
-
   private final Object executionDirDeletionSync = new Object();
+
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
   private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
   private int threadPoolQueueSize = -1;
   private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
-
   private Props globalProps;
-
   private long lastCleanerThreadCheckTime = -1;
   private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
-
   // We want to limit the log sizes to about 20 megs
   private String jobLogChunkSize = "5MB";
   private int jobLogNumFiles = 4;
-
   // If true, jobs will validate proxy user against a list of valid proxy users.
   private boolean validateProxyUser = false;
-
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
-
   // whether the current executor is active
   private volatile boolean isExecutorActive = false;
 
@@ -170,18 +169,13 @@ public class FlowRunnerManager implements EventListener,
       this.projectDirectory.mkdirs();
     }
 
-    this.installedProjects = new HashMap<>();
+    this.installedProjects = new ConcurrentHashMap<>();
 
     // azkaban.temp.dir
     this.numThreads = props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
     this.numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
     this.executorService = createExecutorService(this.numThreads);
 
-    // Create a flow preparer
-    this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
-        this.projectDirectory,
-        this.installedProjects);
-
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
     this.triggerManager = triggerManager;
@@ -191,9 +185,6 @@ public class FlowRunnerManager implements EventListener,
 
     this.validateProxyUser = this.azkabanProps.getBoolean("proxy.user.lock.down", false);
 
-    this.cleanerThread = new CleanerThread();
-    this.cleanerThread.start();
-
     final String globalPropsPath = props.getString("executor.global.properties", null);
     if (globalPropsPath != null) {
       this.globalProps = new Props(null, globalPropsPath);
@@ -204,6 +195,36 @@ public class FlowRunnerManager implements EventListener,
             AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR,
             JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), this.globalProps,
             getClass().getClassLoader());
+
+    Long projectDirMaxSize = null;
+    try {
+      projectDirMaxSize = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE_IN_MB);
+    } catch (final UndefinedPropertyException ex) {
+    }
+
+    // Create a flow preparer
+    this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
+        this.projectDirectory, this.installedProjects, projectDirMaxSize);
+
+    this.cleanerThread = new CleanerThread();
+    this.cleanerThread.start();
+  }
+
+  /**
+   * Delete the project dir associated with {@code pv}.
+   * It first acquires the object lock of {@code pv}, waiting for any other thread that is
+   * creating an execution dir to finish, to avoid a race condition. An example race: the dir
+   * of a project is deleted while an execution of a flow in the same project is being set up
+   * and the flow's execution dir is being created ({@link FlowPreparer#setup}).
+   */
+  static void deleteDirectory(final ProjectVersion pv) throws IOException {
+    synchronized (pv) {
+      logger.warn("Deleting project: " + pv);
+      final File installedDir = pv.getInstalledDir();
+      if (installedDir != null && installedDir.exists()) {
+        FileUtils.deleteDirectory(installedDir);
+      }
+    }
   }
 
   /**
@@ -250,9 +271,8 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
-  private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
-    final Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
-        new HashMap<>();
+  private List<Path> loadExistingProjects() {
+    final List<Path> projects = new ArrayList<>();
     for (final File project : this.projectDirectory.listFiles(new FilenameFilter() {
 
       String pattern = "[0-9]+\\.[0-9]+";
@@ -263,19 +283,35 @@ public class FlowRunnerManager implements EventListener,
       }
     })) {
       if (project.isDirectory()) {
+        projects.add(project.toPath());
+      }
+    }
+    return projects;
+  }
+
+  private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjectsAsCache() {
+    final Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
+        new ConcurrentHashMap<>();
+    for (final Path project : this.loadExistingProjects()) {
+      if (Files.isDirectory(project)) {
         try {
-          final String fileName = new File(project.getAbsolutePath()).getName();
+          final String fileName = project.getFileName().toString();
           final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
           final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
           final ProjectVersion version =
-              new ProjectVersion(projectId, versionNum, project);
-          allProjects.put(new Pair<>(projectId, versionNum),
-              version);
+              new ProjectVersion(projectId, versionNum, project.toFile());
+
+          version.setDirSizeInBytes(
+              FileIOUtils.readNumberFromFile(Paths.get(version.getInstalledDir().toString(),
+                  FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)));
+
+          allProjects.put(new Pair<>(projectId, versionNum), version);
         } catch (final Exception e) {
           e.printStackTrace();
         }
       }
     }
+
     return allProjects;
   }
 
@@ -285,7 +321,7 @@ public class FlowRunnerManager implements EventListener,
   public void setExecutorActive(final boolean isActive) {
     this.isExecutorActive = isActive;
     if (this.isExecutorActive) {
-      this.installedProjects = this.loadExistingProjects();
+      this.installedProjects = this.loadExistingProjectsAsCache();
     }
   }
 
@@ -304,16 +340,6 @@ public class FlowRunnerManager implements EventListener,
     this.globalProps = globalProps;
   }
 
-  public void deleteDirectory(final ProjectVersion pv) throws IOException {
-    synchronized (pv) {
-      logger.warn("Deleting project: " + pv);
-      final File installedDir = pv.getInstalledDir();
-      if (installedDir != null && installedDir.exists()) {
-        FileUtils.deleteDirectory(installedDir);
-      }
-    }
-  }
-
   public void submitFlow(final int execId) throws ExecutorManagerException {
     // Load file and submit
     if (this.runningFlows.containsKey(execId)) {
@@ -776,6 +802,26 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
+  private Set<Pair<Integer, Integer>> getActiveProjectVersions() {
+    final Set<Pair<Integer, Integer>> activeProjectVersions = new HashSet<>();
+    for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
+      final ExecutableFlow flow = runner.getExecutableFlow();
+      activeProjectVersions.add(new Pair<>(flow
+          .getProjectId(), flow.getVersion()));
+    }
+    return activeProjectVersions;
+  }
+
+  /**
+   * Checks if the project version contains any running flow
+   */
+  private boolean isActiveProject(final ProjectVersion version) {
+    final Pair<Integer, Integer> versionKey = new Pair<>(version.getProjectId(),
+        version.getVersion());
+    return getActiveProjectVersions().contains(versionKey);
+  }
+
+
   private class CleanerThread extends Thread {
 
     // Every hour, clean execution dir.
@@ -784,7 +830,6 @@ public class FlowRunnerManager implements EventListener,
     private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5 * 60 * 1000;
     // Every 2 mins clean the recently finished list
     private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
-
     // Every 5 mins kill flows running longer than allowed max running time
     private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
     private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt(
@@ -833,7 +878,7 @@ public class FlowRunnerManager implements EventListener,
             if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
                 && FlowRunnerManager.this.isExecutorActive) {
               logger.info("Cleaning old projects");
-              cleanOlderProjects();
+              cleanProjectsOfOldVersion();
               this.lastOldProjectCleanTime = currentTime;
             }
 
@@ -922,7 +967,7 @@ public class FlowRunnerManager implements EventListener,
       }
     }
 
-    private void cleanOlderProjects() {
+    private void cleanProjectsOfOldVersion() {
       final Map<Integer, ArrayList<ProjectVersion>> projectVersions =
           new HashMap<>();
       for (final ProjectVersion version : FlowRunnerManager.this.installedProjects.values()) {
@@ -935,14 +980,6 @@ public class FlowRunnerManager implements EventListener,
         versionList.add(version);
       }
 
-      final HashSet<Pair<Integer, Integer>> activeProjectVersions =
-          new HashSet<>();
-      for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
-        final ExecutableFlow flow = runner.getExecutableFlow();
-        activeProjectVersions.add(new Pair<>(flow
-            .getProjectId(), flow.getVersion()));
-      }
-
       for (final Map.Entry<Integer, ArrayList<ProjectVersion>> entry : projectVersions
           .entrySet()) {
         // Integer projectId = entry.getKey();
@@ -956,10 +993,7 @@ public class FlowRunnerManager implements EventListener,
         Collections.sort(installedVersions);
         for (int i = 0; i < installedVersions.size() - 1; ++i) {
           final ProjectVersion version = installedVersions.get(i);
-          final Pair<Integer, Integer> versionKey =
-              new Pair<>(version.getProjectId(),
-                  version.getVersion());
-          if (!activeProjectVersions.contains(versionKey)) {
+          if (!isActiveProject(version)) {
             try {
               logger.info("Removing old unused installed project "
                   + version.getProjectId() + ":" + version.getVersion());
@@ -973,6 +1007,8 @@ public class FlowRunnerManager implements EventListener,
         }
       }
     }
+
   }
 
+
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index a97ca35..4972917 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -42,11 +42,11 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
     this.installedDir = installedDir;
   }
 
-  public Long getDirSize() {
+  public Long getDirSizeInBytes() {
     return this.dirSize;
   }
 
-  public void setDirSize(final Long dirSize) {
+  public void setDirSizeInBytes(final Long dirSize) {
     this.dirSize = dirSize;
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index 7af3573..1fe97c9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -20,6 +20,7 @@ package azkaban.execapp;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -33,8 +34,11 @@ import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Pair;
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -52,13 +56,7 @@ public class FlowPreparerTest {
 
   private FlowPreparer instance;
 
-  @Before
-  public void setUp() throws Exception {
-    tearDown();
-
-    this.executionsDir.mkdirs();
-    this.projectsDir.mkdirs();
-
+  private StorageManager createMockStorageManager() {
     final ClassLoader classLoader = getClass().getClassLoader();
     final File file = new File(classLoader.getResource(SAMPLE_FLOW_01 + ".zip").getFile());
 
@@ -67,10 +65,20 @@ public class FlowPreparerTest {
     when(projectFileHandler.getLocalFile()).thenReturn(file);
 
     final StorageManager storageManager = mock(StorageManager.class);
-    when(storageManager.getProjectFile(12, 34)).thenReturn(projectFileHandler);
+    when(storageManager.getProjectFile(anyInt(), anyInt())).thenReturn(projectFileHandler);
+    return storageManager;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tearDown();
+
+    this.executionsDir.mkdirs();
+    this.projectsDir.mkdirs();
 
-    this.instance = spy(new FlowPreparer(storageManager, this.executionsDir, this.projectsDir,
-        this.installedProjects));
+    this.instance = spy(
+        new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir,
+            this.installedProjects, null));
     doNothing().when(this.instance).touchIfExists(any());
   }
 
@@ -86,9 +94,9 @@ public class FlowPreparerTest {
         new File(this.projectsDir, "sample_project_01"));
     this.instance.setupProject(pv);
 
-    final long actualDirSize = 259;
+    final long actualDirSize = 1048835;
 
-    assertThat(pv.getDirSize()).isEqualTo(actualDirSize);
+    assertThat(pv.getDirSizeInBytes()).isEqualTo(actualDirSize);
     assertThat(FileIOUtils.readNumberFromFile(
         Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
         .isEqualTo(actualDirSize);
@@ -124,4 +132,61 @@ public class FlowPreparerTest {
     assertTrue(execDir.exists());
     assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
   }
+
+
+  @Test
+  public void testProjectCacheDirCleanerNotEnabled() throws IOException {
+    final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
+
+    //given
+    final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
+        this.executionsDir, this.projectsDir, installedProjects, null);
+
+    //when
+    final List<File> expectedRemainingFiles = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      final int projectId = i;
+      final int version = 1;
+      final ProjectVersion pv = new ProjectVersion(projectId, version, null);
+      installedProjects.put(new Pair<>(projectId, version), pv);
+      flowPreparer.setupProject(pv);
+      expectedRemainingFiles.add(pv.getInstalledDir());
+    }
+
+    //then
+    assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
+        .toArray(new File[expectedRemainingFiles.size()]));
+  }
+
+  @Test
+  public void testProjectCacheDirCleaner() throws IOException, InterruptedException {
+    final Long projectDirMaxSize = 3L;
+    final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
+
+    //given
+    final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
+        this.executionsDir, this.projectsDir, installedProjects, projectDirMaxSize);
+
+    //when
+    final List<File> expectedRemainingFiles = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      final int projectId = i;
+      final int version = 1;
+      final ProjectVersion pv = new ProjectVersion(projectId, version, null);
+      flowPreparer.setupProject(pv);
+
+      if (i >= 2) {
+        //the first file will be deleted
+        expectedRemainingFiles.add(pv.getInstalledDir());
+      }
+      // Not all file systems support millisecond granularity for a file's last-modified time,
+      // so sleep for 1 second between the creation of each project dir to make their
+      // last-modified times differ.
+      Thread.sleep(1000);
+    }
+
+    //then
+    assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
+        .toArray(new File[expectedRemainingFiles.size()]));
+  }
 }
diff --git a/azkaban-exec-server/src/test/resources/sample_flow_01.zip b/azkaban-exec-server/src/test/resources/sample_flow_01.zip
index 1147976..1ac01a4 100644
Binary files a/azkaban-exec-server/src/test/resources/sample_flow_01.zip and b/azkaban-exec-server/src/test/resources/sample_flow_01.zip differ