azkaban-aplcache

Storage schema changes and implementation. Working Local

4/27/2017 5:21:48 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 7a78873..b7d165d 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -360,7 +360,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
     try {
       /* Update DB with new project info */
-      addProjectToProjectVersions(connection, projectId, version, localFile, uploader);
+      addProjectToProjectVersions(connection, projectId, version, localFile, uploader, null);
 
       uploadProjectFile(connection, projectId, version, localFile);
 
@@ -385,6 +385,22 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     updateChunksInProjectVersions(connection, projectId, version, chunks);
   }
 
+  public void addProjectVersion(
+      int projectId,
+      int version,
+      File localFile,
+      String uploader,
+      String uri) throws ProjectManagerException {
+    try (Connection connection = getConnection()) {
+      addProjectToProjectVersions(connection, projectId, version, localFile, uploader, uri);
+      connection.commit();
+    } catch (SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException(String.format("Add ProjectVersion failed. project id: %d version: %d",
+          projectId, version), e);
+    }
+  }
+
   /**
    * Insert a new version record to TABLE project_versions before uploading files.
    *
@@ -408,7 +424,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       int projectId,
       int version,
       File localFile,
-      String uploader) throws ProjectManagerException {
+      String uploader,
+      String uri) throws ProjectManagerException {
     final long updateTime = System.currentTimeMillis();
     QueryRunner runner = new QueryRunner();
     logger.info("Creating message digest for upload " + localFile.getName());
@@ -422,11 +439,11 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     logger.info("Md5 hash created");
 
     final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
-        + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values "
-        + "(?,?,?,?,?,?,?,?)";
+        + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri) values "
+        + "(?,?,?,?,?,?,?,?,?)";
 
     try {
-      /**
+      /*
        * As we don't know the num_chunks before uploading the file, we initialize it to 0,
        * and will update it after uploading completes.
        */
@@ -439,7 +456,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
           Files.getFileExtension(localFile.getName()),
           localFile.getName(),
           md5,
-          0);
+          0,
+          uri);
     } catch (SQLException e) {
       String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
       logger.error(msg, e);
@@ -530,42 +548,41 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     return handler;
   }
 
-  private ProjectFileHandler getUploadedFile(Connection connection,
-      int projectId, int version) throws ProjectManagerException {
-    QueryRunner runner = new QueryRunner();
+  public ProjectFileHandler fetchProjectMetaData(int projectId, int version) {
     ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
 
-    List<ProjectFileHandler> projectFiles = null;
-    try {
-      projectFiles =
-          runner.query(connection,
-              ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler,
-              projectId, version);
+    try (Connection connection = getConnection()) {
+      List<ProjectFileHandler> projectFiles = new QueryRunner().query(connection,
+          ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId, version);
+      if (projectFiles == null || projectFiles.isEmpty()) {
+        return null;
+      }
+      return projectFiles.get(0);
     } catch (SQLException e) {
       logger.error(e);
-      throw new ProjectManagerException(
-          "Query for uploaded file for project id " + projectId + " failed.", e);
+      throw new ProjectManagerException("Query for uploaded file for project id " + projectId + " failed.", e);
     }
-    if (projectFiles == null || projectFiles.isEmpty()) {
+  }
+
+  private ProjectFileHandler getUploadedFile(Connection connection,
+      int projectId, int version) throws ProjectManagerException {
+    ProjectFileHandler projHandler = fetchProjectMetaData(projectId, version);
+    if (projHandler == null) {
       return null;
     }
-
-    ProjectFileHandler projHandler = projectFiles.get(0);
     int numChunks = projHandler.getNumChunks();
     BufferedOutputStream bStream = null;
-    File file = null;
+    File file;
     try {
       try {
-        file =
-            File.createTempFile(projHandler.getFileName(),
-                String.valueOf(version), tempDir);
-
+        file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
         bStream = new BufferedOutputStream(new FileOutputStream(file));
       } catch (IOException e) {
         throw new ProjectManagerException(
             "Error creating temp file for stream.");
       }
 
+      QueryRunner runner = new QueryRunner();
       int collect = 5;
       int fromChunk = 0;
       int toChunk = collect;
@@ -1517,10 +1534,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
   }
 
-  private static class ProjectVersionResultHandler implements
-      ResultSetHandler<List<ProjectFileHandler>> {
+  private static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
     private static String SELECT_PROJECT_VERSION =
-        "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks FROM project_versions WHERE project_id=? AND version=?";
+        "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri "
+            + "FROM project_versions WHERE project_id=? AND version=?";
 
     @Override
     public List<ProjectFileHandler> handle(ResultSet rs) throws SQLException {
@@ -1538,10 +1555,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
         String fileName = rs.getString(6);
         byte[] md5 = rs.getBytes(7);
         int numChunks = rs.getInt(8);
+        String uri = rs.getString(9);
 
         ProjectFileHandler handler =
-            new ProjectFileHandler(projectId, version, uploadTime, uploader,
-                fileType, fileName, numChunks, md5);
+            new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5, uri);
 
         handlers.add(handler);
       } while (rs.next());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
index 9412bee..09e0c7b 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
@@ -22,24 +22,34 @@ public class ProjectFileHandler {
   private final int projectId;
   private final int version;
   private final long uploadTime;
-  private String fileType;
-  private String fileName;
-  private String uploader;
-  private byte[] md5Hash;
-  private int numChunks;
+  private final String fileType;
+  private final String fileName;
+  private final String uploader;
+  private final byte[] md5Hash;
+  private final int numChunks;
+  private final String uri;
+
   private File localFile = null;
 
-  public ProjectFileHandler(int projectId, int version, long uploadTime,
-      String uploader, String fileType, String fileName, int numChunks,
-      byte[] md5Hash) {
+  public ProjectFileHandler(
+      int projectId,
+      int version,
+      long uploadTime,
+      String uploader,
+      String fileType,
+      String fileName,
+      int numChunks,
+      byte[] md5Hash,
+      String uri) {
     this.projectId = projectId;
     this.version = version;
     this.uploadTime = uploadTime;
-    this.setUploader(uploader);
-    this.setFileType(fileType);
-    this.setFileName(fileName);
-    this.setMd5Hash(md5Hash);
-    this.setNumChunks(numChunks);
+    this.uploader = uploader;
+    this.fileType = fileType;
+    this.fileName = fileName;
+    this.md5Hash = md5Hash;
+    this.numChunks = numChunks;
+    this.uri = uri;
   }
 
   public int getProjectId() {
@@ -58,26 +68,14 @@ public class ProjectFileHandler {
     return fileType;
   }
 
-  public void setFileType(String fileType) {
-    this.fileType = fileType;
-  }
-
   public String getFileName() {
     return fileName;
   }
 
-  public void setFileName(String fileName) {
-    this.fileName = fileName;
-  }
-
   public byte[] getMd5Hash() {
     return md5Hash;
   }
 
-  public void setMd5Hash(byte[] md5Hash) {
-    this.md5Hash = md5Hash;
-  }
-
   public File getLocalFile() {
     return localFile;
   }
@@ -87,9 +85,7 @@ public class ProjectFileHandler {
   }
 
   public synchronized void deleteLocalFile() {
-    if (localFile == null) {
-      return;
-    } else {
+    if (localFile != null) {
       localFile.delete();
       localFile = null;
     }
@@ -99,16 +95,11 @@ public class ProjectFileHandler {
     return uploader;
   }
 
-  public void setUploader(String uploader) {
-    this.uploader = uploader;
-  }
-
   public int getNumChunks() {
     return numChunks;
   }
 
-  public void setNumChunks(int numChunks) {
-    this.numChunks = numChunks;
+  public String getUri() {
+    return uri;
   }
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 0372773..e2fcf09 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -135,6 +135,29 @@ public interface ProjectLoader {
       throws ProjectManagerException;
 
   /**
+   * Add project and version info to the project_versions table. This current maintains the metadata for each uploaded
+   * version of the project
+   *
+   * @param projectId
+   * @param version
+   * @param localFile
+   * @param uploader
+   * @param uri
+   * @throws ProjectManagerException
+   */
+  void addProjectVersion(int projectId, int version, File localFile, String uploader, String uri)
+      throws ProjectManagerException;
+
+  /**
+   * Fetch project metadata from project_versions table
+   *
+   * @param projectId project ID
+   * @param version version
+   * @return ProjectFileHandler object containing the metadata
+   */
+  ProjectFileHandler fetchProjectMetaData(int projectId, int version);
+
+  /**
    * Get file that's uploaded.
    *
    * @return
diff --git a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
index 972ffe1..6b0478c 100644
--- a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
@@ -34,14 +34,10 @@ import javax.inject.Inject;
  * behavior of Azkaban.
  */
 public class DatabaseStorage implements Storage {
-  public static final String PROJECT_ID = "projectId";
-  public static final String VERSION = "version";
-
   private final ProjectLoader projectLoader;
 
   @Inject
   public DatabaseStorage(ProjectLoader projectLoader) {
-
     this.projectLoader = projectLoader;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index a04f189..9f3e10a 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -19,13 +19,27 @@ package azkaban.storage;
 
 import azkaban.project.Project;
 import azkaban.project.ProjectFileHandler;
+import azkaban.project.ProjectLoader;
 import azkaban.spi.Storage;
+import azkaban.spi.StorageException;
 import azkaban.spi.StorageMetadata;
 import azkaban.user.User;
+import azkaban.utils.Md5Hasher;
+import azkaban.utils.Props;
 import com.google.inject.Inject;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 
+import static com.google.common.base.Preconditions.*;
+import static java.util.Objects.*;
+
 
 /**
  * StorageManager manages and coordinates all interactions with the Storage layer. This also includes bookkeeping
@@ -35,10 +49,23 @@ public class StorageManager {
   private static final Logger log = Logger.getLogger(StorageManager.class);
 
   private final Storage storage;
+  private final ProjectLoader projectLoader;
+  private final File tempDir;
 
   @Inject
-  public StorageManager(Storage storage) {
+  public StorageManager(Props props, Storage storage, ProjectLoader projectLoader) {
+    this.tempDir = new File(props.getString("project.temp.dir", "temp"));
     this.storage = storage;
+    this.projectLoader = projectLoader;
+
+    prepareTempDir();
+  }
+
+  private void prepareTempDir() {
+    if (!tempDir.exists()) {
+      tempDir.mkdirs();
+    }
+    checkArgument(tempDir.isDirectory());
   }
 
   /**
@@ -63,7 +90,23 @@ public class StorageManager {
     );
     log.info(String.format("Adding archive to storage. Meta:%s File: %s[%d bytes]",
         metadata, localFile.getName(), localFile.length()));
-    storage.put(metadata, localFile);
+
+    /* upload to storage */
+    URI uri = storage.put(metadata, localFile);
+
+    /* Add metadata to db */
+    // TODO spyne: remove hack. Database storage should go through the same flow
+    if (!(storage instanceof DatabaseStorage)) {
+      projectLoader.addProjectVersion(
+          project.getId(),
+          version,
+          localFile,
+          uploader.getUserId(),
+          uri.toString()
+      );
+      log.info(String.format("Added project metadata to DB. Meta:%s File: %s[%d bytes] URI: %s",
+          metadata, localFile.getName(), localFile.length(), uri));
+    }
   }
 
   /**
@@ -79,6 +122,44 @@ public class StorageManager {
     if (storage instanceof DatabaseStorage) {
       return ((DatabaseStorage) storage).get(projectId, version);
     }
-    throw new UnsupportedOperationException("Operation currently unsupported for other types.");
+
+    /* Fetch meta data from db */
+    final ProjectFileHandler pfh = projectLoader.fetchProjectMetaData(projectId, version);
+
+    /* Fetch project file from storage and copy to local file */
+    final String uri = requireNonNull(pfh.getUri(), String.format("URI is null. project ID: %d version: %d",
+        pfh.getProjectId(), pfh.getVersion()));
+    try (InputStream is = storage.get(new URI(uri))){
+      final File file = createTempOutputFile(pfh);
+
+      /* Copy from storage to output stream */
+      try (FileOutputStream fos = new FileOutputStream(file)) {
+        IOUtils.copy(is, fos);
+      }
+
+      /* Validate checksum */
+      validateChecksum(file, pfh);
+
+      /* Attach file to handler */
+      pfh.setLocalFile(file);
+
+      return pfh;
+    } catch (URISyntaxException | IOException e) {
+      throw new StorageException(e);
+    }
+  }
+
+  private void validateChecksum(File file, ProjectFileHandler pfh) throws IOException {
+    final byte[] hash = Md5Hasher.md5Hash(file);
+    checkState(Arrays.equals(pfh.getMd5Hash(), hash),
+        String.format("MD5 HASH Failed. project ID: %d version: %d Expected: %s Actual: %s",
+            pfh.getProjectId(), pfh.getVersion(), new String(pfh.getMd5Hash()), new String(hash))
+    );
+  }
+
+  private File createTempOutputFile(ProjectFileHandler projectFileHandler) throws IOException {
+    return File.createTempFile(
+        projectFileHandler.getFileName(),
+        String.valueOf(projectFileHandler.getVersion()), tempDir);
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 062d880..42498cf 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -123,6 +123,17 @@ public class MockProjectLoader implements ProjectLoader {
   }
 
   @Override
+  public void addProjectVersion(int projectId, int version, File localFile, String uploader, String uri)
+      throws ProjectManagerException {
+
+  }
+
+  @Override
+  public ProjectFileHandler fetchProjectMetaData(int projectId, int version) {
+    return null;
+  }
+
+  @Override
   public ProjectFileHandler getUploadedFile(int projectId, int version)
       throws ProjectManagerException {
     // TODO Auto-generated method stub
diff --git a/azkaban-sql/src/sql/create.project_versions.sql b/azkaban-sql/src/sql/create.project_versions.sql
index 8b80387..8336137 100644
--- a/azkaban-sql/src/sql/create.project_versions.sql
+++ b/azkaban-sql/src/sql/create.project_versions.sql
@@ -7,6 +7,7 @@ CREATE TABLE project_versions (
 	file_name VARCHAR(128),
 	md5 BINARY(16),
 	num_chunks INT,
+	uri VARCHAR(512) DEFAULT NULL,
 	PRIMARY KEY (project_id, version)
 );