diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 0c594de..0b9f486 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -348,7 +348,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
// The credential class must have a constructor accepting 3 parameters, Credentials,
// Props, and Logger in order.
- Constructor constructor = credentialClass.getConstructor (new Class[]
+      final Constructor<?> constructor = credentialClass.getConstructor(new Class[]
{Credentials.class, Props.class, Logger.class});
final CredentialProvider customCredential = (CredentialProvider) constructor
.newInstance(hadoopCred, props, jobLogger);
@@ -536,64 +536,121 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
doPrefetch(tokenFile, props, logger, userToProxy);
}
- private synchronized void doPrefetch(File tokenFile, Props props, Logger logger,
- String userToProxy) throws HadoopSecurityManagerException {
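+  /**
+   * Prefetches every token type enabled in {@code props} (Hive MetaStore, JobHistoryServer,
+   * NameNode, JobTracker, and any custom credentials) on behalf of the proxied user, then
+   * writes the collected credentials to {@code tokenFile}.
+   */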
+ private synchronized void doPrefetch(final File tokenFile, final Props props, final Logger logger,
+ final String userToProxy) throws HadoopSecurityManagerException {
final Credentials cred = new Credentials();
- if (props.getBoolean(OBTAIN_HCAT_TOKEN, false)) {
- try {
+ fetchMetaStoreToken(props, logger, userToProxy, cred);
+ fetchJHSToken(props, logger, userToProxy, cred);
- // first we fetch and save the default hcat token.
- logger.info("Pre-fetching default Hive MetaStore token from hive");
+ try {
+ getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ getToken(userToProxy);
+ return null;
+ }
- HiveConf hiveConf = new HiveConf();
- Token<DelegationTokenIdentifier> hcatToken =
- fetchHcatToken(userToProxy, hiveConf, null, logger);
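+          // Helper invoked inside the doAs block: registers any custom credentials and
+          // fetches the NameNode and JobTracker tokens as the proxied user.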
+ private void getToken(final String userToProxy) throws InterruptedException,
+ IOException, HadoopSecurityManagerException {
+ logger.info("Here is the props for " + HadoopSecurityManager.OBTAIN_NAMENODE_TOKEN + ": "
+ + props.getBoolean(HadoopSecurityManager.OBTAIN_NAMENODE_TOKEN));
- cred.addToken(hcatToken.getService(), hcatToken);
+            // Register user secrets via the custom credential object.
+ if (props.getBoolean(JobProperties.ENABLE_JOB_SSL, false)) {
+ registerCustomCredential(props, cred, userToProxy, logger);
+ }
- // Added support for extra_hcat_clusters
- final List<String> extraHcatClusters = props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS);
- if (Collections.EMPTY_LIST != extraHcatClusters) {
- logger.info("Need to pre-fetch extra metaStore tokens from extra hive clusters.");
+ fetchNameNodeToken(userToProxy, props, logger, cred);
- // start to process the user inputs.
- for (final String thriftUrls : extraHcatClusters) {
- logger.info("Pre-fetching metaStore token from cluster : " + thriftUrls);
+ fetchJobTrackerToken(userToProxy, props, logger, cred);
- hiveConf = new HiveConf();
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrls);
- hcatToken = fetchHcatToken(userToProxy, hiveConf, thriftUrls, logger);
- cred.addToken(hcatToken.getService(), hcatToken);
- }
- } else {
- // Only if EXTRA_HCAT_CLUSTERS
- final List<String> extraHcatLocations =
- props.getStringList(EXTRA_HCAT_LOCATION);
- if (Collections.EMPTY_LIST != extraHcatLocations) {
- logger.info("Need to pre-fetch extra metaStore tokens from hive.");
+ }
+ });
- // start to process the user inputs.
- for (final String thriftUrl : extraHcatLocations) {
- logger.info("Pre-fetching metaStore token from : " + thriftUrl);
+      // Stash the fetched tokens in the token file so they can be cancelled after use.
+      prepareTokenFile(userToProxy, cred, tokenFile, logger);
- hiveConf = new HiveConf();
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
- hcatToken =
- fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
- cred.addToken(hcatToken.getService(), hcatToken);
- }
- }
- }
+ logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
- } catch (final Throwable t) {
- final String message =
- "Failed to get hive metastore token." + t.getMessage()
- + t.getCause();
- logger.error(message, t);
- throw new HadoopSecurityManagerException(message);
+    } catch (final Throwable t) {
+      throw new HadoopSecurityManagerException(
+          "Failed to get hadoop tokens! " + t.getMessage(), t);
+ }
+ }
+
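+  /**
+   * Fetches a JobTracker delegation token when {@code OBTAIN_JOBTRACKER_TOKEN} is set and
+   * adds it to {@code cred}.
+   */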
+ private void fetchJobTrackerToken(final String userToProxy, final Props props,
+ final Logger logger,
+ final Credentials cred)
+ throws IOException, InterruptedException, HadoopSecurityManagerException {
+ if (props.getBoolean(HadoopSecurityManager.OBTAIN_JOBTRACKER_TOKEN, false)) {
+ final JobConf jobConf = new JobConf();
+ final JobClient jobClient = new JobClient(jobConf);
+ logger.info("Pre-fetching JT token from JobTracker");
+
+ final Token<DelegationTokenIdentifier> mrdt =
+ jobClient
+ .getDelegationToken(getMRTokenRenewerInternal(jobConf));
+ if (mrdt == null) {
+ logger.error("Failed to fetch JT token");
+ throw new HadoopSecurityManagerException(
+ "Failed to fetch JT token for " + userToProxy);
+ }
+ logger.info("Created JT token.");
+ logger.info("Token kind: " + mrdt.getKind());
+ logger.info("Token service: " + mrdt.getService());
+ cred.addToken(mrdt.getService(), mrdt);
+ }
+ }
+
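+  /**
+   * Fetches an HDFS delegation token, plus tokens for any NameNodes listed in
+   * {@code OTHER_NAMENODES_TO_GET_TOKEN}, when {@code OBTAIN_NAMENODE_TOKEN} is set,
+   * and adds them to {@code cred}.
+   */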
+ private void fetchNameNodeToken(final String userToProxy, final Props props, final Logger logger,
+ final Credentials cred)
+ throws IOException, HadoopSecurityManagerException {
+ if (props.getBoolean(HadoopSecurityManager.OBTAIN_NAMENODE_TOKEN, false)) {
+ final FileSystem fs = FileSystem.get(HadoopSecurityManager_H_2_0.this.conf);
+      // Log the URI so we can verify that the expected FileSystem (and conf) was
+      // picked up.
+ logger.info("Getting DFS token from " + fs.getUri());
+ final Token<?> fsToken =
+ fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf())
+ .toString());
+ if (fsToken == null) {
+ logger.error("Failed to fetch DFS token for ");
+ throw new HadoopSecurityManagerException(
+ "Failed to fetch DFS token for " + userToProxy);
+ }
+ logger.info("Created DFS token.");
+ logger.info("Token kind: " + fsToken.getKind());
+ logger.info("Token service: " + fsToken.getService());
+
+ cred.addToken(fsToken.getService(), fsToken);
+
+ // getting additional name nodes tokens
+ final String otherNamenodes = props.get(
+ HadoopSecurityManager_H_2_0.OTHER_NAMENODES_TO_GET_TOKEN);
+ if ((otherNamenodes != null) && (otherNamenodes.length() > 0)) {
+ logger.info(
+ HadoopSecurityManager_H_2_0.OTHER_NAMENODES_TO_GET_TOKEN + ": '" + otherNamenodes
+ + "'");
+ final String[] nameNodeArr = otherNamenodes.split(",");
+ final Path[] ps = new Path[nameNodeArr.length];
+ for (int i = 0; i < ps.length; i++) {
+ ps[i] = new Path(nameNodeArr[i].trim());
+ }
+ TokenCache.obtainTokensForNamenodes(cred, ps, HadoopSecurityManager_H_2_0.this.conf);
+ logger.info("Successfully fetched tokens for: " + otherNamenodes);
+ } else {
+ logger.info(
+ HadoopSecurityManager_H_2_0.OTHER_NAMENODES_TO_GET_TOKEN + " was not configured");
}
}
+ }
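+
+  /**
+   * Fetches a JobHistoryServer delegation token when {@code OBTAIN_JOBHISTORYSERVER_TOKEN}
+   * is set and adds it to {@code cred}.
+   */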
+ private void fetchJHSToken(
+ final Props props, final Logger logger, final String userToProxy, final Credentials cred)
+ throws HadoopSecurityManagerException {
if (props.getBoolean(OBTAIN_JOBHISTORYSERVER_TOKEN, false)) {
final YarnRPC rpc = YarnRPC.create(this.conf);
final String serviceAddr = this.conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
@@ -625,94 +682,64 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
cred.addToken(jhsdt.getService(), jhsdt);
}
+ }
- try {
- getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- getToken(userToProxy);
- return null;
- }
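+  /**
+   * Fetches the default Hive MetaStore token, plus tokens from any extra clusters in
+   * {@code EXTRA_HCAT_CLUSTERS} or extra locations in {@code EXTRA_HCAT_LOCATION}, when
+   * {@code OBTAIN_HCAT_TOKEN} is set, and adds them to {@code cred}.
+   */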
+ private void fetchMetaStoreToken(final Props props, final Logger logger, final String userToProxy,
+ final Credentials cred)
+ throws HadoopSecurityManagerException {
+ if (props.getBoolean(HadoopSecurityManager.OBTAIN_HCAT_TOKEN, false)) {
+ try {
- private void getToken(final String userToProxy) throws InterruptedException,
- IOException, HadoopSecurityManagerException {
- logger.info("Here is the props for " + OBTAIN_NAMENODE_TOKEN + ": "
- + props.getBoolean(OBTAIN_NAMENODE_TOKEN));
+        // First, fetch and save the default HCat token.
+ logger.info("Pre-fetching default Hive MetaStore token from hive");
- // Register user secrets by custom credential Object
- if (props.getBoolean(JobProperties.ENABLE_JOB_SSL, false)) {
- registerCustomCredential(props, cred, userToProxy, logger);
- }
+ HiveConf hiveConf = new HiveConf();
+ Token<DelegationTokenIdentifier> hcatToken =
+ fetchHcatToken(userToProxy, hiveConf, null, logger);
- if (props.getBoolean(OBTAIN_NAMENODE_TOKEN, false)) {
- final FileSystem fs = FileSystem.get(HadoopSecurityManager_H_2_0.this.conf);
- // check if we get the correct FS, and most importantly, the
- // conf
- logger.info("Getting DFS token from " + fs.getUri());
- final Token<?> fsToken =
- fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf())
- .toString());
- if (fsToken == null) {
- logger.error("Failed to fetch DFS token for ");
- throw new HadoopSecurityManagerException(
- "Failed to fetch DFS token for " + userToProxy);
- }
- logger.info("Created DFS token.");
- logger.info("Token kind: " + fsToken.getKind());
- logger.info("Token service: " + fsToken.getService());
-
- cred.addToken(fsToken.getService(), fsToken);
-
- // getting additional name nodes tokens
- final String otherNamenodes = props.get(OTHER_NAMENODES_TO_GET_TOKEN);
- if ((otherNamenodes != null) && (otherNamenodes.length() > 0)) {
- logger.info(OTHER_NAMENODES_TO_GET_TOKEN + ": '" + otherNamenodes
- + "'");
- final String[] nameNodeArr = otherNamenodes.split(",");
- final Path[] ps = new Path[nameNodeArr.length];
- for (int i = 0; i < ps.length; i++) {
- ps[i] = new Path(nameNodeArr[i].trim());
- }
- TokenCache.obtainTokensForNamenodes(cred, ps, HadoopSecurityManager_H_2_0.this.conf);
- logger.info("Successfully fetched tokens for: " + otherNamenodes);
- } else {
- logger.info(OTHER_NAMENODES_TO_GET_TOKEN + " was not configured");
- }
+ cred.addToken(hcatToken.getService(), hcatToken);
+
+        // Fetch MetaStore tokens from any extra HCat clusters (EXTRA_HCAT_CLUSTERS).
+        final List<String> extraHcatClusters = props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS);
+        if (!extraHcatClusters.isEmpty()) {
+ logger.info("Need to pre-fetch extra metaStore tokens from extra hive clusters.");
+
+          // Fetch a MetaStore token from each configured cluster.
+          for (final String thriftUrls : extraHcatClusters) {
+            logger.info("Pre-fetching metaStore token from cluster: " + thriftUrls);
+
+ hiveConf = new HiveConf();
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrls);
+ hcatToken = fetchHcatToken(userToProxy, hiveConf, thriftUrls, logger);
+ cred.addToken(hcatToken.getService(), hcatToken);
}
+ } else {
+          // Fall back to EXTRA_HCAT_LOCATION only when EXTRA_HCAT_CLUSTERS is not configured.
+          final List<String> extraHcatLocations =
+              props.getStringList(EXTRA_HCAT_LOCATION);
+          if (!extraHcatLocations.isEmpty()) {
+ logger.info("Need to pre-fetch extra metaStore tokens from hive.");
- if (props.getBoolean(OBTAIN_JOBTRACKER_TOKEN, false)) {
- final JobConf jobConf = new JobConf();
- final JobClient jobClient = new JobClient(jobConf);
- logger.info("Pre-fetching JT token from JobTracker");
-
- final Token<DelegationTokenIdentifier> mrdt =
- jobClient
- .getDelegationToken(getMRTokenRenewerInternal(jobConf));
- if (mrdt == null) {
- logger.error("Failed to fetch JT token");
- throw new HadoopSecurityManagerException(
- "Failed to fetch JT token for " + userToProxy);
+            // Fetch a MetaStore token from each configured location.
+            for (final String thriftUrl : extraHcatLocations) {
+              logger.info("Pre-fetching metaStore token from: " + thriftUrl);
+
+ hiveConf = new HiveConf();
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
+ hcatToken =
+ fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
+ cred.addToken(hcatToken.getService(), hcatToken);
}
- logger.info("Created JT token.");
- logger.info("Token kind: " + mrdt.getKind());
- logger.info("Token service: " + mrdt.getService());
- cred.addToken(mrdt.getService(), mrdt);
}
-
}
- });
-
- prepareTokenFile(userToProxy, cred, tokenFile, logger);
- // stash them to cancel after use.
-
- logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
- } catch (final Exception e) {
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + e.getMessage() + e.getCause(), e);
- } catch (final Throwable t) {
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + t.getMessage() + t.getCause(), t);
+      } catch (final Throwable t) {
+        final String message =
+            "Failed to get hive metastore token: " + t.getMessage();
+        logger.error(message, t);
+        throw new HadoopSecurityManagerException(message, t);
+ }
}
}
@@ -851,7 +878,7 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
HiveUtils.getStorageHandler(hiveConf, tbl.getParameters().get(META_TABLE_STORAGE));
return storageHandler == null ? null : storageHandler.getMetaHook();
} catch (final HiveException e) {
- logger.error(e.toString());
+ HadoopSecurityManager_H_2_0.logger.error(e.toString());
throw new MetaException("Failed to get storage handler: " + e);
}
}