Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index e781732..b3fd0f4 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -226,6 +226,9 @@ public class Constants {
public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
+
+ // allowed max size of shared project dir in MB
+ public static final String PROJECT_DIR_MAX_SIZE_IN_MB = "azkaban.project_cache_max_size_in_mb";
}
public static class FlowProperties {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 47010d5..5d93d4d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -35,6 +35,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
@@ -52,16 +54,21 @@ public class FlowPreparer {
private final File projectsDir;
private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
private final StorageManager storageManager;
+ private final ProjectCacheDirCleaner projectDirCleaner;
public FlowPreparer(final StorageManager storageManager, final File executionsDir,
final File projectsDir,
- final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects) {
+ final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects,
+ final Long projectDirMaxSizeInMb) {
this.storageManager = storageManager;
this.executionsDir = executionsDir;
this.projectsDir = projectsDir;
this.installedProjects = installedProjects;
+ this.projectDirCleaner = new ProjectCacheDirCleaner(projectDirMaxSizeInMb);
+
}
+
/**
* Prepare the flow directory for execution.
*
@@ -79,11 +86,14 @@ public class FlowPreparer {
// Create the execution directory
execDir = createExecDir(flow);
- // Create the symlinks from the project
- copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
-
- log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
- flow.getExecutionId(), execDir.getPath()));
+ // Synchronized on {@code projectVersion} to prevent one thread deleting a project dir
+ // in {@link FlowPreparer#setup} while another is creating hardlink from the same project dir
+ synchronized (projectVersion) {
+ // Create the symlinks from the project
+ copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+ log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
+ flow.getExecutionId(), execDir.getPath()));
+ }
} catch (final Exception e) {
log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
cleanup(execDir);
@@ -91,6 +101,16 @@ public class FlowPreparer {
}
}
+ private void cleanup(final File execDir) {
+ if (execDir != null) {
+ try {
+ FileUtils.deleteDirectory(execDir);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
* Touch the file if it exists.
*
@@ -105,7 +125,6 @@ public class FlowPreparer {
}
}
-
/**
* Prepare the project directory.
*
@@ -148,7 +167,9 @@ public class FlowPreparer {
final ZipFile zip = new ZipFile(zipFile);
Utils.unzip(zip, tempDir);
updateDirSize(tempDir, pv);
+ this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
+ this.installedProjects.put(new Pair<>(pv.getProjectId(), pv.getVersion()), pv);
log.warn(String.format("Project preparation completes. [%s]", pv));
} finally {
if (projectFileHandler != null) {
@@ -168,7 +189,7 @@ public class FlowPreparer {
*/
private void updateDirSize(final File dir, final ProjectVersion pv) {
final long sizeInByte = FileUtils.sizeOfDirectory(dir);
- pv.setDirSize(sizeInByte);
+ pv.setDirSizeInBytes(sizeInByte);
try {
FileIOUtils.dumpNumberToFile(Paths.get(dir.getPath(), PROJECT_DIR_SIZE_FILE_NAME),
sizeInByte);
@@ -197,19 +218,79 @@ public class FlowPreparer {
// to set up.
final ProjectVersion projectVersion;
synchronized (this.installedProjects) {
- projectVersion = this.installedProjects
- .computeIfAbsent(new Pair<>(flow.getProjectId(), flow.getVersion()),
- k -> new ProjectVersion(flow.getProjectId(), flow.getVersion()));
+ final Pair<Integer, Integer> pair = new Pair<>(flow.getProjectId(), flow.getVersion());
+ projectVersion = this.installedProjects.getOrDefault(pair, new ProjectVersion(flow
+ .getProjectId(), flow.getVersion()));
}
return projectVersion;
}
- private void cleanup(final File execDir) {
- if (execDir != null) {
+ private class ProjectCacheDirCleaner {
+
+ private final Long projectDirMaxSizeInMb;
+
+ ProjectCacheDirCleaner(final Long projectDirMaxSizeInMb) {
+ this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
+ }
+
+ /**
+ * @return sum of the size of all project dirs
+ */
+ private long getProjectDirsTotalSizeInBytes() throws IOException {
+ long totalSizeInBytes = 0;
+ for (final ProjectVersion version : FlowPreparer.this.installedProjects.values()) {
+ totalSizeInBytes += version.getDirSizeInBytes();
+ }
+ return totalSizeInBytes;
+ }
+
+ private FileTime getLastReferenceTime(final ProjectVersion pv) throws IOException {
+ final Path dirSizeFile = Paths
+ .get(pv.getInstalledDir().toPath().toString(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
+ return Files.getLastModifiedTime(dirSizeFile);
+ }
+
+ private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
+ final List<ProjectVersion>
+ projectVersions) throws IOException {
+ // sort project version by last reference time in ascending order
try {
- FileUtils.deleteDirectory(execDir);
- } catch (final IOException e) {
- throw new RuntimeException(e);
+ projectVersions.sort((o1, o2) -> {
+ try {
+ final FileTime lastReferenceTime1 = getLastReferenceTime(o1);
+ final FileTime lastReferenceTime2 = getLastReferenceTime(o2);
+ return lastReferenceTime1.compareTo(lastReferenceTime2);
+ } catch (final IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ } catch (final RuntimeException ex) {
+ throw new IOException(ex);
+ }
+
+ for (final ProjectVersion version : projectVersions) {
+ if (sizeToFreeInBytes > 0) {
+ try {
+ // delete the project directory even if flow within is running. It's ok to
+ // delete the directory since execution dir is HARD linked to project dir.
+ FlowRunnerManager.deleteDirectory(version);
+ FlowPreparer.this.installedProjects.remove(new Pair<>(version.getProjectId(), version
+ .getVersion()));
+ sizeToFreeInBytes -= version.getDirSizeInBytes();
+ } catch (final IOException ex) {
+ log.error(ex);
+ }
+ }
+ }
+ }
+
+ void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) throws IOException {
+ final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes();
+ if (this.projectDirMaxSizeInMb != null
+ && (currentSpaceInBytes + spaceToDeleteInBytes) >= this
+ .projectDirMaxSizeInMb * 1024 * 1024) {
+ deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes,
+ new ArrayList<>(FlowPreparer.this.installedProjects.values()));
}
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index e7d17da..405766b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -17,6 +17,7 @@
package azkaban.execapp;
import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.event.Event;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
@@ -46,10 +47,14 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.ThreadPoolExecutingListener;
import azkaban.utils.TrackingThreadPool;
+import azkaban.utils.UndefinedPropertyException;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.Thread.State;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -121,28 +126,22 @@ public class FlowRunnerManager implements EventListener,
private final Props azkabanProps;
private final File executionDirectory;
private final File projectDirectory;
-
private final Object executionDirDeletionSync = new Object();
+
private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
private int threadPoolQueueSize = -1;
private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
-
private Props globalProps;
-
private long lastCleanerThreadCheckTime = -1;
private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
-
// We want to limit the log sizes to about 20 megs
private String jobLogChunkSize = "5MB";
private int jobLogNumFiles = 4;
-
// If true, jobs will validate proxy user against a list of valid proxy users.
private boolean validateProxyUser = false;
-
// date time of the the last flow submitted.
private long lastFlowSubmittedDate = 0;
-
// whether the current executor is active
private volatile boolean isExecutorActive = false;
@@ -170,18 +169,13 @@ public class FlowRunnerManager implements EventListener,
this.projectDirectory.mkdirs();
}
- this.installedProjects = new HashMap<>();
+ this.installedProjects = new ConcurrentHashMap<>();
// azkaban.temp.dir
this.numThreads = props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
this.numJobThreadPerFlow = props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
this.executorService = createExecutorService(this.numThreads);
- // Create a flow preparer
- this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
- this.projectDirectory,
- this.installedProjects);
-
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
this.triggerManager = triggerManager;
@@ -191,9 +185,6 @@ public class FlowRunnerManager implements EventListener,
this.validateProxyUser = this.azkabanProps.getBoolean("proxy.user.lock.down", false);
- this.cleanerThread = new CleanerThread();
- this.cleanerThread.start();
-
final String globalPropsPath = props.getString("executor.global.properties", null);
if (globalPropsPath != null) {
this.globalProps = new Props(null, globalPropsPath);
@@ -204,6 +195,36 @@ public class FlowRunnerManager implements EventListener,
AzkabanExecutorServer.JOBTYPE_PLUGIN_DIR,
JobTypeManager.DEFAULT_JOBTYPEPLUGINDIR), this.globalProps,
getClass().getClassLoader());
+
+ Long projectDirMaxSize = null;
+ try {
+ projectDirMaxSize = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE_IN_MB);
+ } catch (final UndefinedPropertyException ex) {
+ }
+
+ // Create a flow preparer
+ this.flowPreparer = new FlowPreparer(storageManager, this.executionDirectory,
+ this.projectDirectory, this.installedProjects, projectDirMaxSize);
+
+ this.cleanerThread = new CleanerThread();
+ this.cleanerThread.start();
+ }
+
+ /*
+ * Delete the project dir associated with {@code version}.
+ * It first acquires object lock of {@code version} waiting for other threads creating
+ * execution dir to finish to avoid race condition. An example of race condition scenario:
+ * delete the dir of a project while an execution of a flow in the same project is being setup
+ * and the flow's execution dir is being created({@link FlowPreparer#setup}).
+ */
+ static void deleteDirectory(final ProjectVersion pv) throws IOException {
+ synchronized (pv) {
+ logger.warn("Deleting project: " + pv);
+ final File installedDir = pv.getInstalledDir();
+ if (installedDir != null && installedDir.exists()) {
+ FileUtils.deleteDirectory(installedDir);
+ }
+ }
}
/**
@@ -250,9 +271,8 @@ public class FlowRunnerManager implements EventListener,
}
}
- private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
- final Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
- new HashMap<>();
+ private List<Path> loadExistingProjects() {
+ final List<Path> projects = new ArrayList<>();
for (final File project : this.projectDirectory.listFiles(new FilenameFilter() {
String pattern = "[0-9]+\\.[0-9]+";
@@ -263,19 +283,35 @@ public class FlowRunnerManager implements EventListener,
}
})) {
if (project.isDirectory()) {
+ projects.add(project.toPath());
+ }
+ }
+ return projects;
+ }
+
+ private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjectsAsCache() {
+ final Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
+ new ConcurrentHashMap<>();
+ for (final Path project : this.loadExistingProjects()) {
+ if (Files.isDirectory(project)) {
try {
- final String fileName = new File(project.getAbsolutePath()).getName();
+ final String fileName = project.getFileName().toString();
final int projectId = Integer.parseInt(fileName.split("\\.")[0]);
final int versionNum = Integer.parseInt(fileName.split("\\.")[1]);
final ProjectVersion version =
- new ProjectVersion(projectId, versionNum, project);
- allProjects.put(new Pair<>(projectId, versionNum),
- version);
+ new ProjectVersion(projectId, versionNum, project.toFile());
+
+ version.setDirSizeInBytes(
+ FileIOUtils.readNumberFromFile(Paths.get(version.getInstalledDir().toString(),
+ FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)));
+
+ allProjects.put(new Pair<>(projectId, versionNum), version);
} catch (final Exception e) {
e.printStackTrace();
}
}
}
+
return allProjects;
}
@@ -285,7 +321,7 @@ public class FlowRunnerManager implements EventListener,
public void setExecutorActive(final boolean isActive) {
this.isExecutorActive = isActive;
if (this.isExecutorActive) {
- this.installedProjects = this.loadExistingProjects();
+ this.installedProjects = this.loadExistingProjectsAsCache();
}
}
@@ -304,16 +340,6 @@ public class FlowRunnerManager implements EventListener,
this.globalProps = globalProps;
}
- public void deleteDirectory(final ProjectVersion pv) throws IOException {
- synchronized (pv) {
- logger.warn("Deleting project: " + pv);
- final File installedDir = pv.getInstalledDir();
- if (installedDir != null && installedDir.exists()) {
- FileUtils.deleteDirectory(installedDir);
- }
- }
- }
-
public void submitFlow(final int execId) throws ExecutorManagerException {
// Load file and submit
if (this.runningFlows.containsKey(execId)) {
@@ -776,6 +802,26 @@ public class FlowRunnerManager implements EventListener,
}
}
+ private Set<Pair<Integer, Integer>> getActiveProjectVersions() {
+ final Set<Pair<Integer, Integer>> activeProjectVersions = new HashSet<>();
+ for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
+ final ExecutableFlow flow = runner.getExecutableFlow();
+ activeProjectVersions.add(new Pair<>(flow
+ .getProjectId(), flow.getVersion()));
+ }
+ return activeProjectVersions;
+ }
+
+ /**
+ * Checks if the project version contains any running flow
+ */
+ private boolean isActiveProject(final ProjectVersion version) {
+ final Pair<Integer, Integer> versionKey = new Pair<>(version.getProjectId(),
+ version.getVersion());
+ return getActiveProjectVersions().contains(versionKey);
+ }
+
+
private class CleanerThread extends Thread {
// Every hour, clean execution dir.
@@ -784,7 +830,6 @@ public class FlowRunnerManager implements EventListener,
private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5 * 60 * 1000;
// Every 2 mins clean the recently finished list
private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
-
// Every 5 mins kill flows running longer than allowed max running time
private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt(
@@ -833,7 +878,7 @@ public class FlowRunnerManager implements EventListener,
if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
&& FlowRunnerManager.this.isExecutorActive) {
logger.info("Cleaning old projects");
- cleanOlderProjects();
+ cleanProjectsOfOldVersion();
this.lastOldProjectCleanTime = currentTime;
}
@@ -922,7 +967,7 @@ public class FlowRunnerManager implements EventListener,
}
}
- private void cleanOlderProjects() {
+ private void cleanProjectsOfOldVersion() {
final Map<Integer, ArrayList<ProjectVersion>> projectVersions =
new HashMap<>();
for (final ProjectVersion version : FlowRunnerManager.this.installedProjects.values()) {
@@ -935,14 +980,6 @@ public class FlowRunnerManager implements EventListener,
versionList.add(version);
}
- final HashSet<Pair<Integer, Integer>> activeProjectVersions =
- new HashSet<>();
- for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
- final ExecutableFlow flow = runner.getExecutableFlow();
- activeProjectVersions.add(new Pair<>(flow
- .getProjectId(), flow.getVersion()));
- }
-
for (final Map.Entry<Integer, ArrayList<ProjectVersion>> entry : projectVersions
.entrySet()) {
// Integer projectId = entry.getKey();
@@ -956,10 +993,7 @@ public class FlowRunnerManager implements EventListener,
Collections.sort(installedVersions);
for (int i = 0; i < installedVersions.size() - 1; ++i) {
final ProjectVersion version = installedVersions.get(i);
- final Pair<Integer, Integer> versionKey =
- new Pair<>(version.getProjectId(),
- version.getVersion());
- if (!activeProjectVersions.contains(versionKey)) {
+ if (!isActiveProject(version)) {
try {
logger.info("Removing old unused installed project "
+ version.getProjectId() + ":" + version.getVersion());
@@ -973,6 +1007,8 @@ public class FlowRunnerManager implements EventListener,
}
}
}
+
}
+
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index a97ca35..4972917 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -42,11 +42,11 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
this.installedDir = installedDir;
}
- public Long getDirSize() {
+ public Long getDirSizeInBytes() {
return this.dirSize;
}
- public void setDirSize(final Long dirSize) {
+ public void setDirSizeInBytes(final Long dirSize) {
this.dirSize = dirSize;
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index 7af3573..1fe97c9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -20,6 +20,7 @@ package azkaban.execapp;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -33,8 +34,11 @@ import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Pair;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.junit.After;
@@ -52,13 +56,7 @@ public class FlowPreparerTest {
private FlowPreparer instance;
- @Before
- public void setUp() throws Exception {
- tearDown();
-
- this.executionsDir.mkdirs();
- this.projectsDir.mkdirs();
-
+ private StorageManager createMockStorageManager() {
final ClassLoader classLoader = getClass().getClassLoader();
final File file = new File(classLoader.getResource(SAMPLE_FLOW_01 + ".zip").getFile());
@@ -67,10 +65,20 @@ public class FlowPreparerTest {
when(projectFileHandler.getLocalFile()).thenReturn(file);
final StorageManager storageManager = mock(StorageManager.class);
- when(storageManager.getProjectFile(12, 34)).thenReturn(projectFileHandler);
+ when(storageManager.getProjectFile(anyInt(), anyInt())).thenReturn(projectFileHandler);
+ return storageManager;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ tearDown();
+
+ this.executionsDir.mkdirs();
+ this.projectsDir.mkdirs();
- this.instance = spy(new FlowPreparer(storageManager, this.executionsDir, this.projectsDir,
- this.installedProjects));
+ this.instance = spy(
+ new FlowPreparer(createMockStorageManager(), this.executionsDir, this.projectsDir,
+ this.installedProjects, null));
doNothing().when(this.instance).touchIfExists(any());
}
@@ -86,9 +94,9 @@ public class FlowPreparerTest {
new File(this.projectsDir, "sample_project_01"));
this.instance.setupProject(pv);
- final long actualDirSize = 259;
+ final long actualDirSize = 1048835;
- assertThat(pv.getDirSize()).isEqualTo(actualDirSize);
+ assertThat(pv.getDirSizeInBytes()).isEqualTo(actualDirSize);
assertThat(FileIOUtils.readNumberFromFile(
Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
.isEqualTo(actualDirSize);
@@ -124,4 +132,61 @@ public class FlowPreparerTest {
assertTrue(execDir.exists());
assertTrue(new File(execDir, SAMPLE_FLOW_01).exists());
}
+
+
+ @Test
+ public void testProjectCacheDirCleanerNotEnabled() throws IOException {
+ final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
+
+ //given
+ final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
+ this.executionsDir, this.projectsDir, installedProjects, null);
+
+ //when
+ final List<File> expectedRemainingFiles = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ final int projectId = i;
+ final int version = 1;
+ final ProjectVersion pv = new ProjectVersion(projectId, version, null);
+ installedProjects.put(new Pair<>(projectId, version), pv);
+ flowPreparer.setupProject(pv);
+ expectedRemainingFiles.add(pv.getInstalledDir());
+ }
+
+ //then
+ assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
+ .toArray(new File[expectedRemainingFiles.size()]));
+ }
+
+ @Test
+ public void testProjectCacheDirCleaner() throws IOException, InterruptedException {
+ final Long projectDirMaxSize = 3L;
+ final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects = new HashMap<>();
+
+ //given
+ final FlowPreparer flowPreparer = new FlowPreparer(createMockStorageManager(),
+ this.executionsDir, this.projectsDir, installedProjects, projectDirMaxSize);
+
+ //when
+ final List<File> expectedRemainingFiles = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ final int projectId = i;
+ final int version = 1;
+ final ProjectVersion pv = new ProjectVersion(projectId, version, null);
+ flowPreparer.setupProject(pv);
+
+ if (i >= 2) {
+ //the first file will be deleted
+ expectedRemainingFiles.add(pv.getInstalledDir());
+ }
+ // last modified time of millis second granularity of a file is not supported by all file
+ // systems, so sleep for 1 second between creation of each project dir to make their last
+ // modified time different.
+ Thread.sleep(1000);
+ }
+
+ //then
+ assertThat(this.projectsDir.listFiles()).containsExactlyInAnyOrder(expectedRemainingFiles
+ .toArray(new File[expectedRemainingFiles.size()]));
+ }
}
diff --git a/azkaban-exec-server/src/test/resources/sample_flow_01.zip b/azkaban-exec-server/src/test/resources/sample_flow_01.zip
index 1147976..1ac01a4 100644
Binary files a/azkaban-exec-server/src/test/resources/sample_flow_01.zip and b/azkaban-exec-server/src/test/resources/sample_flow_01.zip differ