azkaban-aplcache

constructing new app metrics structure to collect and send

2/27/2017 3:09:29 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
index b626fab..626a9c6 100644
--- a/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
+++ b/azkaban-common/src/main/java/azkaban/database/AbstractJdbcLoader.java
@@ -16,6 +16,9 @@
 
 package azkaban.database;
 
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -26,7 +29,6 @@ import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 
-import azkaban.utils.Props;
 
 public abstract class AbstractJdbcLoader {
   /**
@@ -65,6 +67,7 @@ public abstract class AbstractJdbcLoader {
 
   protected Connection getDBConnection(boolean autoCommit) throws IOException {
     Connection connection = null;
+    CommonMetrics.INSTANCE.markDBConnection();
     try {
       connection = dataSource.getConnection();
       connection.setAutoCommit(autoCommit);
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
new file mode 100644
index 0000000..6959c89
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * This singleton class CommonMetrics is in charge of collecting a variety of metrics
+ * which are accessed in both web and exec modules. As a result, these metrics will be
+ * exposed in both the web server and the executor.
+ */
+public enum CommonMetrics {
+  INSTANCE;
+
+  private Meter dbConnectionMeter;
+  private MetricRegistry registry;
+
+  CommonMetrics() {
+    registry = MetricsManager.INSTANCE.getRegistry();
+    setupAllMetrics();
+  }
+
+  private void setupAllMetrics() {
+    dbConnectionMeter = MetricsUtility.addMeter("DB-Connection-meter", registry);
+  }
+
+  /**
+   * Mark the occurrence of a DB connection request.
+   */
+  public void markDBConnection() {
+
+    /*
+     * This method should be thread-safe.
+     * There are two reasons why we don't make this method synchronized:
+     * 1) Dropwizard metrics deal with concurrency internally;
+     * 2) mark() is basically an addition operation, which should not cause race condition issues.
+     */
+    dbConnectionMeter.mark();
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
new file mode 100644
index 0000000..12398e3
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/metrics/MetricsUtility.java
@@ -0,0 +1,36 @@
+package azkaban.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A utility class that helps metrics classes such as {@link azkaban.metrics.CommonMetrics}
+ * share common operations easily.
+ */
+public final class MetricsUtility {
+
+  private MetricsUtility() {
+    //Utility class's constructor should not be called
+  }
+
+  /**
+   * A {@link Meter} measures the rate of events over time (e.g., “requests per second”).
+   * Here we track 1-minute moving averages.
+   */
+  public static Meter addMeter(String name, MetricRegistry registry) {
+    Meter curr = registry.meter(name);
+    registry.register(name + "-gauge", (Gauge<Double>) curr::getOneMinuteRate);
+    return curr;
+  }
+
+  /**
+   * A {@link Gauge} is an instantaneous reading of a particular value.
+   * This method adds an AtomicLong number/metric to registry.
+   */
+  public static void addLongGauge(String name, AtomicLong value, MetricRegistry registry) {
+    registry.register(name, (Gauge<Long>) value::get);
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
new file mode 100644
index 0000000..5d6a868
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import azkaban.metrics.MetricsTestUtility.DummyReporter;
+
+import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CommonMetricsTest {
+
+  private DummyReporter dr;
+
+  @Before
+  public void setup() {
+    dr = new DummyReporter(MetricsManager.INSTANCE.getRegistry());
+    dr.start(Duration.ofMillis(2).toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  @After
+  public void shutdown() {
+    if (null != dr)
+      dr.stop();
+
+    dr = null;
+  }
+
+  @Test
+  public void testMarkDBConnectionMetrics() {
+    MetricsTestUtility.testMeter("DB-Connection-meter", dr, CommonMetrics.INSTANCE::markDBConnection);
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
new file mode 100644
index 0000000..706b52e
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/metrics/MetricsTestUtility.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * This is a utility class for testing Dropwizard metrics.
+ */
+public class MetricsTestUtility {
+
+  /**
+   * A dummy reporter extending {@link ScheduledReporter} that collects metrics into maps,
+   * which test methods can then access easily.
+   */
+  public static class DummyReporter extends ScheduledReporter{
+
+    private Map<String, String> gaugeMap;
+
+    private Map<String, Long> meterMap;
+
+    public DummyReporter(MetricRegistry registry) {
+      super(registry, "dummy-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+      this.gaugeMap = new HashMap<>();
+      this.meterMap = new HashMap<>();
+    }
+
+    @Override
+    public void report(SortedMap<String, Gauge> gauges,
+        SortedMap<String, Counter> counters,
+        SortedMap<String, Histogram> histograms,
+        SortedMap<String, Meter> meters,
+        SortedMap<String, Timer> timers) {
+
+      for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+        gaugeMap.put(entry.getKey(), entry.getValue().getValue().toString());
+      }
+
+      for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+        meterMap.put(entry.getKey(), entry.getValue().getCount());
+      }
+    }
+
+    private String getGauge(String key) {
+      return gaugeMap.get(key);
+    }
+
+    private long getMeter(String key) {
+      return meterMap.getOrDefault(key, 0L);
+    }
+  }
+
+  public static void testMeter(String meterName, DummyReporter dr, Runnable runnable) {
+
+    sleepMillis(20);
+    long currMeter = dr.getMeter(meterName);
+    runnable.run();
+    sleepMillis(20);
+    Assert.assertEquals(dr.getMeter(meterName), currMeter + 1);
+
+    runnable.run();
+    runnable.run();
+    sleepMillis(20);
+    Assert.assertEquals(dr.getMeter(meterName), currMeter + 3);
+  }
+
+  public static void testGauge(String GaugeName, DummyReporter dr, Consumer<Long> func) {
+    func.accept(1L);
+    sleepMillis(20);
+    Assert.assertEquals(dr.getGauge(GaugeName), "1");
+    func.accept(99L);
+    sleepMillis(20);
+    Assert.assertEquals(dr.getGauge(GaugeName), "99");
+  }
+
+
+  /**
+   * Helper method to sleep for the given number of milliseconds.
+   */
+  private static void sleepMillis(int numMilli) {
+    try {
+      Thread.sleep(numMilli);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/azkaban-web-server/build.gradle b/azkaban-web-server/build.gradle
index 23e59f0..bce9503 100644
--- a/azkaban-web-server/build.gradle
+++ b/azkaban-web-server/build.gradle
@@ -58,6 +58,9 @@ dependencies {
   testCompile('org.hamcrest:hamcrest-all:1.3')
   testCompile('org.mockito:mockito-all:1.10.19')
 
+  //AZ web module tests need to access classes defined in azkaban-common test module
+  testCompile project(':azkaban-common').sourceSets.test.output
+
   generateRestli('com.linkedin.pegasus:generator:' + pegasusVersion)
   generateRestli('com.linkedin.pegasus:restli-tools:' + pegasusVersion)
 
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index 5476470..225df57 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -16,22 +16,6 @@
 
 package azkaban.webapp.servlet;
 
-import azkaban.executor.ExecutorManager;
-import azkaban.utils.FlowUtils;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.log4j.Logger;
-
 import azkaban.constants.ServerProperties;
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutableFlow;
@@ -57,11 +41,28 @@ import azkaban.user.User;
 import azkaban.user.UserManager;
 import azkaban.utils.ExternalLinkUtils;
 import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.FlowUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.plugin.PluginRegistry;
 import azkaban.webapp.plugin.ViewerPlugin;
+import azkaban.webapp.WebMetrics;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.log4j.Logger;
+
 
 public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private static final Logger LOGGER =
@@ -489,6 +490,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private void ajaxFetchExecFlowLogs(HttpServletRequest req,
       HttpServletResponse resp, HashMap<String, Object> ret, User user,
       ExecutableFlow exFlow) throws ServletException {
+    long startMs = System.currentTimeMillis();
     Project project =
         getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
     if (project == null) {
@@ -515,6 +517,14 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
     } catch (ExecutorManagerException e) {
       throw new ServletException(e);
     }
+
+    /*
+     * We originally considered leveraging Dropwizard's Timer API {@link com.codahale.metrics.Timer}
+     * to measure the duration.
+     * However, a Timer results in too many accompanying metrics (e.g., min, max, 99th quantile)
+     * for a single metric. We decided to use a gauge instead and monitor how it behaves.
+     */
+    WebMetrics.INSTANCE.setFetchLogLatency(System.currentTimeMillis() - startMs);
   }
 
   /**
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index f2a4603..3980d3f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -16,6 +16,7 @@
 
 package azkaban.webapp.servlet;
 
+import azkaban.webapp.WebMetrics;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -99,6 +100,8 @@ public abstract class LoginAbstractAzkabanServlet extends
   @Override
   protected void doGet(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {
+
+    WebMetrics.INSTANCE.markWebGetCall();
     // Set session id
     Session session = getSessionFromRequest(req);
     logRequest(req, session);
@@ -133,7 +136,7 @@ public abstract class LoginAbstractAzkabanServlet extends
 
   /**
    * Log out request - the format should be close to Apache access log format
-   * 
+   *
    * @param req
    * @param session
    */
@@ -167,7 +170,7 @@ public abstract class LoginAbstractAzkabanServlet extends
         buf.append("not-browser");
       }
     }
-    
+
     logger.info(buf.toString());
   }
 
@@ -276,7 +279,7 @@ public abstract class LoginAbstractAzkabanServlet extends
   protected void doPost(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {
     Session session = getSessionFromRequest(req);
-
+    WebMetrics.INSTANCE.markWebPostCall();
     logRequest(req, session);
 
     // Handle Multipart differently from other post messages
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
new file mode 100644
index 0000000..b0aeabe
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/WebMetrics.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsUtility;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * This singleton class WebMetrics is in charge of collecting a variety of metrics
+ * from azkaban-web-server modules.
+ */
+public enum WebMetrics {
+  INSTANCE;
+
+  private MetricRegistry registry;
+
+  private Meter webGetCall;
+  private Meter webPostCall;
+
+  // How long a log fetch takes when a user calls the fetch-log API.
+  private AtomicLong logFetchLatency = new AtomicLong(0L);
+
+  WebMetrics() {
+    registry = MetricsManager.INSTANCE.getRegistry();
+    setupAllMetrics();
+  }
+
+  private void setupAllMetrics() {
+    webGetCall = MetricsUtility.addMeter("Web-Get-Call-Meter", registry);
+    webPostCall = MetricsUtility.addMeter("Web-Post-Call-Meter", registry);
+    MetricsUtility.addLongGauge("fetchLogLatency", logFetchLatency, registry);
+  }
+
+  public void markWebGetCall() {
+
+    /*
+     * This method should be thread-safe.
+     * There are two reasons why we don't make this method synchronized:
+     * 1) Dropwizard metrics deal with concurrency internally;
+     * 2) mark() is basically an addition operation, which should not cause race condition issues.
+     */
+    webGetCall.mark();
+  }
+
+  public void markWebPostCall() {
+
+    webPostCall.mark();
+  }
+
+  public void setFetchLogLatency(long milliseconds) {
+    logFetchLatency.set(milliseconds);
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
new file mode 100644
index 0000000..46a7cb3
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/WebMetricsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility.DummyReporter;
+import azkaban.metrics.MetricsTestUtility;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class WebMetricsTest{
+
+  private DummyReporter dr;
+
+  @Before
+  public void setup() {
+    dr = new DummyReporter(MetricsManager.INSTANCE.getRegistry());
+    dr.start(Duration.ofMillis(2).toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  @After
+  public void shutdown() {
+    if (null != dr)
+      dr.stop();
+
+    dr = null;
+  }
+
+  @Test
+  public void testLogFetchLatencyMetrics() {
+    MetricsTestUtility.testGauge("fetchLogLatency", dr, WebMetrics.INSTANCE::setFetchLogLatency);
+  }
+
+  @Test
+  public void testWebPostCallMeter() {
+    MetricsTestUtility.testMeter("Web-Post-Call-Meter", dr, WebMetrics.INSTANCE::markWebPostCall);
+  }
+
+  @Test
+  public void testWebGetCallMeter() {
+    MetricsTestUtility.testMeter("Web-Get-Call-Meter", dr, WebMetrics.INSTANCE::markWebGetCall);
+  }
+}