azkaban-aplcache

Revert "Use RetryingHiveMetaStoreClient instead of HiveMetaStoreClient

7/10/2017 7:30:10 PM
3.31.0

Changes

azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java 90(+0 -90)

azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java 69(+0 -69)

build.gradle 1(+0 -1)

Details

diff --git a/azkaban-hadoop-security-plugin/build.gradle b/azkaban-hadoop-security-plugin/build.gradle
index d2d3640..477ed8f 100644
--- a/azkaban-hadoop-security-plugin/build.gradle
+++ b/azkaban-hadoop-security-plugin/build.gradle
@@ -1,9 +1,5 @@
 apply plugin: 'distribution'
 
-configurations {
-    testCompile.extendsFrom compileOnly
-}
-
 dependencies {
     compile project(":azkaban-common")
 
@@ -11,10 +7,6 @@ dependencies {
     compileOnly deps.hadoopMRClientCommon
     compileOnly deps.hadoopMRClientCore
     compileOnly deps.hiveMetastore
-    compileOnly(deps.hiveExecCore) {
-        exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
-        exclude group: 'eigenbase', module: 'eigenbase-properties'
-    }
 }
 
 /**
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 0b5d35c..7bfc055 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -16,7 +16,6 @@
 
 package azkaban.security;
 
-import azkaban.hive.HiveMetaStoreClientFactory;
 import azkaban.security.commons.HadoopSecurityManager;
 import azkaban.security.commons.HadoopSecurityManagerException;
 import azkaban.utils.Props;
@@ -40,7 +39,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -74,6 +74,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
    * needs to be fixed as part of a plugin infrastructure implementation.
    */
   public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
+
   /**
    * TODO: This should be exposed as a configurable parameter
    *
@@ -92,7 +93,8 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   public static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
   // "mapreduce.jobtracker.kerberos.principal";
   public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
-  public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
+  public static final String HADOOP_JOB_TRACKER_2 =
+      "mapreduce.jobtracker.address";
   public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
   /**
    * the key that will be used to set proper signature for each of the hcat
@@ -105,9 +107,8 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   public static final String CHMOD = "chmod";
   // The file permissions assigned to a Delegation token file on fetch
   public static final String TOKEN_FILE_PERMISSIONS = "460";
-
-  private static final Logger log = Logger.getLogger(HadoopSecurityManager_H_2_0.class);
-  private static final String FS_HDFS_IMPL_DISABLE_CACHE = "fs.hdfs.impl.disable.cache";
+  private static final String FS_HDFS_IMPL_DISABLE_CACHE =
+      "fs.hdfs.impl.disable.cache";
   private static final String OTHER_NAMENODES_TO_GET_TOKEN = "other_namenodes";
   /**
    * the settings to be defined by user indicating if there are hcat locations
@@ -118,17 +119,16 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   private static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
   private static final String AZKABAN_KEYTAB_LOCATION = "proxy.keytab.location";
   private static final String AZKABAN_PRINCIPAL = "proxy.user";
-  private static final String OBTAIN_JOBHISTORYSERVER_TOKEN = "obtain.jobhistoryserver.token";
-
+  private static final String OBTAIN_JOBHISTORYSERVER_TOKEN =
+      "obtain.jobhistoryserver.token";
+  private final static Logger logger = Logger
+      .getLogger(HadoopSecurityManager_H_2_0.class);
   private static volatile HadoopSecurityManager hsmInstance = null;
   private static URLClassLoader ucl;
-
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private final ExecuteAsUser executeAsUser;
   private final Configuration conf;
   private final ConcurrentMap<String, UserGroupInformation> userUgiMap;
-  private final HiveMetaStoreClientFactory hiveMetaStoreClientFactory;
-
   private UserGroupInformation loginUser = null;
   private String keytabLocation;
   private String keytabPrincipal;
@@ -155,14 +155,14 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     URL urlToHadoop = null;
     if (hadoopConfDir != null) {
       urlToHadoop = new File(hadoopConfDir).toURI().toURL();
-      log.info("Using hadoop config found in " + urlToHadoop);
+      logger.info("Using hadoop config found in " + urlToHadoop);
       resources.add(urlToHadoop);
     } else if (hadoopHome != null) {
       urlToHadoop = new File(hadoopHome, "conf").toURI().toURL();
-      log.info("Using hadoop config found in " + urlToHadoop);
+      logger.info("Using hadoop config found in " + urlToHadoop);
       resources.add(urlToHadoop);
     } else {
-      log.info("HADOOP_HOME not set, using default hadoop config.");
+      logger.info("HADOOP_HOME not set, using default hadoop config.");
     }
 
     ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
@@ -171,24 +171,24 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     this.conf.setClassLoader(ucl);
 
     if (props.containsKey(FS_HDFS_IMPL_DISABLE_CACHE)) {
-      log.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
+      logger.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
           + props.get(FS_HDFS_IMPL_DISABLE_CACHE));
       this.conf.setBoolean(FS_HDFS_IMPL_DISABLE_CACHE,
           Boolean.valueOf(props.get(FS_HDFS_IMPL_DISABLE_CACHE)));
     }
 
-    log.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
+    logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
         + this.conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
-    log.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ":  "
+    logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ":  "
         + this.conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION));
-    log.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
+    logger.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
         + this.conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
 
     UserGroupInformation.setConfiguration(this.conf);
 
     this.securityEnabled = UserGroupInformation.isSecurityEnabled();
     if (this.securityEnabled) {
-      log.info("The Hadoop cluster has enabled security");
+      logger.info("The Hadoop cluster has enabled security");
       this.shouldProxy = true;
       try {
 
@@ -201,15 +201,15 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       // try login
       try {
         if (this.loginUser == null) {
-          log.info("No login user. Creating login user");
-          log.info("Using principal from " + this.keytabPrincipal + " and "
+          logger.info("No login user. Creating login user");
+          logger.info("Using principal from " + this.keytabPrincipal + " and "
               + this.keytabLocation);
           UserGroupInformation.loginUserFromKeytab(this.keytabPrincipal,
               this.keytabLocation);
           this.loginUser = UserGroupInformation.getLoginUser();
-          log.info("Logged in with user " + this.loginUser);
+          logger.info("Logged in with user " + this.loginUser);
         } else {
-          log.info("loginUser (" + this.loginUser
+          logger.info("loginUser (" + this.loginUser
               + ") already created, refreshing tgt.");
           this.loginUser.checkTGTAndReloginFromKeytab();
         }
@@ -221,9 +221,8 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     }
 
     this.userUgiMap = new ConcurrentHashMap<>();
-    this.hiveMetaStoreClientFactory = new HiveMetaStoreClientFactory(new HiveConf());
 
-    log.info("Hadoop Security Manager initialized");
+    logger.info("Hadoop Security Manager initialized");
   }
 
   public static HadoopSecurityManager getInstance(final Props props)
@@ -231,13 +230,13 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     if (hsmInstance == null) {
       synchronized (HadoopSecurityManager_H_2_0.class) {
         if (hsmInstance == null) {
-          log.info("getting new instance");
+          logger.info("getting new instance");
           hsmInstance = new HadoopSecurityManager_H_2_0(props);
         }
       }
     }
 
-    log.debug("Relogging in from keytab if necessary.");
+    logger.debug("Relogging in from keytab if necessary.");
     hsmInstance.reloginFromKeytab();
 
     return hsmInstance;
@@ -257,7 +256,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
     UserGroupInformation ugi = this.userUgiMap.get(userToProxy);
     if (ugi == null) {
-      log.info("proxy user " + userToProxy
+      logger.info("proxy user " + userToProxy
           + " not exist. Creating new proxy user");
       if (this.shouldProxy) {
         try {
@@ -306,7 +305,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       throws HadoopSecurityManagerException {
     final FileSystem fs;
     try {
-      log.info("Getting file system as " + user);
+      logger.info("Getting file system as " + user);
       final UserGroupInformation ugi = getProxiedUser(user);
 
       if (ugi != null) {
@@ -403,10 +402,11 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     }
   }
 
-  private void cancelHiveToken(final Token<? extends TokenIdentifier> t)
-      throws HadoopSecurityManagerException {
+  private void cancelHiveToken(final Token<? extends TokenIdentifier> t,
+      final String userToProxy) throws HadoopSecurityManagerException {
     try {
-      final IMetaStoreClient hiveClient = this.hiveMetaStoreClientFactory.create();
+      final HiveConf hiveConf = new HiveConf();
+      final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
       hiveClient.cancelDelegationToken(t.encodeToUrlString());
     } catch (final Exception e) {
       throw new HadoopSecurityManagerException("Failed to cancel Token. "
@@ -418,7 +418,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   public void cancelTokens(final File tokenFile, final String userToProxy, final Logger logger)
       throws HadoopSecurityManagerException {
     // nntoken
-    final Credentials cred;
+    Credentials cred = null;
     try {
       cred =
           Credentials.readTokenStorageFile(new Path(tokenFile.toURI()),
@@ -430,7 +430,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
         if (t.getKind().equals(new Text("HIVE_DELEGATION_TOKEN"))) {
           logger.info("Cancelling hive token.");
-          cancelHiveToken(t);
+          cancelHiveToken(t, userToProxy);
         } else if (t.getKind().equals(new Text("RM_DELEGATION_TOKEN"))) {
           logger.info("Ignore cancelling mr job tracker token request.");
         } else if (t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
@@ -458,7 +458,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
    */
   private Token<DelegationTokenIdentifier> fetchHcatToken(final String userToProxy,
       final HiveConf hiveConf, final String tokenSignatureOverwrite, final Logger logger)
-      throws IOException, TException {
+      throws IOException, MetaException, TException {
 
     logger.info(HiveConf.ConfVars.METASTOREURIS.varname + ": "
         + hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
@@ -469,7 +469,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     logger.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": "
         + hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
 
-    final IMetaStoreClient hiveClient = this.hiveMetaStoreClientFactory.create();
+    final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
     final String hcatTokenStr =
         hiveClient.getDelegationToken(userToProxy, UserGroupInformation
             .getLoginUser().getShortUserName());
@@ -479,9 +479,13 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
     // overwrite the value of the service property of the token if the signature
     // override is specified.
-    if (tokenSignatureOverwrite != null && tokenSignatureOverwrite.trim().length() > 0) {
-      hcatToken.setService(new Text(tokenSignatureOverwrite.trim().toLowerCase()));
-      logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
+    if (tokenSignatureOverwrite != null
+        && tokenSignatureOverwrite.trim().length() > 0) {
+      hcatToken.setService(new Text(tokenSignatureOverwrite.trim()
+          .toLowerCase()));
+
+      logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":"
+          + (tokenSignatureOverwrite == null ? "" : tokenSignatureOverwrite));
     }
 
     logger.info("Created hive metastore token.");

build.gradle 1(+0 -1)

diff --git a/build.gradle b/build.gradle
index eec472b..fef530f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -51,7 +51,6 @@ ext.deps = [
         hadoopHdfs          : "org.apache.hadoop:hadoop-hdfs:" + versions.hadoop,
         hadoopMRClientCommon: "org.apache.hadoop:hadoop-mapreduce-client-common:" + versions.hadoop,
         hadoopMRClientCore  : "org.apache.hadoop:hadoop-mapreduce-client-core:" + versions.hadoop,
-        hiveExecCore        : "org.apache.hive:hive-exec:" + versions.hive + ":core",
         hiveMetastore       : "org.apache.hive:hive-metastore:" + versions.hive,
         httpclient          : 'org.apache.httpcomponents:httpclient:4.5.2',
         httpcore            : 'org.apache.httpcomponents:httpcore:4.4.5',