azkaban-aplcache
Changes
az-core/src/main/java/azkaban/Constants.java 11(+0 -11)
Details
az-core/src/main/java/azkaban/Constants.java 11(+0 -11)
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 58f4d96..a85faf6 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -227,17 +227,6 @@ public class Constants {
public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
-
- // allowed max size of project dir in mb
- public static final String PROJECT_DIR_MAX_SIZE = "azkaban.project_cache_max_size_in_mb";
-
- // the disk usage threshold to trigger project dir cleanup.
- // E.g, if set to 80, cleanup will trigger when project dir size >= 80% of {@value#PROJECT_DIR_MAX_SIZE}.
- public static final String PROJECT_DIR_CLEANUP_START_THRESHOLD = "azkaban.project_cache_start_cleanup_threshold";
-
- // the disk usage threshold to stop project dir cleanup.
- // E.g, if set to 60, cleanup will stop when project dir size < 60% of {@value#PROJECT_DIR_MAX_SIZE}.
- public static final String PROJECT_DIR_CLEANUP_STOP_THRESHOLD = "azkaban.project_cache.stop_cleanup_threshold";
}
public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 7cd6932..8ddecaa 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -16,7 +16,6 @@
package azkaban.utils;
-import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
@@ -26,12 +25,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -73,33 +69,6 @@ public class FileIOUtils {
return true;
}
-
- /**
- * Return creation time of file.
- */
- public static long getCreationTime(final File file) throws IOException {
- Preconditions.checkArgument(file.exists(), file + " doesn't exist");
- final BasicFileAttributes attrs = Files
- .readAttributes(file.toPath(), BasicFileAttributes.class);
- final FileTime time = attrs.creationTime();
- return time.toMillis();
- }
-
- /**
- * Return the size of dir in KB.
- */
- public static long sizeInKB(final File dir) throws IOException {
- Preconditions.checkArgument(dir.isDirectory(), dir + " is not a directory");
- Preconditions.checkArgument(dir.exists(), dir + " doesn't exist");
-
- final java.util.Scanner s = new java.util.Scanner(
- Runtime.getRuntime().exec("du -sh -k " + dir.getAbsolutePath()).getInputStream(),
- Charset.defaultCharset().name()).useDelimiter("\\A");
- final String str = s.hasNext() ? s.next() : "";
- final String[] res = str.split("\\s+");
- return Long.valueOf(res[0]);
- }
-
public static String getSourcePathFromClass(final Class<?> containedClass) {
File file =
new File(containedClass.getProtectionDomain().getCodeSource()
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 506d16a..243a93c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -70,22 +70,17 @@ public class FlowPreparer {
// First get the ProjectVersion
final ProjectVersion projectVersion = getProjectVersion(flow);
- // Synchronized on {@code projectVersion} to prevent
- // cleaning thread in {@link azkaban.execapp.FlowRunnerManager}
- // cleaning up the same project version when creating hardlink
- synchronized (projectVersion) {
- // Setup the project
- setupProject(projectVersion);
-
- // Create the execution directory
- execDir = createExecDir(flow);
-
- // Create the links from the project
- copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
- log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
- flow.getExecutionId(), execDir.getPath()));
- }
+ // Setup the project
+ setupProject(projectVersion);
+
+ // Create the execution directory
+ execDir = createExecDir(flow);
+
+ // Create the symlinks from the project
+ copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+ log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
+ flow.getExecutionId(), execDir.getPath()));
} catch (final Exception e) {
log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
cleanup(execDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c33ef3f..d65315d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,10 +16,7 @@
package azkaban.execapp;
-import static azkaban.utils.FileIOUtils.getCreationTime;
-
import azkaban.Constants;
-import azkaban.Constants.ConfigurationKeys;
import azkaban.event.Event;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
@@ -49,7 +46,6 @@ import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.ThreadPoolExecutingListener;
import azkaban.utils.TrackingThreadPool;
-import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -129,10 +125,6 @@ public class FlowRunnerManager implements EventListener,
private final File executionDirectory;
private final File projectDirectory;
- private final long projectDirMaxSizeInMb;
- private final int projectDirStartDeletionThreshold;
- private final int projectDirStopDeletionThreshold;
-
private final Object executionDirDeletionSync = new Object();
private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
@@ -142,7 +134,7 @@ public class FlowRunnerManager implements EventListener,
private Props globalProps;
private long lastCleanerThreadCheckTime = -1;
- private long executionDirRetention = 1 * 2 * 60 * 60 * 1000; // 2 hours
+ private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
// We want to limit the log sizes to about 20 megs
private String jobLogChunkSize = "5MB";
@@ -181,16 +173,6 @@ public class FlowRunnerManager implements EventListener,
this.projectDirectory.mkdirs();
}
- this.projectDirMaxSizeInMb = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE,
- 128000); // default value as 128GB
- this.projectDirStartDeletionThreshold = props
- .getInt(ConfigurationKeys.PROJECT_DIR_CLEANUP_START_THRESHOLD, 90);
- this.projectDirStopDeletionThreshold = props
- .getInt(ConfigurationKeys.PROJECT_DIR_CLEANUP_STOP_THRESHOLD, 60);
- Preconditions.checkArgument(this.projectDirStartDeletionThreshold >= 0 && this
- .projectDirStartDeletionThreshold <= 100 && this.projectDirStopDeletionThreshold >= 0 &&
- this.projectDirStopDeletionThreshold < this.projectDirStartDeletionThreshold);
-
this.installedProjects = loadExistingProjects();
// azkaban.temp.dir
@@ -799,14 +781,11 @@ public class FlowRunnerManager implements EventListener,
private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5 * 60 * 1000;
// Every 2 mins clean the recently finished list
private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
- // Every 30 mins clean least recently used project dir
- private static final long LRU_PROJECT_DIR_INTERVAL_MS = 30 * 60 * 1000;
// Every 5 mins kill flows running longer than allowed max running time
private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt(
Constants.ConfigurationKeys.AZKABAN_MAX_FLOW_RUNNING_MINS, -1);
- private long lastLRUProjectCleanTime = -1;
private boolean shutdown = false;
private long lastExecutionDirCleanTime = -1;
private long lastOldProjectCleanTime = -1;
@@ -851,7 +830,7 @@ public class FlowRunnerManager implements EventListener,
if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
&& FlowRunnerManager.this.isExecutorActive) {
logger.info("Cleaning old projects");
- cleanProjectsOfOldVersions();
+ cleanOlderProjects();
this.lastOldProjectCleanTime = currentTime;
}
@@ -861,13 +840,6 @@ public class FlowRunnerManager implements EventListener,
this.lastExecutionDirCleanTime = currentTime;
}
- if (currentTime - LRU_PROJECT_DIR_INTERVAL_MS > this.lastLRUProjectCleanTime
- && FlowRunnerManager.this.isExecutorActive) {
- logger.info("Cleaning LRU project dirs");
- cleanLRUProjects();
- this.lastLRUProjectCleanTime = currentTime;
- }
-
if (this.flowMaxRunningTimeInMins > 0
&& currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS
> this.lastLongRunningFlowCleanTime) {
@@ -947,7 +919,7 @@ public class FlowRunnerManager implements EventListener,
}
}
- private void cleanProjectsOfOldVersions() {
+ private void cleanOlderProjects() {
final Map<Integer, ArrayList<ProjectVersion>> projectVersions =
new HashMap<>();
for (final ProjectVersion version : FlowRunnerManager.this.installedProjects.values()) {
@@ -960,7 +932,13 @@ public class FlowRunnerManager implements EventListener,
versionList.add(version);
}
- final HashSet<Pair<Integer, Integer>> activeProjectVersions = getActiveProjectVersions();
+ final HashSet<Pair<Integer, Integer>> activeProjectVersions =
+ new HashSet<>();
+ for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
+ final ExecutableFlow flow = runner.getExecutableFlow();
+ activeProjectVersions.add(new Pair<>(flow
+ .getProjectId(), flow.getVersion()));
+ }
for (final Map.Entry<Integer, ArrayList<ProjectVersion>> entry : projectVersions
.entrySet()) {
@@ -992,105 +970,6 @@ public class FlowRunnerManager implements EventListener,
}
}
}
-
- private HashSet<Pair<Integer, Integer>> getActiveProjectVersions() {
- final HashSet<Pair<Integer, Integer>> activeProjectVersions =
- new HashSet<>();
- for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
- final ExecutableFlow flow = runner.getExecutableFlow();
- activeProjectVersions.add(new Pair<>(flow
- .getProjectId(), flow.getVersion()));
- }
- return activeProjectVersions;
- }
-
- /**
- * Checks if the project version contains any running flow
- */
- private boolean isActiveProject(final ProjectVersion version) {
- final Pair<Integer, Integer> versionKey = new Pair<>(version.getProjectId(),
- version.getVersion());
- return getActiveProjectVersions().contains(versionKey);
- }
-
- private boolean shouldCleanup(final File projectDir) {
- final long dirSize;
- try {
- dirSize = FileIOUtils.sizeInKB(projectDir);
- final long upperLimitInKB = (long) (FlowRunnerManager.this.projectDirMaxSizeInMb *
- FlowRunnerManager.this.projectDirStartDeletionThreshold * 0.01 * 1024);
- return dirSize >= upperLimitInKB;
- } catch (final IOException e) {
- logger.error(e);
- return false;
- }
- }
-
- private boolean shouldKeepCleanup(final File projectDir) {
- final long dirSize;
- try {
- dirSize = FileIOUtils.sizeInKB(projectDir);
- final long lowerLimitInKB = (long) (FlowRunnerManager.this.projectDirMaxSizeInMb *
- FlowRunnerManager.this.projectDirStopDeletionThreshold * 0.01 * 1024);
- return dirSize >= lowerLimitInKB;
- } catch (final IOException e) {
- logger.error(e);
- return false;
- }
- }
-
-
- /**
- * delete least recently used projects
- */
- private void cleanLRUProjects() throws IOException {
- if (shouldCleanup(FlowRunnerManager.this.projectDirectory)) {
- final List<ProjectVersion> projectVersionList = new ArrayList<>(
- FlowRunnerManager.this.installedProjects.values());
-
- // sort project version by last creation time in ascending order
- projectVersionList.sort((o1, o2) -> {
- try {
- final long creationTime1 = getCreationTime(o1.getInstalledDir());
- final long creationTime2 = getCreationTime(o2.getInstalledDir());
- if (creationTime1 < creationTime2) {
- return -1;
- } else if (creationTime1 > creationTime2) {
- return 1;
- } else {
- return 0;
- }
- } catch (final IOException ex) {
- logger.error("error reading project dir", ex);
- return 0;
- }
- });
-
- for (final ProjectVersion version : projectVersionList) {
- if (!isActiveProject(version) && shouldKeepCleanup(FlowRunnerManager.this
- .projectDirectory)) {
- delete(version);
- }
- }
- }
- }
-
- /**
- * Delete the project dir associated with {@code version}.
- * It first acquires object lock of {@code version} waiting for other threads creating
- * execution dir to finish to avoid race condition. An example of race condition scenario:
- * delete the dir of a project while an execution of a flow in the same project is being setup
- * and the flow's execution dir is being created({@link FlowPreparer#setup}).
- */
- private void delete(final ProjectVersion version) {
- synchronized (version) {
- try {
- deleteDirectory(version);
- } catch (final IOException ex) {
- logger.error(ex);
- }
- }
- }
}
}