azkaban-aplcache

Revert changes related to project LRU cache (#1841) More

7/9/2018 7:07:52 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 58f4d96..a85faf6 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -227,17 +227,6 @@ public class Constants {
     public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
 
     public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";
-
-    // allowed max size of project dir in mb
-    public static final String PROJECT_DIR_MAX_SIZE = "azkaban.project_cache_max_size_in_mb";
-
-    // the disk usage threshold to trigger project dir cleanup.
-    // E.g, if set to 80, cleanup will trigger when project dir size >= 80% of {@value#PROJECT_DIR_MAX_SIZE}.
-    public static final String PROJECT_DIR_CLEANUP_START_THRESHOLD = "azkaban.project_cache_start_cleanup_threshold";
-
-    // the disk usage threshold to stop project dir cleanup.
-    // E.g, if set to 60, cleanup will stop when project dir size < 60% of {@value#PROJECT_DIR_MAX_SIZE}.
-    public static final String PROJECT_DIR_CLEANUP_STOP_THRESHOLD = "azkaban.project_cache.stop_cleanup_threshold";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 7cd6932..8ddecaa 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -16,7 +16,6 @@
 
 package azkaban.utils;
 
-import com.google.common.base.Preconditions;
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -26,12 +25,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.nio.file.attribute.FileTime;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -73,33 +69,6 @@ public class FileIOUtils {
     return true;
   }
 
-
-  /**
-   * Return creation time of file.
-   */
-  public static long getCreationTime(final File file) throws IOException {
-    Preconditions.checkArgument(file.exists(), file + " doesn't exist");
-    final BasicFileAttributes attrs = Files
-        .readAttributes(file.toPath(), BasicFileAttributes.class);
-    final FileTime time = attrs.creationTime();
-    return time.toMillis();
-  }
-
-  /**
-   * Return the size of dir in KB.
-   */
-  public static long sizeInKB(final File dir) throws IOException {
-    Preconditions.checkArgument(dir.isDirectory(), dir + " is not a directory");
-    Preconditions.checkArgument(dir.exists(), dir + " doesn't exist");
-
-    final java.util.Scanner s = new java.util.Scanner(
-        Runtime.getRuntime().exec("du -sh -k " + dir.getAbsolutePath()).getInputStream(),
-        Charset.defaultCharset().name()).useDelimiter("\\A");
-    final String str = s.hasNext() ? s.next() : "";
-    final String[] res = str.split("\\s+");
-    return Long.valueOf(res[0]);
-  }
-
   public static String getSourcePathFromClass(final Class<?> containedClass) {
     File file =
         new File(containedClass.getProtectionDomain().getCodeSource()
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 506d16a..243a93c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -70,22 +70,17 @@ public class FlowPreparer {
       // First get the ProjectVersion
       final ProjectVersion projectVersion = getProjectVersion(flow);
 
-      // Synchronized on {@code projectVersion} to prevent
-      // cleaning thread in {@link azkaban.execapp.FlowRunnerManager}
-      // cleaning up the same project version when creating hardlink
-      synchronized (projectVersion) {
-        // Setup the project
-        setupProject(projectVersion);
-
-        // Create the execution directory
-        execDir = createExecDir(flow);
-
-        // Create the links from the project
-        copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
-        log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
-            flow.getExecutionId(), execDir.getPath()));
-      }
+      // Setup the project
+      setupProject(projectVersion);
+
+      // Create the execution directory
+      execDir = createExecDir(flow);
+
+      // Create the hardlinks from the project
+      copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
 
+      log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
+          flow.getExecutionId(), execDir.getPath()));
     } catch (final Exception e) {
       log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
       cleanup(execDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c33ef3f..d65315d 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -16,10 +16,7 @@
 
 package azkaban.execapp;
 
-import static azkaban.utils.FileIOUtils.getCreationTime;
-
 import azkaban.Constants;
-import azkaban.Constants.ConfigurationKeys;
 import azkaban.event.Event;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
@@ -49,7 +46,6 @@ import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.ThreadPoolExecutingListener;
 import azkaban.utils.TrackingThreadPool;
-import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -129,10 +125,6 @@ public class FlowRunnerManager implements EventListener,
   private final File executionDirectory;
   private final File projectDirectory;
 
-  private final long projectDirMaxSizeInMb;
-  private final int projectDirStartDeletionThreshold;
-  private final int projectDirStopDeletionThreshold;
-
   private final Object executionDirDeletionSync = new Object();
 
   private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
@@ -142,7 +134,7 @@ public class FlowRunnerManager implements EventListener,
   private Props globalProps;
 
   private long lastCleanerThreadCheckTime = -1;
-  private long executionDirRetention = 1 * 2 * 60 * 60 * 1000; // 2 hours
+  private long executionDirRetention = 1 * 24 * 60 * 60 * 1000; // 1 Day
 
   // We want to limit the log sizes to about 20 megs
   private String jobLogChunkSize = "5MB";
@@ -181,16 +173,6 @@ public class FlowRunnerManager implements EventListener,
       this.projectDirectory.mkdirs();
     }
 
-    this.projectDirMaxSizeInMb = props.getLong(ConfigurationKeys.PROJECT_DIR_MAX_SIZE,
-        128000); // default value as 128GB
-    this.projectDirStartDeletionThreshold = props
-        .getInt(ConfigurationKeys.PROJECT_DIR_CLEANUP_START_THRESHOLD, 90);
-    this.projectDirStopDeletionThreshold = props
-        .getInt(ConfigurationKeys.PROJECT_DIR_CLEANUP_STOP_THRESHOLD, 60);
-    Preconditions.checkArgument(this.projectDirStartDeletionThreshold >= 0 && this
-        .projectDirStartDeletionThreshold <= 100 && this.projectDirStopDeletionThreshold >= 0 &&
-        this.projectDirStopDeletionThreshold < this.projectDirStartDeletionThreshold);
-
     this.installedProjects = loadExistingProjects();
 
     // azkaban.temp.dir
@@ -799,14 +781,11 @@ public class FlowRunnerManager implements EventListener,
     private static final long OLD_PROJECT_DIR_INTERVAL_MS = 5 * 60 * 1000;
     // Every 2 mins clean the recently finished list
     private static final long RECENTLY_FINISHED_INTERVAL_MS = 2 * 60 * 1000;
-    // Every 30 mins clean least recently used project dir
-    private static final long LRU_PROJECT_DIR_INTERVAL_MS = 30 * 60 * 1000;
 
     // Every 5 mins kill flows running longer than allowed max running time
     private static final long LONG_RUNNING_FLOW_KILLING_INTERVAL_MS = 5 * 60 * 1000;
     private final long flowMaxRunningTimeInMins = FlowRunnerManager.this.azkabanProps.getInt(
         Constants.ConfigurationKeys.AZKABAN_MAX_FLOW_RUNNING_MINS, -1);
-    private long lastLRUProjectCleanTime = -1;
     private boolean shutdown = false;
     private long lastExecutionDirCleanTime = -1;
     private long lastOldProjectCleanTime = -1;
@@ -851,7 +830,7 @@ public class FlowRunnerManager implements EventListener,
             if (currentTime - OLD_PROJECT_DIR_INTERVAL_MS > this.lastOldProjectCleanTime
                 && FlowRunnerManager.this.isExecutorActive) {
               logger.info("Cleaning old projects");
-              cleanProjectsOfOldVersions();
+              cleanOlderProjects();
               this.lastOldProjectCleanTime = currentTime;
             }
 
@@ -861,13 +840,6 @@ public class FlowRunnerManager implements EventListener,
               this.lastExecutionDirCleanTime = currentTime;
             }
 
-            if (currentTime - LRU_PROJECT_DIR_INTERVAL_MS > this.lastLRUProjectCleanTime
-                && FlowRunnerManager.this.isExecutorActive) {
-              logger.info("Cleaning LRU project dirs");
-              cleanLRUProjects();
-              this.lastLRUProjectCleanTime = currentTime;
-            }
-
             if (this.flowMaxRunningTimeInMins > 0
                 && currentTime - LONG_RUNNING_FLOW_KILLING_INTERVAL_MS
                 > this.lastLongRunningFlowCleanTime) {
@@ -947,7 +919,7 @@ public class FlowRunnerManager implements EventListener,
       }
     }
 
-    private void cleanProjectsOfOldVersions() {
+    private void cleanOlderProjects() {
       final Map<Integer, ArrayList<ProjectVersion>> projectVersions =
           new HashMap<>();
       for (final ProjectVersion version : FlowRunnerManager.this.installedProjects.values()) {
@@ -960,7 +932,13 @@ public class FlowRunnerManager implements EventListener,
         versionList.add(version);
       }
 
-      final HashSet<Pair<Integer, Integer>> activeProjectVersions = getActiveProjectVersions();
+      final HashSet<Pair<Integer, Integer>> activeProjectVersions =
+          new HashSet<>();
+      for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
+        final ExecutableFlow flow = runner.getExecutableFlow();
+        activeProjectVersions.add(new Pair<>(flow
+            .getProjectId(), flow.getVersion()));
+      }
 
       for (final Map.Entry<Integer, ArrayList<ProjectVersion>> entry : projectVersions
           .entrySet()) {
@@ -992,105 +970,6 @@ public class FlowRunnerManager implements EventListener,
         }
       }
     }
-
-    private HashSet<Pair<Integer, Integer>> getActiveProjectVersions() {
-      final HashSet<Pair<Integer, Integer>> activeProjectVersions =
-          new HashSet<>();
-      for (final FlowRunner runner : FlowRunnerManager.this.runningFlows.values()) {
-        final ExecutableFlow flow = runner.getExecutableFlow();
-        activeProjectVersions.add(new Pair<>(flow
-            .getProjectId(), flow.getVersion()));
-      }
-      return activeProjectVersions;
-    }
-
-    /**
-     * Checks if the project version contains any running flow
-     */
-    private boolean isActiveProject(final ProjectVersion version) {
-      final Pair<Integer, Integer> versionKey = new Pair<>(version.getProjectId(),
-          version.getVersion());
-      return getActiveProjectVersions().contains(versionKey);
-    }
-
-    private boolean shouldCleanup(final File projectDir) {
-      final long dirSize;
-      try {
-        dirSize = FileIOUtils.sizeInKB(projectDir);
-        final long upperLimitInKB = (long) (FlowRunnerManager.this.projectDirMaxSizeInMb *
-            FlowRunnerManager.this.projectDirStartDeletionThreshold * 0.01 * 1024);
-        return dirSize >= upperLimitInKB;
-      } catch (final IOException e) {
-        logger.error(e);
-        return false;
-      }
-    }
-
-    private boolean shouldKeepCleanup(final File projectDir) {
-      final long dirSize;
-      try {
-        dirSize = FileIOUtils.sizeInKB(projectDir);
-        final long lowerLimitInKB = (long) (FlowRunnerManager.this.projectDirMaxSizeInMb *
-            FlowRunnerManager.this.projectDirStopDeletionThreshold * 0.01 * 1024);
-        return dirSize >= lowerLimitInKB;
-      } catch (final IOException e) {
-        logger.error(e);
-        return false;
-      }
-    }
-
-
-    /**
-     * delete least recently used projects
-     */
-    private void cleanLRUProjects() throws IOException {
-      if (shouldCleanup(FlowRunnerManager.this.projectDirectory)) {
-        final List<ProjectVersion> projectVersionList = new ArrayList<>(
-            FlowRunnerManager.this.installedProjects.values());
-
-        // sort project version by last creation time in ascending order
-        projectVersionList.sort((o1, o2) -> {
-          try {
-            final long creationTime1 = getCreationTime(o1.getInstalledDir());
-            final long creationTime2 = getCreationTime(o2.getInstalledDir());
-            if (creationTime1 < creationTime2) {
-              return -1;
-            } else if (creationTime1 > creationTime2) {
-              return 1;
-            } else {
-              return 0;
-            }
-          } catch (final IOException ex) {
-            logger.error("error reading project dir", ex);
-            return 0;
-          }
-        });
-
-        for (final ProjectVersion version : projectVersionList) {
-          if (!isActiveProject(version) && shouldKeepCleanup(FlowRunnerManager.this
-              .projectDirectory)) {
-            delete(version);
-          }
-        }
-      }
-    }
-
-    /**
-     * Delete the project dir associated with {@code version}.
-     * It first acquires object lock of {@code version} waiting for other threads creating
-     * execution dir to finish to avoid race condition. An example of race condition scenario:
-     * delete the dir of a project while an execution of a flow in the same project is being setup
-     * and the flow's execution dir is being created({@link FlowPreparer#setup}).
-     */
-    private void delete(final ProjectVersion version) {
-      synchronized (version) {
-        try {
-          deleteDirectory(version);
-        } catch (final IOException ex) {
-          logger.error(ex);
-        }
-      }
-    }
   }
 
 }