azkaban-aplcache
Changes
azkaban-common/build.gradle 7(+4 -3)
Details
diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index 922dbc4..3bef370 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -31,13 +31,14 @@ model {
}
}
-configurations {
- testCompileOnly.extendsFrom(compileOnly)
-}
dependencies {
compile project(':azkaban-spi')
compile project(':azkaban-db')
+ compile "org.apache.hadoop:hadoop-auth:$hadoopVersion"
+ compile "org.apache.hadoop:hadoop-annotations:$hadoopVersion"
+ compile "org.apache.hadoop:hadoop-common:$hadoopVersion"
+ compile "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
compile('com.google.inject:guice:4.1.0')
compile('com.google.guava:guava:21.0')
compile('commons-collections:commons-collections:3.2.2')
diff --git a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
index 1fbf5b8..f57c2eb 100644
--- a/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
+++ b/azkaban-common/src/main/java/azkaban/AzkabanCommonModule.java
@@ -25,6 +25,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectLoader;
+import azkaban.spi.AzkabanException;
import azkaban.spi.Storage;
import azkaban.spi.StorageException;
import azkaban.storage.StorageImplementationType;
@@ -36,13 +37,21 @@ import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
+import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import javax.sql.DataSource;
import org.apache.commons.dbutils.QueryRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static azkaban.Constants.ConfigurationKeys.*;
+import static com.google.common.base.Preconditions.*;
+import static java.util.Objects.*;
+
/**
* This Guice module is currently a one place container for all bindings in the current module. This is intended to
@@ -50,11 +59,13 @@ import org.slf4j.LoggerFactory;
* structuring of Guice components.
*/
public class AzkabanCommonModule extends AbstractModule {
- private static final Logger logger = LoggerFactory.getLogger(AzkabanCommonModule.class);
+ private static final Logger log = LoggerFactory.getLogger(AzkabanCommonModule.class);
+ private final Props props;
private final AzkabanCommonModuleConfig config;
public AzkabanCommonModule(Props props) {
+ this.props = props;
this.config = new AzkabanCommonModuleConfig(props);
}
@@ -97,7 +108,7 @@ public class AzkabanCommonModule extends AbstractModule {
if(databaseType.equals("h2")) {
String path = props.getString("h2.path");
Path h2DbPath = Paths.get(path).toAbsolutePath();
- logger.info("h2 DB path: " + h2DbPath);
+ log.info("h2 DB path: " + h2DbPath);
return new H2FileDataSource(h2DbPath);
}
int port = props.getInt("mysql.port");
@@ -107,9 +118,35 @@ public class AzkabanCommonModule extends AbstractModule {
String password = props.getString("mysql.password");
int numConnections = props.getInt("mysql.numconnections");
- return MySQLDataSource.getInstance(host, port, database, user, password,
- numConnections);
+ return MySQLDataSource.getInstance(host, port, database, user, password, numConnections);
+ }
+
+ @Inject
+ @Provides
+ @Singleton
+ public Configuration createHadoopConfiguration() {
+ final String hadoopConfDirPath = requireNonNull(props.get(HADOOP_CONF_DIR_PATH));
+
+ final File hadoopConfDir = new File(requireNonNull(hadoopConfDirPath));
+ checkArgument(hadoopConfDir.exists() && hadoopConfDir.isDirectory());
+ final Configuration hadoopConf = new Configuration(false);
+ hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "core-site.xml"));
+ hadoopConf.addResource(new org.apache.hadoop.fs.Path(hadoopConfDirPath, "hdfs-site.xml"));
+ hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ return hadoopConf;
+ }
+
+ @Inject
+ @Provides
+ @Singleton
+ public FileSystem createHadoopFileSystem(final Configuration hadoopConf) {
+ try {
+ return FileSystem.get(hadoopConf);
+ } catch (IOException e) {
+ log.error("Unable to initialize HDFS", e);
+ throw new AzkabanException(e);
+ }
}
@Provides
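A minimal sketch of how the two new providers are consumed (the class below and the property value are illustrative assumptions; a real setup also needs the database and storage properties that the rest of AzkabanCommonModule expects):

import azkaban.AzkabanCommonModule;
import azkaban.utils.Props;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.fs.FileSystem;

public class HdfsWiringSketch {
  public static void main(String[] args) {
    Props props = new Props();
    // Hypothetical value; normally read from azkaban.properties.
    props.put("hadoop.conf.dir.path", "/etc/hadoop/conf");

    // createHadoopConfiguration() loads core-site.xml/hdfs-site.xml from that
    // directory, and createHadoopFileSystem() turns it into a singleton
    // FileSystem that classes such as HdfsStorage can @Inject.
    Injector injector = Guice.createInjector(new AzkabanCommonModule(props));
    FileSystem hdfs = injector.getInstance(FileSystem.class);
  }
}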
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index be061d3..aea1499 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -90,7 +90,10 @@ public class Constants {
public static final String AZKABAN_STORAGE_TYPE = "azkaban.storage.type";
public static final String AZKABAN_STORAGE_LOCAL_BASEDIR = "azkaban.storage.local.basedir";
+ public static final String HADOOP_CONF_DIR_PATH = "hadoop.conf.dir.path";
public static final String AZKABAN_STORAGE_HDFS_ROOT_URI = "azkaban.storage.hdfs.root.uri";
+ public static final String AZKABAN_KERBEROS_PRINCIPAL = "azkaban.kerberos.principal";
+ public static final String AZKABAN_KEYTAB_PATH = "azkaban.keytab.path";
}
public static class FlowProperties {
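For reference, a hedged example of how these keys might appear in azkaban.properties; all values are made up, and the exact azkaban.storage.type value depends on StorageImplementationType:

azkaban.storage.type=HDFS
azkaban.storage.hdfs.root.uri=hdfs://namenode:9000/azkaban/projects
hadoop.conf.dir.path=/etc/hadoop/conf
azkaban.kerberos.principal=azkaban/_HOST@EXAMPLE.COM
azkaban.keytab.path=/etc/security/keytabs/azkaban.keytab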
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
new file mode 100644
index 0000000..00ea850
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsAuth.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import azkaban.spi.AzkabanException;
+import azkaban.utils.Props;
+import com.google.inject.Inject;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import static azkaban.Constants.ConfigurationKeys.*;
+import static java.util.Objects.*;
+
+
+/**
+ * This class helps with HDFS authorization and is a wrapper over Hadoop's {@link UserGroupInformation} class.
+ */
+public class HdfsAuth {
+ private static final Logger log = Logger.getLogger(HdfsAuth.class);
+
+ private final boolean isSecurityEnabled;
+
+ private UserGroupInformation loggedInUser = null;
+ private String keytabPath = null;
+ private String keytabPrincipal = null;
+
+ @Inject
+ public HdfsAuth(Props props, Configuration conf) {
+ UserGroupInformation.setConfiguration(conf);
+ isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+ if (isSecurityEnabled) {
+ log.info("The Hadoop cluster has enabled security");
+ keytabPath = requireNonNull(props.getString(AZKABAN_KEYTAB_PATH));
+ keytabPrincipal = requireNonNull(props.getString(AZKABAN_KERBEROS_PRINCIPAL));
+ }
+ }
+
+ /**
+ * API to authorize HDFS access.
+ * This logs in the configured user via the keytab.
+ * If the user is already logged in, it renews the TGT.
+ */
+ public void authorize() {
+ if (isSecurityEnabled) {
+ try {
+ login(keytabPrincipal, keytabPath);
+ } catch (IOException e) {
+ log.error(e);
+ throw new AzkabanException(String.format(
+ "Error: Unable to authorize to Hadoop. Principal: %s Keytab: %s", keytabPrincipal, keytabPath));
+ }
+ }
+ }
+
+ private void login(String keytabPrincipal, String keytabPath) throws IOException {
+ if (loggedInUser == null) {
+ log.info(String.format("Logging in using Principal: %s Keytab: %s", keytabPrincipal, keytabPath));
+
+ UserGroupInformation.loginUserFromKeytab(keytabPrincipal, keytabPath);
+ loggedInUser = UserGroupInformation.getLoginUser();
+ log.info(String.format("User %s logged in.", loggedInUser));
+ } else {
+ log.info(String.format("User %s already logged in. Refreshing TGT", loggedInUser));
+ loggedInUser.checkTGTAndReloginFromKeytab();
+ }
+ }
+}
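A short usage sketch, assuming a caller that already holds the injected HdfsAuth and FileSystem (the path and helper method here are hypothetical): authorize() is meant to be called before every HDFS operation, which is exactly what HdfsStorage below does.

import azkaban.storage.HdfsAuth;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsAuthUsageSketch {
  // On a secure cluster authorize() performs the keytab login (or renews the
  // TGT); on an insecure cluster it is effectively a no-op.
  static void readArtifact(HdfsAuth auth, FileSystem fs, String key) throws IOException {
    auth.authorize();
    try (FSDataInputStream in = fs.open(new Path("/azkaban/projects", key))) {
      // consume the stream ...
    }
  }
}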
diff --git a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
index 2157aaa..5bfba8d 100644
--- a/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/HdfsStorage.java
@@ -17,28 +17,85 @@
package azkaban.storage;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import azkaban.AzkabanCommonModuleConfig;
import azkaban.spi.Storage;
+import azkaban.spi.StorageException;
import azkaban.spi.StorageMetadata;
+import com.google.common.io.Files;
import com.google.inject.Inject;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
public class HdfsStorage implements Storage {
+ private static final Logger log = Logger.getLogger(HdfsStorage.class);
+ private static final String HDFS_SCHEME = "hdfs";
+
+ private final HdfsAuth hdfsAuth;
+ private final URI rootUri;
+ private final FileSystem hdfs;
+
@Inject
- public HdfsStorage() {
+ public HdfsStorage(HdfsAuth hdfsAuth, FileSystem hdfs, AzkabanCommonModuleConfig config) {
+ this.hdfsAuth = requireNonNull(hdfsAuth);
+ this.hdfs = requireNonNull(hdfs);
+ this.rootUri = config.getHdfsRootUri();
+ requireNonNull(rootUri.getAuthority(), "URI must specify host:port.");
+ checkArgument(HDFS_SCHEME.equals(rootUri.getScheme()));
}
@Override
- public InputStream get(String key) {
- throw new UnsupportedOperationException("Method not implemented");
+ public InputStream get(String key) throws IOException {
+ hdfsAuth.authorize();
+ return hdfs.open(new Path(rootUri.toString(), key));
}
@Override
public String put(StorageMetadata metadata, File localFile) {
- throw new UnsupportedOperationException("Method not implemented");
+ hdfsAuth.authorize();
+ final Path projectsPath = new Path(rootUri.getPath(), String.valueOf(metadata.getProjectId()));
+ try {
+ if (hdfs.mkdirs(projectsPath)) {
+ log.info("Created project dir: " + projectsPath);
+ }
+ final Path targetPath = createTargetPath(metadata, localFile, projectsPath);
+ if (hdfs.exists(targetPath)) {
+ log.info(
+ String.format("Duplicate Found: meta: %s path: %s", metadata, targetPath));
+ return getRelativePath(targetPath);
+ }
+
+ // Copy file to HDFS
+ log.info(String.format("Creating project artifact: meta: %s path: %s", metadata, targetPath));
+ hdfs.copyFromLocalFile(new Path(localFile.getAbsolutePath()), targetPath);
+ return getRelativePath(targetPath);
+ } catch (IOException e) {
+ log.error("error in put(): Metadata: " + metadata);
+ throw new StorageException(e);
+ }
+ }
+
+ private String getRelativePath(Path targetPath) {
+ return URI.create(rootUri.getPath()).relativize(targetPath.toUri()).getPath();
+ }
+
+ private Path createTargetPath(StorageMetadata metadata, File localFile, Path projectsPath) {
+ return new Path(projectsPath, String.format("%s-%s.%s",
+ String.valueOf(metadata.getProjectId()),
+ new String(Hex.encodeHex(metadata.getHash())),
+ Files.getFileExtension(localFile.getName())
+ ));
}
@Override
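To make the key layout concrete, a small sketch using hypothetical values that reproduces what createTargetPath() and getRelativePath() do above:

import java.net.URI;
import org.apache.hadoop.fs.Path;

public class HdfsKeyLayoutSketch {
  public static void main(String[] args) {
    // Hypothetical root URI and project artifact.
    URI rootUri = URI.create("hdfs://namenode:9000/azkaban/projects");
    Path projectsPath = new Path(rootUri.getPath(), "17");      // /azkaban/projects/17
    Path targetPath = new Path(projectsPath, "17-0f1e2d.zip");  // <projectId>-<md5>.<ext>
    // The stored key is relative to the root path:
    String key = URI.create(rootUri.getPath()).relativize(targetPath.toUri()).getPath();
    System.out.println(key);  // 17/17-0f1e2d.zip
  }
}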
diff --git a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
index 35a2dcb..a56253c 100644
--- a/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
+++ b/azkaban-common/src/main/java/azkaban/storage/LocalStorage.java
@@ -17,6 +17,8 @@
package azkaban.storage;
+import static com.google.common.base.Preconditions.checkArgument;
+
import azkaban.AzkabanCommonModuleConfig;
import azkaban.spi.Storage;
import azkaban.spi.StorageException;
@@ -31,17 +33,17 @@ import java.io.InputStream;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
-import static com.google.common.base.Preconditions.*;
-
public class LocalStorage implements Storage {
+
private static final Logger log = Logger.getLogger(LocalStorage.class);
final File rootDirectory;
@Inject
public LocalStorage(AzkabanCommonModuleConfig config) {
- this.rootDirectory = validateRootDirectory(createIfDoesNotExist(config.getLocalStorageBaseDirPath()));
+ this.rootDirectory = validateRootDirectory(
+ createIfDoesNotExist(config.getLocalStorageBaseDirPath()));
}
/**
@@ -65,20 +67,22 @@ public class LocalStorage implements Storage {
Files.getFileExtension(localFile.getName())));
if (targetFile.exists()) {
- throw new StorageException(String.format(
- "Error in LocalStorage. Target file already exists. targetFile: %s, Metadata: %s",
- targetFile, metadata));
+ log.info(String.format("Duplicate found: meta: %s, targetFile: %s, ", metadata,
+ targetFile.getAbsolutePath()));
+ return getRelativePath(targetFile);
}
+
+ // Copy file to storage dir
try {
FileUtils.copyFile(localFile, targetFile);
} catch (IOException e) {
- log.error("LocalStorage error in put(): Metadata: " + metadata);
+ log.error("LocalStorage error in put(): meta: " + metadata);
throw new StorageException(e);
}
- return createRelativePath(targetFile);
+ return getRelativePath(targetFile);
}
- private String createRelativePath(File targetFile) {
+ private String getRelativePath(File targetFile) {
return rootDirectory.toURI().relativize(targetFile.toURI()).getPath();
}
@@ -89,7 +93,7 @@ public class LocalStorage implements Storage {
private static File createIfDoesNotExist(String baseDirectoryPath) {
final File baseDirectory = new File(baseDirectoryPath);
- if(!baseDirectory.exists()) {
+ if (!baseDirectory.exists()) {
baseDirectory.mkdir();
log.info("Creating dir: " + baseDirectory.getAbsolutePath());
}
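A hedged sketch of the new duplicate handling (the setup is hypothetical; StorageMetadata's constructor arguments follow the test below): uploading the same artifact twice now returns the same relative key instead of throwing a StorageException.

import azkaban.spi.StorageMetadata;
import azkaban.storage.LocalStorage;
import azkaban.utils.Md5Hasher;
import java.io.File;

class LocalStorageDuplicateSketch {
  static void illustrate(LocalStorage storage, File zip) throws Exception {
    StorageMetadata meta = new StorageMetadata(1, 2, "uploader", Md5Hasher.md5Hash(zip));
    String first = storage.put(meta, zip);
    // A second put of the identical artifact logs "Duplicate found" and returns
    // the existing key rather than failing.
    String second = storage.put(meta, zip);
    assert first.equals(second);
  }
}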
diff --git a/azkaban-common/src/test/java/azkaban/storage/HdfsStorageTest.java b/azkaban-common/src/test/java/azkaban/storage/HdfsStorageTest.java
new file mode 100644
index 0000000..a5dc030
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/storage/HdfsStorageTest.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.storage;
+
+import azkaban.AzkabanCommonModuleConfig;
+import azkaban.spi.StorageMetadata;
+import azkaban.utils.Md5Hasher;
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class HdfsStorageTest {
+ private HdfsAuth hdfsAuth;
+ private HdfsStorage hdfsStorage;
+ private FileSystem hdfs;
+
+ @Before
+ public void setUp() throws Exception {
+ hdfs = mock(FileSystem.class);
+ hdfsAuth = mock(HdfsAuth.class);
+ AzkabanCommonModuleConfig config = mock(AzkabanCommonModuleConfig.class);
+ when(config.getHdfsRootUri()).thenReturn(URI.create("hdfs://localhost:9000/path/to/foo"));
+
+ hdfsStorage = new HdfsStorage(hdfsAuth, hdfs, config);
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ hdfsStorage.get("1/1-hash.zip");
+ verify(hdfs).open(new Path("hdfs://localhost:9000/path/to/foo/1/1-hash.zip"));
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ File file = new File(getClass().getClassLoader().getResource("sample_flow_01.zip").getFile());
+ final String hash = new String(Hex.encodeHex(Md5Hasher.md5Hash(file)));
+
+ when(hdfs.exists(any(Path.class))).thenReturn(false);
+
+ StorageMetadata metadata = new StorageMetadata(1, 2, "uploader", Md5Hasher.md5Hash(file));
+ String key = hdfsStorage.put(metadata, file);
+
+ final String expectedName = String.format("1/1-%s.zip", hash);
+ Assert.assertEquals(expectedName, key);
+
+ final String expectedPath = "/path/to/foo/" + expectedName;
+ verify(hdfs).copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(expectedPath));
+ }
+}