azkaban-aplcache

The final commit for JDBC Executor rewrite (#1384) Still,

8/23/2017 2:04:06 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
index 0fdc1fc..f341f7a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
@@ -16,52 +16,44 @@
 
 package azkaban.executor;
 
-import azkaban.database.AbstractJdbcLoader;
 import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
-import azkaban.utils.Props;
 import java.sql.SQLException;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
 
+//TODO jamiesjc: This class is deprecated as we don't fetch active_execution_flow table any longer.
+// So this class should be removed onwards.
+@Deprecated
 @Singleton
-public class ActiveExecutingFlowsDao extends AbstractJdbcLoader{
+public class ActiveExecutingFlowsDao {
 
   private final DatabaseOperator dbOperator;
 
   @Inject
-  ActiveExecutingFlowsDao(final Props props, final CommonMetrics commonMetrics,
-                          final DatabaseOperator dbOperator) {
-    super(props, commonMetrics);
+  ActiveExecutingFlowsDao(final DatabaseOperator dbOperator) {
     this.dbOperator = dbOperator;
   }
 
   void addActiveExecutableReference(final ExecutionReference reference)
       throws ExecutorManagerException {
-    final String INSERT =
-        "INSERT INTO active_executing_flows "
-            + "(exec_id, update_time) values (?,?)";
-    final QueryRunner runner = createQueryRunner();
-
+    final String INSERT = "INSERT INTO active_executing_flows "
+        + "(exec_id, update_time) values (?,?)";
     try {
-      runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
+      this.dbOperator.update(INSERT, reference.getExecId(), reference.getUpdateTime());
     } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Error updating active flow reference " + reference.getExecId(), e);
     }
   }
 
-  void removeActiveExecutableReference(final int execid)
+  void removeActiveExecutableReference(final int execId)
       throws ExecutorManagerException {
     final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
-
-    final QueryRunner runner = createQueryRunner();
     try {
-      runner.update(DELETE, execid);
+      this.dbOperator.update(DELETE, execId);
     } catch (final SQLException e) {
       throw new ExecutorManagerException(
-          "Error deleting active flow reference " + execid, e);
+          "Error deleting active flow reference " + execId, e);
     }
   }
 
@@ -70,16 +62,12 @@ public class ActiveExecutingFlowsDao extends AbstractJdbcLoader{
     final String DELETE =
         "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
 
-    final QueryRunner runner = createQueryRunner();
-    int updateNum = 0;
     try {
-      updateNum = runner.update(DELETE, updateTime, execId);
+      // Should be 1.
+      return this.dbOperator.update(DELETE, updateTime, execId) > 0;
     } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Error deleting active flow reference " + execId, e);
     }
-
-    // Should be 1.
-    return updateNum > 0;
   }
-}
\ No newline at end of file
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
index 5c226c1..88fe492 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
@@ -16,9 +16,7 @@
 
 package azkaban.executor;
 
-import azkaban.database.AbstractJdbcLoader;
 import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
@@ -26,7 +24,6 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import java.io.File;
 import java.io.IOException;
-import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -35,32 +32,26 @@ import java.util.List;
 import java.util.Map;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.apache.commons.dbutils.DbUtils;
-import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
 @Singleton
-public class ExecutionJobDao extends AbstractJdbcLoader{
+public class ExecutionJobDao {
 
   private static final Logger logger = Logger.getLogger(ExecutorDao.class);
   private final DatabaseOperator dbOperator;
-  private final EncodingType defaultEncodingType = EncodingType.GZIP;
 
   @Inject
-  ExecutionJobDao(final Props props, final CommonMetrics commonMetrics,
-                  final DatabaseOperator databaseOperator) {
-    super(props, commonMetrics);
+  ExecutionJobDao(final DatabaseOperator databaseOperator) {
     this.dbOperator = databaseOperator;
   }
 
-  void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
+  public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
       throws ExecutorManagerException {
-    final String INSERT_EXECUTION_NODE =
-        "INSERT INTO execution_jobs "
-            + "(exec_id, project_id, version, flow_id, job_id, start_time, "
-            + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
+    final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "
+        + "(exec_id, project_id, version, flow_id, job_id, start_time, "
+        + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
 
     byte[] inputParam = null;
     if (inputProps != null) {
@@ -75,10 +66,9 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
 
     final ExecutableFlow flow = node.getExecutableFlow();
     final String flowId = node.getParentFlow().getFlowPath();
-    System.out.println("Uploading flowId " + flowId);
-    final QueryRunner runner = createQueryRunner();
+    logger.info("Uploading flowId " + flowId);
     try {
-      runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
+      this.dbOperator.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
           flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
           node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
           inputParam, node.getAttempt());
@@ -87,12 +77,10 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
     }
   }
 
-  void updateExecutableNode(final ExecutableNode node)
-      throws ExecutorManagerException {
-    final String UPSERT_EXECUTION_NODE =
-        "UPDATE execution_jobs "
-            + "SET start_time=?, end_time=?, status=?, output_params=? "
-            + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+  public void updateExecutableNode(final ExecutableNode node) throws ExecutorManagerException {
+    final String UPSERT_EXECUTION_NODE = "UPDATE execution_jobs "
+        + "SET start_time=?, end_time=?, status=?, output_params=? "
+        + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
 
     byte[] outputParam = null;
     final Props outputProps = node.getOutputProps();
@@ -105,62 +93,54 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
         throw new ExecutorManagerException("Error encoding input params");
       }
     }
-
-    final QueryRunner runner = createQueryRunner();
     try {
-      runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
+      this.dbOperator.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
           .getEndTime(), node.getStatus().getNumVal(), outputParam, node
           .getExecutableFlow().getExecutionId(), node.getParentFlow()
           .getFlowPath(), node.getId(), node.getAttempt());
     } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error updating job " + node.getId(),
-          e);
+      throw new ExecutorManagerException("Error updating job " + node.getId(), e);
     }
   }
 
-  List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
+  public List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
     try {
-      final List<ExecutableJobInfo> info =
-          runner.query(
-              FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
-              new FetchExecutableJobHandler(), execId, jobId);
+      final List<ExecutableJobInfo> info = this.dbOperator.query(
+          FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
+          new FetchExecutableJobHandler(), execId, jobId);
       if (info == null || info.isEmpty()) {
         return null;
+      } else {
+        return info;
       }
-      return info;
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job info " + jobId, e);
     }
   }
 
-  ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
+  public ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
     try {
       final List<ExecutableJobInfo> info =
-          runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
+          this.dbOperator.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
               new FetchExecutableJobHandler(), execId, jobId, attempts);
       if (info == null || info.isEmpty()) {
         return null;
+      } else {
+        return info.get(0);
       }
-      return info.get(0);
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job info " + jobId, e);
     }
   }
 
-  Props fetchExecutionJobInputProps(final int execId, final String jobId)
+  public Props fetchExecutionJobInputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
     try {
-      final Pair<Props, Props> props =
-          runner.query(
-              FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
-              new FetchExecutableJobPropsHandler(), execId, jobId);
+      final Pair<Props, Props> props = this.dbOperator.query(
+          FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
+          new FetchExecutableJobPropsHandler(), execId, jobId);
       return props.getFirst();
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job params " + execId
@@ -168,15 +148,12 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
     }
   }
 
-  Props fetchExecutionJobOutputProps(final int execId, final String jobId)
+  public Props fetchExecutionJobOutputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
     try {
-      final Pair<Props, Props> props =
-          runner
-              .query(
-                  FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
-                  new FetchExecutableJobPropsHandler(), execId, jobId);
+      final Pair<Props, Props> props = this.dbOperator.query(
+          FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
+          new FetchExecutableJobPropsHandler(), execId, jobId);
       return props.getFirst();
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job params " + execId
@@ -184,56 +161,47 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
     }
   }
 
-  Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
+  public Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
     try {
-      final Pair<Props, Props> props = runner .query(
-                  FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
-                  new FetchExecutableJobPropsHandler(), execId, jobId);
-      return props;
+      return this.dbOperator.query(
+          FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
+          new FetchExecutableJobPropsHandler(), execId, jobId);
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job params " + execId
           + " " + jobId, e);
     }
   }
 
-  List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
-                                          final int skip, final int size)
-      throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
+  public List<ExecutableJobInfo> fetchJobHistory(final int projectId,
+                                                 final String jobId,
+                                                 final int skip,
+                                                 final int size) throws ExecutorManagerException {
     try {
       final List<ExecutableJobInfo> info =
-          runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
+          this.dbOperator.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
               new FetchExecutableJobHandler(), projectId, jobId, skip, size);
       if (info == null || info.isEmpty()) {
         return null;
+      } else {
+        return info;
       }
-      return info;
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error querying job info " + jobId, e);
     }
   }
 
-  List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
+  public List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
     try {
-      final String attachments =
-          runner
-              .query(
-                  FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
-                  new FetchExecutableJobAttachmentsHandler(), execId, jobId);
+      final String attachments = this.dbOperator.query(
+          FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+          new FetchExecutableJobAttachmentsHandler(), execId, jobId);
       if (attachments == null) {
         return null;
+      } else {
+        return (List<Object>) JSONUtils.parseJSONFromString(attachments);
       }
-
-      final List<Object> attachmentList =
-          (List<Object>) JSONUtils.parseJSONFromString(attachments);
-
-      return attachmentList;
     } catch (final IOException e) {
       throw new ExecutorManagerException(
           "Error converting job attachments to JSON " + jobId, e);
@@ -243,49 +211,20 @@ public class ExecutionJobDao extends AbstractJdbcLoader{
     }
   }
 
-  void uploadAttachmentFile(final ExecutableNode node, final File file)
-      throws  ExecutorManagerException {
-    final Connection connection = getConnection();
-    try {
-      uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
-      connection.commit();
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error committing attachments ", e);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException("Error uploading attachments ", e);
-    } finally {
-      DbUtils.closeQuietly(connection);
-    }
-  }
-
-  private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
-                                    final File file, final EncodingType encType) throws SQLException, IOException {
-
-    final String jsonString = FileUtils.readFileToString(file);
-    final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
-
+  public void uploadAttachmentFile(final ExecutableNode node, final File file)
+      throws ExecutorManagerException {
     final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
         "UPDATE execution_jobs " + "SET attachments=? "
             + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
-
-    final QueryRunner runner = new QueryRunner();
-    runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
-        node.getExecutableFlow().getExecutionId(), node.getParentFlow()
-            .getNestedId(), node.getId(), node.getAttempt());
-  }
-
-  /**
-   * TODO kunkun-tang: Will be removed during the next refactor.
-   */
-  private Connection getConnection() throws ExecutorManagerException {
-    Connection connection = null;
     try {
-      connection = super.getDBConnection(false);
-    } catch (final Exception e) {
-      DbUtils.closeQuietly(connection);
-      throw new ExecutorManagerException("Error getting DB connection.", e);
+      final String jsonString = FileUtils.readFileToString(file);
+      final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+      this.dbOperator.update(UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
+          node.getExecutableFlow().getExecutionId(), node.getParentFlow()
+              .getNestedId(), node.getId(), node.getAttempt());
+    } catch (final IOException | SQLException e) {
+      throw new ExecutorManagerException("Error uploading attachments.", e);
     }
-    return connection;
   }
 
   private static class FetchExecutableJobHandler implements
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
index da96fc9..0821814 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
@@ -16,80 +16,75 @@
 
 package azkaban.executor;
 
-import azkaban.database.AbstractJdbcLoader;
+import azkaban.database.EncodingType;
 import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
+import azkaban.db.DatabaseTransOperator;
+import azkaban.db.SQLTransaction;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.Pair;
-import azkaban.utils.Props;
 import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.apache.commons.dbutils.DbUtils;
-import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
+
 @Singleton
-public class ExecutionLogsDao extends AbstractJdbcLoader{
+public class ExecutionLogsDao {
 
+  private static final Logger logger = Logger.getLogger(ExecutionLogsDao.class);
   private final DatabaseOperator dbOperator;
   private final EncodingType defaultEncodingType = EncodingType.GZIP;
 
   @Inject
-  ExecutionLogsDao(final Props props, final CommonMetrics commonMetrics,
-                   final DatabaseOperator dbOperator) {
-    super(props, commonMetrics);
+  ExecutionLogsDao(final DatabaseOperator dbOperator) {
     this.dbOperator = dbOperator;
   }
 
+  // TODO kunkun-tang: the interface's parameter is called endByte, but actually is length.
   LogData fetchLogs(final int execId, final String name, final int attempt,
                     final int startByte,
                     final int length) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
     final FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
     try {
-      final LogData result =
-          runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
-              attempt, startByte, startByte + length);
-      return result;
+      return this.dbOperator.query(FetchLogsHandler.FETCH_LOGS, handler,
+          execId, name, attempt, startByte, startByte + length);
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching logs " + execId
           + " : " + name, e);
     }
   }
 
-  void uploadLogFile(final int execId, final String name, final int attempt,
-                     final File... files)
-      throws ExecutorManagerException {
-    final Connection connection = getConnection();
+  public void uploadLogFile(final int execId, final String name, final int attempt,
+                            final File... files) throws ExecutorManagerException {
+    final SQLTransaction<Integer> transaction = transOperator -> {
+      uploadLogFile(transOperator, execId, name, attempt, files, this.defaultEncodingType);
+      transOperator.getConnection().commit();
+      return 1;
+    };
     try {
-      uploadLogFile(connection, execId, name, attempt, files,
-          getDefaultEncodingType());
-      connection.commit();
-    } catch (final SQLException | IOException e) {
-      throw new ExecutorManagerException("Error committing log", e);
-    } finally {
-      DbUtils.closeQuietly(connection);
+      this.dbOperator.transaction(transaction);
+    } catch (final SQLException e) {
+      logger.error("uploadLogFile failed.", e);
+      throw new ExecutorManagerException("uploadLogFile failed.", e);
     }
   }
 
-  void uploadLogFile(final Connection connection, final int execId, final String name,
-                     final int attempt, final File[] files, final EncodingType encType)
-      throws ExecutorManagerException, IOException {
+  private void uploadLogFile(final DatabaseTransOperator transOperator, final int execId, final String name,
+                             final int attempt, final File[] files, final EncodingType encType)
+      throws SQLException {
     // 50K buffer... if logs are greater than this, we chunk.
     // However, we better prevent large log files from being uploaded somehow
     final byte[] buffer = new byte[50 * 1024];
@@ -107,7 +102,7 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
           while (size >= 0) {
             if (pos + size == buffer.length) {
               // Flush here.
-              uploadLogPart(connection, execId, name, attempt, startByte,
+              uploadLogPart(transOperator, execId, name, attempt, startByte,
                   startByte + buffer.length, encType, buffer, buffer.length);
 
               pos = 0;
@@ -127,13 +122,15 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
 
       // Final commit of buffer.
       if (pos > 0) {
-            uploadLogPart(connection, execId, name, attempt, startByte, startByte
+        uploadLogPart(transOperator, execId, name, attempt, startByte, startByte
             + pos, encType, buffer, pos);
       }
     } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error writing log part.", e);
+      logger.error("Error writing log part.", e);
+      throw new SQLException("Error writing log part", e);
     } catch (final IOException e) {
-      throw new ExecutorManagerException("Error chunking", e);
+      logger.error("Error chunking.", e);
+      throw new SQLException("Error chunking", e);
     }
   }
 
@@ -141,35 +138,25 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
       throws ExecutorManagerException {
     final String DELETE_BY_TIME =
         "DELETE FROM execution_logs WHERE upload_time < ?";
-
-    final QueryRunner runner = this.createQueryRunner();
-    int updateNum = 0;
     try {
-      updateNum = runner.update(DELETE_BY_TIME, millis);
+      return this.dbOperator.update(DELETE_BY_TIME, millis);
     } catch (final SQLException e) {
-      e.printStackTrace();
+      logger.error("delete execution logs failed", e);
       throw new ExecutorManagerException(
           "Error deleting old execution_logs before " + millis, e);
     }
-
-    return updateNum;
   }
 
-  // TODO kunkun-tang: Will be removed in the future refactor.
-  private EncodingType getDefaultEncodingType() {
-    return this.defaultEncodingType;
-  }
-
-  private void uploadLogPart(final Connection connection, final int execId, final String name,
+  private void uploadLogPart(final DatabaseTransOperator transOperator, final int execId,
+                             final String name,
                              final int attempt, final int startByte, final int endByte,
                              final EncodingType encType,
-                             final byte[] buffer, final int length) throws SQLException, IOException {
-    final String INSERT_EXECUTION_LOGS =
-        "INSERT INTO execution_logs "
-            + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
-            + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
+                             final byte[] buffer, final int length)
+      throws SQLException, IOException {
+    final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
+        + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
+        + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
 
-    final QueryRunner runner = new QueryRunner();
     byte[] buf = buffer;
     if (encType == EncodingType.GZIP) {
       buf = GZIPUtils.gzipBytes(buf, 0, length);
@@ -177,23 +164,11 @@ public class ExecutionLogsDao extends AbstractJdbcLoader{
       buf = Arrays.copyOf(buffer, length);
     }
 
-    runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt,
+    transOperator.update(INSERT_EXECUTION_LOGS, execId, name, attempt,
         encType.getNumVal(), startByte, startByte + length, buf, DateTime.now()
             .getMillis());
   }
 
-  // TODO kunkun-tang: should be removed in future refactor.
-  private Connection getConnection() throws ExecutorManagerException {
-    Connection connection = null;
-    try {
-      connection = super.getDBConnection(false);
-    } catch (final Exception e) {
-      DbUtils.closeQuietly(connection);
-      throw new ExecutorManagerException("Error getting DB connection.", e);
-    }
-    return connection;
-  }
-
   private static class FetchLogsHandler implements ResultSetHandler<LogData> {
 
     private static final String FETCH_LOGS =
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
index 6c7f44d..d4e581c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
@@ -16,11 +16,8 @@
 
 package azkaban.executor;
 
-import azkaban.database.AbstractJdbcLoader;
 import azkaban.db.DatabaseOperator;
 import azkaban.executor.ExecutorLogEvent.EventType;
-import azkaban.metrics.CommonMetrics;
-import azkaban.utils.Props;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -29,31 +26,25 @@ import java.util.Date;
 import java.util.List;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 
 @Singleton
-public class ExecutorEventsDao extends AbstractJdbcLoader {
+public class ExecutorEventsDao {
 
   private final DatabaseOperator dbOperator;
 
   @Inject
-  public ExecutorEventsDao(final Props props, final CommonMetrics commonMetrics,
-                           final DatabaseOperator dbOperator) {
-    super(props, commonMetrics);
+  public ExecutorEventsDao(final DatabaseOperator dbOperator) {
     this.dbOperator = dbOperator;
   }
 
   public void postExecutorEvent(final Executor executor, final EventType type, final String user,
                                 final String message) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
     final String INSERT_PROJECT_EVENTS =
         "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
-    final Date updateDate = new Date();
     try {
-      runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
-          updateDate, user, message);
+      this.dbOperator.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+          new Date(), user, message);
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Failed to post executor event", e);
     }
@@ -62,20 +53,13 @@ public class ExecutorEventsDao extends AbstractJdbcLoader {
   public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
                                                   final int offset)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-
-    final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
-    List<ExecutorLogEvent> events = null;
     try {
-      events =
-          runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
-              logHandler, executor.getId(), num, offset);
+      return this.dbOperator.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+          new ExecutorLogsResultHandler(), executor.getId(), num, offset);
     } catch (final SQLException e) {
       throw new ExecutorManagerException(
           "Failed to fetch events for executor id : " + executor.getId(), e);
     }
-
-    return events;
   }
 
   /**
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
index d1ba681..0b3f06c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -16,13 +16,11 @@
 
 package azkaban.executor;
 
-import azkaban.database.AbstractJdbcLoader;
+import azkaban.database.EncodingType;
 import azkaban.db.DatabaseOperator;
-import azkaban.metrics.CommonMetrics;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
-import azkaban.utils.Props;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -33,33 +31,25 @@ import java.util.List;
 import java.util.Map;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.log4j.Logger;
 
 @Singleton
-public class FetchActiveFlowDao extends AbstractJdbcLoader {
+public class FetchActiveFlowDao {
 
   private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
   private final DatabaseOperator dbOperator;
 
   @Inject
-  public FetchActiveFlowDao(final Props props, final CommonMetrics commonMetrics,
-                            final DatabaseOperator dbOperator) {
-    super(props, commonMetrics);
+  public FetchActiveFlowDao(final DatabaseOperator dbOperator) {
     this.dbOperator = dbOperator;
   }
 
   Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
-
     try {
-      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
-          runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
-              flowHandler);
-      return properties;
+      return this.dbOperator.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
+          new FetchActiveExecutableFlows());
     } catch (final SQLException e) {
       throw new ExecutorManagerException("Error fetching active flows", e);
     }
@@ -67,14 +57,11 @@ public class FetchActiveFlowDao extends AbstractJdbcLoader {
 
   Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
-
     try {
       final List<Pair<ExecutionReference, ExecutableFlow>> flows =
-          runner.query(
-              FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
-              flowHandler, execId);
+          this.dbOperator
+              .query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
+                  new FetchActiveExecutableFlowByExecId(), execId);
       if (flows.isEmpty()) {
         return null;
       } else {
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0818532..3d1da87 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -25,7 +25,6 @@ import azkaban.utils.Props;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.io.File;
-import java.io.IOException;
 import java.sql.Connection;
 import java.time.Duration;
 import java.util.List;
@@ -263,14 +262,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     this.executionLogsDao.uploadLogFile(execId, name, attempt, files);
   }
 
-  private void uploadLogFile(final Connection connection, final int execId, final String name,
-                             final int attempt, final File[] files, final EncodingType encType)
-      throws ExecutorManagerException, IOException {
-    // 50K buffer... if logs are greater than this, we chunk.
-    // However, we better prevent large log files from being uploaded somehow
-    this.executionLogsDao.uploadLogFile(connection, execId, name, attempt, files, encType);
-  }
-
   @Override
   public void uploadAttachmentFile(final ExecutableNode node, final File file)
       throws ExecutorManagerException {
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
index 2cc254e..badcb18 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionFlowDaoTest.java
@@ -21,17 +21,22 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import azkaban.db.DatabaseOperator;
 import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Pair;
+import azkaban.utils.Props;
 import azkaban.utils.TestUtils;
+import java.io.File;
 import java.sql.SQLException;
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import org.joda.time.DateTimeUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class ExecutionFlowDaoTest {
@@ -43,6 +48,8 @@ public class ExecutionFlowDaoTest {
   private ExecutionFlowDao executionFlowDao;
   private ExecutorDao executorDao;
   private AssignExecutorDao assignExecutor;
+  private FetchActiveFlowDao fetchActiveFlowDao;
+  private ExecutionJobDao executionJobDao;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -62,8 +69,10 @@ public class ExecutionFlowDaoTest {
   @Before
   public void setup() {
     this.executionFlowDao = new ExecutionFlowDao(dbOperator);
-    this.executorDao= new ExecutorDao(dbOperator);
-    this.assignExecutor= new AssignExecutorDao(dbOperator, this.executorDao);
+    this.executorDao = new ExecutorDao(dbOperator);
+    this.assignExecutor = new AssignExecutorDao(dbOperator, this.executorDao);
+    this.fetchActiveFlowDao = new FetchActiveFlowDao(dbOperator);
+    this.executionJobDao = new ExecutionJobDao(dbOperator);
   }
 
   @After
@@ -225,6 +234,111 @@ public class ExecutionFlowDaoTest {
         .hasMessageContaining("non-existent execution");
   }
 
+
+  @Test
+  public void testFetchActiveFlowsExecutorAssigned() throws Exception {
+
+    // Upload flow1, executor assigned
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    this.executionFlowDao.uploadExecutableFlow(flow1);
+    final Executor executor = this.executorDao.addExecutor("test", 1);
+    this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
+
+    // Upload flow2, executor not assigned
+    final ExecutableFlow flow2 = TestUtils.createExecutableFlow("exectest1", "exec2");
+    this.executionFlowDao.uploadExecutableFlow(flow2);
+
+    final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+        this.fetchActiveFlowDao.fetchActiveFlows();
+
+    assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
+    assertThat(activeFlows1.containsKey(flow2.getExecutionId())).isFalse();
+    final ExecutableFlow flow1Result =
+        activeFlows1.get(flow1.getExecutionId()).getSecond();
+    assertTwoFlowSame(flow1Result, flow1);
+  }
+
+  @Test
+  public void testFetchActiveFlowsStatusChanged() throws Exception {
+    final ExecutableFlow flow1 = TestUtils.createExecutableFlow("exectest1", "exec1");
+    this.executionFlowDao.uploadExecutableFlow(flow1);
+    final Executor executor = this.executorDao.addExecutor("test", 1);
+    this.assignExecutor.assignExecutor(executor.getId(), flow1.getExecutionId());
+
+    Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows1 =
+        this.fetchActiveFlowDao.fetchActiveFlows();
+
+    assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isTrue();
+
+    // When flow status becomes SUCCEEDED/KILLED/FAILED, it should not be in active state
+    flow1.setStatus(Status.SUCCEEDED);
+    this.executionFlowDao.updateExecutableFlow(flow1);
+    activeFlows1 = this.fetchActiveFlowDao.fetchActiveFlows();
+    assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
+
+    flow1.setStatus(Status.KILLED);
+    this.executionFlowDao.updateExecutableFlow(flow1);
+    activeFlows1 = this.fetchActiveFlowDao.fetchActiveFlows();
+    assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
+
+    flow1.setStatus(Status.FAILED);
+    this.executionFlowDao.updateExecutableFlow(flow1);
+    activeFlows1 = this.fetchActiveFlowDao.fetchActiveFlows();
+    assertThat(activeFlows1.containsKey(flow1.getExecutionId())).isFalse();
+  }
+
+  @Test @Ignore
+  // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
+  // test methods as well.
+  public void testFetchActiveFlowsReferenceChanged() throws Exception {
+  }
+
+  @Test @Ignore
+  // TODO jamiesjc: Active_execution_flow table is already deprecated. we should remove related
+  // test methods as well.
+  public void testFetchActiveFlowByExecId() throws Exception {
+  }
+
+  @Test
+  public void testUploadAndFetchExecutableNode() throws Exception {
+
+    final ExecutableFlow flow = TestUtils.createExecutableFlow("exectest1", "exec1");
+    flow.setExecutionId(10);
+
+    final File jobFile = ExecutionsTestUtil.getFlowFile("exectest1", "job10.job");
+    final Props props = new Props(null, jobFile);
+    props.put("test", "test2");
+    final ExecutableNode oldNode = flow.getExecutableNode("job10");
+    oldNode.setStartTime(System.currentTimeMillis());
+    this.executionJobDao.uploadExecutableNode(oldNode, props);
+
+    final ExecutableJobInfo info = this.executionJobDao.fetchJobInfo(10, "job10", 0);
+    assertThat(flow.getEndTime()).isEqualTo(info.getEndTime());
+    assertThat(flow.getProjectId()).isEqualTo(info.getProjectId());
+    assertThat(flow.getVersion()).isEqualTo(info.getVersion());
+    assertThat(flow.getFlowId()).isEqualTo(info.getFlowId());
+
+    assertThat(oldNode.getId()).isEqualTo(info.getJobId());
+    assertThat(oldNode.getStatus()).isEqualTo(info.getStatus());
+    assertThat(oldNode.getStartTime()).isEqualTo(info.getStartTime());
+
+    // Fetch props
+    final Props outputProps = new Props();
+    outputProps.put("hello", "output");
+    oldNode.setOutputProps(outputProps);
+    oldNode.setEndTime(System.currentTimeMillis());
+    this.executionJobDao.updateExecutableNode(oldNode);
+
+    final Props fInputProps = this.executionJobDao.fetchExecutionJobInputProps(10, "job10");
+    final Props fOutputProps = this.executionJobDao.fetchExecutionJobOutputProps(10, "job10");
+    final Pair<Props, Props> inOutProps = this.executionJobDao.fetchExecutionJobProps(10, "job10");
+
+    assertThat(fInputProps.get("test")).isEqualTo("test2");
+    assertThat(fOutputProps.get("hello")).isEqualTo("output");
+    assertThat(inOutProps.getFirst().get("test")).isEqualTo("test2");
+    assertThat(inOutProps.getSecond().get("hello")).isEqualTo("output");
+  }
+
   private void assertTwoFlowSame(final ExecutableFlow flow1, final ExecutableFlow flow2) {
     assertThat(flow1.getExecutionId()).isEqualTo(flow2.getExecutionId());
     assertThat(flow1.getStatus()).isEqualTo(flow2.getStatus());
@@ -238,4 +352,5 @@ public class ExecutionFlowDaoTest {
         .isEqualTo(flow2.getExecutionOptions().getFailureAction());
     assertThat(new HashSet<>(flow1.getEndNodes())).isEqualTo(new HashSet<>(flow2.getEndNodes()));
   }
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
new file mode 100644
index 0000000..f22a597
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionLogsDaoTest.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.db.DatabaseOperator;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.FileIOUtils.LogData;
+import java.io.File;
+import java.sql.SQLException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ExecutionLogsDaoTest {
+
+  private static final String LOG_TEST_DIR_NAME = "logtest";
+  private static DatabaseOperator dbOperator;
+  private ExecutionLogsDao executionLogsDao;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    dbOperator = Utils.initTestDB();
+  }
+
+  @AfterClass
+  public static void destroyDB() throws Exception {
+    try {
+      dbOperator.update("DROP ALL OBJECTS");
+      dbOperator.update("SHUTDOWN");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Before
+  public void setup() {
+    this.executionLogsDao = new ExecutionLogsDao(dbOperator);
+  }
+
+  @After
+  public void clearDB() {
+    try {
+      dbOperator.update("delete from execution_logs");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testSmallUploadLog() throws ExecutorManagerException {
+    final File logDir = ExecutionsTestUtil.getFlowDir(LOG_TEST_DIR_NAME);
+    final File[] smalllog =
+        { new File(logDir, "log1.log"), new File(logDir, "log2.log"),
+            new File(logDir, "log3.log") };
+
+    this.executionLogsDao.uploadLogFile(1, "smallFiles", 0, smalllog);
+
+    final LogData data = this.executionLogsDao.fetchLogs(1, "smallFiles", 0, 0, 50000);
+    assertThat(data).isNotNull();
+    assertThat(data.getLength()).isEqualTo(53);
+    System.out.println(data.toString());
+
+    final LogData data2 = this.executionLogsDao.fetchLogs(1, "smallFiles", 0, 10, 20);
+    System.out.println(data2.toString());
+
+    assertThat(data2).isNotNull();
+    assertThat(data2.getLength()).isEqualTo(20);
+  }
+
+  @Test
+  public void testLargeUploadLog() throws ExecutorManagerException {
+    final File logDir = ExecutionsTestUtil.getFlowDir(LOG_TEST_DIR_NAME);
+
+    // Multiple of 255 for Henry the Eigth
+    final File[] largelog =
+        { new File(logDir, "largeLog1.log"), new File(logDir, "largeLog2.log"),
+            new File(logDir, "largeLog3.log") };
+
+    this.executionLogsDao.uploadLogFile(1, "largeFiles", 0, largelog);
+
+    final LogData logsResult = this.executionLogsDao.fetchLogs(1, "largeFiles", 0, 0, 64000);
+    assertThat(logsResult).isNotNull();
+    assertThat(logsResult.getLength()).isEqualTo(64000);
+
+    final LogData logsResult2 = this.executionLogsDao.fetchLogs(1, "largeFiles", 0, 1000, 64000);
+    assertThat(logsResult2).isNotNull();
+    assertThat(logsResult2.getLength()).isEqualTo(64000);
+
+    final LogData logsResult3 = this.executionLogsDao.fetchLogs(1, "largeFiles", 0, 150000, 250000);
+    assertThat(logsResult3).isNotNull();
+    assertThat(logsResult3.getLength()).isEqualTo(185493);
+  }
+}