azkaban-aplcache

Remove unused code and consolidate common logics (#1359) This

8/17/2017 8:13:19 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index db8afa2..207796c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -90,21 +90,7 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public ExecutableFlow fetchExecutableFlow(final int id)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
-    try {
-      final List<ExecutableFlow> properties =
-          runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
-              id);
-      if (properties.isEmpty()) {
-        return null;
-      } else {
-        return properties.get(0);
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching flow id " + id, e);
-    }
+    return this.executionFlowDBManager.fetchExecutableFlow(id);
   }
 
   /**
@@ -749,7 +735,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   /**
    * {@inheritDoc}
    *
-   * @see azkaban.executor.ExecutorLoader#updateExecutor(int)
    */
   @Override
   public void updateExecutor(final Executor executor) throws ExecutorManagerException {
@@ -1402,70 +1387,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  private static class FetchExecutableFlows implements
-      ResultSetHandler<List<ExecutableFlow>> {
-    private static final String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
-        "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
-    private static final String FETCH_EXECUTABLE_FLOW =
-        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
-            + "WHERE exec_id=?";
-    // private static String FETCH_ACTIVE_EXECUTABLE_FLOW =
-    // "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data "
-    // +
-    // "FROM execution_flows ex " +
-    // "INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";
-    private static final String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
-        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
-            + "ORDER BY exec_id DESC LIMIT ?, ?";
-    private static final String FETCH_EXECUTABLE_FLOW_HISTORY =
-        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
-            + "WHERE project_id=? AND flow_id=? "
-            + "ORDER BY exec_id DESC LIMIT ?, ?";
-    private static final String FETCH_EXECUTABLE_FLOW_BY_STATUS =
-        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
-            + "WHERE project_id=? AND flow_id=? AND status=? "
-            + "ORDER BY exec_id DESC LIMIT ?, ?";
-
-    @Override
-    public List<ExecutableFlow> handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return Collections.<ExecutableFlow> emptyList();
-      }
-
-      final List<ExecutableFlow> execFlows = new ArrayList<>();
-      do {
-        final int id = rs.getInt(1);
-        final int encodingType = rs.getInt(2);
-        final byte[] data = rs.getBytes(3);
-
-        if (data != null) {
-          final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
-          try {
-            // Convoluted way to inflate strings. Should find common package
-            // or helper function.
-            if (encType == EncodingType.GZIP) {
-              // Decompress the sucker.
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
-            final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            execFlows.add(exFlow);
-          } catch (final IOException e) {
-            throw new SQLException("Error retrieving flow data " + id, e);
-          }
-        }
-      } while (rs.next());
-
-      return execFlows;
-    }
-  }
-
   private static class IntHandler implements ResultSetHandler<Integer> {
     private static final String NUM_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_flows";
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
index 6c87b35..d0c0347 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
@@ -16,20 +16,14 @@
 
 package azkaban.executor;
 
-import static azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import azkaban.database.AzkabanDatabaseSetup;
-import azkaban.db.AzkabanDataSource;
 import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
-import azkaban.utils.Props;
+import azkaban.test.Utils;
 import azkaban.utils.TestUtils;
-import java.io.File;
 import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.List;
-import org.apache.commons.dbutils.QueryRunner;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -38,24 +32,12 @@ import org.junit.Test;
 
 public class ExecutionFlowDBManagerTest {
 
-  private static final Props props = new Props();
   private static DatabaseOperator dbOperator;
   private ExecutionFlowDBManager executionFlowDBManager;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
-    dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
-
-    final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
-    props.put("database.sql.scripts.dir", sqlScriptsDir);
-
-    // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azkaban-db
-    final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
-        new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
-    final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
-    setup.loadTableInfo();
-    setup.updateDatabase(true, false);
+    dbOperator = Utils.initTestDB();
   }
 
   @AfterClass
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
index 031ddee..c6bafed 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -15,12 +15,9 @@
  */
 package azkaban.project;
 
-import azkaban.database.AzkabanDatabaseSetup;
-import azkaban.db.AzDBTestUtility;
-import azkaban.db.AzkabanDataSource;
 import azkaban.db.DatabaseOperator;
-import azkaban.db.DatabaseOperatorImpl;
 import azkaban.flow.Flow;
+import azkaban.test.Utils;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.utils.Md5Hasher;
@@ -33,7 +30,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.dbutils.QueryRunner;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -51,18 +47,7 @@ public class JdbcProjectImplTest {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    final AzkabanDataSource dataSource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
-    dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
-
-    final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
-    props.put("database.sql.scripts.dir", sqlScriptsDir);
-
-    // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azakaban-db
-    final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
-        new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
-    final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
-    setup.loadTableInfo();
-    setup.updateDatabase(true, false);
+    dbOperator = Utils.initTestDB();
   }
 
   @AfterClass
diff --git a/azkaban-common/src/test/java/azkaban/test/Utils.java b/azkaban-common/src/test/java/azkaban/test/Utils.java
index c8e1699..9949b63 100644
--- a/azkaban-common/src/test/java/azkaban/test/Utils.java
+++ b/azkaban-common/src/test/java/azkaban/test/Utils.java
@@ -2,9 +2,17 @@ package azkaban.test;
 
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import java.io.File;
+import org.apache.commons.dbutils.QueryRunner;
 
 
 public class Utils {
@@ -21,4 +29,21 @@ public class Utils {
 
     SERVICE_PROVIDER.setInjector(injector);
   }
+
+  public static DatabaseOperator initTestDB() throws Exception{
+    final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
+
+    final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
+    final Props props = new Props();
+    props.put("database.sql.scripts.dir", sqlScriptsDir);
+
+    // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azkaban-db
+    final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
+        new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
+    final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
+    setup.loadTableInfo();
+    setup.updateDatabase(true, false);
+
+    return new DatabaseOperatorImpl(new QueryRunner(dataSource));
+  }
 }