azkaban-aplcache

Implemented HDFS Storage (#1069)

5/9/2017 7:32:08 PM

diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 922dbc4..3bef370 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -31,13 +31,14 @@ model {
   }
 }
 
-configurations {
-  testCompileOnly.extendsFrom(compileOnly)
-}
 dependencies {
   compile project(':azkaban-spi')
   compile project(':azkaban-db')
 
+  compile "org.apache.hadoop:hadoop-auth:$hadoopVersion"
+  compile "org.apache.hadoop:hadoop-annotations:$hadoopVersion"
+  compile "org.apache.hadoop:hadoop-common:$hadoopVersion"
+  compile "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
   compile('com.google.inject:guice:4.1.0')
   compile('com.google.guava:guava:21.0')
   compile('commons-collections:commons-collections:3.2.2')
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 1fbf5b8..f57c2eb 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -25,6 +25,7 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanException;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
 import azkaban.storage.StorageImplementationType;
@@ -36,13 +37,21 @@ import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
+import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import javax.sql.DataSource;
 import org.apache.commons.dbutils.QueryRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static azkaban.Constants.ConfigurationKeys.*;
+import static com.google.common.base.Preconditions.*;
+import static java.util.Objects.*;
+
 
 /**
  * This Guice module is currently a one place container for all bindings in the current module. This is intended to
@@ -50,11 +59,13 @@ import org.slf4j.LoggerFactory;
  * structuring of Guice components.
  */
 public class AzkabanCommonModule extends AbstractModule {
-  private static final Logger logger = LoggerFactory.getLogger(AzkabanCommonModule.class);
+  private static final Logger log = LoggerFactory.getLogger(AzkabanCommonModule.class);
 
+  private final Props props;
   private final AzkabanCommonModuleConfig config;
 
   public AzkabanCommonModule(Props props) {
+    this.props = props;
     this.config = new AzkabanCommonModuleConfig(props);
   }
 
@@ -97,7 +108,7 @@ public class AzkabanCommonModule extends AbstractModule {
     if(databaseType.equals("h2")) {
       String path = props.getString("h2.path");
       Path h2DbPath = Paths.get(path).toAbsolutePath();
-      logger.info("h2 DB path: " + h2DbPath);
+      log.info("h2 DB path: " + h2DbPath);
       return new H2FileDataSource(h2DbPath);
     }
     int port = props.getInt("mysql.port");
@@ -107,9 +118,35 @@ public class AzkabanCommonModule extends AbstractModule {
     String password = props.getString("mysql.password");
     int numConnections = props.getInt("mysql.numconnections");
 
-    return MySQLDataSource.getInstance(host, port, database, user, password,
-        numConnections);
+    return MySQLDataSource.getInstance(host, port, database, user, password, numConnections);
+  }
+
+  @Inject
+  @Provides
+  @Singleton
+  public Configuration createHadoopConfiguration() {
+    final String hadoopConfDirPath = requireNonNull(props.get(HADOOP_CONF_DIR_PATH));
+
+    final File hadoopConfDir = new File(requireNonNull(hadoopConfDirPath));
+    checkArgument(hadoopConfDir.exists() && hadoopConfDir.isDirectory());
 
+    final Configuration hadoopConf = new Configuration(false);
+    hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "core-site.xml"));
+    hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "hdfs-site.xml"));
+    hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+    return hadoopConf;
+  }
+
+  @Inject
+  @Provides
+  @Singleton
+  public FileSystem createHadoopFileSystem(final Configuration hadoopConf) {
+    try {
+      return FileSystem.get(hadoopConf);
+    } catch (IOException e) {
+      log.error("Unable to initialize HDFS", e);
+      throw new AzkabanException(e);
+    }
   }
 
   @Provides
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index be061d3..aea1499 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -90,7 +90,10 @@ public class Constants {
 
     public static final String AZKABAN_STORAGE_TYPE = "azkaban.storage.type";
     public static final String AZKABAN_STORAGE_LOCAL_BASEDIR = "azkaban.storage.local.basedir";
+    public static final String HADOOP_CONF_DIR_PATH = "hadoop.conf.dir.path";
     public static final String AZKABAN_STORAGE_HDFS_ROOT_URI = "azkaban.storage.hdfs.root.uri";
+    public static final String AZKABAN_KERBEROS_PRINCIPAL = "azkaban.kerberos.principal";
+    public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
   }
 
   public static class FlowProperties {
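
The keys added above are typically set alongside the existing storage keys in the server's azkaban.properties. A minimal sketch of how a deployment might configure them; every value below is an illustrative assumption, not taken from this commit:

    # hypothetical azkaban.properties excerpt (illustrative values only)
    azkaban.storage.type=HDFS
    hadoop.conf.dir.path=/etc/hadoop/conf
    azkaban.storage.hdfs.root.uri=hdfs://namenode:9000/azkaban/storage
    azkaban.kerberos.principal=azkaban/host.example.com@EXAMPLE.COM
    azkaban.keytab.path=/etc/security/keytabs/azkaban.keytab
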
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
new file mode 100644
index 0000000..00ea850
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import azkaban.spi.AzkabanException;
+import azkaban.utils.Props;
+import com.google.inject.Inject;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import static azkaban.Constants.ConfigurationKeys.*;
+import static java.util.Objects.*;
+
+
+/**
+ * This class helps in HDFS authorization and is a wrapper over Hadoop's {@link UserGroupInformation} class.
+ */
+public class HdfsAuth {
+  private static final Logger log = Logger.getLogger(HdfsAuth.class);
+
+  private final boolean isSecurityEnabled;
+
+  private UserGroupInformation loggedInUser = null;
+  private String keytabPath = null;
+  private String keytabPrincipal = null;
+
+  @Inject
+  public HdfsAuth(Props props, Configuration conf) {
+    UserGroupInformation.setConfiguration(conf);
+    isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+    if (isSecurityEnabled) {
+      log.info("The Hadoop cluster has enabled security");
+      keytabPath = requireNonNull(props.getString(AZKABAN_KEYTAB_PATH));
+      keytabPrincipal = requireNonNull(props.getString(AZKABAN_KERBEROS_PRINCIPAL));
+    }
+  }
+
+  /**
+   * API to authorize HDFS access.
+   * This logs in the configured user via the keytab.
+   * If the user is already logged in then it renews the TGT.
+   */
+  public void authorize() {
+    if (isSecurityEnabled) {
+      try {
+        login(keytabPrincipal, keytabPath);
+      } catch (IOException e) {
+        log.error(e);
+        throw new AzkabanException(String.format(
+            "Error: Unable to authorize to Hadoop. Principal: %s Keytab: %s", keytabPrincipal, keytabPath));
+      }
+    }
+  }
+
+  private void login(String keytabPrincipal, String keytabPath) throws IOException {
+    if (loggedInUser == null) {
+      log.info(String.format("Logging in using Principal: %s Keytab: %s", keytabPrincipal, keytabPath));
+
+      UserGroupInformation.loginUserFromKeytab(keytabPrincipal, keytabPath);
+      loggedInUser = UserGroupInformation.getLoginUser();
+      log.info(String.format("User %s logged in.", loggedInUser));
+    } else {
+      log.info(String.format("User %s already logged in. Refreshing TGT", loggedInUser));
+      loggedInUser.checkTGTAndReloginFromKeytab();
+    }
+  }
+}
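
HdfsAuth is intended to be invoked before every HDFS access. A minimal call-pattern sketch, mirroring what HdfsStorage below does; hdfsAuth and hdfs stand for the injected HdfsAuth and FileSystem instances, and the path is made up:

    // Sketch only: authorize (keytab login, or TGT renewal if already logged in) before any HDFS access.
    hdfsAuth.authorize();
    InputStream in = hdfs.open(new Path("/azkaban/storage/1/1-abcdef.zip"));
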
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
index 2157aaa..5bfba8d 100644
--- a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
@@ -17,28 +17,85 @@
 
 package azkaban.storage;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import azkaban.AzkabanCommonModuleConfig;
 import azkaban.spi.Storage;
+import azkaban.spi.StorageException;
 import azkaban.spi.StorageMetadata;
+import com.google.common.io.Files;
 import com.google.inject.Inject;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 
 
 public class HdfsStorage implements Storage {
 
+  private static final Logger log = Logger.getLogger(HdfsStorage.class);
+  private static final String HDFS_SCHEME = "hdfs";
+
+  private final HdfsAuth hdfsAuth;
+  private final URI rootUri;
+  private final FileSystem hdfs;
+
   @Inject
-  public HdfsStorage() {
+  public HdfsStorage(HdfsAuth hdfsAuth, FileSystem hdfs, AzkabanCommonModuleConfig config) {
+    this.hdfsAuth = requireNonNull(hdfsAuth);
+    this.hdfs = requireNonNull(hdfs);
 
+    this.rootUri = config.getHdfsRootUri();
+    requireNonNull(rootUri.getAuthority(), "URI must have host:port mentioned.");
+    checkArgument(HDFS_SCHEME.equals(rootUri.getScheme()));
   }
 
   @Override
-  public InputStream get(String key) {
-    throw new UnsupportedOperationException("Method not implemented");
+  public InputStream get(String key) throws IOException {
+    hdfsAuth.authorize();
+    return hdfs.open(new Path(rootUri.toString(), key));
   }
 
   @Override
   public String put(StorageMetadata metadata, File localFile) {
-    throw new UnsupportedOperationException("Method not implemented");
+    hdfsAuth.authorize();
+    final Path projectsPath = new Path(rootUri.getPath(), String.valueOf(metadata.getProjectId()));
+    try {
+      if (hdfs.mkdirs(projectsPath)) {
+        log.info("Created project dir: " + projectsPath);
+      }
+      final Path targetPath = createTargetPath(metadata, localFile, projectsPath);
+      if (hdfs.exists(targetPath)) {
+        log.info(
+            String.format("Duplicate Found: meta: %s path: %s", metadata, targetPath));
+        return getRelativePath(targetPath);
+      }
+
+      // Copy file to HDFS
+      log.info(String.format("Creating project artifact: meta: %s path: %s", metadata, targetPath));
+      hdfs.copyFromLocalFile(new Path(localFile.getAbsolutePath()), targetPath);
+      return getRelativePath(targetPath);
+    } catch (IOException e) {
+      log.error("error in put(): Metadata: " + metadata);
+      throw new StorageException(e);
+    }
+  }
+
+  private String getRelativePath(Path targetPath) {
+    return URI.create(rootUri.getPath()).relativize(targetPath.toUri()).getPath();
+  }
+
+  private Path createTargetPath(StorageMetadata metadata, File localFile, Path projectsPath) {
+    return new Path(projectsPath, String.format("%s-%s.%s",
+        String.valueOf(metadata.getProjectId()),
+        new String(Hex.encodeHex(metadata.getHash())),
+        Files.getFileExtension(localFile.getName())
+    ));
   }
 
   @Override
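
For orientation on the key scheme: put() returns a key relative to the root URI's path, shaped like "<projectId>/<projectId>-<hash>.<ext>", and get() resolves that key against the full root URI. A small sketch with made-up values, consistent with the unit test further down (uses java.net.URI and org.apache.hadoop.fs.Path):

    // Illustrative only: how a relative key maps back to an absolute HDFS path.
    URI rootUri = URI.create("hdfs://localhost:9000/path/to/foo");
    String key = "1/1-abcdef.zip";                        // shape of the value put() returns
    Path absolute = new Path(rootUri.toString(), key);    // the path get() opens
    // absolute -> hdfs://localhost:9000/path/to/foo/1/1-abcdef.zip
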
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index 35a2dcb..a56253c 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -17,6 +17,8 @@
 
 package azkaban.storage;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import azkaban.AzkabanCommonModuleConfig;
 import azkaban.spi.Storage;
 import azkaban.spi.StorageException;
@@ -31,17 +33,17 @@ import java.io.InputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
-import static com.google.common.base.Preconditions.*;
-
 
 public class LocalStorage implements Storage {
+
   private static final Logger log = Logger.getLogger(LocalStorage.class);
 
   final File rootDirectory;
 
   @Inject
   public LocalStorage(AzkabanCommonModuleConfig config) {
-    this.rootDirectory = validateRootDirectory(createIfDoesNotExist(config.getLocalStorageBaseDirPath()));
+    this.rootDirectory = validateRootDirectory(
+        createIfDoesNotExist(config.getLocalStorageBaseDirPath()));
   }
 
   /**
@@ -65,20 +67,22 @@ public class LocalStorage implements Storage {
         Files.getFileExtension(localFile.getName())));
 
     if (targetFile.exists()) {
-      throw new StorageException(String.format(
-          "Error in LocalStorage. Target file already exists. targetFile: %s, Metadata: %s",
-          targetFile, metadata));
+      log.info(String.format("Duplicate found: meta: %s, targetFile: %s, ", metadata,
+          targetFile.getAbsolutePath()));
+      return getRelativePath(targetFile);
     }
+
+    // Copy file to storage dir
     try {
       FileUtils.copyFile(localFile, targetFile);
     } catch (IOException e) {
-      log.error("LocalStorage error in put(): Metadata: " + metadata);
+      log.error("LocalStorage error in put(): meta: " + metadata);
       throw new StorageException(e);
     }
-    return createRelativePath(targetFile);
+    return getRelativePath(targetFile);
   }
 
-  private String createRelativePath(File targetFile) {
+  private String getRelativePath(File targetFile) {
     return rootDirectory.toURI().relativize(targetFile.toURI()).getPath();
   }
 
@@ -89,7 +93,7 @@ public class LocalStorage implements Storage {
 
   private static File createIfDoesNotExist(String baseDirectoryPath) {
     final File baseDirectory = new File(baseDirectoryPath);
-    if(!baseDirectory.exists()) {
+    if (!baseDirectory.exists()) {
       baseDirectory.mkdir();
       log.info("Creating dir: " + baseDirectory.getAbsolutePath());
     }
diff --git a/azkaban-common/src/test/java/azkaban/storage/HdfsStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/HdfsStorageTest.java
new file mode 100644
index 0000000..a5dc030
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/storage/HdfsStorageTest.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import azkaban.AzkabanCommonModuleConfig;
+import azkaban.spi.StorageMetadata;
+import azkaban.utils.Md5Hasher;
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class HdfsStorageTest {
+  private HdfsAuth hdfsAuth;
+  private HdfsStorage hdfsStorage;
+  private FileSystem hdfs;
+
+  @Before
+  public void setUp() throws Exception {
+    hdfs = mock(FileSystem.class);
+    hdfsAuth = mock(HdfsAuth.class);
+    AzkabanCommonModuleConfig config = mock(AzkabanCommonModuleConfig.class);
+    when(config.getHdfsRootUri()).thenReturn(URI.create("hdfs://localhost:9000/path/to/foo"));
+
+    hdfsStorage = new HdfsStorage(hdfsAuth, hdfs, config);
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    hdfsStorage.get("1/1-hash.zip");
+    verify(hdfs).open(new Path("hdfs://localhost:9000/path/to/foo/1/1-hash.zip"));
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    File file = new File(getClass().getClassLoader().getResource("sample_flow_01.zip").getFile());
+    final String hash = new String(Hex.encodeHex(Md5Hasher.md5Hash(file)));
+
+    when(hdfs.exists(any(Path.class))).thenReturn(false);
+
+    StorageMetadata metadata = new StorageMetadata(1, 2, "uploader", Md5Hasher.md5Hash(file));
+    String key = hdfsStorage.put(metadata, file);
+
+    final String expectedName = String.format("1/1-%s.zip", hash);
+    Assert.assertEquals(expectedName, key);
+
+    final String expectedPath = "/path/to/foo/" + expectedName;
+    verify(hdfs).copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(expectedPath));
+  }
+}