diff --git a/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
new file mode 100644
index 0000000..62743b4
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/OsMemoryUtil.java
@@ -0,0 +1,117 @@
+package azkaban.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for getting system memory information
+ *
+ * Note:
+ * This check is designed for Linux only.
+ * Make sure to call {@link #doesMemInfoFileExist()} first before attempting to get memory information.
+ */
+class OsMemoryUtil {
+ private static final Logger logger = LoggerFactory.getLogger(SystemMemoryInfo.class);
+
+ // This file is used by Linux. It doesn't exist on Mac for example.
+ static final String MEM_INFO_FILE = "/proc/meminfo";
+
+ private final String[] MEM_KEYS;
+
+ OsMemoryUtil() {
+ MEM_KEYS = new String[]{"MemFree", "Buffers", "Cached", "SwapFree"};
+ }
+
+ /**
+ *
+ * @return true if the meminfo file exists.
+ */
+ boolean doesMemInfoFileExist() {
+ File f = new File(MEM_INFO_FILE);
+ return f.exists() && !f.isDirectory();
+ }
+
+ /**
+ * Includes OS cache and free swap.
+ * @return the total free memory size of the OS. 0 if there is an error.
+ */
+ long getOsTotalFreeMemorySize() {
+ List<String> lines;
+ // The file /proc/meminfo seems to contain only ASCII characters.
+ // The assumption is that the file is not too big. So it is simpler to read the whole file into memory.
+ try {
+ lines = Files.readAllLines(Paths.get(MEM_INFO_FILE), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ String errMsg = "Failed to open mem info file: " + MEM_INFO_FILE;
+ logger.warn(errMsg, e);
+ return 0;
+ }
+ return getOsTotalFreeMemorySizeFromStrings(lines);
+ }
+
+ /**
+ *
+ * @param lines text lines from the procinfo file
+ * @return the total size of free memory in kB. 0 if there is an error.
+ */
+ long getOsTotalFreeMemorySizeFromStrings(List<String> lines) {
+ long totalFree = 0;
+ int count = 0;
+
+ for (String line : lines) {
+ for (String keyName : MEM_KEYS) {
+ if (line.startsWith(keyName)) {
+ count++;
+ long size = parseMemoryLine(line);
+ if (size == 0) {
+ return 0;
+ }
+ totalFree += size;
+ }
+ }
+ }
+
+ int length = MEM_KEYS.length;
+ if (count != length) {
+ String errMsg = String.format("Expect %d keys in the meminfo file. Got %d. content: %s", length, count, lines);
+ logger.warn(errMsg);
+ totalFree = 0;
+ }
+ return totalFree;
+ }
+
+ /**
+ * Example file:
+ * $ cat /proc/meminfo
+ * MemTotal: 65894008 kB
+ * MemFree: 59400536 kB
+ * Buffers: 409348 kB
+ * Cached: 4290236 kB
+ * SwapCached: 0 kB
+ *
+ * Make the method package private to make unit testing easier.
+ * Otherwise it can be made private.
+
+ * @param line the text for a memory usage statistics we are interested in
+ * @return size of the memory. unit kB. 0 if there is an error.
+ */
+ long parseMemoryLine(String line) {
+ int idx1 = line.indexOf(":");
+ int idx2 = line.lastIndexOf("kB");
+ String sizeString = line.substring(idx1 + 1, idx2 - 1).trim();
+ try {
+ return Long.parseLong(sizeString);
+ } catch (NumberFormatException e) {
+ String err = "Failed to parse the meminfo file. Line: " + line;
+ logger.warn(err);
+ return 0;
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index 7b633e9..39a912f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -1,17 +1,12 @@
package azkaban.utils;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
-import org.apache.log4j.Logger;
/**
- * @author wkang
* This class is used to maintain system memory information. Processes utilizing
* large amount of memory should consult this class to see if the system has enough
* memory to proceed the operation.
@@ -22,34 +17,46 @@ import org.apache.log4j.Logger;
* All the memory size used in this function is in KB
*/
public class SystemMemoryInfo {
- private static final Logger logger = Logger.getLogger(SystemMemoryInfo.class);
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SystemMemoryInfo.class);
- private static String MEMINFO_FILE = "/proc/meminfo";
private static boolean memCheckEnabled;
- private static long freeMemAmount = 0;
- private static final long LOW_MEM_THRESHOLD = 3L*1024L*1024L; //3 GB
+ private static final long LOW_MEM_THRESHOLD = 3L * 1024L * 1024L; //3 GB
+ // In case there is a problem reading the meminfo file, we want to "fail open".
+ private static long freeMemAmount = LOW_MEM_THRESHOLD * 100;
private static ScheduledExecutorService scheduledExecutorService;
+ //todo HappyRay: switch to Guice
+ private static OsMemoryUtil util = new OsMemoryUtil();
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ // see http://errorprone.info/bugpattern/FutureReturnValueIgnored
+ // There is no need to check the returned future from scheduledExecutorService
+ // since we don't need to get a return value.
public static void init(int memCheckInterval) {
- File f = new File(MEMINFO_FILE);
- memCheckEnabled = f.exists() && !f.isDirectory();
+ memCheckEnabled = util.doesMemInfoFileExist();
if (memCheckEnabled) {
- //initial reading of the mem info
- readMemoryInfoFile();
-
//schedule a thread to read it
- logger.info(String.format("Scheduled thread to read /proc/meminfo every %d seconds", memCheckInterval));
+ logger.info(String.format("Scheduled thread to read memory info every %d seconds", memCheckInterval));
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- scheduledExecutorService.scheduleAtFixedRate(new MemoryInfoReader(), 0, memCheckInterval, TimeUnit.SECONDS);
+ /*
+ According to java.util.concurrent.Executors.newSingleThreadScheduledExecutor()
+ * (Note however that if this single
+ * thread terminates due to a failure during execution prior to
+ * shutdown, a new one will take its place if needed to execute
+ * subsequent tasks.)
+ * We don't have to worry about the update thread dying due to an uncaught exception.
+ */
+ scheduledExecutorService.scheduleAtFixedRate(SystemMemoryInfo::getFreeMemorySize, 0, memCheckInterval,
+ TimeUnit.SECONDS);
} else {
- logger.info("Cannot find /proc/meminfo, memory check will be disabled");
+ logger.info(String.format("Cannot find %s, memory check will be disabled", OsMemoryUtil.MEM_INFO_FILE));
}
}
/**
- * @param xms
- * @param xmx
+ * @param xms Xms for the process
+ * @param xmx Xmx for the process
* @return System can satisfy the memory request or not
*
* Given Xms/Xmx values (in kb) used by java process, determine if system can
@@ -62,34 +69,39 @@ public class SystemMemoryInfo {
//too small amount of memory left, reject
if (freeMemAmount < LOW_MEM_THRESHOLD) {
- logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb), memory request declined.",
+ logger.info(
+ String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb), memory request declined.",
freeMemAmount, LOW_MEM_THRESHOLD));
return false;
}
//let's get newest mem info
if (freeMemAmount >= LOW_MEM_THRESHOLD && freeMemAmount < 2 * LOW_MEM_THRESHOLD) {
- logger.info(String.format("Free memory amount (%d kb) is less than 2x low mem threshold (%d kb), re-read /proc/meminfo",
- freeMemAmount, LOW_MEM_THRESHOLD));
- readMemoryInfoFile();
+ logger.info(String.format(
+ "Free memory amount (%d kb) is less than 2x low mem threshold (%d kb). Update the free memory amount",
+ freeMemAmount, LOW_MEM_THRESHOLD));
+ getFreeMemorySize();
}
//too small amount of memory left, reject
if (freeMemAmount < LOW_MEM_THRESHOLD) {
- logger.info(String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb), memory request declined.",
+ logger.info(
+ String.format("Free memory amount (%d kb) is less than low mem threshold (%d kb), memory request declined.",
freeMemAmount, LOW_MEM_THRESHOLD));
return false;
}
if (freeMemAmount - xmx < LOW_MEM_THRESHOLD) {
- logger.info(String.format("Free memory amount minus xmx (%d - %d kb) is less than low mem threshold (%d kb), memory request declined.",
- freeMemAmount, xmx, LOW_MEM_THRESHOLD));
+ logger.info(String.format(
+ "Free memory amount minus xmx (%d - %d kb) is less than low mem threshold (%d kb), memory request declined.",
+ freeMemAmount, xmx, LOW_MEM_THRESHOLD));
return false;
}
if (freeMemDecrAmt > 0) {
freeMemAmount -= freeMemDecrAmt;
- logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", freeMemDecrAmt, freeMemAmount));
+ logger.info(
+ String.format("Memory (%d kb) granted. Current free memory amount is %d kb", freeMemDecrAmt, freeMemAmount));
} else {
freeMemAmount -= xms;
logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", xms, freeMemAmount));
@@ -102,72 +114,10 @@ public class SystemMemoryInfo {
freeMemAmount = size;
}
- private static void readMemoryInfoFile() {
- BufferedReader br = null;
- try {
- br = new BufferedReader(new FileReader(MEMINFO_FILE));
-
- long sizeMemFree = 0;
- long sizeBuffers = 0;
- long sizeCached = 0;
- long sizeSwapCached = 0;
- int count = 0;
- String line = br.readLine();
- while (line != null) {
- if (line.startsWith("MemFree:") || line.startsWith("Buffers:")
- || line.startsWith("Cached") || line.startsWith("SwapCached")) {
- int idx1 = line.indexOf(":");
- int idx2 = line.lastIndexOf("kB");
- String strSize = line.substring(idx1+1, idx2-1).trim();
-
- if (line.startsWith("MemFree:")) {
- sizeMemFree = Long.parseLong(strSize);
- } else if (line.startsWith("Buffers:")) {
- sizeBuffers = Long.parseLong(strSize);
- } else if (line.startsWith("Cached:")) {
- sizeCached = Long.parseLong(strSize);
- } else if (line.startsWith("SwapCached:")) {
- sizeSwapCached = Long.parseLong(strSize);
- }
-
- //all lines read
- if (++count == 4) {
- break;
- }
- }
- line = br.readLine();
- }
-
- if (count < 4) {
- logger.error("Error: less than 4 rows read from /proc/meminfo for free memory information");
- }
-
- long sizeTotal = sizeMemFree + sizeBuffers + sizeCached + sizeSwapCached;
-
- if (sizeTotal > 0) {
- updateFreeMemAmount(sizeTotal);
- }
- } catch (IOException e) {
- logger.error("Exception in reading memory info file", e);
- } finally {
- try {
- if (br != null) {
- br.close();
- }
- } catch (IOException e) {
- logger.error("Exception in closing the buffered reader", e);
- }
- }
- }
-
- static class MemoryInfoReader implements Runnable {
- @Override
- public void run() {
- try {
- readMemoryInfoFile();
- } catch (Throwable t) {
- logger.error("error calling readMemoryInfoFile", t);
- }
+ private static void getFreeMemorySize() {
+ long freeMemorySize = util.getOsTotalFreeMemorySize();
+ if (freeMemorySize > 0) {
+ updateFreeMemAmount(freeMemorySize);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/utils/OsMemoryUtilTest.java b/azkaban-common/src/test/java/azkaban/utils/OsMemoryUtilTest.java
new file mode 100644
index 0000000..39390b0
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/utils/OsMemoryUtilTest.java
@@ -0,0 +1,60 @@
+package azkaban.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class OsMemoryUtilTest {
+ private OsMemoryUtil util = new OsMemoryUtil();
+
+ @Test
+ public void canReadMemInfoFileIfExists() {
+ if (util.doesMemInfoFileExist()) {
+ util.getOsTotalFreeMemorySize();
+ }
+ }
+
+ @Test
+ public void getOsTotalFreeMemorySize() {
+ List<String> lines =
+ Arrays.asList("MemFree: 1 kB", "Buffers: 2 kB", "Cached: 3 kB", "SwapFree: 4 kB",
+ "Foo: 10 kB");
+
+ long size = util.getOsTotalFreeMemorySizeFromStrings(lines);
+ assertEquals(10, size);
+ }
+
+ @Test
+ public void getOsTotalFreeMemorySizeMissingEntry() {
+ List<String> lines = Arrays.asList("MemFree: 1 kB", "Foo: 10 kB");
+
+ long size = util.getOsTotalFreeMemorySizeFromStrings(lines);
+ assertEquals(0, size);
+ }
+
+ @Test
+ public void getOsTotalFreeMemorySizeWrongEntry() {
+ List<String> lines = Collections.singletonList("MemFree: foo kB");
+
+ long size = util.getOsTotalFreeMemorySizeFromStrings(lines);
+ assertEquals(0, size);
+ }
+
+ @Test
+ public void parseMemoryLine() {
+ String line = "MemFree: 500 kB";
+ long size = util.parseMemoryLine(line);
+ assertEquals(500, size);
+ }
+
+ @Test
+ public void parseIncorrectMemoryLine() {
+ String line = "MemFree: ab kB";
+ long size = util.parseMemoryLine(line);
+ assertEquals(0, size);
+ }
+}