azkaban-aplcache

add metrics for jobs waiting when OOM (#969) 1. Track the number

4/13/2017 9:44:00 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
index 0a49408..b2aebe1 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
@@ -29,7 +29,7 @@ public class ServerInternals {
 
 
   // Memory check retry interval when OOM in ms
-  public static final long MEMORY_CHECK_INTERVAL = 1000*60*1;
+  public static final long MEMORY_CHECK_INTERVAL_MS = 1000 * 60 * 1;
 
   // Max number of memory check retry
   public static final int MEMORY_CHECK_RETRY_LIMIT = 720;
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 653349d..4029d46 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -17,6 +17,7 @@
 package azkaban.jobExecutor;
 
 import azkaban.constants.ServerInternals;
+import azkaban.metrics.CommonMetrics;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
@@ -84,18 +85,25 @@ public class ProcessJob extends AbstractProcessJob {
         isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
         if (isMemGranted) {
           info(String.format("Memory granted (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(), memPair.getSecond(), getId()));
+          if(attempt > 1) {
+            CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
+          }
           break;
         }
         if (attempt < ServerInternals.MEMORY_CHECK_RETRY_LIMIT) {
-          info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(ServerInternals.MEMORY_CHECK_INTERVAL), attempt, ServerInternals.MEMORY_CHECK_RETRY_LIMIT));
+          info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(ServerInternals.MEMORY_CHECK_INTERVAL_MS), attempt, ServerInternals.MEMORY_CHECK_RETRY_LIMIT));
+          if (attempt == 1) {
+            CommonMetrics.INSTANCE.incrementOOMJobWaitCount();
+          }
           synchronized (this) {
             try {
-              this.wait(ServerInternals.MEMORY_CHECK_INTERVAL);
+              this.wait(ServerInternals.MEMORY_CHECK_INTERVAL_MS);
             } catch (InterruptedException e) {
               info(String.format("Job %s interrupted while waiting for memory check retry", getId()));
             }
           }
           if(killed) {
+            CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
             info(String.format("Job %s was killed while waiting for memory check retry", getId()));
             return;
           }
@@ -103,9 +111,8 @@ public class ProcessJob extends AbstractProcessJob {
       }
 
       if (!isMemGranted) {
-        throw new Exception(
-            String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(),
-                memPair.getSecond(), getId()));
+        CommonMetrics.INSTANCE.decrementOOMJobWaitCount();
+        handleError(oomMsg, null);
       }
     }
 
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 80371a8..fcd4c64 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -32,7 +32,7 @@ public enum CommonMetrics {
   private Meter dbConnectionMeter;
   private Meter flowFailMeter;
   private AtomicLong dbConnectionTime = new AtomicLong(0L);
-
+  private AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
   private MetricRegistry registry;
 
   CommonMetrics() {
@@ -43,6 +43,7 @@ public enum CommonMetrics {
   private void setupAllMetrics() {
     dbConnectionMeter = MetricsUtility.addMeter("DB-Connection-meter", registry);
     flowFailMeter = MetricsUtility.addMeter("flow-fail-meter", registry);
+    MetricsUtility.addGauge("OOM-waiting-job-count", registry, OOMWaitingJobCount::get);
     MetricsUtility.addGauge("dbConnectionTime", registry, dbConnectionTime::get);
   }
 
@@ -71,4 +72,20 @@ public enum CommonMetrics {
   public void setDBConnectionTime(long milliseconds) {
     dbConnectionTime.set(milliseconds);
   }
+
+  /**
+   * Mark the occurrence of an job waiting event due to OOM
+   */
+  public void incrementOOMJobWaitCount() {
+    OOMWaitingJobCount.incrementAndGet();
+  }
+
+  /**
+   * Unmark the occurrence of an job waiting event due to OOM
+   */
+  public void decrementOOMJobWaitCount() {
+    OOMWaitingJobCount.decrementAndGet();
+  }
+
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index ba5c825..0d3c345 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -52,4 +52,9 @@ public class CommonMetricsTest {
   public void testDBConnectionTimeMetrics() {
     MetricsTestUtility.testGauge("dbConnectionTime", dr, CommonMetrics.INSTANCE::setDBConnectionTime);
   }
+
+  @Test
+  public void testOOMWaitingJobMetrics() {
+    MetricsTestUtility.testGauge("OOM-waiting-job-count", dr, CommonMetrics.INSTANCE::incrementOOMJobWaitCount);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
index 6b25d8a..063581e 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -93,6 +93,19 @@ public class MetricsTestUtility {
     Assert.assertEquals(dr.getMeter(meterName), currMeter + 3);
   }
 
+  public static void testGauge(String meterName, DummyReporter dr, Runnable runnable) {
+    sleepMillis(20);
+    long currMeter = Long.valueOf(dr.getGauge(meterName));
+    runnable.run();
+    sleepMillis(20);
+    Assert.assertEquals(Long.valueOf(dr.getGauge(meterName)).longValue(), currMeter + 1);
+
+    runnable.run();
+    runnable.run();
+    sleepMillis(20);
+    Assert.assertEquals(Long.valueOf(dr.getGauge(meterName)).longValue(), currMeter + 3);
+  }
+
   public static void testGauge(String GaugeName, DummyReporter dr, Consumer<Long> func) {
     func.accept(1L);