azkaban-developers
Details
azkaban-common/build.gradle 2(+2 -0)
diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 687e392..3359923 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -38,6 +38,8 @@ dependencies {
compile('org.mortbay.jetty:jetty:6.1.26')
compile('org.mortbay.jetty:jetty-util:6.1.26')
compile('org.quartz-scheduler:quartz:2.2.1')
+ compile('io.dropwizard.metrics:metrics-core:3.1.0')
+ compile('io.dropwizard.metrics:metrics-jvm:3.1.0')
testCompile(project(':azkaban-test').sourceSets.test.output)
testCompile('org.hamcrest:hamcrest-all:1.3')
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
index 3a06f47..0d7eb28 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
@@ -26,4 +26,5 @@ public class ServerInternals {
public static final String AZKABAN_EXECUTOR_PORT_FILENAME = "executor.port";
public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
+
}
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
index 7aa56b2..895aeda 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -22,4 +22,19 @@ public class ServerProperties {
// Configures the Kafka appender for logging user jobs, specified for the exec server
public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
public static final String AZKABAN_SERVER_LOGGING_KAFKA_TOPIC = "azkaban.server.logging.kafka.topic";
+
+ /**
+ * Property key whose value is the class name of the custom Azkaban metrics reporter.
+ */
+ public static final String CUSTOM_METRICS_REPORTER_CLASS_NAME =
+ "azkaban.metrics.reporter.name";
+
+ /**
+ * Property key whose value is the URL of the metrics server.
+ */
+ public static final String METRICS_SERVER_URL =
+ "azkaban.metrics.server.url";
+
+ public static final String IS_METRICS_ENABLED =
+ "azkaban.is.metrics.enabled";
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4557464..e3517ce 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -601,6 +601,11 @@ public class ExecutorManager extends EventHandler implements
return allIds.toString();
}
+
+ /**
+  * Reports how many entries {@code queuedFlows} currently holds.
+  *
+  * @return the value of {@code queuedFlows.size()}
+  */
+ public long getQueuedFlowSize() {
+   final long queuedCount = queuedFlows.size();
+   return queuedCount;
+ }
+
/* Helper method to get flow ids of all running flows */
private void getRunningFlowsIdsHelper(List<Integer> allIds,
Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
new file mode 100644
index 0000000..447c613
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import azkaban.utils.Props;
+import static azkaban.constants.ServerProperties.METRICS_SERVER_URL;
+import static azkaban.constants.ServerProperties.CUSTOM_METRICS_REPORTER_CLASS_NAME;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+
+import org.apache.log4j.Logger;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Singleton that owns the {@code MetricRegistry} and the optional {@code ConsoleReporter}
+ * used for Azkaban metrics. Web servers and executors call {@link #startReporting(String, Props)}
+ * to start reporting AZ metrics to a remote metrics server.
+ */
+public enum MetricsManager {
+  INSTANCE;
+
+  private final MetricRegistry registry = new MetricRegistry();
+  private ConsoleReporter consoleReporter = null;
+  private static final Logger logger = Logger.getLogger(MetricsManager.class);
+
+  /**
+   * Constructor is eagerly called when this enum is initialized. It registers
+   * the JVM memory, garbage-collector and thread-state metric sets with the
+   * registry.
+   */
+  private MetricsManager() {
+    registry.register("MEMORY_Gauge", new MemoryUsageGaugeSet());
+    registry.register("GC_Gauge", new GarbageCollectorMetricSet());
+    registry.register("Thread_State_Gauge", new ThreadStatesGaugeSet());
+  }
+
+  /**
+   * Return the Metrics registry.
+   *
+   * @return the single {@code MetricRegistry} used for all of Az Metrics
+   *         monitoring
+   */
+  public MetricRegistry getRegistry() {
+    return registry;
+  }
+
+  /**
+   * Start reporting metrics to a remote metrics collector.
+   *
+   * <p>The reporter class name and the metrics server URL are read from
+   * {@code props}. If either is missing, an error is logged and nothing is
+   * started. Otherwise the reporter class is loaded reflectively and its first
+   * public constructor is invoked with
+   * {@code (reporterName, registry, metricsServerURL)}.
+   *
+   * <p>Note: this method must be synchronized, since both web server and
+   * executor will call it during initialization.
+   *
+   * @param reporterName name handed to the reporter's constructor
+   * @param props properties holding the reporter class name and server URL
+   * @throws IllegalStateException if the reporter class cannot be loaded or
+   *         instantiated
+   */
+  public synchronized void startReporting(String reporterName, Props props) {
+    String metricsReporterClassName = props.get(CUSTOM_METRICS_REPORTER_CLASS_NAME);
+    String metricsServerURL = props.get(METRICS_SERVER_URL);
+    if (metricsReporterClassName == null || metricsServerURL == null) {
+      logger.error("No value for property: "
+          + CUSTOM_METRICS_REPORTER_CLASS_NAME
+          + " or " + METRICS_SERVER_URL + " was found");
+      return;
+    }
+
+    try {
+      logger.info("metricsReporterClassName: " + metricsReporterClassName);
+      Class<?> metricsClass = Class.forName(metricsReporterClassName);
+
+      // NOTE(review): getConstructors() does not guarantee any ordering, so
+      // this presumably relies on the reporter exposing exactly one public
+      // constructor - confirm against the reporter implementations.
+      Constructor<?>[] constructors = metricsClass.getConstructors();
+      if (constructors.length == 0) {
+        throw new NoSuchMethodException(
+            metricsReporterClassName + " has no public constructor");
+      }
+      constructors[0].newInstance(reporterName, registry, metricsServerURL);
+    } catch (Exception e) {
+      logger.error("Encountered error while loading and instantiating "
+          + metricsReporterClassName, e);
+      throw new IllegalStateException(
+          "Encountered error while loading and instantiating "
+              + metricsReporterClassName, e);
+    }
+  }
+
+  /**
+   * Create a ConsoleReporter for the AZ Metrics registry and start it.
+   * Does nothing if a console reporter has already been added.
+   *
+   * @param reportInterval
+   *          time to wait between dumping metrics to the console
+   */
+  public synchronized void addConsoleReporter(Duration reportInterval) {
+    if (null != consoleReporter) {
+      return;
+    }
+
+    consoleReporter = ConsoleReporter.forRegistry(getRegistry()).build();
+    consoleReporter.start(reportInterval.toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Stop the ConsoleReporter previously created by a call to
+   * {@link #addConsoleReporter(Duration)} and release it for GC.
+   * Does nothing if no console reporter is active.
+   */
+  public synchronized void removeConsoleReporter() {
+    if (null != consoleReporter) {
+      consoleReporter.stop();
+    }
+
+    consoleReporter = null;
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 21be9a4..0a69dc4 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,7 +44,11 @@ import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.codahale.metrics.MetricRegistry;
+
import azkaban.constants.ServerInternals;
+import azkaban.constants.ServerProperties;
+
import azkaban.execapp.event.JobCallbackManager;
import azkaban.execapp.jmx.JmxFlowRunnerManager;
import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -68,6 +72,7 @@ import azkaban.server.AzkabanServer;
import azkaban.utils.Props;
import azkaban.utils.SystemMemoryInfo;
import azkaban.utils.Utils;
+import azkaban.metrics.MetricsManager;
import static azkaban.constants.ServerInternals.AZKABAN_EXECUTOR_PORT_FILENAME;
import static com.google.common.base.Preconditions.checkState;
@@ -133,7 +138,12 @@ public class AzkabanExecutorServer {
insertExecutorEntryIntoDB();
dumpPortToFile();
+
logger.info("Started Executor Server on " + getExecutorHostPort());
+
+ if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+ startExecMetrics();
+ }
}
private Server createJettyServer(Props props) {
@@ -175,6 +185,17 @@ public class AzkabanExecutorServer {
return server;
}
+ /**
+  * Registers the executor gauges with the shared metrics registry and then
+  * starts reporting under the name "AZ-EXEC".
+  *
+  * @throws Exception if registering the executor metrics fails
+  */
+ private void startExecMetrics() throws Exception {
+   final MetricRegistry registry = MetricsManager.INSTANCE.getRegistry();
+
+   logger.info("starting reporting Executor Metrics");
+   final MetricsExecRegister.MetricsExecRegisterBuilder registerBuilder =
+       new MetricsExecRegister.MetricsExecRegisterBuilder("EXEC");
+   registerBuilder.addFlowRunnerManager(getFlowRunnerManager());
+   final MetricsExecRegister execRegister = registerBuilder.build();
+   execRegister.addExecutorManagerMetrics(registry);
+
+   MetricsManager.INSTANCE.startReporting("AZ-EXEC", props);
+ }
+
private void insertExecutorEntryIntoDB() {
try {
final String host = requireNonNull(getHost());
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/MetricsExecRegister.java b/azkaban-exec-server/src/main/java/azkaban/execapp/MetricsExecRegister.java
new file mode 100644
index 0000000..e2d0303
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/MetricsExecRegister.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Gauge;
+
+import azkaban.execapp.FlowRunnerManager;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class MetricsExecRegister is in charge of collecting metrics from executors.
+ */
+public class MetricsExecRegister {
+  private static final Logger logger = Logger.getLogger(MetricsExecRegister.class);
+
+  // NOTE(review): endpointName is stored but not read anywhere in this class;
+  // the gauge names below are hard-coded with an "EXEC-" prefix.
+  private final String endpointName;
+  private final FlowRunnerManager flowRunnerManager;
+
+  /**
+   * Creates a register from the values collected by the builder.
+   *
+   * @param builder source of the endpoint name and flow runner manager
+   */
+  public MetricsExecRegister(MetricsExecRegisterBuilder builder) {
+    this.endpointName = builder.endpointName;
+    this.flowRunnerManager = builder.flowRunnerManager;
+  }
+
+  /**
+   * Registers the "EXEC-NumRunningFlows" and "EXEC-NumQueuedFlows" gauges,
+   * both backed by the flow runner manager, with the given registry.
+   *
+   * @param metrics registry that receives the gauges
+   * @throws IllegalStateException if no flow runner manager was supplied
+   * @throws Exception declared for compatibility with existing callers
+   */
+  public void addExecutorManagerMetrics(MetricRegistry metrics) throws Exception {
+    if (flowRunnerManager == null) {
+      throw new IllegalStateException("flowRunnerManager has not yet been initialized.");
+    }
+
+    logger.info("register executor metrics.");
+
+    // Gauges are evaluated lazily, each time the registry is polled.
+    final Gauge<Integer> runningFlows = () -> flowRunnerManager.getNumRunningFlows();
+    metrics.register("EXEC-NumRunningFlows", runningFlows);
+
+    final Gauge<Integer> queuedFlows = () -> flowRunnerManager.getNumQueuedFlows();
+    metrics.register("EXEC-NumQueuedFlows", queuedFlows);
+  }
+
+  /**
+   * Builder for {@link MetricsExecRegister}.
+   */
+  public static class MetricsExecRegisterBuilder {
+    private FlowRunnerManager flowRunnerManager;
+    private final String endpointName;
+
+    /**
+     * @param endpointName name of the endpoint this register belongs to
+     */
+    public MetricsExecRegisterBuilder(String endpointName) {
+      this.endpointName = endpointName;
+    }
+
+    /**
+     * Sets the flow runner manager the gauges will read from.
+     *
+     * @param flowRunnerManager manager to read flow counts from
+     * @return this builder, for chaining
+     */
+    public MetricsExecRegisterBuilder addFlowRunnerManager(FlowRunnerManager flowRunnerManager) {
+      this.flowRunnerManager = flowRunnerManager;
+      return this;
+    }
+
+    /**
+     * @return a new {@link MetricsExecRegister} using this builder's values
+     */
+    public MetricsExecRegister build() {
+      return new MetricsExecRegister(this);
+    }
+  }
+
+}
azkaban-solo-server/build.gradle 2(+1 -1)
diff --git a/azkaban-solo-server/build.gradle b/azkaban-solo-server/build.gradle
index ad8a27e..afa4b56 100644
--- a/azkaban-solo-server/build.gradle
+++ b/azkaban-solo-server/build.gradle
@@ -1,10 +1,10 @@
apply plugin: 'distribution'
dependencies {
- compile(project(':azkaban-common'))
compile(project(':azkaban-web-server'))
compile(project(':azkaban-exec-server'))
+ runtime('org.slf4j:slf4j-log4j12:1.7.18')
runtime('com.h2database:h2:1.4.193')
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index e490a7d..c5d2ff9 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -16,6 +16,8 @@
package azkaban.webapp;
+import com.codahale.metrics.MetricRegistry;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -56,6 +58,7 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.alert.Alerter;
import azkaban.constants.ServerInternals;
+import azkaban.constants.ServerProperties;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.executor.ExecutorManager;
import azkaban.executor.JdbcExecutorLoader;
@@ -100,6 +103,7 @@ import azkaban.webapp.servlet.ProjectServlet;
import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.StatsServlet;
import azkaban.webapp.servlet.TriggerManagerServlet;
+import azkaban.metrics.MetricsManager;
import com.linkedin.restli.server.RestliServlet;
@@ -222,8 +226,23 @@ public class AzkabanWebServer extends AzkabanServer {
}
configureMBeanServer();
+ if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+ startWebMetrics();
+ }
}
+ /**
+  * Registers the web-server gauges with the shared metrics registry and then
+  * starts reporting under the name "AZ-WEB".
+  *
+  * @throws Exception if registering the web metrics fails
+  */
+ private void startWebMetrics() throws Exception {
+   final MetricRegistry registry = MetricsManager.INSTANCE.getRegistry();
+   final MetricsWebRegister.MetricsWebRegisterBuilder registerBuilder =
+       new MetricsWebRegister.MetricsWebRegisterBuilder("WEB");
+   registerBuilder.addExecutorManager(getExecutorManager());
+   final MetricsWebRegister webRegister = registerBuilder.build();
+   webRegister.addExecutorManagerMetrics(registry);
+
+   MetricsManager.INSTANCE.startReporting("AZ-WEB", props);
+ }
+
+
+
private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
this.triggerPlugins = triggerPlugins;
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/MetricsWebRegister.java b/azkaban-web-server/src/main/java/azkaban/webapp/MetricsWebRegister.java
new file mode 100644
index 0000000..400c9d1
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/MetricsWebRegister.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Gauge;
+
+import azkaban.executor.ExecutorManager;
+
+/**
+ * This class MetricsWebRegister is in charge of collecting metrics from web server.
+ */
+public class MetricsWebRegister {
+  private final ExecutorManager executorManager;
+  // NOTE(review): endpointName is stored but not read anywhere in this class;
+  // the gauge names below are hard-coded with a "WEB-" prefix.
+  private final String endpointName;
+
+  /**
+   * Creates a register from the values collected by the builder.
+   *
+   * @param builder source of the endpoint name and executor manager
+   */
+  public MetricsWebRegister(MetricsWebRegisterBuilder builder) {
+    this.endpointName = builder.endpointName;
+    this.executorManager = builder.executorManager;
+  }
+
+  /**
+   * Registers the "WEB-NumRunningFlows" and "WEB-NumQueuedFlows" gauges,
+   * both backed by the executor manager, with the given registry.
+   *
+   * @param metrics registry that receives the gauges
+   * @throws IllegalStateException if no executor manager was supplied
+   * @throws Exception declared for compatibility with existing callers
+   */
+  public void addExecutorManagerMetrics(MetricRegistry metrics) throws Exception {
+    if (executorManager == null) {
+      throw new IllegalStateException("Can not find executorManager.");
+    }
+
+    // Gauges are evaluated lazily, each time the registry is polled.
+    final Gauge<Integer> runningFlows = () -> executorManager.getRunningFlows().size();
+    metrics.register("WEB-NumRunningFlows", runningFlows);
+
+    final Gauge<Long> queuedFlows = () -> executorManager.getQueuedFlowSize();
+    metrics.register("WEB-NumQueuedFlows", queuedFlows);
+  }
+
+  /**
+   * Builder for {@link MetricsWebRegister}.
+   */
+  public static class MetricsWebRegisterBuilder {
+    private ExecutorManager executorManager;
+    private final String endpointName;
+
+    /**
+     * @param endpointName name of the endpoint this register belongs to
+     */
+    public MetricsWebRegisterBuilder(String endpointName) {
+      this.endpointName = endpointName;
+    }
+
+    /**
+     * Sets the executor manager the gauges will read from.
+     *
+     * @param executorManager manager to read flow counts from
+     * @return this builder, for chaining
+     */
+    public MetricsWebRegisterBuilder addExecutorManager(ExecutorManager executorManager) {
+      this.executorManager = executorManager;
+      return this;
+    }
+
+    /**
+     * @return a new {@link MetricsWebRegister} using this builder's values
+     */
+    public MetricsWebRegister build() {
+      return new MetricsWebRegister(this);
+    }
+  }
+
+}