Details
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 8aaf69e..02ca44e 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import azkaban.db.AzkabanDataSource;
-import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
import azkaban.db.H2FileDataSource;
import azkaban.db.MySQLDataSource;
import azkaban.executor.ExecutorLoader;
@@ -39,14 +37,14 @@ import azkaban.trigger.TriggerLoader;
import azkaban.utils.Props;
import com.codahale.metrics.MetricRegistry;
import com.google.inject.AbstractModule;
-import javax.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
-import javax.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.sql.DataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.hadoop.conf.Configuration;
@@ -76,7 +74,6 @@ public class AzkabanCommonModule extends AbstractModule {
protected void configure() {
bind(Props.class).toInstance(this.config.getProps());
bind(Storage.class).to(resolveStorageClassType());
- bind(DatabaseOperator.class).to(DatabaseOperatorImpl.class);
bind(TriggerLoader.class).to(JdbcTriggerImpl.class);
bind(ProjectLoader.class).to(JdbcProjectImpl.class);
bind(DataSource.class).to(AzkabanDataSource.class);
diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index b19248f..d5ab9ae 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -5,7 +5,6 @@ import static azkaban.ServiceProvider.SERVICE_PROVIDER;
import azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
import azkaban.db.AzkabanDataSource;
import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
import azkaban.db.DatabaseSetup;
import azkaban.utils.Props;
import com.google.inject.AbstractModule;
@@ -39,6 +38,6 @@ public class Utils {
final DatabaseSetup setup = new DatabaseSetup(dataSource, sqlScriptsDir);
setup.updateDatabase();
- return new DatabaseOperatorImpl(new QueryRunner(dataSource));
+ return new DatabaseOperator(new QueryRunner(dataSource));
}
}
diff --git a/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java b/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java
index 498edd3..0d18772 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/JdbcTriggerImplTest.java
@@ -22,7 +22,6 @@ import azkaban.database.AzkabanConnectionPoolTest;
import azkaban.database.AzkabanDataSource;
import azkaban.database.AzkabanDatabaseSetup;
import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
import azkaban.executor.ExecutionOptions;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
@@ -79,7 +78,7 @@ public class JdbcTriggerImplTest {
@Before
public void setUp() {
- this.dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
+ this.dbOperator = new DatabaseOperator(new QueryRunner(dataSource));
this.loader = new JdbcTriggerImpl(this.dbOperator);
}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
index b00ec9b..b776ca1 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
@@ -16,8 +16,16 @@
*/
package azkaban.db;
+import static java.util.Objects.requireNonNull;
+
+import java.sql.Connection;
import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
/**
* This interface is to define Base Data Access Object contract for Azkaban. All azkaban DB related
@@ -26,10 +34,25 @@ import org.apache.commons.dbutils.ResultSetHandler;
*
* @see org.apache.commons.dbutils.QueryRunner
*/
-public interface DatabaseOperator {
+@Singleton
+public class DatabaseOperator {
+
+ private static final Logger logger = Logger.getLogger(DatabaseOperator.class);
+
+ private final QueryRunner queryRunner;
+
+ /**
+ * Note: this queryRunner should include a concrete {@link AzkabanDataSource} inside.
+ */
+ @Inject
+ public DatabaseOperator(final QueryRunner queryRunner) {
+ requireNonNull(queryRunner.getDataSource(), "data source must not be null.");
+ this.queryRunner = queryRunner;
+ }
/**
- * Executes the given Azkaban related SELECT SQL operations.
+ * Executes the given Azkaban related SELECT SQL operations. it will call
+ * {@link AzkabanDataSource#getConnection()} inside queryrunner.query.
*
* @param sqlQuery The SQL query statement to execute.
* @param resultHandler The handler used to create the result object
@@ -37,8 +60,17 @@ public interface DatabaseOperator {
* @param <T> The type of object that the qeury handler returns
* @return The object returned by the handler.
*/
- <T> T query(String sqlQuery, ResultSetHandler<T> resultHandler, Object... params)
- throws SQLException;
+ public <T> T query(final String baseQuery, final ResultSetHandler<T> resultHandler,
+ final Object... params)
+ throws SQLException {
+ try {
+ return this.queryRunner.query(baseQuery, resultHandler, params);
+ } catch (final SQLException ex) {
+ // todo kunkun-tang: Retry logics should be implemented here.
+ logger.error("query failed", ex);
+ throw ex;
+ }
+ }
/**
* Provide a way to allow users define custom SQL operations without relying on fixed SQL
@@ -49,19 +81,48 @@ public interface DatabaseOperator {
* @param <T> The type of object that the operations returns. Note that T could be null
* @return T The object returned by the SQL statement, expected by the caller
*/
- <T> T transaction(SQLTransaction<T> operations) throws SQLException;
+ public <T> T transaction(final SQLTransaction<T> operations) throws SQLException {
+ Connection conn = null;
+ try {
+ conn = this.queryRunner.getDataSource().getConnection();
+ conn.setAutoCommit(false);
+ final DatabaseTransOperator transOperator = new DatabaseTransOperator(this.queryRunner,
+ conn);
+ final T res = operations.execute(transOperator);
+ conn.commit();
+ return res;
+ } catch (final SQLException ex) {
+ // todo kunkun-tang: Retry logics should be implemented here.
+ logger.error("transaction failed", ex);
+ throw ex;
+ } finally {
+ DbUtils.closeQuietly(conn);
+ }
+ }
/**
* Executes the given AZ related INSERT, UPDATE, or DELETE SQL statement.
+ * it will call {@link AzkabanDataSource#getConnection()} inside
+ * queryrunner.update.
*
* @param updateClause sql statements to execute
* @param params Initialize the PreparedStatement's IN parameters
* @return The number of rows updated.
*/
- int update(String updateClause, Object... params) throws SQLException;
+ public int update(final String updateClause, final Object... params) throws SQLException {
+ try {
+ return this.queryRunner.update(updateClause, params);
+ } catch (final SQLException ex) {
+ // todo kunkun-tang: Retry logics should be implemented here.
+ logger.error("update failed", ex);
+ throw ex;
+ }
+ }
/**
* @return datasource wrapped in the database operator.
*/
- AzkabanDataSource getDataSource();
+ public AzkabanDataSource getDataSource() {
+ return (AzkabanDataSource) this.queryRunner.getDataSource();
+ }
}
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
index e409512..3788c1b 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseTransOperator.java
@@ -18,7 +18,10 @@ package azkaban.db;
import java.sql.Connection;
import java.sql.SQLException;
+import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.dbutils.handlers.ScalarHandler;
+import org.apache.log4j.Logger;
/**
@@ -33,7 +36,16 @@ import org.apache.commons.dbutils.ResultSetHandler;
*
* @see org.apache.commons.dbutils.QueryRunner
*/
-public interface DatabaseTransOperator {
+public class DatabaseTransOperator {
+
+ private static final Logger logger = Logger.getLogger(DatabaseTransOperator.class);
+ private final Connection conn;
+ private final QueryRunner queryRunner;
+
+ public DatabaseTransOperator(final QueryRunner queryRunner, final Connection conn) {
+ this.conn = conn;
+ this.queryRunner = queryRunner;
+ }
/**
* returns the last id from a previous insert statement. Note that last insert and this operation
@@ -41,7 +53,19 @@ public interface DatabaseTransOperator {
*
* @return the last inserted id in mysql per connection.
*/
- long getLastInsertId() throws SQLException;
+ public long getLastInsertId() throws SQLException {
+ // A default connection: autocommit = true.
+ long num = -1;
+ try {
+ num = ((Number) this.queryRunner
+ .query(this.conn, "SELECT LAST_INSERT_ID();", new ScalarHandler<>(1)))
+ .longValue();
+ } catch (final SQLException ex) {
+ logger.error("can not get last insertion ID");
+ throw ex;
+ }
+ return num;
+ }
/**
*
@@ -52,8 +76,18 @@ public interface DatabaseTransOperator {
* @return
* @throws SQLException
*/
- <T> T query(String querySql, ResultSetHandler<T> resultHandler, Object... params)
- throws SQLException;
+ public <T> T query(final String querySql, final ResultSetHandler<T> resultHandler,
+ final Object... params)
+ throws SQLException {
+ try {
+ return this.queryRunner.query(this.conn, querySql, resultHandler, params);
+ } catch (final SQLException ex) {
+ //RETRY Logic should be implemented here if needed.
+ throw ex;
+ } finally {
+ // Note: CAN NOT CLOSE CONNECTION HERE.
+ }
+ }
/**
*
@@ -62,10 +96,21 @@ public interface DatabaseTransOperator {
* @return
* @throws SQLException
*/
- int update(String updateClause, Object... params) throws SQLException;
+ public int update(final String updateClause, final Object... params) throws SQLException {
+ try {
+ return this.queryRunner.update(this.conn, updateClause, params);
+ } catch (final SQLException ex) {
+ //RETRY Logic should be implemented here if needed.
+ throw ex;
+ } finally {
+ // Note: CAN NOT CLOSE CONNECTION HERE.
+ }
+ }
/**
* @return the JDBC connection associated with this operator.
*/
- Connection getConnection();
+ public Connection getConnection() {
+ return this.conn;
+ }
}
diff --git a/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
index 7db1117..b785ae9 100644
--- a/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
+++ b/azkaban-db/src/main/java/azkaban/db/SQLTransaction.java
@@ -22,7 +22,7 @@ import java.sql.SQLException;
/**
* This interface defines how a sequence of sql statements are organized and packed together. All
* transaction implementations must follow this interface, and will be called in {@link
- * DatabaseOperatorImpl#transaction(SQLTransaction)}
+ * DatabaseOperator#transaction(SQLTransaction)}
*
* @param <T> The transaction return type
*/