azkaban-aplcache

Changing Storage API key from URI to String (#1051) Motivation:

5/3/2017 8:40:33 PM
3.22.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 03dfa00..91c2f59 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -391,9 +391,9 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       int version,
       File localFile,
       String uploader,
-      String uri) throws ProjectManagerException {
+      String resourceId) throws ProjectManagerException {
     try (Connection connection = getConnection()) {
-      addProjectToProjectVersions(connection, projectId, version, localFile, uploader, uri);
+      addProjectToProjectVersions(connection, projectId, version, localFile, uploader, resourceId);
       connection.commit();
     } catch (SQLException e) {
       logger.error(e);
@@ -426,7 +426,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       int version,
       File localFile,
       String uploader,
-      String uri) throws ProjectManagerException {
+      String resourceId) throws ProjectManagerException {
     final long updateTime = System.currentTimeMillis();
     QueryRunner runner = new QueryRunner();
     logger.info("Creating message digest for upload " + localFile.getName());
@@ -440,7 +440,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     logger.info("Md5 hash created");
 
     final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
-        + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri) values "
+        + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id) values "
         + "(?,?,?,?,?,?,?,?,?)";
 
     try {
@@ -458,7 +458,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
           localFile.getName(),
           md5,
           0,
-          uri);
+          resourceId);
     } catch (SQLException e) {
       String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
       logger.error(msg, e);
@@ -1539,7 +1539,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
   private static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
     private static String SELECT_PROJECT_VERSION =
-        "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri "
+        "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id "
             + "FROM project_versions WHERE project_id=? AND version=?";
 
     @Override
@@ -1558,10 +1558,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
         String fileName = rs.getString(6);
         byte[] md5 = rs.getBytes(7);
         int numChunks = rs.getInt(8);
-        String uri = rs.getString(9);
+        String resourceId = rs.getString(9);
 
-        ProjectFileHandler handler =
-            new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5, uri);
+        ProjectFileHandler handler = new ProjectFileHandler(
+            projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5, resourceId);
 
         handlers.add(handler);
       } while (rs.next());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
index 09e0c7b..b1edeb0 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
@@ -27,7 +27,7 @@ public class ProjectFileHandler {
   private final String uploader;
   private final byte[] md5Hash;
   private final int numChunks;
-  private final String uri;
+  private final String resourceId;
 
   private File localFile = null;
 
@@ -40,7 +40,7 @@ public class ProjectFileHandler {
       String fileName,
       int numChunks,
       byte[] md5Hash,
-      String uri) {
+      String resourceId) {
     this.projectId = projectId;
     this.version = version;
     this.uploadTime = uploadTime;
@@ -49,7 +49,7 @@ public class ProjectFileHandler {
     this.fileName = fileName;
     this.md5Hash = md5Hash;
     this.numChunks = numChunks;
-    this.uri = uri;
+    this.resourceId = resourceId;
   }
 
   public int getProjectId() {
@@ -99,7 +99,7 @@ public class ProjectFileHandler {
     return numChunks;
   }
 
-  public String getUri() {
-    return uri;
+  public String getResourceId() {
+    return resourceId;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index e2fcf09..c03e97c 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -142,10 +142,10 @@ public interface ProjectLoader {
    * @param version
    * @param localFile
    * @param uploader
-   * @param uri
+   * @param resourceId
    * @throws ProjectManagerException
    */
-  void addProjectVersion(int projectId, int version, File localFile, String uploader, String uri)
+  void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
       throws ProjectManagerException;
 
   /**
diff --git a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
index 6b0478c..3d71771 100644
--- a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
@@ -23,7 +23,6 @@ import azkaban.spi.Storage;
 import azkaban.spi.StorageMetadata;
 import java.io.File;
 import java.io.InputStream;
-import java.net.URI;
 import javax.inject.Inject;
 
 
@@ -42,7 +41,7 @@ public class DatabaseStorage implements Storage {
   }
 
   @Override
-  public InputStream get(URI key) {
+  public InputStream get(String key) {
     throw new UnsupportedOperationException("Not implemented yet. Use get(projectId, version) instead");
   }
 
@@ -51,7 +50,7 @@ public class DatabaseStorage implements Storage {
   }
 
   @Override
-  public URI put(StorageMetadata metadata, File localFile) {
+  public String put(StorageMetadata metadata, File localFile) {
     projectLoader.uploadProjectFile(
         metadata.getProjectId(),
         metadata.getVersion(),
@@ -61,7 +60,7 @@ public class DatabaseStorage implements Storage {
   }
 
   @Override
-  public boolean delete(URI key) {
+  public boolean delete(String key) {
     throw new UnsupportedOperationException("Delete is not supported");
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
index 3d2bf13..2157aaa 100644
--- a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
@@ -22,7 +22,6 @@ import azkaban.spi.StorageMetadata;
 import com.google.inject.Inject;
 import java.io.File;
 import java.io.InputStream;
-import java.net.URI;
 
 
 public class HdfsStorage implements Storage {
@@ -33,17 +32,17 @@ public class HdfsStorage implements Storage {
   }
 
   @Override
-  public InputStream get(URI key) {
+  public InputStream get(String key) {
     throw new UnsupportedOperationException("Method not implemented");
   }
 
   @Override
-  public URI put(StorageMetadata metadata, File localFile) {
+  public String put(StorageMetadata metadata, File localFile) {
     throw new UnsupportedOperationException("Method not implemented");
   }
 
   @Override
-  public boolean delete(URI key) {
+  public boolean delete(String key) {
     throw new UnsupportedOperationException("Method not implemented");
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index a1a709e..7df7e6b 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -27,7 +27,6 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
@@ -43,21 +42,20 @@ public class LocalStorage implements Storage {
     this.baseDirectory = validateBaseDirectory(createIfDoesNotExist(baseDirectory));
   }
 
+  /**
+   * @param key Relative path of the file from the baseDirectory
+   */
   @Override
-  public InputStream get(URI key) {
+  public InputStream get(String key) {
     try {
-      return new FileInputStream(getFile(key));
+      return new FileInputStream(new File(baseDirectory, key));
     } catch (FileNotFoundException e) {
       return null;
     }
   }
 
-  private File getFile(URI key) {
-    return new File(baseDirectory, key.getPath());
-  }
-
   @Override
-  public URI put(StorageMetadata metadata, File localFile) {
+  public String put(StorageMetadata metadata, File localFile) {
 
     final File projectDir = new File(baseDirectory, String.valueOf(metadata.getProjectId()));
     if (projectDir.mkdir()) {
@@ -78,15 +76,15 @@ public class LocalStorage implements Storage {
       log.error("LocalStorage error in put(): Metadata: " + metadata);
       throw new StorageException(e);
     }
-    return createRelativeURI(targetFile);
+    return createRelativePath(targetFile);
   }
 
-  private URI createRelativeURI(File targetFile) {
-    return baseDirectory.toURI().relativize(targetFile.toURI());
+  private String createRelativePath(File targetFile) {
+    return baseDirectory.toURI().relativize(targetFile.toURI()).getPath();
   }
 
   @Override
-  public boolean delete(URI key) {
+  public boolean delete(String key) {
     throw new UnsupportedOperationException("delete has not been implemented.");
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 9f3e10a..d1be9f7 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -31,8 +31,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Arrays;
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
@@ -92,7 +90,7 @@ public class StorageManager {
         metadata, localFile.getName(), localFile.length()));
 
     /* upload to storage */
-    URI uri = storage.put(metadata, localFile);
+    final String resourceId = storage.put(metadata, localFile);
 
     /* Add metadata to db */
     // TODO spyne: remove hack. Database storage should go through the same flow
@@ -102,10 +100,10 @@ public class StorageManager {
           version,
           localFile,
           uploader.getUserId(),
-          uri.toString()
+          resourceId
       );
       log.info(String.format("Added project metadata to DB. Meta:%s File: %s[%d bytes] URI: %s",
-          metadata, localFile.getName(), localFile.length(), uri));
+          metadata, localFile.getName(), localFile.length(), resourceId));
     }
   }
 
@@ -127,9 +125,9 @@ public class StorageManager {
     final ProjectFileHandler pfh = projectLoader.fetchProjectMetaData(projectId, version);
 
     /* Fetch project file from storage and copy to local file */
-    final String uri = requireNonNull(pfh.getUri(), String.format("URI is null. project ID: %d version: %d",
+    final String resourceId = requireNonNull(pfh.getResourceId(), String.format("URI is null. project ID: %d version: %d",
         pfh.getProjectId(), pfh.getVersion()));
-    try (InputStream is = storage.get(new URI(uri))){
+    try (InputStream is = storage.get(resourceId)){
       final File file = createTempOutputFile(pfh);
 
       /* Copy from storage to output stream */
@@ -144,7 +142,7 @@ public class StorageManager {
       pfh.setLocalFile(file);
 
       return pfh;
-    } catch (URISyntaxException | IOException e) {
+    } catch (IOException e) {
       throw new StorageException(e);
     }
   }
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 42498cf..463e2c0 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -123,7 +123,7 @@ public class MockProjectLoader implements ProjectLoader {
   }
 
   @Override
-  public void addProjectVersion(int projectId, int version, File localFile, String uploader, String uri)
+  public void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
       throws ProjectManagerException {
 
   }
diff --git a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
index 6f4d029..61cd1b6 100644
--- a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
@@ -56,7 +56,7 @@ public class LocalStorageTest {
     File testFile = new File(classLoader.getResource(SAMPLE_FILE).getFile());
 
     final StorageMetadata metadata = new StorageMetadata(1, 1, "testuser");
-    final URI key = localStorage.put(metadata, testFile);
+    final String key = localStorage.put(metadata, testFile);
     assertNotNull(key);
     log.info("Key URI: " + key);
 
diff --git a/azkaban-db/src/main/sql/create.project_versions.sql b/azkaban-db/src/main/sql/create.project_versions.sql
index 8336137..3356559 100644
--- a/azkaban-db/src/main/sql/create.project_versions.sql
+++ b/azkaban-db/src/main/sql/create.project_versions.sql
@@ -7,7 +7,7 @@ CREATE TABLE project_versions (
 	file_name VARCHAR(128),
 	md5 BINARY(16),
 	num_chunks INT,
-	uri VARCHAR(512) DEFAULT NULL,
+	resource_id VARCHAR(512) DEFAULT NULL,
 	PRIMARY KEY (project_id, version)
 );
 
diff --git a/azkaban-spi/src/main/java/azkaban/spi/Storage.java b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
index 24497d4..f61af34 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/Storage.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
@@ -19,7 +19,6 @@ package azkaban.spi;
 
 import java.io.File;
 import java.io.InputStream;
-import java.net.URI;
 
 
 /**
@@ -36,11 +35,11 @@ public interface Storage {
   /**
    * Get an InputStream object by providing a key.
    *
-   * @param key The key is a URI pointing to the blob in Storage.
+   * @param key The key is a string pointing to the blob in Storage.
    * @return InputStream for fetching the blob. null if the key is not found.
    *
    */
-  InputStream get(URI key);
+  InputStream get(String key);
 
   /**
    * Put an object and return a key.
@@ -48,15 +47,15 @@ public interface Storage {
    * @param metadata Metadata related to the input stream
    * @param localFile Read data from a local file
    *
-   * @return the URI of the data
+   * @return Key associated with the current object on successful put
    */
-  URI put(StorageMetadata metadata, File localFile);
+  String put(StorageMetadata metadata, File localFile);
 
   /**
    * Delete an object from Storage.
    *
-   * @param key The key is a URI pointing to the blob in Storage.
+   * @param key The key is a string pointing to the blob in Storage.
    * @return true if delete was successful. false if there was nothing to delete.
    */
-  boolean delete(URI key);
+  boolean delete(String key);
 }