azkaban-aplcache

Switch Metastore calls to use RetryingMetastoreClient (#1386) *

8/25/2017 5:24:34 PM

Details

diff --git a/azkaban-hadoop-security-plugin/build.gradle b/azkaban-hadoop-security-plugin/build.gradle
index 477ed8f..d996ee5 100644
--- a/azkaban-hadoop-security-plugin/build.gradle
+++ b/azkaban-hadoop-security-plugin/build.gradle
@@ -7,6 +7,10 @@ dependencies {
     compileOnly deps.hadoopMRClientCommon
     compileOnly deps.hadoopMRClientCore
     compileOnly deps.hiveMetastore
+    compileOnly (deps.hiveExecCore) {
+        exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
+        exclude group: 'eigenbase', module: 'eigenbase-properties'
+    }
 }
 
 /**
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index b5d9cda..3264e31 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -17,6 +17,7 @@
 package azkaban.security;
 
 import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_NATIVE_LIB_FOLDER;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
 import azkaban.security.commons.HadoopSecurityManager;
 import azkaban.security.commons.HadoopSecurityManagerException;
@@ -42,8 +43,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -403,7 +412,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       final String userToProxy) throws HadoopSecurityManagerException {
     try {
       final HiveConf hiveConf = new HiveConf();
-      final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+      final IMetaStoreClient hiveClient = createRetryingMetaStoreClient(hiveConf);
       hiveClient.cancelDelegationToken(t.encodeToUrlString());
     } catch (final Exception e) {
       throw new HadoopSecurityManagerException("Failed to cancel Token. "
@@ -466,7 +475,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     logger.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": "
         + hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
 
-    final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+    final IMetaStoreClient hiveClient = createRetryingMetaStoreClient(hiveConf);
     final String hcatTokenStr =
         hiveClient.getDelegationToken(userToProxy, UserGroupInformation
             .getLoginUser().getShortUserName());
@@ -780,4 +789,29 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
         hsProxy.getConnectAddress());
   }
 
+  /**
+   * Method to create a metastore client that retries on failures
+   */
+  private IMetaStoreClient createRetryingMetaStoreClient(HiveConf hiveConf) throws MetaException {
+    // Custom hook-loader to return a HiveMetaHook if the table is configured with a custom storage handler
+    HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
+      @Override
+      public HiveMetaHook getHook(Table tbl) throws MetaException {
+        if (tbl == null) {
+          return null;
+        }
+
+        try {
+          HiveStorageHandler storageHandler =
+              HiveUtils.getStorageHandler(hiveConf, tbl.getParameters().get(META_TABLE_STORAGE));
+          return storageHandler == null ? null : storageHandler.getMetaHook();
+        } catch (HiveException e) {
+          logger.error(e.toString());
+          throw new MetaException("Failed to get storage handler: " + e);
+        }
+      }
+    };
+
+    return RetryingMetaStoreClient.getProxy(hiveConf, hookLoader, HiveMetaStoreClient.class.getName());
+  }
 }

build.gradle 1(+1 -0)

diff --git a/build.gradle b/build.gradle
index 6dfcb22..5b48f85 100644
--- a/build.gradle
+++ b/build.gradle
@@ -67,6 +67,7 @@ ext.deps = [
         hadoopHdfs          : "org.apache.hadoop:hadoop-hdfs:" + versions.hadoop,
         hadoopMRClientCommon: "org.apache.hadoop:hadoop-mapreduce-client-common:" + versions.hadoop,
         hadoopMRClientCore  : "org.apache.hadoop:hadoop-mapreduce-client-core:" + versions.hadoop,
+        hiveExecCore        : "org.apache.hive:hive-exec:" + versions.hive + ":core",
         hiveMetastore       : "org.apache.hive:hive-metastore:" + versions.hive,
         httpclient          : 'org.apache.httpcomponents:httpclient:4.5.3',
         io                  : 'commons-io:commons-io:2.4',