azkaban-aplcache

Refactor hadoop token fetch logic follow-up (#2028) This

11/15/2018 11:45:28 PM

Details

diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 0b9f486..9ca327f 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -536,7 +536,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
     doPrefetch(tokenFile, props, logger, userToProxy);
   }
 
-  private synchronized void doPrefetch(final File tokenFile, final Props props, final Logger logger,
+  private void doPrefetch(final File tokenFile, final Props props, final Logger logger,
       final String userToProxy) throws HadoopSecurityManagerException {
     final Credentials cred = new Credentials();
     fetchMetaStoreToken(props, logger, userToProxy, cred);
@@ -567,6 +567,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
         }
       });
 
+      logger.info("Preparing token file " + tokenFile.getAbsolutePath());
       prepareTokenFile(userToProxy, cred, tokenFile, logger);
       // stash them to cancel after use.
 
@@ -582,8 +583,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   }
 
   private void fetchJobTrackerToken(final String userToProxy, final Props props,
-      final Logger logger,
-      final Credentials cred)
+      final Logger logger, final Credentials cred)
       throws IOException, InterruptedException, HadoopSecurityManagerException {
     if (props.getBoolean(HadoopSecurityManager.OBTAIN_JOBTRACKER_TOKEN, false)) {
       final JobConf jobConf = new JobConf();
@@ -598,20 +598,18 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
         throw new HadoopSecurityManagerException(
             "Failed to fetch JT token for " + userToProxy);
       }
-      logger.info("Created JT token.");
-      logger.info("Token kind: " + mrdt.getKind());
-      logger.info("Token service: " + mrdt.getService());
+
+      logger.info(String.format("JT token pre-fetched, token kind: %s, token service: %s",
+          mrdt.getKind(), mrdt.getService()));
       cred.addToken(mrdt.getService(), mrdt);
     }
   }
 
   private void fetchNameNodeToken(final String userToProxy, final Props props, final Logger logger,
-      final Credentials cred)
-      throws IOException, HadoopSecurityManagerException {
+      final Credentials cred) throws IOException, HadoopSecurityManagerException {
     if (props.getBoolean(HadoopSecurityManager.OBTAIN_NAMENODE_TOKEN, false)) {
       final FileSystem fs = FileSystem.get(HadoopSecurityManager_H_2_0.this.conf);
-      // check if we get the correct FS, and most importantly, the
-      // conf
+      // check if we get the correct FS, and most importantly, the conf
       logger.info("Getting DFS token from " + fs.getUri());
       final Token<?> fsToken =
           fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf())
@@ -621,9 +619,10 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
         throw new HadoopSecurityManagerException(
             "Failed to fetch DFS token for " + userToProxy);
       }
-      logger.info("Created DFS token.");
-      logger.info("Token kind: " + fsToken.getKind());
-      logger.info("Token service: " + fsToken.getService());
+
+      logger.info(String
+          .format("DFS token from namenode pre-fetched, token kind: %s, token service: %s",
+              fsToken.getKind(), fsToken.getService()));
 
       cred.addToken(fsToken.getService(), fsToken);
 
@@ -652,14 +651,15 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       final Props props, final Logger logger, final String userToProxy, final Credentials cred)
       throws HadoopSecurityManagerException {
     if (props.getBoolean(OBTAIN_JOBHISTORYSERVER_TOKEN, false)) {
+      logger.info("Pre-fetching JH token from job history server");
+
       final YarnRPC rpc = YarnRPC.create(this.conf);
       final String serviceAddr = this.conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
 
-      logger.debug("Connecting to HistoryServer at: " + serviceAddr);
+      logger.info("Connecting to HistoryServer at: " + serviceAddr);
       final HSClientProtocol hsProxy =
           (HSClientProtocol) rpc.getProxy(HSClientProtocol.class,
               NetUtils.createSocketAddr(serviceAddr), this.conf);
-      logger.info("Pre-fetching JH token from job history server");
 
       Token<?> jhsdt = null;
       try {
@@ -676,9 +676,9 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
             "Unable to fetch JH token for " + userToProxy);
       }
 
-      logger.info("Created JH token.");
-      logger.info("Token kind: " + jhsdt.getKind());
-      logger.info("Token service: " + jhsdt.getService());
+      logger.info(String
+          .format("JH token from job history server pre-fetched, token Kind: %s, token service: %s",
+              jhsdt.getKind(), jhsdt.getService()));
 
       cred.addToken(jhsdt.getService(), jhsdt);
     }
@@ -691,7 +691,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
       try {
 
         // first we fetch and save the default hcat token.
-        logger.info("Pre-fetching default Hive MetaStore token from hive");
+        logger.info("Pre-fetching default hive metastore token from hive");
 
         HiveConf hiveConf = new HiveConf();
         Token<DelegationTokenIdentifier> hcatToken =
@@ -702,11 +702,11 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
         // Added support for extra_hcat_clusters
         final List<String> extraHcatClusters = props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS);
         if (Collections.EMPTY_LIST != extraHcatClusters) {
-          logger.info("Need to pre-fetch extra metaStore tokens from extra hive clusters.");
+          logger.info("Need to pre-fetch extra metastore tokens from extra hive clusters.");
 
           // start to process the user inputs.
           for (final String thriftUrls : extraHcatClusters) {
-            logger.info("Pre-fetching metaStore token from cluster : " + thriftUrls);
+            logger.info("Pre-fetching metastore token from cluster : " + thriftUrls);
 
             hiveConf = new HiveConf();
             hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrls);
@@ -718,11 +718,11 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
           final List<String> extraHcatLocations =
               props.getStringList(EXTRA_HCAT_LOCATION);
           if (Collections.EMPTY_LIST != extraHcatLocations) {
-            logger.info("Need to pre-fetch extra metaStore tokens from hive.");
+            logger.info("Need to pre-fetch extra metastore tokens from hive.");
 
             // start to process the user inputs.
             for (final String thriftUrl : extraHcatLocations) {
-              logger.info("Pre-fetching metaStore token from : " + thriftUrl);
+              logger.info("Pre-fetching metastore token from : " + thriftUrl);
 
               hiveConf = new HiveConf();
               hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
@@ -733,6 +733,8 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
           }
         }
 
+        logger.info("Hive metastore token(s) prefetched");
+
       } catch (final Throwable t) {
         final String message =
             "Failed to get hive metastore token." + t.getMessage()