diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 7a78873..b7d165d 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -360,7 +360,7 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
try {
/* Update DB with new project info */
- addProjectToProjectVersions(connection, projectId, version, localFile, uploader);
+ addProjectToProjectVersions(connection, projectId, version, localFile, uploader, null);
uploadProjectFile(connection, projectId, version, localFile);
@@ -385,6 +385,22 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
updateChunksInProjectVersions(connection, projectId, version, chunks);
}
+ public void addProjectVersion(
+ int projectId,
+ int version,
+ File localFile,
+ String uploader,
+ String uri) throws ProjectManagerException {
+ try (Connection connection = getConnection()) {
+ addProjectToProjectVersions(connection, projectId, version, localFile, uploader, uri);
+ connection.commit();
+ } catch (SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException(String.format("Add ProjectVersion failed. project id: %d version: %d",
+ projectId, version), e);
+ }
+ }
+
/**
* Insert a new version record to TABLE project_versions before uploading files.
*
@@ -408,7 +424,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
int projectId,
int version,
File localFile,
- String uploader) throws ProjectManagerException {
+ String uploader,
+ String uri) throws ProjectManagerException {
final long updateTime = System.currentTimeMillis();
QueryRunner runner = new QueryRunner();
logger.info("Creating message digest for upload " + localFile.getName());
@@ -422,11 +439,11 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
logger.info("Md5 hash created");
final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
- + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks) values "
- + "(?,?,?,?,?,?,?,?)";
+ + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri) values "
+ + "(?,?,?,?,?,?,?,?,?)";
try {
- /**
+ /*
* As we don't know the num_chunks before uploading the file, we initialize it to 0,
* and will update it after uploading completes.
*/
@@ -439,7 +456,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
Files.getFileExtension(localFile.getName()),
localFile.getName(),
md5,
- 0);
+ 0,
+ uri);
} catch (SQLException e) {
String msg = String.format("Error initializing project id: %d version: %d ", projectId, version);
logger.error(msg, e);
@@ -530,42 +548,41 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
return handler;
}
- private ProjectFileHandler getUploadedFile(Connection connection,
- int projectId, int version) throws ProjectManagerException {
- QueryRunner runner = new QueryRunner();
+ public ProjectFileHandler fetchProjectMetaData(int projectId, int version) {
ProjectVersionResultHandler pfHandler = new ProjectVersionResultHandler();
- List<ProjectFileHandler> projectFiles = null;
- try {
- projectFiles =
- runner.query(connection,
- ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler,
- projectId, version);
+ try (Connection connection = getConnection()) {
+ List<ProjectFileHandler> projectFiles = new QueryRunner().query(connection,
+ ProjectVersionResultHandler.SELECT_PROJECT_VERSION, pfHandler, projectId, version);
+ if (projectFiles == null || projectFiles.isEmpty()) {
+ return null;
+ }
+ return projectFiles.get(0);
} catch (SQLException e) {
logger.error(e);
- throw new ProjectManagerException(
- "Query for uploaded file for project id " + projectId + " failed.", e);
+ throw new ProjectManagerException("Query for uploaded file for project id " + projectId + " failed.", e);
}
- if (projectFiles == null || projectFiles.isEmpty()) {
+ }
+
+ private ProjectFileHandler getUploadedFile(Connection connection,
+ int projectId, int version) throws ProjectManagerException {
+ ProjectFileHandler projHandler = fetchProjectMetaData(projectId, version);
+ if (projHandler == null) {
return null;
}
-
- ProjectFileHandler projHandler = projectFiles.get(0);
int numChunks = projHandler.getNumChunks();
BufferedOutputStream bStream = null;
- File file = null;
+ File file;
try {
try {
- file =
- File.createTempFile(projHandler.getFileName(),
- String.valueOf(version), tempDir);
-
+ file = File.createTempFile(projHandler.getFileName(), String.valueOf(version), tempDir);
bStream = new BufferedOutputStream(new FileOutputStream(file));
} catch (IOException e) {
throw new ProjectManagerException(
"Error creating temp file for stream.");
}
+ QueryRunner runner = new QueryRunner();
int collect = 5;
int fromChunk = 0;
int toChunk = collect;
@@ -1517,10 +1534,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
}
- private static class ProjectVersionResultHandler implements
- ResultSetHandler<List<ProjectFileHandler>> {
+ private static class ProjectVersionResultHandler implements ResultSetHandler<List<ProjectFileHandler>> {
private static String SELECT_PROJECT_VERSION =
- "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks FROM project_versions WHERE project_id=? AND version=?";
+ "SELECT project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, uri "
+ + "FROM project_versions WHERE project_id=? AND version=?";
@Override
public List<ProjectFileHandler> handle(ResultSet rs) throws SQLException {
@@ -1538,10 +1555,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
String fileName = rs.getString(6);
byte[] md5 = rs.getBytes(7);
int numChunks = rs.getInt(8);
+ String uri = rs.getString(9);
ProjectFileHandler handler =
- new ProjectFileHandler(projectId, version, uploadTime, uploader,
- fileType, fileName, numChunks, md5);
+ new ProjectFileHandler(projectId, version, uploadTime, uploader, fileType, fileName, numChunks, md5, uri);
handlers.add(handler);
} while (rs.next());
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
index 9412bee..09e0c7b 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectFileHandler.java
@@ -22,24 +22,34 @@ public class ProjectFileHandler {
private final int projectId;
private final int version;
private final long uploadTime;
- private String fileType;
- private String fileName;
- private String uploader;
- private byte[] md5Hash;
- private int numChunks;
+ private final String fileType;
+ private final String fileName;
+ private final String uploader;
+ private final byte[] md5Hash;
+ private final int numChunks;
+ private final String uri;
+
private File localFile = null;
- public ProjectFileHandler(int projectId, int version, long uploadTime,
- String uploader, String fileType, String fileName, int numChunks,
- byte[] md5Hash) {
+ public ProjectFileHandler(
+ int projectId,
+ int version,
+ long uploadTime,
+ String uploader,
+ String fileType,
+ String fileName,
+ int numChunks,
+ byte[] md5Hash,
+ String uri) {
this.projectId = projectId;
this.version = version;
this.uploadTime = uploadTime;
- this.setUploader(uploader);
- this.setFileType(fileType);
- this.setFileName(fileName);
- this.setMd5Hash(md5Hash);
- this.setNumChunks(numChunks);
+ this.uploader = uploader;
+ this.fileType = fileType;
+ this.fileName = fileName;
+ this.md5Hash = md5Hash;
+ this.numChunks = numChunks;
+ this.uri = uri;
}
public int getProjectId() {
@@ -58,26 +68,14 @@ public class ProjectFileHandler {
return fileType;
}
- public void setFileType(String fileType) {
- this.fileType = fileType;
- }
-
public String getFileName() {
return fileName;
}
- public void setFileName(String fileName) {
- this.fileName = fileName;
- }
-
public byte[] getMd5Hash() {
return md5Hash;
}
- public void setMd5Hash(byte[] md5Hash) {
- this.md5Hash = md5Hash;
- }
-
public File getLocalFile() {
return localFile;
}
@@ -87,9 +85,7 @@ public class ProjectFileHandler {
}
public synchronized void deleteLocalFile() {
- if (localFile == null) {
- return;
- } else {
+ if (localFile != null) {
localFile.delete();
localFile = null;
}
@@ -99,16 +95,11 @@ public class ProjectFileHandler {
return uploader;
}
- public void setUploader(String uploader) {
- this.uploader = uploader;
- }
-
public int getNumChunks() {
return numChunks;
}
- public void setNumChunks(int numChunks) {
- this.numChunks = numChunks;
+ public String getUri() {
+ return uri;
}
-
}
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index a04f189..9f3e10a 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -19,13 +19,27 @@ package azkaban.storage;
import azkaban.project.Project;
import azkaban.project.ProjectFileHandler;
+import azkaban.project.ProjectLoader;
import azkaban.spi.Storage;
+import azkaban.spi.StorageException;
import azkaban.spi.StorageMetadata;
import azkaban.user.User;
+import azkaban.utils.Md5Hasher;
+import azkaban.utils.Props;
import com.google.inject.Inject;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
+import static com.google.common.base.Preconditions.*;
+import static java.util.Objects.*;
+
/**
* StorageManager manages and coordinates all interactions with the Storage layer. This also includes bookkeeping
@@ -35,10 +49,23 @@ public class StorageManager {
private static final Logger log = Logger.getLogger(StorageManager.class);
private final Storage storage;
+ private final ProjectLoader projectLoader;
+ private final File tempDir;
@Inject
- public StorageManager(Storage storage) {
+ public StorageManager(Props props, Storage storage, ProjectLoader projectLoader) {
+ this.tempDir = new File(props.getString("project.temp.dir", "temp"));
this.storage = storage;
+ this.projectLoader = projectLoader;
+
+ prepareTempDir();
+ }
+
+ private void prepareTempDir() {
+ if (!tempDir.exists()) {
+ tempDir.mkdirs();
+ }
+ checkArgument(tempDir.isDirectory());
}
/**
@@ -63,7 +90,23 @@ public class StorageManager {
);
log.info(String.format("Adding archive to storage. Meta:%s File: %s[%d bytes]",
metadata, localFile.getName(), localFile.length()));
- storage.put(metadata, localFile);
+
+ /* upload to storage */
+ URI uri = storage.put(metadata, localFile);
+
+ /* Add metadata to db */
+ // TODO spyne: remove hack. Database storage should go through the same flow
+ if (!(storage instanceof DatabaseStorage)) {
+ projectLoader.addProjectVersion(
+ project.getId(),
+ version,
+ localFile,
+ uploader.getUserId(),
+ uri.toString()
+ );
+ log.info(String.format("Added project metadata to DB. Meta:%s File: %s[%d bytes] URI: %s",
+ metadata, localFile.getName(), localFile.length(), uri));
+ }
}
/**
@@ -79,6 +122,44 @@ public class StorageManager {
if (storage instanceof DatabaseStorage) {
return ((DatabaseStorage) storage).get(projectId, version);
}
- throw new UnsupportedOperationException("Operation currently unsupported for other types.");
+
+ /* Fetch meta data from db */
+ final ProjectFileHandler pfh = projectLoader.fetchProjectMetaData(projectId, version);
+
+ /* Fetch project file from storage and copy to local file */
+ final String uri = requireNonNull(pfh.getUri(), String.format("URI is null. project ID: %d version: %d",
+ pfh.getProjectId(), pfh.getVersion()));
+ try (InputStream is = storage.get(new URI(uri))){
+ final File file = createTempOutputFile(pfh);
+
+ /* Copy from storage to output stream */
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ IOUtils.copy(is, fos);
+ }
+
+ /* Validate checksum */
+ validateChecksum(file, pfh);
+
+ /* Attach file to handler */
+ pfh.setLocalFile(file);
+
+ return pfh;
+ } catch (URISyntaxException | IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ private void validateChecksum(File file, ProjectFileHandler pfh) throws IOException {
+ final byte[] hash = Md5Hasher.md5Hash(file);
+ checkState(Arrays.equals(pfh.getMd5Hash(), hash),
+ String.format("MD5 HASH Failed. project ID: %d version: %d Expected: %s Actual: %s",
+ pfh.getProjectId(), pfh.getVersion(), new String(pfh.getMd5Hash()), new String(hash))
+ );
+ }
+
+ private File createTempOutputFile(ProjectFileHandler projectFileHandler) throws IOException {
+ return File.createTempFile(
+ projectFileHandler.getFileName(),
+ String.valueOf(projectFileHandler.getVersion()), tempDir);
}
}