azkaban-aplcache
Changes
azkaban-common/build.gradle 1(+1 -0)
azkaban-db/.gitignore 0(+0 -0)
azkaban-db/build.gradle 29(+29 -0)
settings.gradle 1(+1 -0)
Details
azkaban-common/build.gradle 1(+1 -0)
diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index ce0ee92..aada39d 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -36,6 +36,7 @@ configurations {
}
dependencies {
compile project(':azkaban-spi')
+ compile project(':azkaban-db')
compile('com.google.inject:guice:4.1.0')
compile('com.google.guava:guava:21.0')
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 817c04f..57752cf 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -16,6 +16,9 @@
*/
package azkaban;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.spi.Storage;
@@ -28,6 +31,7 @@ import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import java.io.File;
+import org.apache.commons.dbutils.QueryRunner;
/**
@@ -47,6 +51,9 @@ public class AzkabanCommonModule extends AbstractModule {
bind(ProjectLoader.class).to(JdbcProjectLoader.class).in(Scopes.SINGLETON);
bind(Props.class).toInstance(config.getProps());
bind(Storage.class).to(resolveStorageClassType()).in(Scopes.SINGLETON);
+ bind(DatabaseOperator.class).to(DatabaseOperatorImpl.class).in(Scopes.SINGLETON);
+ //todo kunkun-tang : Consider both H2 DataSource and MysqlDatasource case.
+ bind(QueryRunner.class).toInstance(config.getQueryRunner());
}
public Class<? extends Storage> resolveStorageClassType() {
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
index 015e86d..680b889 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
@@ -17,11 +17,15 @@
package azkaban;
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.H2FileDataSource;
+import azkaban.db.MySQLDataSource;
import azkaban.storage.StorageImplementationType;
import azkaban.utils.Props;
import com.google.inject.Inject;
import java.net.URI;
import java.net.URISyntaxException;
+import org.apache.commons.dbutils.QueryRunner;
import org.apache.log4j.Logger;
import static azkaban.Constants.ConfigurationKeys.*;
@@ -68,6 +72,32 @@ public class AzkabanCommonModuleConfig {
return localStorageBaseDirPath;
}
+ // todo kunkun-tang: the below method should moved out to azkaban-db module eventually.
+ // Today azkaban-db can not rely on Props, so we can not do it.
+ public AzkabanDataSource getDataSource() {
+ String databaseType = props.getString("database.type");
+
+ // todo kunkun-tang: temperaroy workaround to let service provider test work.
+ if(databaseType.equals("h2")) {
+ String path = props.getString("h2.path");
+ return new H2FileDataSource(path);
+ }
+ int port = props.getInt("mysql.port");
+ String host = props.getString("mysql.host");
+ String database = props.getString("mysql.database");
+ String user = props.getString("mysql.user");
+ String password = props.getString("mysql.password");
+ int numConnections = props.getInt("mysql.numconnections");
+
+ return MySQLDataSource.getInstance(host, port, database, user, password,
+ numConnections);
+
+ }
+
+ public QueryRunner getQueryRunner() {
+ return new QueryRunner(getDataSource());
+ }
+
public URI getHdfsBaseUri() {
return hdfsBaseUri;
}
azkaban-db/.gitignore 0(+0 -0)
diff --git a/azkaban-db/.gitignore b/azkaban-db/.gitignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/azkaban-db/.gitignore
azkaban-db/build.gradle 29(+29 -0)
diff --git a/azkaban-db/build.gradle b/azkaban-db/build.gradle
new file mode 100644
index 0000000..50379a6
--- /dev/null
+++ b/azkaban-db/build.gradle
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+dependencies {
+
+ // todo kunkun-tang: consolidate dependencies in azkaban-common and azkaban-db
+ compile('log4j:log4j:1.2.16')
+ compile('com.google.inject:guice:4.1.0')
+
+ compile('commons-dbutils:commons-dbutils:1.5')
+ compile('org.apache.commons:commons-dbcp2:2.1.1')
+
+ testCompile('org.mockito:mockito-all:1.10.19')
+ testRuntime('com.h2database:h2:1.4.193')
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java b/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java
new file mode 100644
index 0000000..f9e19fa
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+public class AzDBUtil {
+ static final int MAX_DB_RETRY_COUNT = 5;
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/AzkabanDataSource.java b/azkaban-db/src/main/java/azkaban/db/AzkabanDataSource.java
new file mode 100644
index 0000000..7f7ad42
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/AzkabanDataSource.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+
+public abstract class AzkabanDataSource extends BasicDataSource {
+ public abstract String getDBType();
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
new file mode 100644
index 0000000..3cc5b07
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.SQLException;
+import org.apache.commons.dbutils.ResultSetHandler;
+
+/**
+ * This interface is to define Base Data Access Object contract for Azkaban. All azkaban
+ * DB related operations must be performed upon this interface. AZ DB operators all leverages
+ * QueryRunner interface.
+ *
+ * @see org.apache.commons.dbutils.QueryRunner
+ */
+public interface DatabaseOperator {
+
+ /**
+ * Executes the given Azkaban related SELECT SQL operations.
+ *
+ * @param sqlQuery The SQL query statement to execute.
+ * @param resultHandler The handler used to create the result object
+ * @param params Initialize the PreparedStatement's IN parameters
+ * @param <T> The type of object that the qeury handler returns
+ * @return The object returned by the handler.
+ * @throws SQLException
+ */
+ <T> T query(String sqlQuery, ResultSetHandler<T> resultHandler, Object...params) throws SQLException;
+
+ /**
+ * Provide a way to allow users define custom SQL operations without relying on fixed
+ * SQL interface. The common use case is to group a sequence of SQL operations without
+ * commit every time.
+ *
+ * @param operations A sequence of DB operations
+ * @param <T> The type of object that the operations returns. Note that T could be null
+ * @return T The object returned by the SQL statement, expected by the caller
+ * @throws SQLException
+ */
+ <T> T transaction(SQLTransaction<T> operations) throws SQLException;
+
+ /**
+ * Executes the given AZ related INSERT, UPDATE, or DELETE SQL statement.
+ *
+ * @param updateClause sql statements to execute
+ * @param params Initialize the PreparedStatement's IN parameters
+ * @return The number of rows updated.
+ * @throws SQLException
+ */
+ int update(String updateClause, Object...params) throws SQLException;
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
new file mode 100644
index 0000000..388c38f
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.log4j.Logger;
+
+import java.sql.SQLException;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import com.google.inject.Inject;
+
+import static java.util.Objects.*;
+
+/**
+ * Implement AZ DB related operations. This class is thread safe.
+ */
+public class DatabaseOperatorImpl implements DatabaseOperator {
+
+ private static final Logger logger = Logger.getLogger(DatabaseOperatorImpl.class);
+
+ private final QueryRunner queryRunner;
+
+ /**
+ * Note: this queryRunner should include a concrete {@link AzkabanDataSource} inside.
+ *
+ * @param queryRunner
+ */
+ @Inject
+ public DatabaseOperatorImpl(QueryRunner queryRunner){
+ requireNonNull(queryRunner.getDataSource(), "data source must not be null.");
+ this.queryRunner = queryRunner;
+ }
+
+ /**
+ * query method Implementation. it will call {@link AzkabanDataSource#getConnection()} inside queryrunner.query.
+ */
+ @Override
+ public <T> T query(String baseQuery, ResultSetHandler<T> resultHandler, Object...params) throws SQLException {
+ try{
+ return queryRunner.query(baseQuery, resultHandler, params);
+ } catch (SQLException ex){
+ // todo kunkun-tang: Retry logics should be implemented here.
+ logger.error("query failed", ex);
+ throw ex;
+ }
+ }
+
+ /**
+ * transaction method Implementation.
+ *
+ */
+ @Override
+ public <T> T transaction(SQLTransaction<T> operations) throws SQLException {
+ Connection conn = null;
+ try{
+ conn = queryRunner.getDataSource().getConnection();
+ conn.setAutoCommit(false);
+ DatabaseTransOperator transOperator = new DatabaseTransOperatorImpl(queryRunner, conn);
+ T res = operations.execute(transOperator);
+ conn.commit();
+ return res;
+ } catch (SQLException ex) {
+ // todo kunkun-tang: Retry logics should be implemented here.
+ logger.error("transaction failed", ex);
+ throw ex;
+ } finally {
+ DbUtils.closeQuietly(conn);
+ }
+ }
+
+ /**
+ * update implementation. it will call {@link AzkabanDataSource#getConnection()} inside queryrunner.update.
+ *
+ * @param updateClause sql statements to execute
+ * @param params Initialize the PreparedStatement's IN parameters
+ * @return the number of rows being affected by update
+ * @throws SQLException
+ */
+ @Override
+ public int update(String updateClause, Object...params) throws SQLException {
+ try{
+ return queryRunner.update(updateClause, params);
+ } catch (SQLException ex){
+ // todo kunkun-tang: Retry logics should be implemented here.
+ logger.error("update failed", ex);
+ throw ex;
+ }
+ }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
new file mode 100644
index 0000000..42a7ae6
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.dbutils.ResultSetHandler;
+
+
+/**
+ * This interface is designed as an supplement of {@link DatabaseOperator}, which do commit at the end of every query. Given
+ * this interface, users/callers (implementation code) should decide where to {@link Connection#commit()}
+ * based on their requirements.
+ *
+ * @see org.apache.commons.dbutils.QueryRunner
+ */
+public interface DatabaseTransOperator {
+
+
+ /**
+ * returns the last id from a previous insert statement.
+ * Note that last insert and this operation should use the same connection.
+ *
+ * @return the last inserted id in mysql per connection.
+ */
+ long getLastInsertId();
+
+ /**
+ *
+ * @param querySql
+ * @param resultHandler
+ * @param params
+ * @param <T>
+ * @return
+ * @throws SQLException
+ */
+ <T> T query(String querySql, ResultSetHandler<T> resultHandler, Object... params) throws SQLException;
+
+ /**
+ *
+ * @param updateClause
+ * @param params
+ * @return
+ * @throws SQLException
+ */
+ int update(String updateClause, Object... params) throws SQLException;
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperatorImpl.java b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperatorImpl.java
new file mode 100644
index 0000000..6bcd18b
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperatorImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.dbutils.handlers.ScalarHandler;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class should be only used inside {@link DatabaseOperatorImpl}, then we remove public.
+ */
+class DatabaseTransOperatorImpl implements DatabaseTransOperator {
+
+ private static final Logger logger = Logger.getLogger(DatabaseTransOperator.class);
+ private final Connection conn;
+ private final QueryRunner queryRunner;
+
+ public DatabaseTransOperatorImpl(QueryRunner queryRunner, Connection conn) {
+ this.conn = conn;
+ this.queryRunner= queryRunner;
+ }
+
+ /**
+ * The ID that was generated is maintained in Mysql server on a per-connection basis.
+ * This means that the value returned by the function to a given client is
+ * the first AUTO_INCREMENT value generated for most recent statement
+ *
+ * This value cannot be affected by other callers, even if they generate
+ * AUTO_INCREMENT values of their own.
+ * @return last insertion ID
+ *
+ */
+ @Override
+ public long getLastInsertId() {
+ // A default connection: autocommit = true.
+ long num = -1;
+ try {
+ num = ((Number) queryRunner.query(conn,"SELECT LAST_INSERT_ID();", new ScalarHandler<>(1))).longValue();
+ } catch (SQLException ex) {
+ logger.error("can not get last insertion ID", ex);
+ }
+ return num;
+ }
+
+
+ @Override
+ public <T> T query(String querySql, ResultSetHandler<T> resultHandler, Object... params) throws SQLException {
+ try{
+ return queryRunner.query(conn, querySql, resultHandler, params);
+ } catch (SQLException ex){
+ //RETRY Logic should be implemented here if needed.
+ throw ex;
+ } finally {
+ // Note: CAN NOT CLOSE CONNECTION HERE.
+ }
+ }
+
+ @Override
+ public int update(String updateClause, Object... params) throws SQLException {
+ try{
+ return queryRunner.update(conn, updateClause, params);
+ } catch (SQLException ex){
+ //RETRY Logic should be implemented here if needed.
+ throw ex;
+ } finally {
+ // Note: CAN NOT CLOSE CONNECTION HERE.
+ }
+ }
+
+ public Connection getConnection() {
+ return conn;
+ }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
new file mode 100644
index 0000000..98e361f
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+public class H2FileDataSource extends AzkabanDataSource {
+ public H2FileDataSource(String filePath) {
+ super();
+ String url = "jdbc:h2:file:" + filePath;
+ setDriverClassName("org.h2.Driver");
+ setUrl(url);
+ }
+
+ @Override
+ public String getDBType() {
+ return "h2";
+ }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
new file mode 100644
index 0000000..4220546
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.log4j.Logger;
+
+
+public class MySQLDataSource extends AzkabanDataSource {
+
+ private static Logger logger = Logger.getLogger(MySQLDataSource.class);
+
+ private static volatile MySQLDataSource instance = null;
+
+ // TODO kunkun-tang: have guice inject working here
+ private MySQLDataSource(String host, int port, String dbName,
+ String user, String password, int numConnections) {
+ super();
+
+ String url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
+ addConnectionProperty("useUnicode", "yes");
+ addConnectionProperty("characterEncoding", "UTF-8");
+ setDriverClassName("com.mysql.jdbc.Driver");
+ setUsername(user);
+ setPassword(password);
+ setUrl(url);
+ setMaxTotal(numConnections);
+ setValidationQuery("/* ping */ select 1");
+ setTestOnBorrow(true);
+ }
+
+ /**
+ * Get a singleton object for MySQL BasicDataSource
+ */
+ public static MySQLDataSource getInstance(String host, int port, String dbName,
+ String user, String password, int numConnections) {
+ if (instance == null) {
+ synchronized (MySQLDataSource.class) {
+ if (instance == null) {
+ logger.info("Instantiating MetricReportManager");
+ instance = new MySQLDataSource(host, port, dbName, user, password, numConnections);
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * This method overrides {@link BasicDataSource#getConnection()}, in order to have retry logics.
+ *
+ */
+ @Override
+ public synchronized Connection getConnection() throws SQLException {
+
+ /*
+ * getInitialSize() returns the initial size of the connection pool.
+ *
+ * Note: The connection pool is only initialized the first time one of the
+ * following methods is invoked: <code>getConnection, setLogwriter,
+ * setLoginTimeout, getLoginTimeout, getLogWriter.</code>
+ */
+ if (getInitialSize() == 0) {
+ return createDataSource().getConnection();
+ }
+
+ Connection connection = null;
+ int retryAttempt = 0;
+ while (retryAttempt < AzDBUtil.MAX_DB_RETRY_COUNT) {
+ try {
+ /*
+ * when DB connection could not be fetched here, dbcp library will keep searching until a timeout defined in
+ * its code hardly.
+ */
+ connection = createDataSource().getConnection();
+ if(connection != null)
+ return connection;
+ } catch (SQLException ex) {
+ logger.error("Failed to find DB connection. waits 1 minutes and retry. No.Attempt = " + retryAttempt, ex);
+ } finally {
+ retryAttempt ++;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String getDBType() {
+ return "mysql";
+ }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
new file mode 100644
index 0000000..40860a6
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.SQLException;
+
+
+/**
+ * This interface defines how a sequence of sql statements are organized and packed together. All transaction
+ * implementations must follow this interface, and will be called in
+ * {@link DatabaseOperatorImpl#transaction(SQLTransaction)}
+ *
+ * @param <T> The transaction return type
+ */
+@FunctionalInterface
+public interface SQLTransaction<T> {
+ public T execute(DatabaseTransOperator transOperator) throws SQLException;
+}
diff --git a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
new file mode 100644
index 0000000..3fea2b1
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+class AzDBTestUtility {
+
+ public static class EmbeddedH2BasicDataSource extends AzkabanDataSource {
+
+ EmbeddedH2BasicDataSource() {
+ super();
+ String url = "jdbc:h2:mem:test";
+ setDriverClassName("org.h2.Driver");
+ setUrl(url);
+ }
+
+ @Override
+ public String getDBType() {
+ return "h2-in-memory";
+ }
+ }
+}
diff --git a/azkaban-db/src/test/java/azkaban/db/DatabaseOperatorImplTest.java b/azkaban-db/src/test/java/azkaban/db/DatabaseOperatorImplTest.java
new file mode 100644
index 0000000..82be854
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/DatabaseOperatorImplTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.dbutils.handlers.ScalarHandler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class DatabaseOperatorImplTest {
+
+ private AzkabanDataSource datasource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
+
+ private DatabaseOperator dbOperator;
+ private QueryRunner queryRunner;
+ private Connection conn;
+
+ private ResultSetHandler<Integer> handler = rs -> {
+ if (!rs.next()) {
+ return 0;
+ }
+ return rs.getInt(1);
+ };
+
+ private static final List<Integer> list = new ArrayList<>();
+
+ private static int index_1 = 3;
+ private static int index_2 = 15;
+
+ @Before
+ public void setUp() throws Exception {
+ queryRunner = mock(QueryRunner.class);
+
+ conn = datasource.getConnection();
+ DataSource mockDataSource = mock(datasource.getClass());
+
+ when(queryRunner.getDataSource()).thenReturn(mockDataSource);
+ when(mockDataSource.getConnection()).thenReturn(conn);
+
+ this.dbOperator = new DatabaseOperatorImpl(queryRunner);
+
+ list.add(index_1);
+ list.add(index_2);
+
+ // valid query returns correct value
+ when(queryRunner.query("select * from blah where ? = ?", handler, "id", 2)).thenReturn(index_2);
+
+ // If select an non-existing entry, handler returns 0.
+ when(queryRunner.query("select * from blah where ? = ?", handler, "id", 3)).thenReturn(0);
+
+ //If typos, throw Exceptions.
+ doThrow(SQLException.class).when(queryRunner).query("sele * from blah where ? = ?", handler, "id", 2);
+
+ doAnswer(invocation -> {
+ index_1 = 26;
+ return 1;
+ }).when(queryRunner).update("update blah set ? = ?", "1", 26);
+ }
+
+ @Test
+ public void testValidQuery() throws Exception {
+ int res = dbOperator.query("select * from blah where ? = ?", handler, "id", 2);
+ Assert.assertEquals(15, res);
+ verify(queryRunner).query("select * from blah where ? = ?", handler, "id", 2);
+ }
+
+ @Test
+ public void testInvalidQuery() throws Exception {
+ int res = dbOperator.query("select * from blah where ? = ?", handler, "id", 3);
+ Assert.assertEquals(0, res);
+ }
+
+ @Test(expected = SQLException.class)
+ public void testTypoSqlStatement() throws Exception {
+ System.out.println("testTypoSqlStatement");
+ dbOperator.query("sele * from blah where ? = ?", handler, "id", 2);
+ }
+
+ @Test
+ public void testTransaction() throws Exception {
+ when(queryRunner.update(conn, "update blah set ? = ?", "1", 26)).thenReturn(1);
+ when(queryRunner.query(conn, "select * from blah where ? = ?", handler, "id", 1)).thenReturn(26);
+
+ SQLTransaction<Integer> transaction = transOperator -> {
+ transOperator.update("update blah set ? = ?", "1", 26);
+ return transOperator.query("select * from blah where ? = ?", handler, "id", 1);
+ };
+
+ int res = dbOperator.transaction(transaction);
+ Assert.assertEquals(26, res);
+ }
+
+ @Test
+ public void testValidUpdate() throws Exception {
+ int res = dbOperator.update("update blah set ? = ?", "1", 26);
+
+ // 1 row is affected
+ Assert.assertEquals(1, res);
+ Assert.assertEquals(26, index_1);
+ verify(queryRunner).update("update blah set ? = ?", "1", 26);
+ }
+
+ @Test
+ public void testInvalidUpdate() throws Exception {
+ int res = dbOperator.update("update blah set ? = ?", "3", 26);
+
+ // 0 row is affected
+ Assert.assertEquals(0, res);
+ }
+}
diff --git a/azkaban-db/src/test/java/azkaban/db/DatabaseTransOperatorImplTest.java b/azkaban-db/src/test/java/azkaban/db/DatabaseTransOperatorImplTest.java
new file mode 100644
index 0000000..86f11f6
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/DatabaseTransOperatorImplTest.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+// todo kunkun-tang: complete this test.
+public class DatabaseTransOperatorImplTest {
+
+ @Before
+ public void setUp() throws Exception {
+ AzkabanDataSource datasource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
+ DatabaseTransOperator operator = new DatabaseTransOperatorImpl(new QueryRunner(), datasource.getConnection());
+ }
+
+ @Ignore @Test
+ public void testQuery() throws Exception {
+ }
+
+ @Ignore @Test
+ public void testUpdate() throws Exception {
+ }
+}
settings.gradle 1(+1 -0)
diff --git a/settings.gradle b/settings.gradle
index 9cdfa91..caa8add 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,6 +16,7 @@
*/
include 'azkaban-spi'
+include 'azkaban-db'
include 'azkaban-common'
include 'azkaban-exec-server'
include 'azkaban-hadoop-security-plugin'