Details
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 03dfa00..91c2f59 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -391,9 +391,9 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
int version,
File localFile,
String uploader,
- String uri) throws ProjectManagerException {
+ String resourceId) throws ProjectManagerException {
try (Connection connection = getConnection()) {
- addProjectToProjectVersions(connection, projectId, version, localFile, uploader, uri);
+ addProjectToProjectVersions(connection, projectId, version, localFile, uploader, resourceId);
connection.commit();
} catch (SQLException e) {
logger.error(e);
@@ -426,7 +426,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
int version,
File localFile,
String uploader,
- String uri) throws ProjectManagerException {
+ String resourceId) throws ProjectManagerException {
final long updateTime = System.currentTimeMillis();
QueryRunner runner = new QueryRunner();
logger.info("Creating message digest for upload " + localFile.getName());
@@ -440,7 +440,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
logger.info("Md5 hash created");
final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
- + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri) values "
+ + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id) values "
+ "(?,?,?,?,?,?,?,?,?)";
try {
@@ -458,7 +458,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
localFile.getName(),
md5,
0,
- uri);
+ resourceId);
} catch (SQLException e) {
String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
logger.error(msg, e);
@@ -1539,7 +1539,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
private static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
private static String SELECT_PROJECT_VERSION =
- "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri "
+ "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id "
+ "FROM project_versions WHERE project_id=? AND version=?";
@Override
@@ -1558,10 +1558,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
String fileName = rs.getString(6);
byte[] md5 = rs.getBytes(7);
int numChunks = rs.getInt(8);
- String uri = rs.getString(9);
+ String resourceId = rs.getString(9);
- ProjectFileHandler handler =
- new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5, uri);
+ ProjectFileHandler handler = new ProjectFileHandler(
+ projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5, resourceId);
handlers.add(handler);
} while (rs.next());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
index 09e0c7b..b1edeb0 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
@@ -27,7 +27,7 @@ public class ProjectFileHandler {
private final String uploader;
private final byte[] md5Hash;
private final int numChunks;
- private final String uri;
+ private final String resourceId;
private File localFile = null;
@@ -40,7 +40,7 @@ public class ProjectFileHandler {
String fileName,
int numChunks,
byte[] md5Hash,
- String uri) {
+ String resourceId) {
this.projectId = projectId;
this.version = version;
this.uploadTime = uploadTime;
@@ -49,7 +49,7 @@ public class ProjectFileHandler {
this.fileName = fileName;
this.md5Hash = md5Hash;
this.numChunks = numChunks;
- this.uri = uri;
+ this.resourceId = resourceId;
}
public int getProjectId() {
@@ -99,7 +99,7 @@ public class ProjectFileHandler {
return numChunks;
}
- public String getUri() {
- return uri;
+ public String getResourceId() {
+ return resourceId;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index e2fcf09..c03e97c 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -142,10 +142,10 @@ public interface ProjectLoader {
* @param version
* @param localFile
* @param uploader
- * @param uri
+ * @param resourceId
* @throws ProjectManagerException
*/
- void addProjectVersion(int projectId, int version, File localFile, String uploader, String uri)
+ void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
throws ProjectManagerException;
/**
diff --git a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
index 6b0478c..3d71771 100644
--- a/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/DatabaseStorage.java
@@ -23,7 +23,6 @@ import azkaban.spi.Storage;
import azkaban.spi.StorageMetadata;
import java.io.File;
import java.io.InputStream;
-import java.net.URI;
import javax.inject.Inject;
@@ -42,7 +41,7 @@ public class DatabaseStorage implements Storage {
}
@Override
- public InputStream get(URI key) {
+ public InputStream get(String key) {
throw new UnsupportedOperationException("Not implemented yet. Use get(projectId, version) instead");
}
@@ -51,7 +50,7 @@ public class DatabaseStorage implements Storage {
}
@Override
- public URI put(StorageMetadata metadata, File localFile) {
+ public String put(StorageMetadata metadata, File localFile) {
projectLoader.uploadProjectFile(
metadata.getProjectId(),
metadata.getVersion(),
@@ -61,7 +60,7 @@ public class DatabaseStorage implements Storage {
}
@Override
- public boolean delete(URI key) {
+ public boolean delete(String key) {
throw new UnsupportedOperationException("Delete is not supported");
}
}
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
index 3d2bf13..2157aaa 100644
--- a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
@@ -22,7 +22,6 @@ import azkaban.spi.StorageMetadata;
import com.google.inject.Inject;
import java.io.File;
import java.io.InputStream;
-import java.net.URI;
public class HdfsStorage implements Storage {
@@ -33,17 +32,17 @@ public class HdfsStorage implements Storage {
}
@Override
- public InputStream get(URI key) {
+ public InputStream get(String key) {
throw new UnsupportedOperationException("Method not implemented");
}
@Override
- public URI put(StorageMetadata metadata, File localFile) {
+ public String put(StorageMetadata metadata, File localFile) {
throw new UnsupportedOperationException("Method not implemented");
}
@Override
- public boolean delete(URI key) {
+ public boolean delete(String key) {
throw new UnsupportedOperationException("Method not implemented");
}
}
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index a1a709e..7df7e6b 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -27,7 +27,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URI;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@@ -43,21 +42,20 @@ public class LocalStorage implements Storage {
this.baseDirectory = validateBaseDirectory(createIfDoesNotExist(baseDirectory));
}
+ /**
+ * @param key Relative path of the file from the baseDirectory
+ */
@Override
- public InputStream get(URI key) {
+ public InputStream get(String key) {
try {
- return new FileInputStream(getFile(key));
+ return new FileInputStream(new File(baseDirectory, key));
} catch (FileNotFoundException e) {
return null;
}
}
- private File getFile(URI key) {
- return new File(baseDirectory, key.getPath());
- }
-
@Override
- public URI put(StorageMetadata metadata, File localFile) {
+ public String put(StorageMetadata metadata, File localFile) {
final File projectDir = new File(baseDirectory, String.valueOf(metadata.getProjectId()));
if (projectDir.mkdir()) {
@@ -78,15 +76,15 @@ public class LocalStorage implements Storage {
log.error("LocalStorage error in put(): Metadata: " + metadata);
throw new StorageException(e);
}
- return createRelativeURI(targetFile);
+ return createRelativePath(targetFile);
}
- private URI createRelativeURI(File targetFile) {
- return baseDirectory.toURI().relativize(targetFile.toURI());
+ private String createRelativePath(File targetFile) {
+ return baseDirectory.toURI().relativize(targetFile.toURI()).getPath();
}
@Override
- public boolean delete(URI key) {
+ public boolean delete(String key) {
throw new UnsupportedOperationException("delete has not been implemented.");
}
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index 9f3e10a..d1be9f7 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -31,8 +31,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Arrays;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
@@ -92,7 +90,7 @@ public class StorageManager {
metadata, localFile.getName(), localFile.length()));
/* upload to storage */
- URI uri = storage.put(metadata, localFile);
+ final String resourceId = storage.put(metadata, localFile);
/* Add metadata to db */
// TODO spyne: remove hack. Database storage should go through the same flow
@@ -102,10 +100,10 @@ public class StorageManager {
version,
localFile,
uploader.getUserId(),
- uri.toString()
+ resourceId
);
log.info(String.format("Added project metadata to DB. Meta:%s File: %s[%d bytes] URI: %s",
- metadata, localFile.getName(), localFile.length(), uri));
+ metadata, localFile.getName(), localFile.length(), resourceId));
}
}
@@ -127,9 +125,9 @@ public class StorageManager {
final ProjectFileHandler pfh = projectLoader.fetchProjectMetaData(projectId, version);
/* Fetch project file from storage and copy to local file */
- final String uri = requireNonNull(pfh.getUri(), String.format("URI is null. project ID: %d version: %d",
+ final String resourceId = requireNonNull(pfh.getResourceId(), String.format("URI is null. project ID: %d version: %d",
pfh.getProjectId(), pfh.getVersion()));
- try (InputStream is = storage.get(new URI(uri))){
+ try (InputStream is = storage.get(resourceId)){
final File file = createTempOutputFile(pfh);
/* Copy from storage to output stream */
@@ -144,7 +142,7 @@ public class StorageManager {
pfh.setLocalFile(file);
return pfh;
- } catch (URISyntaxException | IOException e) {
+ } catch (IOException e) {
throw new StorageException(e);
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 42498cf..463e2c0 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -123,7 +123,7 @@ public class MockProjectLoader implements ProjectLoader {
}
@Override
- public void addProjectVersion(int projectId, int version, File localFile, String uploader, String uri)
+ public void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
throws ProjectManagerException {
}
diff --git a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
index 6f4d029..61cd1b6 100644
--- a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
@@ -56,7 +56,7 @@ public class LocalStorageTest {
File testFile = new File(classLoader.getResource(SAMPLE_FILE).getFile());
final StorageMetadata metadata = new StorageMetadata(1, 1, "testuser");
- final URI key = localStorage.put(metadata, testFile);
+ final String key = localStorage.put(metadata, testFile);
assertNotNull(key);
log.info("Key URI: " + key);
diff --git a/azkaban-db/src/main/sql/create.project_versions.sql b/azkaban-db/src/main/sql/create.project_versions.sql
index 8336137..3356559 100644
--- a/azkaban-db/src/main/sql/create.project_versions.sql
+++ b/azkaban-db/src/main/sql/create.project_versions.sql
@@ -7,7 +7,7 @@ CREATE TABLE project_versions (
file_name VARCHAR(128),
md5 BINARY(16),
num_chunks INT,
- uri VARCHAR(512) DEFAULT NULL,
+ resource_id VARCHAR(512) DEFAULT NULL,
PRIMARY KEY (project_id, version)
);
diff --git a/azkaban-spi/src/main/java/azkaban/spi/Storage.java b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
index 24497d4..f61af34 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/Storage.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
@@ -19,7 +19,6 @@ package azkaban.spi;
import java.io.File;
import java.io.InputStream;
-import java.net.URI;
/**
@@ -36,11 +35,11 @@ public interface Storage {
/**
* Get an InputStream object by providing a key.
*
- * @param key The key is a URI pointing to the blob in Storage.
+ * @param key The key is a string pointing to the blob in Storage.
* @return InputStream for fetching the blob. null if the key is not found.
*
*/
- InputStream get(URI key);
+ InputStream get(String key);
/**
* Put an object and return a key.
@@ -48,15 +47,15 @@ public interface Storage {
* @param metadata Metadata related to the input stream
* @param localFile Read data from a local file
*
- * @return the URI of the data
+ * @return Key associated with the current object on successful put
*/
- URI put(StorageMetadata metadata, File localFile);
+ String put(StorageMetadata metadata, File localFile);
/**
* Delete an object from Storage.
*
- * @param key The key is a URI pointing to the blob in Storage.
+ * @param key The key is a string pointing to the blob in Storage.
* @return true if delete was successful. false if there was nothing to delete.
*/
- boolean delete(URI key);
+ boolean delete(String key);
}