Details
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 7b44419..8dfc10d 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -28,7 +28,6 @@ import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
import azkaban.spi.Storage;
import azkaban.spi.StorageException;
-import azkaban.storage.LocalStorage;
import azkaban.storage.StorageImplementationType;
import azkaban.trigger.JdbcTriggerImpl;
import azkaban.trigger.TriggerLoader;
@@ -38,7 +37,6 @@ import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
-import java.io.File;
import javax.sql.DataSource;
import org.apache.commons.dbutils.QueryRunner;
@@ -84,12 +82,6 @@ public class AzkabanCommonModule extends AbstractModule {
}
}
- @Inject
- public @Provides
- LocalStorage createLocalStorage(AzkabanCommonModuleConfig config) {
- return new LocalStorage(new File(config.getLocalStorageBaseDirPath()));
- }
-
// todo kunkun-tang: the below method should moved out to azkaban-db module eventually.
@Inject
@Provides
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
index 8f41693..957efc6 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
@@ -24,7 +24,6 @@ import azkaban.storage.StorageImplementationType;
import azkaban.utils.Props;
import com.google.inject.Inject;
import java.net.URI;
-import java.net.URISyntaxException;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.log4j.Logger;
@@ -47,17 +46,16 @@ public class AzkabanCommonModuleConfig {
*
*/
private String storageImplementation = DATABASE.name();
- private String localStorageBaseDirPath = "AZKABAN_STORAGE";
- private URI hdfsBaseUri = uri("hdfs://localhost:50070/path/to/base/");
+ private String localStorageBaseDirPath = "LOCAL_STORAGE";
+ private URI hdfsRootUri;
@Inject
public AzkabanCommonModuleConfig(Props props) {
this.props = props;
- storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE,
- storageImplementation);
+ storageImplementation = props.getString(AZKABAN_STORAGE_TYPE, storageImplementation);
localStorageBaseDirPath = props.getString(AZKABAN_STORAGE_LOCAL_BASEDIR, localStorageBaseDirPath);
- hdfsBaseUri = props.getUri(AZKABAN_STORAGE_HDFS_BASEURI, hdfsBaseUri);
+ hdfsRootUri = props.get(AZKABAN_STORAGE_HDFS_ROOT_URI) != null ? props.getUri(AZKABAN_STORAGE_HDFS_ROOT_URI) : null;
}
public Props getProps() {
@@ -72,18 +70,7 @@ public class AzkabanCommonModuleConfig {
return localStorageBaseDirPath;
}
-
-
- public URI getHdfsBaseUri() {
- return hdfsBaseUri;
- }
-
- private static URI uri(String uri){
- try {
- return new URI(uri);
- } catch (URISyntaxException e) {
- log.error(e);
- }
- return null;
+ public URI getHdfsRootUri() {
+ return hdfsRootUri;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index d4b760a..be061d3 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -90,7 +90,7 @@ public class Constants {
public static final String AZKABAN_STORAGE_TYPE = "azkaban.storage.type";
public static final String AZKABAN_STORAGE_LOCAL_BASEDIR = "azkaban.storage.local.basedir";
- public static final String AZKABAN_STORAGE_HDFS_BASEURI = "azkaban.storage.hdfs.baseuri";
+ public static final String AZKABAN_STORAGE_HDFS_ROOT_URI = "azkaban.storage.hdfs.root.uri";
}
public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 91c2f59..c41c9c2 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -360,7 +360,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
try {
/* Update DB with new project info */
- addProjectToProjectVersions(connection, projectId, version, localFile, uploader, null);
+ addProjectToProjectVersions(
+ connection, projectId, version, localFile, uploader, computeHash(localFile), null);
uploadProjectFile(connection, projectId, version, localFile);
@@ -391,9 +392,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
int version,
File localFile,
String uploader,
+ byte[] md5,
String resourceId) throws ProjectManagerException {
try (Connection connection = getConnection()) {
- addProjectToProjectVersions(connection, projectId, version, localFile, uploader, resourceId);
+ addProjectToProjectVersions(connection, projectId, version, localFile, uploader, md5, resourceId);
connection.commit();
} catch (SQLException e) {
logger.error(e);
@@ -426,18 +428,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
int version,
File localFile,
String uploader,
+ byte[] md5,
String resourceId) throws ProjectManagerException {
final long updateTime = System.currentTimeMillis();
QueryRunner runner = new QueryRunner();
- logger.info("Creating message digest for upload " + localFile.getName());
- byte[] md5;
- try {
- md5 = Md5Hasher.md5Hash(localFile);
- } catch (IOException e) {
- throw new ProjectManagerException("Error getting md5 hash.", e);
- }
-
- logger.info("Md5 hash created");
final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
+ "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id) values "
@@ -466,6 +460,19 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
}
}
+ private byte[] computeHash(File localFile) {
+ logger.info("Creating message digest for upload " + localFile.getName());
+ byte[] md5;
+ try {
+ md5 = Md5Hasher.md5Hash(localFile);
+ } catch (IOException e) {
+ throw new ProjectManagerException("Error getting md5 hash.", e);
+ }
+
+ logger.info("Md5 hash created");
+ return md5;
+ }
+
private int uploadFileInChunks(Connection connection, int projectId, int version, File localFile)
throws ProjectManagerException {
QueryRunner runner = new QueryRunner();
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index c03e97c..e1142ed 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -142,10 +142,11 @@ public interface ProjectLoader {
* @param version
* @param localFile
* @param uploader
+ * @param md5
* @param resourceId
* @throws ProjectManagerException
*/
- void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
+ void addProjectVersion(int projectId, int version, File localFile, String uploader, byte[] md5, String resourceId)
throws ProjectManagerException;
/**
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index 7df7e6b..35a2dcb 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -17,14 +17,15 @@
package azkaban.storage;
+import azkaban.AzkabanCommonModuleConfig;
import azkaban.spi.Storage;
import azkaban.spi.StorageException;
import azkaban.spi.StorageMetadata;
import azkaban.utils.FileIOUtils;
import com.google.common.io.Files;
+import com.google.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.io.FileUtils;
@@ -36,34 +37,32 @@ import static com.google.common.base.Preconditions.*;
public class LocalStorage implements Storage {
private static final Logger log = Logger.getLogger(LocalStorage.class);
- final File baseDirectory;
+ final File rootDirectory;
- public LocalStorage(File baseDirectory) {
- this.baseDirectory = validateBaseDirectory(createIfDoesNotExist(baseDirectory));
+ @Inject
+ public LocalStorage(AzkabanCommonModuleConfig config) {
+ this.rootDirectory = validateRootDirectory(createIfDoesNotExist(config.getLocalStorageBaseDirPath()));
}
/**
* @param key Relative path of the file from the baseDirectory
*/
@Override
- public InputStream get(String key) {
- try {
- return new FileInputStream(new File(baseDirectory, key));
- } catch (FileNotFoundException e) {
- return null;
- }
+ public InputStream get(String key) throws IOException {
+ return new FileInputStream(new File(rootDirectory, key));
}
@Override
public String put(StorageMetadata metadata, File localFile) {
-
- final File projectDir = new File(baseDirectory, String.valueOf(metadata.getProjectId()));
+ final File projectDir = new File(rootDirectory, String.valueOf(metadata.getProjectId()));
if (projectDir.mkdir()) {
log.info("Created project dir: " + projectDir.getAbsolutePath());
}
- final File targetFile = new File(projectDir,
- metadata.getVersion() + "." + Files.getFileExtension(localFile.getName()));
+ final File targetFile = new File(projectDir, String.format("%s-%s.%s",
+ String.valueOf(metadata.getProjectId()),
+ new String(metadata.getHash()),
+ Files.getFileExtension(localFile.getName())));
if (targetFile.exists()) {
throw new StorageException(String.format(
@@ -80,7 +79,7 @@ public class LocalStorage implements Storage {
}
private String createRelativePath(File targetFile) {
- return baseDirectory.toURI().relativize(targetFile.toURI()).getPath();
+ return rootDirectory.toURI().relativize(targetFile.toURI()).getPath();
}
@Override
@@ -88,7 +87,8 @@ public class LocalStorage implements Storage {
throw new UnsupportedOperationException("delete has not been implemented.");
}
- private static File createIfDoesNotExist(File baseDirectory) {
+ private static File createIfDoesNotExist(String baseDirectoryPath) {
+ final File baseDirectory = new File(baseDirectoryPath);
if(!baseDirectory.exists()) {
baseDirectory.mkdir();
log.info("Creating dir: " + baseDirectory.getAbsolutePath());
@@ -96,7 +96,7 @@ public class LocalStorage implements Storage {
return baseDirectory;
}
- private static File validateBaseDirectory(File baseDirectory) {
+ private static File validateRootDirectory(File baseDirectory) {
checkArgument(baseDirectory.isDirectory());
if (!FileIOUtils.isDirWritable(baseDirectory)) {
throw new IllegalArgumentException("Directory not writable: " + baseDirectory);
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index d1be9f7..ff83f20 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -53,8 +53,8 @@ public class StorageManager {
@Inject
public StorageManager(Props props, Storage storage, ProjectLoader projectLoader) {
this.tempDir = new File(props.getString("project.temp.dir", "temp"));
- this.storage = storage;
- this.projectLoader = projectLoader;
+ this.storage = requireNonNull(storage);
+ this.projectLoader = requireNonNull(projectLoader);
prepareTempDir();
}
@@ -81,11 +81,15 @@ public class StorageManager {
int version,
File localFile,
User uploader) {
+ byte[] md5 = null;
+ if (!(storage instanceof DatabaseStorage)) {
+ md5 = computeHash(localFile);
+ }
final StorageMetadata metadata = new StorageMetadata(
project.getId(),
version,
- uploader.getUserId()
- );
+ uploader.getUserId(),
+ md5);
log.info(String.format("Adding archive to storage. Meta:%s File: %s[%d bytes]",
metadata, localFile.getName(), localFile.length()));
@@ -100,13 +104,24 @@ public class StorageManager {
version,
localFile,
uploader.getUserId(),
- resourceId
+ requireNonNull(md5),
+ requireNonNull(resourceId)
);
log.info(String.format("Added project metadata to DB. Meta:%s File: %s[%d bytes] URI: %s",
metadata, localFile.getName(), localFile.length(), resourceId));
}
}
+ private byte[] computeHash(File localFile) {
+ final byte[] md5;
+ try {
+ md5 = Md5Hasher.md5Hash(localFile);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ return md5;
+ }
+
/**
* Fetch project file from storage.
*
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 463e2c0..71f9e9b 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -123,7 +123,7 @@ public class MockProjectLoader implements ProjectLoader {
}
@Override
- public void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
+ public void addProjectVersion(int projectId, int version, File localFile, String uploader, byte[] md5, String resourceId)
throws ProjectManagerException {
}
diff --git a/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
index 5e61869..69b51b9 100644
--- a/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
@@ -22,7 +22,6 @@ import azkaban.spi.StorageMetadata;
import java.io.File;
import org.junit.Test;
-import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -36,7 +35,7 @@ public class DatabaseStorageTest {
int projectId = 1234;
int version = 1;
String uploader = "testuser";
- final StorageMetadata metadata = new StorageMetadata(projectId, version, uploader);
+ final StorageMetadata metadata = new StorageMetadata(projectId, version, uploader, null);
databaseStorage.put(metadata, file);
verify(projectLoader).uploadProjectFile(projectId, version, file, uploader);
}
diff --git a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
index 61cd1b6..d7b098a 100644
--- a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
@@ -17,10 +17,11 @@
package azkaban.storage;
+import azkaban.AzkabanCommonModuleConfig;
import azkaban.spi.StorageMetadata;
+import azkaban.utils.Md5Hasher;
import java.io.File;
import java.io.InputStream;
-import java.net.URI;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.After;
@@ -28,6 +29,7 @@ import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class LocalStorageTest {
@@ -37,12 +39,15 @@ public class LocalStorageTest {
static final String LOCAL_STORAGE = "LOCAL_STORAGE";
static final File BASE_DIRECTORY = new File(LOCAL_STORAGE);
- private final LocalStorage localStorage = new LocalStorage(BASE_DIRECTORY);
+ private LocalStorage localStorage;
@Before
public void setUp() throws Exception {
tearDown();
BASE_DIRECTORY.mkdir();
+ AzkabanCommonModuleConfig config = mock(AzkabanCommonModuleConfig.class);
+ when(config.getLocalStorageBaseDirPath()).thenReturn(LOCAL_STORAGE);
+ localStorage = new LocalStorage(config);
}
@After
@@ -51,11 +56,12 @@ public class LocalStorageTest {
}
@Test
- public void testAll() throws Exception {
+ public void testPutGet() throws Exception {
ClassLoader classLoader = getClass().getClassLoader();
File testFile = new File(classLoader.getResource(SAMPLE_FILE).getFile());
- final StorageMetadata metadata = new StorageMetadata(1, 1, "testuser");
+ final StorageMetadata metadata = new StorageMetadata(
+ 1, 1, "testuser", Md5Hasher.md5Hash(testFile));
final String key = localStorage.put(metadata, testFile);
assertNotNull(key);
log.info("Key URI: " + key);
@@ -63,7 +69,9 @@ public class LocalStorageTest {
File expectedTargetFile = new File(BASE_DIRECTORY, new StringBuilder()
.append(metadata.getProjectId())
.append(File.separator)
- .append(metadata.getVersion())
+ .append(metadata.getProjectId())
+ .append("-")
+ .append(new String(metadata.getHash()))
.append(".zip")
.toString()
);
diff --git a/azkaban-spi/src/main/java/azkaban/spi/Storage.java b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
index f61af34..7c7ed9d 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/Storage.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
@@ -18,6 +18,7 @@
package azkaban.spi;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
@@ -39,7 +40,7 @@ public interface Storage {
* @return InputStream for fetching the blob. null if the key is not found.
*
*/
- InputStream get(String key);
+ InputStream get(String key) throws IOException;
/**
* Put an object and return a key.
diff --git a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
index b562344..19f31a5 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
@@ -26,11 +26,13 @@ public class StorageMetadata {
private final int projectId;
private final int version;
private final String uploader;
+ private byte[] hash;
- public StorageMetadata(int projectId, int version, String uploader) {
+ public StorageMetadata(int projectId, int version, String uploader, byte[] hash) {
this.projectId = projectId;
this.version = version;
this.uploader = requireNonNull(uploader);
+ this.hash = hash;
}
@Override
@@ -50,6 +52,10 @@ public class StorageMetadata {
return uploader;
}
+ public byte[] getHash() {
+ return hash;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {