azkaban-aplcache

Azkaban-db subproject introduction (#1021) * Azkaban-db

4/27/2017 5:50:55 PM

Details

diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index ce0ee92..aada39d 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -36,6 +36,7 @@ configurations {
 }
 dependencies {
   compile project(':azkaban-spi')
+  compile project(':azkaban-db')
 
   compile('com.google.inject:guice:4.1.0')
   compile('com.google.guava:guava:21.0')
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 817c04f..57752cf 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -16,6 +16,9 @@
  */
 package azkaban;
 
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.spi.Storage;
@@ -28,6 +31,7 @@ import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import java.io.File;
+import org.apache.commons.dbutils.QueryRunner;
 
 
 /**
@@ -47,6 +51,9 @@ public class AzkabanCommonModule extends AbstractModule {
     bind(ProjectLoader.class).to(JdbcProjectLoader.class).in(Scopes.SINGLETON);
     bind(Props.class).toInstance(config.getProps());
     bind(Storage.class).to(resolveStorageClassType()).in(Scopes.SINGLETON);
+    bind(DatabaseOperator.class).to(DatabaseOperatorImpl.class).in(Scopes.SINGLETON);
+    //todo kunkun-tang : Consider both H2 DataSource and MysqlDatasource case.
+    bind(QueryRunner.class).toInstance(config.getQueryRunner());
   }
 
   public Class<? extends Storage> resolveStorageClassType() {
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
index 015e86d..680b889 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
@@ -17,11 +17,15 @@
 
 package azkaban;
 
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.H2FileDataSource;
+import azkaban.db.MySQLDataSource;
 import azkaban.storage.StorageImplementationType;
 import azkaban.utils.Props;
 import com.google.inject.Inject;
 import java.net.URI;
 import java.net.URISyntaxException;
+import org.apache.commons.dbutils.QueryRunner;
 import org.apache.log4j.Logger;
 
 import static azkaban.Constants.ConfigurationKeys.*;
@@ -68,6 +72,32 @@ public class AzkabanCommonModuleConfig {
     return localStorageBaseDirPath;
   }
 
+  // todo kunkun-tang: the below method should moved out to azkaban-db module eventually.
+  // Today azkaban-db can not rely on Props, so we can not do it.
+  public AzkabanDataSource getDataSource() {
+    String databaseType = props.getString("database.type");
+
+    // todo kunkun-tang: temperaroy workaround to let service provider test work.
+    if(databaseType.equals("h2")) {
+      String path = props.getString("h2.path");
+      return new H2FileDataSource(path);
+    }
+    int port = props.getInt("mysql.port");
+    String host = props.getString("mysql.host");
+    String database = props.getString("mysql.database");
+    String user = props.getString("mysql.user");
+    String password = props.getString("mysql.password");
+    int numConnections = props.getInt("mysql.numconnections");
+
+    return MySQLDataSource.getInstance(host, port, database, user, password,
+        numConnections);
+
+  }
+
+  public QueryRunner getQueryRunner() {
+    return new QueryRunner(getDataSource());
+  }
+
   public URI getHdfsBaseUri() {
     return hdfsBaseUri;
   }
diff --git a/azkaban-db/.gitignore b/azkaban-db/.gitignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/azkaban-db/.gitignore
diff --git a/azkaban-db/build.gradle b/azkaban-db/build.gradle
new file mode 100644
index 0000000..50379a6
--- /dev/null
+++ b/azkaban-db/build.gradle
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+dependencies {
+
+  // todo kunkun-tang: consolidate dependencies in azkaban-common and azkaban-db
+  compile('log4j:log4j:1.2.16')
+  compile('com.google.inject:guice:4.1.0')
+
+  compile('commons-dbutils:commons-dbutils:1.5')
+  compile('org.apache.commons:commons-dbcp2:2.1.1')
+
+  testCompile('org.mockito:mockito-all:1.10.19')
+  testRuntime('com.h2database:h2:1.4.193')
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java b/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java
new file mode 100644
index 0000000..f9e19fa
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/AzDBUtil.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+public class AzDBUtil {
+  static final int MAX_DB_RETRY_COUNT = 5;
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/AzkabanDataSource.java b/azkaban-db/src/main/java/azkaban/db/AzkabanDataSource.java
new file mode 100644
index 0000000..7f7ad42
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/AzkabanDataSource.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+
+public abstract class AzkabanDataSource extends BasicDataSource {
+  public abstract String getDBType();
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
new file mode 100644
index 0000000..3cc5b07
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.SQLException;
+import org.apache.commons.dbutils.ResultSetHandler;
+
+/**
+ * This interface is to define Base Data Access Object contract for Azkaban. All azkaban
+ * DB related operations must be performed upon this interface. AZ DB operators all leverages
+ * QueryRunner interface.
+ *
+ * @see org.apache.commons.dbutils.QueryRunner
+ */
+public interface DatabaseOperator {
+
+  /**
+   * Executes the given Azkaban related SELECT SQL operations.
+   *
+   * @param sqlQuery The SQL query statement to execute.
+   * @param resultHandler The handler used to create the result object
+   * @param params Initialize the PreparedStatement's IN parameters
+   * @param <T> The type of object that the qeury handler returns
+   * @return The object returned by the handler.
+   * @throws SQLException
+   */
+  <T> T query(String sqlQuery, ResultSetHandler<T> resultHandler, Object...params) throws SQLException;
+
+  /**
+   * Provide a way to allow users define custom SQL operations without relying on fixed
+   * SQL interface. The common use case is to group a sequence of SQL operations without
+   * commit every time.
+   *
+   * @param operations A sequence of DB operations
+   * @param <T> The type of object that the operations returns. Note that T could be null
+   * @return T The object returned by the SQL statement, expected by the caller
+   * @throws SQLException
+   */
+  <T> T transaction(SQLTransaction<T> operations) throws SQLException;
+
+  /**
+   * Executes the given AZ related INSERT, UPDATE, or DELETE SQL statement.
+   *
+   * @param updateClause sql statements to execute
+   * @param params Initialize the PreparedStatement's IN parameters
+   * @return The number of rows updated.
+   * @throws SQLException
+   */
+  int update(String updateClause, Object...params) throws SQLException;
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
new file mode 100644
index 0000000..388c38f
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.log4j.Logger;
+
+import java.sql.SQLException;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import com.google.inject.Inject;
+
+import static java.util.Objects.*;
+
+/**
+ * Implement AZ DB related operations. This class is thread safe.
+ */
+public class DatabaseOperatorImpl implements DatabaseOperator {
+
+  private static final Logger logger = Logger.getLogger(DatabaseOperatorImpl.class);
+
+  private final QueryRunner queryRunner;
+
+  /**
+   * Note: this queryRunner should include a concrete {@link AzkabanDataSource} inside.
+   *
+   * @param queryRunner
+   */
+  @Inject
+  public DatabaseOperatorImpl(QueryRunner queryRunner){
+    requireNonNull(queryRunner.getDataSource(), "data source must not be null.");
+    this.queryRunner = queryRunner;
+  }
+
+  /**
+   * query method Implementation. it will call {@link AzkabanDataSource#getConnection()} inside queryrunner.query.
+   */
+  @Override
+  public <T> T query(String baseQuery, ResultSetHandler<T> resultHandler, Object...params) throws SQLException {
+    try{
+      return queryRunner.query(baseQuery, resultHandler, params);
+    } catch (SQLException ex){
+      // todo kunkun-tang: Retry logics should be implemented here.
+      logger.error("query failed", ex);
+      throw ex;
+    }
+  }
+
+  /**
+   * transaction method Implementation.
+   *
+   */
+  @Override
+  public <T> T transaction(SQLTransaction<T> operations) throws SQLException {
+    Connection conn = null;
+    try{
+      conn = queryRunner.getDataSource().getConnection();
+      conn.setAutoCommit(false);
+      DatabaseTransOperator transOperator = new DatabaseTransOperatorImpl(queryRunner, conn);
+      T res = operations.execute(transOperator);
+      conn.commit();
+      return res;
+    } catch (SQLException ex) {
+      // todo kunkun-tang: Retry logics should be implemented here.
+      logger.error("transaction failed", ex);
+      throw ex;
+    } finally {
+      DbUtils.closeQuietly(conn);
+    }
+  }
+
+  /**
+   * update implementation. it will call {@link AzkabanDataSource#getConnection()} inside queryrunner.update.
+   *
+   * @param updateClause sql statements to execute
+   * @param params Initialize the PreparedStatement's IN parameters
+   * @return the number of rows being affected by update
+   * @throws SQLException
+   */
+  @Override
+  public int update(String updateClause, Object...params) throws SQLException {
+    try{
+      return queryRunner.update(updateClause, params);
+    } catch (SQLException ex){
+      // todo kunkun-tang: Retry logics should be implemented here.
+      logger.error("update failed", ex);
+      throw ex;
+    }
+  }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
new file mode 100644
index 0000000..42a7ae6
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.dbutils.ResultSetHandler;
+
+
+/**
+ * This interface is designed as an supplement of {@link DatabaseOperator}, which do commit at the end of every query. Given
+ * this interface, users/callers (implementation code) should decide where to {@link Connection#commit()}
+ * based on their requirements.
+ *
+ * @see org.apache.commons.dbutils.QueryRunner
+ */
+public interface DatabaseTransOperator {
+
+
+  /**
+   * returns the last id from a previous insert statement.
+   * Note that last insert and this operation should use the same connection.
+   *
+   * @return the last inserted id in mysql per connection.
+   */
+  long  getLastInsertId();
+
+  /**
+   *
+   * @param querySql
+   * @param resultHandler
+   * @param params
+   * @param <T>
+   * @return
+   * @throws SQLException
+   */
+  <T> T query(String querySql, ResultSetHandler<T> resultHandler, Object... params) throws SQLException;
+
+  /**
+   *
+   * @param updateClause
+   * @param params
+   * @return
+   * @throws SQLException
+   */
+  int update(String updateClause, Object... params) throws SQLException;
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperatorImpl.java b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperatorImpl.java
new file mode 100644
index 0000000..6bcd18b
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperatorImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.dbutils.handlers.ScalarHandler;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class should be only used inside {@link DatabaseOperatorImpl}, then we remove public.
+ */
+class DatabaseTransOperatorImpl implements DatabaseTransOperator {
+
+  private static final Logger logger = Logger.getLogger(DatabaseTransOperator.class);
+  private final Connection conn;
+  private final QueryRunner queryRunner;
+
+  public DatabaseTransOperatorImpl(QueryRunner queryRunner, Connection conn) {
+    this.conn = conn;
+    this.queryRunner= queryRunner;
+  }
+
+  /**
+   * The ID that was generated is maintained in Mysql server on a per-connection basis.
+   * This means that the value returned by the function to a given client is
+   * the first AUTO_INCREMENT value generated for most recent statement
+   *
+   * This value cannot be affected by other callers, even if they generate
+   * AUTO_INCREMENT values of their own.
+   * @return last insertion ID
+   *
+   */
+  @Override
+  public long getLastInsertId() {
+    // A default connection: autocommit = true.
+    long num = -1;
+    try {
+      num = ((Number) queryRunner.query(conn,"SELECT LAST_INSERT_ID();", new ScalarHandler<>(1))).longValue();
+    } catch (SQLException ex) {
+      logger.error("can not get last insertion ID", ex);
+    }
+    return num;
+  }
+
+
+  @Override
+  public <T> T query(String querySql, ResultSetHandler<T> resultHandler, Object... params) throws SQLException {
+    try{
+      return queryRunner.query(conn, querySql, resultHandler, params);
+    } catch (SQLException ex){
+      //RETRY Logic should be implemented here if needed.
+      throw ex;
+    } finally {
+      // Note: CAN NOT CLOSE CONNECTION HERE.
+    }
+  }
+
+  @Override
+  public int update(String updateClause, Object... params) throws SQLException {
+    try{
+      return queryRunner.update(conn, updateClause, params);
+    } catch (SQLException ex){
+      //RETRY Logic should be implemented here if needed.
+      throw ex;
+    } finally {
+      // Note: CAN NOT CLOSE CONNECTION HERE.
+    }
+  }
+
+  public Connection getConnection() {
+    return conn;
+  }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
new file mode 100644
index 0000000..98e361f
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/H2FileDataSource.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+public class H2FileDataSource extends AzkabanDataSource {
+  public H2FileDataSource(String filePath) {
+    super();
+    String url = "jdbc:h2:file:" + filePath;
+    setDriverClassName("org.h2.Driver");
+    setUrl(url);
+  }
+
+  @Override
+  public String getDBType() {
+    return "h2";
+  }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
new file mode 100644
index 0000000..4220546
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/MySQLDataSource.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.log4j.Logger;
+
+
+public class MySQLDataSource extends AzkabanDataSource {
+
+  private static Logger logger = Logger.getLogger(MySQLDataSource.class);
+
+  private static volatile MySQLDataSource instance = null;
+
+  // TODO kunkun-tang: have guice inject working here
+  private MySQLDataSource(String host, int port, String dbName,
+      String user, String password, int numConnections) {
+    super();
+
+    String url = "jdbc:mysql://" + (host + ":" + port + "/" + dbName);
+    addConnectionProperty("useUnicode", "yes");
+    addConnectionProperty("characterEncoding", "UTF-8");
+    setDriverClassName("com.mysql.jdbc.Driver");
+    setUsername(user);
+    setPassword(password);
+    setUrl(url);
+    setMaxTotal(numConnections);
+    setValidationQuery("/* ping */ select 1");
+    setTestOnBorrow(true);
+  }
+
+  /**
+   * Get a singleton object for MySQL BasicDataSource
+   */
+  public static MySQLDataSource getInstance(String host, int port, String dbName,
+      String user, String password, int numConnections) {
+    if (instance == null) {
+      synchronized (MySQLDataSource.class) {
+        if (instance == null) {
+          logger.info("Instantiating MetricReportManager");
+          instance = new MySQLDataSource(host, port, dbName, user, password, numConnections);
+        }
+      }
+    }
+    return instance;
+  }
+
+  /**
+   * This method overrides {@link BasicDataSource#getConnection()}, in order to have retry logics.
+   *
+   */
+  @Override
+  public synchronized Connection getConnection() throws SQLException {
+
+      /*
+       * getInitialSize() returns the initial size of the connection pool.
+       *
+       * Note: The connection pool is only initialized the first time one of the
+       * following methods is invoked: <code>getConnection, setLogwriter,
+       * setLoginTimeout, getLoginTimeout, getLogWriter.</code>
+       */
+    if (getInitialSize() == 0) {
+      return createDataSource().getConnection();
+    }
+
+    Connection connection = null;
+    int retryAttempt = 0;
+    while (retryAttempt < AzDBUtil.MAX_DB_RETRY_COUNT) {
+      try {
+          /*
+           * when DB connection could not be fetched here, dbcp library will keep searching until a timeout defined in
+           * its code hardly.
+           */
+        connection = createDataSource().getConnection();
+        if(connection != null)
+          return connection;
+      } catch (SQLException ex) {
+        logger.error("Failed to find DB connection. waits 1 minutes and retry. No.Attempt = " + retryAttempt, ex);
+      } finally {
+        retryAttempt ++;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String getDBType() {
+    return "mysql";
+  }
+}
diff --git a/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
new file mode 100644
index 0000000..40860a6
--- /dev/null
+++ b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.SQLException;
+
+
+/**
+ * This interface defines how a sequence of sql statements are organized and packed together. All transaction
+ * implementations must follow this interface, and will be called in
+ * {@link DatabaseOperatorImpl#transaction(SQLTransaction)}
+ *
+ * @param <T> The transaction return type
+ */
+@FunctionalInterface
+public interface SQLTransaction<T> {
+  public T execute(DatabaseTransOperator transOperator) throws SQLException;
+}
diff --git a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
new file mode 100644
index 0000000..3fea2b1
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+class AzDBTestUtility {
+
+  public static class EmbeddedH2BasicDataSource extends AzkabanDataSource {
+
+    EmbeddedH2BasicDataSource() {
+      super();
+      String url = "jdbc:h2:mem:test";
+      setDriverClassName("org.h2.Driver");
+      setUrl(url);
+    }
+
+    @Override
+    public String getDBType() {
+      return "h2-in-memory";
+    }
+  }
+}
diff --git a/azkaban-db/src/test/java/azkaban/db/DatabaseOperatorImplTest.java b/azkaban-db/src/test/java/azkaban/db/DatabaseOperatorImplTest.java
new file mode 100644
index 0000000..82be854
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/DatabaseOperatorImplTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.dbutils.handlers.ScalarHandler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class DatabaseOperatorImplTest {
+
+  private AzkabanDataSource datasource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
+
+  private DatabaseOperator dbOperator;
+  private QueryRunner queryRunner;
+  private Connection conn;
+
+  private ResultSetHandler<Integer> handler = rs -> {
+    if (!rs.next()) {
+      return 0;
+    }
+    return rs.getInt(1);
+  };
+
+  private static final List<Integer> list = new ArrayList<>();
+
+  private static int index_1 = 3;
+  private static int index_2 = 15;
+
+  @Before
+  public void setUp() throws Exception {
+    queryRunner = mock(QueryRunner.class);
+
+    conn = datasource.getConnection();
+    DataSource mockDataSource = mock(datasource.getClass());
+
+    when(queryRunner.getDataSource()).thenReturn(mockDataSource);
+    when(mockDataSource.getConnection()).thenReturn(conn);
+
+    this.dbOperator = new DatabaseOperatorImpl(queryRunner);
+
+    list.add(index_1);
+    list.add(index_2);
+
+    // valid query returns correct value
+    when(queryRunner.query("select * from blah where ? = ?", handler, "id", 2)).thenReturn(index_2);
+
+    // If select an non-existing entry, handler returns 0.
+    when(queryRunner.query("select * from blah where ? = ?", handler, "id", 3)).thenReturn(0);
+
+    //If typos, throw Exceptions.
+    doThrow(SQLException.class).when(queryRunner).query("sele * from blah where ? = ?", handler, "id", 2);
+
+    doAnswer(invocation -> {
+      index_1 = 26;
+      return 1;
+    }).when(queryRunner).update("update blah set ? = ?", "1", 26);
+  }
+
+  @Test
+  public void testValidQuery() throws Exception {
+    int res = dbOperator.query("select * from blah where ? = ?", handler, "id", 2);
+    Assert.assertEquals(15, res);
+    verify(queryRunner).query("select * from blah where ? = ?", handler, "id", 2);
+  }
+
+  @Test
+  public void testInvalidQuery() throws Exception {
+    int res = dbOperator.query("select * from blah where ? = ?", handler, "id", 3);
+    Assert.assertEquals(0, res);
+  }
+
+  @Test(expected = SQLException.class)
+  public void testTypoSqlStatement() throws Exception {
+    System.out.println("testTypoSqlStatement");
+    dbOperator.query("sele * from blah where ? = ?", handler, "id", 2);
+  }
+
+  @Test
+  public void testTransaction() throws Exception {
+    when(queryRunner.update(conn, "update blah set ? = ?", "1", 26)).thenReturn(1);
+    when(queryRunner.query(conn, "select * from blah where ? = ?", handler, "id", 1)).thenReturn(26);
+
+    SQLTransaction<Integer> transaction = transOperator -> {
+      transOperator.update("update blah set ? = ?", "1", 26);
+      return transOperator.query("select * from blah where ? = ?", handler, "id", 1);
+    };
+
+    int res = dbOperator.transaction(transaction);
+    Assert.assertEquals(26, res);
+  }
+
+  @Test
+  public void testValidUpdate() throws Exception {
+    int res = dbOperator.update("update blah set ? = ?", "1", 26);
+
+    // 1 row is affected
+    Assert.assertEquals(1, res);
+    Assert.assertEquals(26, index_1);
+    verify(queryRunner).update("update blah set ? = ?", "1", 26);
+  }
+
+  @Test
+  public void testInvalidUpdate() throws Exception {
+    int res = dbOperator.update("update blah set ? = ?", "3", 26);
+
+    // 0 row is affected
+    Assert.assertEquals(0, res);
+  }
+}
diff --git a/azkaban-db/src/test/java/azkaban/db/DatabaseTransOperatorImplTest.java b/azkaban-db/src/test/java/azkaban/db/DatabaseTransOperatorImplTest.java
new file mode 100644
index 0000000..86f11f6
--- /dev/null
+++ b/azkaban-db/src/test/java/azkaban/db/DatabaseTransOperatorImplTest.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package azkaban.db;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+// todo kunkun-tang: complete this test.
+public class DatabaseTransOperatorImplTest {
+
+  @Before
+  public void setUp() throws Exception {
+    AzkabanDataSource datasource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
+    DatabaseTransOperator operator = new DatabaseTransOperatorImpl(new QueryRunner(), datasource.getConnection());
+  }
+
+  @Ignore @Test
+  public void testQuery() throws Exception {
+  }
+
+  @Ignore @Test
+  public void testUpdate() throws Exception {
+  }
+}

settings.gradle 1(+1 -0)

diff --git a/settings.gradle b/settings.gradle
index 9cdfa91..caa8add 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,6 +16,7 @@
  */
 
 include 'azkaban-spi'
+include 'azkaban-db'
 include 'azkaban-common'
 include 'azkaban-exec-server'
 include 'azkaban-hadoop-security-plugin'