azkaban-aplcache

Clean shared project directory when disk usage is too high (#1803)

6/19/2018 6:16:35 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index a85faf6..2fe093d 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -227,6 +227,21 @@ public class Constants {
     public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
 
     public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
+
+    // allowed max size of project dir in mb
+    public static final String PROJECT_DIR_MAX_SIZE = "azkaban.project.dir_max_size";
+
+    // the disk usage threshold to trigger project dir cleanup.
+    // E.g, if set to 0.8, cleanup will trigger when project dir size >= 0.8 of
+    // {@value#PROJECT_DIR_MAX_SIZE}.
+    public static final String PROJECT_DIR_CLEANUP_START_THRESHOLD = "azkaban.project"
+        + ".start_cleanup_threshold";
+
+    // the disk usage threshold to stop project dir cleanup.
+    // E.g, if set to 0.6, cleanup will stop when project dir size < 0.6 of
+    // {@value#PROJECT_DIR_MAX_SIZE}.
+    public static final String PROJECT_DIR_CLEANUP_STOP_THRESHOLD = "azkaban.project"
+        + ".stop_cleanup_threshold";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 8ddecaa..7cd6932 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -16,6 +16,7 @@
 
 package azkaban.utils;
 
+import com.google.common.base.Preconditions;
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -25,9 +26,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -69,6 +73,33 @@ public class FileIOUtils {
     return true;
   }
 
+
+  /**
+   * Return creation time of file.
+   */
+  public static long getCreationTime(final File file) throws IOException {
+    Preconditions.checkArgument(file.exists(), file + " doesn't exist");
+    final BasicFileAttributes attrs = Files
+        .readAttributes(file.toPath(), BasicFileAttributes.class);
+    final FileTime time = attrs.creationTime();
+    return time.toMillis();
+  }
+
+  /**
+   * Return the size of dir in KB.
+   */
+  public static long sizeInKB(final File dir) throws IOException {
+    Preconditions.checkArgument(dir.isDirectory(), dir + " is not a directory");
+    Preconditions.checkArgument(dir.exists(), dir + " doesn't exist");
+
+    final java.util.Scanner s = new java.util.Scanner(
+        Runtime.getRuntime().exec("du -sh -k " + dir.getAbsolutePath()).getInputStream(),
+        Charset.defaultCharset().name()).useDelimiter("\\A");
+    final String str = s.hasNext() ? s.next() : "";
+    final String[] res = str.split("\\s+");
+    return Long.valueOf(res[0]);
+  }
+
   public static String getSourcePathFromClass(final Class<?> containedClass) {
     File file =
         new File(containedClass.getProtectionDomain().getCodeSource()
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 243a93c..506d16a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -70,17 +70,22 @@ public class FlowPreparer {
       // First get the ProjectVersion
       final ProjectVersion projectVersion = getProjectVersion(flow);
 
-      // Setup the project
-      setupProject(projectVersion);
-
-      // Create the execution directory
-      execDir = createExecDir(flow);
-
-      // Create the symlinks from the project
-      copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+      // Synchronized on {@code projectVersion} to prevent
+      // cleaning thread in {@link azkaban.execapp.FlowRunnerManager}
+      // cleaning up the same project version when creating hardlink
+      synchronized (projectVersion) {
+        // Setup the project
+        setupProject(projectVersion);
+
+        // Create the execution directory
+        execDir = createExecDir(flow);
+
+        // Create the links from the project
+        copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
+        log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
+            flow.getExecutionId(), execDir.getPath()));
+      }
 
-      log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
-          flow.getExecutionId(), execDir.getPath()));
     } catch (final Exception e) {
       log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
       cleanup(execDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index d65315d..523f815 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,7 +16,10 @@
 
 package azkaban.execapp;
 
+import static azkaban.utils.FileIOUtils.getCreationTime;
+
 import azkaban.Constants;
+import azkaban.Constants.ConfigurationKeys;
 import azkaban.event.Event;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
@@ -46,6 +49,7 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.ThreadPoolExecutingListener;
 import azkaban.utils.TrackingThreadPool;
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -125,6 +129,10 @@ public class FlowRunnerManager implements EventListener,
   private final File executionDirectory;
   private final File projectDirectory;
 
+  private final long projectDirMaxSizeInMB;
+  private final double projectDirStartDeletionThreshold;
+  private final double projectDirStopDeletionThreshold;
+
   private final Object executionDirDeletionSync = new Object();
 
   private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
@@ -134,7 +142,7 @@ public class FlowRunnerManager implements EventListener,
   private Props globalProps;
 
   private long lastCleanerThreadCheckTime = -1;
-  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
+  private long executionDirRetention = 1 * 2 * 60 * 60 * 1000; // 2 hours
 
   // We want to limit the log sizes to about 20 megs
   private String jobLogChunkSize = "5MB";
@@ -173,6 +181,16 @@ public class FlowRunnerManager implements EventListener,
       this.projectDirectory.mkdirs();
     }
 
+    this.projectDirMaxSizeInMB = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE,
+        2000000); // default value as 2TB
+    this.projectDirStartDeletionThreshold = props
+        .getDouble(ConfigurationKeys.PROJECT_DIR_CLEANUP_START_THRESHOLD, 0.9);
+    this.projectDirStopDeletionThreshold = props
+        .getDouble(ConfigurationKeys.PROJECT_DIR_CLEANUP_STOP_THRESHOLD, 0.8);
+    Preconditions.checkArgument(this.projectDirStartDeletionThreshold >= 0 && this
+        .projectDirStartDeletionThreshold <= 1 && this.projectDirStopDeletionThreshold >= 0 &&
+        this.projectDirStopDeletionThreshold <= 1);
+
     this.installedProjects = loadExistingProjects();
 
     // azkaban.temp.dir
@@ -781,11 +799,14 @@ public class FlowRunnerManager implements EventListener,
     private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5 * 60 * 1000;
     // Every 2 mins clean the recently finished list
     private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
+    // Every 30 mins clean least recently used project dir
+    private static final long LRU_PROJECT_DIR_INTERVAL_MS = 30 * 60 * 1000;
 
     // Every 5 mins kill flows running longer than allowed max running time
     private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
     private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt(
         Constants.ConfigurationKeys.AZKABAN_MAX_FLOW_RUNNING_MINS, -1);
+    private long lastLRUProjectCleanTime = -1;
     private boolean shutdown = false;
     private long lastExecutionDirCleanTime = -1;
     private long lastOldProjectCleanTime = -1;
@@ -830,7 +851,7 @@ public class FlowRunnerManager implements EventListener,
             if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
                 && FlowRunnerManager.this.isExecutorActive) {
               logger.info("Cleaning old projects");
-              cleanOlderProjects();
+              cleanProjectsOfOldVersions();
               this.lastOldProjectCleanTime = currentTime;
             }
 
@@ -840,6 +861,13 @@ public class FlowRunnerManager implements EventListener,
               this.lastExecutionDirCleanTime = currentTime;
             }
 
+            if (currentTime - LRU_PROJECT_DIR_INTERVAL_MS > this.lastLRUProjectCleanTime
+                && FlowRunnerManager.this.isExecutorActive) {
+              logger.info("Cleaning LRU project dirs");
+              cleanLRUProjects();
+              this.lastLRUProjectCleanTime = currentTime;
+            }
+
             if (this.flowMaxRunningTimeInMins > 0
                 && currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS
                 > this.lastLongRunningFlowCleanTime) {
@@ -919,7 +947,7 @@ public class FlowRunnerManager implements EventListener,
       }
     }
 
-    private void cleanOlderProjects() {
+    private void cleanProjectsOfOldVersions() {
       final Map<Integer, ArrayList<ProjectVersion>> projectVersions =
           new HashMap<>();
       for (final ProjectVersion version : FlowRunnerManager.this.installedProjects.values()) {
@@ -932,13 +960,7 @@ public class FlowRunnerManager implements EventListener,
         versionList.add(version);
       }
 
-      final HashSet<Pair<Integer, Integer>> activeProjectVersions =
-          new HashSet<>();
-      for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
-        final ExecutableFlow flow = runner.getExecutableFlow();
-        activeProjectVersions.add(new Pair<>(flow
-            .getProjectId(), flow.getVersion()));
-      }
+      final HashSet<Pair<Integer, Integer>> activeProjectVersions = getActiveProjectVersions();
 
       for (final Map.Entry<Integer, ArrayList<ProjectVersion>> entry : projectVersions
           .entrySet()) {
@@ -970,6 +992,105 @@ public class FlowRunnerManager implements EventListener,
         }
       }
     }
+
+    private HashSet<Pair<Integer, Integer>> getActiveProjectVersions() {
+      final HashSet<Pair<Integer, Integer>> activeProjectVersions =
+          new HashSet<>();
+      for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
+        final ExecutableFlow flow = runner.getExecutableFlow();
+        activeProjectVersions.add(new Pair<>(flow
+            .getProjectId(), flow.getVersion()));
+      }
+      return activeProjectVersions;
+    }
+
+    /**
+     * Checks if the project version contains any running flow
+     */
+    private boolean isActiveProject(final ProjectVersion version) {
+      final Pair<Integer, Integer> versionKey = new Pair<>(version.getProjectId(),
+          version.getVersion());
+      return getActiveProjectVersions().contains(versionKey);
+    }
+
+    private boolean shouldCleanup(final File projectDir) {
+      final long dirSize;
+      try {
+        dirSize = FileIOUtils.sizeInKB(projectDir);
+        final long upperLimitInKB = (long) (FlowRunnerManager.this.projectDirMaxSizeInMB *
+            FlowRunnerManager.this.projectDirStartDeletionThreshold * 1024);
+        return dirSize >= upperLimitInKB;
+      } catch (final IOException e) {
+        logger.error(e);
+        return false;
+      }
+    }
+
+    private boolean shouldKeepCleanup(final File projectDir) {
+      final long dirSize;
+      try {
+        dirSize = FileIOUtils.sizeInKB(projectDir);
+        final long lowerLimitInKB = (long) (FlowRunnerManager.this.projectDirMaxSizeInMB *
+            FlowRunnerManager.this.projectDirStopDeletionThreshold * 1024);
+        return dirSize >= lowerLimitInKB;
+      } catch (final IOException e) {
+        logger.error(e);
+        return false;
+      }
+    }
+
+
+    /**
+     * delete least recently used projects
+     */
+    private void cleanLRUProjects() throws IOException {
+      if (shouldCleanup(FlowRunnerManager.this.projectDirectory)) {
+        final List<ProjectVersion> projectVersionList = new ArrayList<>(
+            FlowRunnerManager.this.installedProjects.values());
+
+        // sort project version by last creation time in ascending order
+        projectVersionList.sort((o1, o2) -> {
+          try {
+            final long creationTime1 = getCreationTime(o1.getInstalledDir());
+            final long creationTime2 = getCreationTime(o2.getInstalledDir());
+            if (creationTime1 < creationTime2) {
+              return -1;
+            } else if (creationTime1 > creationTime2) {
+              return 1;
+            } else {
+              return 0;
+            }
+          } catch (final IOException ex) {
+            logger.error("error reading project dir", ex);
+            return 0;
+          }
+        });
+
+        for (final ProjectVersion version : projectVersionList) {
+          if (!isActiveProject(version) && shouldKeepCleanup(FlowRunnerManager.this
+              .projectDirectory)) {
+            delete(version);
+          }
+        }
+      }
+    }
+
+    /**
+     * Delete the project dir associated with {@code version}.
+     * It first acquires object lock of {@code version} waiting for other threads creating
+     * execution dir to finish to avoid race condition. An example of race condition scenario:
+     * delete the dir of a project while an execution of a flow in the same project is being setup
+     * and the flow's execution dir is being created({@link FlowPreparer#setup}).
+     */
+    private void delete(final ProjectVersion version) {
+      synchronized (version) {
+        try {
+          deleteDirectory(version);
+        } catch (final IOException ex) {
+          logger.error(ex);
+        }
+      }
+    }
   }
 
 }