azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 5942088..9298358 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -38,6 +38,8 @@ public class ProcessJob extends AbstractProcessJob {
   public static final String COMMAND = "command";
   private static final long KILL_TIME_MS = 5000;
   private volatile AzkabanProcess process;
+  private static final String MEMCHECK_ENABLED = "memCheck.enabled";
+  private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
 
   public ProcessJob(final String jobId, final Props sysProps,
       final Props jobProps, final Logger log) {
@@ -52,11 +54,14 @@ public class ProcessJob extends AbstractProcessJob {
       handleError("Bad property definition! " + e.getMessage(), e);
     }
 
-    Pair<Long, Long> memPair = getProcMemoryRequirement();
-    boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond());
-    if (!isMemGranted) {
-      throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
-              memPair.getFirst(), memPair.getSecond(), getId()));
+    if (sysProps.getBoolean(MEMCHECK_ENABLED, true)) {
+      long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
+      Pair<Long, Long> memPair = getProcMemoryRequirement();
+      boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+      if (!isMemGranted) {
+        throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+                memPair.getFirst(), memPair.getSecond(), getId()));
+      }
     }
 
     List<String> commands = null;
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index cf0e746..144530c 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -26,21 +26,17 @@ public class SystemMemoryInfo {
 
   private static String MEMINFO_FILE = "/proc/meminfo"; 
   private static boolean memCheckEnabled;
-  private static boolean memInfoExists;
   private static long freeMemAmount = 0;
-  private static long freeMemDecrAmt = 0;
   private static final long LOW_MEM_THRESHOLD = 3L*1024L*1024L; //3 GB
 
   private static ScheduledExecutorService scheduledExecutorService;
 
-  public static void init(boolean memChkEnabled, long memDecrAmt) {
+  public static void init() {
     File f = new File(MEMINFO_FILE);
-    memInfoExists = f.exists() && !f.isDirectory();
-    memCheckEnabled = memChkEnabled && memInfoExists;
+    memCheckEnabled = f.exists() && !f.isDirectory();
     if (memCheckEnabled) {
       //initial reading of the mem info
       readMemoryInfoFile();
-      freeMemDecrAmt = memDecrAmt;
 
       //schedule a thread to read it
       logger.info("Scheduled thread to read /proc/meminfo every 30 seconds");
@@ -57,7 +53,7 @@ public class SystemMemoryInfo {
    * Given Xms/Xmx values (in kb) used by java process, determine if system can
    * satisfy the memory request
    */
-  public synchronized static boolean canSystemGrantMemory(long xms, long xmx) {
+  public synchronized static boolean canSystemGrantMemory(long xms, long xmx, long freeMemDecrAmt) {
     if (!memCheckEnabled) {
       return true;
     }
@@ -91,11 +87,12 @@ public class SystemMemoryInfo {
 
     if (freeMemDecrAmt > 0) {
       freeMemAmount -= freeMemDecrAmt;
+      logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", freeMemDecrAmt, freeMemAmount));
     } else {
       freeMemAmount -= xms;
+      logger.info(String.format("Memory (%d kb) granted. Current free memory amount is %d kb", xms, freeMemAmount));
     }
     
-    logger.info("Current free memory amount is " + freeMemAmount);
     return true;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index fb65ed2..7227b0b 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -433,6 +433,8 @@ public class Utils {
       sizeInKb = size * 1024L * 1024L;
     } else if (strMemSize.endsWith("m") || strMemSize.endsWith("M")) {
       sizeInKb = size * 1024L;
+    } else if (strMemSize.endsWith("k") || strMemSize.endsWith("K")) {
+      sizeInKb = size;
     } else {
       sizeInKb = size / 1024L;
     }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 677ae8c..80d2554 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -125,8 +125,7 @@ public class AzkabanExecutorServer {
     configureMBeanServer();
     configureMetricReports();
 
-    SystemMemoryInfo.init(props.getBoolean("executor.memCheck.enabled", true),
-            props.getLong("executor.memCheck.freeMemDecrAmt", 0));
+    SystemMemoryInfo.init();
 
     try {
       server.start();