azkaban-aplcache

Create ExecutionFlowDBManager class to migrate to use new

8/17/2017 6:14:33 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDBManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDBManager.java
new file mode 100644
index 0000000..51f3a5f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDBManager.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.EncodingType;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.SQLTransaction;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.inject.Inject;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class ExecutionFlowDBManager {
+
+  private static final Logger logger = Logger.getLogger(ExecutionFlowDBManager.class);
+  private final DatabaseOperator dbOperator;
+
+  @Inject
+  public ExecutionFlowDBManager(final DatabaseOperator dbOperator) {
+    this.dbOperator = dbOperator;
+  }
+
+  public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
+      throws ExecutorManagerException {
+    final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
+        + "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
+        + "values (?,?,?,?,?,?,?)";
+    final long submitTime = System.currentTimeMillis();
+    flow.setStatus(Status.PREPARING);
+
+    /**
+     * Why we need a transaction to get last insert ID?
+     * Because "SELECT LAST_INSERT_ID()" needs to have the same connection
+     * as inserting the new entry.
+     * See https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id
+     */
+    final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
+      transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
+          flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
+          submitTime, flow.getSubmitUser(), submitTime);
+      transOperator.getConnection().commit();
+      return transOperator.getLastInsertId();
+    };
+
+    try {
+      final long id = this.dbOperator.transaction(insertAndGetLastID);
+      logger.info("Flow given " + flow.getFlowId() + " given id " + id);
+      flow.setExecutionId((int) id);
+      updateExecutableFlow(flow);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error creating execution.", e);
+    }
+  }
+
+  List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY,
+          new FetchExecutableFlows(), skip, num);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching flow History", e);
+    }
+  }
+
+  List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+                                        final int skip, final int num)
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
+          new FetchExecutableFlows(), projectId, flowId, skip, num);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching flow history", e);
+    }
+  }
+
+  List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
+                                        final int skip, final int num,
+                                        final Status status)
+      throws ExecutorManagerException {
+    try {
+      return this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
+          new FetchExecutableFlows(), projectId, flowId, status.getNumVal(), skip, num);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows", e);
+    }
+  }
+
+  List<ExecutableFlow> fetchFlowHistory(final String projContain, final String flowContains,
+                                        final String userNameContains, final int status,
+                                        final long startTime, final long endTime,
+                                        final int skip, final int num)
+      throws ExecutorManagerException {
+    String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
+    final List<Object> params = new ArrayList<>();
+
+    boolean first = true;
+    if (projContain != null && !projContain.isEmpty()) {
+      query += " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
+      params.add('%' + projContain + '%');
+      first = false;
+    }
+
+    // todo kunkun-tang: we don't need the below complicated logics. We should just use a simple way.
+    if (flowContains != null && !flowContains.isEmpty()) {
+      if (first) {
+        query += " WHERE ";
+        first = false;
+      } else {
+        query += " AND ";
+      }
+
+      query += " flow_id LIKE ?";
+      params.add('%' + flowContains + '%');
+    }
+
+    if (userNameContains != null && !userNameContains.isEmpty()) {
+      if (first) {
+        query += " WHERE ";
+        first = false;
+      } else {
+        query += " AND ";
+      }
+      query += " submit_user LIKE ?";
+      params.add('%' + userNameContains + '%');
+    }
+
+    if (status != 0) {
+      if (first) {
+        query += " WHERE ";
+        first = false;
+      } else {
+        query += " AND ";
+      }
+      query += " status = ?";
+      params.add(status);
+    }
+
+    if (startTime > 0) {
+      if (first) {
+        query += " WHERE ";
+        first = false;
+      } else {
+        query += " AND ";
+      }
+      query += " start_time > ?";
+      params.add(startTime);
+    }
+
+    if (endTime > 0) {
+      if (first) {
+        query += " WHERE ";
+      } else {
+        query += " AND ";
+      }
+      query += " end_time < ?";
+      params.add(endTime);
+    }
+
+    if (skip > -1 && num > 0) {
+      query += "  ORDER BY exec_id DESC LIMIT ?, ?";
+      params.add(skip);
+      params.add(num);
+    }
+
+    try {
+      return this.dbOperator.query(query, new FetchExecutableFlows(), params.toArray());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows", e);
+    }
+  }
+
+  void updateExecutableFlow(final ExecutableFlow flow) throws ExecutorManagerException {
+    updateExecutableFlow(flow, EncodingType.GZIP);
+  }
+
+  private void updateExecutableFlow(final ExecutableFlow flow, final EncodingType encType)
+      throws ExecutorManagerException {
+    final String UPDATE_EXECUTABLE_FLOW_DATA =
+        "UPDATE execution_flows "
+            + "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? "
+            + "WHERE exec_id=?";
+
+    final String json = JSONUtils.toJSON(flow.toObject());
+    byte[] data = null;
+    try {
+      final byte[] stringData = json.getBytes("UTF-8");
+      data = stringData;
+      // Todo kunkun-tang: use a common method to transform stringData to data.
+      if (encType == EncodingType.GZIP) {
+        data = GZIPUtils.gzipBytes(stringData);
+      }
+    } catch (final IOException e) {
+      throw new ExecutorManagerException("Error encoding the execution flow.");
+    }
+
+    try {
+      this.dbOperator.update(UPDATE_EXECUTABLE_FLOW_DATA, flow.getStatus()
+          .getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow
+          .getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error updating flow.", e);
+    }
+  }
+
+  public ExecutableFlow fetchExecutableFlow(final int execId) throws ExecutorManagerException {
+    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
+    try {
+      final List<ExecutableFlow> properties = this.dbOperator
+          .query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler, execId);
+      if (properties.isEmpty()) {
+        return null;
+      } else {
+        return properties.get(0);
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching flow id " + execId, e);
+    }
+  }
+
+  public static class FetchExecutableFlows implements
+      ResultSetHandler<List<ExecutableFlow>> {
+
+    static String FETCH_BASE_EXECUTABLE_FLOW_QUERY =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
+    static String FETCH_EXECUTABLE_FLOW =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+            + "WHERE exec_id=?";
+    static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+            + "ORDER BY exec_id DESC LIMIT ?, ?";
+    static String FETCH_EXECUTABLE_FLOW_HISTORY =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+            + "WHERE project_id=? AND flow_id=? "
+            + "ORDER BY exec_id DESC LIMIT ?, ?";
+    static String FETCH_EXECUTABLE_FLOW_BY_STATUS =
+        "SELECT exec_id, enc_type, flow_data FROM execution_flows "
+            + "WHERE project_id=? AND flow_id=? AND status=? "
+            + "ORDER BY exec_id DESC LIMIT ?, ?";
+
+    @Override
+    public List<ExecutableFlow> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final List<ExecutableFlow> execFlows = new ArrayList<>();
+      do {
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
+
+        if (data != null) {
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
+
+          /**
+           * The below code is a duplicate against many places, like azkaban.database.EncodingType
+           * TODO kunkun-tang: Extract these duplicates to a single static method.
+           */
+          try {
+            // Convoluted way to inflate strings. Should find common package
+            // or helper function.
+            if (encType == EncodingType.GZIP) {
+              // Decompress the sucker.
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              final String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            final ExecutableFlow exFlow =
+                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            execFlows.add(exFlow);
+          } catch (final IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index e2d6c6b..db8afa2 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -58,12 +58,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     ExecutorLoader {
   private static final Logger logger = Logger
       .getLogger(JdbcExecutorLoader.class);
-
+  private final ExecutionFlowDBManager executionFlowDBManager;
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
   @Inject
-  public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics) {
+  public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics, final ExecutionFlowDBManager executionFlowDBManager) {
     super(props, commonMetrics);
+    this.executionFlowDBManager = executionFlowDBManager;
   }
 
   public EncodingType getDefaultEncodingType() {
@@ -77,91 +78,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public synchronized void uploadExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    final Connection connection = getConnection();
-    try {
-      uploadExecutableFlow(connection, flow, this.defaultEncodingType);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException("Error uploading flow", e);
-    } finally {
-      DbUtils.closeQuietly(connection);
-    }
-  }
-
-  private synchronized void uploadExecutableFlow(final Connection connection,
-                                                 final ExecutableFlow flow, final EncodingType encType)
-      throws ExecutorManagerException, IOException {
-    final String INSERT_EXECUTABLE_FLOW =
-        "INSERT INTO execution_flows "
-            + "(project_id, flow_id, version, status, submit_time, submit_user, update_time) "
-            + "values (?,?,?,?,?,?,?)";
-    final QueryRunner runner = new QueryRunner();
-    final long submitTime = System.currentTimeMillis();
-
-    final long id;
-    try {
-      flow.setStatus(Status.PREPARING);
-      runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
-          flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
-          submitTime, flow.getSubmitUser(), submitTime);
-      connection.commit();
-      id =
-          runner.query(connection, LastInsertID.LAST_INSERT_ID,
-              new LastInsertID());
-
-      if (id == -1L) {
-        throw new ExecutorManagerException(
-            "Execution id is not properly created.");
-      }
-      logger.info("Flow given " + flow.getFlowId() + " given id " + id);
-      flow.setExecutionId((int) id);
-
-      updateExecutableFlow(connection, flow, encType);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error creating execution.", e);
-    }
+    this.executionFlowDBManager.uploadExecutableFlow(flow);
   }
 
   @Override
   public void updateExecutableFlow(final ExecutableFlow flow)
       throws ExecutorManagerException {
-    final Connection connection = this.getConnection();
-
-    try {
-      updateExecutableFlow(connection, flow, this.defaultEncodingType);
-    } finally {
-      DbUtils.closeQuietly(connection);
-    }
-  }
-
-  private void updateExecutableFlow(final Connection connection, final ExecutableFlow flow,
-                                    final EncodingType encType) throws ExecutorManagerException {
-    final String UPDATE_EXECUTABLE_FLOW_DATA =
-        "UPDATE execution_flows "
-            + "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? "
-            + "WHERE exec_id=?";
-    final QueryRunner runner = new QueryRunner();
-
-    final String json = JSONUtils.toJSON(flow.toObject());
-    byte[] data = null;
-    try {
-      final byte[] stringData = json.getBytes("UTF-8");
-      data = stringData;
-
-      if (encType == EncodingType.GZIP) {
-        data = GZIPUtils.gzipBytes(stringData);
-      }
-    } catch (final IOException e) {
-      throw new ExecutorManagerException("Error encoding the execution flow.");
-    }
-
-    try {
-      runner.update(connection, UPDATE_EXECUTABLE_FLOW_DATA, flow.getStatus()
-          .getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow
-          .getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
-      connection.commit();
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error updating flow.", e);
-    }
+    this.executionFlowDBManager.updateExecutableFlow(flow);
   }
 
   @Override
@@ -311,138 +234,27 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
                                                final int skip, final int num) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
-    try {
-      final List<ExecutableFlow> properties =
-          runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY,
-              flowHandler, projectId, flowId, skip, num);
-      return properties;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows", e);
-    }
+    return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int projectId, final String flowId,
                                                final int skip, final int num, final Status status) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
-    try {
-      final List<ExecutableFlow> properties =
-          runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS,
-              flowHandler, projectId, flowId, status.getNumVal(), skip, num);
-      return properties;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows", e);
-    }
+    return this.executionFlowDBManager.fetchFlowHistory(projectId, flowId, skip, num, status);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final int skip, final int num)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
-    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
-    try {
-      final List<ExecutableFlow> properties =
-          runner.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY,
-              flowHandler, skip, num);
-      return properties;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows", e);
-    }
+    return this.executionFlowDBManager.fetchFlowHistory(skip,num);
   }
 
   @Override
   public List<ExecutableFlow> fetchFlowHistory(final String projContain,
                                                final String flowContains, final String userNameContains, final int status, final long startTime,
                                                final long endTime, final int skip, final int num) throws ExecutorManagerException {
-    String query = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
-    final ArrayList<Object> params = new ArrayList<>();
-
-    boolean first = true;
-    if (projContain != null && !projContain.isEmpty()) {
-      query += " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
-      params.add('%' + projContain + '%');
-      first = false;
-    }
-
-    if (flowContains != null && !flowContains.isEmpty()) {
-      if (first) {
-        query += " WHERE ";
-        first = false;
-      } else {
-        query += " AND ";
-      }
-
-      query += " flow_id LIKE ?";
-      params.add('%' + flowContains + '%');
-    }
-
-    if (userNameContains != null && !userNameContains.isEmpty()) {
-      if (first) {
-        query += " WHERE ";
-        first = false;
-      } else {
-        query += " AND ";
-      }
-      query += " submit_user LIKE ?";
-      params.add('%' + userNameContains + '%');
-    }
-
-    if (status != 0) {
-      if (first) {
-        query += " WHERE ";
-        first = false;
-      } else {
-        query += " AND ";
-      }
-      query += " status = ?";
-      params.add(status);
-    }
-
-    if (startTime > 0) {
-      if (first) {
-        query += " WHERE ";
-        first = false;
-      } else {
-        query += " AND ";
-      }
-      query += " start_time > ?";
-      params.add(startTime);
-    }
-
-    if (endTime > 0) {
-      if (first) {
-        query += " WHERE ";
-        first = false;
-      } else {
-        query += " AND ";
-      }
-      query += " end_time < ?";
-      params.add(endTime);
-    }
-
-    if (skip > -1 && num > 0) {
-      query += "  ORDER BY exec_id DESC LIMIT ?, ?";
-      params.add(skip);
-      params.add(num);
-    }
-
-    final QueryRunner runner = createQueryRunner();
-    final FetchExecutableFlows flowHandler = new FetchExecutableFlows();
-
-    try {
-      final List<ExecutableFlow> properties =
-          runner.query(query, flowHandler, params.toArray());
-      return properties;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows", e);
-    }
+    return this.executionFlowDBManager.fetchFlowHistory(projContain, flowContains,
+        userNameContains, status, startTime, endTime, skip, num);
   }
 
   @Override
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
new file mode 100644
index 0000000..6c87b35
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDBManagerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static azkaban.db.AzDBTestUtility.EmbeddedH2BasicDataSource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+import azkaban.utils.Props;
+import azkaban.utils.TestUtils;
+import java.io.File;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.commons.dbutils.QueryRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ExecutionFlowDBManagerTest {
+
+  private static final Props props = new Props();
+  private static DatabaseOperator dbOperator;
+  private ExecutionFlowDBManager executionFlowDBManager;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    final AzkabanDataSource dataSource = new EmbeddedH2BasicDataSource();
+    dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
+
+    final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
+    props.put("database.sql.scripts.dir", sqlScriptsDir);
+
+    // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azkaban-db
+    final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
+        new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
+    final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
+    setup.loadTableInfo();
+    setup.updateDatabase(true, false);
+  }
+
+  @AfterClass
+  public static void destroyDB() throws Exception {
+    try {
+      dbOperator.update("DROP ALL OBJECTS");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Before
+  public void setup() {
+    this.executionFlowDBManager = new ExecutionFlowDBManager(dbOperator);
+  }
+
+  @After
+  public void clearDB() {
+    try {
+      dbOperator.update("DELETE FROM execution_flows");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private ExecutableFlow createTestFlow() throws Exception {
+    return TestUtils.createExecutableFlow("exectest1", "exec1");
+  }
+
+  @Test
+  public void testUploadAndFetchExecutionFlows() throws Exception {
+
+    final ExecutableFlow flow = createTestFlow();
+    this.executionFlowDBManager.uploadExecutableFlow(flow);
+
+    final ExecutableFlow fetchFlow =
+        this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+
+    assertThat(flow).isNotSameAs(fetchFlow);
+    assertTwoFlowSame(flow, fetchFlow);
+  }
+
+
+  @Test
+  public void testUpdateExecutableFlow() throws Exception {
+    final ExecutableFlow flow = createTestFlow();
+    this.executionFlowDBManager.uploadExecutableFlow(flow);
+
+    final ExecutableFlow fetchFlow =
+        this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+
+    fetchFlow.setEndTime(System.currentTimeMillis());
+    fetchFlow.setStatus(Status.SUCCEEDED);
+    this.executionFlowDBManager.updateExecutableFlow(fetchFlow);
+    final ExecutableFlow fetchFlow2 =
+        this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+
+    assertTwoFlowSame(fetchFlow, fetchFlow2);
+  }
+
+  @Test
+  public void fetchFlowHistory() throws Exception {
+    final ExecutableFlow flow = createTestFlow();
+    this.executionFlowDBManager.uploadExecutableFlow(flow);
+    final List<ExecutableFlow> flowList1 = this.executionFlowDBManager.fetchFlowHistory(0,2 );
+    assertThat(flowList1.size()).isEqualTo(1);
+
+    final List<ExecutableFlow> flowList2 = this.executionFlowDBManager
+        .fetchFlowHistory(flow.getProjectId(), flow.getId(),0,2 );
+    assertThat(flowList2.size()).isEqualTo(1);
+
+    final ExecutableFlow fetchFlow =
+        this.executionFlowDBManager.fetchExecutableFlow(flow.getExecutionId());
+    assertTwoFlowSame(flowList1.get(0), flowList2.get(0));
+    assertTwoFlowSame(flowList1.get(0), fetchFlow);
+  }
+
+  private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
+    assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
+    assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
+    assertThat(flow1.getEndTime()).isEqualTo(flow2.getEndTime());
+    assertThat(flow1.getStartTime()).isEqualTo(flow2.getStartTime());
+    assertThat(flow1.getSubmitTime()).isEqualTo(flow2.getStartTime());
+    assertThat(flow1.getFlowId()).isEqualTo(flow2.getFlowId());
+    assertThat(flow1.getProjectId()).isEqualTo(flow2.getProjectId());
+    assertThat(flow1.getVersion()).isEqualTo(flow2.getVersion());
+    assertThat(flow1.getExecutionOptions().getFailureAction())
+        .isEqualTo(flow2.getExecutionOptions().getFailureAction());
+    assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 3b3f1ca..9da2af7 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1006,7 +1006,6 @@ public class JdbcExecutorLoaderTest {
         logsResult6.getLength(), 185493);
   }
 
-  @SuppressWarnings("static-access")
   @Ignore @Test
   public void testRemoveExecutionLogsByTime() throws ExecutorManagerException,
       IOException, InterruptedException {
@@ -1055,8 +1054,9 @@ public class JdbcExecutorLoaderTest {
     props.put("mysql.password", password);
     props.put("mysql.numconnections", numConnections);
 
+    //TODO kunkun-tang: temporary work-around here. This Test is to be deprecated.
     return new JdbcExecutorLoader(props,
-        new CommonMetrics(new MetricsManager(new MetricRegistry())));
+        new CommonMetrics(new MetricsManager(new MetricRegistry())), null);
   }
 
   private boolean isTestSetup() {
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index b8347dc..efa6c12 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -29,6 +29,7 @@ import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.db.DatabaseOperator;
 import azkaban.executor.AlerterHolder;
+import azkaban.executor.ExecutionFlowDBManager;
 import azkaban.executor.Executor;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
@@ -121,6 +122,7 @@ public class AzkabanWebServerTest {
     executorLoader.updateExecutor(executor);
 
     assertNotNull(injector.getInstance(AzkabanWebServer.class));
+    assertNotNull(injector.getInstance(ExecutionFlowDBManager.class));
 
     //Test if triggermanager is singletonly guiced. If not, the below test will fail.
     assertSingleton(ExecutorLoader.class, injector);
@@ -133,6 +135,7 @@ public class AzkabanWebServerTest {
     assertSingleton(TriggerManager.class, injector);
     assertSingleton(AlerterHolder.class, injector);
     assertSingleton(Emailer.class, injector);
+    assertSingleton(ExecutionFlowDBManager.class, injector);
 
     SERVICE_PROVIDER.unsetInjector();
   }