azkaban-developers

Merge pull request #428 from weikang2002/master should

5/15/2015 9:07:45 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 13959f8..134602b 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -22,8 +22,11 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 
+import azkaban.server.AzkabanServer;
+import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.Utils;
 
 public class JavaProcessJob extends ProcessJob {
   public static final String CLASSPATH = "classpath";
@@ -149,12 +152,30 @@ public class JavaProcessJob extends ProcessJob {
     return "";
   }
 
-  protected Pair<Long, Long> getProcMemoryRequirement() {
-    String strInitMemSize = getInitialMemorySize();
-    String strMaxMemSize = getMaxMemorySize();
-    long initMemSize = azkaban.utils.Utils.parseMemString(strInitMemSize);
-    long maxMemSize = azkaban.utils.Utils.parseMemString(strMaxMemSize);
+  protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
+    String strXms = getInitialMemorySize();
+    String strXmx = getMaxMemorySize();
+    long xms = Utils.parseMemString(strXms);
+    long xmx = Utils.parseMemString(strXmx);
+
+    Props azkabanProperties = AzkabanServer.getAzkabanProperties();
+    if (azkabanProperties != null) {
+      String maxXms = azkabanProperties.getString(DirectoryFlowLoader.JOB_MAX_XMS, DirectoryFlowLoader.MAX_XMS_DEFAULT);
+      String maxXmx = azkabanProperties.getString(DirectoryFlowLoader.JOB_MAX_XMX, DirectoryFlowLoader.MAX_XMX_DEFAULT);
+      long sizeMaxXms = Utils.parseMemString(maxXms);
+      long sizeMaxXmx = Utils.parseMemString(maxXmx);
+
+      if (xms > sizeMaxXms) {
+        throw new Exception(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+                getId(), maxXms));
+      }
+
+      if (xmx > sizeMaxXmx) {
+        throw new Exception(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+                getId(), maxXms));
+      }
+    }
 
-    return new Pair<Long, Long>(initMemSize, maxMemSize);
+    return new Pair<Long, Long>(xms, xmx);
   }
 }
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index c6e0260..29596d1 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -122,7 +122,7 @@ public class ProcessJob extends AbstractProcessJob {
    *  
    * @return pair of min/max memory size
    */
-  protected Pair<Long, Long> getProcMemoryRequirement() {
+  protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
     return new Pair<Long, Long>(0L, 0L);
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index 45f49f2..7f50989 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -38,9 +38,15 @@ public abstract class AzkabanServer {
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
       "azkaban.private.properties";
   public static final String DEFAULT_CONF_PATH = "conf";
+  private static Props azkabanProperties = null;
 
   public static Props loadProps(String[] args) {
-    return loadProps(args, new OptionParser());
+    azkabanProperties = loadProps(args, new OptionParser());
+    return azkabanProperties;
+  }
+
+  public static Props getAzkabanProperties() {
+    return azkabanProperties;
   }
 
   public static Props loadProps(String[] args, OptionParser parser) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 1a6610e..ad7a48f 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -44,10 +44,10 @@ public class DirectoryFlowLoader implements ProjectValidator {
   private static final DirFilter DIR_FILTER = new DirFilter();
   private static final String PROPERTY_SUFFIX = ".properties";
   private static final String JOB_SUFFIX = ".job";
-  private static final String JOB_MAX_XMS = "job.max.Xms";
-  private static final String MAX_XMS_DEFAULT = "1G";
-  private static final String JOB_MAX_XMX = "job.max.Xmx";
-  private static final String MAX_XMX_DEFAULT = "2G";
+  public static final String JOB_MAX_XMS = "job.max.Xms";
+  public static final String MAX_XMS_DEFAULT = "1G";
+  public static final String JOB_MAX_XMX = "job.max.Xmx";
+  public static final String MAX_XMX_DEFAULT = "2G";
   private static final String XMS = "Xms";
   private static final String XMX = "Xmx";
 
@@ -379,18 +379,20 @@ public class DirectoryFlowLoader implements ProjectValidator {
   private void jobPropertiesCheck() {
     String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
     String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
-    long sizeMaxXms = azkaban.utils.Utils.parseMemString(maxXms);
-    long sizeMaxXmx = azkaban.utils.Utils.parseMemString(maxXmx);
+    long sizeMaxXms = Utils.parseMemString(maxXms);
+    long sizeMaxXmx = Utils.parseMemString(maxXmx);
 
     for (String jobName : jobPropsMap.keySet()) {
-      Props resolvedJobProps = PropsUtils.resolveProps(jobPropsMap.get(jobName));
-      String xms = resolvedJobProps.getString(XMS, null);
-      if (xms != null && azkaban.utils.Utils.parseMemString(xms) > sizeMaxXms) {
+      Props jobProps = jobPropsMap.get(jobName);
+      String xms = jobProps.getString(XMS, null);
+      if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms) 
+              && Utils.parseMemString(xms) > sizeMaxXms) {
         errors.add(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
                 jobName, maxXms));
       }
-      String xmx = resolvedJobProps.getString(XMX, null);
-      if (xmx != null && azkaban.utils.Utils.parseMemString(xmx) > sizeMaxXmx) {
+      String xmx = jobProps.getString(XMX, null);
+      if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
+              && Utils.parseMemString(xmx) > sizeMaxXmx) {
         errors.add(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
                 jobName, maxXmx));
       }
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 6fe0ac7..596e8b3 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -142,6 +142,11 @@ public class PropsUtils {
   private static final Pattern VARIABLE_REPLACEMENT_PATTERN = Pattern
       .compile("\\$\\{([a-zA-Z_.0-9]+)\\}");
 
+  public static boolean isVarialbeReplacementPattern(String str) {
+    Matcher matcher = VARIABLE_REPLACEMENT_PATTERN.matcher(str);
+    return matcher.matches();
+  }
+
   public static Props resolveProps(Props props) {
     if (props == null)
       return null;