azkaban-aplcache

Refactor hadoop token fetch logic (#2027) 1. apply "save

11/14/2018 1:26:42 AM

Details

diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 0c594de..0b9f486 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -348,7 +348,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
         // The credential class must have a constructor accepting 3 parameters, Credentials,
         // Props, and Logger in order.
-        Constructor constructor = credentialClass.getConstructor (new Class[]
+        final Constructor constructor = credentialClass.getConstructor(new Class[]
             {Credentials.class, Props.class, Logger.class});
         final CredentialProvider customCredential = (CredentialProvider) constructor
               .newInstance(hadoopCred, props, jobLogger);
@@ -536,64 +536,121 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     doPrefetch(tokenFile, props, logger, userToProxy);
   }
 
-  private synchronized void doPrefetch(File tokenFile, Props props, Logger logger,
-      String userToProxy) throws HadoopSecurityManagerException {
+  private synchronized void doPrefetch(final File tokenFile, final Props props, final Logger logger,
+      final String userToProxy) throws HadoopSecurityManagerException {
     final Credentials cred = new Credentials();
-    if (props.getBoolean(OBTAIN_HCAT_TOKEN, false)) {
-      try {
+    fetchMetaStoreToken(props, logger, userToProxy, cred);
+    fetchJHSToken(props, logger, userToProxy, cred);
 
-        // first we fetch and save the default hcat token.
-        logger.info("Pre-fetching default Hive MetaStore token from hive");
+    try {
+      getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          getToken(userToProxy);
+          return null;
+        }
 
-        HiveConf hiveConf = new HiveConf();
-        Token<DelegationTokenIdentifier> hcatToken =
-            fetchHcatToken(userToProxy, hiveConf, null, logger);
+        private void getToken(final String userToProxy) throws InterruptedException,
+            IOException, HadoopSecurityManagerException {
+          logger.info("Here is the props for " + HadoopSecurityManager.OBTAIN_NAMENODE_TOKEN + ": "
+              + props.getBoolean(HadoopSecurityManager.OBTAIN_NAMENODE_TOKEN));
 
-        cred.addToken(hcatToken.getService(), hcatToken);
+          // Register user secrets by custom credential Object
+          if (props.getBoolean(JobProperties.ENABLE_JOB_SSL, false)) {
+            registerCustomCredential(props, cred, userToProxy, logger);
+          }
 
-        // Added support for extra_hcat_clusters
-        final List<String> extraHcatClusters = props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS);
-        if (Collections.EMPTY_LIST != extraHcatClusters) {
-          logger.info("Need to pre-fetch extra metaStore tokens from extra hive clusters.");
+          fetchNameNodeToken(userToProxy, props, logger, cred);
 
-          // start to process the user inputs.
-          for (final String thriftUrls : extraHcatClusters) {
-            logger.info("Pre-fetching metaStore token from cluster : " + thriftUrls);
+          fetchJobTrackerToken(userToProxy, props, logger, cred);
 
-            hiveConf = new HiveConf();
-            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrls);
-            hcatToken = fetchHcatToken(userToProxy, hiveConf, thriftUrls, logger);
-            cred.addToken(hcatToken.getService(), hcatToken);
-          }
-        } else {
-          // Only if EXTRA_HCAT_CLUSTERS
-          final List<String> extraHcatLocations =
-              props.getStringList(EXTRA_HCAT_LOCATION);
-          if (Collections.EMPTY_LIST != extraHcatLocations) {
-            logger.info("Need to pre-fetch extra metaStore tokens from hive.");
+        }
+      });
 
-            // start to process the user inputs.
-            for (final String thriftUrl : extraHcatLocations) {
-              logger.info("Pre-fetching metaStore token from : " + thriftUrl);
+      prepareTokenFile(userToProxy, cred, tokenFile, logger);
+      // stash them to cancel after use.
 
-              hiveConf = new HiveConf();
-              hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
-              hcatToken =
-                  fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
-              cred.addToken(hcatToken.getService(), hcatToken);
-            }
-          }
-        }
+      logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
 
-      } catch (final Throwable t) {
-        final String message =
-            "Failed to get hive metastore token." + t.getMessage()
-                + t.getCause();
-        logger.error(message, t);
-        throw new HadoopSecurityManagerException(message);
+    } catch (final Exception e) {
+      throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
+          + e.getMessage() + e.getCause(), e);
+    } catch (final Throwable t) {
+      throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
+          + t.getMessage() + t.getCause(), t);
+    }
+  }
+
+  private void fetchJobTrackerToken(final String userToProxy, final Props props,
+      final Logger logger,
+      final Credentials cred)
+      throws IOException, InterruptedException, HadoopSecurityManagerException {
+    if (props.getBoolean(HadoopSecurityManager.OBTAIN_JOBTRACKER_TOKEN, false)) {
+      final JobConf jobConf = new JobConf();
+      final JobClient jobClient = new JobClient(jobConf);
+      logger.info("Pre-fetching JT token from JobTracker");
+
+      final Token<DelegationTokenIdentifier> mrdt =
+          jobClient
+              .getDelegationToken(getMRTokenRenewerInternal(jobConf));
+      if (mrdt == null) {
+        logger.error("Failed to fetch JT token");
+        throw new HadoopSecurityManagerException(
+            "Failed to fetch JT token for " + userToProxy);
+      }
+      logger.info("Created JT token.");
+      logger.info("Token kind: " + mrdt.getKind());
+      logger.info("Token service: " + mrdt.getService());
+      cred.addToken(mrdt.getService(), mrdt);
+    }
+  }
+
+  private void fetchNameNodeToken(final String userToProxy, final Props props, final Logger logger,
+      final Credentials cred)
+      throws IOException, HadoopSecurityManagerException {
+    if (props.getBoolean(HadoopSecurityManager.OBTAIN_NAMENODE_TOKEN, false)) {
+      final FileSystem fs = FileSystem.get(HadoopSecurityManager_H_2_0.this.conf);
+      // check if we get the correct FS, and most importantly, the
+      // conf
+      logger.info("Getting DFS token from " + fs.getUri());
+      final Token<?> fsToken =
+          fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf())
+              .toString());
+      if (fsToken == null) {
+        logger.error("Failed to fetch DFS token for ");
+        throw new HadoopSecurityManagerException(
+            "Failed to fetch DFS token for " + userToProxy);
+      }
+      logger.info("Created DFS token.");
+      logger.info("Token kind: " + fsToken.getKind());
+      logger.info("Token service: " + fsToken.getService());
+
+      cred.addToken(fsToken.getService(), fsToken);
+
+      // getting additional name nodes tokens
+      final String otherNamenodes = props.get(
+          HadoopSecurityManager_H_2_0.OTHER_NAMENODES_TO_GET_TOKEN);
+      if ((otherNamenodes != null) && (otherNamenodes.length() > 0)) {
+        logger.info(
+            HadoopSecurityManager_H_2_0.OTHER_NAMENODES_TO_GET_TOKEN + ": '" + otherNamenodes
+                + "'");
+        final String[] nameNodeArr = otherNamenodes.split(",");
+        final Path[] ps = new Path[nameNodeArr.length];
+        for (int i = 0; i < ps.length; i++) {
+          ps[i] = new Path(nameNodeArr[i].trim());
+        }
+        TokenCache.obtainTokensForNamenodes(cred, ps, HadoopSecurityManager_H_2_0.this.conf);
+        logger.info("Successfully fetched tokens for: " + otherNamenodes);
+      } else {
+        logger.info(
+            HadoopSecurityManager_H_2_0.OTHER_NAMENODES_TO_GET_TOKEN + " was not configured");
       }
     }
+  }
 
+  private void fetchJHSToken(
+      final Props props, final Logger logger, final String userToProxy, final Credentials cred)
+      throws HadoopSecurityManagerException {
     if (props.getBoolean(OBTAIN_JOBHISTORYSERVER_TOKEN, false)) {
       final YarnRPC rpc = YarnRPC.create(this.conf);
       final String serviceAddr = this.conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
@@ -625,94 +682,64 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
       cred.addToken(jhsdt.getService(), jhsdt);
     }
+  }
 
-    try {
-      getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          getToken(userToProxy);
-          return null;
-        }
+  private void fetchMetaStoreToken(final Props props, final Logger logger, final String userToProxy,
+      final Credentials cred)
+      throws HadoopSecurityManagerException {
+    if (props.getBoolean(HadoopSecurityManager.OBTAIN_HCAT_TOKEN, false)) {
+      try {
 
-        private void getToken(final String userToProxy) throws InterruptedException,
-            IOException, HadoopSecurityManagerException {
-          logger.info("Here is the props for " + OBTAIN_NAMENODE_TOKEN + ": "
-              + props.getBoolean(OBTAIN_NAMENODE_TOKEN));
+        // first we fetch and save the default hcat token.
+        logger.info("Pre-fetching default Hive MetaStore token from hive");
 
-          // Register user secrets by custom credential Object
-          if (props.getBoolean(JobProperties.ENABLE_JOB_SSL, false)) {
-            registerCustomCredential(props, cred, userToProxy, logger);
-          }
+        HiveConf hiveConf = new HiveConf();
+        Token<DelegationTokenIdentifier> hcatToken =
+            fetchHcatToken(userToProxy, hiveConf, null, logger);
 
-          if (props.getBoolean(OBTAIN_NAMENODE_TOKEN, false)) {
-            final FileSystem fs = FileSystem.get(HadoopSecurityManager_H_2_0.this.conf);
-            // check if we get the correct FS, and most importantly, the
-            // conf
-            logger.info("Getting DFS token from " + fs.getUri());
-            final Token<?> fsToken =
-                fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf())
-                    .toString());
-            if (fsToken == null) {
-              logger.error("Failed to fetch DFS token for ");
-              throw new HadoopSecurityManagerException(
-                  "Failed to fetch DFS token for " + userToProxy);
-            }
-            logger.info("Created DFS token.");
-            logger.info("Token kind: " + fsToken.getKind());
-            logger.info("Token service: " + fsToken.getService());
-
-            cred.addToken(fsToken.getService(), fsToken);
-
-            // getting additional name nodes tokens
-            final String otherNamenodes = props.get(OTHER_NAMENODES_TO_GET_TOKEN);
-            if ((otherNamenodes != null) && (otherNamenodes.length() > 0)) {
-              logger.info(OTHER_NAMENODES_TO_GET_TOKEN + ": '" + otherNamenodes
-                  + "'");
-              final String[] nameNodeArr = otherNamenodes.split(",");
-              final Path[] ps = new Path[nameNodeArr.length];
-              for (int i = 0; i < ps.length; i++) {
-                ps[i] = new Path(nameNodeArr[i].trim());
-              }
-              TokenCache.obtainTokensForNamenodes(cred, ps, HadoopSecurityManager_H_2_0.this.conf);
-              logger.info("Successfully fetched tokens for: " + otherNamenodes);
-            } else {
-              logger.info(OTHER_NAMENODES_TO_GET_TOKEN + " was not configured");
-            }
+        cred.addToken(hcatToken.getService(), hcatToken);
+
+        // Added support for extra_hcat_clusters
+        final List<String> extraHcatClusters = props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS);
+        if (Collections.EMPTY_LIST != extraHcatClusters) {
+          logger.info("Need to pre-fetch extra metaStore tokens from extra hive clusters.");
+
+          // start to process the user inputs.
+          for (final String thriftUrls : extraHcatClusters) {
+            logger.info("Pre-fetching metaStore token from cluster : " + thriftUrls);
+
+            hiveConf = new HiveConf();
+            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrls);
+            hcatToken = fetchHcatToken(userToProxy, hiveConf, thriftUrls, logger);
+            cred.addToken(hcatToken.getService(), hcatToken);
           }
+        } else {
+          // Only if EXTRA_HCAT_CLUSTERS
+          final List<String> extraHcatLocations =
+              props.getStringList(EXTRA_HCAT_LOCATION);
+          if (Collections.EMPTY_LIST != extraHcatLocations) {
+            logger.info("Need to pre-fetch extra metaStore tokens from hive.");
 
-          if (props.getBoolean(OBTAIN_JOBTRACKER_TOKEN, false)) {
-            final JobConf jobConf = new JobConf();
-            final JobClient jobClient = new JobClient(jobConf);
-            logger.info("Pre-fetching JT token from JobTracker");
-
-            final Token<DelegationTokenIdentifier> mrdt =
-                jobClient
-                    .getDelegationToken(getMRTokenRenewerInternal(jobConf));
-            if (mrdt == null) {
-              logger.error("Failed to fetch JT token");
-              throw new HadoopSecurityManagerException(
-                  "Failed to fetch JT token for " + userToProxy);
+            // start to process the user inputs.
+            for (final String thriftUrl : extraHcatLocations) {
+              logger.info("Pre-fetching metaStore token from : " + thriftUrl);
+
+              hiveConf = new HiveConf();
+              hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
+              hcatToken =
+                  fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
+              cred.addToken(hcatToken.getService(), hcatToken);
             }
-            logger.info("Created JT token.");
-            logger.info("Token kind: " + mrdt.getKind());
-            logger.info("Token service: " + mrdt.getService());
-            cred.addToken(mrdt.getService(), mrdt);
           }
-
         }
-      });
-
-      prepareTokenFile(userToProxy, cred, tokenFile, logger);
-      // stash them to cancel after use.
-
-      logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
 
-    } catch (final Exception e) {
-      throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
-          + e.getMessage() + e.getCause(), e);
-    } catch (final Throwable t) {
-      throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
-          + t.getMessage() + t.getCause(), t);
+      } catch (final Throwable t) {
+        final String message =
+            "Failed to get hive metastore token." + t.getMessage()
+                + t.getCause();
+        logger.error(message, t);
+        throw new HadoopSecurityManagerException(message);
+      }
     }
   }
 
@@ -851,7 +878,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
               HiveUtils.getStorageHandler(hiveConf, tbl.getParameters().get(META_TABLE_STORAGE));
           return storageHandler == null ? null : storageHandler.getMetaHook();
         } catch (final HiveException e) {
-          logger.error(e.toString());
+          HadoopSecurityManager_H_2_0.logger.error(e.toString());
           throw new MetaException("Failed to get storage handler: " + e);
         }
       }