azkaban-aplcache

Storage Refactor. Changed filename to include hash of file

5/8/2017 3:16:01 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 7b44419..8dfc10d 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -28,7 +28,6 @@ import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
-import azkaban.storage.LocalStorage;
 import azkaban.storage.StorageImplementationType;
 import azkaban.trigger.JdbcTriggerImpl;
 import azkaban.trigger.TriggerLoader;
@@ -38,7 +37,6 @@ import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
-import java.io.File;
 import javax.sql.DataSource;
 import org.apache.commons.dbutils.QueryRunner;
 
@@ -84,12 +82,6 @@ public class AzkabanCommonModule extends AbstractModule {
     }
   }
 
-  @Inject
-  public @Provides
-  LocalStorage createLocalStorage(AzkabanCommonModuleConfig config) {
-    return new LocalStorage(new File(config.getLocalStorageBaseDirPath()));
-  }
-
   // todo kunkun-tang: the below method should moved out to azkaban-db module eventually.
   @Inject
   @Provides
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
index 8f41693..957efc6 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
@@ -24,7 +24,6 @@ import azkaban.storage.StorageImplementationType;
 import azkaban.utils.Props;
 import com.google.inject.Inject;
 import java.net.URI;
-import java.net.URISyntaxException;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.log4j.Logger;
 
@@ -47,17 +46,16 @@ public class AzkabanCommonModuleConfig {
    *
    */
   private String storageImplementation = DATABASE.name();
-  private String localStorageBaseDirPath = "AZKABAN_STORAGE";
-  private URI hdfsBaseUri = uri("hdfs://localhost:50070/path/to/base/");
+  private String localStorageBaseDirPath = "LOCAL_STORAGE";
+  private URI hdfsRootUri;
 
   @Inject
   public AzkabanCommonModuleConfig(Props props) {
     this.props = props;
 
-    storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE,
-        storageImplementation);
+    storageImplementation = props.getString(AZKABAN_STORAGE_TYPE, storageImplementation);
     localStorageBaseDirPath = props.getString(AZKABAN_STORAGE_LOCAL_BASEDIR, localStorageBaseDirPath);
-    hdfsBaseUri = props.getUri(AZKABAN_STORAGE_HDFS_BASEURI, hdfsBaseUri);
+    hdfsRootUri = props.get(AZKABAN_STORAGE_HDFS_ROOT_URI) != null ? props.getUri(AZKABAN_STORAGE_HDFS_ROOT_URI) : null;
   }
 
   public Props getProps() {
@@ -72,18 +70,7 @@ public class AzkabanCommonModuleConfig {
     return localStorageBaseDirPath;
   }
 
-
-
-  public URI getHdfsBaseUri() {
-    return hdfsBaseUri;
-  }
-
-  private static URI uri(String uri){
-    try {
-      return new URI(uri);
-    } catch (URISyntaxException e) {
-      log.error(e);
-    }
-    return null;
+  public URI getHdfsRootUri() {
+    return hdfsRootUri;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index d4b760a..be061d3 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -90,7 +90,7 @@ public class Constants {
 
     public static final String AZKABAN_STORAGE_TYPE = "azkaban.storage.type";
     public static final String AZKABAN_STORAGE_LOCAL_BASEDIR = "azkaban.storage.local.basedir";
-    public static final String AZKABAN_STORAGE_HDFS_BASEURI = "azkaban.storage.hdfs.baseuri";
+    public static final String AZKABAN_STORAGE_HDFS_ROOT_URI = "azkaban.storage.hdfs.root.uri";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
index 91c2f59..c41c9c2 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectLoader.java
@@ -360,7 +360,8 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
 
     try {
       /* Update DB with new project info */
-      addProjectToProjectVersions(connection, projectId, version, localFile, uploader, null);
+      addProjectToProjectVersions(
+          connection, projectId, version, localFile, uploader, computeHash(localFile), null);
 
       uploadProjectFile(connection, projectId, version, localFile);
 
@@ -391,9 +392,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       int version,
       File localFile,
       String uploader,
+      byte[] md5,
       String resourceId) throws ProjectManagerException {
     try (Connection connection = getConnection()) {
-      addProjectToProjectVersions(connection, projectId, version, localFile, uploader, resourceId);
+      addProjectToProjectVersions(connection, projectId, version, localFile, uploader, md5, resourceId);
       connection.commit();
     } catch (SQLException e) {
       logger.error(e);
@@ -426,18 +428,10 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
       int version,
       File localFile,
       String uploader,
+      byte[] md5,
       String resourceId) throws ProjectManagerException {
     final long updateTime = System.currentTimeMillis();
     QueryRunner runner = new QueryRunner();
-    logger.info("Creating message digest for upload " + localFile.getName());
-    byte[] md5;
-    try {
-      md5 = Md5Hasher.md5Hash(localFile);
-    } catch (IOException e) {
-      throw new ProjectManagerException("Error getting md5 hash.", e);
-    }
-
-    logger.info("Md5 hash created");
 
     final String INSERT_PROJECT_VERSION = "INSERT INTO project_versions "
         + "(project_id, version, upload_time, uploader, file_type, file_name, md5, num_chunks, resource_id) values "
@@ -466,6 +460,19 @@ public class JdbcProjectLoader extends AbstractJdbcLoader implements
     }
   }
 
+  private byte[] computeHash(File localFile) {
+    logger.info("Creating message digest for upload " + localFile.getName());
+    byte[] md5;
+    try {
+      md5 = Md5Hasher.md5Hash(localFile);
+    } catch (IOException e) {
+      throw new ProjectManagerException("Error getting md5 hash.", e);
+    }
+
+    logger.info("Md5 hash created");
+    return md5;
+  }
+
   private int uploadFileInChunks(Connection connection, int projectId, int version, File localFile)
       throws ProjectManagerException {
     QueryRunner runner = new QueryRunner();
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index c03e97c..e1142ed 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -142,10 +142,11 @@ public interface ProjectLoader {
    * @param version
    * @param localFile
    * @param uploader
+   * @param md5
    * @param resourceId
    * @throws ProjectManagerException
    */
-  void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
+  void addProjectVersion(int projectId, int version, File localFile, String uploader, byte[] md5, String resourceId)
       throws ProjectManagerException;
 
   /**
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index 7df7e6b..35a2dcb 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -17,14 +17,15 @@
 
 package azkaban.storage;
 
+import azkaban.AzkabanCommonModuleConfig;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
 import azkaban.spi.StorageMetadata;
 import azkaban.utils.FileIOUtils;
 import com.google.common.io.Files;
+import com.google.inject.Inject;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.commons.io.FileUtils;
@@ -36,34 +37,32 @@ import static com.google.common.base.Preconditions.*;
 public class LocalStorage implements Storage {
   private static final Logger log = Logger.getLogger(LocalStorage.class);
 
-  final File baseDirectory;
+  final File rootDirectory;
 
-  public LocalStorage(File baseDirectory) {
-    this.baseDirectory = validateBaseDirectory(createIfDoesNotExist(baseDirectory));
+  @Inject
+  public LocalStorage(AzkabanCommonModuleConfig config) {
+    this.rootDirectory = validateRootDirectory(createIfDoesNotExist(config.getLocalStorageBaseDirPath()));
   }
 
   /**
    * @param key Relative path of the file from the baseDirectory
    */
   @Override
-  public InputStream get(String key) {
-    try {
-      return new FileInputStream(new File(baseDirectory, key));
-    } catch (FileNotFoundException e) {
-      return null;
-    }
+  public InputStream get(String key) throws IOException {
+    return new FileInputStream(new File(rootDirectory, key));
   }
 
   @Override
   public String put(StorageMetadata metadata, File localFile) {
-
-    final File projectDir = new File(baseDirectory, String.valueOf(metadata.getProjectId()));
+    final File projectDir = new File(rootDirectory, String.valueOf(metadata.getProjectId()));
     if (projectDir.mkdir()) {
       log.info("Created project dir: " + projectDir.getAbsolutePath());
     }
 
-    final File targetFile = new File(projectDir,
-        metadata.getVersion() + "." + Files.getFileExtension(localFile.getName()));
+    final File targetFile = new File(projectDir, String.format("%s-%s.%s",
+        String.valueOf(metadata.getProjectId()),
+        new String(metadata.getHash()),
+        Files.getFileExtension(localFile.getName())));
 
     if (targetFile.exists()) {
       throw new StorageException(String.format(
@@ -80,7 +79,7 @@ public class LocalStorage implements Storage {
   }
 
   private String createRelativePath(File targetFile) {
-    return baseDirectory.toURI().relativize(targetFile.toURI()).getPath();
+    return rootDirectory.toURI().relativize(targetFile.toURI()).getPath();
   }
 
   @Override
@@ -88,7 +87,8 @@ public class LocalStorage implements Storage {
     throw new UnsupportedOperationException("delete has not been implemented.");
   }
 
-  private static File createIfDoesNotExist(File baseDirectory) {
+  private static File createIfDoesNotExist(String baseDirectoryPath) {
+    final File baseDirectory = new File(baseDirectoryPath);
     if(!baseDirectory.exists()) {
       baseDirectory.mkdir();
       log.info("Creating dir: " + baseDirectory.getAbsolutePath());
@@ -96,7 +96,7 @@ public class LocalStorage implements Storage {
     return baseDirectory;
   }
 
-  private static File validateBaseDirectory(File baseDirectory) {
+  private static File validateRootDirectory(File baseDirectory) {
     checkArgument(baseDirectory.isDirectory());
     if (!FileIOUtils.isDirWritable(baseDirectory)) {
       throw new IllegalArgumentException("Directory not writable: " + baseDirectory);
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
index d1be9f7..ff83f20 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageManager.java
@@ -53,8 +53,8 @@ public class StorageManager {
   @Inject
   public StorageManager(Props props, Storage storage, ProjectLoader projectLoader) {
     this.tempDir = new File(props.getString("project.temp.dir", "temp"));
-    this.storage = storage;
-    this.projectLoader = projectLoader;
+    this.storage = requireNonNull(storage);
+    this.projectLoader = requireNonNull(projectLoader);
 
     prepareTempDir();
   }
@@ -81,11 +81,15 @@ public class StorageManager {
       int version,
       File localFile,
       User uploader) {
+    byte[] md5 = null;
+    if (!(storage instanceof DatabaseStorage)) {
+      md5 = computeHash(localFile);
+    }
     final StorageMetadata metadata = new StorageMetadata(
         project.getId(),
         version,
-        uploader.getUserId()
-    );
+        uploader.getUserId(),
+        md5);
     log.info(String.format("Adding archive to storage. Meta:%s File: %s[%d bytes]",
         metadata, localFile.getName(), localFile.length()));
 
@@ -100,13 +104,24 @@ public class StorageManager {
           version,
           localFile,
           uploader.getUserId(),
-          resourceId
+          requireNonNull(md5),
+          requireNonNull(resourceId)
       );
       log.info(String.format("Added project metadata to DB. Meta:%s File: %s[%d bytes] URI: %s",
           metadata, localFile.getName(), localFile.length(), resourceId));
     }
   }
 
+  private byte[] computeHash(File localFile) {
+    final byte[] md5;
+    try {
+      md5 = Md5Hasher.md5Hash(localFile);
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
+    return md5;
+  }
+
   /**
    * Fetch project file from storage.
    *
diff --git a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
index 463e2c0..71f9e9b 100644
--- a/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
+++ b/azkaban-common/src/test/java/azkaban/project/MockProjectLoader.java
@@ -123,7 +123,7 @@ public class MockProjectLoader implements ProjectLoader {
   }
 
   @Override
-  public void addProjectVersion(int projectId, int version, File localFile, String uploader, String resourceId)
+  public void addProjectVersion(int projectId, int version, File localFile, String uploader, byte[] md5, String resourceId)
       throws ProjectManagerException {
 
   }
diff --git a/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
index 5e61869..69b51b9 100644
--- a/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/DatabaseStorageTest.java
@@ -22,7 +22,6 @@ import azkaban.spi.StorageMetadata;
 import java.io.File;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 
@@ -36,7 +35,7 @@ public class DatabaseStorageTest {
     int projectId = 1234;
     int version = 1;
     String uploader = "testuser";
-    final StorageMetadata metadata = new StorageMetadata(projectId, version, uploader);
+    final StorageMetadata metadata = new StorageMetadata(projectId, version, uploader, null);
     databaseStorage.put(metadata, file);
     verify(projectLoader).uploadProjectFile(projectId, version, file, uploader);
   }
diff --git a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
index 61cd1b6..d7b098a 100644
--- a/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
+++ b/azkaban-common/src/test/java/azkaban/storage/LocalStorageTest.java
@@ -17,10 +17,11 @@
 
 package azkaban.storage;
 
+import azkaban.AzkabanCommonModuleConfig;
 import azkaban.spi.StorageMetadata;
+import azkaban.utils.Md5Hasher;
 import java.io.File;
 import java.io.InputStream;
-import java.net.URI;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.junit.After;
@@ -28,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 
 public class LocalStorageTest {
@@ -37,12 +39,15 @@ public class LocalStorageTest {
   static final String LOCAL_STORAGE = "LOCAL_STORAGE";
   static final File BASE_DIRECTORY = new File(LOCAL_STORAGE);
 
-  private final LocalStorage localStorage = new LocalStorage(BASE_DIRECTORY);
+  private LocalStorage localStorage;
 
   @Before
   public void setUp() throws Exception {
     tearDown();
     BASE_DIRECTORY.mkdir();
+    AzkabanCommonModuleConfig config = mock(AzkabanCommonModuleConfig.class);
+    when(config.getLocalStorageBaseDirPath()).thenReturn(LOCAL_STORAGE);
+    localStorage = new LocalStorage(config);
   }
 
   @After
@@ -51,11 +56,12 @@ public class LocalStorageTest {
   }
 
   @Test
-  public void testAll() throws Exception {
+  public void testPutGet() throws Exception {
     ClassLoader classLoader = getClass().getClassLoader();
     File testFile = new File(classLoader.getResource(SAMPLE_FILE).getFile());
 
-    final StorageMetadata metadata = new StorageMetadata(1, 1, "testuser");
+    final StorageMetadata metadata = new StorageMetadata(
+        1, 1, "testuser", Md5Hasher.md5Hash(testFile));
     final String key = localStorage.put(metadata, testFile);
     assertNotNull(key);
     log.info("Key URI: " + key);
@@ -63,7 +69,9 @@ public class LocalStorageTest {
     File expectedTargetFile = new File(BASE_DIRECTORY, new StringBuilder()
         .append(metadata.getProjectId())
         .append(File.separator)
-        .append(metadata.getVersion())
+        .append(metadata.getProjectId())
+        .append("-")
+        .append(new String(metadata.getHash()))
         .append(".zip")
         .toString()
     );
diff --git a/azkaban-spi/src/main/java/azkaban/spi/Storage.java b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
index f61af34..7c7ed9d 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/Storage.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/Storage.java
@@ -18,6 +18,7 @@
 package azkaban.spi;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 
 
@@ -39,7 +40,7 @@ public interface Storage {
    * @return InputStream for fetching the blob. null if the key is not found.
    *
    */
-  InputStream get(String key);
+  InputStream get(String key) throws IOException;
 
   /**
    * Put an object and return a key.
diff --git a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
index b562344..19f31a5 100644
--- a/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
+++ b/azkaban-spi/src/main/java/azkaban/spi/StorageMetadata.java
@@ -26,11 +26,13 @@ public class StorageMetadata {
   private final int projectId;
   private final int version;
   private final String uploader;
+  private byte[] hash;
 
-  public StorageMetadata(int projectId, int version, String uploader) {
+  public StorageMetadata(int projectId, int version, String uploader, byte[] hash) {
     this.projectId = projectId;
     this.version = version;
     this.uploader = requireNonNull(uploader);
+    this.hash = hash;
   }
 
   @Override
@@ -50,6 +52,10 @@ public class StorageMetadata {
     return uploader;
   }
 
+  public byte[] getHash() {
+    return hash;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {