azkaban-aplcache

Creating DB Metrics (#1468) The goal of this PR is to build

9/17/2017 12:21:30 AM

Details

diff --git a/az-core/build.gradle b/az-core/build.gradle
index db9f912..f9a83d1 100644
--- a/az-core/build.gradle
+++ b/az-core/build.gradle
@@ -17,6 +17,19 @@
 /**
  * Place foundation classes in this sub-module.
  */
+task testJar(type: Jar, dependsOn: testClasses) {
+    from sourceSets.test.output
+    classifier = 'test'
+}
+
+configurations {
+    testOutput.extendsFrom(testCompile)
+}
+
+artifacts {
+    testOutput testJar
+}
+
 dependencies {
     compile deps.jexl
     compile deps.commonsLang
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index e3d6e06..165272f 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -39,11 +39,8 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import javax.sql.DataSource;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,9 +69,9 @@ public class AzkabanCommonModule extends AbstractModule {
   protected void configure() {
     install(new AzkabanCoreModule(this.props));
     bind(Storage.class).to(resolveStorageClassType());
+    bind(AzkabanDataSource.class).to(resolveDataSourceType());
     bind(TriggerLoader.class).to(JdbcTriggerImpl.class);
     bind(ProjectLoader.class).to(JdbcProjectImpl.class);
-    bind(DataSource.class).to(AzkabanDataSource.class);
     bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
   }
 
@@ -96,29 +93,17 @@ public class AzkabanCommonModule extends AbstractModule {
     }
   }
 
-  // todo kunkun-tang: the below method should moved out to azkaban-db module eventually.
-  @Inject
-  @Provides
-  @Singleton
-  public AzkabanDataSource getDataSource(final Props props) {
-    final String databaseType = props.getString("database.type");
+  private Class<? extends AzkabanDataSource> resolveDataSourceType() {
 
+    final String databaseType = this.props.getString("database.type");
     if (databaseType.equals("h2")) {
-      final String path = props.getString("h2.path");
-      final Path h2DbPath = Paths.get(path).toAbsolutePath();
-      log.info("h2 DB path: " + h2DbPath);
-      return new H2FileDataSource(h2DbPath);
+      return H2FileDataSource.class;
+    } else {
+      return MySQLDataSource.class;
     }
-    final int port = props.getInt("mysql.port");
-    final String host = props.getString("mysql.host");
-    final String database = props.getString("mysql.database");
-    final String user = props.getString("mysql.user");
-    final String password = props.getString("mysql.password");
-    final int numConnections = props.getInt("mysql.numconnections");
-
-    return new MySQLDataSource(host, port, database, user, password, numConnections);
   }
 
+
   @Inject
   @Provides
   @Singleton
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index bca65ff..3d009fc 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -17,9 +17,9 @@
 package azkaban.metrics;
 
 import com.codahale.metrics.Meter;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This singleton class CommonMetrics is in charge of collecting varieties of metrics which are
@@ -29,10 +29,8 @@ import java.util.concurrent.atomic.AtomicLong;
 @Singleton
 public class CommonMetrics {
 
-  private final AtomicLong dbConnectionTime = new AtomicLong(0L);
   private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
   private final MetricsManager metricsManager;
-  private Meter dbConnectionMeter;
   private Meter flowFailMeter;
   private Meter dispatchFailMeter;
   private Meter dispatchSuccessMeter;
@@ -46,29 +44,14 @@ public class CommonMetrics {
   }
 
   private void setupAllMetrics() {
-    this.dbConnectionMeter = this.metricsManager.addMeter("DB-Connection-meter");
     this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
     this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
     this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
     this.sendEmailFailMeter = this.metricsManager.addMeter("send-email-fail-meter");
     this.sendEmailSuccessMeter = this.metricsManager.addMeter("send-email-success-meter");
     this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
-    this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
   }
 
-  /**
-   * Mark the occurrence of an DB query event.
-   */
-  public void markDBConnection() {
-
-    /*
-     * This method should be Thread Safe.
-     * Two reasons that we don't make this function call synchronized:
-     * 1). drop wizard metrics deals with concurrency internally;
-     * 2). mark is basically a math addition operation, which should not cause race condition issue.
-     */
-    this.dbConnectionMeter.mark();
-  }
 
   /**
    * Mark flowFailMeter when a flow is considered as FAILED. This method could be called by Web
@@ -106,10 +89,6 @@ public class CommonMetrics {
     this.sendEmailSuccessMeter.mark();
   }
 
-  public void setDBConnectionTime(final long milliseconds) {
-    this.dbConnectionTime.set(milliseconds);
-  }
-
   /**
    * Mark the occurrence of an job waiting event due to OOM
    */
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 37efc7c..64ac5fd 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -36,12 +36,6 @@ public class CommonMetricsTest {
   }
 
   @Test
-  public void testDBConnectionTimeMetrics() {
-    this.metrics.setDBConnectionTime(14);
-    assertEquals(14, this.testUtil.getGaugeValue("dbConnectionTime"));
-  }
-
-  @Test
   public void testOOMWaitingJobMetrics() {
     final String metricName = "OOM-waiting-job-count";
 
diff --git a/azkaban-db/build.gradle b/azkaban-db/build.gradle
index 9ae414b..fef3a67 100644
--- a/azkaban-db/build.gradle
+++ b/azkaban-db/build.gradle
@@ -37,6 +37,7 @@ dependencies {
     compile deps.dbcp2
     compile deps.io
 
+    testCompile project(path: ':az-core', configuration: 'testOutput')
     testRuntime deps.h2
 }
 
diff --git a/azkaban-db/src/main/java/azkaban/db/DBMetrics.java b/azkaban-db/src/main/java/azkaban/db/DBMetrics.java
new file mode 100644
index 0000000..5129130
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DBMetrics.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.db;
+
+import azkaban.metrics.MetricsManager;
+import com.codahale.metrics.Meter;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+/**
+ * This singleton class CommonMetrics
+ */
+@Singleton
+public class DBMetrics {
+
+  private final AtomicLong dbConnectionTime = new AtomicLong(0L);
+  private final MetricsManager metricsManager;
+  private Meter dbConnectionMeter;
+  private Meter dbConnectionFailMeter;
+
+  @Inject
+  public DBMetrics(final MetricsManager metricsManager) {
+    this.metricsManager = metricsManager;
+    setupAllMetrics();
+  }
+
+  private void setupAllMetrics() {
+    this.dbConnectionMeter = this.metricsManager.addMeter("DB-Connection-meter");
+    this.dbConnectionFailMeter = this.metricsManager.addMeter("DB-Fail-Connection-meter");
+    this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
+  }
+
+  /**
+   * Mark the occurrence of an DB query event.
+   */
+  public void markDBConnection() {
+
+    /*
+     * This method should be Thread Safe.
+     * Two reasons that we don't make this function call synchronized:
+     * 1). drop wizard metrics deals with concurrency internally;
+     * 2). mark is basically a math addition operation, which should not cause race condition issue.
+     */
+    this.dbConnectionMeter.mark();
+  }
+
+  /**
+   * Mark the occurrence when DB get connection fails.
+   */
+  public void markDBFailConnection() {
+    this.dbConnectionFailMeter.mark();
+  }
+
+  public void setDBConnectionTime(final long milliseconds) {
+    this.dbConnectionTime.set(milliseconds);
+  }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
index c12e04c..c1fac0b 100644
--- a/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
+++ b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
@@ -15,14 +15,20 @@
  */
 package azkaban.db;
 
+import azkaban.utils.Props;
 import java.nio.file.Path;
+import java.nio.file.Paths;
+import javax.inject.Inject;
 
 
 public class H2FileDataSource extends AzkabanDataSource {
 
-  public H2FileDataSource(final Path filePath) {
+  @Inject
+  public H2FileDataSource(final Props props) {
     super();
-    final String url = "jdbc:h2:file:" + filePath;
+    final String filePath = props.getString("h2.path");
+    final Path h2DbPath = Paths.get(filePath).toAbsolutePath();
+    final String url = "jdbc:h2:file:" + h2DbPath;
     setDriverClassName("org.h2.Driver");
     setUrl(url);
   }
diff --git a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
index 85ef742..c12de73 100644
--- a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
+++ b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
@@ -15,20 +15,31 @@
  */
 package azkaban.db;
 
+import azkaban.utils.Props;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import javax.inject.Inject;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.log4j.Logger;
 
 public class MySQLDataSource extends AzkabanDataSource {
 
   private static final Logger logger = Logger.getLogger(MySQLDataSource.class);
+  private final DBMetrics dbMetrics;
 
-  public MySQLDataSource(final String host, final int port, final String dbName,
-      final String user, final String password, final int numConnections) {
+  @Inject
+  public MySQLDataSource(final Props props, final DBMetrics dbMetrics) {
     super();
+    this.dbMetrics = dbMetrics;
+
+    final int port = props.getInt("mysql.port");
+    final String host = props.getString("mysql.host");
+    final String dbName = props.getString("mysql.database");
+    final String user = props.getString("mysql.user");
+    final String password = props.getString("mysql.password");
+    final int numConnections = props.getInt("mysql.numconnections");
 
     final String url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
     addConnectionProperty("useUnicode", "yes");
@@ -41,7 +52,6 @@ public class MySQLDataSource extends AzkabanDataSource {
     setValidationQuery("/* ping */ select 1");
     setTestOnBorrow(true);
   }
-
   /**
    * This method overrides {@link BasicDataSource#getConnection()}, in order to have retry logics.
    * We don't make the call synchronized in order to guarantee normal cases performance.
@@ -49,6 +59,8 @@ public class MySQLDataSource extends AzkabanDataSource {
   @Override
   public Connection getConnection() throws SQLException {
 
+    this.dbMetrics.markDBConnection();
+    final long startMs = System.currentTimeMillis();
     Connection connection = null;
     int retryAttempt = 1;
     while (retryAttempt < AzDBUtil.MAX_DB_RETRY_COUNT) {
@@ -69,6 +81,9 @@ public class MySQLDataSource extends AzkabanDataSource {
         if (connection == null || isReadOnly(connection)) {
           throw new SQLException("Failed to find DB connection Or connection is read only. ");
         } else {
+
+          // Evalaute how long it takes to get DB Connection.
+          this.dbMetrics.setDBConnectionTime(System.currentTimeMillis() - startMs);
           return connection;
         }
       } catch (final SQLException ex) {
@@ -77,6 +92,7 @@ public class MySQLDataSource extends AzkabanDataSource {
          * invalidate connection and reconstruct it later. if remote IP address is not reachable,
          * it will get hang for a while and throw exception.
          */
+        this.dbMetrics.markDBFailConnection();
         try {
           invalidateConnection(connection);
         } catch (final Exception e) {
diff --git a/azkaban-db/src/test/java/azkaban/db/DBMetricsTest.java b/azkaban-db/src/test/java/azkaban/db/DBMetricsTest.java
new file mode 100644
index 0000000..3e3269d
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/DBMetricsTest.java
@@ -0,0 +1,27 @@
+package azkaban.db;
+
+import static org.junit.Assert.assertEquals;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility;
+import com.codahale.metrics.MetricRegistry;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DBMetricsTest {
+  private MetricsTestUtility testUtil;
+  private DBMetrics metrics;
+
+  @Before
+  public void setUp() {
+    final MetricRegistry metricRegistry = new MetricRegistry();
+    this.testUtil = new MetricsTestUtility(metricRegistry);
+    this.metrics = new DBMetrics(new MetricsManager(metricRegistry));
+  }
+
+  @Test
+  public void testDBConnectionTimeMetrics() {
+    this.metrics.setDBConnectionTime(14);
+    assertEquals(14, this.testUtil.getGaugeValue("dbConnectionTime"));
+  }
+}