azkaban-aplcache

Metrics Code Refactor and adding key metrics (#930) * Metrics

3/7/2017 2:46:16 PM
3.16.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
index 626a9c6..4b2b907 100644
--- a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
+++ b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
@@ -68,6 +68,7 @@ public abstract class AbstractJdbcLoader {
   protected Connection getDBConnection(boolean autoCommit) throws IOException {
     Connection connection = null;
     CommonMetrics.INSTANCE.markDBConnection();
+    long startMs = System.currentTimeMillis();
     try {
       connection = dataSource.getConnection();
       connection.setAutoCommit(autoCommit);
@@ -75,7 +76,7 @@ public abstract class AbstractJdbcLoader {
       DbUtils.closeQuietly(connection);
       throw new IOException("Error getting DB connection.", e);
     }
-
+    CommonMetrics.INSTANCE.setDBConnectionTime(System.currentTimeMillis() - startMs);
     return connection;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 80b58e7..930f9c0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -16,6 +16,7 @@
 
 package azkaban.executor;
 
+import azkaban.metrics.CommonMetrics;
 import azkaban.constants.ServerProperties;
 import azkaban.utils.FlowUtils;
 import java.io.File;
@@ -1537,6 +1538,10 @@ public class ExecutorManager extends EventHandler implements
     flow.applyUpdateObject(updateData);
     Status newStatus = flow.getStatus();
 
+    if(oldStatus != newStatus && newStatus == Status.FAILED) {
+      CommonMetrics.INSTANCE.markFlowFail();
+    }
+
     ExecutionOptions options = flow.getExecutionOptions();
     if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
       // We want to see if we should give an email status on first failure.
@@ -1572,12 +1577,12 @@ public class ExecutorManager extends EventHandler implements
 
   public boolean isFinished(ExecutableFlow flow) {
     switch (flow.getStatus()) {
-    case SUCCEEDED:
-    case FAILED:
-    case KILLED:
-      return true;
-    default:
-      return false;
+      case SUCCEEDED:
+      case FAILED:
+      case KILLED:
+        return true;
+      default:
+        return false;
     }
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index 6959c89..80371a8 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -19,6 +19,8 @@ package azkaban.metrics;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * This singleton class CommonMetrics is in charge of collecting varieties of metrics
  * which are accessed in both web and exec modules. That said, these metrics will be
@@ -28,6 +30,9 @@ public enum CommonMetrics {
   INSTANCE;
 
   private Meter dbConnectionMeter;
+  private Meter flowFailMeter;
+  private AtomicLong dbConnectionTime = new AtomicLong(0L);
+
   private MetricRegistry registry;
 
   CommonMetrics() {
@@ -37,6 +42,8 @@ public enum CommonMetrics {
 
   private void setupAllMetrics() {
     dbConnectionMeter = MetricsUtility.addMeter("DB-Connection-meter", registry);
+    flowFailMeter = MetricsUtility.addMeter("flow-fail-meter", registry);
+    MetricsUtility.addGauge("dbConnectionTime", registry, dbConnectionTime::get);
   }
 
   /**
@@ -52,4 +59,16 @@ public enum CommonMetrics {
      */
     dbConnectionMeter.mark();
   }
+
+  /**
+   * Mark flowFailMeter when a flow is considered as FAILED.
+   * This method could be called by Web Server or Executor, as they both detect flow failure.
+   */
+  public void markFlowFail() {
+    flowFailMeter.mark();
+  }
+
+  public void setDBConnectionTime(long milliseconds) {
+    dbConnectionTime.set(milliseconds);
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
index 12398e3..9fe15fa 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
@@ -3,8 +3,9 @@ package azkaban.metrics;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 /**
  * Creating an utility class to facilitate metrics class like {@link azkaban.metrics.CommonMetrics}
@@ -22,15 +23,20 @@ public final class MetricsUtility {
    */
   public static Meter addMeter(String name, MetricRegistry registry) {
     Meter curr = registry.meter(name);
-    registry.register(name + "-gauge", (Gauge<Double>) curr::getOneMinuteRate);
+    registry.register(name + "-gauge", (Gauge<Double>) curr::getFifteenMinuteRate);
     return curr;
   }
 
   /**
    * A {@link Gauge} is an instantaneous reading of a particular value.
-   * This method adds an AtomicLong number/metric to registry.
+   * This method takes a {@link Supplier}, a functional interface, to obtain metric values of any type.
+   * With this support, whether the metric of interest is a Double or a Long, we can pass it
+   * to Metrics Parser.
+   *
+   * E.g., in {@link CommonMetrics#setupAllMetrics()}, we construct the supplier from
+   * an AtomicLong object and its get method, in order to collect the dbConnectionTime metric.
    */
-  public static void addLongGauge(String name, AtomicLong value, MetricRegistry registry) {
-    registry.register(name, (Gauge<Long>) value::get);
+  public static <T> void addGauge(String name, MetricRegistry registry, Supplier<T> gaugeFunc) {
+    registry.register(name, (Gauge<T>) gaugeFunc::get);
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 2e2a0d7..3ff410c 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -350,6 +350,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
   public void uploadProjectFile(Project project, int version, String filetype,
       String filename, File localFile, String uploader)
       throws ProjectManagerException {
+    long startMs = System.currentTimeMillis();
     logger.info("Uploading to " + project.getName() + " version:" + version
         + " file:" + filename);
     Connection connection = getConnection();
@@ -358,7 +359,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       uploadProjectFile(connection, project, version, filetype, filename,
           localFile, uploader);
       connection.commit();
-      logger.info("Commiting upload " + localFile.getName());
+      logger.info("project " + project.getName() + " commiting upload " + localFile.getName()
+              + " took " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
     } catch (SQLException e) {
       logger.error(e);
       throw new ProjectManagerException("Error getting DB connection.", e);
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 5d6a868..ba5c825 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -47,4 +47,9 @@ public class CommonMetricsTest {
   public void testMarkDBConnectionMetrics() {
     MetricsTestUtility.testMeter("DB-Connection-meter", dr, CommonMetrics.INSTANCE::markDBConnection);
   }
+
+  @Test
+  public void testDBConnectionTimeMetrics() {
+    MetricsTestUtility.testGauge("dbConnectionTime", dr, CommonMetrics.INSTANCE::setDBConnectionTime);
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
index 706b52e..6b25d8a 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -95,10 +95,12 @@ public class MetricsTestUtility {
 
   public static void testGauge(String GaugeName, DummyReporter dr, Consumer<Long> func) {
     func.accept(1L);
-    sleepMillis(20);
+
+    // needs time to let metrics reporter receive the updated value.
+    sleepMillis(50);
     Assert.assertEquals(dr.getGauge(GaugeName), "1");
     func.accept(99L);
-    sleepMillis(20);
+    sleepMillis(50);
     Assert.assertEquals(dr.getGauge(GaugeName), "99");
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 023abe2..b8418f5 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -45,8 +45,6 @@ import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.codahale.metrics.MetricRegistry;
-
 import azkaban.constants.ServerInternals;
 import azkaban.constants.ServerProperties;
 
@@ -185,13 +183,9 @@ public class AzkabanExecutorServer {
   }
 
   private void startExecMetrics() throws Exception {
-    MetricRegistry metrics = MetricsManager.INSTANCE.getRegistry();
+    ExecMetrics.INSTANCE.addFlowRunnerManagerMetrics(getFlowRunnerManager());
 
     logger.info("starting reporting Executor Metrics");
-    MetricsExecRegister execWorker =
-        new MetricsExecRegister.MetricsExecRegisterBuilder("EXEC").addFlowRunnerManager(getFlowRunnerManager()).build();
-    execWorker.addExecutorManagerMetrics(metrics);
-
     MetricsManager.INSTANCE.startReporting("AZ-EXEC", props);
   }
 
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
new file mode 100644
index 0000000..8b84045
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/ExecMetrics.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsUtility;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Singleton in charge of registering executor-server metrics with the
+ * {@link MetricRegistry} obtained from {@link MetricsManager}.
+ */
+public enum ExecMetrics {
+  INSTANCE;
+
+  // Assigned once in the constructor and never reassigned.
+  private final MetricRegistry registry;
+
+  ExecMetrics() {
+    registry = MetricsManager.INSTANCE.getRegistry();
+    setupStaticMetrics();
+  }
+
+  /**
+   * Registers metrics that need no runtime collaborator. Intentionally empty for now:
+   * no such metrics exist yet.
+   */
+  public void setupStaticMetrics() {
+    // Nothing to register yet.
+  }
+
+  /**
+   * Registers gauges exposing the running and queued flow counts of the given manager.
+   *
+   * @param flowRunnerManager manager whose counts are read whenever the gauges are sampled
+   */
+  public void addFlowRunnerManagerMetrics(FlowRunnerManager flowRunnerManager) {
+    MetricsUtility.addGauge("EXEC-NumRunningFlows", registry, flowRunnerManager::getNumRunningFlows);
+    MetricsUtility.addGauge("EXEC-NumQueuedFlows", registry, flowRunnerManager::getNumQueuedFlows);
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
new file mode 100644
index 0000000..53cad27
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/ExecMetricsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility;
+import azkaban.metrics.MetricsTestUtility.DummyReporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Test fixture for executor metrics: creates a {@link DummyReporter} on the
+ * {@link MetricsManager} registry before each test and stops it afterwards.
+ * No test methods are defined here yet.
+ */
+public class ExecMetricsTest {
+
+  private DummyReporter dr;
+
+  /** Creates the reporter and starts it, passing 2 milliseconds to start(). */
+  @Before
+  public void setup() {
+    dr = new DummyReporter(MetricsManager.INSTANCE.getRegistry());
+    dr.start(Duration.ofMillis(2).toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the reporter, if one was created, and drops the reference. */
+  @After
+  public void shutdown() {
+    if (null != dr) {
+      dr.stop();
+    }
+
+    dr = null;
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 2053f0b..457775f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.function.Supplier;
 
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
@@ -65,6 +66,7 @@ import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
+import azkaban.metrics.MetricsUtility;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
 import azkaban.scheduler.ScheduleLoader;
@@ -155,6 +157,10 @@ public class AzkabanWebServer extends AzkabanServer {
   private final VelocityEngine velocityEngine;
 
   private final Server server;
+
+  // queuedThreadPool is mainly used to monitor the Jetty thread pool.
+  private QueuedThreadPool queuedThreadPool;
+
   private UserManager userManager;
   private ProjectManager projectManager;
   // private ExecutorManagerAdapter executorManager;
@@ -227,22 +233,28 @@ public class AzkabanWebServer extends AzkabanServer {
     }
 
     configureMBeanServer();
-    if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
-      startWebMetrics();
-    }
   }
 
-  private void startWebMetrics() throws Exception{
-    MetricRegistry metrics = MetricsManager.INSTANCE.getRegistry();
-    MetricsWebRegister execWorker = new MetricsWebRegister.MetricsWebRegisterBuilder("WEB")
-        .addExecutorManager(getExecutorManager())
-        .build();
-    execWorker.addExecutorManagerMetrics(metrics);
+  private void startWebMetrics() throws Exception {
 
-    MetricsManager.INSTANCE.startReporting("AZ-WEB", props);
-  }
+    MetricRegistry registry = MetricsManager.INSTANCE.getRegistry();
+    MetricsUtility.addGauge("JETTY-NumIdleThreads", registry, queuedThreadPool::getIdleThreads);
+    MetricsUtility.addGauge("JETTY-NumTotalThreads", registry, queuedThreadPool::getThreads);
+    MetricsUtility.addGauge("JETTY-NumQueueSize", registry, queuedThreadPool::getQueueSize);
 
+    MetricsUtility.addGauge("WEB-NumQueuedFlows", registry, executorManager::getQueuedFlowSize);
+    /**
+     * TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
+     * Ideally we would subtract {@link ExecutorManager#getQueuedFlowSize()} from getRunningFlows
+     * in order to report the correct number of running flows.
+     * However, neither getRunningFlows nor getQueuedFlowSize is synchronized, so the subtraction
+     * cannot be made thread-safe. We need to fix this in the future.
+     */
+    MetricsUtility.addGauge("WEB-NumRunningFlows", registry, () -> executorManager.getRunningFlows().size());
 
+    logger.info("starting reporting Web Server Metrics");
+    MetricsManager.INSTANCE.startReporting("AZ-WEB", props);
+  }
 
   private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
     this.triggerPlugins = triggerPlugins;
@@ -777,6 +789,7 @@ public class AzkabanWebServer extends AzkabanServer {
     }
 
     QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+    app.setThreadPool(httpThreadPool);
     server.setThreadPool(httpThreadPool);
 
     String staticDir =
@@ -828,6 +841,11 @@ public class AzkabanWebServer extends AzkabanServer {
     app.getTriggerManager().start();
 
     root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, app);
+
+
+    if (azkabanSettings.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
+      app.startWebMetrics();
+    }
     try {
       server.start();
     } catch (Exception e) {
@@ -1335,4 +1353,8 @@ public class AzkabanWebServer extends AzkabanServer {
       return null;
     }
   }
+
+  private void setThreadPool(QueuedThreadPool queuedThreadPool) {
+    this.queuedThreadPool = queuedThreadPool;
+  }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
index b0aeabe..8bc1c15 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
@@ -16,6 +16,7 @@
 
 package azkaban.webapp;
 
+import azkaban.executor.ExecutorManager;
 import azkaban.metrics.MetricsManager;
 import azkaban.metrics.MetricsUtility;
 
@@ -23,6 +24,7 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 
 /**
@@ -42,13 +44,13 @@ public enum WebMetrics {
 
   WebMetrics() {
     registry = MetricsManager.INSTANCE.getRegistry();
-    setupAllMetrics();
+    setupStaticMetrics();
   }
 
-  private void setupAllMetrics() {
+  private void setupStaticMetrics() {
     webGetCall = MetricsUtility.addMeter("Web-Get-Call-Meter", registry);
     webPostCall = MetricsUtility.addMeter("Web-Post-Call-Meter", registry);
-    MetricsUtility.addLongGauge("fetchLogLatency", logFetchLatency, registry);
+    MetricsUtility.addGauge("fetchLogLatency", registry, logFetchLatency::get);
   }
 
   public void markWebGetCall() {