azkaban-developers

Move flowSetupTimer from CommonMetrics to ExecMetrics (#2138) *

3/5/2019 11:36:59 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 7505be4..05ada78 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -18,7 +18,6 @@ package azkaban.metrics;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -40,8 +39,6 @@ public class CommonMetrics {
   public static final String SUBMIT_FLOW_SKIP_METER_NAME = "submit-flow-skip-meter";
   public static final String OOM_WAITING_JOB_COUNT_NAME = "OOM-waiting-job-count";
   public static final String QUEUE_WAIT_HISTOGRAM_NAME = "queue-wait-histogram";
-  public static final String FLOW_SETUP_TIMER_NAME = "flow-setup-timer";
-
 
   private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
   private final MetricsManager metricsManager;
@@ -54,7 +51,6 @@ public class CommonMetrics {
   private Meter submitFlowFailMeter;
   private Meter submitFlowSkipMeter;
   private Histogram queueWaitMeter;
-  private Timer flowSetupTimer;
 
   @Inject
   public CommonMetrics(final MetricsManager metricsManager) {
@@ -73,7 +69,6 @@ public class CommonMetrics {
     this.submitFlowSkipMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SKIP_METER_NAME);
     this.metricsManager.addGauge(OOM_WAITING_JOB_COUNT_NAME, this.OOMWaitingJobCount::get);
     this.queueWaitMeter = this.metricsManager.addHistogram(QUEUE_WAIT_HISTOGRAM_NAME);
-    this.flowSetupTimer = this.metricsManager.addTimer(FLOW_SETUP_TIMER_NAME);
   }
 
   /**
@@ -153,9 +148,4 @@ public class CommonMetrics {
    * @param time queue wait time for a flow.
    */
   public void addQueueWait(long time) { this.queueWaitMeter.update(time); }
-
-  /**
-   * @return the {@link Timer.Context} for the timer.
-   */
-  public Timer.Context getFlowSetupTimerContext() { return this.flowSetupTimer.time(); }
 }
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 6099ed5..ecd6e06 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -16,12 +16,10 @@
 
 package azkaban.metrics;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.*;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -42,24 +40,24 @@ public class CommonMetricsTest {
   public void testOOMWaitingJobMetrics() {
     final String metricName = CommonMetrics.OOM_WAITING_JOB_COUNT_NAME;
 
-    assertEquals(0, this.testUtil.getGaugeValue(metricName));
+    assertThat(this.testUtil.getGaugeValue(metricName)).isEqualTo(0);
     this.metrics.incrementOOMJobWaitCount();
-    assertEquals(1, this.testUtil.getGaugeValue(metricName));
+    assertThat(this.testUtil.getGaugeValue(metricName)).isEqualTo(1);
   }
 
   @Test
   public void testSubmitMetrics() {
-    assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+    assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME)).isEqualTo(0);
     this.metrics.markSubmitFlowFail();
-    assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+    assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME)).isEqualTo(1);
 
-    assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+    assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME)).isEqualTo(0);
     this.metrics.markSubmitFlowSkip();
-    assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+    assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME)).isEqualTo(1);
 
-    assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+    assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME)).isEqualTo(0);
     this.metrics.markSubmitFlowSuccess();
-    assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+    assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME)).isEqualTo(1);
   }
 
   @Test
@@ -70,25 +68,9 @@ public class CommonMetricsTest {
     this.metrics.addQueueWait(600L);
     this.metrics.addQueueWait(1000L);
     Snapshot snapshot = this.testUtil.getHistogramSnapshot(CommonMetrics.QUEUE_WAIT_HISTOGRAM_NAME);
-    assertEquals(600, snapshot.getMedian(), delta);
-    assertEquals(700, snapshot.getMean(), delta);
-    assertEquals(500, snapshot.getMin(), delta);
-    assertEquals(1000, snapshot.getMax(), delta);
-  }
-
-  @Test
-  public void testFlowSetupMetrics() throws InterruptedException {
-    assertEquals(0, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
-    Timer.Context context = this.metrics.getFlowSetupTimerContext();
-    try {
-      Thread.sleep(100);
-    }
-    finally {
-      context.stop();
-    }
-    assertEquals(1, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
-    Snapshot snapshot = this.testUtil.getTimerSnapshot(CommonMetrics.FLOW_SETUP_TIMER_NAME);
-    double val = snapshot.getMax();
-    assertTrue(snapshot.getMax() > 100);
+    assertThat(snapshot.getMedian()).isCloseTo(600.0, within(delta));
+    assertThat(snapshot.getMean()).isCloseTo(700.0, within(delta));
+    assertThat(snapshot.getMin()).isEqualTo(500);
+    assertThat( snapshot.getMax()).isEqualTo(1000);
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
index b0e544f..b527e6b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
 
 import azkaban.execapp.metric.ProjectCacheHitRatio;
 import azkaban.metrics.MetricsManager;
+import com.codahale.metrics.Timer;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 
@@ -26,8 +27,13 @@ import javax.inject.Singleton;
  */
 @Singleton
 public class ExecMetrics {
+  public static final String NUM_RUNNING_FLOWS_NAME = "EXEC-NumRunningFlows";
+  public static final String NUM_QUEUED_FLOWS_NAME = "EXEC-NumQueuedFlows";
+  public static final String PROJECT_DIR_CACHE_HIT_RATIO_NAME = "EXEC-ProjectDirCacheHitRatio";
+  public static final String FLOW_SETUP_TIMER_NAME = "EXEC-flow-setup-timer";
 
   private final MetricsManager metricsManager;
+  private Timer flowSetupTimer;
   private final ProjectCacheHitRatio projectCacheHitRatio;
 
   @Inject
@@ -37,6 +43,7 @@ public class ExecMetrics {
     this.projectCacheHitRatio = new ProjectCacheHitRatio();
     metricsManager.addGauge("EXEC-ProjectDirCacheHitRatio",
         this.projectCacheHitRatio::getRatio);
+    this.flowSetupTimer = this.metricsManager.addTimer(FLOW_SETUP_TIMER_NAME);
   }
 
   ProjectCacheHitRatio getProjectCacheHitRatio() {
@@ -45,8 +52,13 @@ public class ExecMetrics {
 
   public void addFlowRunnerManagerMetrics(final FlowRunnerManager flowRunnerManager) {
     this.metricsManager
-        .addGauge("EXEC-NumRunningFlows", flowRunnerManager::getNumRunningFlows);
+        .addGauge(NUM_RUNNING_FLOWS_NAME, flowRunnerManager::getNumRunningFlows);
     this.metricsManager
-        .addGauge("EXEC-NumQueuedFlows", flowRunnerManager::getNumQueuedFlows);
+        .addGauge(NUM_QUEUED_FLOWS_NAME, flowRunnerManager::getNumQueuedFlows);
   }
+
+  /**
+   * @return the {@link Timer.Context} for the timer.
+   */
+  public Timer.Context getFlowSetupTimerContext() { return this.flowSetupTimer.time(); }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 78bebac..df9b74c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -387,7 +387,7 @@ public class FlowRunnerManager implements EventListener,
     this.commonMetrics.addQueueWait(System.currentTimeMillis() -
         flow.getExecutableFlow().getSubmitTime());
 
-    final Timer.Context flowPrepTimerContext = this.commonMetrics.getFlowSetupTimerContext();
+    final Timer.Context flowPrepTimerContext = execMetrics.getFlowSetupTimerContext();
 
     try {
       if (this.active || isExecutorSpecified(flow)) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
new file mode 100644
index 0000000..6790ae3
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2019 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.*;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for executor metrics */
+public class ExecMetricsTest {
+
+  private MetricsTestUtility testUtil;
+  private ExecMetrics metrics;
+
+  @Before
+  public void setUp() {
+    final MetricRegistry metricRegistry = new MetricRegistry();
+    this.testUtil = new MetricsTestUtility(metricRegistry);
+    this.metrics = new ExecMetrics(new MetricsManager(metricRegistry));
+  }
+
+  @Test
+  public void testFlowSetupMetrics() throws InterruptedException {
+    assertThat(this.testUtil.getTimerCount(ExecMetrics.FLOW_SETUP_TIMER_NAME)).isEqualTo(0);
+    Timer.Context context = this.metrics.getFlowSetupTimerContext();
+    try {
+      Thread.sleep(10);
+    }
+    finally {
+      context.stop();
+    }
+    assertThat(this.testUtil.getTimerCount(ExecMetrics.FLOW_SETUP_TIMER_NAME)).isEqualTo(1);
+    Snapshot snapshot = this.testUtil.getTimerSnapshot(ExecMetrics.FLOW_SETUP_TIMER_NAME);
+    double val = snapshot.getMax();
+    assertThat(snapshot.getMax()).isGreaterThanOrEqualTo(10);
+  }
+
+}