azkaban-aplcache

Unify jvm memory settings validation & improve upload error

2/26/2019 11:02:12 PM

Details

diff --git a/az-core/src/main/java/azkaban/utils/MemConfValue.java b/az-core/src/main/java/azkaban/utils/MemConfValue.java
new file mode 100644
index 0000000..dbefe71
--- /dev/null
+++ b/az-core/src/main/java/azkaban/utils/MemConfValue.java
@@ -0,0 +1,45 @@
+package azkaban.utils;
+
+import azkaban.Constants;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+
+public class MemConfValue {
+
+  private final String string;
+  private final long size;
+
+  public static MemConfValue parseMaxXms(final Props props) {
+    return parse(props,
+        Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
+  }
+
+  public static MemConfValue parseMaxXmx(final Props props) {
+    return parse(props,
+        Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
+  }
+
+  private static MemConfValue parse(final Props props, final String key,
+      final String defaultValue) {
+    final String stringValue = props.getString(key, defaultValue);
+    Preconditions.checkArgument(!StringUtils.isBlank(stringValue),
+        String.format("%s must not have an empty value. "
+            + "Remove the property to use default or specify a valid value.", key));
+    final long size = Utils.parseMemString(stringValue);
+    return new MemConfValue(stringValue, size);
+  }
+
+  private MemConfValue(final String string, final long size) {
+    this.string = string;
+    this.size = size;
+  }
+
+  public String getString() {
+    return this.string;
+  }
+
+  public long getSize() {
+    return this.size;
+  }
+
+}
diff --git a/az-core/src/main/java/azkaban/utils/Utils.java b/az-core/src/main/java/azkaban/utils/Utils.java
index 0a6fffa..e27e643 100644
--- a/az-core/src/main/java/azkaban/utils/Utils.java
+++ b/az-core/src/main/java/azkaban/utils/Utils.java
@@ -443,7 +443,7 @@ public class Utils {
       return 0L;
     }
 
-    long size = 0L;
+    final long size;
     if (strMemSize.endsWith("g") || strMemSize.endsWith("G")
         || strMemSize.endsWith("m") || strMemSize.endsWith("M")
         || strMemSize.endsWith("k") || strMemSize.endsWith("K")) {
@@ -453,7 +453,7 @@ public class Utils {
       size = Long.parseLong(strMemSize);
     }
 
-    long sizeInKb = 0L;
+    final long sizeInKb;
     if (strMemSize.endsWith("g") || strMemSize.endsWith("G")) {
       sizeInKb = size * 1024L * 1024L;
     } else if (strMemSize.endsWith("m") || strMemSize.endsWith("M")) {
diff --git a/az-core/src/test/java/azkaban/utils/MemConfValueTest.java b/az-core/src/test/java/azkaban/utils/MemConfValueTest.java
new file mode 100644
index 0000000..815bf0b
--- /dev/null
+++ b/az-core/src/test/java/azkaban/utils/MemConfValueTest.java
@@ -0,0 +1,55 @@
+package azkaban.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.junit.Assert.assertEquals;
+
+import azkaban.Constants;
+import org.junit.Test;
+
+public class MemConfValueTest {
+
+  @Test
+  public void parseMaxXmX() {
+    assertXmx(Props.of(Constants.JobProperties.JOB_MAX_XMX, "1K"), "1K", 1L);
+  }
+
+  @Test
+  public void parseMaxXmXDefault() {
+    assertXmx(new Props(), Constants.JobProperties.MAX_XMX_DEFAULT, 2097152L);
+  }
+
+  @Test
+  public void parseMaxXms() {
+    assertXms(Props.of(Constants.JobProperties.JOB_MAX_XMS, "1K"), "1K", 1L);
+  }
+
+  @Test
+  public void parseMaxXmsDefault() {
+    assertXms(new Props(), Constants.JobProperties.MAX_XMS_DEFAULT, 1048576L);
+  }
+
+  @Test
+  public void parseEmptyThrows() {
+    final Throwable thrown = catchThrowable(() ->
+        MemConfValue.parseMaxXmx(Props.of(Constants.JobProperties.JOB_MAX_XMX, "")));
+    assertThat(thrown).isInstanceOf(IllegalArgumentException.class);
+    assertThat(thrown).hasMessage("job.max.Xmx must not have an empty value. "
+        + "Remove the property to use default or specify a valid value.");
+  }
+
+  private static void assertXmx(final Props props, final String expectedString, final long expectedSize) {
+    assertMemConf(MemConfValue.parseMaxXmx(props), expectedString, expectedSize);
+  }
+
+  private static void assertXms(final Props props, final String expectedString, final long expectedSize) {
+    assertMemConf(MemConfValue.parseMaxXms(props), expectedString, expectedSize);
+  }
+
+  private static void assertMemConf(final MemConfValue memConf, final String expectedString,
+      final long expectedSize) {
+    assertEquals(expectedString, memConf.getString());
+    assertEquals(expectedSize, memConf.getSize());
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index f87f7ab..21974c5 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -16,8 +16,8 @@
 
 package azkaban.jobExecutor;
 
-import azkaban.Constants;
 import azkaban.server.AzkabanServer;
+import azkaban.utils.MemConfValue;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
@@ -160,23 +160,19 @@ public class JavaProcessJob extends ProcessJob {
 
     final Props azkabanProperties = AzkabanServer.getAzkabanProperties();
     if (azkabanProperties != null) {
-      final String maxXms = azkabanProperties
-          .getString(Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
-      final String maxXmx = azkabanProperties
-          .getString(Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
-      final long sizeMaxXms = Utils.parseMemString(maxXms);
-      final long sizeMaxXmx = Utils.parseMemString(maxXmx);
-
-      if (xms > sizeMaxXms) {
+      final MemConfValue maxXms = MemConfValue.parseMaxXms(azkabanProperties);
+      final MemConfValue maxXmx = MemConfValue.parseMaxXmx(azkabanProperties);
+
+      if (xms > maxXms.getSize()) {
         throw new Exception(
             String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
-                getId(), maxXms));
+                getId(), maxXms.getString()));
       }
 
-      if (xmx > sizeMaxXmx) {
+      if (xmx > maxXmx.getSize()) {
         throw new Exception(
             String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
-                getId(), maxXmx));
+                getId(), maxXmx.getString()));
       }
     }
 
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 29b5ea4..95606b5 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -20,6 +20,7 @@ import azkaban.flow.CommonJobProperties;
 import azkaban.flow.Flow;
 import azkaban.jobcallback.JobCallbackValidator;
 import azkaban.project.validator.ValidationReport;
+import azkaban.utils.MemConfValue;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
@@ -253,28 +254,24 @@ public class FlowLoaderUtils {
       return;
     }
 
-    final String maxXms = props.getString(
-        Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
-    final String maxXmx = props.getString(
-        Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
-    final long sizeMaxXms = Utils.parseMemString(maxXms);
-    final long sizeMaxXmx = Utils.parseMemString(maxXmx);
+    final MemConfValue maxXms = MemConfValue.parseMaxXms(props);
+    final MemConfValue maxXmx = MemConfValue.parseMaxXmx(props);
 
     for (final String jobName : jobPropsMap.keySet()) {
       final Props jobProps = jobPropsMap.get(jobName);
       final String xms = jobProps.getString(XMS, null);
       if (xms != null && !PropsUtils.isVariableReplacementPattern(xms)
-          && Utils.parseMemString(xms) > sizeMaxXms) {
+          && Utils.parseMemString(xms) > maxXms.getSize()) {
         errors.add(String.format(
             "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
-            jobName, maxXms));
+            jobName, maxXms.getString()));
       }
       final String xmx = jobProps.getString(XMX, null);
       if (xmx != null && !PropsUtils.isVariableReplacementPattern(xmx)
-          && Utils.parseMemString(xmx) > sizeMaxXmx) {
+          && Utils.parseMemString(xmx) > maxXmx.getSize()) {
         errors.add(String.format(
             "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
-            jobName, maxXmx));
+            jobName, maxXmx.getString()));
       }
 
       // job callback properties check