Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index db8afa2..207796c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -90,21 +90,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@Override
public ExecutableFlow fetchExecutableFlow(final int id)
throws ExecutorManagerException {
- final QueryRunner runner = createQueryRunner();
- final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
- try {
- final List<ExecutableFlow> properties =
- runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
- id);
- if (properties.isEmpty()) {
- return null;
- } else {
- return properties.get(0);
- }
- } catch (final SQLException e) {
- throw new ExecutorManagerException("Error fetching flow id " + id, e);
- }
+ return this.executionFlowDBManager.fetchExecutableFlow(id);
}
/**
@@ -749,7 +735,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
/**
* {@inheritDoc}
*
- * @see azkaban.executor.ExecutorLoader#updateExecutor(int)
*/
@Override
public void updateExecutor(final Executor executor) throws ExecutorManagerException {
@@ -1402,70 +1387,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
}
}
- private static class FetchExecutableFlows implements
- ResultSetHandler<List<ExecutableFlow>> {
- private static final String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
- "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
- private static final String FETCH_EXECUTABLE_FLOW =
- "SELECT exec_id, enc_type, flow_data FROM execution_flows "
- + "WHERE exec_id=?";
- // private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
- // "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data "
- // +
- // "FROM execution_flows ex " +
- // "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
- private static final String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
- "SELECT exec_id, enc_type, flow_data FROM execution_flows "
- + "ORDER BY exec_id DESC LIMIT ?, ?";
- private static final String FETCH_EXECUTABLE_FLOW_HISTORY =
- "SELECT exec_id, enc_type, flow_data FROM execution_flows "
- + "WHERE project_id=? AND flow_id=? "
- + "ORDER BY exec_id DESC LIMIT ?, ?";
- private static final String FETCH_EXECUTABLE_FLOW_BY_STATUS =
- "SELECT exec_id, enc_type, flow_data FROM execution_flows "
- + "WHERE project_id=? AND flow_id=? AND status=? "
- + "ORDER BY exec_id DESC LIMIT ?, ?";
-
- @Override
- public List<ExecutableFlow> handle(final ResultSet rs) throws SQLException {
- if (!rs.next()) {
- return Collections.<ExecutableFlow> emptyList();
- }
-
- final List<ExecutableFlow> execFlows = new ArrayList<>();
- do {
- final int id = rs.getInt(1);
- final int encodingType = rs.getInt(2);
- final byte[] data = rs.getBytes(3);
-
- if (data != null) {
- final EncodingType encType = EncodingType.fromInteger(encodingType);
- final Object flowObj;
- try {
- // Convoluted way to inflate strings. Should find common package
- // or helper function.
- if (encType == EncodingType.GZIP) {
- // Decompress the sucker.
- final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- } else {
- final String jsonString = new String(data, "UTF-8");
- flowObj = JSONUtils.parseJSONFromString(jsonString);
- }
-
- final ExecutableFlow exFlow =
- ExecutableFlow.createExecutableFlowFromObject(flowObj);
- execFlows.add(exFlow);
- } catch (final IOException e) {
- throw new SQLException("Error retrieving flow data " + id, e);
- }
- }
- } while (rs.next());
-
- return execFlows;
- }
- }
-
private static class IntHandler implements ResultSetHandler<Integer> {
private static final String NUM_EXECUTIONS =
"SELECT COUNT(1) FROM execution_flows";
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
index 6c87b35..d0c0347 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
@@ -16,20 +16,14 @@
package azkaban.executor;
-import static azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
import static org.assertj.core.api.Assertions.assertThat;
-import azkaban.database.AzkabanDatabaseSetup;
-import azkaban.db.AzkabanDataSource;
import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
-import azkaban.utils.Props;
+import azkaban.test.Utils;
import azkaban.utils.TestUtils;
-import java.io.File;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
-import org.apache.commons.dbutils.QueryRunner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -38,24 +32,12 @@ import org.junit.Test;
public class ExecutionFlowDBManagerTest {
- private static final Props props = new Props();
private static DatabaseOperator dbOperator;
private ExecutionFlowDBManager executionFlowDBManager;
@BeforeClass
public static void setUp() throws Exception {
- final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
- dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
-
- final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
- props.put("database.sql.scripts.dir", sqlScriptsDir);
-
- // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azkaban-db
- final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
- new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
- final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
- setup.loadTableInfo();
- setup.updateDatabase(true, false);
+ dbOperator = Utils.initTestDB();
}
@AfterClass
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 031ddee..c6bafed 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -15,12 +15,9 @@
*/
package azkaban.project;
-import azkaban.database.AzkabanDatabaseSetup;
-import azkaban.db.AzDBTestUtility;
-import azkaban.db.AzkabanDataSource;
import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
import azkaban.flow.Flow;
+import azkaban.test.Utils;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.utils.Md5Hasher;
@@ -33,7 +30,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.dbutils.QueryRunner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -51,18 +47,7 @@ public class JdbcProjectImplTest {
@BeforeClass
public static void setUp() throws Exception {
- final AzkabanDataSource dataSource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
- dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
-
- final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
- props.put("database.sql.scripts.dir", sqlScriptsDir);
-
- // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azakaban-db
- final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
- new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
- final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
- setup.loadTableInfo();
- setup.updateDatabase(true, false);
+ dbOperator = Utils.initTestDB();
}
@AfterClass
diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index c8e1699..9949b63 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -2,9 +2,17 @@ package azkaban.test;
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+import azkaban.utils.Props;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import java.io.File;
+import org.apache.commons.dbutils.QueryRunner;
public class Utils {
@@ -21,4 +29,21 @@ public class Utils {
SERVICE_PROVIDER.setInjector(injector);
}
+
+ public static DatabaseOperator initTestDB() throws Exception{
+ final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
+
+ final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
+ final Props props = new Props();
+ props.put("database.sql.scripts.dir", sqlScriptsDir);
+
+ // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azkaban-db
+ final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
+ new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
+ final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
+ setup.loadTableInfo();
+ setup.updateDatabase(true, false);
+
+ return new DatabaseOperatorImpl(new QueryRunner(dataSource));
+ }
}