azkaban-aplcache

refactor JDBC Project class to leverage new azkaban-db (#1079) *

7/11/2017 7:44:58 PM

Details

diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 8199c70..0063e49 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -67,7 +67,7 @@ dependencies {
     testRuntime deps.h2
 
     testCompile(project(':test').sourceSets.test.output)
-
+    testCompile project(':azkaban-db').sourceSets.test.output
 }
 
 tasks.withType(JavaCompile) {
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index abdbfb2..f528556 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -29,7 +29,7 @@ import azkaban.executor.AlerterHolder;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
-import azkaban.project.JdbcProjectLoader;
+import azkaban.project.JdbcProjectImpl;
 import azkaban.project.ProjectLoader;
 import azkaban.spi.AzkabanException;
 import azkaban.spi.Storage;
@@ -77,12 +77,12 @@ public class AzkabanCommonModule extends AbstractModule {
   @Override
   protected void configure() {
     bind(ExecutorLoader.class).to(JdbcExecutorLoader.class).in(Scopes.SINGLETON);
-    bind(ProjectLoader.class).to(JdbcProjectLoader.class).in(Scopes.SINGLETON);
     bind(Props.class).toInstance(this.config.getProps());
     bind(Storage.class).to(resolveStorageClassType()).in(Scopes.SINGLETON);
     bind(HdfsAuth.class).in(Scopes.SINGLETON);
     bind(DatabaseOperator.class).to(DatabaseOperatorImpl.class).in(Scopes.SINGLETON);
     bind(TriggerLoader.class).to(JdbcTriggerImpl.class).in(Scopes.SINGLETON);
+    bind(ProjectLoader.class).to(JdbcProjectImpl.class).in(Scopes.SINGLETON);
     bind(DataSource.class).to(AzkabanDataSource.class);
     bind(ExecutorManager.class).in(Scopes.SINGLETON);
     bind(AlerterHolder.class).in(Scopes.SINGLETON);
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
new file mode 100644
index 0000000..16b9bf0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.project;
+
+import azkaban.database.EncodingType;
+import azkaban.flow.Flow;
+import azkaban.user.Permission;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Triple;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.dbutils.ResultSetHandler;
+
+
+/**
+ * This is a JDBC Handler collection place for all project handler classes.
+ */
+class JdbcProjectHandlerSet {
+
+  public static class ProjectResultHandler implements ResultSetHandler<List<Project>> {
+    public static String SELECT_PROJECT_BY_NAME =
+        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=?";
+
+    public static String SELECT_PROJECT_BY_ID =
+        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE id=?";
+
+    public static String SELECT_ALL_ACTIVE_PROJECTS =
+        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE active=true";
+
+    public static String SELECT_ACTIVE_PROJECT_BY_NAME =
+        "SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true";
+
+    @Override
+    public List<Project> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final ArrayList<Project> projects = new ArrayList<>();
+      do {
+        final int id = rs.getInt(1);
+        final String name = rs.getString(2);
+        final boolean active = rs.getBoolean(3);
+        final long modifiedTime = rs.getLong(4);
+        final long createTime = rs.getLong(5);
+        final int version = rs.getInt(6);
+        final String lastModifiedBy = rs.getString(7);
+        final String description = rs.getString(8);
+        final int encodingType = rs.getInt(9);
+        final byte[] data = rs.getBytes(10);
+
+        final Project project;
+        if (data != null) {
+          final EncodingType encType = EncodingType.fromInteger(encodingType);
+          final Object blobObj;
+          try {
+            // Convoluted way to inflate strings. Should find common package or
+            // helper function.
+            if (encType == EncodingType.GZIP) {
+              // Decompress the sucker.
+              final String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+              blobObj = JSONUtils.parseJSONFromString(jsonString);
+            } else {
+              final String jsonString = new String(data, "UTF-8");
+              blobObj = JSONUtils.parseJSONFromString(jsonString);
+            }
+            project = Project.projectFromObject(blobObj);
+          } catch (final IOException e) {
+            throw new SQLException("Failed to get project.", e);
+          }
+        } else {
+          project = new Project(id, name);
+        }
+
+        // update the fields as they may have changed
+
+        project.setActive(active);
+        project.setLastModifiedTimestamp(modifiedTime);
+        project.setCreateTimestamp(createTime);
+        project.setVersion(version);
+        project.setLastModifiedUser(lastModifiedBy);
+        project.setDescription(description);
+
+        projects.add(project);
+      } while (rs.next());
+
+      return projects;
+    }
+  }
+
+  public static class ProjectPermissionsResultHandler implements ResultSetHandler<List<Triple<String, Boolean, Permission>>> {
+    public static String SELECT_PROJECT_PERMISSION =
+        "SELECT project_id, modified_time, name, permissions, isGroup FROM project_permissions WHERE project_id=?";
+
+    @Override
+    public List<Triple<String, Boolean, Permission>> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final List<Triple<String, Boolean, Permission>> permissions = new ArrayList<>();
+      do {
+        final String username = rs.getString(3);
+        final int permissionFlag = rs.getInt(4);
+        final boolean val = rs.getBoolean(5);
+
+        final Permission perm = new Permission(permissionFlag);
+        permissions.add(new Triple<>(username, val, perm));
+      } while (rs.next());
+
+      return permissions;
+    }
+  }
+
+  public static class ProjectFlowsResultHandler implements ResultSetHandler<List<Flow>> {
+    public static String SELECT_PROJECT_FLOW =
+        "SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=? AND flow_id=?";
+
+    public static String SELECT_ALL_PROJECT_FLOWS =
+        "SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?";
+
+    @Override
+    public List<Flow> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final ArrayList<Flow> flows = new ArrayList<>();
+      do {
+        final String flowId = rs.getString(3);
+        final int encodingType = rs.getInt(5);
+        final byte[] dataBytes = rs.getBytes(6);
+
+        if (dataBytes == null) {
+          continue;
+        }
+
+        final EncodingType encType = EncodingType.fromInteger(encodingType);
+
+        Object flowObj = null;
+        try {
+          // Convoluted way to inflate strings. Should find common package or
+          // helper function.
+          if (encType == EncodingType.GZIP) {
+            // Decompress the sucker.
+            final String jsonString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
+            flowObj = JSONUtils.parseJSONFromString(jsonString);
+          } else {
+            final String jsonString = new String(dataBytes, "UTF-8");
+            flowObj = JSONUtils.parseJSONFromString(jsonString);
+          }
+
+          final Flow flow = Flow.flowFromObject(flowObj);
+          flows.add(flow);
+        } catch (final IOException e) {
+          throw new SQLException("Error retrieving flow data " + flowId, e);
+        }
+      } while (rs.next());
+
+      return flows;
+    }
+  }
+
+  public static class ProjectPropertiesResultsHandler implements ResultSetHandler<List<Pair<String, Props>>> {
+    public static String SELECT_PROJECT_PROPERTY =
+        "SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=? AND name=?";
+
+    public static String SELECT_PROJECT_PROPERTIES =
+        "SELECT project_id, version, name, modified_time, encoding_type, property FROM project_properties WHERE project_id=? AND version=?";
+
+    @Override
+    public List<Pair<String, Props>> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final List<Pair<String, Props>> properties = new ArrayList<>();
+      do {
+        final String name = rs.getString(3);
+        final int eventType = rs.getInt(5);
+        final byte[] dataBytes = rs.getBytes(6);
+
+        final EncodingType encType = EncodingType.fromInteger(eventType);
+        String propertyString = null;
+
+        try {
+          if (encType == EncodingType.GZIP) {
+            // Decompress the sucker.
+            propertyString = GZIPUtils.unGzipString(dataBytes, "UTF-8");
+          } else {
+            propertyString = new String(dataBytes, "UTF-8");
+          }
+
+          final Props props = PropsUtils.fromJSONString(propertyString);
+          props.setSource(name);
+          properties.add(new Pair<>(name, props));
+        } catch (final IOException e) {
+          throw new SQLException(e);
+        }
+      } while (rs.next());
+
+      return properties;
+    }
+  }
+
+  public static class ProjectLogsResultHandler implements ResultSetHandler<List<ProjectLogEvent>> {
+    public static String SELECT_PROJECT_EVENTS_ORDER =
+        "SELECT project_id, event_type, event_time, username, message FROM project_events WHERE project_id=? ORDER BY event_time DESC LIMIT ? OFFSET ?";
+
+    @Override
+    public List<ProjectLogEvent> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final ArrayList<ProjectLogEvent> events = new ArrayList<>();
+      do {
+        final int projectId = rs.getInt(1);
+        final int eventType = rs.getInt(2);
+        final long eventTime = rs.getLong(3);
+        final String username = rs.getString(4);
+        final String message = rs.getString(5);
+
+        final ProjectLogEvent event =
+            new ProjectLogEvent(projectId, ProjectLogEvent.EventType.fromInteger(eventType), eventTime, username,
+                message);
+        events.add(event);
+      } while (rs.next());
+
+      return events;
+    }
+  }
+
+  public static class ProjectFileChunkResultHandler implements ResultSetHandler<List<byte[]>> {
+    public static String SELECT_PROJECT_CHUNKS_FILE =
+        "SELECT project_id, version, chunk, size, file FROM project_files WHERE project_id=? AND version=? AND chunk >= ? AND chunk < ? ORDER BY chunk ASC";
+
+    @Override
+    public List<byte[]> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return Collections.emptyList();
+      }
+
+      final ArrayList<byte[]> data = new ArrayList<>();
+      do {
+        final byte[] bytes = rs.getBytes(5);
+
+        data.add(bytes);
+      } while (rs.next());
+
+      return data;
+    }
+  }
+
+  public static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
+    public static String SELECT_PROJECT_VERSION =
+        "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id "
+            + "FROM project_versions WHERE project_id=? AND version=?";
+
+    @Override
+    public List<ProjectFileHandler> handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return null;
+      }
+
+      final List<ProjectFileHandler> handlers = new ArrayList<>();
+      do {
+        final int projectId = rs.getInt(1);
+        final int version = rs.getInt(2);
+        final long uploadTime = rs.getLong(3);
+        final String uploader = rs.getString(4);
+        final String fileType = rs.getString(5);
+        final String fileName = rs.getString(6);
+        final byte[] md5 = rs.getBytes(7);
+        final int numChunks = rs.getInt(8);
+        final String resourceId = rs.getString(9);
+
+        final ProjectFileHandler handler =
+            new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5,
+                resourceId);
+
+        handlers.add(handler);
+      } while (rs.next());
+
+      return handlers;
+    }
+  }
+
+  public static class IntHandler implements ResultSetHandler<Integer> {
+    public static String SELECT_LATEST_VERSION = "SELECT MAX(version) FROM project_versions WHERE project_id=?";
+
+    @Override
+    public Integer handle(final ResultSet rs) throws SQLException {
+      if (!rs.next()) {
+        return 0;
+      }
+
+      return rs.getInt(1);
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
new file mode 100644
index 0000000..b212a92
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -0,0 +1,903 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.project;
+
+import static azkaban.project.JdbcProjectHandlerSet.IntHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectFileChunkResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectFlowsResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectLogsResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectPermissionsResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectPropertiesResultsHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectResultHandler;
+import static azkaban.project.JdbcProjectHandlerSet.ProjectVersionResultHandler;
+
+import azkaban.database.EncodingType;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseTransOperator;
+import azkaban.db.SQLTransaction;
+import azkaban.flow.Flow;
+import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.user.Permission;
+import azkaban.user.User;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Md5Hasher;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Triple;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class implements ProjectLoader using new azkaban-db code to allow DB failover.
+ * TODO kunkun-tang: This class is too long. In future, we should split {@link ProjectLoader} interface
+ * and have multiple short class implementations.
+ */
+public class JdbcProjectImpl implements ProjectLoader {
+  private static final Logger logger = Logger.getLogger(JdbcProjectImpl.class);
+
+  private static final int CHUCK_SIZE = 1024 * 1024 * 10;
+  private final DatabaseOperator dbOperator;
+  private final File tempDir;
+  private final EncodingType defaultEncodingType = EncodingType.GZIP;
+
+  @Inject
+  public JdbcProjectImpl(final Props props, final DatabaseOperator databaseOperator) {
+
+    this.dbOperator = databaseOperator;
+    this.tempDir = new File(props.getString("project.temp.dir", "temp"));
+    if (!this.tempDir.exists()) {
+      if (this.tempDir.mkdirs()) {
+        logger.info("project temporary folder is being constructed.");
+      } else {
+        logger.info("project temporary folder already existed.");
+      }
+    }
+  }
+
+  @Override
+  public List<Project> fetchAllActiveProjects() throws ProjectManagerException {
+
+    final ProjectResultHandler handler = new ProjectResultHandler();
+    List<Project> projects = null;
+
+    try {
+      projects = this.dbOperator.query(ProjectResultHandler.SELECT_ALL_ACTIVE_PROJECTS, handler);
+      projects.forEach(project -> {
+        for (final Triple<String, Boolean, Permission> perm : fetchPermissionsForProject(project)) {
+          setProjectPermission(project, perm);
+        }
+      });
+    } catch (final SQLException ex) {
+      logger.error(ProjectResultHandler.SELECT_PROJECT_BY_ID + " failed.", ex);
+      throw new ProjectManagerException("Error retrieving all projects", ex);
+    }
+    return projects;
+  }
+
+  private void setProjectPermission(final Project project, final Triple<String, Boolean, Permission> perm) {
+    if (perm.getSecond()) {
+      project.setGroupPermission(perm.getFirst(), perm.getThird());
+    } else {
+      project.setUserPermission(perm.getFirst(), perm.getThird());
+    }
+  }
+
+  @Override
+  public Project fetchProjectById(final int id) throws ProjectManagerException {
+
+    Project project = null;
+    final ProjectResultHandler handler = new ProjectResultHandler();
+
+    try {
+      final List<Project> projects = this.dbOperator
+          .query(ProjectResultHandler.SELECT_PROJECT_BY_ID, handler, id);
+      if (projects.isEmpty()) {
+        throw new ProjectManagerException("No project with id " + id + " exists in db.");
+      }
+      project = projects.get(0);
+
+      // Fetch the user permissions
+      for (final Triple<String, Boolean, Permission> perm : fetchPermissionsForProject(project)) {
+        // TODO kunkun-tang: understand why we need to check permission not equal to 0 here.
+        if (perm.getThird().toFlags() != 0) {
+          setProjectPermission(project, perm);
+        }
+      }
+    } catch (final SQLException ex) {
+      logger.error(ProjectResultHandler.SELECT_PROJECT_BY_ID + " failed.", ex);
+      throw new ProjectManagerException("Query for existing project failed. Project " + id, ex);
+    }
+
+    return project;
+  }
+
+  @Override
+  public Project fetchProjectByName(final String name) throws ProjectManagerException {
+    Project project = null;
+    final ProjectResultHandler handler = new ProjectResultHandler();
+
+    // select active project from db first, if not exist, select inactive one.
+    // At most one active project with the same name exists in db.
+    try {
+      List<Project> projects = this.dbOperator
+          .query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
+      if (projects.isEmpty()) {
+        projects = this.dbOperator.query(ProjectResultHandler.SELECT_PROJECT_BY_NAME, handler, name);
+        if (projects.isEmpty()) {
+          throw new ProjectManagerException("No project with name " + name + " exists in db.");
+        }
+      }
+      project = projects.get(0);
+      for (final Triple<String, Boolean, Permission> perm : fetchPermissionsForProject(project)) {
+        if (perm.getThird().toFlags() != 0) {
+          setProjectPermission(project, perm);
+        }
+      }
+    } catch (final SQLException ex) {
+      logger.error(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
+      throw new ProjectManagerException(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME + " failed.", ex);
+    }
+    return project;
+  }
+
+  private List<Triple<String, Boolean, Permission>> fetchPermissionsForProject(final Project project)
+      throws ProjectManagerException {
+    final ProjectPermissionsResultHandler permHander = new ProjectPermissionsResultHandler();
+
+    List<Triple<String, Boolean, Permission>> permissions = null;
+    try {
+      permissions =
+          this.dbOperator
+              .query(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION, permHander, project.getId());
+    } catch (final SQLException ex) {
+      logger.error(ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION + " failed.", ex);
+      throw new ProjectManagerException("Query for permissions for " + project.getName() + " failed.", ex);
+    }
+    return permissions;
+  }
+
+  /**
+   * Creates a Project in the db.
+   *
+   * It will throw an exception if it finds an active project of the same name,
+   * or the SQL fails
+   */
+  @Override
+  public synchronized Project createNewProject(final String name, final String description, final User creator)
+      throws ProjectManagerException {
+    final ProjectResultHandler handler = new ProjectResultHandler();
+
+    // Check if the same project name exists.
+    try {
+      final List<Project> projects = this.dbOperator
+          .query(ProjectResultHandler.SELECT_ACTIVE_PROJECT_BY_NAME, handler, name);
+      if (!projects.isEmpty()) {
+        throw new ProjectManagerException("Active project with name " + name + " already exists in db.");
+      }
+    } catch (final SQLException ex) {
+      logger.error(ex);
+      throw new ProjectManagerException("Checking for existing project failed. " + name, ex);
+    }
+
+    final String INSERT_PROJECT =
+        "INSERT INTO projects ( name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob) values (?,?,?,?,?,?,?,?,?)";
+    final SQLTransaction<Integer> insertProject = transOperator -> {
+      final long time = System.currentTimeMillis();
+      return transOperator.update(INSERT_PROJECT, name, true, time, time, null, creator.getUserId(), description,
+          this.defaultEncodingType.getNumVal(), null);
+    };
+
+    // Insert project
+    try {
+      final int numRowsInserted = this.dbOperator.transaction(insertProject);
+      if (numRowsInserted == 0) {
+        throw new ProjectManagerException("No projects have been inserted.");
+      }
+    } catch (final SQLException ex) {
+      logger.error(INSERT_PROJECT + " failed.", ex);
+      throw new ProjectManagerException("Insert project" + name + " for existing project failed. ", ex);
+    }
+    return fetchProjectByName(name);
+  }
+
+  @Override
+  public void uploadProjectFile(final int projectId, final int version, final File localFile, final String uploader)
+      throws ProjectManagerException {
+    final long startMs = System.currentTimeMillis();
+    logger.info(String.format("Uploading Project ID: %d file: %s [%d bytes]", projectId, localFile.getName(),
+        localFile.length()));
+
+    /*
+     * The below transaction uses one connection to do all operations. Ideally, we should commit
+     * after the transaction completes. However, uploadFile needs to commit every time when we
+     * upload any single chunk.
+     *
+     * Todo kunkun-tang: fix the transaction issue.
+     */
+    final SQLTransaction<Integer> uploadProjectFileTransaction = transOperator -> {
+
+      /* Step 1: Update DB with new project info */
+      addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, computeHash(localFile), null);
+      transOperator.getConnection().commit();
+
+      /* Step 2: Upload File in chunks to DB */
+      final int chunks = uploadFileInChunks(transOperator, projectId, version, localFile);
+
+      /* Step 3: Update number of chunks in DB */
+      updateChunksInProjectVersions(transOperator, projectId, version, chunks);
+      return 1;
+    };
+
+    try {
+      this.dbOperator.transaction(uploadProjectFileTransaction);
+    } catch (final SQLException e) {
+      logger.error("upload project files failed.", e);
+      throw new ProjectManagerException("upload project files failed.", e);
+    }
+
+    final long duration = (System.currentTimeMillis() - startMs) / 1000;
+    logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec", projectId, localFile.getName(),
+        localFile.length(), duration));
+  }
+
+
+  private byte[] computeHash(final File localFile) {
+    logger.info("Creating message digest for upload " + localFile.getName());
+    final byte[] md5;
+    try {
+      md5 = Md5Hasher.md5Hash(localFile);
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Error getting md5 hash.", e);
+    }
+
+    logger.info("Md5 hash created");
+    return md5;
+  }
+
+  @Override
+  public void addProjectVersion(
+      final int projectId,
+      final int version,
+      final File localFile,
+      final String uploader,
+      final byte[] md5,
+      final String resourceId) throws ProjectManagerException {
+
+    // when one transaction completes, it automatically commits.
+    final SQLTransaction<Integer> transaction = transOperator -> {
+      addProjectToProjectVersions(transOperator, projectId, version, localFile, uploader, md5, resourceId);
+      return 1;
+    };
+    try {
+      this.dbOperator.transaction(transaction);
+    } catch (final SQLException e) {
+      logger.error("addProjectVersion failed.", e);
+      throw new ProjectManagerException("addProjectVersion failed.", e);
+    }
+  }
+
+  /**
+   * Insert a new version record to TABLE project_versions before uploading files.
+   *
+   * The reason for this operation:
+   * When error chunking happens in remote mysql server, incomplete file data remains
+   * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
+   * the SQL exception prevents AZ from creating the new version record in Table project_versions.
+   * However, the Table project_files still reserve the incomplete files, which causes troubles
+   * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
+   * inserting new files to db.
+   *
+   * Why this operation is safe:
+   * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
+   * proj_v+1 will be used as the new version for the uploading files.
+   *
+   * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
+   * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
+   * When file uploading completes, AZ will clean all old chunks in DB afterward.
+   */
+  private void addProjectToProjectVersions(
+      final DatabaseTransOperator transOperator,
+      final int projectId,
+      final int version,
+      final File localFile,
+      final String uploader,
+      final byte[] md5,
+      final String resourceId) throws ProjectManagerException {
+    final long updateTime = System.currentTimeMillis();
+    final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
+        + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id) values "
+        + "(?,?,?,?,?,?,?,?,?)";
+
+    try {
+      /*
+       * As we don't know the num_chunks before uploading the file, we initialize it to 0,
+       * and will update it after uploading completes.
+       */
+      transOperator.update(INSERT_PROJECT_VERSION, projectId, version, updateTime, uploader,
+          Files.getFileExtension(localFile.getName()), localFile.getName(), md5, 0, resourceId);
+    } catch (final SQLException e) {
+      final String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
+      logger.error(msg, e);
+      throw new ProjectManagerException(msg, e);
+    }
+  }
+
+  private int uploadFileInChunks(final DatabaseTransOperator transOperator, final int projectId, final int version, final File localFile)
+      throws ProjectManagerException {
+
+    // Really... I doubt we'll get a > 2gig file. So int casting it is!
+    final byte[] buffer = new byte[CHUCK_SIZE];
+    final String INSERT_PROJECT_FILES =
+        "INSERT INTO project_files (project_id, version, chunk, size, file) values (?,?,?,?,?)";
+
+    BufferedInputStream bufferedStream = null;
+    int chunk = 0;
+    try {
+      bufferedStream = new BufferedInputStream(new FileInputStream(localFile));
+      int size = bufferedStream.read(buffer);
+      while (size >= 0) {
+        logger.info("Read bytes for " + localFile.getName() + " size:" + size);
+        byte[] buf = buffer;
+        if (size < buffer.length) {
+          buf = Arrays.copyOfRange(buffer, 0, size);
+        }
+        try {
+          logger.info("Running update for " + localFile.getName() + " chunk " + chunk);
+          transOperator.update(INSERT_PROJECT_FILES, projectId, version, chunk, size, buf);
+
+          /*
+           * We enforce az committing to db when uploading every single chunk,
+           * in order to reduce the transaction duration and conserve sql server resources.
+           *
+           * If the files to be uploaded is very large and we don't commit every single chunk,
+           * the remote mysql server will run into memory troubles.
+           */
+          transOperator.getConnection().commit();
+          logger.info("Finished update for " + localFile.getName() + " chunk " + chunk);
+        } catch (final SQLException e) {
+          throw new ProjectManagerException("Error Chunking during uploading files to db...");
+        }
+        ++chunk;
+        size = bufferedStream.read(buffer);
+      }
+    } catch (final IOException e) {
+      throw new ProjectManagerException(
+          String.format("Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d", projectId,
+              version, localFile.getName(), localFile.length(), chunk));
+    } finally {
+      IOUtils.closeQuietly(bufferedStream);
+    }
+    return chunk;
+  }
+
+  /**
+   * we update num_chunks's actual number to db here.
+   */
+  private void updateChunksInProjectVersions(final DatabaseTransOperator transOperator, final int projectId, final int version, final int chunk)
+      throws ProjectManagerException {
+
+    final String UPDATE_PROJECT_NUM_CHUNKS =
+        "UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";
+    try {
+      transOperator.update(UPDATE_PROJECT_NUM_CHUNKS, chunk, projectId, version);
+      transOperator.getConnection().commit();
+    } catch (final SQLException e) {
+      logger.error("Error updating project " + projectId + " : chunk_num " + chunk, e);
+      throw new ProjectManagerException("Error updating project " + projectId + " : chunk_num " + chunk, e);
+    }
+  }
+
+  @Override
+  public ProjectFileHandler fetchProjectMetaData(final int projectId, final int version) {
+    final ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
+    try {
+      final List<ProjectFileHandler> projectFiles =
+          this.dbOperator
+              .query(ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId, version);
+      if (projectFiles == null || projectFiles.isEmpty()) {
+        return null;
+      }
+      return projectFiles.get(0);
+    } catch (final SQLException ex) {
+      logger.error("Query for uploaded file for project id " + projectId + " failed.", ex);
+      throw new ProjectManagerException("Query for uploaded file for project id " + projectId + " failed.", ex);
+    }
+  }
+
+  @Override
+  public ProjectFileHandler getUploadedFile(final int projectId, final int version) throws ProjectManagerException {
+    final ProjectFileHandler projHandler = fetchProjectMetaData(projectId, version);
+    if (projHandler == null) {
+      return null;
+    }
+    final int numChunks = projHandler.getNumChunks();
+    BufferedOutputStream bStream = null;
+    File file;
+    try {
+      try {
+        file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), this.tempDir);
+        bStream = new BufferedOutputStream(new FileOutputStream(file));
+      } catch (final IOException e) {
+        throw new ProjectManagerException("Error creating temp file for stream.");
+      }
+
+      final int collect = 5;
+      int fromChunk = 0;
+      int toChunk = collect;
+      do {
+        final ProjectFileChunkResultHandler chunkHandler = new ProjectFileChunkResultHandler();
+        List<byte[]> data = null;
+        try {
+          data = this.dbOperator
+              .query(ProjectFileChunkResultHandler.SELECT_PROJECT_CHUNKS_FILE, chunkHandler, projectId,
+              version, fromChunk, toChunk);
+        } catch (final SQLException e) {
+          logger.error(e);
+          throw new ProjectManagerException("Query for uploaded file for " + projectId + " failed.", e);
+        }
+
+        try {
+          for (final byte[] d : data) {
+            bStream.write(d);
+          }
+        } catch (final IOException e) {
+          throw new ProjectManagerException("Error writing file", e);
+        }
+
+        // Add all the bytes to the stream.
+        fromChunk += collect;
+        toChunk += collect;
+      } while (fromChunk <= numChunks);
+    } finally {
+      IOUtils.closeQuietly(bStream);
+    }
+
+    // Check md5.
+    byte[] md5 = null;
+    try {
+      md5 = Md5Hasher.md5Hash(file);
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Error getting md5 hash.", e);
+    }
+
+    if (Arrays.equals(projHandler.getMd5Hash(), md5)) {
+      logger.info("Md5 Hash is valid");
+    } else {
+      throw new ProjectManagerException("Md5 Hash failed on retrieval of file");
+    }
+
+    projHandler.setLocalFile(file);
+    return projHandler;
+  }
+
+  @Override
+  public void changeProjectVersion(final Project project, final int version, final String user) throws ProjectManagerException {
+    final long timestamp = System.currentTimeMillis();
+    try {
+      final String UPDATE_PROJECT_VERSION =
+          "UPDATE projects SET version=?,modified_time=?,last_modified_by=? WHERE id=?";
+
+      this.dbOperator.update(UPDATE_PROJECT_VERSION, version, timestamp, user, project.getId());
+      project.setVersion(version);
+      project.setLastModifiedTimestamp(timestamp);
+      project.setLastModifiedUser(user);
+    } catch (final SQLException e) {
+      logger.error("Error updating switching project version " + project.getName(), e);
+      throw new ProjectManagerException("Error updating switching project version " + project.getName(), e);
+    }
+  }
+
+  @Override
+  public void updatePermission(final Project project, final String name, final Permission perm, final boolean isGroup)
+      throws ProjectManagerException {
+
+    final long updateTime = System.currentTimeMillis();
+    try {
+      if (this.dbOperator.getDataSource().allowsOnDuplicateKey()) {
+        final String INSERT_PROJECT_PERMISSION =
+            "INSERT INTO project_permissions (project_id, modified_time, name, permissions, isGroup) values (?,?,?,?,?)"
+                + "ON DUPLICATE KEY UPDATE modified_time = VALUES(modified_time), permissions = VALUES(permissions)";
+        this.dbOperator.update(INSERT_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+      } else {
+        final String MERGE_PROJECT_PERMISSION =
+            "MERGE INTO project_permissions (project_id, modified_time, name, permissions, isGroup) KEY (project_id, name) values (?,?,?,?,?)";
+        this.dbOperator.update(MERGE_PROJECT_PERMISSION, project.getId(), updateTime, name, perm.toFlags(), isGroup);
+      }
+    } catch (final SQLException ex) {
+      logger.error("Error updating project permission", ex);
+      throw new ProjectManagerException("Error updating project " + project.getName() + " permissions for " + name, ex);
+    }
+
+    if (isGroup) {
+      project.setGroupPermission(name, perm);
+    } else {
+      project.setUserPermission(name, perm);
+    }
+  }
+
+  @Override
+  public void updateProjectSettings(final Project project) throws ProjectManagerException {
+    updateProjectSettings(project, this.defaultEncodingType);
+  }
+
+  private byte[] convertJsonToBytes(final EncodingType type, final String json) throws IOException {
+    byte[] data = json.getBytes("UTF-8");
+    if (type == EncodingType.GZIP) {
+      data = GZIPUtils.gzipBytes(data);
+    }
+    return data;
+  }
+
+  private void updateProjectSettings(final Project project, final EncodingType encType) throws ProjectManagerException {
+    final String UPDATE_PROJECT_SETTINGS = "UPDATE projects SET enc_type=?, settings_blob=? WHERE id=?";
+
+    final String json = JSONUtils.toJSON(project.toObject());
+    byte[] data = null;
+    try {
+      data = convertJsonToBytes(encType, json);
+      logger.debug("NumChars: " + json.length() + " Gzip:" + data.length);
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Failed to encode. ", e);
+    }
+
+    try {
+      this.dbOperator.update(UPDATE_PROJECT_SETTINGS, encType.getNumVal(), data, project.getId());
+    } catch (final SQLException e) {
+      logger.error("update Project Settings failed.", e);
+      throw new ProjectManagerException(
+          "Error updating project " + project.getName() + " version " + project.getVersion(), e);
+    }
+  }
+
+  @Override
+  public void removePermission(final Project project, final String name, final boolean isGroup) throws ProjectManagerException {
+    final String DELETE_PROJECT_PERMISSION =
+        "DELETE FROM project_permissions WHERE project_id=? AND name=? AND isGroup=?";
+    try {
+      this.dbOperator.update(DELETE_PROJECT_PERMISSION, project.getId(), name, isGroup);
+    } catch (final SQLException e) {
+      logger.error("remove Permission failed.", e);
+      throw new ProjectManagerException("Error deleting project " + project.getName() + " permissions for " + name, e);
+    }
+
+    if (isGroup) {
+      project.removeGroupPermission(name);
+    } else {
+      project.removeUserPermission(name);
+    }
+  }
+
+  @Override
+  public List<Triple<String, Boolean, Permission>> getProjectPermissions(final Project project) throws ProjectManagerException {
+    return fetchPermissionsForProject(project);
+  }
+
+  /**
+   * Todo kunkun-tang: the below implementation doesn't remove a project, but inactivate a project.
+   * We should rewrite the code to follow the literal meanings.
+   */
+  @Override
+  public void removeProject(final Project project, final String user) throws ProjectManagerException {
+
+    final long updateTime = System.currentTimeMillis();
+    final String UPDATE_INACTIVE_PROJECT =
+        "UPDATE projects SET active=false,modified_time=?,last_modified_by=? WHERE id=?";
+    try {
+      this.dbOperator.update(UPDATE_INACTIVE_PROJECT, updateTime, user, project.getId());
+    } catch (final SQLException e) {
+      logger.error("error remove project " + project.getName(), e);
+      throw new ProjectManagerException("Error remove project " + project.getName(), e);
+    }
+  }
+
+  @Override
+  public boolean postEvent(final Project project, final EventType type, final String user, final String message) {
+    final String INSERT_PROJECT_EVENTS =
+        "INSERT INTO project_events (project_id, event_type, event_time, username, message) values (?,?,?,?,?)";
+    final long updateTime = System.currentTimeMillis();
+    try {
+      this.dbOperator
+          .update(INSERT_PROJECT_EVENTS, project.getId(), type.getNumVal(), updateTime, user, message);
+    } catch (final SQLException e) {
+      logger.error("post event failed,", e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public List<ProjectLogEvent> getProjectEvents(final Project project, final int num, final int skip) throws ProjectManagerException {
+    final ProjectLogsResultHandler logHandler = new ProjectLogsResultHandler();
+    List<ProjectLogEvent> events = null;
+    try {
+      events = this.dbOperator
+          .query(ProjectLogsResultHandler.SELECT_PROJECT_EVENTS_ORDER, logHandler, project.getId(), num,
+          skip);
+    } catch (final SQLException e) {
+      logger.error("Error getProjectEvents, project " + project.getName(), e);
+      throw new ProjectManagerException("Error getProjectEvents, project " + project.getName(), e);
+    }
+
+    return events;
+  }
+
+  @Override
+  public void updateDescription(final Project project, final String description, final String user) throws ProjectManagerException {
+    final String UPDATE_PROJECT_DESCRIPTION =
+        "UPDATE projects SET description=?,modified_time=?,last_modified_by=? WHERE id=?";
+    final long updateTime = System.currentTimeMillis();
+    try {
+      this.dbOperator.update(UPDATE_PROJECT_DESCRIPTION, description, updateTime, user, project.getId());
+      project.setDescription(description);
+      project.setLastModifiedTimestamp(updateTime);
+      project.setLastModifiedUser(user);
+    } catch (final SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException("Error update Description, project " + project.getName(), e);
+    }
+  }
+
+  @Override
+  public int getLatestProjectVersion(final Project project) throws ProjectManagerException {
+    final IntHandler handler = new IntHandler();
+    try {
+      return this.dbOperator.query(IntHandler.SELECT_LATEST_VERSION, handler, project.getId());
+    } catch (final SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException("Error marking project " + project.getName() + " as inactive", e);
+    }
+  }
+
+  @Override
+  public void uploadFlows(final Project project, final int version, final Collection<Flow> flows) throws ProjectManagerException {
+    // We do one at a time instead of batch... because well, the batch could be
+    // large.
+    logger.info("Uploading flows");
+    try {
+      for (final Flow flow : flows) {
+        uploadFlow(project, version, flow, this.defaultEncodingType);
+      }
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Flow Upload failed.", e);
+    }
+  }
+
+  @Override
+  public void uploadFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+    logger.info("Uploading flow " + flow.getId());
+    try {
+      uploadFlow(project, version, flow, this.defaultEncodingType);
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Flow Upload failed.", e);
+    }
+  }
+
+  @Override
+  public void updateFlow(final Project project, final int version, final Flow flow) throws ProjectManagerException {
+    logger.info("Uploading flow " + flow.getId());
+    try {
+      final String json = JSONUtils.toJSON(flow.toObject());
+      final byte[] data = convertJsonToBytes(this.defaultEncodingType, json);
+      logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+      final String UPDATE_FLOW =
+          "UPDATE project_flows SET encoding_type=?,json=? WHERE project_id=? AND version=? AND flow_id=?";
+      try {
+        this.dbOperator
+            .update(UPDATE_FLOW, this.defaultEncodingType.getNumVal(), data, project.getId(), version, flow.getId());
+      } catch (final SQLException e) {
+        logger.error("Error inserting flow", e);
+        throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+      }
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Flow Upload failed.", e);
+    }
+  }
+
+  private void uploadFlow(final Project project, final int version, final Flow flow, final EncodingType encType)
+      throws ProjectManagerException, IOException {
+    final String json = JSONUtils.toJSON(flow.toObject());
+    final byte[] data = convertJsonToBytes(encType, json);
+
+    logger.info("Flow upload " + flow.getId() + " is byte size " + data.length);
+    final String INSERT_FLOW =
+        "INSERT INTO project_flows (project_id, version, flow_id, modified_time, encoding_type, json) values (?,?,?,?,?,?)";
+    try {
+      this.dbOperator.update(INSERT_FLOW, project.getId(), version, flow.getId(), System.currentTimeMillis(),
+          encType.getNumVal(), data);
+    } catch (final SQLException e) {
+      logger.error("Error inserting flow", e);
+      throw new ProjectManagerException("Error inserting flow " + flow.getId(), e);
+    }
+  }
+
+  @Override
+  public Flow fetchFlow(final Project project, final String flowId) throws ProjectManagerException {
+    throw new UnsupportedOperationException("this method has not been instantiated.");
+  }
+
+  @Override
+  public List<Flow> fetchAllProjectFlows(final Project project) throws ProjectManagerException {
+    final ProjectFlowsResultHandler handler = new ProjectFlowsResultHandler();
+    List<Flow> flows = null;
+    try {
+      flows = this.dbOperator
+          .query(ProjectFlowsResultHandler.SELECT_ALL_PROJECT_FLOWS, handler, project.getId(),
+          project.getVersion());
+    } catch (final SQLException e) {
+      throw new ProjectManagerException(
+          "Error fetching flows from project " + project.getName() + " version " + project.getVersion(), e);
+    }
+    return flows;
+  }
+
+  @Override
+  public void uploadProjectProperties(final Project project, final List<Props> properties) throws ProjectManagerException {
+    for (final Props props : properties) {
+      try {
+        uploadProjectProperty(project, props.getSource(), props);
+      } catch (final IOException e) {
+        throw new ProjectManagerException("Error uploading project property file", e);
+      }
+    }
+  }
+
+  @Override
+  public void uploadProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+    try {
+      uploadProjectProperty(project, props.getSource(), props);
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Error uploading project property file", e);
+    }
+  }
+
+  @Override
+  public void updateProjectProperty(final Project project, final Props props) throws ProjectManagerException {
+    try {
+      updateProjectProperty(project, props.getSource(), props);
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Error uploading project property file", e);
+    }
+  }
+
+  private void updateProjectProperty(final Project project, final String name, final Props props)
+      throws ProjectManagerException, IOException {
+    final String UPDATE_PROPERTIES =
+        "UPDATE project_properties SET property=? WHERE project_id=? AND version=? AND name=?";
+
+    final byte[] propsData = getBytes(props);
+    try {
+      this.dbOperator
+          .update(UPDATE_PROPERTIES, propsData, project.getId(), project.getVersion(), name);
+    } catch (final SQLException e) {
+      throw new ProjectManagerException(
+          "Error updating property " + project.getName() + " version " + project.getVersion(), e);
+    }
+  }
+
+  private void uploadProjectProperty(final Project project, final String name, final Props props)
+      throws ProjectManagerException, IOException {
+    final String INSERT_PROPERTIES =
+        "INSERT INTO project_properties (project_id, version, name, modified_time, encoding_type, property) values (?,?,?,?,?,?)";
+
+    final byte[] propsData = getBytes(props);
+    try {
+      this.dbOperator.update(INSERT_PROPERTIES, project.getId(), project.getVersion(), name, System.currentTimeMillis(),
+          this.defaultEncodingType.getNumVal(), propsData);
+    } catch (final SQLException e) {
+      throw new ProjectManagerException(
+          "Error uploading project properties " + name + " into " + project.getName() + " version "
+              + project.getVersion(), e);
+    }
+  }
+
+  private byte[] getBytes(final Props props) throws IOException {
+    final String propertyJSON = PropsUtils.toJSONString(props, true);
+    byte[] data = propertyJSON.getBytes("UTF-8");
+    if (this.defaultEncodingType == EncodingType.GZIP) {
+      data = GZIPUtils.gzipBytes(data);
+    }
+    return data;
+  }
+
+  @Override
+  public Props fetchProjectProperty(final int projectId, final int projectVer, final String propsName) throws ProjectManagerException {
+
+    final ProjectPropertiesResultsHandler handler = new ProjectPropertiesResultsHandler();
+    try {
+      final List<Pair<String, Props>> properties =
+          this.dbOperator
+              .query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTY, handler, projectId, projectVer,
+              propsName);
+
+      if (properties == null || properties.isEmpty()) {
+        logger.warn("Project " + projectId + " version " + projectVer + " property " + propsName + " is empty.");
+        return null;
+      }
+
+      return properties.get(0).getSecond();
+    } catch (final SQLException e) {
+      logger.error("Error fetching property " + propsName + " Project " + projectId + " version " + projectVer, e);
+      throw new ProjectManagerException("Error fetching property " + propsName, e);
+    }
+  }
+
+  @Override
+  public Props fetchProjectProperty(final Project project, final String propsName) throws ProjectManagerException {
+    return fetchProjectProperty(project.getId(), project.getVersion(), propsName);
+  }
+
+  @Override
+  public Map<String, Props> fetchProjectProperties(final int projectId, final int version) throws ProjectManagerException {
+
+    try {
+      final List<Pair<String, Props>> properties = this.dbOperator.query(ProjectPropertiesResultsHandler.SELECT_PROJECT_PROPERTIES,
+          new ProjectPropertiesResultsHandler(), projectId, version);
+      if (properties == null || properties.isEmpty()) {
+        return null;
+      }
+      final HashMap<String, Props> props = new HashMap<>();
+      for (final Pair<String, Props> pair : properties) {
+        props.put(pair.getFirst(), pair.getSecond());
+      }
+      return props;
+    } catch (final SQLException e) {
+      logger.error("Error fetching properties, project id" + projectId + " version " + version, e);
+      throw new ProjectManagerException("Error fetching properties", e);
+    }
+  }
+
+  @Override
+  public void cleanOlderProjectVersion(final int projectId, final int version) throws ProjectManagerException {
+    final String DELETE_FLOW = "DELETE FROM project_flows WHERE project_id=? AND version<?";
+    final String DELETE_PROPERTIES = "DELETE FROM project_properties WHERE project_id=? AND version<?";
+    final String DELETE_PROJECT_FILES = "DELETE FROM project_files WHERE project_id=? AND version<?";
+    final String UPDATE_PROJECT_VERSIONS = "UPDATE project_versions SET num_chunks=0 WHERE project_id=? AND version<?";
+
+    final SQLTransaction<Integer> cleanOlderProjectTransaction = transOperator -> {
+      transOperator.update(DELETE_FLOW, projectId, version);
+      transOperator.update(DELETE_PROPERTIES, projectId, version);
+      transOperator.update(DELETE_PROJECT_FILES, projectId, version);
+      return transOperator.update(UPDATE_PROJECT_VERSIONS, projectId, version);
+    };
+
+    try {
+      final int res = this.dbOperator.transaction(cleanOlderProjectTransaction);
+      if (res == 0) {
+        logger.info("clean older project given project id " + projectId + " doesn't take effect.");
+      }
+    } catch (final SQLException e) {
+      logger.error("clean older project transaction failed", e);
+      throw new ProjectManagerException("clean older project transaction failed", e);
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index a54095a..90ca0e7 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -30,7 +30,6 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Triple;
 import com.google.common.io.Files;
-import com.google.inject.Inject;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -64,7 +63,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
   private EncodingType defaultEncodingType = EncodingType.GZIP;
 
-  @Inject
+  // TODO kunkun-tang: This class is going to be removed soon, as we are migrating to JdbcProjectImpl.
   public JdbcProjectLoader(final Props props, final CommonMetrics commonMetrics) {
     super(props, commonMetrics);
     this.tempDir = new File(props.getString("project.temp.dir", "temp"));
@@ -785,7 +784,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
   @Override
   public List<Triple<String, Boolean, Permission>> getProjectPermissions(
-      final int projectId) throws ProjectManagerException {
+      final Project project) throws ProjectManagerException {
     final ProjectPermissionsResultHandler permHander =
         new ProjectPermissionsResultHandler();
     final QueryRunner runner = createQueryRunner();
@@ -794,10 +793,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       permissions =
           runner.query(
               ProjectPermissionsResultHandler.SELECT_PROJECT_PERMISSION,
-              permHander, projectId);
+              permHander, project.getId());
     } catch (final SQLException e) {
       throw new ProjectManagerException("Query for permissions for "
-          + projectId + " failed.", e);
+          + project.getId() + " failed.", e);
     }
 
     return permissions;
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 84d320e..9245e06 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -200,7 +200,7 @@ public interface ProjectLoader {
   Props fetchProjectProperty(int projectId, int projectVer, String propsName)
       throws ProjectManagerException;
 
-  List<Triple<String, Boolean, Permission>> getProjectPermissions(int projectId)
+  List<Triple<String, Boolean, Permission>> getProjectPermissions(Project project)
       throws ProjectManagerException;
 
   void updateProjectSettings(Project project) throws ProjectManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java
index 65261b8..1d25215 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/JdbcTriggerImpl.java
@@ -115,6 +115,7 @@ public class JdbcTriggerImpl implements TriggerLoader {
 
     final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
       transOperator.update(ADD_TRIGGER, DateTime.now().getMillis());
+      // This commit must be called in order to unlock trigger table and have last insert ID.
       transOperator.getConnection().commit();
       return transOperator.getLastInsertId();
     };
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
new file mode 100644
index 0000000..031ddee
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectImplTest.java
@@ -0,0 +1,367 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.project;
+
+import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.db.AzDBTestUtility;
+import azkaban.db.AzkabanDataSource;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.DatabaseOperatorImpl;
+import azkaban.flow.Flow;
+import azkaban.user.Permission;
+import azkaban.user.User;
+import azkaban.utils.Md5Hasher;
+import azkaban.utils.Props;
+import azkaban.utils.Triple;
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.dbutils.QueryRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class JdbcProjectImplTest {
+
+  private static final String SAMPLE_FILE = "sample_flow_01.zip";
+  private static final Props props = new Props();
+  private static DatabaseOperator dbOperator;
+  private ProjectLoader loader;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    final AzkabanDataSource dataSource = new AzDBTestUtility.EmbeddedH2BasicDataSource();
+    dbOperator = new DatabaseOperatorImpl(new QueryRunner(dataSource));
+
+    final String sqlScriptsDir = new File("../azkaban-db/src/main/sql/").getCanonicalPath();
+    props.put("database.sql.scripts.dir", sqlScriptsDir);
+
+    // TODO kunkun-tang: Need to refactor AzkabanDatabaseSetup to accept datasource in azakaban-db
+    final azkaban.database.AzkabanDataSource dataSourceForSetupDB =
+        new azkaban.database.AzkabanConnectionPoolTest.EmbeddedH2BasicDataSource();
+    final AzkabanDatabaseSetup setup = new AzkabanDatabaseSetup(dataSourceForSetupDB, props);
+    setup.loadTableInfo();
+    setup.updateDatabase(true, false);
+  }
+
+  @AfterClass
+  public static void destroyDB() {
+    try {
+      dbOperator.update("DROP ALL OBJECTS");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Before
+  public void setup() {
+    this.loader = new JdbcProjectImpl(props, dbOperator);
+  }
+
+  private void createThreeProjects() {
+    final String projectName = "mytestProject";
+    final String projectDescription = "This is my new project";
+    final User user = new User("testUser1");
+    this.loader.createNewProject(projectName, projectDescription, user);
+    final String projectName2 = "mytestProject2";
+    final String projectDescription2 = "This is my new project2";
+    this.loader.createNewProject(projectName2, projectDescription2, user);
+    final String projectName3 = "mytestProject3";
+    final String projectDescription3 = "This is my new project3";
+    final User user2 = new User("testUser2");
+    this.loader.createNewProject(projectName3, projectDescription3, user2);
+  }
+
+  @Test
+  public void testCreateProject() throws Exception {
+    final String projectName = "mytestProject";
+    final String projectDescription = "This is my new project";
+    final User user = new User("testUser1");
+    final Project project = this.loader.createNewProject(projectName, projectDescription, user);
+    Assert.assertEquals(project.getName(), projectName);
+    Assert.assertEquals(project.getDescription(), projectDescription);
+    Assert.assertEquals(project.getLastModifiedUser(), "testUser1");
+  }
+
+  @Test
+  public void testFetchAllActiveProjects() throws Exception {
+    createThreeProjects();
+    final List<Project> projectList = this.loader.fetchAllActiveProjects();
+    Assert.assertEquals(projectList.size(), 3);
+  }
+
+  @Test
+  public void testFetchProjectByName() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    Assert.assertEquals(project.getName(), "mytestProject");
+    Assert.assertEquals(project.getDescription(), "This is my new project");
+    Assert.assertEquals(project.getLastModifiedUser(), "testUser1");
+  }
+
+  @Test
+  public void testFetchProjectById() throws Exception {
+    createThreeProjects();
+    final Project project1 = this.loader.fetchProjectByName("mytestProject");
+    final Project project2 = this.loader.fetchProjectById(project1.getId());
+    Assert.assertEquals(project1.getName(), project2.getName());
+    Assert.assertEquals(project1.getDescription(), project2.getDescription());
+    Assert.assertEquals(project1.getLastModifiedUser(), project2.getLastModifiedUser());
+  }
+
+  @Test
+  public void testUploadProjectFile() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+    final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+    this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+
+    final ProjectFileHandler fileHandler = this.loader.getUploadedFile(project.getId(), newVersion);
+    Assert.assertEquals(fileHandler.getFileName(), SAMPLE_FILE);
+    Assert.assertEquals(fileHandler.getUploader(), "uploadUser1");
+  }
+
+  @Test(expected = ProjectManagerException.class)
+  public void testDuplicateUploadProjectFile() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+    final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+    this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+    this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+  }
+
+  private byte[] computeHash(final File localFile) {
+    final byte[] md5;
+    try {
+      md5 = Md5Hasher.md5Hash(localFile);
+    } catch (final IOException e) {
+      throw new ProjectManagerException("Error getting md5 hash.", e);
+    }
+    return md5;
+  }
+
+  @Test
+  public void testAddProjectVersion() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+    final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+    this.loader.addProjectVersion(project.getId(), newVersion, testFile, "uploadUser1", computeHash(testFile), "resourceId1");
+    final int currVersion = this.loader.getLatestProjectVersion(project);
+    Assert.assertEquals(currVersion, newVersion);
+  }
+
+  @Test
+  public void testFetchProjectMetaData() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+    final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+    this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+    final ProjectFileHandler pfh = this.loader.fetchProjectMetaData(project.getId(), newVersion);
+    Assert.assertEquals(pfh.getVersion(), newVersion);
+  }
+
+  @Test
+  public void testChangeProjectVersion() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    final int newVersion = this.loader.getLatestProjectVersion(project) + 7;
+    this.loader.changeProjectVersion(project, newVersion, "uploadUser1");
+    final Project sameProject= this.loader.fetchProjectById(project.getId());
+    Assert.assertEquals(sameProject.getVersion(), newVersion);
+  }
+
+  @Test
+  public void testUpdatePermission() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+
+    final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+    Assert.assertEquals(permissionsTriple.size(), 1);
+    Assert.assertEquals(permissionsTriple.get(0).getFirst(), "testUser1");
+    Assert.assertEquals(permissionsTriple.get(0).getThird().toString(), "ADMIN");
+  }
+
+  @Test
+  public void testUpdateProjectSettings() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    Assert.assertEquals(project.getProxyUsers().size(), 0);
+    project.addProxyUser("ProxyUser");
+    this.loader.updateProjectSettings(project);
+    final Project sameProject = this.loader.fetchProjectByName("mytestProject");
+    Assert.assertEquals(sameProject.getProxyUsers().size(), 1);
+  }
+
+  @Test
+  public void testRemovePermission() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader.updatePermission(project, project.getLastModifiedUser(), new Permission(Permission.Type.ADMIN), false);
+    this.loader.removePermission(project, project.getLastModifiedUser(), false);
+    final List<Triple<String, Boolean, Permission>> permissionsTriple = this.loader.getProjectPermissions(project);
+    Assert.assertEquals(permissionsTriple.size(), 0);
+  }
+
+  @Test
+  public void testRemoveProject() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    Assert.assertEquals(project.isActive(), true);
+    this.loader.removeProject(project, "testUser1");
+    final Project removedProject = this.loader.fetchProjectByName("mytestProject");
+    Assert.assertEquals(removedProject.isActive(), false);
+  }
+
+  @Test
+  public void testPostAndGetEvent() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader
+        .postEvent(project, ProjectLogEvent.EventType.CREATED, "testUser1", "create a message bla");
+    final List<ProjectLogEvent> events = this.loader.getProjectEvents(project, 5, 0);
+    Assert.assertEquals(events.size(), 1);
+    Assert.assertEquals(events.get(0).getMessage(), "create a message bla");
+  }
+
+  @Test
+  public void testUpdateDescription() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader.updateDescription(project, "new Description bla", "testUser1");
+    final Project sameProject = this.loader.fetchProjectByName("mytestProject");
+    Assert.assertEquals(sameProject.getDescription(), "new Description bla");
+  }
+
+  @Test
+  public void testUploadAndFetchFlow() throws Exception {
+    final Flow flow1 = new Flow("flow1");
+    final Flow flow2 = new Flow("flow2");
+    final List<Flow> flowList = Arrays.asList(flow1, flow2);
+
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader.uploadFlows(project, project.getVersion(), flowList);
+
+    final List<Flow> flowList2 = this.loader.fetchAllProjectFlows(project);
+    Assert.assertEquals(flowList2.size(), 2);
+  }
+
+  @Test
+  public void testUpdateFlow() throws Exception {
+    final Flow flow1 = new Flow("flow1");
+    final List<Flow> flowList = Collections.singletonList(flow1);
+
+    flow1.setLayedOut(false);
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader.uploadFlows(project, project.getVersion(), flowList);
+
+    flow1.setLayedOut(true);
+    this.loader.updateFlow(project, project.getVersion(), flow1);
+    final List<Flow> flowList2 = this.loader.fetchAllProjectFlows(project);
+    Assert.assertEquals(flowList2.get(0).isLayedOut(), true);
+  }
+
+  @Test
+  public void testUploadOrUpdateProjectProperty() throws Exception {
+    final Props props = new Props();
+    props.setSource("source1");
+    props.put("key1", "value1");
+    props.put("key2", "value2");
+
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader.uploadProjectProperty(project, props);
+
+    final Props sameProps = this.loader.fetchProjectProperty(project, props.getSource());
+    Assert.assertEquals(sameProps.get("key1"), "value1");
+    Assert.assertEquals(sameProps.get("key2"), "value2");
+
+    props.put("key2", "value9");
+    this.loader.updateProjectProperty(project, props);
+
+    final Props sameProps2 = this.loader.fetchProjectProperty(project, props.getSource());
+    Assert.assertEquals(sameProps2.get("key2"), "value9");
+  }
+
+  @Test
+  public void testFetchProjectProperties() throws Exception {
+    final Props props1 = new Props();
+    props1.setSource("source1");
+    props1.put("key1", "value1");
+    props1.put("key2", "value2");
+
+    final Props props2 = new Props();
+    props2.setSource("source2");
+    props2.put("keykey", "valuevalue1");
+    props2.put("keyaaa", "valueaaa");
+    final List<Props> list = Arrays.asList(props1, props2);
+
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    this.loader.uploadProjectProperties(project, list);
+
+    final Map<String, Props> propsMap = this.loader.fetchProjectProperties(project.getId(), project.getVersion());
+    Assert.assertEquals(propsMap.get("source1").get("key2"), "value2");
+    Assert.assertEquals(propsMap.get("source2").get("keyaaa"), "valueaaa");
+  }
+
+  @Test
+  public void cleanOlderProjectVersion() throws Exception {
+    createThreeProjects();
+    final Project project = this.loader.fetchProjectByName("mytestProject");
+    final File testFile = new File(getClass().getClassLoader().getResource(SAMPLE_FILE).getFile());
+    final int newVersion = this.loader.getLatestProjectVersion(project) + 1;
+    this.loader.uploadProjectFile(project.getId(), newVersion, testFile, "uploadUser1");
+
+    final ProjectFileHandler fileHandler = this.loader.getUploadedFile(project.getId(), newVersion);
+    Assert.assertEquals(fileHandler.getNumChunks(), 1);
+
+    this.loader.cleanOlderProjectVersion(project.getId(), newVersion+1);
+
+    final ProjectFileHandler fileHandler2 = this.loader
+        .fetchProjectMetaData(project.getId(), newVersion);
+    Assert.assertEquals(fileHandler2.getNumChunks(), 0);
+  }
+
+  @After
+  public void clearDB() {
+    try {
+      dbOperator.update("DELETE FROM projects");
+      dbOperator.update("DELETE FROM project_versions");
+      dbOperator.update("DELETE FROM project_properties");
+      dbOperator.update("DELETE FROM project_permissions");
+      dbOperator.update("DELETE FROM project_flows");
+      dbOperator.update("DELETE FROM project_files");
+      dbOperator.update("DELETE FROM project_events");
+    } catch (final SQLException e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index c05b596..f41a505 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -231,7 +231,7 @@ public class MockProjectLoader implements ProjectLoader {
 
   @Override
   public List<Triple<String, Boolean, Permission>> getProjectPermissions(
-      final int projectId) throws ProjectManagerException {
+      final Project project) throws ProjectManagerException {
     // TODO Auto-generated method stub
     return null;
   }
diff --git a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
index a44d31b..ac3d6bd 100644
--- a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
+++ b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
@@ -21,7 +21,7 @@ import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 import static org.junit.Assert.assertNotNull;
 
 import azkaban.db.DatabaseOperator;
-import azkaban.project.JdbcProjectLoader;
+import azkaban.project.JdbcProjectImpl;
 import azkaban.spi.Storage;
 import azkaban.storage.DatabaseStorage;
 import azkaban.storage.LocalStorage;
@@ -58,7 +58,7 @@ public class ServiceProviderTest {
     SERVICE_PROVIDER.unsetInjector();
     SERVICE_PROVIDER.setInjector(injector);
 
-    assertNotNull(SERVICE_PROVIDER.getInstance(JdbcProjectLoader.class));
+    assertNotNull(SERVICE_PROVIDER.getInstance(JdbcProjectImpl.class));
     assertNotNull(SERVICE_PROVIDER.getInstance(StorageManager.class));
     assertNotNull(SERVICE_PROVIDER.getInstance(DatabaseStorage.class));
     assertNotNull(SERVICE_PROVIDER.getInstance(LocalStorage.class));
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
index 5af5532..c001ac6 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperator.java
@@ -59,4 +59,9 @@ public interface DatabaseOperator {
    * @return The number of rows updated.
    */
   int update(String updateClause, Object... params) throws SQLException;
+
+   /**
+    * @return datasource wrapped in the database operator.
+    */
+   AzkabanDataSource getDataSource();
 }
diff --git a/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
index 68abc9f..556a534 100644
--- a/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
+++ b/azkaban-db/src/main/java/azkaban/db/DatabaseOperatorImpl.java
@@ -15,7 +15,7 @@
  */
 package azkaban.db;
 
-import static java.util.Objects.*;
+import static java.util.Objects.requireNonNull;
 
 import com.google.inject.Inject;
 import java.sql.Connection;
@@ -101,4 +101,9 @@ public class DatabaseOperatorImpl implements DatabaseOperator {
       throw ex;
     }
   }
+
+  @Override
+  public AzkabanDataSource getDataSource() {
+    return (AzkabanDataSource) this.queryRunner.getDataSource();
+  }
 }
diff --git a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
index d070730..c6bc817 100644
--- a/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
+++ b/azkaban-db/src/test/java/azkaban/db/AzDBTestUtility.java
@@ -16,11 +16,11 @@
  */
 package azkaban.db;
 
-class AzDBTestUtility {
+public class AzDBTestUtility {
 
   public static class EmbeddedH2BasicDataSource extends AzkabanDataSource {
 
-    EmbeddedH2BasicDataSource() {
+    public EmbeddedH2BasicDataSource() {
       super();
       final String url = "jdbc:h2:mem:test";
       setDriverClassName("org.h2.Driver");