azkaban-developers

Replace Counter with AtomicLong for counter-based metrics

3/19/2019 7:45:09 PM

Details

diff --git a/az-core/src/main/java/azkaban/metrics/MetricsManager.java b/az-core/src/main/java/azkaban/metrics/MetricsManager.java
index f935eec..ef1cca0 100644
--- a/az-core/src/main/java/azkaban/metrics/MetricsManager.java
+++ b/az-core/src/main/java/azkaban/metrics/MetricsManager.java
@@ -20,6 +20,7 @@ import static azkaban.Constants.ConfigurationKeys.CUSTOM_METRICS_REPORTER_CLASS_
 import static azkaban.Constants.ConfigurationKeys.METRICS_SERVER_URL;
 
 import azkaban.utils.Props;
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
@@ -81,6 +82,13 @@ public class MetricsManager {
   }
 
   /**
+   * A {@link Counter} is just a gauge for an AtomicLong instance.
+   */
+  public Counter addCounter(final String name) {
+    return this.registry.counter(name);
+  }
+
+  /**
    * A {@link Histogram} measures the statistical distribution of values in a stream of data. In
    * addition to minimum, maximum, mean, etc., it also measures median, 75th,
    * 90th, 95th, 98th, 99th, and 99.9th percentiles.
diff --git a/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java b/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
index 0c7d59b..1e2f8e3 100644
--- a/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/az-core/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -16,6 +16,7 @@
 
 package azkaban.metrics;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
@@ -38,6 +39,13 @@ public class MetricsTestUtility {
     return (long) this.registry.getGauges().get(name).getValue();
   }
 
+  /**
+   * @return the value for the specified {@link Counter}
+   */
+  public long getCounterValue(final String name) {
+    return this.registry.getCounters().get(name).getCount();
+  }
+
   /** @return the value for the specified {@link Meter} */
   public long getMeterValue(final String name) {
     return this.registry.getMeters().get(name).getCount();
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 05ada78..defe77a 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -16,9 +16,9 @@
 
 package azkaban.metrics;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 
@@ -40,7 +40,7 @@ public class CommonMetrics {
   public static final String OOM_WAITING_JOB_COUNT_NAME = "OOM-waiting-job-count";
   public static final String QUEUE_WAIT_HISTOGRAM_NAME = "queue-wait-histogram";
 
-  private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
+  private Counter OOMWaitingJobCount;
   private final MetricsManager metricsManager;
   private Meter flowFailMeter;
   private Meter dispatchFailMeter;
@@ -67,7 +67,7 @@ public class CommonMetrics {
     this.submitFlowSuccessMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SUCCESS_METER_NAME);
     this.submitFlowFailMeter = this.metricsManager.addMeter(SUBMIT_FLOW_FAIL_METER_NAME);
     this.submitFlowSkipMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SKIP_METER_NAME);
-    this.metricsManager.addGauge(OOM_WAITING_JOB_COUNT_NAME, this.OOMWaitingJobCount::get);
+    this.OOMWaitingJobCount = this.metricsManager.addCounter(OOM_WAITING_JOB_COUNT_NAME);
     this.queueWaitMeter = this.metricsManager.addHistogram(QUEUE_WAIT_HISTOGRAM_NAME);
   }
 
@@ -132,14 +132,14 @@ public class CommonMetrics {
    * Mark the occurrence of an job waiting event due to OOM
    */
   public void incrementOOMJobWaitCount() {
-    this.OOMWaitingJobCount.incrementAndGet();
+    this.OOMWaitingJobCount.inc();
   }
 
   /**
    * Unmark the occurrence of an job waiting event due to OOM
    */
   public void decrementOOMJobWaitCount() {
-    this.OOMWaitingJobCount.decrementAndGet();
+    this.OOMWaitingJobCount.dec();
   }
 
   /**
@@ -147,5 +147,7 @@ public class CommonMetrics {
    *
    * @param time queue wait time for a flow.
    */
-  public void addQueueWait(long time) { this.queueWaitMeter.update(time); }
+  public void addQueueWait(final long time) {
+    this.queueWaitMeter.update(time);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index ecd6e06..d1024f9 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -16,7 +16,8 @@
 
 package azkaban.metrics;
 
-import static org.assertj.core.api.Assertions.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.within;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Snapshot;
@@ -40,9 +41,12 @@ public class CommonMetricsTest {
   public void testOOMWaitingJobMetrics() {
     final String metricName = CommonMetrics.OOM_WAITING_JOB_COUNT_NAME;
 
-    assertThat(this.testUtil.getGaugeValue(metricName)).isEqualTo(0);
+    assertThat(this.testUtil.getCounterValue(metricName)).isEqualTo(0);
     this.metrics.incrementOOMJobWaitCount();
-    assertThat(this.testUtil.getGaugeValue(metricName)).isEqualTo(1);
+    assertThat(this.testUtil.getCounterValue(metricName)).isEqualTo(1);
+
+    this.metrics.decrementOOMJobWaitCount();
+    assertThat(this.testUtil.getCounterValue(metricName)).isEqualTo(0);
   }
 
   @Test
@@ -67,7 +71,8 @@ public class CommonMetricsTest {
     this.metrics.addQueueWait(500L);
     this.metrics.addQueueWait(600L);
     this.metrics.addQueueWait(1000L);
-    Snapshot snapshot = this.testUtil.getHistogramSnapshot(CommonMetrics.QUEUE_WAIT_HISTOGRAM_NAME);
+    final Snapshot snapshot = this.testUtil
+        .getHistogramSnapshot(CommonMetrics.QUEUE_WAIT_HISTOGRAM_NAME);
     assertThat(snapshot.getMedian()).isCloseTo(600.0, within(delta));
     assertThat(snapshot.getMean()).isCloseTo(700.0, within(delta));
     assertThat(snapshot.getMin()).isEqualTo(500);