azkaban-aplcache
Changes
az-core/src/main/java/azkaban/Constants.java 15(+15 -0)
Details
az-core/src/main/java/azkaban/Constants.java 15(+15 -0)
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index a85faf6..2fe093d 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -227,6 +227,21 @@ public class Constants {
public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
+
+ // Maximum allowed size of the project dir, in MB.
+ public static final String PROJECT_DIR_MAX_SIZE = "azkaban.project.dir_max_size";
+
+ // The disk usage threshold that triggers project dir cleanup.
+ // E.g., if set to 0.8, cleanup is triggered when the project dir size is >= 0.8 of
+ // the value configured under PROJECT_DIR_MAX_SIZE.
+ public static final String PROJECT_DIR_CLEANUP_START_THRESHOLD = "azkaban.project"
+ + ".start_cleanup_threshold";
+
+ // The disk usage threshold at which project dir cleanup stops.
+ // E.g., if set to 0.6, cleanup stops once the project dir size is < 0.6 of
+ // the value configured under PROJECT_DIR_MAX_SIZE.
+ public static final String PROJECT_DIR_CLEANUP_STOP_THRESHOLD = "azkaban.project"
+ + ".stop_cleanup_threshold";
}
public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 8ddecaa..7cd6932 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -16,6 +16,7 @@
package azkaban.utils;
+import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
@@ -25,9 +26,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -69,6 +73,33 @@ public class FileIOUtils {
return true;
}
+
+  /**
+   * Looks up when {@code file} was created, as reported by its basic file attributes.
+   *
+   * @param file the file or directory to inspect; must exist
+   * @return the creation time in milliseconds
+   * @throws IllegalArgumentException if {@code file} does not exist
+   * @throws IOException if the file attributes cannot be read
+   */
+  public static long getCreationTime(final File file) throws IOException {
+    Preconditions.checkArgument(file.exists(), file + " doesn't exist");
+    final FileTime createdAt =
+        Files.readAttributes(file.toPath(), BasicFileAttributes.class).creationTime();
+    return createdAt.toMillis();
+  }
+
+ /**
+ * Return the size of dir in KB.
+ */
+ public static long sizeInKB(final File dir) throws IOException {
+ Preconditions.checkArgument(dir.isDirectory(), dir + " is not a directory");
+ Preconditions.checkArgument(dir.exists(), dir + " doesn't exist");
+
+ final java.util.Scanner s = new java.util.Scanner(
+ Runtime.getRuntime().exec("du -sh -k " + dir.getAbsolutePath()).getInputStream(),
+ Charset.defaultCharset().name()).useDelimiter("\\A");
+ final String str = s.hasNext() ? s.next() : "";
+ final String[] res = str.split("\\s+");
+ return Long.valueOf(res[0]);
+ }
+
public static String getSourcePathFromClass(final Class<?> containedClass) {
File file =
new File(containedClass.getProtectionDomain().getCodeSource()
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 243a93c..506d16a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -70,17 +70,22 @@ public class FlowPreparer {
// First get the ProjectVersion
final ProjectVersion projectVersion = getProjectVersion(flow);
- // Setup the project
- setupProject(projectVersion);
-
- // Create the execution directory
- execDir = createExecDir(flow);
-
- // Create the symlinks from the project
- copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+ // Synchronize on {@code projectVersion} to prevent the cleaning thread in
+ // {@link azkaban.execapp.FlowRunnerManager} from cleaning up the same project
+ // version while its hardlinks are being created.
+ synchronized (projectVersion) {
+ // Setup the project
+ setupProject(projectVersion);
+
+ // Create the execution directory
+ execDir = createExecDir(flow);
+
+ // Create the links from the project
+ copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+ log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
+ flow.getExecutionId(), execDir.getPath()));
+ }
- log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
- flow.getExecutionId(), execDir.getPath()));
} catch (final Exception e) {
log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
cleanup(execDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index d65315d..523f815 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,7 +16,10 @@
package azkaban.execapp;
+import static azkaban.utils.FileIOUtils.getCreationTime;
+
import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
import azkaban.event.Event;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
@@ -46,6 +49,7 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.ThreadPoolExecutingListener;
import azkaban.utils.TrackingThreadPool;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -125,6 +129,10 @@ public class FlowRunnerManager implements EventListener,
private final File executionDirectory;
private final File projectDirectory;
+ private final long projectDirMaxSizeInMB;
+ private final double projectDirStartDeletionThreshold;
+ private final double projectDirStopDeletionThreshold;
+
private final Object executionDirDeletionSync = new Object();
private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
@@ -134,7 +142,7 @@ public class FlowRunnerManager implements EventListener,
private Props globalProps;
private long lastCleanerThreadCheckTime = -1;
- private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
+ private long executionDirRetention = 1 * 2 * 60 * 60 * 1000; // 2 hours
// We want to limit the log sizes to about 20 megs
private String jobLogChunkSize = "5MB";
@@ -173,6 +181,16 @@ public class FlowRunnerManager implements EventListener,
this.projectDirectory.mkdirs();
}
+ this.projectDirMaxSizeInMB = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE,
+ 2000000); // default value as 2TB
+ this.projectDirStartDeletionThreshold = props
+ .getDouble(ConfigurationKeys.PROJECT_DIR_CLEANUP_START_THRESHOLD, 0.9);
+ this.projectDirStopDeletionThreshold = props
+ .getDouble(ConfigurationKeys.PROJECT_DIR_CLEANUP_STOP_THRESHOLD, 0.8);
+ Preconditions.checkArgument(this.projectDirStartDeletionThreshold >= 0 && this
+ .projectDirStartDeletionThreshold <= 1 && this.projectDirStopDeletionThreshold >= 0 &&
+ this.projectDirStopDeletionThreshold <= 1);
+
this.installedProjects = loadExistingProjects();
// azkaban.temp.dir
@@ -781,11 +799,14 @@ public class FlowRunnerManager implements EventListener,
private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5 * 60 * 1000;
// Every 2 mins clean the recently finished list
private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
+ // Every 30 mins clean least recently used project dir
+ private static final long LRU_PROJECT_DIR_INTERVAL_MS = 30 * 60 * 1000;
// Every 5 mins kill flows running longer than allowed max running time
private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt(
Constants.ConfigurationKeys.AZKABAN_MAX_FLOW_RUNNING_MINS, -1);
+ private long lastLRUProjectCleanTime = -1;
private boolean shutdown = false;
private long lastExecutionDirCleanTime = -1;
private long lastOldProjectCleanTime = -1;
@@ -830,7 +851,7 @@ public class FlowRunnerManager implements EventListener,
if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
&& FlowRunnerManager.this.isExecutorActive) {
logger.info("Cleaning old projects");
- cleanOlderProjects();
+ cleanProjectsOfOldVersions();
this.lastOldProjectCleanTime = currentTime;
}
@@ -840,6 +861,13 @@ public class FlowRunnerManager implements EventListener,
this.lastExecutionDirCleanTime = currentTime;
}
+ if (currentTime - LRU_PROJECT_DIR_INTERVAL_MS > this.lastLRUProjectCleanTime
+ && FlowRunnerManager.this.isExecutorActive) {
+ logger.info("Cleaning LRU project dirs");
+ cleanLRUProjects();
+ this.lastLRUProjectCleanTime = currentTime;
+ }
+
if (this.flowMaxRunningTimeInMins > 0
&& currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS
> this.lastLongRunningFlowCleanTime) {
@@ -919,7 +947,7 @@ public class FlowRunnerManager implements EventListener,
}
}
- private void cleanOlderProjects() {
+ private void cleanProjectsOfOldVersions() {
final Map<Integer, ArrayList<ProjectVersion>> projectVersions =
new HashMap<>();
for (final ProjectVersion version : FlowRunnerManager.this.installedProjects.values()) {
@@ -932,13 +960,7 @@ public class FlowRunnerManager implements EventListener,
versionList.add(version);
}
- final HashSet<Pair<Integer, Integer>> activeProjectVersions =
- new HashSet<>();
- for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
- final ExecutableFlow flow = runner.getExecutableFlow();
- activeProjectVersions.add(new Pair<>(flow
- .getProjectId(), flow.getVersion()));
- }
+ final HashSet<Pair<Integer, Integer>> activeProjectVersions = getActiveProjectVersions();
for (final Map.Entry<Integer, ArrayList<ProjectVersion>> entry : projectVersions
.entrySet()) {
@@ -970,6 +992,105 @@ public class FlowRunnerManager implements EventListener,
}
}
}
+
+    /**
+     * Collects the (project id, version) pair of every flow currently held in
+     * {@code runningFlows}.
+     *
+     * @return a newly created set holding one pair per running flow's project version
+     */
+    private HashSet<Pair<Integer, Integer>> getActiveProjectVersions() {
+      final HashSet<Pair<Integer, Integer>> inUse = new HashSet<>();
+      for (final FlowRunner flowRunner : FlowRunnerManager.this.runningFlows.values()) {
+        final ExecutableFlow running = flowRunner.getExecutableFlow();
+        final Pair<Integer, Integer> key =
+            new Pair<>(running.getProjectId(), running.getVersion());
+        inUse.add(key);
+      }
+      return inUse;
+    }
+
+    /**
+     * Tells whether any currently running flow belongs to the given project version.
+     *
+     * @param version the installed project version to look up
+     * @return {@code true} if the set of active project versions contains this version's
+     *     (project id, version) pair
+     */
+    private boolean isActiveProject(final ProjectVersion version) {
+      final Pair<Integer, Integer> key =
+          new Pair<>(version.getProjectId(), version.getVersion());
+      final HashSet<Pair<Integer, Integer>> active = getActiveProjectVersions();
+      return active.contains(key);
+    }
+
+    /**
+     * Decides whether a cleanup pass should start: true when the size of {@code projectDir}
+     * has reached the start-deletion fraction of the configured maximum size.
+     *
+     * @param projectDir the directory whose disk usage is measured
+     * @return {@code true} if the directory size is at or above the start threshold;
+     *     {@code false} if it is below, or if the size could not be determined
+     */
+    private boolean shouldCleanup(final File projectDir) {
+      return isProjectDirSizeAtOrAbove(projectDir,
+          FlowRunnerManager.this.projectDirStartDeletionThreshold);
+    }
+
+    /**
+     * Decides whether an ongoing cleanup pass should continue: true while the size of
+     * {@code projectDir} is still at or above the stop-deletion fraction of the configured
+     * maximum size.
+     *
+     * @param projectDir the directory whose disk usage is measured
+     * @return {@code true} if the directory size is at or above the stop threshold;
+     *     {@code false} if it is below, or if the size could not be determined
+     */
+    private boolean shouldKeepCleanup(final File projectDir) {
+      return isProjectDirSizeAtOrAbove(projectDir,
+          FlowRunnerManager.this.projectDirStopDeletionThreshold);
+    }
+
+    /**
+     * Compares the current size of {@code projectDir} against {@code threshold} times the
+     * configured maximum project dir size.
+     *
+     * @param projectDir the directory whose disk usage is measured
+     * @param threshold fraction of the maximum size to compare against
+     * @return {@code true} if the measured size in KB is >= the limit in KB; {@code false}
+     *     otherwise, including when measuring the directory fails
+     */
+    private boolean isProjectDirSizeAtOrAbove(final File projectDir, final double threshold) {
+      try {
+        final long dirSizeInKB = FileIOUtils.sizeInKB(projectDir);
+        // The maximum is configured in MB; convert to KB to match the measured size.
+        final long limitInKB =
+            (long) (FlowRunnerManager.this.projectDirMaxSizeInMB * threshold * 1024);
+        return dirSizeInKB >= limitInKB;
+      } catch (final IOException e) {
+        // Without a reliable size, err on the side of not deleting anything.
+        logger.error("Unable to determine the size of " + projectDir, e);
+        return false;
+      }
+    }
+
+
+    /**
+     * Frees disk space by deleting installed project versions, oldest directory first.
+     *
+     * <p>Nothing happens unless {@link #shouldCleanup} reports that the project directory has
+     * reached the start threshold. Otherwise the installed project versions are ordered by the
+     * creation time of their installed directory (ascending) and deleted one by one, skipping
+     * versions that currently have a running flow, until {@link #shouldKeepCleanup} reports
+     * that the directory has dropped below the stop threshold.
+     *
+     * <p>NOTE(review): ordering uses directory creation time, not last use, so this only
+     * approximates "least recently used" - confirm this is the intended policy.
+     *
+     * @throws IOException declared for callers; failures to read an individual directory
+     *     are logged and do not abort the pass
+     */
+    private void cleanLRUProjects() throws IOException {
+      final File projectDir = FlowRunnerManager.this.projectDirectory;
+      if (!shouldCleanup(projectDir)) {
+        return;
+      }
+
+      final List<ProjectVersion> candidates =
+          new ArrayList<>(FlowRunnerManager.this.installedProjects.values());
+
+      // Read every creation time exactly once before sorting. Doing file I/O inside the
+      // comparator can yield inconsistent answers between comparisons (and "0" on failure),
+      // which violates the comparator contract. An identity map is used so the lookup does
+      // not depend on how ProjectVersion implements equals/hashCode.
+      final Map<ProjectVersion, Long> creationTimes = new java.util.IdentityHashMap<>();
+      for (final ProjectVersion version : candidates) {
+        creationTimes.put(version, creationTimeOrLast(version));
+      }
+      candidates.sort((v1, v2) -> Long.compare(creationTimes.get(v1), creationTimes.get(v2)));
+
+      for (final ProjectVersion version : candidates) {
+        if (isActiveProject(version)) {
+          continue;
+        }
+        // Stop as soon as enough space was reclaimed instead of measuring the directory
+        // again for every remaining version.
+        if (!shouldKeepCleanup(projectDir)) {
+          break;
+        }
+        delete(version);
+      }
+    }
+
+    /**
+     * Reads the creation time of the installed dir of {@code version}; if it cannot be read
+     * (I/O failure or the directory no longer exists) returns {@link Long#MAX_VALUE} so the
+     * version sorts after every readable one.
+     */
+    private long creationTimeOrLast(final ProjectVersion version) {
+      try {
+        return getCreationTime(version.getInstalledDir());
+      } catch (final IOException | IllegalArgumentException ex) {
+        logger.error("error reading project dir " + version.getInstalledDir(), ex);
+        return Long.MAX_VALUE;
+      }
+    }
+
+    /**
+     * Delete the project dir associated with {@code version}.
+     * It first acquires the object lock of {@code version}, waiting for other threads that
+     * are creating an execution dir to finish, to avoid a race condition. An example of such
+     * a race: the dir of a project is deleted while an execution of a flow in the same project
+     * is being set up and the flow's execution dir is being created
+     * ({@link FlowPreparer#setup}).
+     *
+     * <p>NOTE(review): the lock only helps if the preparing thread synchronizes on the same
+     * {@code ProjectVersion} instance - confirm against {@link FlowPreparer}.
+     *
+     * <p>An {@link IOException} from the deletion is logged and not propagated.
+     *
+     * @param version the installed project version whose directory is removed
+     */
+    private void delete(final ProjectVersion version) {
+      synchronized (version) {
+        try {
+          deleteDirectory(version);
+        } catch (final IOException ex) {
+          // Best effort: log which directory failed, with the stack trace.
+          logger.error("Failed to delete project dir " + version.getInstalledDir(), ex);
+        }
+      }
+    }
}
}