Details
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 7290bad..5a86e7d 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -36,8 +36,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -45,7 +47,7 @@ import org.apache.log4j.Logger;
*/
public class FileIOUtils {
- private final static Logger logger = Logger.getLogger(FileIOUtils.class);
+ private static final Logger log = LoggerFactory.getLogger(FileIOUtils.class);
/**
* Check if a directory is writable
@@ -72,17 +74,17 @@ public class FileIOUtils {
return true;
}
- public static int getFileCount(final File file) {
- final File[] files = file.listFiles();
- int count = 0;
- for (final File f : files) {
- if (f.isDirectory()) {
- count += getFileCount(f);
- } else {
- count++;
+ /**
+ * Delete a directory, log the error if deletion fails.
+ */
+ public static void deleteDirectorySilently(final File dir) {
+ if (dir != null) {
+ try {
+ FileUtils.deleteDirectory(dir);
+ } catch (final IOException e) {
+ log.error("error when deleting dir {}", dir, e);
}
}
- return count;
}
@@ -98,7 +100,7 @@ public class FileIOUtils {
.newBufferedWriter(filePath, StandardCharsets.UTF_8)) {
writer.write(String.valueOf(num));
} catch (final IOException e) {
- logger.error(String.format("Failed to write the number %s to the file %s", num, filePath), e);
+ log.error("Failed to write the number {} to the file {}", num, filePath, e);
throw e;
}
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index 9887004..f5cfeb7 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -135,11 +135,6 @@ public class FileIOUtilsTest {
}
@Test
- public void testFileCount() {
- assertThat(FileIOUtils.getFileCount(this.baseDir)).isEqualTo(5);
- }
-
- @Test
public void testHardlinkCopy() throws IOException {
final int hardLinkCount = FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
assertThat(areDirsEqual(this.sourceDir, this.destDir, true)).isTrue();
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 4f68c10..66f6b8d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -21,49 +21,44 @@ import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
import azkaban.project.ProjectFileHandler;
-import azkaban.project.ProjectManagerException;
import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Utils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
+import java.util.Optional;
import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class FlowPreparer {
+class FlowPreparer {
// Name of the file which keeps project directory size
static final String PROJECT_DIR_SIZE_FILE_NAME = "___azkaban_project_dir_size_in_bytes___";
- // Name of the file which keeps count of files inside the directory
- static final String PROJECT_DIR_COUNT_FILE_NAME = "___azkaban_project_dir_count___";
+ private static final Logger log = LoggerFactory.getLogger(FlowPreparer.class);
- private static final Logger log = Logger.getLogger(FlowPreparer.class);
// TODO spyne: move to config class
private final File executionsDir;
// TODO spyne: move to config class
- private final File projectsDir;
+ private final File projectCacheDir;
private final StorageManager storageManager;
- private final ProjectCacheDirCleaner projectDirCleaner;
- private final ProjectsDirCacheMetrics cacheMetrics;
+ // Null if cache clean-up is disabled
+ private final Optional<ProjectCacheCleaner> projectCacheCleaner;
+ private final ProjectCacheMetrics cacheMetrics;
@VisibleForTesting
- static class ProjectsDirCacheMetrics {
-
+ static class ProjectCacheMetrics {
private long cacheHit;
private long cacheMiss;
@@ -84,328 +79,205 @@ public class FlowPreparer {
}
}
- public FlowPreparer(final StorageManager storageManager, final File executionsDir,
- final File projectsDir, final Long projectDirMaxSizeInMb) {
+ FlowPreparer(final StorageManager storageManager, final File executionsDir,
+ final File projectsDir, final ProjectCacheCleaner cleaner) {
+ Preconditions.checkNotNull(storageManager);
+ Preconditions.checkNotNull(executionsDir);
+ Preconditions.checkNotNull(projectsDir);
+
+ Preconditions.checkArgument(projectsDir.exists());
+ Preconditions.checkArgument(executionsDir.exists());
+
this.storageManager = storageManager;
this.executionsDir = executionsDir;
- this.projectsDir = projectsDir;
- this.projectDirCleaner =
- projectDirMaxSizeInMb != null ? new ProjectCacheDirCleaner(projectDirMaxSizeInMb) : null;
- this.cacheMetrics = new ProjectsDirCacheMetrics();
+ this.projectCacheDir = projectsDir;
+ this.projectCacheCleaner = Optional.ofNullable(cleaner);
+ this.cacheMetrics = new ProjectCacheMetrics();
}
- private boolean isProjectCacheSizeLimitEnabled() {
- return this.projectDirCleaner != null;
- }
-
- public double getProjectDirCacheHitRatio() {
+ double getProjectDirCacheHitRatio() {
return this.cacheMetrics.getHitRatio();
}
/**
- * Creates a file which keeps the size of {@param dir} in bytes inside the {@param dir} and sets
- * the dirSize for {@param pv}.
- *
- * @param dir the directory whose size needs to be kept in the file to be created.
- * @param pv the projectVersion whose size needs to updated.
- */
- static void updateDirSize(final File dir, final ProjectVersion pv) {
- try {
- final Path path = Paths.get(dir.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
- if (!Files.exists(path)) {
- final long sizeInByte = FileUtils.sizeOfDirectory(dir);
- FileIOUtils.dumpNumberToFile(path, sizeInByte);
- }
- pv.setDirSizeInBytes(FileIOUtils.readNumberFromFile(path));
- } catch (final IOException e) {
- log.error("error when dumping dir size to file", e);
- }
- }
-
- /**
- * Creates a file which keeps the count of files inside {@param dir}
+ * Calculate the directory size and save it to a file.
*
- * @param dir the directory whose size needs to be kept in the file to be created.
- * @param pv the projectVersion whose size needs to updated.
+ * @param dir the directory whose size needs to be saved.
+ * @return the size of the dir.
*/
- static void updateFileCount(final File dir, final ProjectVersion pv) {
- try {
- final Path path = Paths.get(dir.getPath(), PROJECT_DIR_COUNT_FILE_NAME);
- if (!Files.exists(path)) {
- // count itself
- final int fileCount = FileIOUtils.getFileCount(dir) + 1;
- FileIOUtils.dumpNumberToFile(path, fileCount);
- }
- pv.setFileCount((int) FileIOUtils.readNumberFromFile(path));
- } catch (final IOException e) {
- log.error("error when updating file count", e);
+ static long calculateDirSizeAndSave(final File dir) throws IOException {
+ final Path path = Paths.get(dir.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
+ if (!Files.exists(path)) {
+ final long sizeInByte = FileUtils.sizeOfDirectory(dir);
+ FileIOUtils.dumpNumberToFile(path, sizeInByte);
+ return sizeInByte;
+ } else {
+ return FileIOUtils.readNumberFromFile(path);
}
}
/**
- * check if number of files inside the project dir equals to target
- */
- @VisibleForTesting
- boolean isFileCountEqual(final ProjectVersion pv, final int target) {
- final int fileCount;
- try {
- updateFileCount(pv.getInstalledDir(), pv);
- final Path path = Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_COUNT_FILE_NAME);
- fileCount = (int) FileIOUtils.readNumberFromFile(path);
- return fileCount == target;
- } catch (final IOException e) {
- log.error(e);
- return false;
- }
- }
-
- /**
* Prepare the flow directory for execution.
*
* @param flow Executable Flow instance.
*/
- void setup(final ExecutableFlow flow) {
- File execDir = null;
+ void setup(final ExecutableFlow flow) throws ExecutorManagerException {
+ final ProjectFileHandler projectFileHandler = null;
+ File tempDir = null;
try {
- // First get the ProjectVersion
- final ProjectVersion projectVersion = new ProjectVersion(flow.getProjectId(),
+ final ProjectDirectoryMetadata project = new ProjectDirectoryMetadata(
+ flow.getProjectId(),
flow.getVersion());
- // Setup the project
- setupProject(projectVersion);
-
- // Create the execution directory
- execDir = createExecDir(flow);
-
- // Create the symlinks from the project
- final int linkCount = FileIOUtils
- .createDeepHardlink(projectVersion.getInstalledDir(), execDir);
-
- if (isProjectCacheSizeLimitEnabled() && !isFileCountEqual(projectVersion, linkCount)) {
- throw new Exception(String.format("File count check failed for execid: %d, project dir %s"
- + " are being deleted when setting this execution up",
- flow.getExecutionId(), projectVersion.getInstalledDir()));
+ final long flowPrepStartTime = System.currentTimeMillis();
+
+ // Download project to a temp dir if not exists in local cache.
+ final long start = System.currentTimeMillis();
+
+ tempDir = downloadProjectIfNotExists(project);
+
+ log.info("Downloading zip file for project {} when preparing execution [execid {}] "
+ + "completed in {} second(s)", project, flow.getExecutionId(),
+ (System.currentTimeMillis() - start) / 1000);
+
+ // With synchronization, only one thread is allowed to proceed to avoid complicated race
+ // conditions which could arise when multiple threads are downloading/deleting/hard-linking
+ // the same project. But it doesn't prevent multiple executor processes interfering with each
+ // other triggering race conditions. So it's important to operationally make sure that only
+ // one executor process is setting up flow execution against the shared project directory.
+ long criticalSectionStartTime = -1;
+ File execDir = null;
+
+ synchronized (this) {
+ criticalSectionStartTime = System.currentTimeMillis();
+ if (!project.getInstalledDir().exists() && tempDir != null) {
+ // If new project is downloaded and project dir cache clean-up feature is enabled, then
+ // perform clean-up if size of all project dirs exceeds the cache size.
+ if (this.projectCacheCleaner.isPresent()) {
+ this.projectCacheCleaner.get()
+ .deleteProjectDirsIfNecessary(project.getDirSizeInByte());
+ }
+ // Rename temp dir to a proper project directory name.
+ Files.move(tempDir.toPath(), project.getInstalledDir().toPath());
+ }
+ execDir = setupExecutionDir(project.getInstalledDir(), flow);
}
- log.info(String
- .format("Flow Preparation complete. [execid: %d, path: %s]", flow.getExecutionId(),
- execDir.getPath()));
- } catch (final Exception e) {
- log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
- cleanup(execDir);
- throw new RuntimeException(e);
+ final long flowPrepCompletionTime = System.currentTimeMillis();
+ log.info("Flow preparation completed in {} sec(s), out ot which {} sec(s) was spent inside "
+ + "critical section. [execid: {}, path: {}]",
+ (flowPrepCompletionTime - flowPrepStartTime) / 1000,
+ (flowPrepCompletionTime - criticalSectionStartTime) / 1000,
+ flow.getExecutionId(), execDir.getPath());
+ } catch (final Exception ex) {
+ FileIOUtils.deleteDirectorySilently(tempDir);
+ log.error("Error in preparing flow execution {}", flow.getExecutionId(), ex);
+ throw new ExecutorManagerException(ex);
+ } finally {
+ if (projectFileHandler != null) {
+ projectFileHandler.deleteLocalFile();
+ }
}
}
- private void cleanup(final File execDir) {
- if (execDir != null) {
- try {
- FileUtils.deleteDirectory(execDir);
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
+ private File setupExecutionDir(final File installedDir, final ExecutableFlow flow)
+ throws IOException {
+ File execDir = null;
+ try {
+ execDir = createExecDir(flow);
+ // Create hardlinks from the project
+ FileIOUtils.createDeepHardlink(installedDir, execDir);
+ return execDir;
+ } catch (final Exception ex) {
+ FileIOUtils.deleteDirectorySilently(execDir);
+ throw ex;
}
}
/**
- * Touch the file if it exists.
+ * Update last modified time of the file if it exists.
*
* @param path path to the target file
*/
@VisibleForTesting
- void touchIfExists(final Path path) {
+ void updateLastModifiedTime(final Path path) {
try {
Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
} catch (final IOException ex) {
- log.error(ex);
+ log.warn("Error when updating last modified time for {}", path, ex);
}
}
/**
- * Prepare the project directory.
- *
- * @param pv ProjectVersion object
+ * @return the project directory name of a project
*/
- @VisibleForTesting
- void setupProject(final ProjectVersion pv)
- throws ProjectManagerException, IOException {
- final int projectId = pv.getProjectId();
- final int version = pv.getVersion();
-
- final String projectDir = String.valueOf(projectId) + "." + String.valueOf(version);
- if (pv.getInstalledDir() == null) {
- pv.setInstalledDir(new File(this.projectsDir, projectDir));
- }
-
- // If directory exists. Assume its prepared and skip.
- if (pv.getInstalledDir().exists()) {
- log.info("Project already cached. Skipping download. " + pv);
- this.cacheMetrics.incrementCacheHit();
- touchIfExists(
- Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_SIZE_FILE_NAME));
- return;
- }
-
- log.info("Preparing Project: " + pv);
+ private String generateProjectDirName(final ProjectDirectoryMetadata proj) {
+ return String.valueOf(proj.getProjectId()) + "." + String.valueOf(proj.getVersion());
+ }
- final File tempDir = new File(this.projectsDir,
+ private File createTempDir(final ProjectDirectoryMetadata proj) {
+ final String projectDir = generateProjectDirName(proj);
+ final File tempDir = new File(this.projectCacheDir,
"_temp." + projectDir + "." + System.currentTimeMillis());
-
- // TODO spyne: Why mkdirs? This path should be already set up.
tempDir.mkdirs();
+ return tempDir;
+ }
- ProjectFileHandler projectFileHandler = null;
+ private void downloadAndUnzipProject(final ProjectDirectoryMetadata proj, final File dest)
+ throws IOException {
+ final ProjectFileHandler projectFileHandler = requireNonNull(this.storageManager
+ .getProjectFile(proj.getProjectId(), proj.getVersion()));
try {
- log.info(String.format("Downloading zip file for Project Version {%s}", pv));
- this.cacheMetrics.incrementCacheMiss();
- projectFileHandler = requireNonNull(
- this.storageManager.getProjectFile(pv.getProjectId(), pv.getVersion()));
checkState("zip".equals(projectFileHandler.getFileType()));
final File zipFile = requireNonNull(projectFileHandler.getLocalFile());
final ZipFile zip = new ZipFile(zipFile);
- Utils.unzip(zip, tempDir);
- updateDirSize(tempDir, pv);
- updateFileCount(tempDir, pv);
- log.info(String.format("Downloading zip file for Project Version {%s} completes", pv));
- if (this.projectDirCleaner != null) {
- this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
- }
- Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
- log.warn(String.format("Project preparation completes. [%s]", pv));
+ Utils.unzip(zip, dest);
+ proj.setDirSizeInByte(calculateDirSizeAndSave(dest));
} finally {
- if (projectFileHandler != null) {
- projectFileHandler.deleteLocalFile();
- }
- // Clean up: Remove tempDir if exists
- FileUtils.deleteDirectory(tempDir);
+ projectFileHandler.deleteLocalFile();
}
}
+ /**
+ * Download project zip and unzip it if not exists locally.
+ *
+ * @param proj project to download
+ * @return the temp dir where the new project is downloaded to, null if no project is downloaded.
+ * @throws IOException if downloading or unzipping fails.
+ */
+ @VisibleForTesting
+ File downloadProjectIfNotExists(final ProjectDirectoryMetadata proj)
+ throws IOException {
+ final String projectDir = generateProjectDirName(proj);
+ if (proj.getInstalledDir() == null) {
+ proj.setInstalledDir(new File(this.projectCacheDir, projectDir));
+ }
+
+ // If directory exists, assume it's prepared and skip.
+ if (proj.getInstalledDir().exists()) {
+ log.info("Project {} already cached. Skipping download.", proj);
+ // Hit the local cache.
+ this.cacheMetrics.incrementCacheHit();
+ // Update last modified time of the file keeping project dir size when the project is
+ // accessed. This last modified time will be used to determined least recently used
+ // projects when performing project directory clean-up.
+ updateLastModifiedTime(
+ Paths.get(proj.getInstalledDir().getPath(), PROJECT_DIR_SIZE_FILE_NAME));
+ return null;
+ }
+
+ this.cacheMetrics.incrementCacheMiss();
+ final File tempDir = createTempDir(proj);
+ downloadAndUnzipProject(proj, tempDir);
+ return tempDir;
+ }
private File createExecDir(final ExecutableFlow flow) {
final int execId = flow.getExecutionId();
final File execDir = new File(this.executionsDir, String.valueOf(execId));
flow.setExecutionPath(execDir.getPath());
-
- // TODO spyne: Why mkdirs? This path should be already set up.
execDir.mkdirs();
return execDir;
}
-
-
- private class ProjectCacheDirCleaner {
-
- private final long projectDirMaxSizeInMb;
-
- /*
- * Delete the project dir associated with {@code version}.
- */
- private void deleteDirectory(final ProjectVersion pv) throws IOException {
- final File installedDir = pv.getInstalledDir();
- if (installedDir != null && installedDir.exists()) {
- FileUtils.deleteDirectory(installedDir);
- }
- }
-
- ProjectCacheDirCleaner(final long projectDirMaxSizeInMb) {
- this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
- }
-
- private List<Path> loadAllProjectDirs() {
- final List<Path> projects = new ArrayList<>();
- for (final File project : FlowPreparer.this.projectsDir.listFiles(new FilenameFilter() {
-
- String pattern = "[0-9]+\\.[0-9]+";
-
- @Override
- public boolean accept(final File dir, final String name) {
- return name.matches(this.pattern);
- }
- })) {
- if (project.exists() && project.isDirectory()) {
- projects.add(project.toPath());
- } else {
- FlowPreparer.log
- .debug(String.format("project %s doesn't exist or is non-dir.", project.getName()));
- }
- }
- return projects;
- }
-
- private List<ProjectVersion> loadAllProjects() {
- final List<ProjectVersion> allProjects = new ArrayList<>();
- for (final Path project : this.loadAllProjectDirs()) {
- try {
- final String fileName = project.getFileName().toString();
- final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
- final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
-
- final ProjectVersion projVersion =
- new ProjectVersion(projectId, versionNum, project.toFile());
-
- FlowPreparer.updateDirSize(projVersion.getInstalledDir(), projVersion);
- FlowPreparer.updateFileCount(projVersion.getInstalledDir(), projVersion);
-
- final Path projectDirFileCount = Paths.get(projVersion.getInstalledDir().toString(),
- FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
-
- projVersion.setLastAccessTime(Files.getLastModifiedTime(projectDirFileCount));
- allProjects.add(projVersion);
- } catch (final Exception e) {
- FlowPreparer.log
- .error(String.format("error while loading project dir metadata for project %s",
- project.getFileName()), e);
- }
- }
- return allProjects;
- }
-
- /**
- * @return sum of the size of all project dirs
- */
- private long getProjectDirsTotalSizeInBytes(final List<ProjectVersion> allProjects) {
- long totalSizeInBytes = 0;
- for (final ProjectVersion version : allProjects) {
- totalSizeInBytes += version.getDirSizeInBytes();
- }
- return totalSizeInBytes;
- }
-
- private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
- final List<ProjectVersion> projectVersions) {
- // sort project version by last reference time in ascending order
- projectVersions.sort(Comparator.comparing(ProjectVersion::getLastAccessTime));
- for (final ProjectVersion version : projectVersions) {
- if (sizeToFreeInBytes > 0) {
- try {
- // delete the project directory even if flow within is running. It's OK to
- // delete the directory since execution dir is HARD linked to project dir.
- FlowPreparer.log.info(String.format("deleting project version %s", version));
- deleteDirectory(version);
- sizeToFreeInBytes -= version.getDirSizeInBytes();
- } catch (final IOException ex) {
- FlowPreparer.log.error(ex);
- }
- }
- }
- }
-
- synchronized void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) {
- final long start = System.currentTimeMillis();
- final List<ProjectVersion> allProjects = loadAllProjects();
- FlowPreparer.log
- .debug(String.format("loading all project dirs metadata completes in %s sec(s)",
- Duration.ofSeconds(System.currentTimeMillis() - start).getSeconds()));
-
- final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
- if (currentSpaceInBytes + spaceToDeleteInBytes
- >= this.projectDirMaxSizeInMb * 1024 * 1024) {
- FlowPreparer.log.info(String.format("Project dir disk usage[%s bytes] exceeds the "
- + "limit[%s mb], start cleaning up project dirs",
- currentSpaceInBytes + spaceToDeleteInBytes, this.projectDirMaxSizeInMb));
- deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes, allProjects);
- }
- }
- }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 4dee864..f0b2122 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -201,14 +201,16 @@ public class FlowRunnerManager implements EventListener,
getClass().getClassLoader());
Long projectDirMaxSize = null;
+ ProjectCacheCleaner cleaner = null;
try {
projectDirMaxSize = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE_IN_MB);
+ cleaner = new ProjectCacheCleaner(this.projectDirectory, projectDirMaxSize);
} catch (final UndefinedPropertyException ex) {
}
// Create a flow preparer
this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
- this.projectDirectory, projectDirMaxSize);
+ this.projectDirectory, cleaner);
this.cleanerThread = new CleanerThread();
this.cleanerThread.start();
@@ -960,7 +962,7 @@ public class FlowRunnerManager implements EventListener,
AzkabanExecutorServer.getApp().getPort()), "The executor can not be null");
this.executorId = executor.getId();
} catch (final Exception e) {
- logger.error("Failed to fetch executor ", e);
+ FlowRunnerManager.logger.error("Failed to fetch executor ", e);
}
}
} else if (FlowRunnerManager.this.active) {
@@ -969,11 +971,11 @@ public class FlowRunnerManager implements EventListener,
final int execId = FlowRunnerManager.this.executorLoader
.selectAndUpdateExecution(this.executorId);
if (execId != -1) {
- logger.info("Submitting flow " + execId);
+ FlowRunnerManager.logger.info("Submitting flow " + execId);
submitFlow(execId);
}
} catch (final Exception e) {
- logger.error("Failed to submit flow ", e);
+ FlowRunnerManager.logger.error("Failed to submit flow ", e);
}
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
new file mode 100644
index 0000000..751b9c9
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectCacheCleaner.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2019 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.utils.ExecutorServiceUtils;
+import azkaban.utils.FileIOUtils;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for deleting least recently accessed projects in the shared project
+ * cache when there's no room to accommodate a new project.
+ */
+class ProjectCacheCleaner {
+
+ private final File projectCacheDir;
+ private final long projectCacheMaxSizeInMB;
+ private static final Logger log = LoggerFactory.getLogger(ProjectCacheCleaner.class);
+
+ ProjectCacheCleaner(final File projectCacheDir, final long projectCacheMaxSizeInMB) {
+ Preconditions.checkNotNull(projectCacheDir);
+ Preconditions.checkArgument(projectCacheDir.exists());
+ Preconditions.checkArgument(projectCacheMaxSizeInMB > 0);
+
+ this.projectCacheDir = projectCacheDir;
+ this.projectCacheMaxSizeInMB = projectCacheMaxSizeInMB;
+ }
+
+ /**
+ * @return a list of project directories.
+ */
+ private List<Path> loadAllProjectDirs() {
+ final List<Path> projects = new ArrayList<>();
+ for (final File project : this.projectCacheDir.listFiles(new FilenameFilter() {
+
+ String pattern = "[0-9]+\\.[0-9]+";
+
+ @Override
+ public boolean accept(final File dir, final String name) {
+ return name.matches(this.pattern);
+ }
+ })) {
+ if (project.exists() && project.isDirectory()) {
+ projects.add(project.toPath());
+ } else {
+ log.debug("project {} doesn't exist or is non-dir.", project.getName());
+ }
+ }
+ return projects;
+ }
+
+ /**
+ * @return a list of {@link ProjectDirectoryMetadata} for all project directories
+ */
+ private List<ProjectDirectoryMetadata> loadAllProjects() {
+ final List<ProjectDirectoryMetadata> allProjects = new ArrayList<>();
+ for (final Path project : this.loadAllProjectDirs()) {
+ try {
+ final String fileName = project.getFileName().toString();
+ final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
+ final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
+
+ final ProjectDirectoryMetadata projectDirMetadata =
+ new ProjectDirectoryMetadata(projectId, versionNum, project.toFile());
+
+ projectDirMetadata.setDirSizeInByte(
+ FlowPreparer.calculateDirSizeAndSave(projectDirMetadata.getInstalledDir()));
+ projectDirMetadata.setLastAccessTime(
+ Files.getLastModifiedTime(Paths.get(projectDirMetadata.getInstalledDir().toString(),
+ FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)));
+ allProjects.add(projectDirMetadata);
+ } catch (final Exception e) {
+ log.warn("error while loading project dir metadata for project {}",
+ project.getFileName(), e);
+ }
+ }
+ return allProjects;
+ }
+
+ /**
+ * @return sum of the size of all project dirs
+ */
+ private long getProjectDirsTotalSizeInBytes(final List<ProjectDirectoryMetadata> allProjects) {
+ final long totalSizeInBytes = allProjects.stream()
+ .mapToLong(ProjectDirectoryMetadata::getDirSizeInByte).sum();
+ return totalSizeInBytes;
+ }
+
+ /**
+ * Delete all the files in parallel
+ *
+ * @param filesToDelete a list of files to delete
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void deleteFilesInParallel(final List<File> filesToDelete) {
+ final int CLEANING_SERVICE_THREAD_NUM = 8;
+ final ExecutorService deletionService = Executors
+ .newFixedThreadPool(CLEANING_SERVICE_THREAD_NUM);
+
+ for (final File toDelete : filesToDelete) {
+ deletionService.submit(() -> FileIOUtils.deleteDirectorySilently(toDelete));
+ }
+
+ try {
+ new ExecutorServiceUtils().gracefulShutdown(deletionService, Duration.ofDays(1));
+ } catch (final InterruptedException e) {
+ log.warn("error when deleting files", e);
+ }
+ }
+
+ /**
+ * Delete least recently used projects to free up space
+ *
+ * @param sizeToFreeInBytes space to free up
+ * @param projectDirMetadataList a list of candidate files to delete
+ */
+ private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
+ final List<ProjectDirectoryMetadata> projectDirMetadataList) {
+ // Sort projects by last reference time in ascending order
+ projectDirMetadataList.sort(Comparator.comparing(ProjectDirectoryMetadata::getLastAccessTime));
+ final List<File> filesToDelete = new ArrayList<>();
+
+ for (final ProjectDirectoryMetadata proj : projectDirMetadataList) {
+ if (sizeToFreeInBytes > 0) {
+ // Delete the project directory even if flow within is running. It's OK to
+ // delete the directory since execution dir is HARD linked to project dir. Note that even
+ // if project is deleted, disk space will be freed up only when all associated execution
+ // dirs are deleted.
+ log.debug("deleting project {}", proj);
+ if (proj.getInstalledDir() != null) {
+ filesToDelete.add(proj.getInstalledDir());
+ sizeToFreeInBytes -= proj.getDirSizeInByte();
+ }
+ } else {
+ break;
+ }
+ }
+ deleteFilesInParallel(filesToDelete);
+ }
+
+ /**
+ * Deleting least recently accessed project dirs when there's no room to accommodate new project
+ */
+ void deleteProjectDirsIfNecessary(final long newProjectSizeInBytes) {
+ final long start = System.currentTimeMillis();
+ final List<ProjectDirectoryMetadata> allProjects = loadAllProjects();
+ log.debug("loading all project dirs metadata completed in {} sec(s)",
+ Duration.ofSeconds(System.currentTimeMillis() - start).getSeconds());
+
+ final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
+ if (currentSpaceInBytes + newProjectSizeInBytes
+ >= this.projectCacheMaxSizeInMB * 1024 * 1024) {
+ log.info(
+ "project cache usage[{} MB] exceeds the limit[{} MB], start cleaning up project dirs",
+ (currentSpaceInBytes + newProjectSizeInBytes) / (1024 * 1024),
+ this.projectCacheMaxSizeInMB);
+
+ final long freeCacheSpaceInBytes =
+ this.projectCacheMaxSizeInMB * 1024 * 1024 - currentSpaceInBytes;
+
+ deleteLeastRecentlyUsedProjects(newProjectSizeInBytes - freeCacheSpaceInBytes, allProjects);
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index b8fd5f8..dcb4f6c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -18,6 +18,7 @@
package azkaban.execapp;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -28,19 +29,21 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import azkaban.execapp.FlowPreparer.ProjectsDirCacheMetrics;
+import azkaban.execapp.FlowPreparer.ProjectCacheMetrics;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
import azkaban.project.ProjectFileHandler;
import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
-import azkaban.utils.Pair;
import java.io.File;
-import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -50,6 +53,7 @@ import org.junit.rules.TemporaryFolder;
public class FlowPreparerTest {
public static final String SAMPLE_FLOW_01 = "sample_flow_01";
+
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File executionsDir;
@@ -69,6 +73,15 @@ public class FlowPreparerTest {
return storageManager;
}
+ private ExecutableFlow mockExecutableFlow(final int execId, final int projectId,
+ final int version) {
+ final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
+ when(executableFlow.getExecutionId()).thenReturn(execId);
+ when(executableFlow.getProjectId()).thenReturn(projectId);
+ when(executableFlow.getVersion()).thenReturn(version);
+ return executableFlow;
+ }
+
@Before
public void setUp() throws Exception {
this.executionsDir = this.temporaryFolder.newFolder("executions");
@@ -76,146 +89,107 @@ public class FlowPreparerTest {
this.instance = spy(
new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir, null));
- doNothing().when(this.instance).touchIfExists(any());
+ doNothing().when(this.instance).updateLastModifiedTime(any());
}
@Test
- public void testSetupProject() throws Exception {
- final ProjectVersion pv = new ProjectVersion(12, 34,
- new File(this.projectsDir, "sample_project_01"));
- this.instance.setupProject(pv);
+ public void testProjectDirSizeIsSet() throws Exception {
+ final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
+ new File(this.projectsDir, SAMPLE_FLOW_01));
+
+ final File tmp = this.instance.downloadProjectIfNotExists(proj);
final long actualDirSize = 1048835;
- assertThat(pv.getDirSizeInBytes()).isEqualTo(actualDirSize);
+ assertThat(proj.getDirSizeInByte()).isEqualTo(actualDirSize);
assertThat(FileIOUtils.readNumberFromFile(
- Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
+ Paths.get(tmp.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
.isEqualTo(actualDirSize);
-
- assertThat(FileIOUtils.readNumberFromFile(
- Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_COUNT_FILE_NAME)))
- .isEqualTo(8);
-
- assertTrue(pv.getInstalledDir().exists());
- assertTrue(new File(pv.getInstalledDir(), "sample_flow_01").exists());
}
@Test
- public void testSetupProjectTouchesTheDirSizeFile() throws Exception {
- //verifies setup project touches project dir size file.
- final ProjectVersion pv = new ProjectVersion(12, 34,
- new File(this.projectsDir, "sample_project_01"));
-
- //setup project 1st time will not do touch
- this.instance.setupProject(pv);
- verify(this.instance, never()).touchIfExists(
- Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
-
- this.instance.setupProject(pv);
- verify(this.instance).touchIfExists(
- Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
- }
+ public void testDownloadingProjectIfNotExists() throws Exception {
+ final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
+ new File(this.projectsDir, SAMPLE_FLOW_01));
+ final File tmp = this.instance.downloadProjectIfNotExists(proj);
- @Test
- public void testSetupFlow() {
- final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
- when(executableFlow.getExecutionId()).thenReturn(12345);
- when(executableFlow.getProjectId()).thenReturn(12);
- when(executableFlow.getVersion()).thenReturn(34);
+ final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
+ FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
- this.instance.setup(executableFlow);
- final File execDir = new File(this.executionsDir, "12345");
- assertTrue(execDir.exists());
- assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
+ verify(this.instance, never()).updateLastModifiedTime(projectDirSizeFile);
+ assertThat(tmp).isNotNull();
+ assertThat(tmp.list()).contains(SAMPLE_FLOW_01);
}
@Test
- public void testFileCountCheckNotCalled() {
- //given
- final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
- when(executableFlow.getExecutionId()).thenReturn(12345);
- when(executableFlow.getProjectId()).thenReturn(12);
- when(executableFlow.getVersion()).thenReturn(34);
-
- //when
- this.instance.setup(executableFlow);
+ public void testNotDownloadingProjectIfExists() throws Exception {
+ final ProjectDirectoryMetadata proj = new ProjectDirectoryMetadata(12, 34,
+ new File(this.projectsDir, SAMPLE_FLOW_01));
+ File tmp = this.instance.downloadProjectIfNotExists(proj);
+ Files.move(tmp.toPath(), proj.getInstalledDir().toPath());
- //then
- verify(this.instance, never()).isFileCountEqual(any(), anyInt());
- }
+ // Try downloading the same project again
+ tmp = this.instance.downloadProjectIfNotExists(proj);
- @Test
- public void testIsFileCountEqual() {
- //given
- final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
- this.executionsDir, this.projectsDir, 1L);
- final File projectDir = new File(this.projectsDir, "sample_project_01");
- projectDir.mkdir();
- final ProjectVersion pv = new ProjectVersion(1, 1, projectDir);
+ final Path projectDirSizeFile = Paths.get(proj.getInstalledDir().getPath(),
+ FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
- //then
- assertThat(flowPreparer.isFileCountEqual(pv, 1)).isEqualTo(true);
+ verify(this.instance).updateLastModifiedTime(projectDirSizeFile);
+ assertThat(tmp).isNull();
}
@Test
- public void testProjectCacheDirCleanerNotEnabled() throws IOException {
- final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
-
- //given
- final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
- this.executionsDir, this.projectsDir, null);
+ public void testSetupFlowByMultipleThreads() {
+ final int threadNum = 4;
+
+ final ExecutableFlow[] executableFlows = new ExecutableFlow[]{
+ mockExecutableFlow(1, 12, 34),
+ mockExecutableFlow(2, 12, 34),
+ mockExecutableFlow(3, 12, 34),
+ mockExecutableFlow(4, 12, 34)
+ };
+
+ final ExecutorService service = Executors.newFixedThreadPool(threadNum);
+
+ final List<Future> futures = new ArrayList<>();
+ for (int i = 0; i < threadNum; i++) {
+ final int finalI = i;
+ futures.add(service.submit(() -> {
+ assertThatCode(() -> this.instance.setup(executableFlows[finalI])
+ ).doesNotThrowAnyException();
+ }));
+ }
- //when
- final List<File> expectedRemainingFiles = new ArrayList<>();
- for (int i = 1; i <= 3; i++) {
- final int projectId = i;
- final int version = 1;
- final ProjectVersion pv = new ProjectVersion(projectId, version, null);
- installedProjects.put(new Pair<>(projectId, version), pv);
- flowPreparer.setupProject(pv);
- expectedRemainingFiles.add(pv.getInstalledDir());
+ for (final Future future : futures) {
+ assertThatCode(() -> future.get()).doesNotThrowAnyException();
}
- //then
- assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
- .toArray(new File[expectedRemainingFiles.size()]));
+ service.shutdownNow();
+ for (final ExecutableFlow flow : executableFlows) {
+ final File execDir = new File(this.executionsDir, String.valueOf(flow.getExecutionId()));
+ assertTrue(execDir.exists());
+ assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
+ }
}
@Test
- public void testProjectCacheDirCleaner() throws IOException, InterruptedException {
- final Long projectDirMaxSize = 3L;
-
- //given
- final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
- this.executionsDir, this.projectsDir, projectDirMaxSize);
-
- //when
- final List<File> expectedRemainingFiles = new ArrayList<>();
- for (int i = 1; i <= 3; i++) {
- final int projectId = i;
- final int version = 1;
- final ProjectVersion pv = new ProjectVersion(projectId, version, null);
- flowPreparer.setupProject(pv);
-
- if (i >= 2) {
- //the first file will be deleted
- expectedRemainingFiles.add(pv.getInstalledDir());
- }
- // last modified time of millis second granularity of a file is not supported by all file
- // systems, so sleep for 1 second between creation of each project dir to make their last
- // modified time different.
- Thread.sleep(1000);
- }
+ public void testSetupFlow() throws ExecutorManagerException {
+ final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
+ when(executableFlow.getExecutionId()).thenReturn(12345);
+ when(executableFlow.getProjectId()).thenReturn(12);
+ when(executableFlow.getVersion()).thenReturn(34);
- //then
- assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
- .toArray(new File[expectedRemainingFiles.size()]));
+ this.instance.setup(executableFlow);
+ final File execDir = new File(this.executionsDir, "12345");
+ assertTrue(execDir.exists());
+ assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
}
+
@Test
public void testProjectsCacheMetricsZeroHit() {
//given
- final FlowPreparer.ProjectsDirCacheMetrics cacheMetrics = new ProjectsDirCacheMetrics();
+ final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
//when zero hit and zero miss then
assertThat(cacheMetrics.getHitRatio()).isEqualTo(0);
@@ -229,7 +203,7 @@ public class FlowPreparerTest {
@Test
public void testProjectsCacheMetricsHit() {
//given
- final FlowPreparer.ProjectsDirCacheMetrics cacheMetrics = new ProjectsDirCacheMetrics();
+ final ProjectCacheMetrics cacheMetrics = new ProjectCacheMetrics();
//when one hit
cacheMetrics.incrementCacheHit();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
new file mode 100644
index 0000000..ba786cc
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ProjectCacheCleanerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.utils.Utils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+import java.util.zip.ZipFile;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+public class ProjectCacheCleanerTest {
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private File cacheDir;
+
+ private void unzip(final Path srcZipFile, final Path dest) throws IOException {
+ final ZipFile zip = new ZipFile(srcZipFile.toFile());
+ Utils.unzip(zip, dest.toFile());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.cacheDir = this.temporaryFolder.newFolder("projects");
+ final ClassLoader classLoader = getClass().getClassLoader();
+
+ final long current = System.currentTimeMillis();
+ unzip(Paths.get(classLoader.getResource("1.1.zip").getPath()),
+ this.cacheDir.toPath());
+ Files.setLastModifiedTime(Paths.get(this.cacheDir.toString(), "1.1",
+ FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME), FileTime.fromMillis(current - 2000));
+
+ unzip(Paths.get(classLoader.getResource("2.1.zip").getPath()),
+ this.cacheDir.toPath());
+ Files.setLastModifiedTime(Paths.get(this.cacheDir.toString(), "2.1",
+ FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME), FileTime.fromMillis(current - 1000));
+
+ unzip(Paths.get(classLoader.getResource("3.1.zip").getPath()),
+ this.cacheDir.toPath());
+ Files.setLastModifiedTime(Paths.get(this.cacheDir.toString(), "3.1",
+ FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME), FileTime.fromMillis(current));
+ }
+
+ @Test
+ /**
+ * There's still space in the cache, no deletion.
+ */
+ public void testNotDeleting() {
+ final Long projectDirMaxSizeInMB = 7L;
+ final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir,
+ projectDirMaxSizeInMB);
+ cleaner.deleteProjectDirsIfNecessary(1);
+
+ assertThat(this.cacheDir.list()).hasSize(3);
+ }
+
+ @Test
+ /**
+ * Deleting everything in the cache to accommodate new item.
+ */
+ public void testDeletingAll() {
+ final Long projectDirMaxSize = 3L;
+ final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+ cleaner.deleteProjectDirsIfNecessary(7000000);
+
+ assertThat(this.cacheDir.list()).hasSize(0);
+ }
+
+ @Test
+ /**
+ * Deleting two least recently used items in the cache to accommodate new item.
+ */
+ public void testDeletingTwoLRUItems() {
+ final Long projectDirMaxSize = 7L;
+ final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+ cleaner.deleteProjectDirsIfNecessary(3000000);
+ assertThat(this.cacheDir.list()).hasSize(1);
+ assertThat(this.cacheDir.list()).contains("3.1");
+ }
+
+ @Test
+ /**
+ * Deleting the least recently used item in the cache to accommodate new item.
+ */
+ public void testDeletingOneLRUItem() {
+ final Long projectDirMaxSize = 7L;
+ final ProjectCacheCleaner cleaner = new ProjectCacheCleaner(this.cacheDir, projectDirMaxSize);
+ cleaner.deleteProjectDirsIfNecessary(2000000);
+ assertThat(this.cacheDir.list()).hasSize(2);
+ assertThat(this.cacheDir.list()).contains("3.1");
+ assertThat(this.cacheDir.list()).contains("2.1");
+ }
+}
diff --git a/azkaban-exec-server/src/test/resources/1.1.zip b/azkaban-exec-server/src/test/resources/1.1.zip
new file mode 100644
index 0000000..98f1a2a
Binary files /dev/null and b/azkaban-exec-server/src/test/resources/1.1.zip differ
diff --git a/azkaban-exec-server/src/test/resources/2.1.zip b/azkaban-exec-server/src/test/resources/2.1.zip
new file mode 100644
index 0000000..4b574e8
Binary files /dev/null and b/azkaban-exec-server/src/test/resources/2.1.zip differ
diff --git a/azkaban-exec-server/src/test/resources/3.1.zip b/azkaban-exec-server/src/test/resources/3.1.zip
new file mode 100644
index 0000000..430f87b
Binary files /dev/null and b/azkaban-exec-server/src/test/resources/3.1.zip differ
diff --git a/azkaban-exec-server/src/test/resources/sample_flow_01.zip b/azkaban-exec-server/src/test/resources/sample_flow_01.zip
index 1ac01a4..53ae5bd 100644
Binary files a/azkaban-exec-server/src/test/resources/sample_flow_01.zip and b/azkaban-exec-server/src/test/resources/sample_flow_01.zip differ