azkaban-aplcache
Details
az-core/build.gradle 13(+13 -0)
diff --git a/az-core/build.gradle b/az-core/build.gradle
index db9f912..f9a83d1 100644
--- a/az-core/build.gradle
+++ b/az-core/build.gradle
@@ -17,6 +17,19 @@
/**
* Place foundation classes in this sub-module.
*/
+task testJar(type: Jar, dependsOn: testClasses) {
+ from sourceSets.test.output
+ classifier = 'test'
+}
+
+configurations {
+ testOutput.extendsFrom(testCompile)
+}
+
+artifacts {
+ testOutput testJar
+}
+
dependencies {
compile deps.jexl
compile deps.commonsLang
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index e3d6e06..165272f 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -39,11 +39,8 @@ import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import javax.inject.Inject;
import javax.inject.Singleton;
-import javax.sql.DataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -72,9 +69,9 @@ public class AzkabanCommonModule extends AbstractModule {
protected void configure() {
install(new AzkabanCoreModule(this.props));
bind(Storage.class).to(resolveStorageClassType());
+ bind(AzkabanDataSource.class).to(resolveDataSourceType());
bind(TriggerLoader.class).to(JdbcTriggerImpl.class);
bind(ProjectLoader.class).to(JdbcProjectImpl.class);
- bind(DataSource.class).to(AzkabanDataSource.class);
bind(ExecutorLoader.class).to(JdbcExecutorLoader.class);
}
@@ -96,29 +93,17 @@ public class AzkabanCommonModule extends AbstractModule {
}
}
- // todo kunkun-tang: the below method should moved out to azkaban-db module eventually.
- @Inject
- @Provides
- @Singleton
- public AzkabanDataSource getDataSource(final Props props) {
- final String databaseType = props.getString("database.type");
+ private Class<? extends AzkabanDataSource> resolveDataSourceType() {
+ final String databaseType = this.props.getString("database.type");
if (databaseType.equals("h2")) {
- final String path = props.getString("h2.path");
- final Path h2DbPath = Paths.get(path).toAbsolutePath();
- log.info("h2 DB path: " + h2DbPath);
- return new H2FileDataSource(h2DbPath);
+ return H2FileDataSource.class;
+ } else {
+ return MySQLDataSource.class;
}
- final int port = props.getInt("mysql.port");
- final String host = props.getString("mysql.host");
- final String database = props.getString("mysql.database");
- final String user = props.getString("mysql.user");
- final String password = props.getString("mysql.password");
- final int numConnections = props.getInt("mysql.numconnections");
-
- return new MySQLDataSource(host, port, database, user, password, numConnections);
}
+
@Inject
@Provides
@Singleton
diff --git a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
index bca65ff..3d009fc 100644
--- a/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
+++ b/azkaban-common/src/main/java/azkaban/metrics/CommonMetrics.java
@@ -17,9 +17,9 @@
package azkaban.metrics;
import com.codahale.metrics.Meter;
+import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import javax.inject.Singleton;
-import java.util.concurrent.atomic.AtomicLong;
/**
* This singleton class CommonMetrics is in charge of collecting varieties of metrics which are
@@ -29,10 +29,8 @@ import java.util.concurrent.atomic.AtomicLong;
@Singleton
public class CommonMetrics {
- private final AtomicLong dbConnectionTime = new AtomicLong(0L);
private final AtomicLong OOMWaitingJobCount = new AtomicLong(0L);
private final MetricsManager metricsManager;
- private Meter dbConnectionMeter;
private Meter flowFailMeter;
private Meter dispatchFailMeter;
private Meter dispatchSuccessMeter;
@@ -46,29 +44,14 @@ public class CommonMetrics {
}
private void setupAllMetrics() {
- this.dbConnectionMeter = this.metricsManager.addMeter("DB-Connection-meter");
this.flowFailMeter = this.metricsManager.addMeter("flow-fail-meter");
this.dispatchFailMeter = this.metricsManager.addMeter("dispatch-fail-meter");
this.dispatchSuccessMeter = this.metricsManager.addMeter("dispatch-success-meter");
this.sendEmailFailMeter = this.metricsManager.addMeter("send-email-fail-meter");
this.sendEmailSuccessMeter = this.metricsManager.addMeter("send-email-success-meter");
this.metricsManager.addGauge("OOM-waiting-job-count", this.OOMWaitingJobCount::get);
- this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
}
- /**
- * Mark the occurrence of an DB query event.
- */
- public void markDBConnection() {
-
- /*
- * This method should be Thread Safe.
- * Two reasons that we don't make this function call synchronized:
- * 1). drop wizard metrics deals with concurrency internally;
- * 2). mark is basically a math addition operation, which should not cause race condition issue.
- */
- this.dbConnectionMeter.mark();
- }
/**
* Mark flowFailMeter when a flow is considered as FAILED. This method could be called by Web
@@ -106,10 +89,6 @@ public class CommonMetrics {
this.sendEmailSuccessMeter.mark();
}
- public void setDBConnectionTime(final long milliseconds) {
- this.dbConnectionTime.set(milliseconds);
- }
-
/**
* Mark the occurrence of an job waiting event due to OOM
*/
diff --git a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
index 37efc7c..64ac5fd 100644
--- a/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
+++ b/azkaban-common/src/test/java/azkaban/metrics/CommonMetricsTest.java
@@ -36,12 +36,6 @@ public class CommonMetricsTest {
}
@Test
- public void testDBConnectionTimeMetrics() {
- this.metrics.setDBConnectionTime(14);
- assertEquals(14, this.testUtil.getGaugeValue("dbConnectionTime"));
- }
-
- @Test
public void testOOMWaitingJobMetrics() {
final String metricName = "OOM-waiting-job-count";
azkaban-db/build.gradle 1(+1 -0)
diff --git a/azkaban-db/build.gradle b/azkaban-db/build.gradle
index 9ae414b..fef3a67 100644
--- a/azkaban-db/build.gradle
+++ b/azkaban-db/build.gradle
@@ -37,6 +37,7 @@ dependencies {
compile deps.dbcp2
compile deps.io
+ testCompile project(path: ':az-core', configuration: 'testOutput')
testRuntime deps.h2
}
diff --git a/azkaban-db/src/main/java/azkaban/db/DBMetrics.java b/azkaban-db/src/main/java/azkaban/db/DBMetrics.java
new file mode 100644
index 0000000..5129130
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DBMetrics.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.db;
+
+import azkaban.metrics.MetricsManager;
+import com.codahale.metrics.Meter;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+/**
+ * This singleton class CommonMetrics
+ */
+@Singleton
+public class DBMetrics {
+
+ private final AtomicLong dbConnectionTime = new AtomicLong(0L);
+ private final MetricsManager metricsManager;
+ private Meter dbConnectionMeter;
+ private Meter dbConnectionFailMeter;
+
+ @Inject
+ public DBMetrics(final MetricsManager metricsManager) {
+ this.metricsManager = metricsManager;
+ setupAllMetrics();
+ }
+
+ private void setupAllMetrics() {
+ this.dbConnectionMeter = this.metricsManager.addMeter("DB-Connection-meter");
+ this.dbConnectionFailMeter = this.metricsManager.addMeter("DB-Fail-Connection-meter");
+ this.metricsManager.addGauge("dbConnectionTime", this.dbConnectionTime::get);
+ }
+
+ /**
+ * Mark the occurrence of an DB query event.
+ */
+ public void markDBConnection() {
+
+ /*
+ * This method should be Thread Safe.
+ * Two reasons that we don't make this function call synchronized:
+ * 1). drop wizard metrics deals with concurrency internally;
+ * 2). mark is basically a math addition operation, which should not cause race condition issue.
+ */
+ this.dbConnectionMeter.mark();
+ }
+
+ /**
+ * Mark the occurrence when DB get connection fails.
+ */
+ public void markDBFailConnection() {
+ this.dbConnectionFailMeter.mark();
+ }
+
+ public void setDBConnectionTime(final long milliseconds) {
+ this.dbConnectionTime.set(milliseconds);
+ }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
index c12e04c..c1fac0b 100644
--- a/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
+++ b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
@@ -15,14 +15,20 @@
*/
package azkaban.db;
+import azkaban.utils.Props;
import java.nio.file.Path;
+import java.nio.file.Paths;
+import javax.inject.Inject;
public class H2FileDataSource extends AzkabanDataSource {
- public H2FileDataSource(final Path filePath) {
+ @Inject
+ public H2FileDataSource(final Props props) {
super();
- final String url = "jdbc:h2:file:" + filePath;
+ final String filePath = props.getString("h2.path");
+ final Path h2DbPath = Paths.get(filePath).toAbsolutePath();
+ final String url = "jdbc:h2:file:" + h2DbPath;
setDriverClassName("org.h2.Driver");
setUrl(url);
}
diff --git a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
index 85ef742..c12de73 100644
--- a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
+++ b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
@@ -15,20 +15,31 @@
*/
package azkaban.db;
+import azkaban.utils.Props;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import javax.inject.Inject;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.log4j.Logger;
public class MySQLDataSource extends AzkabanDataSource {
private static final Logger logger = Logger.getLogger(MySQLDataSource.class);
+ private final DBMetrics dbMetrics;
- public MySQLDataSource(final String host, final int port, final String dbName,
- final String user, final String password, final int numConnections) {
+ @Inject
+ public MySQLDataSource(final Props props, final DBMetrics dbMetrics) {
super();
+ this.dbMetrics = dbMetrics;
+
+ final int port = props.getInt("mysql.port");
+ final String host = props.getString("mysql.host");
+ final String dbName = props.getString("mysql.database");
+ final String user = props.getString("mysql.user");
+ final String password = props.getString("mysql.password");
+ final int numConnections = props.getInt("mysql.numconnections");
final String url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
addConnectionProperty("useUnicode", "yes");
@@ -41,7 +52,6 @@ public class MySQLDataSource extends AzkabanDataSource {
setValidationQuery("/* ping */ select 1");
setTestOnBorrow(true);
}
-
/**
* This method overrides {@link BasicDataSource#getConnection()}, in order to have retry logics.
* We don't make the call synchronized in order to guarantee normal cases performance.
@@ -49,6 +59,8 @@ public class MySQLDataSource extends AzkabanDataSource {
@Override
public Connection getConnection() throws SQLException {
+ this.dbMetrics.markDBConnection();
+ final long startMs = System.currentTimeMillis();
Connection connection = null;
int retryAttempt = 1;
while (retryAttempt < AzDBUtil.MAX_DB_RETRY_COUNT) {
@@ -69,6 +81,9 @@ public class MySQLDataSource extends AzkabanDataSource {
if (connection == null || isReadOnly(connection)) {
throw new SQLException("Failed to find DB connection Or connection is read only. ");
} else {
+
+ // Evalaute how long it takes to get DB Connection.
+ this.dbMetrics.setDBConnectionTime(System.currentTimeMillis() - startMs);
return connection;
}
} catch (final SQLException ex) {
@@ -77,6 +92,7 @@ public class MySQLDataSource extends AzkabanDataSource {
* invalidate connection and reconstruct it later. if remote IP address is not reachable,
* it will get hang for a while and throw exception.
*/
+ this.dbMetrics.markDBFailConnection();
try {
invalidateConnection(connection);
} catch (final Exception e) {
diff --git a/azkaban-db/src/test/java/azkaban/db/DBMetricsTest.java b/azkaban-db/src/test/java/azkaban/db/DBMetricsTest.java
new file mode 100644
index 0000000..3e3269d
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/DBMetricsTest.java
@@ -0,0 +1,27 @@
+package azkaban.db;
+
+import static org.junit.Assert.assertEquals;
+
+import azkaban.metrics.MetricsManager;
+import azkaban.metrics.MetricsTestUtility;
+import com.codahale.metrics.MetricRegistry;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DBMetricsTest {
+ private MetricsTestUtility testUtil;
+ private DBMetrics metrics;
+
+ @Before
+ public void setUp() {
+ final MetricRegistry metricRegistry = new MetricRegistry();
+ this.testUtil = new MetricsTestUtility(metricRegistry);
+ this.metrics = new DBMetrics(new MetricsManager(metricRegistry));
+ }
+
+ @Test
+ public void testDBConnectionTimeMetrics() {
+ this.metrics.setDBConnectionTime(14);
+ assertEquals(14, this.testUtil.getGaugeValue("dbConnectionTime"));
+ }
+}