azkaban-aplcache
Changes
azkaban-hadoop-security-plugin/build.gradle 8(+8 -0)
azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java 90(+90 -0)
azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java 82(+39 -43)
azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java 69(+69 -0)
build.gradle 1(+1 -0)
Details
diff --git a/azkaban-hadoop-security-plugin/build.gradle b/azkaban-hadoop-security-plugin/build.gradle
index 477ed8f..d2d3640 100644
--- a/azkaban-hadoop-security-plugin/build.gradle
+++ b/azkaban-hadoop-security-plugin/build.gradle
@@ -1,5 +1,9 @@
apply plugin: 'distribution'
+configurations {
+ testCompile.extendsFrom compileOnly
+}
+
dependencies {
compile project(":azkaban-common")
@@ -7,6 +11,10 @@ dependencies {
compileOnly deps.hadoopMRClientCommon
compileOnly deps.hadoopMRClientCore
compileOnly deps.hiveMetastore
+ compileOnly(deps.hiveExecCore) {
+ exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
+ exclude group: 'eigenbase', module: 'eigenbase-properties'
+ }
}
/**
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java
new file mode 100644
index 0000000..9872a5a
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.hive;
+
+
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link BasePooledObjectFactory} for {@link IMetaStoreClient}.
+ */
+public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> {
+
+ private static final Logger log = LoggerFactory.getLogger(HiveMetaStoreClientFactory.class);
+ private final HiveConf hiveConf;
+
+ public HiveMetaStoreClientFactory(final HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ }
+
+ private IMetaStoreClient createMetaStoreClient() throws MetaException {
+ final HiveMetaHookLoader hookLoader = tbl -> {
+ if (tbl == null) {
+ return null;
+ }
+
+ try {
+ final HiveStorageHandler storageHandler =
+ HiveUtils.getStorageHandler(HiveMetaStoreClientFactory.this.hiveConf,
+ tbl.getParameters().get(META_TABLE_STORAGE));
+ return storageHandler == null ? null : storageHandler.getMetaHook();
+ } catch (final HiveException e) {
+ log.error(e.getMessage(), e);
+ throw new MetaException("Failed to get storage handler: " + e);
+ }
+ };
+
+ return RetryingMetaStoreClient
+ .getProxy(this.hiveConf, hookLoader, HiveMetaStoreClient.class.getName());
+ }
+
+ @Override
+ public IMetaStoreClient create() {
+ try {
+ return createMetaStoreClient();
+ } catch (final MetaException e) {
+ throw new RuntimeException("Unable to create " + IMetaStoreClient.class.getSimpleName(), e);
+ }
+ }
+
+ @Override
+ public PooledObject<IMetaStoreClient> wrap(final IMetaStoreClient client) {
+ return new DefaultPooledObject<>(client);
+ }
+
+ @Override
+ public void destroyObject(final PooledObject<IMetaStoreClient> client) {
+ client.getObject().close();
+ }
+}
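A minimal usage sketch (not part of this change; the class name and pool sizing below are illustrative assumptions): because HiveMetaStoreClientFactory extends commons-pool2's BasePooledObjectFactory, it can back a GenericObjectPool of IMetaStoreClient instances, with create(), wrap() and destroyObject() handling construction, pooling and close().

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;

import azkaban.hive.HiveMetaStoreClientFactory;

public class MetaStoreClientPoolSketch {

  public static void main(final String[] args) throws Exception {
    // Back a commons-pool2 pool with the factory; create() builds RetryingMetaStoreClient
    // proxies and destroyObject() closes them when the pool evicts or shuts down.
    final GenericObjectPool<IMetaStoreClient> pool =
        new GenericObjectPool<>(new HiveMetaStoreClientFactory(new HiveConf()));
    pool.setMaxTotal(4); // illustrative sizing

    final IMetaStoreClient client = pool.borrowObject();
    try {
      client.getAllDatabases(); // any metastore call
    } finally {
      pool.returnObject(client);
    }
    pool.close();
  }
}

Note that the patch itself calls hiveMetaStoreClientFactory.create() directly in HadoopSecurityManager_H_2_0; the pooled usage above is only one way such a factory could be wired up.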
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 7bfc055..0b5d35c 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -16,6 +16,7 @@
package azkaban.security;
+import azkaban.hive.HiveMetaStoreClientFactory;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.Props;
@@ -39,8 +40,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -74,7 +74,6 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
* needs to be fixed as part of a plugin infrastructure implementation.
*/
public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
-
/**
* TODO: This should be exposed as a configurable parameter
*
@@ -93,8 +92,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
public static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
// "mapreduce.jobtracker.kerberos.principal";
public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
- public static final String HADOOP_JOB_TRACKER_2 =
- "mapreduce.jobtracker.address";
+ public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
/**
* the key that will be used to set proper signature for each of the hcat
@@ -107,8 +105,9 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
public static final String CHMOD = "chmod";
// The file permissions assigned to a Delegation token file on fetch
public static final String TOKEN_FILE_PERMISSIONS = "460";
- private static final String FS_HDFS_IMPL_DISABLE_CACHE =
- "fs.hdfs.impl.disable.cache";
+
+ private static final Logger log = Logger.getLogger(HadoopSecurityManager_H_2_0.class);
+ private static final String FS_HDFS_IMPL_DISABLE_CACHE = "fs.hdfs.impl.disable.cache";
private static final String OTHER_NAMENODES_TO_GET_TOKEN = "other_namenodes";
/**
* the settings to be defined by user indicating if there are hcat locations
@@ -119,16 +118,17 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
private static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
private static final String AZKABAN_KEYTAB_LOCATION = "proxy.keytab.location";
private static final String AZKABAN_PRINCIPAL = "proxy.user";
- private static final String OBTAIN_JOBHISTORYSERVER_TOKEN =
- "obtain.jobhistoryserver.token";
- private final static Logger logger = Logger
- .getLogger(HadoopSecurityManager_H_2_0.class);
+ private static final String OBTAIN_JOBHISTORYSERVER_TOKEN = "obtain.jobhistoryserver.token";
+
private static volatile HadoopSecurityManager hsmInstance = null;
private static URLClassLoader ucl;
+
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final ExecuteAsUser executeAsUser;
private final Configuration conf;
private final ConcurrentMap<String, UserGroupInformation> userUgiMap;
+ private final HiveMetaStoreClientFactory hiveMetaStoreClientFactory;
+
private UserGroupInformation loginUser = null;
private String keytabLocation;
private String keytabPrincipal;
@@ -155,14 +155,14 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
URL urlToHadoop = null;
if (hadoopConfDir != null) {
urlToHadoop = new File(hadoopConfDir).toURI().toURL();
- logger.info("Using hadoop config found in " + urlToHadoop);
+ log.info("Using hadoop config found in " + urlToHadoop);
resources.add(urlToHadoop);
} else if (hadoopHome != null) {
urlToHadoop = new File(hadoopHome, "conf").toURI().toURL();
- logger.info("Using hadoop config found in " + urlToHadoop);
+ log.info("Using hadoop config found in " + urlToHadoop);
resources.add(urlToHadoop);
} else {
- logger.info("HADOOP_HOME not set, using default hadoop config.");
+ log.info("HADOOP_HOME not set, using default hadoop config.");
}
ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
@@ -171,24 +171,24 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
this.conf.setClassLoader(ucl);
if (props.containsKey(FS_HDFS_IMPL_DISABLE_CACHE)) {
- logger.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
+ log.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
+ props.get(FS_HDFS_IMPL_DISABLE_CACHE));
this.conf.setBoolean(FS_HDFS_IMPL_DISABLE_CACHE,
Boolean.valueOf(props.get(FS_HDFS_IMPL_DISABLE_CACHE)));
}
- logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
+ log.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
+ this.conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
- logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ": "
+ log.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ": "
+ this.conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION));
- logger.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
+ log.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
+ this.conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
UserGroupInformation.setConfiguration(this.conf);
this.securityEnabled = UserGroupInformation.isSecurityEnabled();
if (this.securityEnabled) {
- logger.info("The Hadoop cluster has enabled security");
+ log.info("The Hadoop cluster has enabled security");
this.shouldProxy = true;
try {
@@ -201,15 +201,15 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
// try login
try {
if (this.loginUser == null) {
- logger.info("No login user. Creating login user");
- logger.info("Using principal from " + this.keytabPrincipal + " and "
+ log.info("No login user. Creating login user");
+ log.info("Using principal from " + this.keytabPrincipal + " and "
+ this.keytabLocation);
UserGroupInformation.loginUserFromKeytab(this.keytabPrincipal,
this.keytabLocation);
this.loginUser = UserGroupInformation.getLoginUser();
- logger.info("Logged in with user " + this.loginUser);
+ log.info("Logged in with user " + this.loginUser);
} else {
- logger.info("loginUser (" + this.loginUser
+ log.info("loginUser (" + this.loginUser
+ ") already created, refreshing tgt.");
this.loginUser.checkTGTAndReloginFromKeytab();
}
@@ -221,8 +221,9 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
}
this.userUgiMap = new ConcurrentHashMap<>();
+ this.hiveMetaStoreClientFactory = new HiveMetaStoreClientFactory(new HiveConf());
- logger.info("Hadoop Security Manager initialized");
+ log.info("Hadoop Security Manager initialized");
}
public static HadoopSecurityManager getInstance(final Props props)
@@ -230,13 +231,13 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
if (hsmInstance == null) {
synchronized (HadoopSecurityManager_H_2_0.class) {
if (hsmInstance == null) {
- logger.info("getting new instance");
+ log.info("getting new instance");
hsmInstance = new HadoopSecurityManager_H_2_0(props);
}
}
}
- logger.debug("Relogging in from keytab if necessary.");
+ log.debug("Relogging in from keytab if necessary.");
hsmInstance.reloginFromKeytab();
return hsmInstance;
@@ -256,7 +257,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
UserGroupInformation ugi = this.userUgiMap.get(userToProxy);
if (ugi == null) {
- logger.info("proxy user " + userToProxy
+ log.info("proxy user " + userToProxy
+ " not exist. Creating new proxy user");
if (this.shouldProxy) {
try {
@@ -305,7 +306,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
throws HadoopSecurityManagerException {
final FileSystem fs;
try {
- logger.info("Getting file system as " + user);
+ log.info("Getting file system as " + user);
final UserGroupInformation ugi = getProxiedUser(user);
if (ugi != null) {
@@ -402,11 +403,10 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
}
}
- private void cancelHiveToken(final Token<? extends TokenIdentifier> t,
- final String userToProxy) throws HadoopSecurityManagerException {
+ private void cancelHiveToken(final Token<? extends TokenIdentifier> t)
+ throws HadoopSecurityManagerException {
try {
- final HiveConf hiveConf = new HiveConf();
- final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+ final IMetaStoreClient hiveClient = this.hiveMetaStoreClientFactory.create();
hiveClient.cancelDelegationToken(t.encodeToUrlString());
} catch (final Exception e) {
throw new HadoopSecurityManagerException("Failed to cancel Token. "
@@ -418,7 +418,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
public void cancelTokens(final File tokenFile, final String userToProxy, final Logger logger)
throws HadoopSecurityManagerException {
// nntoken
- Credentials cred = null;
+ final Credentials cred;
try {
cred =
Credentials.readTokenStorageFile(new Path(tokenFile.toURI()),
@@ -430,7 +430,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
if (t.getKind().equals(new Text("HIVE_DELEGATION_TOKEN"))) {
logger.info("Cancelling hive token.");
- cancelHiveToken(t, userToProxy);
+ cancelHiveToken(t);
} else if (t.getKind().equals(new Text("RM_DELEGATION_TOKEN"))) {
logger.info("Ignore cancelling mr job tracker token request.");
} else if (t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
@@ -458,7 +458,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
*/
private Token<DelegationTokenIdentifier> fetchHcatToken(final String userToProxy,
final HiveConf hiveConf, final String tokenSignatureOverwrite, final Logger logger)
- throws IOException, MetaException, TException {
+ throws IOException, TException {
logger.info(HiveConf.ConfVars.METASTOREURIS.varname + ": "
+ hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
@@ -469,7 +469,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
logger.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": "
+ hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
- final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+ final IMetaStoreClient hiveClient = this.hiveMetaStoreClientFactory.create();
final String hcatTokenStr =
hiveClient.getDelegationToken(userToProxy, UserGroupInformation
.getLoginUser().getShortUserName());
@@ -479,13 +479,9 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
// overwrite the value of the service property of the token if the signature
// override is specified.
- if (tokenSignatureOverwrite != null
- && tokenSignatureOverwrite.trim().length() > 0) {
- hcatToken.setService(new Text(tokenSignatureOverwrite.trim()
- .toLowerCase()));
-
- logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":"
- + (tokenSignatureOverwrite == null ? "" : tokenSignatureOverwrite));
+ if (tokenSignatureOverwrite != null && tokenSignatureOverwrite.trim().length() > 0) {
+ hcatToken.setService(new Text(tokenSignatureOverwrite.trim().toLowerCase()));
+ logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
}
logger.info("Created hive metastore token.");
diff --git a/azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java b/azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java
new file mode 100644
index 0000000..3d488dd
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.hive;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+public class HiveMetaStoreClientFactoryTest {
+
+ public static final File METASTORE_DB_DIR = new File("metastore_db");
+ public static final File DERBY_LOG_FILE = new File("derby.log");
+
+ @Test
+ public void testCreate() throws TException, IOException {
+ cleanup();
+
+ final HiveConf hiveConf = new HiveConf();
+ final HiveMetaStoreClientFactory factory = new HiveMetaStoreClientFactory(hiveConf);
+ final IMetaStoreClient msc = factory.create();
+
+ final String dbName = "test_db";
+ final String description = "test database";
+ final String location = "file:/tmp/" + dbName;
+ Database db = new Database(dbName, description, location, null);
+
+ msc.dropDatabase(dbName, true, true);
+ msc.createDatabase(db);
+ db = msc.getDatabase(dbName);
+
+ assertThat(db.getName()).isEqualTo(dbName);
+ assertThat(db.getDescription()).isEqualTo(description);
+ assertThat(db.getLocationUri()).isEqualTo(location);
+
+ // Clean up if the test is successful
+ cleanup();
+ }
+
+ private void cleanup() throws IOException {
+ if (METASTORE_DB_DIR.exists()) {
+ FileUtils.deleteDirectory(METASTORE_DB_DIR);
+ }
+ if (DERBY_LOG_FILE.exists()) {
+ DERBY_LOG_FILE.delete();
+ }
+ }
+}
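For context on the cleanup() calls above (a sketch, assuming no hive-site.xml is on the test classpath): with hive.metastore.uris left empty, HiveMetaStoreClient falls back to an embedded, Derby-backed metastore that writes the metastore_db/ directory and derby.log into the working directory, which is exactly what the test deletes before and after running.

import org.apache.hadoop.hive.conf.HiveConf;

public class EmbeddedMetastoreProbe {

  public static void main(final String[] args) {
    // With no hive-site.xml on the classpath (assumption), METASTOREURIS is empty and
    // metastore calls run against an in-process Derby store in the current directory.
    final HiveConf hiveConf = new HiveConf();
    System.out.println("hive.metastore.uris = '"
        + hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS) + "'");
  }
}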
diff --git a/build.gradle b/build.gradle
index efaf2c1..a257b77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,6 +54,7 @@ ext.deps = [
hadoopHdfs : "org.apache.hadoop:hadoop-hdfs:" + versions.hadoop,
hadoopMRClientCommon: "org.apache.hadoop:hadoop-mapreduce-client-common:" + versions.hadoop,
hadoopMRClientCore : "org.apache.hadoop:hadoop-mapreduce-client-core:" + versions.hadoop,
+ hiveExecCore : "org.apache.hive:hive-exec:" + versions.hive + ":core",
hiveMetastore : "org.apache.hive:hive-metastore:" + versions.hive,
httpclient : 'org.apache.httpcomponents:httpclient:4.5.2',
httpcore : 'org.apache.httpcomponents:httpcore:4.4.5',