azkaban-aplcache

Implemented put API for Database Storage (#998) This change

4/24/2017 6:41:05 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 83aaf7c..11424f6 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -49,7 +49,7 @@ public class AzkabanCommonModule extends AbstractModule {
 
   public AzkabanCommonModule(Props props) {
     this.props = props;
-    this.storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE, LOCAL.name());
+    this.storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE, DATABASE.name());
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 6f55a9c..806f7b0 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -16,6 +16,7 @@
 
 package azkaban.project;
 
+import com.google.common.io.Files;
 import com.google.inject.Inject;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -350,20 +351,23 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
   }
 
   @Override
-  public void uploadProjectFile(Project project, int version, String filetype,
-      String filename, File localFile, String uploader)
+  public void uploadProjectFile(int projectId, int version, File localFile, String uploader)
       throws ProjectManagerException {
     long startMs = System.currentTimeMillis();
-    logger.info("Uploading to " + project.getName() + " version:" + version
-        + " file:" + filename);
+    logger.info(String.format("Uploading Project ID: %d file: %s [%d bytes]",
+        projectId, localFile.getName(), localFile.length()));
     Connection connection = getConnection();
 
     try {
-      uploadProjectFile(connection, project, version, filetype, filename,
-          localFile, uploader);
+      /* Update DB with new project info */
+      addProjectToProjectVersions(connection, projectId, version, localFile, uploader);
+
+      uploadProjectFile(connection, projectId, version, localFile);
+
       connection.commit();
-      logger.info("project " + project.getName() + " commiting upload " + localFile.getName()
-              + " took " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+      long duration = (System.currentTimeMillis() - startMs) / 1000;
+      logger.info(String.format("Uploaded Project ID: %d file: %s [%d bytes] in %d sec",
+          projectId, localFile.getName(), localFile.length(), duration));
     } catch (SQLException e) {
       logger.error(e);
       throw new ProjectManagerException("Error getting DB connection.", e);
@@ -372,14 +376,43 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     }
   }
 
-  private void uploadProjectFile(Connection connection, Project project,
-      int version, String filetype, String filename, File localFile,
+  private void uploadProjectFile(Connection connection, int projectId, int version, File localFile)
+      throws ProjectManagerException {
+    /* Step 1: Upload File in chunks to DB */
+    int chunks = uploadFileInChunks(connection, projectId, version, localFile);
+
+    /* Step 2: Update number of chunks in DB */
+    updateChunksInProjectVersions(connection, projectId, version, chunks);
+  }
+
+  /**
+   * Insert a new version record to TABLE project_versions before uploading files.
+   *
+   * The reason for this operation:
+   * When error chunking happens in remote mysql server, incomplete file data remains
+   * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
+   * the SQL exception prevents AZ from creating the new version record in Table project_versions.
+   * However, the Table project_files still reserve the incomplete files, which causes troubles
+   * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
+   * inserting new files to db.
+   *
+   * Why this operation is safe:
+   * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
+   * proj_v+1 will be used as the new version for the uploading files.
+   *
+   * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
+   * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
+   * When file uploading completes, AZ will clean all old chunks in DB afterward.
+   */
+  private void addProjectToProjectVersions(Connection connection,
+      int projectId,
+      int version,
+      File localFile,
       String uploader) throws ProjectManagerException {
+    final long updateTime = System.currentTimeMillis();
     QueryRunner runner = new QueryRunner();
-    long updateTime = System.currentTimeMillis();
-
     logger.info("Creating message digest for upload " + localFile.getName());
-    byte[] md5 = null;
+    byte[] md5;
     try {
       md5 = Md5Hasher.md5Hash(localFile);
     } catch (IOException e) {
@@ -388,41 +421,35 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
     logger.info("Md5 hash created");
 
-    /**
-     * Insert a new version record to TABLE project_versions before uploading files.
-     *
-     * The reason for this operation:
-     * When error chunking happens in remote mysql server, incomplete file data remains
-     * in DB, and an SQL exception is thrown. If we don't have this operation before uploading file,
-     * the SQL exception prevents AZ from creating the new version record in Table project_versions.
-     * However, the Table project_files still reserve the incomplete files, which causes troubles
-     * when uploading a new file: Since the version in TABLE project_versions is still old, mysql will stop
-     * inserting new files to db.
-     *
-     * Why this operation is safe:
-     * When AZ uploads a new zip file, it always fetches the latest version proj_v from TABLE project_version,
-     * proj_v+1 will be used as the new version for the uploading files.
-     *
-     * Assume error chunking happens on day 1. proj_v is created for this bad file (old file version + 1).
-     * When we upload a new project zip in day2, new file in day 2 will use the new version (proj_v + 1).
-     * When file uploading completes, AZ will clean all old chunks in DB afterward.
-     */
-    final String INSERT_PROJECT_VERSION =
-        "INSERT INTO project_versions (project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values (?,?,?,?,?,?,?,?)";
+    final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
+        + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values "
+        + "(?,?,?,?,?,?,?,?)";
 
     try {
-
       /**
        * As we don't know the num_chunks before uploading the file, we initialize it to 0,
        * and will update it after uploading completes.
        */
-      runner.update(connection, INSERT_PROJECT_VERSION, project.getId(),
-          version, updateTime, uploader, filetype, filename, md5, 0);
+      runner.update(connection,
+          INSERT_PROJECT_VERSION,
+          projectId,
+          version,
+          updateTime,
+          uploader,
+          Files.getFileExtension(localFile.getName()),
+          localFile.getName(),
+          md5,
+          0);
     } catch (SQLException e) {
-      logger.error(e);
-      throw new ProjectManagerException("Error initializing project version "
-          + project.getName(), e);
+      String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
+      logger.error(msg, e);
+      throw new ProjectManagerException(msg, e);
     }
+  }
+
+  private int uploadFileInChunks(Connection connection, int projectId, int version, File localFile)
+      throws ProjectManagerException {
+    QueryRunner runner = new QueryRunner();
 
     // Really... I doubt we'll get a > 2gig file. So int casting it is!
     byte[] buffer = new byte[CHUCK_SIZE];
@@ -435,15 +462,14 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       bufferedStream = new BufferedInputStream(new FileInputStream(localFile));
       int size = bufferedStream.read(buffer);
       while (size >= 0) {
-        logger.info("Read bytes for " + filename + " size:" + size);
+        logger.info("Read bytes for " + localFile.getName() + " size:" + size);
         byte[] buf = buffer;
         if (size < buffer.length) {
           buf = Arrays.copyOfRange(buffer, 0, size);
         }
         try {
-          logger.info("Running update for " + filename + " chunk " + chunk);
-          runner.update(connection, INSERT_PROJECT_FILES, project.getId(),
-              version, chunk, size, buf);
+          logger.info("Running update for " + localFile.getName() + " chunk " + chunk);
+          runner.update(connection, INSERT_PROJECT_FILES, projectId, version, chunk, size, buf);
 
           /**
            * We enforce az committing to db when uploading every single chunk,
@@ -453,7 +479,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
            * the remote mysql server will run into memory troubles.
            */
           connection.commit();
-          logger.info("Finished update for " + filename + " chunk " + chunk);
+          logger.info("Finished update for " + localFile.getName() + " chunk " + chunk);
         } catch (SQLException e) {
           throw new ProjectManagerException("Error Chunking during uploading files to db...");
         }
@@ -462,27 +488,34 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
         size = bufferedStream.read(buffer);
       }
     } catch (IOException e) {
-      throw new ProjectManagerException("Error chunking file " + filename);
+      throw new ProjectManagerException(String.format(
+          "Error chunking file. projectId: %d, version: %d, file:%s[%d bytes], chunk: %d",
+          projectId, version, localFile.getName(), localFile.length(), chunk));
     } finally {
       IOUtils.closeQuietly(bufferedStream);
     }
+    return chunk;
+  }
+
+  /**
+   * we update num_chunks's actual number to db here.
+   */
+  private void updateChunksInProjectVersions(Connection connection, int projectId, int version, int chunk)
+      throws ProjectManagerException {
 
-    /**
-     * we update num_chunks's actual number to db here.
-     */
     final String UPDATE_PROJECT_NUM_CHUNKS =
         "UPDATE project_versions SET num_chunks=? WHERE project_id=? AND version=?";
 
+    QueryRunner runner = new QueryRunner();
     try {
-      runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, project.getId(), version);
+      runner.update(connection, UPDATE_PROJECT_NUM_CHUNKS, chunk, projectId, version);
       connection.commit();
     } catch (SQLException e) {
-      throw new ProjectManagerException(
-          "Error updating project " + project.getId() + " : chunk_num "
-              + chunk, e);
+      throw new ProjectManagerException("Error updating project " + projectId + " : chunk_num " + chunk, e);
     }
   }
 
+
   @Override
   public ProjectFileHandler getUploadedFile(Project project, int version)
       throws ProjectManagerException {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index aa886d6..c7af71c 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -36,7 +36,7 @@ public interface ProjectLoader {
    * @return
    * @throws ProjectManagerException
    */
-  public List<Project> fetchAllActiveProjects() throws ProjectManagerException;
+  List<Project> fetchAllActiveProjects() throws ProjectManagerException;
 
   /**
    * Loads whole project, including permissions, by the project id.
@@ -45,7 +45,7 @@ public interface ProjectLoader {
    * @return
    * @throws ProjectManagerException
    */
-  public Project fetchProjectById(int id) throws ProjectManagerException;
+  Project fetchProjectById(int id) throws ProjectManagerException;
 
   /**
    * Loads whole project, including permissions, by the project name.
@@ -53,7 +53,7 @@ public interface ProjectLoader {
    * @return
    * @throws ProjectManagerException
    */
-  public Project fetchProjectByName(String name) throws ProjectManagerException;
+  Project fetchProjectByName(String name) throws ProjectManagerException;
 
   /**
    * Should create an empty project with the given name and user and adds it to
@@ -69,7 +69,7 @@ public interface ProjectLoader {
    * @throws ProjectManagerException if an active project of the same name
    *           exists.
    */
-  public Project createNewProject(String name, String description, User creator)
+  Project createNewProject(String name, String description, User creator)
       throws ProjectManagerException;
 
   /**
@@ -78,7 +78,7 @@ public interface ProjectLoader {
    * @param project
    * @throws ProjectManagerException
    */
-  public void removeProject(Project project, String user)
+  void removeProject(Project project, String user)
       throws ProjectManagerException;
 
   /**
@@ -92,10 +92,10 @@ public interface ProjectLoader {
    * @param isGroup
    * @throws ProjectManagerException
    */
-  public void updatePermission(Project project, String name, Permission perm,
+  void updatePermission(Project project, String name, Permission perm,
       boolean isGroup) throws ProjectManagerException;
 
-  public void removePermission(Project project, String name, boolean isGroup)
+  void removePermission(Project project, String name, boolean isGroup)
       throws ProjectManagerException;
 
   /**
@@ -105,7 +105,7 @@ public interface ProjectLoader {
    * @param description
    * @throws ProjectManagerException
    */
-  public void updateDescription(Project project, String description, String user)
+  void updateDescription(Project project, String description, String user)
       throws ProjectManagerException;
 
   /**
@@ -116,7 +116,7 @@ public interface ProjectLoader {
    * @param type
    * @param message return true if the posting was success.
    */
-  public boolean postEvent(Project project, EventType type, String user,
+  boolean postEvent(Project project, EventType type, String user,
       String message);
 
   /**
@@ -125,14 +125,13 @@ public interface ProjectLoader {
    * @param project
    * @return
    */
-  public List<ProjectLogEvent> getProjectEvents(Project project, int num,
+  List<ProjectLogEvent> getProjectEvents(Project project, int num,
       int skip) throws ProjectManagerException;
 
   /**
    * Will upload the files and return the version number of the file uploaded.
    */
-  public void uploadProjectFile(Project project, int version, String filetype,
-      String filename, File localFile, String user)
+  void uploadProjectFile(int projectId, int version, File localFile, String user)
       throws ProjectManagerException;
 
   /**
@@ -140,7 +139,7 @@ public interface ProjectLoader {
    *
    * @return
    */
-  public ProjectFileHandler getUploadedFile(Project project, int version)
+  ProjectFileHandler getUploadedFile(Project project, int version)
       throws ProjectManagerException;
 
   /**
@@ -148,7 +147,7 @@ public interface ProjectLoader {
    *
    * @return
    */
-  public ProjectFileHandler getUploadedFile(int projectId, int version)
+  ProjectFileHandler getUploadedFile(int projectId, int version)
       throws ProjectManagerException;
 
   /**
@@ -158,10 +157,10 @@ public interface ProjectLoader {
    * @param version
    * @throws ProjectManagerException
    */
-  public void changeProjectVersion(Project project, int version, String user)
+  void changeProjectVersion(Project project, int version, String user)
       throws ProjectManagerException;
 
-  public void updateFlow(Project project, int version, Flow flow)
+  void updateFlow(Project project, int version, Flow flow)
       throws ProjectManagerException;
 
   /**
@@ -172,7 +171,7 @@ public interface ProjectLoader {
    * @param flows
    * @throws ProjectManagerException
    */
-  public void uploadFlows(Project project, int version, Collection<Flow> flows)
+  void uploadFlows(Project project, int version, Collection<Flow> flows)
       throws ProjectManagerException;
 
   /**
@@ -183,7 +182,7 @@ public interface ProjectLoader {
    * @param flow
    * @throws ProjectManagerException
    */
-  public void uploadFlow(Project project, int version, Flow flow)
+  void uploadFlow(Project project, int version, Flow flow)
       throws ProjectManagerException;
 
   /**
@@ -194,7 +193,7 @@ public interface ProjectLoader {
    * @param flowId
    * @throws ProjectManagerException
    */
-  public Flow fetchFlow(Project project, String flowId)
+  Flow fetchFlow(Project project, String flowId)
       throws ProjectManagerException;
 
   /**
@@ -205,13 +204,13 @@ public interface ProjectLoader {
    * @param flowId
    * @throws ProjectManagerException
    */
-  public List<Flow> fetchAllProjectFlows(Project project)
+  List<Flow> fetchAllProjectFlows(Project project)
       throws ProjectManagerException;
 
   /**
    * Gets the latest upload version.
    */
-  public int getLatestProjectVersion(Project project)
+  int getLatestProjectVersion(Project project)
       throws ProjectManagerException;
 
   /**
@@ -222,7 +221,7 @@ public interface ProjectLoader {
    * @param properties
    * @throws ProjectManagerException
    */
-  public void uploadProjectProperty(Project project, Props props)
+  void uploadProjectProperty(Project project, Props props)
       throws ProjectManagerException;
 
   /**
@@ -233,7 +232,7 @@ public interface ProjectLoader {
    * @param properties
    * @throws ProjectManagerException
    */
-  public void uploadProjectProperties(Project project, List<Props> properties)
+  void uploadProjectProperties(Project project, List<Props> properties)
       throws ProjectManagerException;
 
   /**
@@ -244,7 +243,7 @@ public interface ProjectLoader {
    * @return
    * @throws ProjectManagerException
    */
-  public Props fetchProjectProperty(Project project, String propsName)
+  Props fetchProjectProperty(Project project, String propsName)
       throws ProjectManagerException;
 
   /**
@@ -254,7 +253,7 @@ public interface ProjectLoader {
    * @return
    * @throws ProjectManagerException
    */
-  public Map<String, Props> fetchProjectProperties(int projectId, int version)
+  Map<String, Props> fetchProjectProperties(int projectId, int version)
       throws ProjectManagerException;
 
   /**
@@ -264,10 +263,10 @@ public interface ProjectLoader {
    * @param version
    * @throws ProjectManagerException
    */
-  public void cleanOlderProjectVersion(int projectId, int version)
+  void cleanOlderProjectVersion(int projectId, int version)
       throws ProjectManagerException;
 
-  public void updateProjectProperty(Project project, Props props)
+  void updateProjectProperty(Project project, Props props)
       throws ProjectManagerException;
 
   Props fetchProjectProperty(int projectId, int projectVer, String propsName)
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 6ea28c3..6e77d78 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -16,6 +16,8 @@
 
 package azkaban.project;
 
+import azkaban.storage.StorageManager;
+import com.google.inject.Inject;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -23,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 import java.util.zip.ZipFile;
@@ -45,17 +46,24 @@ import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
 
+import static java.util.Objects.*;
+
+
 public class ProjectManager {
   private static final Logger logger = Logger.getLogger(ProjectManager.class);
   private final ProjectLoader projectLoader;
+  private final StorageManager storageManager;
   private final Props props;
   private final File tempDir;
   private final int projectVersionRetention;
   private final boolean creatorDefaultPermissions;
 
-  public ProjectManager(ProjectLoader loader, Props props) {
-    this.projectLoader = loader;
-    this.props = props;
+  @Inject
+  public ProjectManager(ProjectLoader loader, StorageManager storageManager, Props props) {
+    this.projectLoader = requireNonNull(loader);
+    this.storageManager = requireNonNull(storageManager);
+    this.props = requireNonNull(props);
+
     this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
     this.projectVersionRetention =
         (props.getInt("project.version.retention", 3));
@@ -492,9 +500,8 @@ public class ProjectManager {
         flow.setVersion(newVersion);
       }
 
-      logger.info("Uploading file to db " + archive.getName());
-      projectLoader.uploadProjectFile(project, newVersion, fileType,
-          archive.getName(), archive, uploader.getUserId());
+      storageManager.uploadProject(project, newVersion, archive, uploader);
+
       logger.info("Uploading flow to db " + archive.getName());
       projectLoader.uploadFlows(project, newVersion, flows.values());
       logger.info("Changing project versions " + archive.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java b/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java
index 6ad88bf..fe1b50a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManagerException.java
@@ -16,7 +16,10 @@
 
 package azkaban.project;
 
-public class ProjectManagerException extends Exception {
+import azkaban.spi.AzkabanException;
+
+
+public class ProjectManagerException extends AzkabanException {
   private static final long serialVersionUID = 1L;
 
   public ProjectManagerException(String message) {
diff --git a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
index 583fa78..3149d9c 100644
--- a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
@@ -17,9 +17,10 @@
 
 package azkaban.storage;
 
-import azkaban.project.JdbcProjectLoader;
+import azkaban.project.ProjectLoader;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageMetadata;
+import java.io.File;
 import java.io.InputStream;
 import java.net.URI;
 import javax.inject.Inject;
@@ -33,18 +34,26 @@ import javax.inject.Inject;
  */
 public class DatabaseStorage implements Storage {
 
+  private final ProjectLoader projectLoader;
+
   @Inject
-  public DatabaseStorage(JdbcProjectLoader jdbcProjectLoader) {
+  public DatabaseStorage(ProjectLoader projectLoader) {
 
+    this.projectLoader = projectLoader;
   }
 
   @Override
   public InputStream get(URI key) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public URI put(StorageMetadata metadata, InputStream is) {
+  public URI put(StorageMetadata metadata, File localFile) {
+    projectLoader.uploadProjectFile(
+        metadata.getProjectId(),
+        metadata.getVersion(),
+        localFile, metadata.getUploader());
+
     return null;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index a71f131..a1a709e 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -21,6 +21,7 @@ import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
 import azkaban.spi.StorageMetadata;
 import azkaban.utils.FileIOUtils;
+import com.google.common.io.Files;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -56,14 +57,15 @@ public class LocalStorage implements Storage {
   }
 
   @Override
-  public URI put(StorageMetadata metadata, InputStream is) {
+  public URI put(StorageMetadata metadata, File localFile) {
 
-    final File projectDir = new File(baseDirectory, metadata.getProjectId());
+    final File projectDir = new File(baseDirectory, String.valueOf(metadata.getProjectId()));
     if (projectDir.mkdir()) {
       log.info("Created project dir: " + projectDir.getAbsolutePath());
     }
 
-    final File targetFile = new File(projectDir, metadata.getVersion() + "." + metadata.getExtension());
+    final File targetFile = new File(projectDir,
+        metadata.getVersion() + "." + Files.getFileExtension(localFile.getName()));
 
     if (targetFile.exists()) {
       throw new StorageException(String.format(
@@ -71,7 +73,7 @@ public class LocalStorage implements Storage {
           targetFile, metadata));
     }
     try {
-      FileUtils.copyInputStreamToFile(is, targetFile);
+      FileUtils.copyFile(localFile, targetFile);
     } catch (IOException e) {
       log.error("LocalStorage error in put(): Metadata: " + metadata);
       throw new StorageException(e);
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 6d0f714..d09e5f5 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -18,9 +18,12 @@
 package azkaban.storage;
 
 import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManagerException;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
 import azkaban.spi.StorageMetadata;
+import azkaban.user.User;
 import com.google.inject.Inject;
 import java.io.File;
 import java.io.FileInputStream;
@@ -47,41 +50,25 @@ public class StorageManager {
   /**
    * API to a project file into Azkaban Storage
    *
-   * @param project           project ID
-   * @param fileExtension     extension of the file
-   * @param filename          name of the file
+   * TODO clean up interface
+   *
+   * @param project           project
+   * @param version           The new version to be uploaded
    * @param localFile         local file
    * @param uploader          the user who uploaded
    */
   public void uploadProject(
       Project project,
-      String fileExtension,
-      String filename,
+      int version,
       File localFile,
-      String uploader) {
+      User uploader) {
     final StorageMetadata metadata = new StorageMetadata(
-        String.valueOf(project.getId()),
-        String.valueOf(getLatestVersion(project)),
-        fileExtension
+        project.getId(),
+        version,
+        uploader.getUserId()
     );
-    log.info(String.format(
-        "Uploading project. Uploader: %s, Metadata:%s, filename: %s[%d bytes]",
-        uploader, metadata, filename, localFile.length()
-    ));
-    try {
-      uploadProject(metadata, new FileInputStream(localFile));
-    } catch (FileNotFoundException e) {
-      throw new StorageException(e);
-    }
-  }
-
-  private int getLatestVersion(Project project) {
-    // TODO Implement
-    return -1;
-  }
-
-  public void uploadProject(StorageMetadata metadata, InputStream is) {
-    // TODO Implement
-    URI key = storage.put(metadata, is);
+    log.info(String.format("Adding archive to storage. Meta:%s File: %s[%d bytes]",
+        metadata, localFile.getName(), localFile.length()));
+    storage.put(metadata, localFile);
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
index 67efed9..fb0543b 100644
--- a/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/JdbcProjectLoaderTest.java
@@ -537,10 +537,9 @@ public class JdbcProjectLoaderTest {
     Assert.assertEquals("Project description", projectDescription,
         project.getDescription());
 
-    File testDir = new File("unit/project/testjob/testjob.zip");
+    File testFile = new File("unit/project/testjob/testjob.zip");
 
-    loader.uploadProjectFile(project, 1, "zip", "testjob.zip", testDir,
-        user.getUserId());
+    loader.uploadProjectFile(project.getId(), 1, testFile, user.getUserId());
 
     ProjectFileHandler handler = loader.getUploadedFile(project, 1);
     Assert.assertEquals(handler.getProjectId(), project.getId());
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 6d13146..1e1d37d 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -116,8 +116,7 @@ public class MockProjectLoader implements ProjectLoader {
   }
 
   @Override
-  public void uploadProjectFile(Project project, int version, String filetype,
-      String filename, File localFile, String user)
+  public void uploadProjectFile(int projectId, int version, File localFile, String user)
       throws ProjectManagerException {
     // TODO Auto-generated method stub
 
diff --git a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
index ca731d3..6a1674b 100644
--- a/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/ProjectManagerTest.java
@@ -1,5 +1,6 @@
 package azkaban.project;
 
+import azkaban.storage.StorageManager;
 import azkaban.user.User;
 import azkaban.utils.Props;
 import java.io.File;
@@ -15,6 +16,7 @@ import org.mockito.stubbing.Answer;
 public class ProjectManagerTest {
   private ProjectManager manager;
   private ProjectLoader loader;
+  private StorageManager storageManager;
   private User user;
   private static final String PROJECT_NAME = "myTest";
   private static final String PROJECT_NAME_2 = "myTest_2";
@@ -30,7 +32,8 @@ public class ProjectManagerTest {
   public void setUp() throws Exception {
     Props props = new Props();
     loader = mock(ProjectLoader.class);
-    manager = new ProjectManager(loader, props);
+    storageManager = mock(StorageManager.class);
+    manager = new ProjectManager(loader, storageManager, props);
     user = new User(TEST_USER);
     Project project1 = new Project(PROJECT_ID, PROJECT_NAME);
     project1.setDescription(PROJECT_DESCRIPTION);
@@ -116,18 +119,19 @@ public class ProjectManagerTest {
   public void testUploadProject() throws Exception {
     System.out.println("TestUploadProject");
     Project project = manager.createProject(PROJECT_NAME, PROJECT_DESCRIPTION, user);
-    File testDir = new File(this.getClass().getClassLoader().getResource("project/testjob/testjob.zip").getFile());
-    System.out.println("Uploading zip file: " + testDir.getAbsolutePath());
+    File testFile = new File(this.getClass().getClassLoader().getResource("project/testjob/testjob.zip").getFile());
+    System.out.println("Uploading zip file: " + testFile.getAbsolutePath());
     Props props = new Props();
-    manager.uploadProject(project, testDir, FILE_TYPE, user, props);
-    verify(loader).uploadProjectFile(project, PROJECT_VERSION + 1, FILE_TYPE, testDir.getName(),
-        testDir, user.getUserId());
+    manager.uploadProject(project, testFile, FILE_TYPE, user, props);
+
+    verify(storageManager).uploadProject(project, PROJECT_VERSION + 1, testFile, user);
+
     verify(loader).uploadFlows(eq(project), eq(PROJECT_VERSION + 1), anyCollection());
     verify(loader).changeProjectVersion(project, PROJECT_VERSION + 1, user.getUserId());
     //uploadProjectProperties should be called twice, one for jobProps, the other for propProps
     verify(loader, times(2)).uploadProjectProperties(eq(project), anyList());
     verify(loader).postEvent(project, ProjectLogEvent.EventType.UPLOADED, user.getUserId(),
-        "Uploaded project files zip " + testDir.getName());
+        "Uploaded project files zip " + testFile.getName());
     verify(loader).cleanOlderProjectVersion(project.getId(), PROJECT_VERSION + 1 - PROJECT_VERSION_RETENTIION);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
new file mode 100644
index 0000000..5e61869
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import azkaban.project.ProjectLoader;
+import azkaban.spi.StorageMetadata;
+import java.io.File;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class DatabaseStorageTest {
+  private final ProjectLoader projectLoader = mock(ProjectLoader.class);
+  private final DatabaseStorage databaseStorage = new DatabaseStorage(projectLoader);
+
+  @Test
+  public void testPut() throws Exception {
+    final File file = mock(File.class);
+    int projectId = 1234;
+    int version = 1;
+    String uploader = "testuser";
+    final StorageMetadata metadata = new StorageMetadata(projectId, version, uploader);
+    databaseStorage.put(metadata, file);
+    verify(projectLoader).uploadProjectFile(projectId, version, file, uploader);
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
index f4ff2b4..6f4d029 100644
--- a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
@@ -19,7 +19,6 @@ package azkaban.storage;
 
 import azkaban.spi.StorageMetadata;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.InputStream;
 import java.net.URI;
 import org.apache.commons.io.FileUtils;
@@ -56,18 +55,16 @@ public class LocalStorageTest {
     ClassLoader classLoader = getClass().getClassLoader();
     File testFile = new File(classLoader.getResource(SAMPLE_FILE).getFile());
 
-    URI key;
-    try (InputStream is = new FileInputStream(testFile)) {
-      // test put
-      key = localStorage.put(new StorageMetadata("testProjectId", "1", "zip"), is);
-    }
+    final StorageMetadata metadata = new StorageMetadata(1, 1, "testuser");
+    final URI key = localStorage.put(metadata, testFile);
     assertNotNull(key);
     log.info("Key URI: " + key);
 
     File expectedTargetFile = new File(BASE_DIRECTORY, new StringBuilder()
-        .append("testProjectId")
+        .append(metadata.getProjectId())
         .append(File.separator)
-        .append("1.zip")
+        .append(metadata.getVersion())
+        .append(".zip")
         .toString()
     );
     assertTrue(expectedTargetFile.exists());
diff --git a/azkaban-spi/src/main/java/azkaban/spi/Storage.java b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
index b8f4d94..24497d4 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/Storage.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
@@ -17,6 +17,7 @@
 
 package azkaban.spi;
 
+import java.io.File;
 import java.io.InputStream;
 import java.net.URI;
 
@@ -45,11 +46,11 @@ public interface Storage {
    * Put an object and return a key.
    *
    * @param metadata Metadata related to the input stream
-   * @param is The input stream from which the value is read to the store. The value is read completely
+   * @param localFile Read data from a local file
    *
    * @return the URI of the data
    */
-  URI put(StorageMetadata metadata, InputStream is);
+  URI put(StorageMetadata metadata, File localFile);
 
   /**
    * Delete an object from Storage.
diff --git a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
index 9ae84c9..b562344 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
@@ -23,14 +23,14 @@ import static java.util.Objects.*;
 
 
 public class StorageMetadata {
-  private final String projectId;
-  private final String version;
-  private final String extension;
+  private final int projectId;
+  private final int version;
+  private final String uploader;
 
-  public StorageMetadata(String projectId, String version, String extension) {
-    this.projectId = requireNonNull(projectId);
-    this.version = requireNonNull(version);
-    this.extension = requireNonNull(extension);
+  public StorageMetadata(int projectId, int version, String uploader) {
+    this.projectId = projectId;
+    this.version = version;
+    this.uploader = requireNonNull(uploader);
   }
 
   @Override
@@ -38,16 +38,16 @@ public class StorageMetadata {
     return "StorageMetadata{" + "projectId='" + projectId + '\'' + ", version='" + version + '\'' + '}';
   }
 
-  public String getProjectId() {
+  public int getProjectId() {
     return projectId;
   }
 
-  public String getVersion() {
+  public int getVersion() {
     return version;
   }
 
-  public String getExtension() {
-    return extension;
+  public String getUploader() {
+    return uploader;
   }
 
   @Override
@@ -61,11 +61,11 @@ public class StorageMetadata {
     StorageMetadata that = (StorageMetadata) o;
     return Objects.equals(projectId, that.projectId) &&
         Objects.equals(version, that.version) &&
-        Objects.equals(extension, that.extension);
+        Objects.equals(uploader, that.uploader);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(projectId, version, extension);
+    return Objects.hash(projectId, version, uploader);
   }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 8a9e4fb..3b650e6 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -16,8 +16,10 @@
 
 package azkaban.webapp;
 
+import azkaban.AzkabanCommonModule;
 import com.codahale.metrics.MetricRegistry;
 
+import com.google.inject.Guice;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -65,7 +67,6 @@ import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
 import azkaban.metrics.MetricsUtility;
-import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
@@ -108,6 +109,10 @@ import azkaban.metrics.MetricsManager;
 
 import com.linkedin.restli.server.RestliServlet;
 
+import static azkaban.ServiceProvider.*;
+import static java.util.Objects.*;
+
+
 /**
  * The Azkaban Jetty server class
  *
@@ -134,7 +139,6 @@ public class AzkabanWebServer extends AzkabanServer {
 
   private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
 
-  public static final String AZKABAN_HOME = "AZKABAN_HOME";
   public static final String DEFAULT_CONF_PATH = "conf";
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
@@ -159,23 +163,22 @@ public class AzkabanWebServer extends AzkabanServer {
   //queuedThreadPool is mainly used to monitor jetty threadpool.
   private QueuedThreadPool queuedThreadPool;
 
-  private UserManager userManager;
-  private ProjectManager projectManager;
-  // private ExecutorManagerAdapter executorManager;
-  private ExecutorManager executorManager;
-  private ScheduleManager scheduleManager;
-  private TriggerManager triggerManager;
-  private Map<String, Alerter> alerters;
+  private final UserManager userManager;
+  private final ProjectManager projectManager;
+  private final ExecutorManager executorManager;
+  private final ScheduleManager scheduleManager;
+  private final TriggerManager triggerManager;
+  private final Map<String, Alerter> alerters;
 
   private final ClassLoader baseClassLoader;
 
-  private Props props;
-  private SessionCache sessionCache;
-  private File tempDir;
-  private Map<String, TriggerPlugin> triggerPlugins;
+  private final Props props;
+  private final SessionCache sessionCache;
+  private final File tempDir;
+  private final List<ObjectName> registeredMBeans = new ArrayList<>();
 
+  private Map<String, TriggerPlugin> triggerPlugins;
   private MBeanServer mbeanServer;
-  private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
 
   public static AzkabanWebServer getInstance() {
     return app;
@@ -193,18 +196,22 @@ public class AzkabanWebServer extends AzkabanServer {
    * Constructor
    */
   public AzkabanWebServer(Server server, Props props) throws Exception {
-    this.props = props;
+    this.props = requireNonNull(props);
     this.server = server;
-    velocityEngine =
-        configureVelocityEngine(props
-            .getBoolean(VELOCITY_DEV_MODE_PARAM, false));
+
+    /* Initialize Guice Injector */
+    // TODO move this to a common static context.
+    SERVICE_PROVIDER.setInjector(Guice.createInjector(new AzkabanCommonModule(props)));
+
+    velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
     sessionCache = new SessionCache(props);
     userManager = loadUserManager(props);
 
     alerters = loadAlerters(props);
 
     executorManager = loadExecutorManager(props);
-    projectManager = loadProjectManager(props);
+    // TODO remove hack. Move injection to constructor
+    projectManager = SERVICE_PROVIDER.getInstance(ProjectManager.class);
 
     triggerManager = loadTriggerManager(props);
     loadBuiltinCheckersAndActions();
@@ -267,17 +274,14 @@ public class AzkabanWebServer extends AzkabanServer {
 
   private UserManager loadUserManager(Props props) {
     Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
-    logger.info("Loading user manager class " + userManagerClass.getName());
-    UserManager manager = null;
-    if (userManagerClass != null
-        && userManagerClass.getConstructors().length > 0) {
+    UserManager manager;
+    if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
+      logger.info("Loading user manager class " + userManagerClass.getName());
       try {
-        Constructor<?> userManagerConstructor =
-            userManagerClass.getConstructor(Props.class);
+        Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
         manager = (UserManager) userManagerConstructor.newInstance(props);
       } catch (Exception e) {
-        logger.error("Could not instantiate UserManager "
-            + userManagerClass.getName());
+        logger.error("Could not instantiate UserManager " + userManagerClass.getName());
         throw new RuntimeException(e);
       }
     } else {
@@ -286,13 +290,6 @@ public class AzkabanWebServer extends AzkabanServer {
     return manager;
   }
 
-  private ProjectManager loadProjectManager(Props props) {
-    logger.info("Loading JDBC for project management");
-    JdbcProjectLoader loader = new JdbcProjectLoader(props);
-    ProjectManager manager = new ProjectManager(loader, props);
-    return manager;
-  }
-
   private ExecutorManager loadExecutorManager(Props props) throws Exception {
     JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
     ExecutorManager execManager = new ExecutorManager(props, loader, alerters);