azkaban-aplcache

Use Intellij's Delegate feature to automate refactor process.

8/20/2017 12:19:41 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
new file mode 100644
index 0000000..0fdc1fc
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ActiveExecutingFlowsDao.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+import java.sql.SQLException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+
+@Singleton
+public class ActiveExecutingFlowsDao extends AbstractJdbcLoader{
+
+  private final DatabaseOperator dbOperator;
+
+  @Inject
+  ActiveExecutingFlowsDao(final Props props, final CommonMetrics commonMetrics,
+                          final DatabaseOperator dbOperator) {
+    super(props, commonMetrics);
+    this.dbOperator = dbOperator;
+  }
+
+  void addActiveExecutableReference(final ExecutionReference reference)
+      throws ExecutorManagerException {
+    final String INSERT =
+        "INSERT INTO active_executing_flows "
+            + "(exec_id, update_time) values (?,?)";
+    final QueryRunner runner = createQueryRunner();
+
+    try {
+      runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(
+          "Error updating active flow reference " + reference.getExecId(), e);
+    }
+  }
+
+  void removeActiveExecutableReference(final int execid)
+      throws ExecutorManagerException {
+    final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
+
+    final QueryRunner runner = createQueryRunner();
+    try {
+      runner.update(DELETE, execid);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(
+          "Error deleting active flow reference " + execid, e);
+    }
+  }
+
+  boolean updateExecutableReference(final int execId, final long updateTime)
+      throws ExecutorManagerException {
+    final String DELETE =
+        "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
+
+    final QueryRunner runner = createQueryRunner();
+    int updateNum = 0;
+    try {
+      updateNum = runner.update(DELETE, updateTime, execId);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(
+          "Error deleting active flow reference " + execId, e);
+    }
+
+    // Should be 1.
+    return updateNum > 0;
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
new file mode 100644
index 0000000..5c226c1
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionJobDao.java
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class ExecutionJobDao extends AbstractJdbcLoader{
+
+  private static final Logger logger = Logger.getLogger(ExecutorDao.class);
+  private final DatabaseOperator dbOperator;
+  private final EncodingType defaultEncodingType = EncodingType.GZIP;
+
+  @Inject
+  ExecutionJobDao(final Props props, final CommonMetrics commonMetrics,
+                  final DatabaseOperator databaseOperator) {
+    super(props, commonMetrics);
+    this.dbOperator = databaseOperator;
+  }
+
+  void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
+      throws ExecutorManagerException {
+    final String INSERT_EXECUTION_NODE =
+        "INSERT INTO execution_jobs "
+            + "(exec_id, project_id, version, flow_id, job_id, start_time, "
+            + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
+
+    byte[] inputParam = null;
+    if (inputProps != null) {
+      try {
+        final String jsonString =
+            JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
+        inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
+      } catch (final IOException e) {
+        throw new ExecutorManagerException("Error encoding input params");
+      }
+    }
+
+    final ExecutableFlow flow = node.getExecutableFlow();
+    final String flowId = node.getParentFlow().getFlowPath();
+    System.out.println("Uploading flowId " + flowId);
+    final QueryRunner runner = createQueryRunner();
+    try {
+      runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
+          flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
+          node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
+          inputParam, node.getAttempt());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error writing job " + node.getId(), e);
+    }
+  }
+
+  void updateExecutableNode(final ExecutableNode node)
+      throws ExecutorManagerException {
+    final String UPSERT_EXECUTION_NODE =
+        "UPDATE execution_jobs "
+            + "SET start_time=?, end_time=?, status=?, output_params=? "
+            + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+    byte[] outputParam = null;
+    final Props outputProps = node.getOutputProps();
+    if (outputProps != null) {
+      try {
+        final String jsonString =
+            JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
+        outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
+      } catch (final IOException e) {
+        throw new ExecutorManagerException("Error encoding input params");
+      }
+    }
+
+    final QueryRunner runner = createQueryRunner();
+    try {
+      runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
+          .getEndTime(), node.getStatus().getNumVal(), outputParam, node
+          .getExecutableFlow().getExecutionId(), node.getParentFlow()
+          .getFlowPath(), node.getId(), node.getAttempt());
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error updating job " + node.getId(),
+          e);
+    }
+  }
+
+  List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+
+    try {
+      final List<ExecutableJobInfo> info =
+          runner.query(
+              FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
+              new FetchExecutableJobHandler(), execId, jobId);
+      if (info == null || info.isEmpty()) {
+        return null;
+      }
+      return info;
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error querying job info " + jobId, e);
+    }
+  }
+
+  ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+
+    try {
+      final List<ExecutableJobInfo> info =
+          runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
+              new FetchExecutableJobHandler(), execId, jobId, attempts);
+      if (info == null || info.isEmpty()) {
+        return null;
+      }
+      return info.get(0);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error querying job info " + jobId, e);
+    }
+  }
+
+  Props fetchExecutionJobInputProps(final int execId, final String jobId)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    try {
+      final Pair<Props, Props> props =
+          runner.query(
+              FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
+              new FetchExecutableJobPropsHandler(), execId, jobId);
+      return props.getFirst();
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error querying job params " + execId
+          + " " + jobId, e);
+    }
+  }
+
+  Props fetchExecutionJobOutputProps(final int execId, final String jobId)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    try {
+      final Pair<Props, Props> props =
+          runner
+              .query(
+                  FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
+                  new FetchExecutableJobPropsHandler(), execId, jobId);
+      return props.getFirst();
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error querying job params " + execId
+          + " " + jobId, e);
+    }
+  }
+
+  Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    try {
+      final Pair<Props, Props> props = runner .query(
+                  FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
+                  new FetchExecutableJobPropsHandler(), execId, jobId);
+      return props;
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error querying job params " + execId
+          + " " + jobId, e);
+    }
+  }
+
+  List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
+                                          final int skip, final int size)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+
+    try {
+      final List<ExecutableJobInfo> info =
+          runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
+              new FetchExecutableJobHandler(), projectId, jobId, skip, size);
+      if (info == null || info.isEmpty()) {
+        return null;
+      }
+      return info;
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error querying job info " + jobId, e);
+    }
+  }
+
+  List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+
+    try {
+      final String attachments =
+          runner
+              .query(
+                  FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
+                  new FetchExecutableJobAttachmentsHandler(), execId, jobId);
+      if (attachments == null) {
+        return null;
+      }
+
+      final List<Object> attachmentList =
+          (List<Object>) JSONUtils.parseJSONFromString(attachments);
+
+      return attachmentList;
+    } catch (final IOException e) {
+      throw new ExecutorManagerException(
+          "Error converting job attachments to JSON " + jobId, e);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(
+          "Error query job attachments " + jobId, e);
+    }
+  }
+
+  void uploadAttachmentFile(final ExecutableNode node, final File file)
+      throws  ExecutorManagerException {
+    final Connection connection = getConnection();
+    try {
+      uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
+      connection.commit();
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error committing attachments ", e);
+    } catch (final IOException e) {
+      throw new ExecutorManagerException("Error uploading attachments ", e);
+    } finally {
+      DbUtils.closeQuietly(connection);
+    }
+  }
+
+  private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
+                                    final File file, final EncodingType encType) throws SQLException, IOException {
+
+    final String jsonString = FileUtils.readFileToString(file);
+    final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
+
+    final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
+        "UPDATE execution_jobs " + "SET attachments=? "
+            + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
+
+    final QueryRunner runner = new QueryRunner();
+    runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
+        node.getExecutableFlow().getExecutionId(), node.getParentFlow()
+            .getNestedId(), node.getId(), node.getAttempt());
+  }
+
+  /**
+   * TODO kunkun-tang: Will be removed during the next refactor.
+   */
+  private Connection getConnection() throws ExecutorManagerException {
+    Connection connection = null;
+    try {
+      connection = super.getDBConnection(false);
+    } catch (final Exception e) {
+      DbUtils.closeQuietly(connection);
+      throw new ExecutorManagerException("Error getting DB connection.", e);
+    }
+    return connection;
+  }
+
+  private static class FetchExecutableJobHandler implements
+      ResultSetHandler<List<ExecutableJobInfo>> {
+    private static final String FETCH_EXECUTABLE_NODE =
+        "SELECT exec_id, project_id, version, flow_id, job_id, "
+            + "start_time, end_time, status, attempt "
+            + "FROM execution_jobs WHERE exec_id=? "
+            + "AND job_id=? AND attempt=?";
+    private static final String FETCH_EXECUTABLE_NODE_ATTEMPTS =
+        "SELECT exec_id, project_id, version, flow_id, job_id, "
+            + "start_time, end_time, status, attempt FROM execution_jobs "
+            + "WHERE exec_id=? AND job_id=?";
+    private static final String FETCH_PROJECT_EXECUTABLE_NODE =
+        "SELECT exec_id, project_id, version, flow_id, job_id, "
+            + "start_time, end_time, status, attempt FROM execution_jobs "
+            + "WHERE project_id=? AND job_id=? "
+            + "ORDER BY exec_id DESC LIMIT ?, ? ";
+
+    @Override
+    public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.<ExecutableJobInfo> emptyList();
+      }
+
+      final List<ExecutableJobInfo> execNodes = new ArrayList<>();
+      do {
+        final int execId = rs.getInt(1);
+        final int projectId = rs.getInt(2);
+        final int version = rs.getInt(3);
+        final String flowId = rs.getString(4);
+        final String jobId = rs.getString(5);
+        final long startTime = rs.getLong(6);
+        final long endTime = rs.getLong(7);
+        final Status status = Status.fromInteger(rs.getInt(8));
+        final int attempt = rs.getInt(9);
+
+        final ExecutableJobInfo info =
+            new ExecutableJobInfo(execId, projectId, version, flowId, jobId,
+                startTime, endTime, status, attempt);
+        execNodes.add(info);
+      } while (rs.next());
+
+      return execNodes;
+    }
+  }
+
+  private static class FetchExecutableJobPropsHandler implements
+      ResultSetHandler<Pair<Props, Props>> {
+    private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
+        "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+    private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
+        "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
+    private static final String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
+        "SELECT input_params, output_params "
+            + "FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+    @Override
+    public Pair<Props, Props> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return new Pair<>(null, null);
+      }
+
+      if (rs.getMetaData().getColumnCount() > 1) {
+        final byte[] input = rs.getBytes(1);
+        final byte[] output = rs.getBytes(2);
+
+        Props inputProps = null;
+        Props outputProps = null;
+        try {
+          if (input != null) {
+            final String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
+            inputProps =
+                PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
+                    .parseJSONFromString(jsonInputString));
+
+          }
+          if (output != null) {
+            final String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
+            outputProps =
+                PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
+                    .parseJSONFromString(jsonOutputString));
+          }
+        } catch (final IOException e) {
+          throw new SQLException("Error decoding param data", e);
+        }
+
+        return new Pair<>(inputProps, outputProps);
+      } else {
+        final byte[] params = rs.getBytes(1);
+        Props props = null;
+        try {
+          if (params != null) {
+            final String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
+
+            props =
+                PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
+                    .parseJSONFromString(jsonProps));
+          }
+        } catch (final IOException e) {
+          throw new SQLException("Error decoding param data", e);
+        }
+
+        return new Pair<>(props, null);
+      }
+    }
+  }
+
+  private static class FetchExecutableJobAttachmentsHandler implements
+      ResultSetHandler<String> {
+    private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
+        "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
+
+    @Override
+    public String handle(final ResultSet rs) throws SQLException {
+      String attachmentsJson = null;
+      if (rs.next()) {
+        try {
+          final byte[] attachments = rs.getBytes(1);
+          if (attachments != null) {
+            attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
+          }
+        } catch (final IOException e) {
+          throw new SQLException("Error decoding job attachments", e);
+        }
+      }
+      return attachmentsJson;
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
new file mode 100644
index 0000000..da96fc9
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionLogsDao.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+
+@Singleton
+public class ExecutionLogsDao extends AbstractJdbcLoader{
+
+  private final DatabaseOperator dbOperator;
+  private final EncodingType defaultEncodingType = EncodingType.GZIP;
+
+  @Inject
+  ExecutionLogsDao(final Props props, final CommonMetrics commonMetrics,
+                   final DatabaseOperator dbOperator) {
+    super(props, commonMetrics);
+    this.dbOperator = dbOperator;
+  }
+
+  LogData fetchLogs(final int execId, final String name, final int attempt,
+                    final int startByte,
+                    final int length) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+
+    final FetchLogsHandler handler = new FetchLogsHandler(startByte, length + startByte);
+    try {
+      final LogData result =
+          runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
+              attempt, startByte, startByte + length);
+      return result;
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching logs " + execId
+          + " : " + name, e);
+    }
+  }
+
+  void uploadLogFile(final int execId, final String name, final int attempt,
+                     final File... files)
+      throws ExecutorManagerException {
+    final Connection connection = getConnection();
+    try {
+      uploadLogFile(connection, execId, name, attempt, files,
+          getDefaultEncodingType());
+      connection.commit();
+    } catch (final SQLException | IOException e) {
+      throw new ExecutorManagerException("Error committing log", e);
+    } finally {
+      DbUtils.closeQuietly(connection);
+    }
+  }
+
+  void uploadLogFile(final Connection connection, final int execId, final String name,
+                     final int attempt, final File[] files, final EncodingType encType)
+      throws ExecutorManagerException, IOException {
+    // 50K buffer... if logs are greater than this, we chunk.
+    // However, we better prevent large log files from being uploaded somehow
+    final byte[] buffer = new byte[50 * 1024];
+    int pos = 0;
+    int length = buffer.length;
+    int startByte = 0;
+    try {
+      for (int i = 0; i < files.length; ++i) {
+        final File file = files[i];
+
+        final BufferedInputStream bufferedStream =
+            new BufferedInputStream(new FileInputStream(file));
+        try {
+          int size = bufferedStream.read(buffer, pos, length);
+          while (size >= 0) {
+            if (pos + size == buffer.length) {
+              // Flush here.
+              uploadLogPart(connection, execId, name, attempt, startByte,
+                  startByte + buffer.length, encType, buffer, buffer.length);
+
+              pos = 0;
+              length = buffer.length;
+              startByte += buffer.length;
+            } else {
+              // Usually end of file.
+              pos += size;
+              length = buffer.length - pos;
+            }
+            size = bufferedStream.read(buffer, pos, length);
+          }
+        } finally {
+          IOUtils.closeQuietly(bufferedStream);
+        }
+      }
+
+      // Final commit of buffer.
+      if (pos > 0) {
+            uploadLogPart(connection, execId, name, attempt, startByte, startByte
+            + pos, encType, buffer, pos);
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error writing log part.", e);
+    } catch (final IOException e) {
+      throw new ExecutorManagerException("Error chunking", e);
+    }
+  }
+
+  int removeExecutionLogsByTime(final long millis)
+      throws ExecutorManagerException {
+    final String DELETE_BY_TIME =
+        "DELETE FROM execution_logs WHERE upload_time < ?";
+
+    final QueryRunner runner = this.createQueryRunner();
+    int updateNum = 0;
+    try {
+      updateNum = runner.update(DELETE_BY_TIME, millis);
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      throw new ExecutorManagerException(
+          "Error deleting old execution_logs before " + millis, e);
+    }
+
+    return updateNum;
+  }
+
+  // TODO kunkun-tang: Will be removed in the future refactor.
+  private EncodingType getDefaultEncodingType() {
+    return this.defaultEncodingType;
+  }
+
+  private void uploadLogPart(final Connection connection, final int execId, final String name,
+                             final int attempt, final int startByte, final int endByte,
+                             final EncodingType encType,
+                             final byte[] buffer, final int length) throws SQLException, IOException {
+    final String INSERT_EXECUTION_LOGS =
+        "INSERT INTO execution_logs "
+            + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
+            + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
+
+    final QueryRunner runner = new QueryRunner();
+    byte[] buf = buffer;
+    if (encType == EncodingType.GZIP) {
+      buf = GZIPUtils.gzipBytes(buf, 0, length);
+    } else if (length < buf.length) {
+      buf = Arrays.copyOf(buffer, length);
+    }
+
+    runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt,
+        encType.getNumVal(), startByte, startByte + length, buf, DateTime.now()
+            .getMillis());
+  }
+
+  // TODO kunkun-tang: should be removed in future refactor.
+  private Connection getConnection() throws ExecutorManagerException {
+    Connection connection = null;
+    try {
+      connection = super.getDBConnection(false);
+    } catch (final Exception e) {
+      DbUtils.closeQuietly(connection);
+      throw new ExecutorManagerException("Error getting DB connection.", e);
+    }
+    return connection;
+  }
+
+  private static class FetchLogsHandler implements ResultSetHandler<LogData> {
+
+    private static final String FETCH_LOGS =
+        "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log "
+            + "FROM execution_logs "
+            + "WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? "
+            + "AND start_byte <= ? ORDER BY start_byte";
+
+    private final int startByte;
+    private final int endByte;
+
+    FetchLogsHandler(final int startByte, final int endByte) {
+      this.startByte = startByte;
+      this.endByte = endByte;
+    }
+
+    @Override
+    public LogData handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return null;
+      }
+
+      final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+      do {
+        // int execId = rs.getInt(1);
+        // String name = rs.getString(2);
+        final int attempt = rs.getInt(3);
+        final EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
+        final int startByte = rs.getInt(5);
+        final int endByte = rs.getInt(6);
+
+        final byte[] data = rs.getBytes(7);
+
+        final int offset =
+            this.startByte > startByte ? this.startByte - startByte : 0;
+        final int length =
+            this.endByte < endByte ? this.endByte - startByte - offset
+                : endByte - startByte - offset;
+        try {
+          byte[] buffer = data;
+          if (encType == EncodingType.GZIP) {
+            buffer = GZIPUtils.unGzipBytes(data);
+          }
+
+          byteStream.write(buffer, offset, length);
+        } catch (final IOException e) {
+          throw new SQLException(e);
+        }
+      } while (rs.next());
+
+      final byte[] buffer = byteStream.toByteArray();
+      final Pair<Integer, Integer> result =
+          FileIOUtils.getUtf8Range(buffer, 0, buffer.length);
+
+      return new LogData(this.startByte + result.getFirst(), result.getSecond(),
+          new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
new file mode 100644
index 0000000..6c7f44d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorEventsDao.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.executor.ExecutorLogEvent.EventType;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.Props;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+
+@Singleton
+public class ExecutorEventsDao extends AbstractJdbcLoader {
+
+  private final DatabaseOperator dbOperator;
+
+  @Inject
+  public ExecutorEventsDao(final Props props, final CommonMetrics commonMetrics,
+                           final DatabaseOperator dbOperator) {
+    super(props, commonMetrics);
+    this.dbOperator = dbOperator;
+  }
+
+  public void postExecutorEvent(final Executor executor, final EventType type, final String user,
+                                final String message) throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+
+    final String INSERT_PROJECT_EVENTS =
+        "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+    final Date updateDate = new Date();
+    try {
+      runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
+          updateDate, user, message);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Failed to post executor event", e);
+    }
+  }
+
+  public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
+                                                  final int offset)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+
+    final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
+    List<ExecutorLogEvent> events = null;
+    try {
+      events =
+          runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
+              logHandler, executor.getId(), num, offset);
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException(
+          "Failed to fetch events for executor id : " + executor.getId(), e);
+    }
+
+    return events;
+  }
+
+  /**
+   * JDBC ResultSetHandler to fetch records from executor_events table
+   */
+  private static class ExecutorLogsResultHandler implements
+      ResultSetHandler<List<ExecutorLogEvent>> {
+
+    private static final String SELECT_EXECUTOR_EVENTS_ORDER =
+        "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
+            + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
+
+    @Override
+    public List<ExecutorLogEvent> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.<ExecutorLogEvent>emptyList();
+      }
+
+      final ArrayList<ExecutorLogEvent> events = new ArrayList<>();
+      do {
+        final int executorId = rs.getInt(1);
+        final int eventType = rs.getInt(2);
+        final Date eventTime = rs.getDate(3);
+        final String username = rs.getString(4);
+        final String message = rs.getString(5);
+
+        final ExecutorLogEvent event =
+            new ExecutorLogEvent(executorId, username, eventTime,
+                EventType.fromInteger(eventType), message);
+        events.add(event);
+      } while (rs.next());
+
+      return events;
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
new file mode 100644
index 0000000..d1ba681
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/FetchActiveFlowDao.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.db.DatabaseOperator;
+import azkaban.metrics.CommonMetrics;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+
+@Singleton
+public class FetchActiveFlowDao extends AbstractJdbcLoader {
+
+  private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
+  private final DatabaseOperator dbOperator;
+
+  @Inject
+  public FetchActiveFlowDao(final Props props, final CommonMetrics commonMetrics,
+                            final DatabaseOperator dbOperator) {
+    super(props, commonMetrics);
+    this.dbOperator = dbOperator;
+  }
+
+  Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
+
+    try {
+      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
+          runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
+              flowHandler);
+      return properties;
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows", e);
+    }
+  }
+
+  Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
+      throws ExecutorManagerException {
+    final QueryRunner runner = createQueryRunner();
+    final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
+
+    try {
+      final List<Pair<ExecutionReference, ExecutableFlow>> flows =
+          runner.query(
+              FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
+              flowHandler, execId);
+      if (flows.isEmpty()) {
+        return null;
+      } else {
+        return flows.get(0);
+      }
+    } catch (final SQLException e) {
+      throw new ExecutorManagerException("Error fetching active flows by exec id", e);
+    }
+  }
+
+  private static class FetchActiveExecutableFlows implements
+      ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
+
+    // Select running and executor assigned flows
+    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
+        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+            + "et.port port, et.id executorId, et.active executorStatus"
+            + " FROM execution_flows ex"
+            + " INNER JOIN "
+            + " executors et ON ex.executor_id = et.id"
+            + " Where ex.status NOT IN ("
+            + Status.SUCCEEDED.getNumVal() + ", "
+            + Status.KILLED.getNumVal() + ", "
+            + Status.FAILED.getNumVal() + ")";
+
+    @Override
+    public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
+        final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyMap();
+      }
+
+      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
+          new HashMap<>();
+      do {
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
+        final String host = rs.getString(4);
+        final int port = rs.getInt(5);
+        final int executorId = rs.getInt(6);
+        final boolean executorStatus = rs.getBoolean(7);
+
+        if (data == null) {
+          execFlows.put(id, null);
+        } else {
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
+          try {
+            // Convoluted way to inflate strings. Should find common package or
+            // helper function.
+            if (encType == EncodingType.GZIP) {
+              // Decompress the sucker.
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              final String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            final ExecutableFlow exFlow =
+                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            final Executor executor = new Executor(executorId, host, port, executorStatus);
+            final ExecutionReference ref = new ExecutionReference(id, executor);
+            execFlows.put(id, new Pair<>(ref, exFlow));
+          } catch (final IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+
+  private static class FetchActiveExecutableFlowByExecId implements
+      ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
+
+    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
+        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
+            + "et.port port, et.id executorId, et.active executorStatus"
+            + " FROM execution_flows ex"
+            + " INNER JOIN "
+            + " executors et ON ex.executor_id = et.id"
+            + " Where ex.exec_id = ? AND ex.status NOT IN ("
+            + Status.SUCCEEDED.getNumVal() + ", "
+            + Status.KILLED.getNumVal() + ", "
+            + Status.FAILED.getNumVal() + ")";
+
+    @Override
+    public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
+        throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
+          new ArrayList<>();
+      do {
+        final int id = rs.getInt(1);
+        final int encodingType = rs.getInt(2);
+        final byte[] data = rs.getBytes(3);
+        final String host = rs.getString(4);
+        final int port = rs.getInt(5);
+        final int executorId = rs.getInt(6);
+        final boolean executorStatus = rs.getBoolean(7);
+
+        if (data == null) {
+          logger.error("Found a flow with empty data blob exec_id: " + id);
+        } else {
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object flowObj;
+          try {
+            if (encType == EncodingType.GZIP) {
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              final String jsonString = new String(data, "UTF-8");
+              flowObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+
+            final ExecutableFlow exFlow =
+                ExecutableFlow.createExecutableFlowFromObject(flowObj);
+            final Executor executor = new Executor(executorId, host, port, executorStatus);
+            final ExecutionReference ref = new ExecutionReference(id, executor);
+            execFlows.add(new Pair<>(ref, exFlow));
+          } catch (final IOException e) {
+            throw new SQLException("Error retrieving flow data " + id, e);
+          }
+        }
+      } while (rs.next());
+
+      return execFlows;
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 9c1d579..3856122 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -19,39 +19,27 @@ package azkaban.executor;
 import azkaban.database.AbstractJdbcLoader;
 import azkaban.executor.ExecutorLogEvent.EventType;
 import azkaban.metrics.CommonMetrics;
-import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
 
 @Singleton
 public class JdbcExecutorLoader extends AbstractJdbcLoader implements
@@ -60,15 +48,30 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       .getLogger(JdbcExecutorLoader.class);
   private final ExecutionFlowDao executionFlowDao;
   private final ExecutorDao executorDao;
+  private final ExecutionJobDao executionJobDao;
+  private final ExecutionLogsDao executionLogsDao;
+  private final ExecutorEventsDao executorEventsDao;
+  private final ActiveExecutingFlowsDao activeExecutingFlowsDao;
+  private final FetchActiveFlowDao fetchActiveFlowDao;
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
   @Inject
   public JdbcExecutorLoader(final Props props, final CommonMetrics commonMetrics,
                             final ExecutionFlowDao executionFlowDao,
-                            final ExecutorDao executorDao) {
+                            final ExecutorDao executorDao,
+                            final ExecutionJobDao executionJobDao,
+                            final ExecutionLogsDao executionLogsDao,
+                            final ExecutorEventsDao executorEventsDao,
+                            final ActiveExecutingFlowsDao activeExecutingFlowsDao,
+                            final FetchActiveFlowDao fetchActiveFlowDao) {
     super(props, commonMetrics);
     this.executionFlowDao = executionFlowDao;
     this.executorDao = executorDao;
+    this.executionJobDao = executionJobDao;
+    this.executionLogsDao= executionLogsDao;
+    this.executorEventsDao = executorEventsDao;
+    this.activeExecutingFlowsDao = activeExecutingFlowsDao;
+    this.fetchActiveFlowDao = fetchActiveFlowDao;
   }
 
   public EncodingType getDefaultEncodingType() {
@@ -142,38 +145,15 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows()
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchActiveExecutableFlows flowHandler = new FetchActiveExecutableFlows();
 
-    try {
-      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> properties =
-          runner.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW,
-              flowHandler);
-      return properties;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows", e);
-    }
+    return this.fetchActiveFlowDao.fetchActiveFlows();
   }
 
   @Override
   public Pair<ExecutionReference, ExecutableFlow> fetchActiveFlowByExecId(final int execId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    final FetchActiveExecutableFlowByExecId flowHandler = new FetchActiveExecutableFlowByExecId();
 
-    try {
-      final List<Pair<ExecutionReference, ExecutableFlow>> flows =
-          runner.query(FetchActiveExecutableFlowByExecId.FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID,
-              flowHandler, execId);
-      if(flows.isEmpty()) {
-        return null;
-      }
-      else {
-        return flows.get(0);
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching active flows by exec id", e);
-    }
+    return this.fetchActiveFlowDao.fetchActiveFlowByExecId(execId);
   }
 
   @Override
@@ -250,283 +230,96 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public void addActiveExecutableReference(final ExecutionReference reference)
       throws ExecutorManagerException {
-    final String INSERT =
-        "INSERT INTO active_executing_flows "
-            + "(exec_id, update_time) values (?,?)";
-    final QueryRunner runner = createQueryRunner();
 
-    try {
-      runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException(
-          "Error updating active flow reference " + reference.getExecId(), e);
-    }
+    this.activeExecutingFlowsDao.addActiveExecutableReference(reference);
   }
 
   @Override
   public void removeActiveExecutableReference(final int execid)
       throws ExecutorManagerException {
-    final String DELETE = "DELETE FROM active_executing_flows WHERE exec_id=?";
 
-    final QueryRunner runner = createQueryRunner();
-    try {
-      runner.update(DELETE, execid);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException(
-          "Error deleting active flow reference " + execid, e);
-    }
+    this.activeExecutingFlowsDao.removeActiveExecutableReference(execid);
   }
 
   @Override
   public boolean updateExecutableReference(final int execId, final long updateTime)
       throws ExecutorManagerException {
-    final String DELETE =
-        "UPDATE active_executing_flows set update_time=? WHERE exec_id=?";
-
-    final QueryRunner runner = createQueryRunner();
-    int updateNum = 0;
-    try {
-      updateNum = runner.update(DELETE, updateTime, execId);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException(
-          "Error deleting active flow reference " + execId, e);
-    }
 
     // Should be 1.
-    return updateNum > 0;
+    return this.activeExecutingFlowsDao.updateExecutableReference(execId, updateTime);
   }
 
   @Override
   public void uploadExecutableNode(final ExecutableNode node, final Props inputProps)
       throws ExecutorManagerException {
-    final String INSERT_EXECUTION_NODE =
-        "INSERT INTO execution_jobs "
-            + "(exec_id, project_id, version, flow_id, job_id, start_time, "
-            + "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
-
-    byte[] inputParam = null;
-    if (inputProps != null) {
-      try {
-        final String jsonString =
-            JSONUtils.toJSON(PropsUtils.toHierarchicalMap(inputProps));
-        inputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
-      } catch (final IOException e) {
-        throw new ExecutorManagerException("Error encoding input params");
-      }
-    }
 
-    final ExecutableFlow flow = node.getExecutableFlow();
-    final String flowId = node.getParentFlow().getFlowPath();
-    System.out.println("Uploading flowId " + flowId);
-    final QueryRunner runner = createQueryRunner();
-    try {
-      runner.update(INSERT_EXECUTION_NODE, flow.getExecutionId(),
-          flow.getProjectId(), flow.getVersion(), flowId, node.getId(),
-          node.getStartTime(), node.getEndTime(), node.getStatus().getNumVal(),
-          inputParam, node.getAttempt());
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error writing job " + node.getId(), e);
-    }
+    this.executionJobDao.uploadExecutableNode(node, inputProps);
   }
 
   @Override
   public void updateExecutableNode(final ExecutableNode node)
       throws ExecutorManagerException {
-    final String UPSERT_EXECUTION_NODE =
-        "UPDATE execution_jobs "
-            + "SET start_time=?, end_time=?, status=?, output_params=? "
-            + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
-
-    byte[] outputParam = null;
-    final Props outputProps = node.getOutputProps();
-    if (outputProps != null) {
-      try {
-        final String jsonString =
-            JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps));
-        outputParam = GZIPUtils.gzipString(jsonString, "UTF-8");
-      } catch (final IOException e) {
-        throw new ExecutorManagerException("Error encoding input params");
-      }
-    }
 
-    final QueryRunner runner = createQueryRunner();
-    try {
-      runner.update(UPSERT_EXECUTION_NODE, node.getStartTime(), node
-          .getEndTime(), node.getStatus().getNumVal(), outputParam, node
-          .getExecutableFlow().getExecutionId(), node.getParentFlow()
-          .getFlowPath(), node.getId(), node.getAttempt());
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error updating job " + node.getId(),
-          e);
-    }
+    this.executionJobDao.updateExecutableNode(node);
   }
 
   @Override
   public List<ExecutableJobInfo> fetchJobInfoAttempts(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    try {
-      final List<ExecutableJobInfo> info =
-          runner.query(
-              FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS,
-              new FetchExecutableJobHandler(), execId, jobId);
-      if (info == null || info.isEmpty()) {
-        return null;
-      }
-      return info;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error querying job info " + jobId, e);
-    }
+    return this.executionJobDao.fetchJobInfoAttempts(execId, jobId);
   }
 
   @Override
   public ExecutableJobInfo fetchJobInfo(final int execId, final String jobId, final int attempts)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    try {
-      final List<ExecutableJobInfo> info =
-          runner.query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE,
-              new FetchExecutableJobHandler(), execId, jobId, attempts);
-      if (info == null || info.isEmpty()) {
-        return null;
-      }
-      return info.get(0);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error querying job info " + jobId, e);
-    }
+    return this.executionJobDao.fetchJobInfo(execId, jobId, attempts);
   }
 
   @Override
   public Props fetchExecutionJobInputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    try {
-      final Pair<Props, Props> props =
-          runner.query(
-              FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE,
-              new FetchExecutableJobPropsHandler(), execId, jobId);
-      return props.getFirst();
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error querying job params " + execId
-          + " " + jobId, e);
-    }
+    return this.executionJobDao.fetchExecutionJobInputProps(execId, jobId);
   }
 
   @Override
   public Props fetchExecutionJobOutputProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    try {
-      final Pair<Props, Props> props =
-          runner
-              .query(
-                  FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE,
-                  new FetchExecutableJobPropsHandler(), execId, jobId);
-      return props.getFirst();
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error querying job params " + execId
-          + " " + jobId, e);
-    }
+    return this.executionJobDao.fetchExecutionJobOutputProps(execId, jobId);
   }
 
   @Override
   public Pair<Props, Props> fetchExecutionJobProps(final int execId, final String jobId)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
-    try {
-      final Pair<Props, Props> props =
-          runner
-              .query(
-                  FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE,
-                  new FetchExecutableJobPropsHandler(), execId, jobId);
-      return props;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error querying job params " + execId
-          + " " + jobId, e);
-    }
+    return this.executionJobDao.fetchExecutionJobProps(execId, jobId);
   }
 
   @Override
   public List<ExecutableJobInfo> fetchJobHistory(final int projectId, final String jobId,
                                                  final int skip, final int size) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    try {
-      final List<ExecutableJobInfo> info =
-          runner.query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE,
-              new FetchExecutableJobHandler(), projectId, jobId, skip, size);
-      if (info == null || info.isEmpty()) {
-        return null;
-      }
-      return info;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error querying job info " + jobId, e);
-    }
+    return this.executionJobDao.fetchJobHistory(projectId, jobId, skip, size);
   }
 
   @Override
   public LogData fetchLogs(final int execId, final String name, final int attempt, final int startByte,
                            final int length) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    final FetchLogsHandler handler =
-        new FetchLogsHandler(startByte, length + startByte);
-    try {
-      final LogData result =
-          runner.query(FetchLogsHandler.FETCH_LOGS, handler, execId, name,
-              attempt, startByte, startByte + length);
-      return result;
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error fetching logs " + execId
-          + " : " + name, e);
-    }
+    return this.executionLogsDao.fetchLogs(execId, name, attempt, startByte, length);
   }
 
   @Override
   public List<Object> fetchAttachments(final int execId, final String jobId, final int attempt)
       throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    try {
-      final String attachments =
-          runner
-              .query(
-                  FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE,
-                  new FetchExecutableJobAttachmentsHandler(), execId, jobId);
-      if (attachments == null) {
-        return null;
-      }
-
-      final List<Object> attachmentList =
-          (List<Object>) JSONUtils.parseJSONFromString(attachments);
-
-      return attachmentList;
-    } catch (final IOException e) {
-      throw new ExecutorManagerException(
-          "Error converting job attachments to JSON " + jobId, e);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException(
-          "Error query job attachments " + jobId, e);
-    }
+    return this.executionJobDao.fetchAttachments(execId, jobId, attempt);
   }
 
   @Override
   public void uploadLogFile(final int execId, final String name, final int attempt, final File... files)
       throws ExecutorManagerException {
-    final Connection connection = getConnection();
-    try {
-      uploadLogFile(connection, execId, name, attempt, files,
-          this.defaultEncodingType);
-      connection.commit();
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error committing log", e);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException("Error committing log", e);
-    } finally {
-      DbUtils.closeQuietly(connection);
-    }
+    this.executionLogsDao.uploadLogFile(execId, name, attempt, files);
   }
 
   private void uploadLogFile(final Connection connection, final int execId, final String name,
@@ -534,102 +327,13 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       throws ExecutorManagerException, IOException {
     // 50K buffer... if logs are greater than this, we chunk.
     // However, we better prevent large log files from being uploaded somehow
-    final byte[] buffer = new byte[50 * 1024];
-    int pos = 0;
-    int length = buffer.length;
-    int startByte = 0;
-    try {
-      for (int i = 0; i < files.length; ++i) {
-        final File file = files[i];
-
-        final BufferedInputStream bufferedStream =
-            new BufferedInputStream(new FileInputStream(file));
-        try {
-          int size = bufferedStream.read(buffer, pos, length);
-          while (size >= 0) {
-            if (pos + size == buffer.length) {
-              // Flush here.
-              uploadLogPart(connection, execId, name, attempt, startByte,
-                  startByte + buffer.length, encType, buffer, buffer.length);
-
-              pos = 0;
-              length = buffer.length;
-              startByte += buffer.length;
-            } else {
-              // Usually end of file.
-              pos += size;
-              length = buffer.length - pos;
-            }
-            size = bufferedStream.read(buffer, pos, length);
-          }
-        } finally {
-          IOUtils.closeQuietly(bufferedStream);
-        }
-      }
-
-      // Final commit of buffer.
-      if (pos > 0) {
-        uploadLogPart(connection, execId, name, attempt, startByte, startByte
-            + pos, encType, buffer, pos);
-      }
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error writing log part.", e);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException("Error chunking", e);
-    }
-  }
-
-  private void uploadLogPart(final Connection connection, final int execId, final String name,
-                             final int attempt, final int startByte, final int endByte, final EncodingType encType,
-                             final byte[] buffer, final int length) throws SQLException, IOException {
-    final String INSERT_EXECUTION_LOGS =
-        "INSERT INTO execution_logs "
-            + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
-            + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";
-
-    final QueryRunner runner = new QueryRunner();
-    byte[] buf = buffer;
-    if (encType == EncodingType.GZIP) {
-      buf = GZIPUtils.gzipBytes(buf, 0, length);
-    } else if (length < buf.length) {
-      buf = Arrays.copyOf(buffer, length);
-    }
-
-    runner.update(connection, INSERT_EXECUTION_LOGS, execId, name, attempt,
-        encType.getNumVal(), startByte, startByte + length, buf, DateTime.now()
-            .getMillis());
+    this.executionLogsDao.uploadLogFile(connection, execId, name, attempt, files, encType);
   }
 
   @Override
   public void uploadAttachmentFile(final ExecutableNode node, final File file)
       throws ExecutorManagerException {
-    final Connection connection = getConnection();
-    try {
-      uploadAttachmentFile(connection, node, file, this.defaultEncodingType);
-      connection.commit();
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Error committing attachments ", e);
-    } catch (final IOException e) {
-      throw new ExecutorManagerException("Error uploading attachments ", e);
-    } finally {
-      DbUtils.closeQuietly(connection);
-    }
-  }
-
-  private void uploadAttachmentFile(final Connection connection, final ExecutableNode node,
-                                    final File file, final EncodingType encType) throws SQLException, IOException {
-
-    final String jsonString = FileUtils.readFileToString(file);
-    final byte[] attachments = GZIPUtils.gzipString(jsonString, "UTF-8");
-
-    final String UPDATE_EXECUTION_NODE_ATTACHMENTS =
-        "UPDATE execution_jobs " + "SET attachments=? "
-            + "WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?";
-
-    final QueryRunner runner = new QueryRunner();
-    runner.update(connection, UPDATE_EXECUTION_NODE_ATTACHMENTS, attachments,
-        node.getExecutableFlow().getExecutionId(), node.getParentFlow()
-            .getNestedId(), node.getId(), node.getAttempt());
+    this.executionJobDao.uploadAttachmentFile(node, file);
   }
 
   private Connection getConnection() throws ExecutorManagerException {
@@ -743,17 +447,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public void postExecutorEvent(final Executor executor, final EventType type, final String user,
                                 final String message) throws ExecutorManagerException{
-    final QueryRunner runner = createQueryRunner();
 
-    final String INSERT_PROJECT_EVENTS =
-      "INSERT INTO executor_events (executor_id, event_type, event_time, username, message) values (?,?,?,?,?)";
-    final Date updateDate = new Date();
-    try {
-      runner.update(INSERT_PROJECT_EVENTS, executor.getId(), type.getNumVal(),
-        updateDate, user, message);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException("Failed to post executor event", e);
-    }
+    this.executorEventsDao.postExecutorEvent(executor, type, user, message);
   }
 
   /**
@@ -765,20 +460,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public List<ExecutorLogEvent> getExecutorEvents(final Executor executor, final int num,
                                                   final int offset) throws ExecutorManagerException {
-    final QueryRunner runner = createQueryRunner();
 
-    final ExecutorLogsResultHandler logHandler = new ExecutorLogsResultHandler();
-    List<ExecutorLogEvent> events = null;
-    try {
-      events =
-        runner.query(ExecutorLogsResultHandler.SELECT_EXECUTOR_EVENTS_ORDER,
-          logHandler, executor.getId(), num, offset);
-    } catch (final SQLException e) {
-      throw new ExecutorManagerException(
-        "Failed to fetch events for executor id : " + executor.getId(), e);
-    }
-
-    return events;
+    return this.executorEventsDao.getExecutorEvents(executor, num, offset);
   }
 
   /**
@@ -829,20 +512,8 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
   @Override
   public int removeExecutionLogsByTime(final long millis)
       throws ExecutorManagerException {
-    final String DELETE_BY_TIME =
-        "DELETE FROM execution_logs WHERE upload_time < ?";
-
-    final QueryRunner runner = createQueryRunner();
-    int updateNum = 0;
-    try {
-      updateNum = runner.update(DELETE_BY_TIME, millis);
-    } catch (final SQLException e) {
-      e.printStackTrace();
-      throw new ExecutorManagerException(
-          "Error deleting old execution_logs before " + millis, e);
-    }
 
-    return updateNum;
+    return this.executionLogsDao.removeExecutionLogsByTime(millis);
   }
 
   /**
@@ -881,193 +552,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  private static class FetchLogsHandler implements ResultSetHandler<LogData> {
-    private static final String FETCH_LOGS =
-        "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log "
-            + "FROM execution_logs "
-            + "WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? "
-            + "AND start_byte <= ? ORDER BY start_byte";
-
-    private final int startByte;
-    private final int endByte;
-
-    public FetchLogsHandler(final int startByte, final int endByte) {
-      this.startByte = startByte;
-      this.endByte = endByte;
-    }
-
-    @Override
-    public LogData handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return null;
-      }
-
-      final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-
-      do {
-        // int execId = rs.getInt(1);
-        // String name = rs.getString(2);
-        final int attempt = rs.getInt(3);
-        final EncodingType encType = EncodingType.fromInteger(rs.getInt(4));
-        final int startByte = rs.getInt(5);
-        final int endByte = rs.getInt(6);
-
-        final byte[] data = rs.getBytes(7);
-
-        final int offset =
-            this.startByte > startByte ? this.startByte - startByte : 0;
-        final int length =
-            this.endByte < endByte ? this.endByte - startByte - offset
-                : endByte - startByte - offset;
-        try {
-          byte[] buffer = data;
-          if (encType == EncodingType.GZIP) {
-            buffer = GZIPUtils.unGzipBytes(data);
-          }
-
-          byteStream.write(buffer, offset, length);
-        } catch (final IOException e) {
-          throw new SQLException(e);
-        }
-      } while (rs.next());
-
-      final byte[] buffer = byteStream.toByteArray();
-      final Pair<Integer, Integer> result =
-          FileIOUtils.getUtf8Range(buffer, 0, buffer.length);
-
-      return new LogData(this.startByte + result.getFirst(), result.getSecond(),
-          new String(buffer, result.getFirst(), result.getSecond(), StandardCharsets.UTF_8));
-    }
-  }
-
-  private static class FetchExecutableJobHandler implements
-      ResultSetHandler<List<ExecutableJobInfo>> {
-    private static final String FETCH_EXECUTABLE_NODE =
-        "SELECT exec_id, project_id, version, flow_id, job_id, "
-            + "start_time, end_time, status, attempt "
-            + "FROM execution_jobs WHERE exec_id=? "
-            + "AND job_id=? AND attempt=?";
-    private static final String FETCH_EXECUTABLE_NODE_ATTEMPTS =
-        "SELECT exec_id, project_id, version, flow_id, job_id, "
-            + "start_time, end_time, status, attempt FROM execution_jobs "
-            + "WHERE exec_id=? AND job_id=?";
-    private static final String FETCH_PROJECT_EXECUTABLE_NODE =
-        "SELECT exec_id, project_id, version, flow_id, job_id, "
-            + "start_time, end_time, status, attempt FROM execution_jobs "
-            + "WHERE project_id=? AND job_id=? "
-            + "ORDER BY exec_id DESC LIMIT ?, ? ";
-
-    @Override
-    public List<ExecutableJobInfo> handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return Collections.<ExecutableJobInfo> emptyList();
-      }
-
-      final List<ExecutableJobInfo> execNodes = new ArrayList<>();
-      do {
-        final int execId = rs.getInt(1);
-        final int projectId = rs.getInt(2);
-        final int version = rs.getInt(3);
-        final String flowId = rs.getString(4);
-        final String jobId = rs.getString(5);
-        final long startTime = rs.getLong(6);
-        final long endTime = rs.getLong(7);
-        final Status status = Status.fromInteger(rs.getInt(8));
-        final int attempt = rs.getInt(9);
-
-        final ExecutableJobInfo info =
-            new ExecutableJobInfo(execId, projectId, version, flowId, jobId,
-                startTime, endTime, status, attempt);
-        execNodes.add(info);
-      } while (rs.next());
-
-      return execNodes;
-    }
-  }
-
-  private static class FetchExecutableJobAttachmentsHandler implements
-      ResultSetHandler<String> {
-    private static final String FETCH_ATTACHMENTS_EXECUTABLE_NODE =
-        "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";
-
-    @Override
-    public String handle(final ResultSet rs) throws SQLException {
-      String attachmentsJson = null;
-      if (rs.next()) {
-        try {
-          final byte[] attachments = rs.getBytes(1);
-          if (attachments != null) {
-            attachmentsJson = GZIPUtils.unGzipString(attachments, "UTF-8");
-          }
-        } catch (final IOException e) {
-          throw new SQLException("Error decoding job attachments", e);
-        }
-      }
-      return attachmentsJson;
-    }
-  }
-
-  private static class FetchExecutableJobPropsHandler implements
-      ResultSetHandler<Pair<Props, Props>> {
-    private static final String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE =
-        "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
-    private static final String FETCH_INPUT_PARAM_EXECUTABLE_NODE =
-        "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
-    private static final String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE =
-        "SELECT input_params, output_params "
-            + "FROM execution_jobs WHERE exec_id=? AND job_id=?";
-
-    @Override
-    public Pair<Props, Props> handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return new Pair<>(null, null);
-      }
-
-      if (rs.getMetaData().getColumnCount() > 1) {
-        final byte[] input = rs.getBytes(1);
-        final byte[] output = rs.getBytes(2);
-
-        Props inputProps = null;
-        Props outputProps = null;
-        try {
-          if (input != null) {
-            final String jsonInputString = GZIPUtils.unGzipString(input, "UTF-8");
-            inputProps =
-                PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
-                    .parseJSONFromString(jsonInputString));
-
-          }
-          if (output != null) {
-            final String jsonOutputString = GZIPUtils.unGzipString(output, "UTF-8");
-            outputProps =
-                PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
-                    .parseJSONFromString(jsonOutputString));
-          }
-        } catch (final IOException e) {
-          throw new SQLException("Error decoding param data", e);
-        }
-
-        return new Pair<>(inputProps, outputProps);
-      } else {
-        final byte[] params = rs.getBytes(1);
-        Props props = null;
-        try {
-          if (params != null) {
-            final String jsonProps = GZIPUtils.unGzipString(params, "UTF-8");
-
-            props =
-                PropsUtils.fromHierarchicalMap((Map<String, Object>) JSONUtils
-                    .parseJSONFromString(jsonProps));
-          }
-        } catch (final IOException e) {
-          throw new SQLException("Error decoding param data", e);
-        }
-
-        return new Pair<>(props, null);
-      }
-    }
-  }
-
   /**
    * JDBC ResultSetHandler to fetch queued executions
    */
@@ -1170,130 +654,6 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
-  private static class FetchActiveExecutableFlows implements
-      ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
-    // Select running and executor assigned flows
-    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW =
-      "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
-        + "et.port port, et.id executorId, et.active executorStatus"
-        + " FROM execution_flows ex"
-        + " INNER JOIN "
-        + " executors et ON ex.executor_id = et.id"
-        + " Where ex.status NOT IN ("
-        + Status.SUCCEEDED.getNumVal() + ", "
-        + Status.KILLED.getNumVal() + ", "
-        + Status.FAILED.getNumVal() + ")";
-
-    @Override
-    public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(
-        final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return Collections.emptyMap();
-      }
-
-      final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows =
-          new HashMap<>();
-      do {
-        final int id = rs.getInt(1);
-        final int encodingType = rs.getInt(2);
-        final byte[] data = rs.getBytes(3);
-        final String host = rs.getString(4);
-        final int port = rs.getInt(5);
-        final int executorId = rs.getInt(6);
-        final boolean executorStatus = rs.getBoolean(7);
-
-        if (data == null) {
-          execFlows.put(id, null);
-        } else {
-          final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
-          try {
-            // Convoluted way to inflate strings. Should find common package or
-            // helper function.
-            if (encType == EncodingType.GZIP) {
-              // Decompress the sucker.
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
-            final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            final Executor executor = new Executor(executorId, host, port, executorStatus);
-            final ExecutionReference ref = new ExecutionReference(id, executor);
-            execFlows.put(id, new Pair<>(ref, exFlow));
-          } catch (final IOException e) {
-            throw new SQLException("Error retrieving flow data " + id, e);
-          }
-        }
-      } while (rs.next());
-
-      return execFlows;
-    }
-  }
-
-  private static class FetchActiveExecutableFlowByExecId implements
-      ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
-    private static final String FETCH_ACTIVE_EXECUTABLE_FLOW_BY_EXECID =
-        "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, "
-            + "et.port port, et.id executorId, et.active executorStatus"
-            + " FROM execution_flows ex"
-            + " INNER JOIN "
-            + " executors et ON ex.executor_id = et.id"
-            + " Where ex.exec_id = ? AND ex.status NOT IN ("
-            + Status.SUCCEEDED.getNumVal() + ", "
-            + Status.KILLED.getNumVal() + ", "
-            + Status.FAILED.getNumVal() + ")";
-
-    @Override
-    public List<Pair<ExecutionReference, ExecutableFlow>> handle(final ResultSet rs)
-        throws SQLException {
-      if (!rs.next()) {
-        return Collections.emptyList();
-      }
-
-      final List<Pair<ExecutionReference, ExecutableFlow>> execFlows =
-          new ArrayList<>();
-      do {
-        final int id = rs.getInt(1);
-        final int encodingType = rs.getInt(2);
-        final byte[] data = rs.getBytes(3);
-        final String host = rs.getString(4);
-        final int port = rs.getInt(5);
-        final int executorId = rs.getInt(6);
-        final boolean executorStatus = rs.getBoolean(7);
-
-        if (data == null) {
-          logger.error("Found a flow with empty data blob exec_id: " + id);
-        } else {
-          final EncodingType encType = EncodingType.fromInteger(encodingType);
-          final Object flowObj;
-          try {
-            if (encType == EncodingType.GZIP) {
-              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            } else {
-              final String jsonString = new String(data, "UTF-8");
-              flowObj = JSONUtils.parseJSONFromString(jsonString);
-            }
-
-            final ExecutableFlow exFlow =
-                ExecutableFlow.createExecutableFlowFromObject(flowObj);
-            final Executor executor = new Executor(executorId, host, port, executorStatus);
-            final ExecutionReference ref = new ExecutionReference(id, executor);
-            execFlows.add(new Pair<>(ref, exFlow));
-          } catch (final IOException e) {
-            throw new SQLException("Error retrieving flow data " + id, e);
-          }
-        }
-      } while (rs.next());
-
-      return execFlows;
-    }
-  }
-
   private static class IntHandler implements ResultSetHandler<Integer> {
     private static final String NUM_EXECUTIONS =
         "SELECT COUNT(1) FROM execution_flows";
@@ -1313,36 +673,4 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
     }
   }
 
- /**
-   * JDBC ResultSetHandler to fetch records from executor_events table
-   */
-  private static class ExecutorLogsResultHandler implements
-    ResultSetHandler<List<ExecutorLogEvent>> {
-    private static final String SELECT_EXECUTOR_EVENTS_ORDER =
-      "SELECT executor_id, event_type, event_time, username, message FROM executor_events "
-        + " WHERE executor_id=? ORDER BY event_time LIMIT ? OFFSET ?";
-
-    @Override
-    public List<ExecutorLogEvent> handle(final ResultSet rs) throws SQLException {
-      if (!rs.next()) {
-        return Collections.<ExecutorLogEvent> emptyList();
-      }
-
-      final ArrayList<ExecutorLogEvent> events = new ArrayList<>();
-      do {
-        final int executorId = rs.getInt(1);
-        final int eventType = rs.getInt(2);
-        final Date eventTime = rs.getDate(3);
-        final String username = rs.getString(4);
-        final String message = rs.getString(5);
-
-        final ExecutorLogEvent event =
-          new ExecutorLogEvent(executorId, username, eventTime,
-            EventType.fromInteger(eventType), message);
-        events.add(event);
-      } while (rs.next());
-
-      return events;
-    }
-  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
index 1552742..e3317f1 100644
--- a/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/JdbcExecutorLoaderTest.java
@@ -1060,7 +1060,9 @@ public class JdbcExecutorLoaderTest {
 
     //TODO kunkun-tang: temporary work-around here. This Test is to be deprecated.
     return new JdbcExecutorLoader(props,
-        new CommonMetrics(new MetricsManager(new MetricRegistry())), null, null);
+        new CommonMetrics(new MetricsManager(new MetricRegistry())), null
+        , null, null, null, null,
+        null, null);
   }
 
   private boolean isTestSetup() {
diff --git a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
index 6718abc..c3c88e4 100644
--- a/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/webapp/AzkabanWebServerTest.java
@@ -28,12 +28,17 @@ import azkaban.AzkabanCommonModule;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.database.AzkabanDatabaseUpdater;
 import azkaban.db.DatabaseOperator;
+import azkaban.executor.ActiveExecutingFlowsDao;
 import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutionFlowDao;
+import azkaban.executor.ExecutionJobDao;
+import azkaban.executor.ExecutionLogsDao;
 import azkaban.executor.Executor;
 import azkaban.executor.ExecutorDao;
+import azkaban.executor.ExecutorEventsDao;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
+import azkaban.executor.FetchActiveFlowDao;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManager;
 import azkaban.spi.Storage;
@@ -138,6 +143,11 @@ public class AzkabanWebServerTest {
     assertSingleton(Emailer.class, injector);
     assertSingleton(ExecutionFlowDao.class, injector);
     assertSingleton(ExecutorDao.class, injector);
+    assertSingleton(ExecutionJobDao.class, injector);
+    assertSingleton(ExecutionLogsDao.class, injector);
+    assertSingleton(ExecutorEventsDao.class, injector);
+    assertSingleton(ActiveExecutingFlowsDao.class, injector);
+    assertSingleton(FetchActiveFlowDao.class, injector);
 
     SERVICE_PROVIDER.unsetInjector();
   }