Details
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
index 0a49408..b2aebe1 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
@@ -29,7 +29,7 @@ public class ServerInternals {
// Memory check retry interval when OOM in ms
- public static final long MEMORY_CHECK_INTERVAL = 1000*60*1;
+ public static final long MEMORY_CHECK_INTERVAL_MS = 1000 * 60 * 1;
// Max number of memory check retry
public static final int MEMORY_CHECK_RETRY_LIMIT = 720;
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 653349d..4029d46 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -17,6 +17,7 @@
package azkaban.jobExecutor;
import azkaban.constants.ServerInternals;
+import azkaban.metrics.CommonMetrics;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@@ -84,18 +85,25 @@ public class ProcessJob extends AbstractProcessJob {
isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
if (isMemGranted) {
info(String.format("Memory granted (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(), memPair.getSecond(), getId()));
+ if(attempt > 1) {
+ CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
+ }
break;
}
if (attempt < ServerInternals.MEMORY_CHECK_RETRY_LIMIT) {
- info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(ServerInternals.MEMORY_CHECK_INTERVAL), attempt, ServerInternals.MEMORY_CHECK_RETRY_LIMIT));
+ info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(ServerInternals.MEMORY_CHECK_INTERVAL_MS), attempt, ServerInternals.MEMORY_CHECK_RETRY_LIMIT));
+ if (attempt == 1) {
+ CommonMetrics.INSTANCE.incrementOOMJobWaitCount();
+ }
synchronized (this) {
try {
- this.wait(ServerInternals.MEMORY_CHECK_INTERVAL);
+ this.wait(ServerInternals.MEMORY_CHECK_INTERVAL_MS);
} catch (InterruptedException e) {
info(String.format("Job %s interrupted while waiting for memory check retry", getId()));
}
}
if(killed) {
+ CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
info(String.format("Job %s was killed while waiting for memory check retry", getId()));
return;
}
@@ -103,9 +111,8 @@ public class ProcessJob extends AbstractProcessJob {
}
if (!isMemGranted) {
- throw new Exception(
- String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(),
- memPair.getSecond(), getId()));
+ CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
+ handleError(oomMsg, null);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 80371a8..fcd4c64 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -32,7 +32,7 @@ public enum CommonMetrics {
private Meter dbConnectionMeter;
private Meter flowFailMeter;
private AtomicLong dbConnectionTime = new AtomicLong(0L);
-
+ private AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
private MetricRegistry registry;
CommonMetrics() {
@@ -43,6 +43,7 @@ public enum CommonMetrics {
private void setupAllMetrics() {
dbConnectionMeter = MetricsUtility.addMeter("DB-Connection-meter", registry);
flowFailMeter = MetricsUtility.addMeter("flow-fail-meter", registry);
+ MetricsUtility.addGauge("OOM-waiting-job-count", registry, OOMWaitingJobCount::get);
MetricsUtility.addGauge("dbConnectionTime", registry, dbConnectionTime::get);
}
@@ -71,4 +72,20 @@ public enum CommonMetrics {
public void setDBConnectionTime(long milliseconds) {
dbConnectionTime.set(milliseconds);
}
+
+ /**
+ * Mark the occurrence of a job waiting event due to OOM
+ */
+ public void incrementOOMJobWaitCount() {
+ OOMWaitingJobCount.incrementAndGet();
+ }
+
+ /**
+ * Unmark the occurrence of a job waiting event due to OOM
+ */
+ public void decrementOOMJobWaitCount() {
+ OOMWaitingJobCount.decrementAndGet();
+ }
+
+
}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index ba5c825..0d3c345 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -52,4 +52,9 @@ public class CommonMetricsTest {
public void testDBConnectionTimeMetrics() {
MetricsTestUtility.testGauge("dbConnectionTime", dr, CommonMetrics.INSTANCE::setDBConnectionTime);
}
+
+ @Test
+ public void testOOMWaitingJobMetrics() {
+ MetricsTestUtility.testGauge("OOM-waiting-job-count", dr, CommonMetrics.INSTANCE::incrementOOMJobWaitCount);
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
index 6b25d8a..063581e 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -93,6 +93,19 @@ public class MetricsTestUtility {
Assert.assertEquals(dr.getMeter(meterName), currMeter + 3);
}
+ public static void testGauge(String meterName, DummyReporter dr, Runnable runnable) {
+ sleepMillis(20);
+ long currMeter = Long.valueOf(dr.getGauge(meterName));
+ runnable.run();
+ sleepMillis(20);
+ Assert.assertEquals(Long.valueOf(dr.getGauge(meterName)).longValue(), currMeter + 1);
+
+ runnable.run();
+ runnable.run();
+ sleepMillis(20);
+ Assert.assertEquals(Long.valueOf(dr.getGauge(meterName)).longValue(), currMeter + 3);
+ }
+
public static void testGauge(String GaugeName, DummyReporter dr, Consumer<Long> func) {
func.accept(1L);