azkaban-developers

Remove internal DB interface (#1437) This code patch removes

9/7/2017 1:32:30 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 8aaf69e..02ca44e 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.db.AzkabanDataSource;
-import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
 import azkaban.db.H2FileDataSource;
 import azkaban.db.MySQLDataSource;
 import azkaban.executor.ExecutorLoader;
@@ -39,14 +37,14 @@ import azkaban.trigger.TriggerLoader;
 import azkaban.utils.Props;
 import com.codahale.metrics.MetricRegistry;
 import com.google.inject.AbstractModule;
-import javax.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
-import javax.inject.Singleton;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import javax.sql.DataSource;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.hadoop.conf.Configuration;
@@ -76,7 +74,6 @@ public class AzkabanCommonModule extends AbstractModule {
   protected void configure() {
     bind(Props.class).toInstance(this.config.getProps());
     bind(Storage.class).to(resolveStorageClassType());
-    bind(DatabaseOperator.class).to(DatabaseOperatorImpl.class);
     bind(TriggerLoader.class).to(JdbcTriggerImpl.class);
     bind(ProjectLoader.class).to(JdbcProjectImpl.class);
     bind(DataSource.class).to(AzkabanDataSource.class);
diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index b19248f..d5ab9ae 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -5,7 +5,6 @@ import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 import azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
 import azkaban.db.AzkabanDataSource;
 import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
 import azkaban.db.DatabaseSetup;
 import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
@@ -39,6 +38,6 @@ public class Utils {
 
     final DatabaseSetup setup = new DatabaseSetup(dataSource, sqlScriptsDir);
     setup.updateDatabase();
-    return new DatabaseOperatorImpl(new QueryRunner(dataSource));
+    return new DatabaseOperator(new QueryRunner(dataSource));
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java b/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java
index 498edd3..0d18772 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java
@@ -22,7 +22,6 @@ import azkaban.database.AzkabanConnectionPoolTest;
 import azkaban.database.AzkabanDataSource;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
 import azkaban.executor.ExecutionOptions;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.ExecuteFlowAction;
@@ -79,7 +78,7 @@ public class JdbcTriggerImplTest {
   @Before
   public void setUp() {
 
-    this.dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
+    this.dbOperator = new DatabaseOperator(new QueryRunner(dataSource));
     this.loader = new JdbcTriggerImpl(this.dbOperator);
   }
 
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
index b00ec9b..b776ca1 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
@@ -16,8 +16,16 @@
  */
 package azkaban.db;
 
+import static java.util.Objects.requireNonNull;
+
+import java.sql.Connection;
 import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
 
 /**
  * This interface is to define Base Data Access Object contract for Azkaban. All azkaban DB related
@@ -26,10 +34,25 @@ import org.apache.commons.dbutils.ResultSetHandler;
  *
  * @see org.apache.commons.dbutils.QueryRunner
  */
-public interface DatabaseOperator {
+@Singleton
+public class DatabaseOperator {
+
+  private static final Logger logger = Logger.getLogger(DatabaseOperator.class);
+
+  private final QueryRunner queryRunner;
+
+  /**
+   * Note: this queryRunner should include a concrete {@link AzkabanDataSource} inside.
+   */
+  @Inject
+  public DatabaseOperator(final QueryRunner queryRunner) {
+    requireNonNull(queryRunner.getDataSource(), "data source must not be null.");
+    this.queryRunner = queryRunner;
+  }
 
   /**
-   * Executes the given Azkaban related SELECT SQL operations.
+   * Executes the given Azkaban related SELECT SQL operations. it will call
+   * {@link AzkabanDataSource#getConnection()} inside queryrunner.query.
    *
    * @param sqlQuery The SQL query statement to execute.
    * @param resultHandler The handler used to create the result object
@@ -37,8 +60,17 @@ public interface DatabaseOperator {
    * @param <T> The type of object that the qeury handler returns
    * @return The object returned by the handler.
    */
-  <T> T query(String sqlQuery, ResultSetHandler<T> resultHandler, Object... params)
-      throws SQLException;
+  public <T> T query(final String baseQuery, final ResultSetHandler<T> resultHandler,
+      final Object... params)
+      throws SQLException {
+    try {
+      return this.queryRunner.query(baseQuery, resultHandler, params);
+    } catch (final SQLException ex) {
+      // todo kunkun-tang: Retry logics should be implemented here.
+      logger.error("query failed", ex);
+      throw ex;
+    }
+  }
 
   /**
    * Provide a way to allow users define custom SQL operations without relying on fixed SQL
@@ -49,19 +81,48 @@ public interface DatabaseOperator {
    * @param <T> The type of object that the operations returns. Note that T could be null
    * @return T The object returned by the SQL statement, expected by the caller
    */
-  <T> T transaction(SQLTransaction<T> operations) throws SQLException;
+  public <T> T transaction(final SQLTransaction<T> operations) throws SQLException {
+    Connection conn = null;
+    try {
+      conn = this.queryRunner.getDataSource().getConnection();
+      conn.setAutoCommit(false);
+      final DatabaseTransOperator transOperator = new DatabaseTransOperator(this.queryRunner,
+          conn);
+      final T res = operations.execute(transOperator);
+      conn.commit();
+      return res;
+    } catch (final SQLException ex) {
+      // todo kunkun-tang: Retry logics should be implemented here.
+      logger.error("transaction failed", ex);
+      throw ex;
+    } finally {
+      DbUtils.closeQuietly(conn);
+    }
+  }
 
   /**
    * Executes the given AZ related INSERT, UPDATE, or DELETE SQL statement.
+   * it will call {@link AzkabanDataSource#getConnection()} inside
+   * queryrunner.update.
    *
    * @param updateClause sql statements to execute
    * @param params Initialize the PreparedStatement's IN parameters
    * @return The number of rows updated.
    */
-  int update(String updateClause, Object... params) throws SQLException;
+  public int update(final String updateClause, final Object... params) throws SQLException {
+    try {
+      return this.queryRunner.update(updateClause, params);
+    } catch (final SQLException ex) {
+      // todo kunkun-tang: Retry logics should be implemented here.
+      logger.error("update failed", ex);
+      throw ex;
+    }
+  }
 
   /**
    * @return datasource wrapped in the database operator.
    */
-  AzkabanDataSource getDataSource();
+  public AzkabanDataSource getDataSource() {
+    return (AzkabanDataSource) this.queryRunner.getDataSource();
+  }
 }
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
index e409512..3788c1b 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
@@ -18,7 +18,10 @@ package azkaban.db;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.dbutils.handlers.ScalarHandler;
+import org.apache.log4j.Logger;
 
 
 /**
@@ -33,7 +36,16 @@ import org.apache.commons.dbutils.ResultSetHandler;
  *
  * @see org.apache.commons.dbutils.QueryRunner
  */
-public interface DatabaseTransOperator {
+public class DatabaseTransOperator {
+
+  private static final Logger logger = Logger.getLogger(DatabaseTransOperator.class);
+  private final Connection conn;
+  private final QueryRunner queryRunner;
+
+  public DatabaseTransOperator(final QueryRunner queryRunner, final Connection conn) {
+    this.conn = conn;
+    this.queryRunner = queryRunner;
+  }
 
   /**
    * returns the last id from a previous insert statement. Note that last insert and this operation
@@ -41,7 +53,19 @@ public interface DatabaseTransOperator {
    *
    * @return the last inserted id in mysql per connection.
    */
-  long getLastInsertId() throws SQLException;
+  public long getLastInsertId() throws SQLException {
+    // A default connection: autocommit = true.
+    long num = -1;
+    try {
+      num = ((Number) this.queryRunner
+          .query(this.conn, "SELECT LAST_INSERT_ID();", new ScalarHandler<>(1)))
+          .longValue();
+    } catch (final SQLException ex) {
+      logger.error("can not get last insertion ID");
+      throw ex;
+    }
+    return num;
+  }
 
   /**
    *
@@ -52,8 +76,18 @@ public interface DatabaseTransOperator {
    * @return
    * @throws SQLException
    */
-  <T> T query(String querySql, ResultSetHandler<T> resultHandler, Object... params)
-      throws SQLException;
+  public <T> T query(final String querySql, final ResultSetHandler<T> resultHandler,
+      final Object... params)
+      throws SQLException {
+    try {
+      return this.queryRunner.query(this.conn, querySql, resultHandler, params);
+    } catch (final SQLException ex) {
+      //RETRY Logic should be implemented here if needed.
+      throw ex;
+    } finally {
+      // Note: CAN NOT CLOSE CONNECTION HERE.
+    }
+  }
 
   /**
    *
@@ -62,10 +96,21 @@ public interface DatabaseTransOperator {
    * @return
    * @throws SQLException
    */
-  int update(String updateClause, Object... params) throws SQLException;
+  public int update(final String updateClause, final Object... params) throws SQLException {
+    try {
+      return this.queryRunner.update(this.conn, updateClause, params);
+    } catch (final SQLException ex) {
+      //RETRY Logic should be implemented here if needed.
+      throw ex;
+    } finally {
+      // Note: CAN NOT CLOSE CONNECTION HERE.
+    }
+  }
 
   /**
    * @return the JDBC connection associated with this operator.
    */
-  Connection getConnection();
+  public Connection getConnection() {
+    return this.conn;
+  }
 }
diff --git a/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
index 7db1117..b785ae9 100644
--- a/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
+++ b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
@@ -22,7 +22,7 @@ import java.sql.SQLException;
 /**
  * This interface defines how a sequence of sql statements are organized and packed together. All
  * transaction implementations must follow this interface, and will be called in {@link
- * DatabaseOperatorImpl#transaction(SQLTransaction)}
+ * DatabaseOperator#transaction(SQLTransaction)}
  *
  * @param <T> The transaction return type
  */