azkaban-developers

bring dropwizard Metrics to AZ to get a better monitoring feature

12/2/2016 8:48:44 PM

Details

diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 687e392..3359923 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -38,6 +38,8 @@ dependencies {
   compile('org.mortbay.jetty:jetty:6.1.26')
   compile('org.mortbay.jetty:jetty-util:6.1.26')
   compile('org.quartz-scheduler:quartz:2.2.1')
+  compile('io.dropwizard.metrics:metrics-core:3.1.0')
+  compile('io.dropwizard.metrics:metrics-jvm:3.1.0')
 
   testCompile(project(':azkaban-test').sourceSets.test.output)
   testCompile('org.hamcrest:hamcrest-all:1.3')
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
index 3a06f47..0d7eb28 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
@@ -26,4 +26,5 @@ public class ServerInternals {
   public static final String AZKABAN_EXECUTOR_PORT_FILENAME = "executor.port";
 
   public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
+  
 }
diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
index 7aa56b2..895aeda 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerProperties.java
@@ -22,4 +22,19 @@ public class ServerProperties {
   // Configures the Kafka appender for logging user jobs, specified for the exec server
   public static final String AZKABAN_SERVER_LOGGING_KAFKA_BROKERLIST = "azkaban.server.logging.kafka.brokerList";
   public static final String AZKABAN_SERVER_LOGGING_KAFKA_TOPIC = "azkaban.server.logging.kafka.topic";
+
+  /**
+   * Represent the class name of azkaban metrics reporter.
+   */
+  public static final String CUSTOM_METRICS_REPORTER_CLASS_NAME =
+      "azkaban.metrics.reporter.name";
+
+  /**
+   * Represent the metrics server URL.
+   */
+  public static final String METRICS_SERVER_URL =
+      "azkaban.metrics.server.url";
+
+  public static final String IS_METRICS_ENABLED =
+      "azkaban.is.metrics.enabled";
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 4557464..e3517ce 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -601,6 +601,11 @@ public class ExecutorManager extends EventHandler implements
     return allIds.toString();
   }
 
+
+  public long getQueuedFlowSize() {
+    return queuedFlows.size();
+  }
+
   /* Helper method to flow ids of all running flows */
   private void getRunningFlowsIdsHelper(List<Integer> allIds,
     Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
new file mode 100644
index 0000000..447c613
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsManager.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import azkaban.utils.Props;
+import static azkaban.constants.ServerProperties.METRICS_SERVER_URL;
+import static azkaban.constants.ServerProperties.CUSTOM_METRICS_REPORTER_CLASS_NAME;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+
+import org.apache.log4j.Logger;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The singleton class, MetricsManager, is the place to have MetricRegistry and
+ * ConsoleReporter in this class. Also, web servers and executors can call {@link #startReporting(String, Props)}
+ * to start reporting AZ metrics to remote metrics server.
+ */
+public enum MetricsManager {
+  INSTANCE;
+
+  private final MetricRegistry registry        = new MetricRegistry();
+  private ConsoleReporter consoleReporter      = null;
+  private static final Logger logger = Logger.getLogger(MetricsManager.class);
+
+  /**
+   * Constructor is eaagerly called when this class is loaded.
+   */
+  private MetricsManager() {
+    registry.register("MEMORY_Gauge", new MemoryUsageGaugeSet());
+    registry.register("GC_Gauge", new GarbageCollectorMetricSet());
+    registry.register("Thread_State_Gauge", new ThreadStatesGaugeSet());
+  }
+  /**
+   * Return the Metrics registry.
+   *
+   * @return the single {@code MetricRegistry} used for all of Az Metrics
+   *         monitoring
+   */
+  public MetricRegistry getRegistry() {
+    return registry;
+  }
+
+  /**
+   * reporting metrics to remote metrics collector.
+   * Note: this method must be synchronized, since both web server and executor
+   * will call it during initialization.
+   */
+  public synchronized void startReporting(String reporterName, Props props) {
+    String metricsReporterClassName = props.get(CUSTOM_METRICS_REPORTER_CLASS_NAME);
+    String metricsServerURL = props.get(METRICS_SERVER_URL);
+    if (metricsReporterClassName != null && metricsServerURL != null) {
+      try {
+        logger.info("metricsReporterClassName: " + metricsReporterClassName);
+        Class metricsClass = Class.forName(metricsReporterClassName);
+
+        Constructor[] constructors =
+            metricsClass.getConstructors();
+        constructors[0].newInstance(reporterName, registry, metricsServerURL);
+
+      } catch (Exception e) {
+        logger.error("Encountered error while loading and instantiating "
+            + metricsReporterClassName, e);
+        throw new IllegalStateException(
+            "Encountered error while loading and instantiating "
+                + metricsReporterClassName, e);
+      }
+    } else {
+      logger.error("No value for property: "
+          + CUSTOM_METRICS_REPORTER_CLASS_NAME
+          + "or" + METRICS_SERVER_URL + " was found");
+    }
+
+  }
+
+  /**
+   * Create a ConsoleReporter to the AZ Metrics registry.
+   * @param reportInterval
+   *            time to wait between dumping metrics to the console
+   */
+  public synchronized void addConsoleReporter(Duration reportInterval) {
+    if (null != consoleReporter) {
+      return;
+    }
+
+    consoleReporter = ConsoleReporter.forRegistry(getRegistry()).build();
+    consoleReporter.start(reportInterval.toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Stop ConsoldeReporter previously created by a call to
+   * {@link #addConsoleReporter(Duration)} and release it for GC.
+   */
+  public synchronized void removeConsoleReporter() {
+    if (null != consoleReporter)
+      consoleReporter.stop();
+
+    consoleReporter = null;
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 21be9a4..0a69dc4 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -44,7 +44,11 @@ import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.codahale.metrics.MetricRegistry;
+
 import azkaban.constants.ServerInternals;
+import azkaban.constants.ServerProperties;
+
 import azkaban.execapp.event.JobCallbackManager;
 import azkaban.execapp.jmx.JmxFlowRunnerManager;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
@@ -68,6 +72,7 @@ import azkaban.server.AzkabanServer;
 import azkaban.utils.Props;
 import azkaban.utils.SystemMemoryInfo;
 import azkaban.utils.Utils;
+import azkaban.metrics.MetricsManager;
 
 import static azkaban.constants.ServerInternals.AZKABAN_EXECUTOR_PORT_FILENAME;
 import static com.google.common.base.Preconditions.checkState;
@@ -133,7 +138,12 @@ public class AzkabanExecutorServer {
 
     insertExecutorEntryIntoDB();
     dumpPortToFile();
+
     logger.info("Started Executor Server on " + getExecutorHostPort());
+
+    if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+      startExecMetrics();
+    }
   }
 
   private Server createJettyServer(Props props) {
@@ -175,6 +185,17 @@ public class AzkabanExecutorServer {
     return server;
   }
 
+  private void startExecMetrics() throws Exception {
+    MetricRegistry metrics = MetricsManager.INSTANCE.getRegistry();
+
+    logger.info("starting reporting Executor Metrics");
+    MetricsExecRegister execWorker =
+        new MetricsExecRegister.MetricsExecRegisterBuilder("EXEC").addFlowRunnerManager(getFlowRunnerManager()).build();
+    execWorker.addExecutorManagerMetrics(metrics);
+
+    MetricsManager.INSTANCE.startReporting("AZ-EXEC", props);
+  }
+
   private void insertExecutorEntryIntoDB() {
     try {
       final String host = requireNonNull(getHost());
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/MetricsExecRegister.java b/azkaban-exec-server/src/main/java/azkaban/execapp/MetricsExecRegister.java
new file mode 100644
index 0000000..e2d0303
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/MetricsExecRegister.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Gauge;
+
+import azkaban.execapp.FlowRunnerManager;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class MetricsExecRegister is in charge of collecting metrics from executors.
+ */
+public class MetricsExecRegister {
+  private static final Logger logger = Logger.getLogger(MetricsExecRegister.class);
+
+  private String endpointName;
+  private FlowRunnerManager _flowRunnerManager;
+
+  public MetricsExecRegister(MetricsExecRegisterBuilder builder) {
+    this.endpointName = builder.endpointName;
+    this._flowRunnerManager = builder._flowRunnerManager;
+  }
+
+  public void addExecutorManagerMetrics(MetricRegistry metrics) throws Exception {
+    if (_flowRunnerManager == null)
+      throw new Exception("flowRunnerManager has not yet been initialized.");
+
+    logger.info("register executor metrics.");
+    metrics.register("EXEC-NumRunningFlows", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return _flowRunnerManager.getNumRunningFlows();
+      }
+    });
+
+
+    metrics.register("EXEC-NumQueuedFlows", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return _flowRunnerManager.getNumQueuedFlows();
+      }
+    });
+  }
+
+  public static class MetricsExecRegisterBuilder {
+    private FlowRunnerManager _flowRunnerManager;
+    private String endpointName;
+
+    public MetricsExecRegisterBuilder(String endpointName) {
+      this.endpointName = endpointName;
+    }
+
+    public MetricsExecRegisterBuilder addFlowRunnerManager(FlowRunnerManager flowRunnerManager) {
+      this._flowRunnerManager = flowRunnerManager;
+      return this;
+    }
+
+    public MetricsExecRegister build() {
+      return new MetricsExecRegister(this);
+    }
+  }
+
+}
diff --git a/azkaban-solo-server/build.gradle b/azkaban-solo-server/build.gradle
index ad8a27e..afa4b56 100644
--- a/azkaban-solo-server/build.gradle
+++ b/azkaban-solo-server/build.gradle
@@ -1,10 +1,10 @@
 apply plugin: 'distribution'
 
 dependencies {
-  compile(project(':azkaban-common'))
   compile(project(':azkaban-web-server'))
   compile(project(':azkaban-exec-server'))
 
+  runtime('org.slf4j:slf4j-log4j12:1.7.18')
   runtime('com.h2database:h2:1.4.193')
 }
 
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index e490a7d..c5d2ff9 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -16,6 +16,8 @@
 
 package azkaban.webapp;
 
+import com.codahale.metrics.MetricRegistry;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -56,6 +58,7 @@ import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.alert.Alerter;
 import azkaban.constants.ServerInternals;
+import azkaban.constants.ServerProperties;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
@@ -100,6 +103,7 @@ import azkaban.webapp.servlet.ProjectServlet;
 import azkaban.webapp.servlet.ScheduleServlet;
 import azkaban.webapp.servlet.StatsServlet;
 import azkaban.webapp.servlet.TriggerManagerServlet;
+import azkaban.metrics.MetricsManager;
 
 import com.linkedin.restli.server.RestliServlet;
 
@@ -222,8 +226,23 @@ public class AzkabanWebServer extends AzkabanServer {
     }
 
     configureMBeanServer();
+    if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+      startWebMetrics();
+    }
   }
 
+  private void startWebMetrics() throws Exception{
+    MetricRegistry metrics = MetricsManager.INSTANCE.getRegistry();
+    MetricsWebRegister execWorker = new MetricsWebRegister.MetricsWebRegisterBuilder("WEB")
+        .addExecutorManager(getExecutorManager())
+        .build();
+    execWorker.addExecutorManagerMetrics(metrics);
+
+    MetricsManager.INSTANCE.startReporting("AZ-WEB", props);
+  }
+
+
+
   private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
     this.triggerPlugins = triggerPlugins;
   }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/MetricsWebRegister.java b/azkaban-web-server/src/main/java/azkaban/webapp/MetricsWebRegister.java
new file mode 100644
index 0000000..400c9d1
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/MetricsWebRegister.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Gauge;
+
+import azkaban.executor.ExecutorManager;
+
+/**
+ * This class MetricsWebRegister is in charge of collecting metrics from web server.
+ */
+public class MetricsWebRegister {
+  private ExecutorManager _executorManager;
+  private String endpointName;
+
+  public MetricsWebRegister(MetricsWebRegisterBuilder builder) {
+    this.endpointName = builder.endpointName;
+    this._executorManager = builder._executorManager;
+  }
+
+  public void addExecutorManagerMetrics(MetricRegistry metrics) throws Exception {
+    if (_executorManager == null)
+      throw new Exception("Can not find executorManager.");
+
+    metrics.register("WEB-NumRunningFlows", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return _executorManager.getRunningFlows().size();
+      }
+    });
+
+    metrics.register("WEB-NumQueuedFlows", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return _executorManager.getQueuedFlowSize();
+      }
+    });
+  }
+
+  public static class MetricsWebRegisterBuilder {
+    private ExecutorManager _executorManager;
+    private String endpointName;
+
+    public MetricsWebRegisterBuilder(String endpointName) {
+      this.endpointName = endpointName;
+    }
+
+    public MetricsWebRegisterBuilder addExecutorManager(ExecutorManager executorManager) {
+      this._executorManager = executorManager;
+      return this;
+    }
+
+    public MetricsWebRegister build() {
+      return new MetricsWebRegister(this);
+    }
+  }
+
+}