Details
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index a36d33d..13959f8 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -152,8 +152,8 @@ public class JavaProcessJob extends ProcessJob {
protected Pair<Long, Long> getProcMemoryRequirement() {
String strInitMemSize = getInitialMemorySize();
String strMaxMemSize = getMaxMemorySize();
- long initMemSize = azkaban.utils.Utils.parseMemString2KB(strInitMemSize);
- long maxMemSize = azkaban.utils.Utils.parseMemString2KB(strMaxMemSize);
+ long initMemSize = azkaban.utils.Utils.parseMemString(strInitMemSize);
+ long maxMemSize = azkaban.utils.Utils.parseMemString(strMaxMemSize);
return new Pair<Long, Long>(initMemSize, maxMemSize);
}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 43f26f3..5942088 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -53,8 +53,8 @@ public class ProcessJob extends AbstractProcessJob {
}
Pair<Long, Long> memPair = getProcMemoryRequirement();
- boolean memGranted = SystemMemoryInfo.requestMemory(memPair.getFirst(), memPair.getSecond());
- if (!memGranted) {
+ boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond());
+ if (!isMemGranted) {
throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
memPair.getFirst(), memPair.getSecond(), getId()));
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 36ac7ca..1a6610e 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -379,18 +379,18 @@ public class DirectoryFlowLoader implements ProjectValidator {
private void jobPropertiesCheck() {
String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
- long sizeMaxXms = azkaban.utils.Utils.parseMemString2KB(maxXms);
- long sizeMaxXmx = azkaban.utils.Utils.parseMemString2KB(maxXmx);
+ long sizeMaxXms = azkaban.utils.Utils.parseMemString(maxXms);
+ long sizeMaxXmx = azkaban.utils.Utils.parseMemString(maxXmx);
for (String jobName : jobPropsMap.keySet()) {
- Props jobProps = jobPropsMap.get(jobName);
- String xms = jobProps.getString(XMS, null);
- if (xms != null && azkaban.utils.Utils.parseMemString2KB(xms) > sizeMaxXms) {
+ Props resolvedJobProps = PropsUtils.resolveProps(jobPropsMap.get(jobName));
+ String xms = resolvedJobProps.getString(XMS, null);
+ if (xms != null && azkaban.utils.Utils.parseMemString(xms) > sizeMaxXms) {
errors.add(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
jobName, maxXms));
}
- String xmx = jobProps.getString(XMX, null);
- if (xmx != null && azkaban.utils.Utils.parseMemString2KB(xmx) > sizeMaxXmx) {
+ String xmx = resolvedJobProps.getString(XMX, null);
+ if (xmx != null && azkaban.utils.Utils.parseMemString(xmx) > sizeMaxXmx) {
errors.add(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
jobName, maxXmx));
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
index aeed175..cf0e746 100644
--- a/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
+++ b/azkaban-common/src/main/java/azkaban/utils/SystemMemoryInfo.java
@@ -49,7 +49,15 @@ public class SystemMemoryInfo {
}
}
- public synchronized static boolean requestMemory(long xms, long xmx) {
+ /**
+ * @param xms
+ * @param xmx
+ * @return System can satisfy the memory request or not
+ *
+ * Given Xms/Xmx values (in kb) used by java process, determine if system can
+ * satisfy the memory request
+ */
+ public synchronized static boolean canSystemGrantMemory(long xms, long xmx) {
if (!memCheckEnabled) {
return true;
}
@@ -159,7 +167,11 @@ public class SystemMemoryInfo {
static class MemoryInfoReader implements Runnable {
@Override
public void run() {
- readMemoryInfoFile();
+ try {
+ readMemoryInfoFile();
+ } catch (Throwable t) {
+ logger.error("error calling readMemoryInfoFile", t);
+ }
}
}
}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/utils/Utils.java b/azkaban-common/src/main/java/azkaban/utils/Utils.java
index ed9fc52..fb65ed2 100644
--- a/azkaban-common/src/main/java/azkaban/utils/Utils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/Utils.java
@@ -409,7 +409,11 @@ public class Utils {
return periodStr;
}
- public static long parseMemString2KB(String strMemSize) {
+ /**
+ * @param strMemSize : memory string in the format such as 1G, 500M, 3000K, 5000
+ * @return : long value of memory amount in kb
+ */
+ public static long parseMemString(String strMemSize) {
if (strMemSize == null) {
return 0L;
}
@@ -419,7 +423,9 @@ public class Utils {
|| strMemSize.endsWith("m") || strMemSize.endsWith("M")
|| strMemSize.endsWith("k") || strMemSize.endsWith("K")) {
String strSize = strMemSize.substring(0, strMemSize.length() - 1);
- size = Long.parseLong(strSize);
+ size = Long.parseLong(strSize);
+ } else {
+ size = Long.parseLong(strMemSize);
}
long sizeInKb = 0L;
@@ -427,6 +433,8 @@ public class Utils {
sizeInKb = size * 1024L * 1024L;
} else if (strMemSize.endsWith("m") || strMemSize.endsWith("M")) {
sizeInKb = size * 1024L;
+ } else {
+ sizeInKb = size / 1024L;
}
return sizeInKb;