azkaban-aplcache

Refactor the code for getting OS free memory information (#1039) *

5/1/2017 9:52:20 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
new file mode 100644
index 0000000..62743b4
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
@@ -0,0 +1,117 @@
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for getting system memory information
+ *
+ * Note:
+ * This check is designed for Linux only.
+ * Make sure to call {@link #doesMemInfoFileExist()} first before attempting to get memory information.
+ */
+class OsMemoryUtil {
+  private static final Logger logger = LoggerFactory.getLogger(SystemMemoryInfo.class);
+
+  // This file is used by Linux. It doesn't exist on Mac for example.
+  static final String MEM_INFO_FILE = "/proc/meminfo";
+
+  private final String[] MEM_KEYS;
+
+  OsMemoryUtil() {
+    MEM_KEYS = new String[]{"MemFree", "Buffers", "Cached", "SwapFree"};
+  }
+
+  /**
+   *
+   * @return true if the meminfo file exists.
+   */
+  boolean doesMemInfoFileExist() {
+    File f = new File(MEM_INFO_FILE);
+    return f.exists() && !f.isDirectory();
+  }
+
+  /**
+   * Includes OS cache and free swap.
+   * @return the total free memory size of the OS. 0 if there is an error.
+   */
+  long getOsTotalFreeMemorySize() {
+    List<String> lines;
+    // The file /proc/meminfo seems to contain only ASCII characters.
+    // The assumption is that the file is not too big. So it is simpler to read the whole file into memory.
+    try {
+      lines = Files.readAllLines(Paths.get(MEM_INFO_FILE), StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      String errMsg = "Failed to open mem info file: " + MEM_INFO_FILE;
+      logger.warn(errMsg, e);
+      return 0;
+    }
+    return getOsTotalFreeMemorySizeFromStrings(lines);
+  }
+
+  /**
+   *
+   * @param lines text lines from the procinfo file
+   * @return the total size of free memory in kB. 0 if there is an error.
+   */
+  long getOsTotalFreeMemorySizeFromStrings(List<String> lines) {
+    long totalFree = 0;
+    int count = 0;
+
+    for (String line : lines) {
+      for (String keyName : MEM_KEYS) {
+        if (line.startsWith(keyName)) {
+          count++;
+          long size = parseMemoryLine(line);
+          if (size == 0) {
+            return 0;
+          }
+          totalFree += size;
+        }
+      }
+    }
+
+    int length = MEM_KEYS.length;
+    if (count != length) {
+      String errMsg = String.format("Expect %d keys in the meminfo file. Got %d. content: %s", length, count, lines);
+      logger.warn(errMsg);
+      totalFree = 0;
+    }
+    return totalFree;
+  }
+
+  /**
+   * Example file:
+   * $ cat /proc/meminfo
+   *   MemTotal:       65894008 kB
+   *   MemFree:        59400536 kB
+   *   Buffers:          409348 kB
+   *   Cached:          4290236 kB
+   *   SwapCached:            0 kB
+   *
+   * Make the method package private to make unit testing easier.
+   * Otherwise it can be made private.
+
+   * @param line the text for a memory usage statistics we are interested in
+   * @return size of the memory. unit kB. 0 if there is an error.
+   */
+  long parseMemoryLine(String line) {
+    int idx1 = line.indexOf(":");
+    int idx2 = line.lastIndexOf("kB");
+    String sizeString = line.substring(idx1 + 1, idx2 - 1).trim();
+    try {
+      return Long.parseLong(sizeString);
+    } catch (NumberFormatException e) {
+      String err = "Failed to parse the meminfo file. Line: " + line;
+      logger.warn(err);
+      return 0;
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index 7b633e9..39a912f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -1,17 +1,12 @@
 package azkaban.utils;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
 
-import org.apache.log4j.Logger;
 
 /**
- * @author wkang
  * This class is used to maintain system memory information. Processes utilizing
  * large amount of memory should consult this class to see if the system has enough
  * memory to proceed the operation.
@@ -22,34 +17,46 @@ import org.apache.log4j.Logger;
  * All the memory size used in this function is in KB
  */
 public class SystemMemoryInfo {
-  private static final Logger logger = Logger.getLogger(SystemMemoryInfo.class);
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SystemMemoryInfo.class);
 
-  private static String MEMINFO_FILE = "/proc/meminfo";
   private static boolean memCheckEnabled;
-  private static long freeMemAmount = 0;
-  private static final long LOW_MEM_THRESHOLD = 3L*1024L*1024L; //3 GB
+  private static final long LOW_MEM_THRESHOLD = 3L * 1024L * 1024L; //3 GB
+  // In case there is a problem reading the meminfo file, we want to "fail open".
+  private static long freeMemAmount = LOW_MEM_THRESHOLD * 100;
 
   private static ScheduledExecutorService scheduledExecutorService;
 
+  //todo HappyRay: switch to Guice
+  private static OsMemoryUtil util = new OsMemoryUtil();
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  // see http://errorprone.info/bugpattern/FutureReturnValueIgnored
+  // There is no need to check the returned future from scheduledExecutorService
+  // since we don't need to get a return value.
   public static void init(int memCheckInterval) {
-    File f = new File(MEMINFO_FILE);
-    memCheckEnabled = f.exists() && !f.isDirectory();
+    memCheckEnabled = util.doesMemInfoFileExist();
     if (memCheckEnabled) {
-      //initial reading of the mem info
-      readMemoryInfoFile();
-
       //schedule a thread to read it
-      logger.info(String.format("Scheduled thread to read /proc/meminfo every %d seconds", memCheckInterval));
+      logger.info(String.format("Scheduled thread to read memory info every %d seconds", memCheckInterval));
       scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
-      scheduledExecutorService.scheduleAtFixedRate(new MemoryInfoReader(), 0, memCheckInterval, TimeUnit.SECONDS);
+      /*
+      According to java.util.concurrent.Executors.newSingleThreadScheduledExecutor()
+      * (Note however that if this single
+     * thread terminates due to a failure during execution prior to
+     * shutdown, a new one will take its place if needed to execute
+     * subsequent tasks.)
+     * We don't have to worry about the update thread dying due to an uncaught exception.
+       */
+      scheduledExecutorService.scheduleAtFixedRate(SystemMemoryInfo::getFreeMemorySize, 0, memCheckInterval,
+          TimeUnit.SECONDS);
     } else {
-      logger.info("Cannot find /proc/meminfo, memory check will be disabled");
+      logger.info(String.format("Cannot find %s, memory check will be disabled", OsMemoryUtil.MEM_INFO_FILE));
     }
   }
 
   /**
-   * @param xms
-   * @param xmx
+   * @param xms Xms for the process
+   * @param xmx Xmx for the process
    * @return System can satisfy the memory request or not
    *
    * Given Xms/Xmx values (in kb) used by java process, determine if system can
@@ -62,34 +69,39 @@ public class SystemMemoryInfo {
 
     //too small amount of memory left, reject
     if (freeMemAmount < LOW_MEM_THRESHOLD) {
-      logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb),  memory request declined.",
+      logger.info(
+          String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb),  memory request declined.",
               freeMemAmount, LOW_MEM_THRESHOLD));
       return false;
     }
 
     //let's get newest mem info
     if (freeMemAmount >= LOW_MEM_THRESHOLD && freeMemAmount < 2 * LOW_MEM_THRESHOLD) {
-      logger.info(String.format("Free memory amount (%d kb) is less than 2x low mem threshold (%d kb),  re-read /proc/meminfo",
-              freeMemAmount, LOW_MEM_THRESHOLD));
-      readMemoryInfoFile();
+      logger.info(String.format(
+          "Free memory amount (%d kb) is less than 2x low mem threshold (%d kb). Update the free memory amount",
+          freeMemAmount, LOW_MEM_THRESHOLD));
+      getFreeMemorySize();
     }
 
     //too small amount of memory left, reject
     if (freeMemAmount < LOW_MEM_THRESHOLD) {
-      logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb),  memory request declined.",
+      logger.info(
+          String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb),  memory request declined.",
               freeMemAmount, LOW_MEM_THRESHOLD));
       return false;
     }
 
     if (freeMemAmount - xmx < LOW_MEM_THRESHOLD) {
-      logger.info(String.format("Free memory amount minus xmx (%d - %d kb) is less than low mem threshold (%d kb),  memory request declined.",
-              freeMemAmount, xmx, LOW_MEM_THRESHOLD));
+      logger.info(String.format(
+          "Free memory amount minus xmx (%d - %d kb) is less than low mem threshold (%d kb),  memory request declined.",
+          freeMemAmount, xmx, LOW_MEM_THRESHOLD));
       return false;
     }
 
     if (freeMemDecrAmt > 0) {
       freeMemAmount -= freeMemDecrAmt;
-      logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", freeMemDecrAmt, freeMemAmount));
+      logger.info(
+          String.format("Memory (%d kb) granted. Current free memory amount is %d kb", freeMemDecrAmt, freeMemAmount));
     } else {
       freeMemAmount -= xms;
       logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", xms, freeMemAmount));
@@ -102,72 +114,10 @@ public class SystemMemoryInfo {
     freeMemAmount = size;
   }
 
-  private static void readMemoryInfoFile() {
-    BufferedReader br = null;
-    try {
-      br = new BufferedReader(new FileReader(MEMINFO_FILE));
-
-      long sizeMemFree = 0;
-      long sizeBuffers = 0;
-      long sizeCached = 0;
-      long sizeSwapCached = 0;
-      int count = 0;
-      String line = br.readLine();
-      while (line != null) {
-        if (line.startsWith("MemFree:") || line.startsWith("Buffers:")
-            || line.startsWith("Cached") || line.startsWith("SwapCached")) {
-          int idx1 = line.indexOf(":");
-          int idx2 = line.lastIndexOf("kB");
-          String strSize = line.substring(idx1+1, idx2-1).trim();
-
-          if (line.startsWith("MemFree:")) {
-            sizeMemFree = Long.parseLong(strSize);
-          } else if (line.startsWith("Buffers:")) {
-            sizeBuffers = Long.parseLong(strSize);
-          } else if (line.startsWith("Cached:")) {
-            sizeCached = Long.parseLong(strSize);
-          } else if (line.startsWith("SwapCached:")) {
-            sizeSwapCached = Long.parseLong(strSize);
-          }
-
-          //all lines read
-          if (++count == 4) {
-            break;
-          }
-        }
-        line = br.readLine();
-      }
-
-      if (count < 4) {
-        logger.error("Error: less than 4 rows read from /proc/meminfo for free memory information");
-      }
-
-      long sizeTotal = sizeMemFree + sizeBuffers + sizeCached + sizeSwapCached;
-
-      if (sizeTotal > 0) {
-        updateFreeMemAmount(sizeTotal);
-      }
-    } catch (IOException e) {
-      logger.error("Exception in reading memory info file", e);
-    } finally {
-      try {
-        if (br != null) {
-          br.close();
-        }
-      } catch (IOException e) {
-        logger.error("Exception in closing the buffered reader", e);
-      }
-    }
-  }
-
-  static class MemoryInfoReader implements Runnable {
-    @Override
-    public void run() {
-      try {
-        readMemoryInfoFile();
-      } catch (Throwable t) {
-        logger.error("error calling readMemoryInfoFile", t);
-      }
+  private static void getFreeMemorySize() {
+    long freeMemorySize = util.getOsTotalFreeMemorySize();
+    if (freeMemorySize > 0) {
+      updateFreeMemAmount(freeMemorySize);
     }
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/utils/OsMemoryUtilTest.java b/azkaban-common/src/test/java/azkaban/utils/OsMemoryUtilTest.java
new file mode 100644
index 0000000..39390b0
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/OsMemoryUtilTest.java
@@ -0,0 +1,60 @@
+package azkaban.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class OsMemoryUtilTest {
+  private OsMemoryUtil util = new OsMemoryUtil();
+
+  @Test
+  public void canReadMemInfoFileIfExists() {
+    if (util.doesMemInfoFileExist()) {
+      util.getOsTotalFreeMemorySize();
+    }
+  }
+
+  @Test
+  public void getOsTotalFreeMemorySize() {
+    List<String> lines =
+        Arrays.asList("MemFree:        1 kB", "Buffers:          2 kB", "Cached:          3 kB", "SwapFree:    4 kB",
+            "Foo: 10 kB");
+
+    long size = util.getOsTotalFreeMemorySizeFromStrings(lines);
+    assertEquals(10, size);
+  }
+
+  @Test
+  public void getOsTotalFreeMemorySizeMissingEntry() {
+    List<String> lines = Arrays.asList("MemFree:        1 kB", "Foo: 10 kB");
+
+    long size = util.getOsTotalFreeMemorySizeFromStrings(lines);
+    assertEquals(0, size);
+  }
+
+  @Test
+  public void getOsTotalFreeMemorySizeWrongEntry() {
+    List<String> lines = Collections.singletonList("MemFree:        foo kB");
+
+    long size = util.getOsTotalFreeMemorySizeFromStrings(lines);
+    assertEquals(0, size);
+  }
+
+  @Test
+  public void parseMemoryLine() {
+    String line = "MemFree:        500 kB";
+    long size = util.parseMemoryLine(line);
+    assertEquals(500, size);
+  }
+
+  @Test
+  public void parseIncorrectMemoryLine() {
+    String line = "MemFree:        ab kB";
+    long size = util.parseMemoryLine(line);
+    assertEquals(0, size);
+  }
+}