Details
diff --git a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
index 626a9c6..4b2b907 100644
--- a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
+++ b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
@@ -68,6 +68,7 @@ public abstract class AbstractJdbcLoader {
protected Connection getDBConnection(boolean autoCommit) throws IOException {
Connection connection = null;
CommonMetrics.INSTANCE.markDBConnection();
+ long startMs = System.currentTimeMillis();
try {
connection = dataSource.getConnection();
connection.setAutoCommit(autoCommit);
@@ -75,7 +76,7 @@ public abstract class AbstractJdbcLoader {
DbUtils.closeQuietly(connection);
throw new IOException("Error getting DB connection.", e);
}
-
+ CommonMetrics.INSTANCE.setDBConnectionTime(System.currentTimeMillis() - startMs);
return connection;
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 80b58e7..930f9c0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
package azkaban.executor;
+import azkaban.metrics.CommonMetrics;
import azkaban.constants.ServerProperties;
import azkaban.utils.FlowUtils;
import java.io.File;
@@ -1537,6 +1538,10 @@ public class ExecutorManager extends EventHandler implements
flow.applyUpdateObject(updateData);
Status newStatus = flow.getStatus();
+ if(oldStatus != newStatus && newStatus == Status.FAILED) {
+ CommonMetrics.INSTANCE.markFlowFail();
+ }
+
ExecutionOptions options = flow.getExecutionOptions();
if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
// We want to see if we should give an email status on first failure.
@@ -1572,12 +1577,12 @@ public class ExecutorManager extends EventHandler implements
public boolean isFinished(ExecutableFlow flow) {
switch (flow.getStatus()) {
- case SUCCEEDED:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 6959c89..80371a8 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -19,6 +19,8 @@ package azkaban.metrics;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This singleton class CommonMetrics is in charge of collecting varieties of metrics
* which are accessed in both web and exec modules. That said, these metrics will be
@@ -28,6 +30,9 @@ public enum CommonMetrics {
INSTANCE;
private Meter dbConnectionMeter;
+ private Meter flowFailMeter;
+ private AtomicLong dbConnectionTime = new AtomicLong(0L);
+
private MetricRegistry registry;
CommonMetrics() {
@@ -37,6 +42,8 @@ public enum CommonMetrics {
private void setupAllMetrics() {
dbConnectionMeter = MetricsUtility.addMeter("DB-Connection-meter", registry);
+ flowFailMeter = MetricsUtility.addMeter("flow-fail-meter", registry);
+ MetricsUtility.addGauge("dbConnectionTime", registry, dbConnectionTime::get);
}
/**
@@ -52,4 +59,16 @@ public enum CommonMetrics {
*/
dbConnectionMeter.mark();
}
+
+ /**
+ * Mark flowFailMeter when a flow is considered as FAILED.
+ * This method could be called by Web Server or Executor, as they both detect flow failure.
+ */
+ public void markFlowFail() {
+ flowFailMeter.mark();
+ }
+
+ public void setDBConnectionTime(long milliseconds) {
+ dbConnectionTime.set(milliseconds);
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
index 12398e3..9fe15fa 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
@@ -3,8 +3,9 @@ package azkaban.metrics;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
/**
* Creating an utility class to facilitate metrics class like {@link azkaban.metrics.CommonMetrics}
@@ -22,15 +23,20 @@ public final class MetricsUtility {
*/
public static Meter addMeter(String name, MetricRegistry registry) {
Meter curr = registry.meter(name);
- registry.register(name + "-gauge", (Gauge<Double>) curr::getOneMinuteRate);
+ registry.register(name + "-gauge", (Gauge<Double>) curr::getFifteenMinuteRate);
return curr;
}
/**
* A {@link Gauge} is an instantaneous reading of a particular value.
- * This method adds an AtomicLong number/metric to registry.
+ * This method takes a {@link Supplier}, a functional interface, so that the gauge value can be of
+ * any type. Whether the metric of interest is a Double or a Long, it can be passed to the
+ * Metrics Parser.
+ *
+ * E.g., in {@link CommonMetrics#setupAllMetrics()}, we build the supplier from an AtomicLong
+ * object and its get method, in order to collect the dbConnectionTime metric.
*/
- public static void addLongGauge(String name, AtomicLong value, MetricRegistry registry) {
- registry.register(name, (Gauge<Long>) value::get);
+ public static <T> void addGauge(String name, MetricRegistry registry, Supplier<T> gaugeFunc) {
+ registry.register(name, (Gauge<T>) gaugeFunc::get);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 2e2a0d7..3ff410c 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -350,6 +350,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
public void uploadProjectFile(Project project, int version, String filetype,
String filename, File localFile, String uploader)
throws ProjectManagerException {
+ long startMs = System.currentTimeMillis();
logger.info("Uploading to " + project.getName() + " version:" + version
+ " file:" + filename);
Connection connection = getConnection();
@@ -358,7 +359,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
uploadProjectFile(connection, project, version, filetype, filename,
localFile, uploader);
connection.commit();
- logger.info("Commiting upload " + localFile.getName());
+ logger.info("project " + project.getName() + " commiting upload " + localFile.getName()
+ + " took " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
} catch (SQLException e) {
logger.error(e);
throw new ProjectManagerException("Error getting DB connection.", e);
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 5d6a868..ba5c825 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -47,4 +47,9 @@ public class CommonMetricsTest {
public void testMarkDBConnectionMetrics() {
MetricsTestUtility.testMeter("DB-Connection-meter", dr, CommonMetrics.INSTANCE::markDBConnection);
}
+
+ @Test
+ public void testDBConnectionTimeMetrics() {
+ MetricsTestUtility.testGauge("dbConnectionTime", dr, CommonMetrics.INSTANCE::setDBConnectionTime);
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
index 706b52e..6b25d8a 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -95,10 +95,12 @@ public class MetricsTestUtility {
public static void testGauge(String GaugeName, DummyReporter dr, Consumer<Long> func) {
func.accept(1L);
- sleepMillis(20);
+
+ // needs time to let metrics reporter receive the updated value.
+ sleepMillis(50);
Assert.assertEquals(dr.getGauge(GaugeName), "1");
func.accept(99L);
- sleepMillis(20);
+ sleepMillis(50);
Assert.assertEquals(dr.getGauge(GaugeName), "99");
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 023abe2..b8418f5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -45,8 +45,6 @@ import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.codahale.metrics.MetricRegistry;
-
import azkaban.constants.ServerInternals;
import azkaban.constants.ServerProperties;
@@ -185,13 +183,9 @@ public class AzkabanExecutorServer {
}
private void startExecMetrics() throws Exception {
- MetricRegistry metrics = MetricsManager.INSTANCE.getRegistry();
+ ExecMetrics.INSTANCE.addFlowRunnerManagerMetrics(getFlowRunnerManager());
logger.info("starting reporting Executor Metrics");
- MetricsExecRegister execWorker =
- new MetricsExecRegister.MetricsExecRegisterBuilder("EXEC").addFlowRunnerManager(getFlowRunnerManager()).build();
- execWorker.addExecutorManagerMetrics(metrics);
-
MetricsManager.INSTANCE.startReporting("AZ-EXEC", props);
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
new file mode 100644
index 0000000..8b84045
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsUtility;
+
+import com.codahale.metrics.MetricRegistry;
+
+/** Singleton in charge of registering executor metrics in the shared MetricsManager registry. */
+public enum ExecMetrics {
+  INSTANCE;
+
+  private final MetricRegistry registry;
+
+  ExecMetrics() {
+    registry = MetricsManager.INSTANCE.getRegistry();
+    setupStaticMetrics();
+  }
+
+  /** Hook invoked from the constructor; it currently registers no metrics. */
+  public void setupStaticMetrics() {
+    // Intentionally empty: the only executor gauges so far need a FlowRunnerManager (see below).
+  }
+
+  /** Registers gauges backed by the manager's getNumRunningFlows and getNumQueuedFlows. */
+  public void addFlowRunnerManagerMetrics(FlowRunnerManager flowRunnerManager) {
+    MetricsUtility.addGauge("EXEC-NumRunningFlows", registry, flowRunnerManager::getNumRunningFlows);
+    MetricsUtility.addGauge("EXEC-NumQueuedFlows", registry, flowRunnerManager::getNumQueuedFlows);
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
new file mode 100644
index 0000000..53cad27
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility;
+import azkaban.metrics.MetricsTestUtility.DummyReporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/** Fixture that starts a DummyReporter on the shared registry; declares no test cases yet. */
+public class ExecMetricsTest {
+
+  private DummyReporter dr;
+
+  @Before
+  public void setup() {
+    dr = new DummyReporter(MetricsManager.INSTANCE.getRegistry());
+    dr.start(Duration.ofMillis(2).toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  @After
+  public void shutdown() {
+    if (null != dr) {
+      dr.stop();
+    }
+    dr = null;
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 2053f0b..457775f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import java.util.function.Supplier;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
@@ -65,6 +66,7 @@ import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
+import azkaban.metrics.MetricsUtility;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
import azkaban.scheduler.ScheduleLoader;
@@ -155,6 +157,10 @@ public class AzkabanWebServer extends AzkabanServer {
private final VelocityEngine velocityEngine;
private final Server server;
+
+ // queuedThreadPool is kept mainly so that the Jetty thread pool can be monitored.
+ private QueuedThreadPool queuedThreadPool;
+
private UserManager userManager;
private ProjectManager projectManager;
// private ExecutorManagerAdapter executorManager;
@@ -227,22 +233,28 @@ public class AzkabanWebServer extends AzkabanServer {
}
configureMBeanServer();
- if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
- startWebMetrics();
- }
}
- private void startWebMetrics() throws Exception{
- MetricRegistry metrics = MetricsManager.INSTANCE.getRegistry();
- MetricsWebRegister execWorker = new MetricsWebRegister.MetricsWebRegisterBuilder("WEB")
- .addExecutorManager(getExecutorManager())
- .build();
- execWorker.addExecutorManagerMetrics(metrics);
+ private void startWebMetrics() throws Exception {
- MetricsManager.INSTANCE.startReporting("AZ-WEB", props);
- }
+ MetricRegistry registry = MetricsManager.INSTANCE.getRegistry();
+ MetricsUtility.addGauge("JETTY-NumIdleThreads", registry, queuedThreadPool::getIdleThreads);
+ MetricsUtility.addGauge("JETTY-NumTotalThreads", registry, queuedThreadPool::getThreads);
+ MetricsUtility.addGauge("JETTY-NumQueueSize", registry, queuedThreadPool::getQueueSize);
+ MetricsUtility.addGauge("WEB-NumQueuedFlows", registry, executorManager::getQueuedFlowSize);
+ /**
+ * TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
+ * Ideally we would subtract {@link ExecutorManager#getQueuedFlowSize()} from the size of getRunningFlows,
+ * in order to report the correct number of running flows.
+ * However, neither getRunningFlows nor getQueuedFlowSize is synchronized, so the subtraction
+ * cannot be made thread-safe. We need to fix this in the future.
+ */
+ MetricsUtility.addGauge("WEB-NumRunningFlows", registry, () -> executorManager.getRunningFlows().size());
+ logger.info("starting reporting Web Server Metrics");
+ MetricsManager.INSTANCE.startReporting("AZ-WEB", props);
+ }
private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
this.triggerPlugins = triggerPlugins;
@@ -777,6 +789,7 @@ public class AzkabanWebServer extends AzkabanServer {
}
QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+ app.setThreadPool(httpThreadPool);
server.setThreadPool(httpThreadPool);
String staticDir =
@@ -828,6 +841,11 @@ public class AzkabanWebServer extends AzkabanServer {
app.getTriggerManager().start();
root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, app);
+
+
+ if (azkabanSettings.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+ app.startWebMetrics();
+ }
try {
server.start();
} catch (Exception e) {
@@ -1335,4 +1353,8 @@ public class AzkabanWebServer extends AzkabanServer {
return null;
}
}
+
+ private void setThreadPool(QueuedThreadPool queuedThreadPool) {
+ this.queuedThreadPool = queuedThreadPool;
+ }
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
index b0aeabe..8bc1c15 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
@@ -16,6 +16,7 @@
package azkaban.webapp;
+import azkaban.executor.ExecutorManager;
import azkaban.metrics.MetricsManager;
import azkaban.metrics.MetricsUtility;
@@ -23,6 +24,7 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
/**
@@ -42,13 +44,13 @@ public enum WebMetrics {
WebMetrics() {
registry = MetricsManager.INSTANCE.getRegistry();
- setupAllMetrics();
+ setupStaticMetrics();
}
- private void setupAllMetrics() {
+ private void setupStaticMetrics() {
webGetCall = MetricsUtility.addMeter("Web-Get-Call-Meter", registry);
webPostCall = MetricsUtility.addMeter("Web-Post-Call-Meter", registry);
- MetricsUtility.addLongGauge("fetchLogLatency", logFetchLatency, registry);
+ MetricsUtility.addGauge("fetchLogLatency", registry, logFetchLatency::get);
}
public void markWebGetCall() {