azkaban-aplcache

create hadoop module (#1516)

Related to #1499 on Hadoop dependencies: this commit moves the Hadoop-specific Guice bindings out of AzkabanCommonModule into a new HadoopModule, which is installed only when HDFS storage is configured, and removes the Hadoop compile dependencies from azkaban-solo-server.

10/5/2017 6:43:18 PM


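The core idea of the commit is conditional Guice module installation: Hadoop-specific bindings live in their own module, and the parent module installs it only when the configured storage type actually needs Hadoop. A minimal sketch of that pattern follows; FeatureModule, ParentModule, and hdfsEnabled are illustrative names, not Azkaban's actual API.

    import com.google.inject.AbstractModule;

    // FeatureModule stands in for HadoopModule: its bindings (and their
    // compile-time dependencies) are only pulled in when it is installed.
    class FeatureModule extends AbstractModule {
      @Override
      protected void configure() {
        // Hadoop-specific bindings would go here.
      }
    }

    class ParentModule extends AbstractModule {
      private final boolean hdfsEnabled;

      ParentModule(final boolean hdfsEnabled) {
        this.hdfsEnabled = hdfsEnabled;
      }

      @Override
      protected void configure() {
        if (this.hdfsEnabled) {
          // install() composes the child module's bindings into this one;
          // deployments without HDFS never touch the feature's classes.
          install(new FeatureModule());
        }
      }
    }
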
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 165272f..d888ffd 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -16,10 +16,6 @@
  */
 package azkaban;
 
-import static azkaban.Constants.ConfigurationKeys.HADOOP_CONF_DIR_PATH;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
 import azkaban.db.AzkabanDataSource;
 import azkaban.db.H2FileDataSource;
 import azkaban.db.MySQLDataSource;
@@ -27,23 +23,15 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.project.JdbcProjectImpl;
 import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanException;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
-import azkaban.storage.HdfsAuth;
 import azkaban.storage.StorageImplementationType;
 import azkaban.trigger.JdbcTriggerImpl;
 import azkaban.trigger.TriggerLoader;
 import azkaban.utils.Props;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
-import java.io.File;
-import java.io.IOException;
-import javax.inject.Inject;
-import javax.inject.Singleton;
 import org.apache.commons.dbutils.QueryRunner;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +66,9 @@ public class AzkabanCommonModule extends AbstractModule {
   public Class<? extends Storage> resolveStorageClassType() {
     final StorageImplementationType type = StorageImplementationType
         .from(this.config.getStorageImplementation());
+    if (type == StorageImplementationType.HDFS) {
+      install(new HadoopModule(this.props));
+    }
     if (type != null) {
       return type.getImplementationClass();
     } else {
@@ -103,36 +94,6 @@ public class AzkabanCommonModule extends AbstractModule {
     }
   }
 
-
-  @Inject
-  @Provides
-  @Singleton
-  public Configuration createHadoopConfiguration() {
-    final String hadoopConfDirPath = requireNonNull(this.props.get(HADOOP_CONF_DIR_PATH));
-
-    final File hadoopConfDir = new File(requireNonNull(hadoopConfDirPath));
-    checkArgument(hadoopConfDir.exists() && hadoopConfDir.isDirectory());
-
-    final Configuration hadoopConf = new Configuration(false);
-    hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "core-site.xml"));
-    hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "hdfs-site.xml"));
-    hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-    return hadoopConf;
-  }
-
-  @Inject
-  @Provides
-  @Singleton
-  public FileSystem createHadoopFileSystem(final Configuration hadoopConf, final HdfsAuth auth) {
-    try {
-      auth.authorize();
-      return FileSystem.get(hadoopConf);
-    } catch (final IOException e) {
-      log.error("Unable to initialize HDFS", e);
-      throw new AzkabanException(e);
-    }
-  }
-
   @Provides
   public QueryRunner createQueryRunner(final AzkabanDataSource dataSource) {
     return new QueryRunner(dataSource);
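
In the hunk above, install(new HadoopModule(this.props)) is called from inside resolveStorageClassType(). Guice only permits install() while a module is being configured, so this works only if resolveStorageClassType() is invoked from configure(). A hedged sketch of that call chain; StorageBindingSketch and its abstract method are assumptions for illustration, not the actual AzkabanCommonModule code:

    import azkaban.spi.Storage;
    import com.google.inject.AbstractModule;

    abstract class StorageBindingSketch extends AbstractModule {

      // Stand-in for AzkabanCommonModule.resolveStorageClassType(), which may
      // call install(new HadoopModule(props)) as a side effect.
      abstract Class<? extends Storage> resolveStorageClassType();

      @Override
      protected void configure() {
        // Runs during configuration, so any install() triggered inside
        // resolveStorageClassType() happens at a valid time.
        bind(Storage.class).to(resolveStorageClassType());
      }
    }
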
diff --git a/azkaban-common/src/main/java/azkaban/HadoopModule.java b/azkaban-common/src/main/java/azkaban/HadoopModule.java
new file mode 100644
index 0000000..6884ac8
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/HadoopModule.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban;
+
+import static azkaban.Constants.ConfigurationKeys.HADOOP_CONF_DIR_PATH;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import azkaban.spi.AzkabanException;
+import azkaban.storage.HdfsAuth;
+import azkaban.utils.Props;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import java.io.File;
+import java.io.IOException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Place Hadoop dependencies in this module. Since Hadoop is not included in Azkaban's
+ * runtime dependencies, this module is installed only when Hadoop-related injection
+ * (e.g., HDFS storage) is needed.
+ */
+public class HadoopModule extends AbstractModule {
+
+  private static final Logger log = LoggerFactory.getLogger(HadoopModule.class);
+  private final Props props;
+
+  HadoopModule(final Props props) {
+    this.props = props;
+  }
+
+  @Inject
+  @Provides
+  @Singleton
+  public Configuration createHadoopConfiguration() {
+    final String hadoopConfDirPath = requireNonNull(this.props.get(HADOOP_CONF_DIR_PATH));
+
+    final File hadoopConfDir = new File(requireNonNull(hadoopConfDirPath));
+    checkArgument(hadoopConfDir.exists() && hadoopConfDir.isDirectory());
+
+    final Configuration hadoopConf = new Configuration(false);
+    hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "core-site.xml"));
+    hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "hdfs-site.xml"));
+    hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+    return hadoopConf;
+  }
+
+  @Inject
+  @Provides
+  @Singleton
+  public FileSystem createHadoopFileSystem(final Configuration hadoopConf, final HdfsAuth auth) {
+    try {
+      auth.authorize();
+      return FileSystem.get(hadoopConf);
+    } catch (final IOException e) {
+      log.error("Unable to initialize HDFS", e);
+      throw new AzkabanException(e);
+    }
+  }
+
+  @Override
+  protected void configure() {
+  }
+}
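
Once HadoopModule is installed, the singleton FileSystem it provides can be constructor-injected anywhere. A hedged usage sketch; HdfsClientExample is illustrative, since Azkaban's real consumer (HdfsStorage) is not part of this diff:

    import java.io.IOException;
    import javax.inject.Inject;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsClientExample {

      private final FileSystem fs;

      @Inject
      public HdfsClientExample(final FileSystem fs) {
        // The singleton created by createHadoopFileSystem(), already
        // authorized via HdfsAuth before it was handed out.
        this.fs = fs;
      }

      public boolean projectExists(final String path) throws IOException {
        return this.fs.exists(new Path(path));
      }
    }
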
diff --git a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
index d401660..9c8c40f 100644
--- a/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
+++ b/azkaban-common/src/test/java/azkaban/ServiceProviderTest.java
@@ -19,14 +19,17 @@ package azkaban;
 
 import static azkaban.ServiceProvider.SERVICE_PROVIDER;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import azkaban.db.DatabaseOperator;
 import azkaban.project.JdbcProjectImpl;
 import azkaban.spi.Storage;
 import azkaban.storage.DatabaseStorage;
+import azkaban.storage.HdfsStorage;
 import azkaban.storage.LocalStorage;
 import azkaban.storage.StorageManager;
 import azkaban.utils.Props;
+import com.google.inject.ConfigurationException;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import java.io.File;
@@ -37,7 +40,9 @@ import org.junit.Test;
 
 public class ServiceProviderTest {
 
-  public static final String AZKABAN_LOCAL_TEST_STORAGE = "AZKABAN_LOCAL_TEST_STORAGE";
+  private static final String AZKABAN_LOCAL_TEST_STORAGE = "AZKABAN_LOCAL_TEST_STORAGE";
+  private static final String AZKABAN_TEST_HDFS_STORAGE_TYPE = "HDFS";
+  private static final String AZKABAN_TEST_STORAGE_HDFS_URI = "hdfs://test.com:9000/azkaban/";
 
  // Tests whether one class is bound as a singleton by Guice. Could be called by
  // AZ Common, Web, or Exec modules.
@@ -71,7 +76,34 @@ public class ServiceProviderTest {
     assertThat(injector.getInstance(StorageManager.class)).isNotNull();
     assertThat(injector.getInstance(DatabaseStorage.class)).isNotNull();
     assertThat(injector.getInstance(LocalStorage.class)).isNotNull();
+    assertThatThrownBy(
+        () -> injector.getInstance(HdfsStorage.class))
+        .isInstanceOf(ConfigurationException.class)
+        .hasMessageContaining("Guice configuration errors");
+
     assertThat(injector.getInstance(Storage.class)).isNotNull();
     assertThat(injector.getInstance(DatabaseOperator.class)).isNotNull();
   }
+
+  @Test
+  public void testHadoopInjection() throws Exception {
+    final Props props = new Props();
+    props.put("database.type", "h2");
+    props.put("h2.path", "h2");
+    props
+        .put(Constants.ConfigurationKeys.AZKABAN_STORAGE_TYPE, AZKABAN_TEST_HDFS_STORAGE_TYPE);
+    props
+        .put(Constants.ConfigurationKeys.HADOOP_CONF_DIR_PATH, "./");
+    props.put(Constants.ConfigurationKeys.AZKABAN_STORAGE_HDFS_ROOT_URI,
+        AZKABAN_TEST_STORAGE_HDFS_URI);
+
+    final Injector injector = Guice.createInjector(
+        new AzkabanCommonModule(props)
+    );
+    SERVICE_PROVIDER.unsetInjector();
+    SERVICE_PROVIDER.setInjector(injector);
+
+    assertSingleton(HdfsStorage.class, injector);
+    assertThat(injector.getInstance(Storage.class)).isNotNull();
+  }
 }
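
testHadoopInjection() calls an assertSingleton helper that is defined elsewhere in the test class and not shown in this diff. A plausible implementation (an assumption, not the actual helper) checks that the injector hands back the same instance twice; Injector and assertThat are already imported by this file:

    // Hypothetical stand-in for the assertSingleton helper used above.
    private static void assertSingleton(final Class<?> clazz, final Injector injector) {
      assertThat(injector.getInstance(clazz)).isSameAs(injector.getInstance(clazz));
    }
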
diff --git a/azkaban-solo-server/build.gradle b/azkaban-solo-server/build.gradle
index ae87359..0bda1bc 100644
--- a/azkaban-solo-server/build.gradle
+++ b/azkaban-solo-server/build.gradle
@@ -7,10 +7,6 @@ dependencies {
     compile(project(':azkaban-exec-server'))
 
     runtime deps.h2
-    compile deps.hadoopAnnotations
-    compile deps.hadoopAuth
-    compile deps.hadoopCommon
-    compile deps.hadoopHdfs
 }
 
 installDist.dependsOn ':azkaban-web-server:installDist'