Details
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 13959f8..134602b 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -22,8 +22,11 @@ import java.util.List;
import org.apache.log4j.Logger;
+import azkaban.server.AzkabanServer;
+import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.Utils;
public class JavaProcessJob extends ProcessJob {
public static final String CLASSPATH = "classpath";
@@ -149,12 +152,30 @@ public class JavaProcessJob extends ProcessJob {
return "";
}
- protected Pair<Long, Long> getProcMemoryRequirement() {
- String strInitMemSize = getInitialMemorySize();
- String strMaxMemSize = getMaxMemorySize();
- long initMemSize = azkaban.utils.Utils.parseMemString(strInitMemSize);
- long maxMemSize = azkaban.utils.Utils.parseMemString(strMaxMemSize);
+ protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
+ String strXms = getInitialMemorySize();
+ String strXmx = getMaxMemorySize();
+ long xms = Utils.parseMemString(strXms);
+ long xmx = Utils.parseMemString(strXmx);
+
+ Props azkabanProperties = AzkabanServer.getAzkabanProperties();
+ if (azkabanProperties != null) {
+ String maxXms = azkabanProperties.getString(DirectoryFlowLoader.JOB_MAX_XMS, DirectoryFlowLoader.MAX_XMS_DEFAULT);
+ String maxXmx = azkabanProperties.getString(DirectoryFlowLoader.JOB_MAX_XMX, DirectoryFlowLoader.MAX_XMX_DEFAULT);
+ long sizeMaxXms = Utils.parseMemString(maxXms);
+ long sizeMaxXmx = Utils.parseMemString(maxXmx);
+
+ if (xms > sizeMaxXms) {
+ throw new Exception(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+ getId(), maxXms));
+ }
+
+ if (xmx > sizeMaxXmx) {
+ throw new Exception(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+ getId(), maxXms));
+ }
+ }
- return new Pair<Long, Long>(initMemSize, maxMemSize);
+ return new Pair<Long, Long>(xms, xmx);
}
}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index c6e0260..29596d1 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -122,7 +122,7 @@ public class ProcessJob extends AbstractProcessJob {
*
* @return pair of min/max memory size
*/
- protected Pair<Long, Long> getProcMemoryRequirement() {
+ protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
return new Pair<Long, Long>(0L, 0L);
}
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index 45f49f2..7f50989 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -38,9 +38,15 @@ public abstract class AzkabanServer {
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
"azkaban.private.properties";
public static final String DEFAULT_CONF_PATH = "conf";
+ private static Props azkabanProperties = null;
public static Props loadProps(String[] args) {
- return loadProps(args, new OptionParser());
+ azkabanProperties = loadProps(args, new OptionParser());
+ return azkabanProperties;
+ }
+
+ public static Props getAzkabanProperties() {
+ return azkabanProperties;
}
public static Props loadProps(String[] args, OptionParser parser) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 1a6610e..ad7a48f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -44,10 +44,10 @@ public class DirectoryFlowLoader implements ProjectValidator {
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
private static final String JOB_SUFFIX = ".job";
- private static final String JOB_MAX_XMS = "job.max.Xms";
- private static final String MAX_XMS_DEFAULT = "1G";
- private static final String JOB_MAX_XMX = "job.max.Xmx";
- private static final String MAX_XMX_DEFAULT = "2G";
+ public static final String JOB_MAX_XMS = "job.max.Xms";
+ public static final String MAX_XMS_DEFAULT = "1G";
+ public static final String JOB_MAX_XMX = "job.max.Xmx";
+ public static final String MAX_XMX_DEFAULT = "2G";
private static final String XMS = "Xms";
private static final String XMX = "Xmx";
@@ -379,18 +379,20 @@ public class DirectoryFlowLoader implements ProjectValidator {
private void jobPropertiesCheck() {
String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
- long sizeMaxXms = azkaban.utils.Utils.parseMemString(maxXms);
- long sizeMaxXmx = azkaban.utils.Utils.parseMemString(maxXmx);
+ long sizeMaxXms = Utils.parseMemString(maxXms);
+ long sizeMaxXmx = Utils.parseMemString(maxXmx);
for (String jobName : jobPropsMap.keySet()) {
- Props resolvedJobProps = PropsUtils.resolveProps(jobPropsMap.get(jobName));
- String xms = resolvedJobProps.getString(XMS, null);
- if (xms != null && azkaban.utils.Utils.parseMemString(xms) > sizeMaxXms) {
+ Props jobProps = jobPropsMap.get(jobName);
+ String xms = jobProps.getString(XMS, null);
+ if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
+ && Utils.parseMemString(xms) > sizeMaxXms) {
errors.add(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
jobName, maxXms));
}
- String xmx = resolvedJobProps.getString(XMX, null);
- if (xmx != null && azkaban.utils.Utils.parseMemString(xmx) > sizeMaxXmx) {
+ String xmx = jobProps.getString(XMX, null);
+ if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
+ && Utils.parseMemString(xmx) > sizeMaxXmx) {
errors.add(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
jobName, maxXmx));
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 6fe0ac7..596e8b3 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -142,6 +142,11 @@ public class PropsUtils {
private static final Pattern VARIABLE_REPLACEMENT_PATTERN = Pattern
.compile("\\$\\{([a-zA-Z_.0-9]+)\\}");
+ public static boolean isVarialbeReplacementPattern(String str) {
+ Matcher matcher = VARIABLE_REPLACEMENT_PATTERN.matcher(str);
+ return matcher.matches();
+ }
+
public static Props resolveProps(Props props) {
if (props == null)
return null;