azkaban-aplcache

Use RetryingMetaStoreClient instead of HiveMetaStoreClient

6/21/2017 7:52:23 PM

Details
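
Replaces direct HiveMetaStoreClient construction with a proxy obtained from RetryingMetaStoreClient, which retries transient Thrift connection failures to the metastore, so the hcat token fetch and cancel paths in HadoopSecurityManager_H_2_0 no longer fail on a single dropped connection. Client construction moves into a new HiveMetaStoreClientFactory (which also implements commons-pool2's BasePooledObjectFactory), and a Derby-backed unit test covers the factory.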

diff --git a/azkaban-hadoop-security-plugin/build.gradle b/azkaban-hadoop-security-plugin/build.gradle
index 477ed8f..d2d3640 100644
--- a/azkaban-hadoop-security-plugin/build.gradle
+++ b/azkaban-hadoop-security-plugin/build.gradle
@@ -1,5 +1,9 @@
 apply plugin: 'distribution'
 
+configurations {
+    testCompile.extendsFrom compileOnly
+}
+
 dependencies {
     compile project(":azkaban-common")
 
@@ -7,6 +11,10 @@ dependencies {
     compileOnly deps.hadoopMRClientCommon
     compileOnly deps.hadoopMRClientCore
     compileOnly deps.hiveMetastore
+    compileOnly(deps.hiveExecCore) {
+        exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
+        exclude group: 'eigenbase', module: 'eigenbase-properties'
+    }
 }
 
 /**
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java
new file mode 100644
index 0000000..9872a5a
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/hive/HiveMetaStoreClientFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.hive;
+
+
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link BasePooledObjectFactory} for {@link IMetaStoreClient}.
+ */
+public class HiveMetaStoreClientFactory extends BasePooledObjectFactory<IMetaStoreClient> {
+
+  private static final Logger log = LoggerFactory.getLogger(HiveMetaStoreClientFactory.class);
+  private final HiveConf hiveConf;
+
+  public HiveMetaStoreClientFactory(final HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+  }
+
+  private IMetaStoreClient createMetaStoreClient() throws MetaException {
+    final HiveMetaHookLoader hookLoader = tbl -> {
+      if (tbl == null) {
+        return null;
+      }
+
+      try {
+        final HiveStorageHandler storageHandler =
+            HiveUtils.getStorageHandler(HiveMetaStoreClientFactory.this.hiveConf,
+                tbl.getParameters().get(META_TABLE_STORAGE));
+        return storageHandler == null ? null : storageHandler.getMetaHook();
+      } catch (final HiveException e) {
+        log.error(e.getMessage(), e);
+        throw new MetaException("Failed to get storage handler: " + e);
+      }
+    };
+
+    return RetryingMetaStoreClient
+        .getProxy(this.hiveConf, hookLoader, HiveMetaStoreClient.class.getName());
+  }
+
+  @Override
+  public IMetaStoreClient create() {
+    try {
+      return createMetaStoreClient();
+    } catch (final MetaException e) {
+      throw new RuntimeException("Unable to create " + IMetaStoreClient.class.getSimpleName(), e);
+    }
+  }
+
+  @Override
+  public PooledObject<IMetaStoreClient> wrap(final IMetaStoreClient client) {
+    return new DefaultPooledObject<>(client);
+  }
+
+  @Override
+  public void destroyObject(final PooledObject<IMetaStoreClient> client) {
+    client.getObject().close();
+  }
+}
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 7bfc055..0b5d35c 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -16,6 +16,7 @@
 
 package azkaban.security;
 
+import azkaban.hive.HiveMetaStoreClientFactory;
 import azkaban.security.commons.HadoopSecurityManager;
 import azkaban.security.commons.HadoopSecurityManagerException;
 import azkaban.utils.Props;
@@ -39,8 +40,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -74,7 +74,6 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
    * needs to be fixed as part of a plugin infrastructure implementation.
    */
   public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
-
   /**
    * TODO: This should be exposed as a configurable parameter
    *
@@ -93,8 +92,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   public static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
   // "mapreduce.jobtracker.kerberos.principal";
   public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
-  public static final String HADOOP_JOB_TRACKER_2 =
-      "mapreduce.jobtracker.address";
+  public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
   public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
   /**
    * the key that will be used to set proper signature for each of the hcat
@@ -107,8 +105,9 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   public static final String CHMOD = "chmod";
   // The file permissions assigned to a Delegation token file on fetch
   public static final String TOKEN_FILE_PERMISSIONS = "460";
-  private static final String FS_HDFS_IMPL_DISABLE_CACHE =
-      "fs.hdfs.impl.disable.cache";
+
+  private static final Logger log = Logger.getLogger(HadoopSecurityManager_H_2_0.class);
+  private static final String FS_HDFS_IMPL_DISABLE_CACHE = "fs.hdfs.impl.disable.cache";
   private static final String OTHER_NAMENODES_TO_GET_TOKEN = "other_namenodes";
   /**
    * the settings to be defined by user indicating if there are hcat locations
@@ -119,16 +118,17 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   private static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
   private static final String AZKABAN_KEYTAB_LOCATION = "proxy.keytab.location";
   private static final String AZKABAN_PRINCIPAL = "proxy.user";
-  private static final String OBTAIN_JOBHISTORYSERVER_TOKEN =
-      "obtain.jobhistoryserver.token";
-  private final static Logger logger = Logger
-      .getLogger(HadoopSecurityManager_H_2_0.class);
+  private static final String OBTAIN_JOBHISTORYSERVER_TOKEN = "obtain.jobhistoryserver.token";
+
   private static volatile HadoopSecurityManager hsmInstance = null;
   private static URLClassLoader ucl;
+
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private final ExecuteAsUser executeAsUser;
   private final Configuration conf;
   private final ConcurrentMap<String, UserGroupInformation> userUgiMap;
+  private final HiveMetaStoreClientFactory hiveMetaStoreClientFactory;
+
   private UserGroupInformation loginUser = null;
   private String keytabLocation;
   private String keytabPrincipal;
@@ -155,14 +155,14 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     URL urlToHadoop = null;
     if (hadoopConfDir != null) {
       urlToHadoop = new File(hadoopConfDir).toURI().toURL();
-      logger.info("Using hadoop config found in " + urlToHadoop);
+      log.info("Using hadoop config found in " + urlToHadoop);
       resources.add(urlToHadoop);
     } else if (hadoopHome != null) {
       urlToHadoop = new File(hadoopHome, "conf").toURI().toURL();
-      logger.info("Using hadoop config found in " + urlToHadoop);
+      log.info("Using hadoop config found in " + urlToHadoop);
       resources.add(urlToHadoop);
     } else {
-      logger.info("HADOOP_HOME not set, using default hadoop config.");
+      log.info("HADOOP_HOME not set, using default hadoop config.");
     }
 
     ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
@@ -171,24 +171,24 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     this.conf.setClassLoader(ucl);
 
     if (props.containsKey(FS_HDFS_IMPL_DISABLE_CACHE)) {
-      logger.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
+      log.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
           + props.get(FS_HDFS_IMPL_DISABLE_CACHE));
       this.conf.setBoolean(FS_HDFS_IMPL_DISABLE_CACHE,
           Boolean.valueOf(props.get(FS_HDFS_IMPL_DISABLE_CACHE)));
     }
 
-    logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
+    log.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
         + this.conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
-    logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ":  "
+    log.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ":  "
         + this.conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION));
-    logger.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
+    log.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
         + this.conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
 
     UserGroupInformation.setConfiguration(this.conf);
 
     this.securityEnabled = UserGroupInformation.isSecurityEnabled();
     if (this.securityEnabled) {
-      logger.info("The Hadoop cluster has enabled security");
+      log.info("The Hadoop cluster has enabled security");
       this.shouldProxy = true;
       try {
 
@@ -201,15 +201,15 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       // try login
       try {
         if (this.loginUser == null) {
-          logger.info("No login user. Creating login user");
-          logger.info("Using principal from " + this.keytabPrincipal + " and "
+          log.info("No login user. Creating login user");
+          log.info("Using principal from " + this.keytabPrincipal + " and "
               + this.keytabLocation);
           UserGroupInformation.loginUserFromKeytab(this.keytabPrincipal,
               this.keytabLocation);
           this.loginUser = UserGroupInformation.getLoginUser();
-          logger.info("Logged in with user " + this.loginUser);
+          log.info("Logged in with user " + this.loginUser);
         } else {
-          logger.info("loginUser (" + this.loginUser
+          log.info("loginUser (" + this.loginUser
               + ") already created, refreshing tgt.");
           this.loginUser.checkTGTAndReloginFromKeytab();
         }
@@ -221,8 +221,9 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     }
 
     this.userUgiMap = new ConcurrentHashMap<>();
+    this.hiveMetaStoreClientFactory = new HiveMetaStoreClientFactory(new HiveConf());
 
-    logger.info("Hadoop Security Manager initialized");
+    log.info("Hadoop Security Manager initialized");
   }
 
   public static HadoopSecurityManager getInstance(final Props props)
@@ -230,13 +231,13 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     if (hsmInstance == null) {
       synchronized (HadoopSecurityManager_H_2_0.class) {
         if (hsmInstance == null) {
-          logger.info("getting new instance");
+          log.info("getting new instance");
           hsmInstance = new HadoopSecurityManager_H_2_0(props);
         }
       }
     }
 
-    logger.debug("Relogging in from keytab if necessary.");
+    log.debug("Relogging in from keytab if necessary.");
     hsmInstance.reloginFromKeytab();
 
     return hsmInstance;
@@ -256,7 +257,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
     UserGroupInformation ugi = this.userUgiMap.get(userToProxy);
     if (ugi == null) {
-      logger.info("proxy user " + userToProxy
+      log.info("proxy user " + userToProxy
           + " not exist. Creating new proxy user");
       if (this.shouldProxy) {
         try {
@@ -305,7 +306,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       throws HadoopSecurityManagerException {
     final FileSystem fs;
     try {
-      logger.info("Getting file system as " + user);
+      log.info("Getting file system as " + user);
       final UserGroupInformation ugi = getProxiedUser(user);
 
       if (ugi != null) {
@@ -402,11 +403,10 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     }
   }
 
-  private void cancelHiveToken(final Token<? extends TokenIdentifier> t,
-      final String userToProxy) throws HadoopSecurityManagerException {
+  private void cancelHiveToken(final Token<? extends TokenIdentifier> t)
+      throws HadoopSecurityManagerException {
     try {
-      final HiveConf hiveConf = new HiveConf();
-      final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+      final IMetaStoreClient hiveClient = this.hiveMetaStoreClientFactory.create();
       hiveClient.cancelDelegationToken(t.encodeToUrlString());
     } catch (final Exception e) {
       throw new HadoopSecurityManagerException("Failed to cancel Token. "
@@ -418,7 +418,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   public void cancelTokens(final File tokenFile, final String userToProxy, final Logger logger)
       throws HadoopSecurityManagerException {
     // nntoken
-    Credentials cred = null;
+    final Credentials cred;
     try {
       cred =
           Credentials.readTokenStorageFile(new Path(tokenFile.toURI()),
@@ -430,7 +430,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
         if (t.getKind().equals(new Text("HIVE_DELEGATION_TOKEN"))) {
           logger.info("Cancelling hive token.");
-          cancelHiveToken(t, userToProxy);
+          cancelHiveToken(t);
         } else if (t.getKind().equals(new Text("RM_DELEGATION_TOKEN"))) {
           logger.info("Ignore cancelling mr job tracker token request.");
         } else if (t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
@@ -458,7 +458,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
    */
   private Token<DelegationTokenIdentifier> fetchHcatToken(final String userToProxy,
       final HiveConf hiveConf, final String tokenSignatureOverwrite, final Logger logger)
-      throws IOException, MetaException, TException {
+      throws IOException, TException {
 
     logger.info(HiveConf.ConfVars.METASTOREURIS.varname + ": "
         + hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
@@ -469,7 +469,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     logger.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": "
         + hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
 
-    final HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+    final IMetaStoreClient hiveClient = this.hiveMetaStoreClientFactory.create();
     final String hcatTokenStr =
         hiveClient.getDelegationToken(userToProxy, UserGroupInformation
             .getLoginUser().getShortUserName());
@@ -479,13 +479,9 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
     // overwrite the value of the service property of the token if the signature
     // override is specified.
-    if (tokenSignatureOverwrite != null
-        && tokenSignatureOverwrite.trim().length() > 0) {
-      hcatToken.setService(new Text(tokenSignatureOverwrite.trim()
-          .toLowerCase()));
-
-      logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":"
-          + (tokenSignatureOverwrite == null ? "" : tokenSignatureOverwrite));
+    if (tokenSignatureOverwrite != null && tokenSignatureOverwrite.trim().length() > 0) {
+      hcatToken.setService(new Text(tokenSignatureOverwrite.trim().toLowerCase()));
+      logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite);
     }
 
     logger.info("Created hive metastore token.");
diff --git a/azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java b/azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java
new file mode 100644
index 0000000..3d488dd
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/test/java/azkaban/hive/HiveMetaStoreClientFactoryTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.hive;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+public class HiveMetaStoreClientFactoryTest {
+
+  public static final File METASTORE_DB_DIR = new File("metastore_db");
+  public static final File DERBY_LOG_FILE = new File("derby.log");
+
+  @Test
+  public void testCreate() throws TException, IOException {
+    cleanup();
+
+    final HiveConf hiveConf = new HiveConf();
+    final HiveMetaStoreClientFactory factory = new HiveMetaStoreClientFactory(hiveConf);
+    final IMetaStoreClient msc = factory.create();
+
+    final String dbName = "test_db";
+    final String description = "test database";
+    final String location = "file:/tmp/" + dbName;
+    Database db = new Database(dbName, description, location, null);
+
+    msc.dropDatabase(dbName, true, true);
+    msc.createDatabase(db);
+    db = msc.getDatabase(dbName);
+
+    assertThat(db.getName()).isEqualTo(dbName);
+    assertThat(db.getDescription()).isEqualTo(description);
+    assertThat(db.getLocationUri()).isEqualTo(location);
+
+    // Clean up if the test is successful
+    cleanup();
+  }
+
+  private void cleanup() throws IOException {
+    if (METASTORE_DB_DIR.exists()) {
+      FileUtils.deleteDirectory(METASTORE_DB_DIR);
+    }
+    if (DERBY_LOG_FILE.exists()) {
+      DERBY_LOG_FILE.delete();
+    }
+  }
+}
diff --git a/build.gradle b/build.gradle
index efaf2c1..a257b77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,6 +54,7 @@ ext.deps = [
         hadoopHdfs          : "org.apache.hadoop:hadoop-hdfs:" + versions.hadoop,
         hadoopMRClientCommon: "org.apache.hadoop:hadoop-mapreduce-client-common:" + versions.hadoop,
         hadoopMRClientCore  : "org.apache.hadoop:hadoop-mapreduce-client-core:" + versions.hadoop,
+        hiveExecCore        : "org.apache.hive:hive-exec:" + versions.hive + ":core",
         hiveMetastore       : "org.apache.hive:hive-metastore:" + versions.hive,
         httpclient          : 'org.apache.httpcomponents:httpclient:4.5.2',
         httpcore            : 'org.apache.httpcomponents:httpcore:4.4.5',
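
One note on the new factory: because it extends commons-pool2's BasePooledObjectFactory, it can back a GenericObjectPool, even though this patch only calls create() directly. Below is a minimal sketch of that pooled usage; the pool wiring and the example class name are illustrative assumptions, not part of this change.

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;

import azkaban.hive.HiveMetaStoreClientFactory;

// Hypothetical example class, not part of this commit.
public class PooledMetaStoreClientExample {

  public static void main(final String[] args) throws Exception {
    // The factory's destroyObject() closes clients the pool evicts.
    final GenericObjectPool<IMetaStoreClient> pool =
        new GenericObjectPool<>(new HiveMetaStoreClientFactory(new HiveConf()));

    final IMetaStoreClient client = pool.borrowObject();
    try {
      // Every IMetaStoreClient call goes through the retrying proxy.
      System.out.println(client.getAllDatabases());
    } finally {
      pool.returnObject(client);
    }
    pool.close();
  }
}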