diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 4424c15..c106952 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -204,6 +204,9 @@ public class Constants {
@Deprecated
public static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
+ // If true, AZ will fetches the jobs' certificate from remote Certificate Authority.
+ public static final String ENABLE_JOB_SSL = "azkaban.job.enable.ssl";
+
// Job properties that indicate maximum memory size
public static final String JOB_MAX_XMS = "job.max.Xms";
public static final String MAX_XMS_DEFAULT = "1G";
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 192484e..c7defdc 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -21,6 +21,8 @@ import static azkaban.Constants.JobProperties.EXTRA_HCAT_CLUSTERS;
import static azkaban.Constants.JobProperties.EXTRA_HCAT_LOCATION;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
+import azkaban.Constants;
+import azkaban.Constants.JobProperties;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.ExecuteAsUser;
@@ -30,6 +32,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.PrivilegedAction;
@@ -334,6 +337,27 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
return this.shouldProxy;
}
+ private void registerCustomCredential(final Props props, final Credentials hadoopCred, final
+ String userToProxy) {
+ String credentialClassName = "unknown class";
+ try {
+ credentialClassName = props
+ .getString(Constants.ConfigurationKeys.CUSTOM_CREDENTIAL_NAME);
+ logger.info("custom credential class name: " + credentialClassName);
+ final Class metricsClass = Class.forName(credentialClassName);
+
+ final Constructor[] constructors = metricsClass.getConstructors();
+ final CredentialProvider customCredential = (CredentialProvider) constructors[0]
+ .newInstance(hadoopCred, props);
+ customCredential.register(userToProxy);
+ } catch (final Exception e) {
+ logger.error("Encountered error while loading and instantiating "
+ + credentialClassName, e);
+ throw new IllegalStateException("Encountered error while loading and instantiating "
+ + credentialClassName, e);
+ }
+ }
+
@Override
public boolean isHadoopSecurityEnabled() {
return this.securityEnabled;
@@ -609,6 +633,12 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
IOException, HadoopSecurityManagerException {
logger.info("Here is the props for " + OBTAIN_NAMENODE_TOKEN + ": "
+ props.getBoolean(OBTAIN_NAMENODE_TOKEN));
+
+ // Register user secrets by custom credential Object
+ if (props.getBoolean(JobProperties.ENABLE_JOB_SSL, false)) {
+ registerCustomCredential(props, cred, userToProxy);
+ }
+
if (props.getBoolean(OBTAIN_NAMENODE_TOKEN, false)) {
final FileSystem fs = FileSystem.get(HadoopSecurityManager_H_2_0.this.conf);
// check if we get the correct FS, and most importantly, the