azkaban-aplcache

Project dir cache enhancement (#2017) a race condition scenarios

11/13/2018 10:15:19 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index c2e270e..967e755 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -72,6 +72,19 @@ public class FileIOUtils {
     return true;
   }
 
+  public static int getFileCount(final File file) {
+    final File[] files = file.listFiles();
+    int count = 0;
+    for (final File f : files) {
+      if (f.isDirectory()) {
+        count += getFileCount(f);
+      } else {
+        count++;
+      }
+    }
+    return count;
+  }
+
 
   /**
    * Dumps a number into a new file.
@@ -81,7 +94,7 @@ public class FileIOUtils {
    * @throws IOException if file already exists
    */
   public static void dumpNumberToFile(final Path filePath, final long num) throws IOException {
-    try (BufferedWriter writer = Files
+    try (final BufferedWriter writer = Files
         .newBufferedWriter(filePath, StandardCharsets.UTF_8)) {
       writer.write(String.valueOf(num));
     } catch (final IOException e) {
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index 43565d9..b6ceb19 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -135,6 +135,11 @@ public class FileIOUtilsTest {
   }
 
   @Test
+  public void testFileCount() {
+    assertThat(FileIOUtils.getFileCount(this.baseDir)).isEqualTo(5);
+  }
+
+  @Test
   public void testHardlinkCopy() throws IOException {
     FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
     assertThat(areDirsEqual(this.sourceDir, this.destDir, true)).isTrue();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 9c6c1a3..74f7411 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -25,19 +25,20 @@ import azkaban.project.ProjectFileHandler;
 import azkaban.project.ProjectManagerException;
 import azkaban.storage.StorageManager;
 import azkaban.utils.FileIOUtils;
-import azkaban.utils.Pair;
 import azkaban.utils.Utils;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.FileTime;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.zip.ZipFile;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
@@ -47,23 +48,23 @@ public class FlowPreparer {
 
   // Name of the file which keeps project directory size
   static final String PROJECT_DIR_SIZE_FILE_NAME = "___azkaban_project_dir_size_in_bytes___";
+
+  // Name of the file which keeps count of files inside the directory
+  static final String PROJECT_DIR_COUNT_FILE_NAME = "___azkaban_project_dir_count___";
+
   private static final Logger log = Logger.getLogger(FlowPreparer.class);
   // TODO spyne: move to config class
   private final File executionsDir;
   // TODO spyne: move to config class
   private final File projectsDir;
-  private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
   private final StorageManager storageManager;
   private final ProjectCacheDirCleaner projectDirCleaner;
 
   public FlowPreparer(final StorageManager storageManager, final File executionsDir,
-      final File projectsDir,
-      final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects,
-      final Long projectDirMaxSizeInMb) {
+      final File projectsDir, final Long projectDirMaxSizeInMb) {
     this.storageManager = storageManager;
     this.executionsDir = executionsDir;
     this.projectsDir = projectsDir;
-    this.installedProjects = installedProjects;
     this.projectDirCleaner = new ProjectCacheDirCleaner(projectDirMaxSizeInMb);
 
   }
@@ -76,17 +77,39 @@ public class FlowPreparer {
    * @param pv the projectVersion whose size needs to updated.
    */
   static void updateDirSize(final File dir, final ProjectVersion pv) {
-    final long sizeInByte = FileUtils.sizeOfDirectory(dir);
-    pv.setDirSizeInBytes(sizeInByte);
     try {
-      FileIOUtils.dumpNumberToFile(Paths.get(dir.getPath(), PROJECT_DIR_SIZE_FILE_NAME),
-          sizeInByte);
+      final Path path = Paths.get(dir.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
+      if (!Files.exists(path)) {
+        final long sizeInByte = FileUtils.sizeOfDirectory(dir);
+        FileIOUtils.dumpNumberToFile(path, sizeInByte);
+      }
+      pv.setDirSizeInBytes(FileIOUtils.readNumberFromFile(path));
     } catch (final IOException e) {
       log.error("error when dumping dir size to file", e);
     }
   }
 
   /**
+   * Creates a file which keeps the count of files inside {@param dir}
+   *
+   * @param dir the directory whose size needs to be kept in the file to be created.
+   * @param pv the projectVersion whose size needs to updated.
+   */
+  static void updateFileCount(final File dir, final ProjectVersion pv) {
+    try {
+      final Path path = Paths.get(dir.getPath(), PROJECT_DIR_COUNT_FILE_NAME);
+      if (!Files.exists(path)) {
+        // count itself
+        final int fileCount = FileIOUtils.getFileCount(dir) + 1;
+        FileIOUtils.dumpNumberToFile(path, fileCount);
+      }
+      pv.setFileCount((int) FileIOUtils.readNumberFromFile(path));
+    } catch (final IOException e) {
+      log.error("error when dumping file count to file", e);
+    }
+  }
+
+  /**
    * Prepare the flow directory for execution.
    *
    * @param flow Executable Flow instance.
@@ -95,7 +118,8 @@ public class FlowPreparer {
     File execDir = null;
     try {
       // First get the ProjectVersion
-      final ProjectVersion projectVersion = getProjectVersion(flow);
+      final ProjectVersion projectVersion = new ProjectVersion(flow.getProjectId(),
+          flow.getVersion());
 
       // Setup the project
       setupProject(projectVersion);
@@ -103,14 +127,12 @@ public class FlowPreparer {
       // Create the execution directory
       execDir = createExecDir(flow);
 
-      // Synchronized on {@code projectVersion} to prevent one thread deleting a project dir
-      // in {@link FlowPreparer#setup} while another is creating hardlink from the same project dir
-      synchronized (projectVersion) {
-        // Create the symlinks from the project
-        copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
-        log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
-            flow.getExecutionId(), execDir.getPath()));
-      }
+      // Create the symlinks from the project
+      copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+
+      log.info(String
+          .format("Flow Preparation complete. [execid: %d, path: %s]", flow.getExecutionId(),
+              execDir.getPath()));
     } catch (final Exception e) {
       log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
       cleanup(execDir);
@@ -176,17 +198,18 @@ public class FlowPreparer {
 
     ProjectFileHandler projectFileHandler = null;
     try {
-      projectFileHandler = requireNonNull(this.storageManager.getProjectFile(projectId, version));
+      log.info(String.format("Downloading zip file for Project Version {%s}", pv));
+      projectFileHandler = requireNonNull(
+          this.storageManager.getProjectFile(pv.getProjectId(), pv.getVersion()));
       checkState("zip".equals(projectFileHandler.getFileType()));
-
-      log.info("Downloading zip file.");
       final File zipFile = requireNonNull(projectFileHandler.getLocalFile());
       final ZipFile zip = new ZipFile(zipFile);
       Utils.unzip(zip, tempDir);
       updateDirSize(tempDir, pv);
+      updateFileCount(tempDir, pv);
+      log.info(String.format("Downloading zip file for Project Version {%s} completes", pv));
       this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
       Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
-      this.installedProjects.put(new Pair<>(pv.getProjectId(), pv.getVersion()), pv);
       log.warn(String.format("Project preparation completes. [%s]", pv));
     } finally {
       if (projectFileHandler != null) {
@@ -212,84 +235,120 @@ public class FlowPreparer {
     return execDir;
   }
 
-  private ProjectVersion getProjectVersion(final ExecutableFlow flow) {
-    // We're setting up the installed projects. First time, it may take a while
-    // to set up.
-    final ProjectVersion projectVersion;
-    synchronized (this.installedProjects) {
-      final Pair<Integer, Integer> pair = new Pair<>(flow.getProjectId(), flow.getVersion());
-      projectVersion = this.installedProjects.getOrDefault(pair, new ProjectVersion(flow
-          .getProjectId(), flow.getVersion()));
-    }
-    return projectVersion;
-  }
 
   private class ProjectCacheDirCleaner {
 
     private final Long projectDirMaxSizeInMb;
 
+    /*
+     * Delete the project dir associated with {@code version}.
+     */
+    private void deleteDirectory(final ProjectVersion pv) throws IOException {
+      final File installedDir = pv.getInstalledDir();
+      if (installedDir != null && installedDir.exists()) {
+        FileUtils.deleteDirectory(installedDir);
+      }
+    }
+
     ProjectCacheDirCleaner(final Long projectDirMaxSizeInMb) {
       this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
     }
 
+    private List<Path> loadAllProjectDirs() {
+      final List<Path> projects = new ArrayList<>();
+      for (final File project : FlowPreparer.this.projectsDir.listFiles(new FilenameFilter() {
+
+        String pattern = "[0-9]+\\.[0-9]+";
+
+        @Override
+        public boolean accept(final File dir, final String name) {
+          return name.matches(this.pattern);
+        }
+      })) {
+        if (project.exists() && project.isDirectory()) {
+          projects.add(project.toPath());
+        } else {
+          FlowPreparer.log
+              .debug(String.format("project %s doesn't exist or is non-dir.", project.getName()));
+        }
+      }
+      return projects;
+    }
+
+    private List<ProjectVersion> loadAllProjects() {
+      final List<ProjectVersion> allProjects = new ArrayList<>();
+      for (final Path project : this.loadAllProjectDirs()) {
+        try {
+          final String fileName = project.getFileName().toString();
+          final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
+          final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
+
+          final ProjectVersion projVersion =
+              new ProjectVersion(projectId, versionNum, project.toFile());
+
+          FlowPreparer.updateDirSize(projVersion.getInstalledDir(), projVersion);
+          FlowPreparer.updateFileCount(projVersion.getInstalledDir(), projVersion);
+
+          final Path projectDirFileCount = Paths.get(projVersion.getInstalledDir().toString(),
+              FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
+
+          projVersion.setLastAccessTime(Files.getLastModifiedTime(projectDirFileCount));
+          allProjects.add(projVersion);
+        } catch (final Exception e) {
+          FlowPreparer.log
+              .error(String.format("error while loading project dir metadata for project %s",
+                  project.getFileName()), e);
+        }
+      }
+      return allProjects;
+    }
+
     /**
      * @return sum of the size of all project dirs
      */
-    private long getProjectDirsTotalSizeInBytes() throws IOException {
+    private long getProjectDirsTotalSizeInBytes(final List<ProjectVersion> allProjects) {
       long totalSizeInBytes = 0;
-      for (final ProjectVersion version : FlowPreparer.this.installedProjects.values()) {
+      for (final ProjectVersion version : allProjects) {
         totalSizeInBytes += version.getDirSizeInBytes();
       }
       return totalSizeInBytes;
     }
 
-    private FileTime getLastReferenceTime(final ProjectVersion pv) throws IOException {
-      final Path dirSizeFile = Paths
-          .get(pv.getInstalledDir().toPath().toString(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
-      return Files.getLastModifiedTime(dirSizeFile);
-    }
-
     private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
-        final List<ProjectVersion>
-            projectVersions) throws IOException {
+        final List<ProjectVersion> projectVersions) {
       // sort project version by last reference time in ascending order
-      try {
-        projectVersions.sort((o1, o2) -> {
-          try {
-            final FileTime lastReferenceTime1 = getLastReferenceTime(o1);
-            final FileTime lastReferenceTime2 = getLastReferenceTime(o2);
-            return lastReferenceTime1.compareTo(lastReferenceTime2);
-          } catch (final IOException ex) {
-            throw new RuntimeException(ex);
-          }
-        });
-      } catch (final RuntimeException ex) {
-        throw new IOException(ex);
-      }
-
+      projectVersions.sort(Comparator.comparing(ProjectVersion::getLastAccessTime));
       for (final ProjectVersion version : projectVersions) {
         if (sizeToFreeInBytes > 0) {
           try {
-            // delete the project directory even if flow within is running. It's ok to
+            // delete the project directory even if flow within is running. It's OK to
             // delete the directory since execution dir is HARD linked to project dir.
-            FlowRunnerManager.deleteDirectory(version);
-            FlowPreparer.this.installedProjects.remove(new Pair<>(version.getProjectId(), version
-                .getVersion()));
+            FlowPreparer.log.info(String.format("deleting project version %s", version));
+            deleteDirectory(version);
             sizeToFreeInBytes -= version.getDirSizeInBytes();
           } catch (final IOException ex) {
-            log.error(ex);
+            FlowPreparer.log.error(ex);
           }
         }
       }
     }
 
-    void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) throws IOException {
-      final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes();
-      if (this.projectDirMaxSizeInMb != null
-          && (currentSpaceInBytes + spaceToDeleteInBytes) >= this
-          .projectDirMaxSizeInMb * 1024 * 1024) {
-        deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes,
-            new ArrayList<>(FlowPreparer.this.installedProjects.values()));
+    synchronized void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) {
+      if (this.projectDirMaxSizeInMb != null) {
+        final long start = System.currentTimeMillis();
+        final List<ProjectVersion> allProjects = loadAllProjects();
+        FlowPreparer.log
+            .debug(String.format("loading all project dirs metadata completes in %s sec(s)",
+                Duration.ofSeconds(System.currentTimeMillis() - start).getSeconds()));
+
+        final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
+        if (currentSpaceInBytes + spaceToDeleteInBytes
+            >= this.projectDirMaxSizeInMb * 1024 * 1024) {
+          FlowPreparer.log.info(String.format("Project dir disk usage[%s bytes] exceeds the "
+                  + "limit[%s mb], start cleaning up project dirs",
+              currentSpaceInBytes + spaceToDeleteInBytes, this.projectDirMaxSizeInMb.longValue()));
+          deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes, allProjects);
+        }
       }
     }
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 51ea5a5..3fb6eb0 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -51,12 +51,8 @@ import azkaban.utils.TrackingThreadPool;
 import azkaban.utils.UndefinedPropertyException;
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.lang.Thread.State;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -129,7 +125,6 @@ public class FlowRunnerManager implements EventListener,
   private final File projectDirectory;
   private final Object executionDirDeletionSync = new Object();
 
-  private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
   private final int numThreads;
   private int threadPoolQueueSize = -1;
   private final int numJobThreadPerFlow;
@@ -143,8 +138,6 @@ public class FlowRunnerManager implements EventListener,
   private final boolean validateProxyUser;
   // date time of the the last flow submitted.
   private long lastFlowSubmittedDate = 0;
-  // whether the current executor is active
-  private volatile boolean isExecutorActive = false;
 
   @Inject
   public FlowRunnerManager(final Props props,
@@ -170,8 +163,6 @@ public class FlowRunnerManager implements EventListener,
       this.projectDirectory.mkdirs();
     }
 
-    this.installedProjects = new ConcurrentHashMap<>();
-
     // azkaban.temp.dir
     this.numThreads = props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
     this.numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
@@ -205,29 +196,12 @@ public class FlowRunnerManager implements EventListener,
 
     // Create a flow preparer
     this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
-        this.projectDirectory, this.installedProjects, projectDirMaxSize);
+        this.projectDirectory, projectDirMaxSize);
 
     this.cleanerThread = new CleanerThread();
     this.cleanerThread.start();
   }
 
-  /*
-   * Delete the project dir associated with {@code version}.
-   * It first acquires object lock of {@code version} waiting for other threads creating
-   * execution dir to finish to avoid race condition. An example of race condition scenario:
-   * delete the dir of a project while an execution of a flow in the same project is being setup
-   * and the flow's execution dir is being created({@link FlowPreparer#setup}).
-   */
-  static void deleteDirectory(final ProjectVersion pv) throws IOException {
-    synchronized (pv) {
-      logger.warn("Deleting project: " + pv);
-      final File installedDir = pv.getInstalledDir();
-      if (installedDir != null && installedDir.exists()) {
-        FileUtils.deleteDirectory(installedDir);
-      }
-    }
-  }
-
   /**
    * Setting the gid bit on the execution directory forces all files/directories created within the
    * directory to be a part of the group associated with the azkaban process. Then, when users
@@ -272,55 +246,6 @@ public class FlowRunnerManager implements EventListener,
     }
   }
 
-  private List<Path> loadExistingProjects() {
-    final List<Path> projects = new ArrayList<>();
-    for (final File project : this.projectDirectory.listFiles(new FilenameFilter() {
-
-      String pattern = "[0-9]+\\.[0-9]+";
-
-      @Override
-      public boolean accept(final File dir, final String name) {
-        return name.matches(this.pattern);
-      }
-    })) {
-      if (project.isDirectory()) {
-        projects.add(project.toPath());
-      }
-    }
-    return projects;
-  }
-
-  private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjectsAsCache() {
-    final Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
-        new ConcurrentHashMap<>();
-    logger.info("loading project dir metadata into memory");
-    for (final Path project : this.loadExistingProjects()) {
-      if (Files.isDirectory(project)) {
-        try {
-          final String fileName = project.getFileName().toString();
-          final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
-          final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
-          final ProjectVersion projVersion =
-              new ProjectVersion(projectId, versionNum, project.toFile());
-          final Path projectDirSizeFile = Paths
-              .get(projVersion.getInstalledDir().toString(),
-                  FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
-          if (!Files.exists(projectDirSizeFile)) {
-            FlowPreparer.updateDirSize(projVersion.getInstalledDir(), projVersion);
-          }
-
-          projVersion.setDirSizeInBytes(FileIOUtils.readNumberFromFile(projectDirSizeFile));
-          allProjects.put(new Pair<>(projectId, versionNum), projVersion);
-        } catch (final Exception e) {
-          logger.error("error while loading project dir metadata", e);
-        }
-      }
-      logger.info("finish loading project dir metadata into memory");
-    }
-
-    return allProjects;
-  }
-
   public void setExecutorActive(final boolean isActive, final String host, final int port)
       throws ExecutorManagerException {
     final Executor executor = this.executorLoader.fetchExecutor(host, port);
@@ -328,10 +253,6 @@ public class FlowRunnerManager implements EventListener,
     if (executor.isActive() != isActive) {
       executor.setActive(isActive);
       this.executorLoader.updateExecutor(executor);
-      this.isExecutorActive = isActive;
-      if (this.isExecutorActive) {
-        this.installedProjects = this.loadExistingProjectsAsCache();
-      }
     } else {
       logger.info(
           "Set active action ignored. Executor is already " + (isActive ? "active" : "inactive"));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index 4972917..b466673 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -19,6 +19,7 @@ package azkaban.execapp;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.File;
+import java.nio.file.attribute.FileTime;
 
 
 public class ProjectVersion implements Comparable<ProjectVersion> {
@@ -28,6 +29,8 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
 
   private File installedDir;
   private Long dirSize;
+  private Integer fileCount;
+  private FileTime lastAccessTime;
 
   public ProjectVersion(final int projectId, final int version) {
     checkArgument(projectId > 0);
@@ -50,6 +53,14 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
     this.dirSize = dirSize;
   }
 
+  public Integer getFileCount() {
+    return this.fileCount;
+  }
+
+  public void setFileCount(final Integer fileCount) {
+    this.fileCount = fileCount;
+  }
+
   public int getProjectId() {
     return this.projectId;
   }
@@ -77,9 +88,21 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
 
   @Override
   public String toString() {
-    return "ProjectVersion{" + "projectId=" + this.projectId + ", version=" + this.version
-        + ", installedDir="
-        + this.installedDir
-        + '}';
+    return "ProjectVersion{" +
+        "projectId=" + this.projectId +
+        ", version=" + this.version +
+        ", installedDir=" + this.installedDir +
+        ", dirSize=" + this.dirSize +
+        ", fileCount=" + this.fileCount +
+        ", lastAccessTime=" + this.lastAccessTime +
+        '}';
+  }
+
+  public FileTime getLastAccessTime() {
+    return this.lastAccessTime;
+  }
+
+  public void setLastAccessTime(final FileTime lastAccessTime) {
+    this.lastAccessTime = lastAccessTime;
   }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index e1344dd..640e203 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -49,7 +49,6 @@ import org.junit.rules.TemporaryFolder;
 public class FlowPreparerTest {
 
   public static final String SAMPLE_FLOW_01 = "sample_flow_01";
-  final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private File executionsDir;
@@ -75,8 +74,7 @@ public class FlowPreparerTest {
     this.projectsDir = this.temporaryFolder.newFolder("projects");
 
     this.instance = spy(
-        new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir,
-            this.installedProjects, null));
+        new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir, null));
     doNothing().when(this.instance).touchIfExists(any());
   }
 
@@ -92,6 +90,11 @@ public class FlowPreparerTest {
     assertThat(FileIOUtils.readNumberFromFile(
         Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
         .isEqualTo(actualDirSize);
+
+    assertThat(FileIOUtils.readNumberFromFile(
+        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_COUNT_FILE_NAME)))
+        .isEqualTo(8);
+
     assertTrue(pv.getInstalledDir().exists());
     assertTrue(new File(pv.getInstalledDir(), "sample_flow_01").exists());
   }
@@ -132,7 +135,7 @@ public class FlowPreparerTest {
 
     //given
     final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
-        this.executionsDir, this.projectsDir, installedProjects, null);
+        this.executionsDir, this.projectsDir, null);
 
     //when
     final List<File> expectedRemainingFiles = new ArrayList<>();
@@ -153,11 +156,10 @@ public class FlowPreparerTest {
   @Test
   public void testProjectCacheDirCleaner() throws IOException, InterruptedException {
     final Long projectDirMaxSize = 3L;
-    final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
 
     //given
     final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
-        this.executionsDir, this.projectsDir, installedProjects, projectDirMaxSize);
+        this.executionsDir, this.projectsDir, projectDirMaxSize);
 
     //when
     final List<File> expectedRemainingFiles = new ArrayList<>();