Details
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 5942088..9298358 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -38,6 +38,8 @@ public class ProcessJob extends AbstractProcessJob {
public static final String COMMAND = "command";
private static final long KILL_TIME_MS = 5000;
private volatile AzkabanProcess process;
+ private static final String MEMCHECK_ENABLED = "memCheck.enabled";
+ private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
public ProcessJob(final String jobId, final Props sysProps,
final Props jobProps, final Logger log) {
@@ -52,11 +54,14 @@ public class ProcessJob extends AbstractProcessJob {
handleError("Bad property definition! " + e.getMessage(), e);
}
- Pair<Long, Long> memPair = getProcMemoryRequirement();
- boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond());
- if (!isMemGranted) {
- throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
- memPair.getFirst(), memPair.getSecond(), getId()));
+ if (sysProps.getBoolean(MEMCHECK_ENABLED, true)) {
+ long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
+ Pair<Long, Long> memPair = getProcMemoryRequirement();
+ boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+ if (!isMemGranted) {
+ throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+ memPair.getFirst(), memPair.getSecond(), getId()));
+ }
}
List<String> commands = null;
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index cf0e746..144530c 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -26,21 +26,17 @@ public class SystemMemoryInfo {
private static String MEMINFO_FILE = "/proc/meminfo";
private static boolean memCheckEnabled;
- private static boolean memInfoExists;
private static long freeMemAmount = 0;
- private static long freeMemDecrAmt = 0;
private static final long LOW_MEM_THRESHOLD = 3L*1024L*1024L; //3 GB
private static ScheduledExecutorService scheduledExecutorService;
- public static void init(boolean memChkEnabled, long memDecrAmt) {
+ public static void init() {
File f = new File(MEMINFO_FILE);
- memInfoExists = f.exists() && !f.isDirectory();
- memCheckEnabled = memChkEnabled && memInfoExists;
+ memCheckEnabled = f.exists() && !f.isDirectory();
if (memCheckEnabled) {
//initial reading of the mem info
readMemoryInfoFile();
- freeMemDecrAmt = memDecrAmt;
//schedule a thread to read it
logger.info("Scheduled thread to read /proc/meminfo every 30 seconds");
@@ -57,7 +53,7 @@ public class SystemMemoryInfo {
* Given Xms/Xmx values (in kb) used by java process, determine if system can
* satisfy the memory request
*/
- public synchronized static boolean canSystemGrantMemory(long xms, long xmx) {
+ public synchronized static boolean canSystemGrantMemory(long xms, long xmx, long freeMemDecrAmt) {
if (!memCheckEnabled) {
return true;
}
@@ -91,11 +87,12 @@ public class SystemMemoryInfo {
if (freeMemDecrAmt > 0) {
freeMemAmount -= freeMemDecrAmt;
+ logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", freeMemDecrAmt, freeMemAmount));
} else {
freeMemAmount -= xms;
+ logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", xms, freeMemAmount));
}
- logger.info("Current free memory amount is " + freeMemAmount);
return true;
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index fb65ed2..7227b0b 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -433,6 +433,8 @@ public class Utils {
sizeInKb = size * 1024L * 1024L;
} else if (strMemSize.endsWith("m") || strMemSize.endsWith("M")) {
sizeInKb = size * 1024L;
+ } else if (strMemSize.endsWith("k") || strMemSize.endsWith("K")) {
+ sizeInKb = size;
} else {
sizeInKb = size / 1024L;
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 677ae8c..80d2554 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -125,8 +125,7 @@ public class AzkabanExecutorServer {
configureMBeanServer();
configureMetricReports();
- SystemMemoryInfo.init(props.getBoolean("executor.memCheck.enabled", true),
- props.getLong("executor.memCheck.freeMemDecrAmt", 0));
+ SystemMemoryInfo.init();
try {
server.start();