Details
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 7505be4..05ada78 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -18,7 +18,6 @@ package azkaban.metrics;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -40,8 +39,6 @@ public class CommonMetrics {
public static final String SUBMIT_FLOW_SKIP_METER_NAME = "submit-flow-skip-meter";
public static final String OOM_WAITING_JOB_COUNT_NAME = "OOM-waiting-job-count";
public static final String QUEUE_WAIT_HISTOGRAM_NAME = "queue-wait-histogram";
- public static final String FLOW_SETUP_TIMER_NAME = "flow-setup-timer";
-
private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
private final MetricsManager metricsManager;
@@ -54,7 +51,6 @@ public class CommonMetrics {
private Meter submitFlowFailMeter;
private Meter submitFlowSkipMeter;
private Histogram queueWaitMeter;
- private Timer flowSetupTimer;
@Inject
public CommonMetrics(final MetricsManager metricsManager) {
@@ -73,7 +69,6 @@ public class CommonMetrics {
this.submitFlowSkipMeter = this.metricsManager.addMeter(SUBMIT_FLOW_SKIP_METER_NAME);
this.metricsManager.addGauge(OOM_WAITING_JOB_COUNT_NAME, this.OOMWaitingJobCount::get);
this.queueWaitMeter = this.metricsManager.addHistogram(QUEUE_WAIT_HISTOGRAM_NAME);
- this.flowSetupTimer = this.metricsManager.addTimer(FLOW_SETUP_TIMER_NAME);
}
/**
@@ -153,9 +148,4 @@ public class CommonMetrics {
* @param time queue wait time for a flow.
*/
public void addQueueWait(long time) { this.queueWaitMeter.update(time); }
-
- /**
- * @return the {@link Timer.Context} for the timer.
- */
- public Timer.Context getFlowSetupTimerContext() { return this.flowSetupTimer.time(); }
}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 6099ed5..ecd6e06 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -16,12 +16,10 @@
package azkaban.metrics;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.*;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
import org.junit.Before;
import org.junit.Test;
@@ -42,24 +40,24 @@ public class CommonMetricsTest {
public void testOOMWaitingJobMetrics() {
final String metricName = CommonMetrics.OOM_WAITING_JOB_COUNT_NAME;
- assertEquals(0, this.testUtil.getGaugeValue(metricName));
+ assertThat(this.testUtil.getGaugeValue(metricName)).isEqualTo(0);
this.metrics.incrementOOMJobWaitCount();
- assertEquals(1, this.testUtil.getGaugeValue(metricName));
+ assertThat(this.testUtil.getGaugeValue(metricName)).isEqualTo(1);
}
@Test
public void testSubmitMetrics() {
- assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+ assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME)).isEqualTo(0);
this.metrics.markSubmitFlowFail();
- assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME));
+ assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_FAIL_METER_NAME)).isEqualTo(1);
- assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+ assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME)).isEqualTo(0);
this.metrics.markSubmitFlowSkip();
- assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME));
+ assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SKIP_METER_NAME)).isEqualTo(1);
- assertEquals(0, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+ assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME)).isEqualTo(0);
this.metrics.markSubmitFlowSuccess();
- assertEquals(1, this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME));
+ assertThat(this.testUtil.getMeterValue(CommonMetrics.SUBMIT_FLOW_SUCCESS_METER_NAME)).isEqualTo(1);
}
@Test
@@ -70,25 +68,9 @@ public class CommonMetricsTest {
this.metrics.addQueueWait(600L);
this.metrics.addQueueWait(1000L);
Snapshot snapshot = this.testUtil.getHistogramSnapshot(CommonMetrics.QUEUE_WAIT_HISTOGRAM_NAME);
- assertEquals(600, snapshot.getMedian(), delta);
- assertEquals(700, snapshot.getMean(), delta);
- assertEquals(500, snapshot.getMin(), delta);
- assertEquals(1000, snapshot.getMax(), delta);
- }
-
- @Test
- public void testFlowSetupMetrics() throws InterruptedException {
- assertEquals(0, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
- Timer.Context context = this.metrics.getFlowSetupTimerContext();
- try {
- Thread.sleep(100);
- }
- finally {
- context.stop();
- }
- assertEquals(1, this.testUtil.getTimerCount(CommonMetrics.FLOW_SETUP_TIMER_NAME));
- Snapshot snapshot = this.testUtil.getTimerSnapshot(CommonMetrics.FLOW_SETUP_TIMER_NAME);
- double val = snapshot.getMax();
- assertTrue(snapshot.getMax() > 100);
+ assertThat(snapshot.getMedian()).isCloseTo(600.0, within(delta));
+ assertThat(snapshot.getMean()).isCloseTo(700.0, within(delta));
+ assertThat(snapshot.getMin()).isEqualTo(500);
+ assertThat( snapshot.getMax()).isEqualTo(1000);
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
index b0e544f..b527e6b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
import azkaban.execapp.metric.ProjectCacheHitRatio;
import azkaban.metrics.MetricsManager;
+import com.codahale.metrics.Timer;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -26,8 +27,13 @@ import javax.inject.Singleton;
*/
@Singleton
public class ExecMetrics {
+ public static final String NUM_RUNNING_FLOWS_NAME = "EXEC-NumRunningFlows";
+ public static final String NUM_QUEUED_FLOWS_NAME = "EXEC-NumQueuedFlows";
+ public static final String PROJECT_DIR_CACHE_HIT_RATIO_NAME = "EXEC-ProjectDirCacheHitRatio";
+ public static final String FLOW_SETUP_TIMER_NAME = "EXEC-flow-setup-timer";
private final MetricsManager metricsManager;
+ private Timer flowSetupTimer;
private final ProjectCacheHitRatio projectCacheHitRatio;
@Inject
@@ -37,6 +43,7 @@ public class ExecMetrics {
this.projectCacheHitRatio = new ProjectCacheHitRatio();
metricsManager.addGauge("EXEC-ProjectDirCacheHitRatio",
this.projectCacheHitRatio::getRatio);
+ this.flowSetupTimer = this.metricsManager.addTimer(FLOW_SETUP_TIMER_NAME);
}
ProjectCacheHitRatio getProjectCacheHitRatio() {
@@ -45,8 +52,13 @@ public class ExecMetrics {
public void addFlowRunnerManagerMetrics(final FlowRunnerManager flowRunnerManager) {
this.metricsManager
- .addGauge("EXEC-NumRunningFlows", flowRunnerManager::getNumRunningFlows);
+ .addGauge(NUM_RUNNING_FLOWS_NAME, flowRunnerManager::getNumRunningFlows);
this.metricsManager
- .addGauge("EXEC-NumQueuedFlows", flowRunnerManager::getNumQueuedFlows);
+ .addGauge(NUM_QUEUED_FLOWS_NAME, flowRunnerManager::getNumQueuedFlows);
}
+
+ /**
+ * @return the {@link Timer.Context} for the timer.
+ */
+ public Timer.Context getFlowSetupTimerContext() { return this.flowSetupTimer.time(); }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 78bebac..df9b74c 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -387,7 +387,7 @@ public class FlowRunnerManager implements EventListener,
this.commonMetrics.addQueueWait(System.currentTimeMillis() -
flow.getExecutableFlow().getSubmitTime());
- final Timer.Context flowPrepTimerContext = this.commonMetrics.getFlowSetupTimerContext();
+ final Timer.Context flowPrepTimerContext = execMetrics.getFlowSetupTimerContext();
try {
if (this.active || isExecutorSpecified(flow)) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
new file mode 100644
index 0000000..6790ae3
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2019 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.*;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for executor metrics */
+public class ExecMetricsTest {
+
+ private MetricsTestUtility testUtil;
+ private ExecMetrics metrics;
+
+ @Before
+ public void setUp() {
+ final MetricRegistry metricRegistry = new MetricRegistry();
+ this.testUtil = new MetricsTestUtility(metricRegistry);
+ this.metrics = new ExecMetrics(new MetricsManager(metricRegistry));
+ }
+
+ @Test
+ public void testFlowSetupMetrics() throws InterruptedException {
+ assertThat(this.testUtil.getTimerCount(ExecMetrics.FLOW_SETUP_TIMER_NAME)).isEqualTo(0);
+ Timer.Context context = this.metrics.getFlowSetupTimerContext();
+ try {
+ Thread.sleep(10);
+ }
+ finally {
+ context.stop();
+ }
+ assertThat(this.testUtil.getTimerCount(ExecMetrics.FLOW_SETUP_TIMER_NAME)).isEqualTo(1);
+ Snapshot snapshot = this.testUtil.getTimerSnapshot(ExecMetrics.FLOW_SETUP_TIMER_NAME);
+ double val = snapshot.getMax();
+ assertThat(snapshot.getMax()).isGreaterThanOrEqualTo(10);
+ }
+
+}