azkaban-aplcache

Refactor storage code. Added HDFS storage stub (#1022) Refactor:

4/26/2017 7:15:31 PM

Details

diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index ed9b316..ce0ee92 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -31,6 +31,9 @@ model {
   }
 }
 
+configurations {
+  testCompileOnly.extendsFrom(compileOnly)
+}
 dependencies {
   compile project(':azkaban-spi')
 
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 3c5722a..817c04f 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -21,17 +21,14 @@ import azkaban.project.ProjectLoader;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
 import azkaban.storage.LocalStorage;
-import azkaban.storage.StorageConfig;
 import azkaban.storage.StorageImplementationType;
 import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
-import com.google.inject.Singleton;
 import java.io.File;
 
-import static azkaban.storage.StorageImplementationType.*;
 
 /**
  * This Guice module is currently a one place container for all bindings in the current module. This is intended to
@@ -39,36 +36,25 @@ import static azkaban.storage.StorageImplementationType.*;
  * structuring of Guice components.
  */
 public class AzkabanCommonModule extends AbstractModule {
-  private final Props props;
-  /**
-   * Storage Implementation
-   * This can be any of the {@link StorageImplementationType} values in which case {@link StorageFactory} will create
-   * the appropriate storage instance. Or one can feed in a custom implementation class using the full qualified
-   * path required by a classloader.
-   *
-   * examples: LOCAL, DATABASE, azkaban.storage.MyFavStorage
-   *
-   */
-  private final String storageImplementation;
+  private final AzkabanCommonModuleConfig config;
 
   public AzkabanCommonModule(Props props) {
-    this.props = props;
-    this.storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE, DATABASE.name());
+    this.config = new AzkabanCommonModuleConfig(props);
   }
 
   @Override
   protected void configure() {
     bind(ProjectLoader.class).to(JdbcProjectLoader.class).in(Scopes.SINGLETON);
-    bind(Props.class).toInstance(props);
+    bind(Props.class).toInstance(config.getProps());
     bind(Storage.class).to(resolveStorageClassType()).in(Scopes.SINGLETON);
   }
 
   public Class<? extends Storage> resolveStorageClassType() {
-    final StorageImplementationType type = StorageImplementationType.from(storageImplementation);
+    final StorageImplementationType type = StorageImplementationType.from(config.getStorageImplementation());
     if (type != null) {
       return type.getImplementationClass();
     } else {
-      return loadCustomStorageClass(storageImplementation);
+      return loadCustomStorageClass(config.getStorageImplementation());
     }
   }
 
@@ -83,7 +69,7 @@ public class AzkabanCommonModule extends AbstractModule {
 
   @Inject
   public @Provides
-  LocalStorage createLocalStorage(StorageConfig config) {
-    return new LocalStorage(new File(config.getBaseDirectoryPath()));
+  LocalStorage createLocalStorage(AzkabanCommonModuleConfig config) {
+    return new LocalStorage(new File(config.getLocalStorageBaseDirPath()));
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
new file mode 100644
index 0000000..015e86d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModuleConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban;
+
+import azkaban.storage.StorageImplementationType;
+import azkaban.utils.Props;
+import com.google.inject.Inject;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.log4j.Logger;
+
+import static azkaban.Constants.ConfigurationKeys.*;
+import static azkaban.storage.StorageImplementationType.*;
+
+
+public class AzkabanCommonModuleConfig {
+  private static final Logger log = Logger.getLogger(AzkabanCommonModuleConfig.class);
+
+  private final Props props;
+
+  /**
+   * Storage Implementation
+   * This can be any of the {@link StorageImplementationType} values in which case {@link StorageFactory} will create
+   * the appropriate storage instance. Or one can feed in a custom implementation class using the full qualified
+   * path required by a classloader.
+   *
+   * examples: LOCAL, DATABASE, azkaban.storage.MyFavStorage
+   *
+   */
+  private String storageImplementation = DATABASE.name();
+  private String localStorageBaseDirPath = "AZKABAN_STORAGE";
+  private URI hdfsBaseUri = uri("hdfs://localhost:50070/path/to/base/");
+
+  @Inject
+  public AzkabanCommonModuleConfig(Props props) {
+    this.props = props;
+
+    storageImplementation = props.getString(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE,
+        storageImplementation);
+    localStorageBaseDirPath = props.getString(AZKABAN_STORAGE_LOCAL_BASEDIR, localStorageBaseDirPath);
+    hdfsBaseUri = props.getUri(AZKABAN_STORAGE_HDFS_BASEURI, hdfsBaseUri);
+  }
+
+  public Props getProps() {
+    return props;
+  }
+
+  public String getStorageImplementation() {
+    return storageImplementation;
+  }
+
+  public String getLocalStorageBaseDirPath() {
+    return localStorageBaseDirPath;
+  }
+
+  public URI getHdfsBaseUri() {
+    return hdfsBaseUri;
+  }
+
+  private static URI uri(String uri){
+    try {
+      return new URI(uri);
+    } catch (URISyntaxException e) {
+      log.error(e);
+    }
+    return null;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 663475b..d4b760a 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -89,7 +89,8 @@ public class Constants {
     public static final String AZKABAN_MAX_FLOW_RUNNING_MINS = "azkaban.server.flow.max.running.minutes";
 
     public static final String AZKABAN_STORAGE_TYPE = "azkaban.storage.type";
-    public static final String AZKABAN_STORAGE_LOCAL_BASEDIRECTORY = "azkaban.storage.local.basedirectory";
+    public static final String AZKABAN_STORAGE_LOCAL_BASEDIR = "azkaban.storage.local.basedir";
+    public static final String AZKABAN_STORAGE_HDFS_BASEURI = "azkaban.storage.hdfs.baseuri";
   }
 
   public static class FlowProperties {
diff --git a/azkaban-common/src/main/java/azkaban/storage/StorageImplementationType.java b/azkaban-common/src/main/java/azkaban/storage/StorageImplementationType.java
index fddd550..4c5a074 100644
--- a/azkaban-common/src/main/java/azkaban/storage/StorageImplementationType.java
+++ b/azkaban-common/src/main/java/azkaban/storage/StorageImplementationType.java
@@ -22,6 +22,7 @@ import azkaban.spi.Storage;
 
 public enum StorageImplementationType {
   LOCAL(LocalStorage.class),
+  HDFS(HdfsStorage.class),
   DATABASE(DatabaseStorage.class);
 
   private final Class<? extends Storage> implementationClass;
diff --git a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
index c84ee8a..531c2da 100644
--- a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
+++ b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
@@ -48,7 +48,7 @@ public class ServiceProviderTest {
     Props props = new Props();
     props.put("database.type", "h2");
     props.put("h2.path", "h2");
-    props.put(Constants.ConfigurationKeys.AZKABAN_STORAGE_LOCAL_BASEDIRECTORY, AZKABAN_LOCAL_TEST_STORAGE);
+    props.put(Constants.ConfigurationKeys.AZKABAN_STORAGE_LOCAL_BASEDIR, AZKABAN_LOCAL_TEST_STORAGE);
 
 
     Injector injector = Guice.createInjector(
diff --git a/azkaban-hadoop-security-plugin/build.gradle b/azkaban-hadoop-security-plugin/build.gradle
index 3a91289..7a9e849 100644
--- a/azkaban-hadoop-security-plugin/build.gradle
+++ b/azkaban-hadoop-security-plugin/build.gradle
@@ -1,12 +1,10 @@
 apply plugin: 'distribution'
 
-ext.hadoopVersion = "2.6.1"
-
 dependencies {
   compile project(":azkaban-common")
 
   compileOnly "org.apache.hadoop:hadoop-common:$hadoopVersion"
   compileOnly "org.apache.hadoop:hadoop-mapreduce-client-common:$hadoopVersion"
   compileOnly "org.apache.hadoop:hadoop-mapreduce-client-core:$hadoopVersion"
-  compileOnly "org.apache.hive:hive-metastore:0.13.1"
+  compileOnly "org.apache.hive:hive-metastore:$hiveVersion"
 }
diff --git a/gradle.properties b/gradle.properties
index ed84481..35e953b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -20,3 +20,10 @@ org.gradle.parallel=true
 
 #Allows generation of idea/eclipse metadata for a specific subproject and its upstream project dependencies
 ide.recursive=true
+
+
+#---------------------------------
+# Versions
+#---------------------------------
+hadoopVersion=2.6.1
+hiveVersion=0.13.1