azkaban-aplcache

project LRU cache part 1 (#1848) The problem this PR targets

7/17/2018 3:03:52 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index a85faf6..e781732 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -89,7 +89,6 @@ public class Constants {
   // The flow exec id for a flow trigger instance unable to trigger a flow yet
   public static final int FAILED_EXEC_ID = -2;
 
-
   public static class ConfigurationKeys {
 
     // Configures Azkaban Flow Version in project YAML file
diff --git a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
index 8ddecaa..c2e270e 100644
--- a/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
@@ -18,6 +18,7 @@ package azkaban.utils;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
@@ -27,9 +28,11 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
@@ -69,6 +72,39 @@ public class FileIOUtils {
     return true;
   }
 
+
+  /**
+   * Dumps a number into a new file.
+   *
+   * @param filePath the target file
+   * @param num the number to dump
+   * @throws IOException if file already exists
+   */
+  public static void dumpNumberToFile(final Path filePath, final long num) throws IOException {
+    try (BufferedWriter writer = Files
+        .newBufferedWriter(filePath, StandardCharsets.UTF_8)) {
+      writer.write(String.valueOf(num));
+    } catch (final IOException e) {
+      logger.error(String.format("Failed to write the number %s to the file %s", num, filePath), e);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads a number from a file.
+   *
+   * @param filePath the target file
+   */
+  public static long readNumberFromFile(final Path filePath)
+      throws IOException, NumberFormatException {
+    final List<String> allLines = Files.readAllLines(filePath);
+    if (!allLines.isEmpty()) {
+      return Long.parseLong(allLines.get(0));
+    } else {
+      throw new NumberFormatException("unable to parse empty file " + filePath.toString());
+    }
+  }
+
   public static String getSourcePathFromClass(final Class<?> containedClass) {
     File file =
         new File(containedClass.getProtectionDomain().getCodeSource()
diff --git a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
index 376b573..43565d9 100644
--- a/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/FileIOUtilsTest.java
@@ -109,6 +109,31 @@ public class FileIOUtilsTest {
     FileUtils.deleteDirectory(this.destDir);
   }
 
+
+  private File dumpNumberToTempFile(final String fileName, final long num) throws IOException {
+    final File fileToDump = this.temp.newFile(fileName);
+    FileIOUtils.dumpNumberToFile(fileToDump.toPath(), num);
+    return fileToDump;
+  }
+
+  @Test
+  public void testDumpNumberToFileAndReadFromFile() throws IOException {
+    final String fileName = "number";
+    final long num = 94127;
+    final File fileToDump = dumpNumberToTempFile(fileName, num);
+    assertThat(FileIOUtils.readNumberFromFile(fileToDump.toPath())).isEqualTo(num);
+  }
+
+  @Test
+  public void testDumpNumberToExistingFile() throws IOException {
+    final String fileName = "number";
+    final long firstNum = 94127;
+    final long secondNum = 94128;
+    dumpNumberToTempFile(fileName, firstNum);
+    assertThatThrownBy(() -> dumpNumberToTempFile(fileName, secondNum))
+        .isInstanceOf(IOException.class).hasMessageContaining("already exists");
+  }
+
   @Test
   public void testHardlinkCopy() throws IOException {
     FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 7dc82fe..f50f5c9 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,12 +44,12 @@ import azkaban.metric.MetricReportManager;
 import azkaban.metric.inmemoryemitter.InMemoryMetricEmitter;
 import azkaban.metrics.MetricsManager;
 import azkaban.server.AzkabanServer;
+import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.StdOutErrRedirect;
 import azkaban.utils.Utils;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -58,7 +58,6 @@ import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
@@ -275,16 +274,9 @@ public class AzkabanExecutorServer {
 
   private void dumpPortToFile() throws IOException {
     // By default this should write to the working directory
-    final String portFile = this.props
+    final String portFileName = this.props
         .getString(Constants.AZKABAN_EXECUTOR_PORT_FILE, AZKABAN_EXECUTOR_PORT_FILENAME);
-    try (BufferedWriter writer = Files
-        .newBufferedWriter(Paths.get(portFile), StandardCharsets.UTF_8)) {
-      writer.write(String.valueOf(getPort()));
-      writer.write("\n");
-    } catch (final IOException e) {
-      logger.error("Failed to write the port number to a file", e);
-      throw e;
-    }
+    FileIOUtils.dumpNumberToFile(Paths.get(portFileName), getPort());
   }
 
   private void configureJobCallback(final Props props) {
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
index 243a93c..47010d5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
@@ -31,7 +31,10 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.FileTime;
 import java.util.Map;
 import java.util.zip.ZipFile;
 import org.apache.commons.io.FileUtils;
@@ -40,13 +43,13 @@ import org.apache.log4j.Logger;
 
 public class FlowPreparer {
 
+  // Name of the file which keeps project directory size
+  static final String PROJECT_DIR_SIZE_FILE_NAME = "___azkaban_project_dir_size_in_bytes___";
   private static final Logger log = Logger.getLogger(FlowPreparer.class);
-
   // TODO spyne: move to config class
   private final File executionsDir;
   // TODO spyne: move to config class
   private final File projectsDir;
-
   private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
   private final StorageManager storageManager;
 
@@ -89,6 +92,21 @@ public class FlowPreparer {
   }
 
   /**
+   * Touch the file if it exists.
+   *
+   * @param path path to the target file
+   */
+  @VisibleForTesting
+  void touchIfExists(final Path path) {
+    try {
+      Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
+    } catch (final IOException ex) {
+      log.error(ex);
+    }
+  }
+
+
+  /**
    * Prepare the project directory.
    *
    * @param pv ProjectVersion object
@@ -107,6 +125,8 @@ public class FlowPreparer {
     // If directory exists. Assume its prepared and skip.
     if (pv.getInstalledDir().exists()) {
       log.info("Project already cached. Skipping download. " + pv);
+      touchIfExists(
+          Paths.get(pv.getInstalledDir().getPath(), PROJECT_DIR_SIZE_FILE_NAME));
       return;
     }
 
@@ -127,21 +147,36 @@ public class FlowPreparer {
       final File zipFile = requireNonNull(projectFileHandler.getLocalFile());
       final ZipFile zip = new ZipFile(zipFile);
       Utils.unzip(zip, tempDir);
-
+      updateDirSize(tempDir, pv);
       Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
-
-      log.warn(String.format("Project Preparation complete. [%s]", pv));
+      log.warn(String.format("Project preparation completes. [%s]", pv));
     } finally {
-
       if (projectFileHandler != null) {
         projectFileHandler.deleteLocalFile();
       }
-
       // Clean up: Remove tempDir if exists
       FileUtils.deleteDirectory(tempDir);
     }
   }
 
+  /**
+   * Creates a file which keeps the size of {@param dir} in bytes inside the {@param dir} and sets
+   * the dirSize for {@param pv}.
+   *
+   * @param dir the directory whose size needs to be kept in the file to be created.
+   * @param pv the projectVersion whose size needs to updated.
+   */
+  private void updateDirSize(final File dir, final ProjectVersion pv) {
+    final long sizeInByte = FileUtils.sizeOfDirectory(dir);
+    pv.setDirSize(sizeInByte);
+    try {
+      FileIOUtils.dumpNumberToFile(Paths.get(dir.getPath(), PROJECT_DIR_SIZE_FILE_NAME),
+          sizeInByte);
+    } catch (final IOException e) {
+      log.error(e);
+    }
+  }
+
   private void copyCreateHardlinkDirectory(final File projectDir, final File execDir)
       throws IOException {
     FileIOUtils.createDeepHardlink(projectDir, execDir);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
index e5c1407..a97ca35 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ProjectVersion.java
@@ -27,6 +27,7 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
   private final int version;
 
   private File installedDir;
+  private Long dirSize;
 
   public ProjectVersion(final int projectId, final int version) {
     checkArgument(projectId > 0);
@@ -41,6 +42,14 @@ public class ProjectVersion implements Comparable<ProjectVersion> {
     this.installedDir = installedDir;
   }
 
+  public Long getDirSize() {
+    return this.dirSize;
+  }
+
+  public void setDirSize(final Long dirSize) {
+    this.dirSize = dirSize;
+  }
+
   public int getProjectId() {
     return this.projectId;
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
index 84cd4e1..7af3573 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowPreparerTest.java
@@ -17,15 +17,23 @@
 
 package azkaban.execapp;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.project.ProjectFileHandler;
 import azkaban.storage.StorageManager;
+import azkaban.utils.FileIOUtils;
 import azkaban.utils.Pair;
 import java.io.File;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
@@ -61,8 +69,9 @@ public class FlowPreparerTest {
     final StorageManager storageManager = mock(StorageManager.class);
     when(storageManager.getProjectFile(12, 34)).thenReturn(projectFileHandler);
 
-    this.instance = new FlowPreparer(storageManager, this.executionsDir, this.projectsDir,
-        this.installedProjects);
+    this.instance = spy(new FlowPreparer(storageManager, this.executionsDir, this.projectsDir,
+        this.installedProjects));
+    doNothing().when(this.instance).touchIfExists(any());
   }
 
   @After
@@ -77,11 +86,33 @@ public class FlowPreparerTest {
         new File(this.projectsDir, "sample_project_01"));
     this.instance.setupProject(pv);
 
+    final long actualDirSize = 259;
+
+    assertThat(pv.getDirSize()).isEqualTo(actualDirSize);
+    assertThat(FileIOUtils.readNumberFromFile(
+        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME)))
+        .isEqualTo(actualDirSize);
     assertTrue(pv.getInstalledDir().exists());
     assertTrue(new File(pv.getInstalledDir(), "sample_flow_01").exists());
   }
 
   @Test
+  public void testSetupProjectTouchesTheDirSizeFile() throws Exception {
+    //verifies setup project touches project dir size file.
+    final ProjectVersion pv = new ProjectVersion(12, 34,
+        new File(this.projectsDir, "sample_project_01"));
+
+    //setup project 1st time will not do touch
+    this.instance.setupProject(pv);
+    verify(this.instance, never()).touchIfExists(
+        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
+
+    this.instance.setupProject(pv);
+    verify(this.instance).touchIfExists(
+        Paths.get(pv.getInstalledDir().getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
+  }
+
+  @Test
   public void testSetupFlow() throws Exception {
     final ExecutableFlow executableFlow = mock(ExecutableFlow.class);
     when(executableFlow.getExecutionId()).thenReturn(12345);