azkaban-aplcache

Moving Azkaban Hadoop Security code from azkaban-plugins

9/15/2016 5:51:09 PM

Details

diff --git a/azkaban-hadoop-security-plugin/build.gradle b/azkaban-hadoop-security-plugin/build.gradle
new file mode 100644
index 0000000..44b43e0
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/build.gradle
@@ -0,0 +1,27 @@
+apply plugin: 'distribution'
+
+ext.hadoopVersion = "2.6.1"
+
+dependencies {
+  compile project(":azkaban-common")
+
+  compile "org.apache.hadoop:hadoop-common:$hadoopVersion"
+  compile "org.apache.hadoop:hadoop-mapreduce-client-common:$hadoopVersion"
+  compile "org.apache.hadoop:hadoop-mapreduce-client-core:$hadoopVersion"
+  compile "org.apache.hive:hive-metastore:0.13.1"
+}
+
+/**
+ * Package only the plugin jar.
+ * The remaining dependencies are just Hadoop and Hive; they are not packaged inside the plugin.
+ * It is assumed that the classpaths of Hadoop, Hive, Pig, etc. will be fed into the application externally.
+ */
+distributions {
+  main {
+    contents {
+      from(jar) {
+        into 'lib'
+      }
+    }
+  }
+}
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
new file mode 100644
index 0000000..d6578d2
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2011 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.security.commons;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+public abstract class HadoopSecurityManager {
+
+  public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
+
+  public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
+  public static final String PROXY_USER = "proxy.user";
+  public static final String USER_TO_PROXY = "user.to.proxy";
+  public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
+  public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
+      "mapreduce.job.credentials.binary";
+
+  public static final String OBTAIN_JOBTRACKER_TOKEN =
+      "obtain.jobtracker.token";
+  public static final String OBTAIN_NAMENODE_TOKEN = "obtain.namenode.token";
+  public static final String OBTAIN_HCAT_TOKEN = "obtain.hcat.token";
+
+  public boolean isHadoopSecurityEnabled()
+      throws HadoopSecurityManagerException {
+    return false;
+  }
+
+  public void reloginFromKeytab() throws IOException {
+    UserGroupInformation.getLoginUser().reloginFromKeytab();
+  }
+
+  /**
+   * Create a proxied user based on the explicit user name, taking the other
+   * necessary parameters from the properties file.
+   */
+  public abstract UserGroupInformation getProxiedUser(String toProxy)
+      throws HadoopSecurityManagerException;
+
+  /**
+   * Create a proxied user, taking all parameters, including which user to proxy,
+   * from the provided Props.
+   */
+  public abstract UserGroupInformation getProxiedUser(Props prop)
+      throws HadoopSecurityManagerException;
+
+  public abstract FileSystem getFSAsUser(String user)
+      throws HadoopSecurityManagerException;
+
+  public static boolean shouldProxy(Properties prop) {
+    String shouldProxy = prop.getProperty(ENABLE_PROXYING);
+
+    return shouldProxy != null && shouldProxy.equals("true");
+  }
+
+  public abstract void prefetchToken(File tokenFile, String userToProxy,
+      Logger logger) throws HadoopSecurityManagerException;
+
+  public abstract void cancelTokens(File tokenFile, String userToProxy,
+      Logger logger) throws HadoopSecurityManagerException;
+
+  public abstract void prefetchToken(File tokenFile, Props props, Logger logger)
+      throws HadoopSecurityManagerException;
+
+}
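Concrete subclasses (such as the H_2_0 implementation added later in this commit) supply the Kerberos login and proxying logic; job-type code only sees the abstract contract above. As a rough, hedged illustration that is not part of this commit, a caller could look like the sketch below, where ProxiedFsExample, the user name, and the directory are hypothetical placeholders:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;

// Hypothetical caller; it only exercises the abstract HadoopSecurityManager contract.
public class ProxiedFsExample {

  // Lists an HDFS directory as the proxied user. The user name and directory
  // are placeholders chosen by the caller.
  public static void listAsUser(HadoopSecurityManager hsm, String userToProxy,
      String dir) throws HadoopSecurityManagerException, IOException {
    FileSystem fs = hsm.getFSAsUser(userToProxy);
    for (FileStatus status : fs.listStatus(new Path(dir))) {
      System.out.println(status.getPath());
    }
  }
}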
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManagerException.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManagerException.java
new file mode 100644
index 0000000..5b55d5f
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManagerException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.security.commons;
+
+public class HadoopSecurityManagerException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public HadoopSecurityManagerException(String message) {
+    super(message);
+  }
+
+  public HadoopSecurityManagerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
new file mode 100644
index 0000000..ef4cc35
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/SecurityUtils.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2011 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.security.commons;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+public class SecurityUtils {
+  // Secure Hadoop proxy user params
+  public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
+  public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
+  public static final String PROXY_USER = "proxy.user";
+  public static final String TO_PROXY = "user.to.proxy";
+
+  private static UserGroupInformation loginUser = null;
+
+  /**
+   * Create a proxied user based on the explicit user name, taking the other
+   * necessary parameters from the properties file.
+   */
+  public static synchronized UserGroupInformation getProxiedUser(
+      String toProxy, Properties prop, Logger log, Configuration conf)
+      throws IOException {
+
+    if (conf == null) {
+      throw new IllegalArgumentException("conf can't be null");
+    }
+    UserGroupInformation.setConfiguration(conf);
+
+    if (toProxy == null) {
+      throw new IllegalArgumentException("toProxy can't be null");
+    }
+
+    if (loginUser == null) {
+      log.info("No login user. Creating login user");
+      String keytab = verifySecureProperty(prop, PROXY_KEYTAB_LOCATION, log);
+      String proxyUser = verifySecureProperty(prop, PROXY_USER, log);
+      UserGroupInformation.loginUserFromKeytab(proxyUser, keytab);
+      loginUser = UserGroupInformation.getLoginUser();
+      log.info("Logged in with user " + loginUser);
+    } else {
+      log.info("loginUser (" + loginUser + ") already created, refreshing tgt.");
+      loginUser.checkTGTAndReloginFromKeytab();
+    }
+
+    return UserGroupInformation.createProxyUser(toProxy, loginUser);
+  }
+
+  /**
+   * Create a proxied user, taking all parameters, including which user to proxy,
+   * from the provided Properties.
+   */
+  public static UserGroupInformation getProxiedUser(Properties prop,
+      Logger log, Configuration conf) throws IOException {
+    String toProxy = verifySecureProperty(prop, TO_PROXY, log);
+    UserGroupInformation user = getProxiedUser(toProxy, prop, log, conf);
+    if (user == null)
+      throw new IOException(
+          "Proxy as any user in unsecured grid is not supported! "
+              + prop.toString());
+    log.info("Created proxy user " + user.getUserName() + ": " + user.toString());
+    return user;
+  }
+
+  public static String verifySecureProperty(Properties properties, String s,
+      Logger l) throws IOException {
+    String value = properties.getProperty(s);
+
+    if (value == null)
+      throw new IOException(s
+          + " not set in properties. Cannot use secure proxy");
+    l.info("Secure proxy configuration: Property " + s + " = " + value);
+    return value;
+  }
+
+  public static boolean shouldProxy(Properties prop) {
+    String shouldProxy = prop.getProperty(ENABLE_PROXYING);
+
+    return shouldProxy != null && shouldProxy.equals("true");
+  }
+
+  public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
+  public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
+      "mapreduce.job.credentials.binary";
+
+  public static synchronized void prefetchToken(final File tokenFile,
+      final Props p, final Logger logger) throws InterruptedException,
+      IOException {
+
+    final Configuration conf = new Configuration();
+    logger.info("Getting proxy user for " + p.getString(TO_PROXY));
+    logger.info("Getting proxy user for " + p.toString());
+
+    getProxiedUser(p.toProperties(), logger, conf).doAs(
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            getToken(p);
+            return null;
+          }
+
+          private void getToken(Props p) throws InterruptedException,
+              IOException {
+            String shouldPrefetch = p.getString(OBTAIN_BINARY_TOKEN);
+            if (shouldPrefetch != null && shouldPrefetch.equals("true")) {
+              logger.info("Pre-fetching token");
+
+              logger.info("Pre-fetching fs token");
+              FileSystem fs = FileSystem.get(conf);
+              Token<?> fsToken =
+                  fs.getDelegationToken(p.getString("user.to.proxy"));
+              logger.info("Created token: " + fsToken.toString());
+
+              Job job =
+                  new Job(conf, "totally phony, extremely fake, not real job");
+              JobConf jc = new JobConf(conf);
+              JobClient jobClient = new JobClient(jc);
+              logger.info("Pre-fetching job token: Got new JobClient: " + jc);
+              Token<DelegationTokenIdentifier> mrdt =
+                  jobClient.getDelegationToken(new Text("hi"));
+              logger.info("Created token: " + mrdt.toString());
+
+              job.getCredentials().addToken(new Text("howdy"), mrdt);
+              job.getCredentials().addToken(fsToken.getService(), fsToken);
+
+              FileOutputStream fos = null;
+              DataOutputStream dos = null;
+              try {
+                fos = new FileOutputStream(tokenFile);
+                dos = new DataOutputStream(fos);
+                job.getCredentials().writeTokenStorageToStream(dos);
+              } finally {
+                if (dos != null) {
+                  dos.close();
+                }
+                if (fos != null) {
+                  fos.close();
+                }
+              }
+              logger.info("Loading hadoop tokens into "
+                  + tokenFile.getAbsolutePath());
+              p.put("HadoopTokenFileLoc", tokenFile.getAbsolutePath());
+            } else {
+              logger.info("Not pre-fetching token");
+            }
+          }
+        });
+  }
+}
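The static helpers above are driven entirely by the four proxy properties declared at the top of the class. A minimal usage sketch, assuming a hypothetical SecurityUtilsExample driver; the principal, keytab path, and proxied user below are placeholder values:

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

import azkaban.security.commons.SecurityUtils;

// Hypothetical driver; property values are placeholders, not real credentials.
public class SecurityUtilsExample {
  public static void main(String[] args) throws Exception {
    Logger log = Logger.getLogger(SecurityUtilsExample.class);

    Properties props = new Properties();
    props.setProperty("azkaban.should.proxy", "true");                     // ENABLE_PROXYING
    props.setProperty("proxy.user", "azkaban/host@EXAMPLE.COM");           // assumed principal
    props.setProperty("proxy.keytab.location", "/path/to/azkaban.keytab"); // assumed keytab
    props.setProperty("user.to.proxy", "data_user");                       // TO_PROXY

    if (SecurityUtils.shouldProxy(props)) {
      // Logs in from the keytab once, then creates a proxy UGI for data_user.
      UserGroupInformation ugi =
          SecurityUtils.getProxiedUser(props, log, new Configuration());
      log.info("Proxying as " + ugi.getUserName());
    }
  }
}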
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
new file mode 100644
index 0000000..70c6e17
--- /dev/null
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -0,0 +1,855 @@
+/*
+ * Copyright 2011 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.security;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.security.commons.HadoopSecurityManagerException;
+import azkaban.utils.Props;
+import azkaban.utils.UndefinedPropertyException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
+
+  private static final String FS_HDFS_IMPL_DISABLE_CACHE =
+      "fs.hdfs.impl.disable.cache";
+
+  /** The Kerberos principal for the job tracker. */
+  public static final String JT_PRINCIPAL = JTConfig.JT_USER_NAME;
+  // "mapreduce.jobtracker.kerberos.principal";
+  /** The Kerberos principal for the resource manager. */
+  public static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
+
+  public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
+  public static final String HADOOP_JOB_TRACKER_2 =
+      "mapreduce.jobtracker.address";
+  public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
+
+  private static final String OTHER_NAMENODES_TO_GET_TOKEN = "other_namenodes";
+
+  /**
+   * The setting users define to list hcat locations, other than the default one,
+   * from which the system should pre-fetch hcat tokens.
+   * Note: multiple thrift URIs are supported; separate the values with commas.
+   * Values are case insensitive.
+   */
+  private static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
+
+  /**
+   * The key used to set the proper signature for each hcat token when multiple
+   * hcat tokens need to be fetched.
+   */
+  public static final String HIVE_TOKEN_SIGNATURE_KEY =
+      "hive.metastore.token.signature";
+
+  public static final Text DEFAULT_RENEWER = new Text("azkaban mr tokens");
+
+  private static final String AZKABAN_KEYTAB_LOCATION = "proxy.keytab.location";
+  private static final String AZKABAN_PRINCIPAL = "proxy.user";
+  private static final String OBTAIN_JOBHISTORYSERVER_TOKEN =
+      "obtain.jobhistoryserver.token";
+
+  private UserGroupInformation loginUser = null;
+  private final static Logger logger = Logger
+      .getLogger(HadoopSecurityManager_H_2_0.class);
+  private Configuration conf;
+
+  private String keytabLocation;
+  private String keytabPrincipal;
+  private boolean shouldProxy = false;
+  private boolean securityEnabled = false;
+
+  private static HadoopSecurityManager hsmInstance = null;
+  private ConcurrentMap<String, UserGroupInformation> userUgiMap;
+
+  private static URLClassLoader ucl;
+
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private HadoopSecurityManager_H_2_0(Props props)
+      throws HadoopSecurityManagerException, IOException {
+
+    // for now, assume the same/compatible native library, the same/compatible
+    // hadoop-core jar
+    String hadoopHome = props.getString("hadoop.home", null);
+    String hadoopConfDir = props.getString("hadoop.conf.dir", null);
+
+    if (hadoopHome == null) {
+      hadoopHome = System.getenv("HADOOP_HOME");
+    }
+    if (hadoopConfDir == null) {
+      hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+    }
+
+    List<URL> resources = new ArrayList<URL>();
+    URL urlToHadoop = null;
+    if (hadoopConfDir != null) {
+      urlToHadoop = new File(hadoopConfDir).toURI().toURL();
+      logger.info("Using hadoop config found in " + urlToHadoop);
+      resources.add(urlToHadoop);
+    } else if (hadoopHome != null) {
+      urlToHadoop = new File(hadoopHome, "conf").toURI().toURL();
+      logger.info("Using hadoop config found in " + urlToHadoop);
+      resources.add(urlToHadoop);
+    } else {
+      logger.info("HADOOP_HOME not set, using default hadoop config.");
+    }
+
+    ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
+
+    conf = new Configuration();
+    conf.setClassLoader(ucl);
+
+    if (props.containsKey(FS_HDFS_IMPL_DISABLE_CACHE)) {
+      logger.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
+          + props.get(FS_HDFS_IMPL_DISABLE_CACHE));
+      conf.setBoolean(FS_HDFS_IMPL_DISABLE_CACHE,
+          Boolean.valueOf(props.get(FS_HDFS_IMPL_DISABLE_CACHE)));
+    }
+
+    logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
+        + conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
+    logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ":  "
+        + conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION));
+    logger.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
+        + conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+
+    UserGroupInformation.setConfiguration(conf);
+
+    securityEnabled = UserGroupInformation.isSecurityEnabled();
+    if (securityEnabled) {
+      logger.info("The Hadoop cluster has enabled security");
+      shouldProxy = true;
+      try {
+
+        keytabLocation = props.getString(AZKABAN_KEYTAB_LOCATION);
+        keytabPrincipal = props.getString(AZKABAN_PRINCIPAL);
+      } catch (UndefinedPropertyException e) {
+        throw new HadoopSecurityManagerException(e.getMessage());
+      }
+
+      // try login
+      try {
+        if (loginUser == null) {
+          logger.info("No login user. Creating login user");
+          logger.info("Using principal from " + keytabPrincipal + " and "
+              + keytabLocation);
+          UserGroupInformation.loginUserFromKeytab(keytabPrincipal,
+              keytabLocation);
+          loginUser = UserGroupInformation.getLoginUser();
+          logger.info("Logged in with user " + loginUser);
+        } else {
+          logger.info("loginUser (" + loginUser
+              + ") already created, refreshing tgt.");
+          loginUser.checkTGTAndReloginFromKeytab();
+        }
+      } catch (IOException e) {
+        throw new HadoopSecurityManagerException(
+            "Failed to login with kerberos ", e);
+      }
+
+    }
+
+    userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
+
+    logger.info("Hadoop Security Manager initialized");
+  }
+
+  public static HadoopSecurityManager getInstance(Props props)
+      throws HadoopSecurityManagerException, IOException {
+    if (hsmInstance == null) {
+      synchronized (HadoopSecurityManager_H_2_0.class) {
+        if (hsmInstance == null) {
+          logger.info("getting new instance");
+          hsmInstance = new HadoopSecurityManager_H_2_0(props);
+        }
+      }
+    }
+
+    logger.debug("Relogging in from keytab if necessary.");
+    hsmInstance.reloginFromKeytab();
+
+    return hsmInstance;
+  }
+
+  /**
+   * Create a proxied user based on the explicit user name, taking the other
+   * necessary parameters from the properties file.
+   *
+   * @throws HadoopSecurityManagerException
+   */
+  @Override
+  public synchronized UserGroupInformation getProxiedUser(String userToProxy)
+      throws HadoopSecurityManagerException {
+
+    if (userToProxy == null) {
+      throw new HadoopSecurityManagerException("userToProxy can't be null");
+    }
+
+    UserGroupInformation ugi = userUgiMap.get(userToProxy);
+    if (ugi == null) {
+      logger.info("proxy user " + userToProxy
+          + " not exist. Creating new proxy user");
+      if (shouldProxy) {
+        try {
+          ugi =
+              UserGroupInformation.createProxyUser(userToProxy,
+                  UserGroupInformation.getLoginUser());
+        } catch (IOException e) {
+          throw new HadoopSecurityManagerException(
+              "Failed to create proxy user", e);
+        }
+      } else {
+        ugi = UserGroupInformation.createRemoteUser(userToProxy);
+      }
+      userUgiMap.putIfAbsent(userToProxy, ugi);
+    }
+    return ugi;
+  }
+
+  /**
+   * Create a proxied user, taking all parameters, including which user to proxy,
+   * from the provided Props.
+   */
+  @Override
+  public UserGroupInformation getProxiedUser(Props userProp)
+      throws HadoopSecurityManagerException {
+    String userToProxy = verifySecureProperty(userProp, USER_TO_PROXY);
+    UserGroupInformation user = getProxiedUser(userToProxy);
+    if (user == null) {
+      throw new HadoopSecurityManagerException(
+          "Proxy as any user in unsecured grid is not supported!");
+    }
+    return user;
+  }
+
+  public String verifySecureProperty(Props props, String s)
+      throws HadoopSecurityManagerException {
+    String value = props.getString(s);
+    if (value == null) {
+      throw new HadoopSecurityManagerException(s + " not set in properties.");
+    }
+    return value;
+  }
+
+  @Override
+  public FileSystem getFSAsUser(String user)
+      throws HadoopSecurityManagerException {
+    FileSystem fs;
+    try {
+      logger.info("Getting file system as " + user);
+      UserGroupInformation ugi = getProxiedUser(user);
+
+      if (ugi != null) {
+        fs = ugi.doAs(new PrivilegedAction<FileSystem>() {
+
+          @Override
+          public FileSystem run() {
+            try {
+              return FileSystem.get(conf);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      } else {
+        fs = FileSystem.get(conf);
+      }
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to get FileSystem. ", e);
+    }
+    return fs;
+  }
+
+  public boolean shouldProxy() {
+    return shouldProxy;
+  }
+
+  @Override
+  public boolean isHadoopSecurityEnabled() {
+    return securityEnabled;
+  }
+
+  /*
+   * Gets hadoop tokens for a user to run mapred/pig jobs on a secured cluster
+   */
+  @Override
+  public synchronized void prefetchToken(final File tokenFile,
+      final String userToProxy, final Logger logger)
+      throws HadoopSecurityManagerException {
+
+    logger.info("Getting hadoop tokens for " + userToProxy);
+
+    try {
+      getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          getToken(userToProxy);
+          return null;
+        }
+
+        private void getToken(String userToProxy) throws InterruptedException,
+            IOException, HadoopSecurityManagerException {
+
+          FileSystem fs = FileSystem.get(conf);
+          // check if we get the correct FS, and most importantly, the conf
+          logger.info("Getting DFS token from " + fs.getCanonicalServiceName()
+              + fs.getUri());
+          Token<?> fsToken = fs.getDelegationToken(userToProxy);
+          if (fsToken == null) {
+            logger.error("Failed to fetch DFS token for ");
+            throw new HadoopSecurityManagerException(
+                "Failed to fetch DFS token for " + userToProxy);
+          }
+          logger.info("Created DFS token: " + fsToken.toString());
+          logger.info("Token kind: " + fsToken.getKind());
+          logger.info("Token id: " + Arrays.toString(fsToken.getIdentifier()));
+          logger.info("Token service: " + fsToken.getService());
+
+          JobConf jc = new JobConf(conf);
+          JobClient jobClient = new JobClient(jc);
+          logger.info("Pre-fetching JT token: Got new JobClient: " + jc);
+
+          Token<DelegationTokenIdentifier> mrdt =
+              jobClient.getDelegationToken(new Text("mr token"));
+          if (mrdt == null) {
+            logger.error("Failed to fetch JT token for ");
+            throw new HadoopSecurityManagerException(
+                "Failed to fetch JT token for " + userToProxy);
+          }
+          logger.info("Created JT token: " + mrdt.toString());
+          logger.info("Token kind: " + mrdt.getKind());
+          logger.info("Token id: " + Arrays.toString(mrdt.getIdentifier()));
+          logger.info("Token service: " + mrdt.getService());
+
+          jc.getCredentials().addToken(mrdt.getService(), mrdt);
+          jc.getCredentials().addToken(fsToken.getService(), fsToken);
+
+          FileOutputStream fos = null;
+          DataOutputStream dos = null;
+          try {
+            fos = new FileOutputStream(tokenFile);
+            dos = new DataOutputStream(fos);
+            jc.getCredentials().writeTokenStorageToStream(dos);
+          } finally {
+            if (dos != null) {
+              try {
+                dos.close();
+              } catch (Throwable t) {
+                // best effort
+                logger
+                    .error(
+                        "encountered exception while closing DataOutputStream of the tokenFile",
+                        t);
+              }
+            }
+            if (fos != null) {
+              fos.close();
+            }
+          }
+          // stash them to cancel after use.
+          logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
+        }
+      });
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
+          + e.getMessage() + e.getCause());
+
+    }
+  }
+
+  private void cancelNameNodeToken(final Token<? extends TokenIdentifier> t,
+      String userToProxy) throws HadoopSecurityManagerException {
+    try {
+      getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          cancelToken(t);
+          return null;
+        }
+
+        private void cancelToken(Token<?> nt) throws IOException,
+            InterruptedException {
+          nt.cancel(conf);
+        }
+      });
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to cancel token. "
+          + e.getMessage() + e.getCause(), e);
+    }
+  }
+
+  private void cancelMRJobTrackerToken(
+      final Token<? extends TokenIdentifier> t, String userToProxy)
+      throws HadoopSecurityManagerException {
+    try {
+      getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
+        @SuppressWarnings("unchecked")
+        @Override
+        public Void run() throws Exception {
+          cancelToken((Token<DelegationTokenIdentifier>) t);
+          return null;
+        }
+
+        private void cancelToken(Token<DelegationTokenIdentifier> jt)
+            throws IOException, InterruptedException {
+          JobConf jc = new JobConf(conf);
+          JobClient jobClient = new JobClient(jc);
+          jobClient.cancelDelegationToken(jt);
+        }
+      });
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to cancel token. "
+          + e.getMessage() + e.getCause(), e);
+    }
+  }
+
+  private void cancelJhsToken(final Token<? extends TokenIdentifier> t,
+      String userToProxy) throws HadoopSecurityManagerException {
+    // It appears YARN cleans up this token after the application finishes,
+    // though only after a long while.
+    org.apache.hadoop.yarn.api.records.Token token =
+        org.apache.hadoop.yarn.api.records.Token.newInstance(t.getIdentifier(),
+            t.getKind().toString(), t.getPassword(), t.getService().toString());
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress jhsAddress = SecurityUtil.getTokenServiceAddr(t);
+    MRClientProtocol jhsProxy = null;
+    try {
+      jhsProxy =
+          UserGroupInformation.getCurrentUser().doAs(
+              new PrivilegedAction<MRClientProtocol>() {
+                @Override
+                public MRClientProtocol run() {
+                  return (MRClientProtocol) rpc.getProxy(
+                      HSClientProtocol.class, jhsAddress, conf);
+                }
+              });
+      CancelDelegationTokenRequest request =
+          Records.newRecord(CancelDelegationTokenRequest.class);
+      request.setDelegationToken(token);
+      jhsProxy.cancelDelegationToken(request);
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to cancel token. "
+          + e.getMessage() + e.getCause(), e);
+    } finally {
+      RPC.stopProxy(jhsProxy);
+    }
+
+  }
+
+  private void cancelHiveToken(final Token<? extends TokenIdentifier> t,
+      String userToProxy) throws HadoopSecurityManagerException {
+    try {
+      HiveConf hiveConf = new HiveConf();
+      HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+      hiveClient.cancelDelegationToken(t.encodeToUrlString());
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to cancel Token. "
+          + e.getMessage() + e.getCause(), e);
+    }
+  }
+
+  @Override
+  public void cancelTokens(File tokenFile, String userToProxy, Logger logger)
+      throws HadoopSecurityManagerException {
+    // nntoken
+    Credentials cred = null;
+    try {
+      cred =
+          Credentials.readTokenStorageFile(new Path(tokenFile.toURI()),
+              new Configuration());
+      for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
+
+        logger.info("Got token: " + t.toString());
+        logger.info("Token kind: " + t.getKind());
+        logger.info("Token id: " + new String(t.getIdentifier()));
+        logger.info("Token service: " + t.getService());
+
+        if (t.getKind().equals(new Text("HIVE_DELEGATION_TOKEN"))) {
+          logger.info("Cancelling hive token " + new String(t.getIdentifier()));
+          cancelHiveToken(t, userToProxy);
+        } else if (t.getKind().equals(new Text("RM_DELEGATION_TOKEN"))) {
+          logger.info("Cancelling mr job tracker token "
+              + new String(t.getIdentifier()));
+          // cancelMRJobTrackerToken(t, userToProxy);
+        } else if (t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+          logger.info("Cancelling namenode token "
+              + new String(t.getIdentifier()));
+          // cancelNameNodeToken(t, userToProxy);
+        } else if (t.getKind().equals(new Text("MR_DELEGATION_TOKEN"))) {
+          logger.info("Cancelling jobhistoryserver mr token "
+              + new String(t.getIdentifier()));
+          // cancelJhsToken(t, userToProxy);
+        } else {
+          logger.info("unknown token type " + t.getKind());
+        }
+      }
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to cancel tokens "
+          + e.getMessage() + e.getCause(), e);
+    }
+
+  }
+
+  /**
+   * Fetches an hcat token according to the specified hive configuration and
+   * returns it so it can be stored in the specified credential store.
+   *
+   * @param userToProxy String value indicating the name of the user the token
+   *          will be fetched for.
+   * @param hiveConf the configuration based off which the hive client will be
+   *          initialized.
+   * @param tokenSignatureOverwrite if non-empty, overrides the service field of
+   *          the fetched token.
+   * @param logger the logger instance which writes the logging content to the
+   *          job logs.
+   * @throws IOException
+   * @throws TException
+   * @throws MetaException
+   */
+  private Token<DelegationTokenIdentifier> fetchHcatToken(String userToProxy,
+      HiveConf hiveConf, String tokenSignatureOverwrite, final Logger logger)
+      throws IOException, MetaException, TException {
+
+    logger.info(HiveConf.ConfVars.METASTOREURIS.varname + ": "
+        + hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+
+    logger.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": "
+        + hiveConf.get(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname));
+
+    logger.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": "
+        + hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
+
+    HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
+    String hcatTokenStr =
+        hiveClient.getDelegationToken(userToProxy, UserGroupInformation
+            .getLoginUser().getShortUserName());
+    Token<DelegationTokenIdentifier> hcatToken =
+        new Token<DelegationTokenIdentifier>();
+    hcatToken.decodeFromUrlString(hcatTokenStr);
+
+    // overwrite the value of the service property of the token if the signature
+    // override is specified.
+    if (tokenSignatureOverwrite != null
+        && tokenSignatureOverwrite.trim().length() > 0) {
+      hcatToken.setService(new Text(tokenSignatureOverwrite.trim()
+          .toLowerCase()));
+
+      logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":"
+          + (tokenSignatureOverwrite == null ? "" : tokenSignatureOverwrite));
+    }
+
+    logger.info("Created hive metastore token: " + hcatTokenStr);
+    logger.info("Token kind: " + hcatToken.getKind());
+    logger.info("Token id: " + Arrays.toString(hcatToken.getIdentifier()));
+    logger.info("Token service: " + hcatToken.getService());
+    return hcatToken;
+  }
+
+  /*
+   * Gets hadoop tokens for a user to run mapred/hive jobs on a secured cluster
+   */
+  @Override
+  public synchronized void prefetchToken(final File tokenFile,
+      final Props props, final Logger logger)
+      throws HadoopSecurityManagerException {
+
+    final String userToProxy = props.getString(USER_TO_PROXY);
+
+    logger.info("Getting hadoop tokens based on props for " + userToProxy);
+
+    final Credentials cred = new Credentials();
+
+    if (props.getBoolean(OBTAIN_HCAT_TOKEN, false)) {
+      try {
+
+        // first we fetch and save the default hcat token.
+        logger.info("Pre-fetching default Hive MetaStore token from hive");
+
+        HiveConf hiveConf = new HiveConf();
+        Token<DelegationTokenIdentifier> hcatToken =
+            fetchHcatToken(userToProxy, hiveConf, null, logger);
+
+        cred.addToken(hcatToken.getService(), hcatToken);
+
+        // check and see if user specified the extra hcat locations we need to
+        // look at and fetch token.
+        final List<String> extraHcatLocations =
+            props.getStringList(EXTRA_HCAT_LOCATION);
+        if (Collections.EMPTY_LIST != extraHcatLocations) {
+          logger.info("Need to pre-fetch extra metaStore tokens from hive.");
+
+          // start to process the user inputs.
+          for (String thriftUrl : extraHcatLocations) {
+            logger.info("Pre-fetching metaStore token from : " + thriftUrl);
+
+            hiveConf = new HiveConf();
+            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
+            hcatToken =
+                fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
+            cred.addToken(hcatToken.getService(), hcatToken);
+          }
+
+        }
+
+      } catch (Throwable t) {
+        String message =
+            "Failed to get hive metastore token." + t.getMessage()
+                + t.getCause();
+        logger.error(message, t);
+        throw new HadoopSecurityManagerException(message);
+      }
+    }
+
+    if (props.getBoolean(OBTAIN_JOBHISTORYSERVER_TOKEN, false)) {
+      YarnRPC rpc = YarnRPC.create(conf);
+      final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
+
+      logger.debug("Connecting to HistoryServer at: " + serviceAddr);
+      HSClientProtocol hsProxy =
+          (HSClientProtocol) rpc.getProxy(HSClientProtocol.class,
+              NetUtils.createSocketAddr(serviceAddr), conf);
+      logger.info("Pre-fetching JH token from job history server");
+
+      Token<?> jhsdt = null;
+      try {
+        jhsdt = getDelegationTokenFromHS(hsProxy);
+      } catch (Exception e) {
+        logger.error("Failed to fetch JH token", e);
+        throw new HadoopSecurityManagerException(
+            "Failed to fetch JH token for " + userToProxy);
+      }
+
+      if (jhsdt == null) {
+        logger.error("getDelegationTokenFromHS() returned null");
+        throw new HadoopSecurityManagerException(
+            "Unable to fetch JH token for " + userToProxy);
+      }
+
+      logger.info("Created JH token: " + jhsdt.toString());
+      logger.info("Token kind: " + jhsdt.getKind());
+      logger.info("Token id: " + Arrays.toString(jhsdt.getIdentifier()));
+      logger.info("Token service: " + jhsdt.getService());
+
+      cred.addToken(jhsdt.getService(), jhsdt);
+    }
+
+    try {
+      getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          getToken(userToProxy);
+          return null;
+        }
+
+        private void getToken(String userToProxy) throws InterruptedException,
+            IOException, HadoopSecurityManagerException {
+          logger.info("Here is the props for " + OBTAIN_NAMENODE_TOKEN + ": "
+              + props.getBoolean(OBTAIN_NAMENODE_TOKEN));
+          if (props.getBoolean(OBTAIN_NAMENODE_TOKEN, false)) {
+            FileSystem fs = FileSystem.get(conf);
+            // check if we get the correct FS, and most importantly, the
+            // conf
+            logger.info("Getting DFS token from " + fs.getUri());
+            Token<?> fsToken =
+                fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf())
+                    .toString());
+            if (fsToken == null) {
+              logger.error("Failed to fetch DFS token for ");
+              throw new HadoopSecurityManagerException(
+                  "Failed to fetch DFS token for " + userToProxy);
+            }
+            logger.info("Created DFS token: " + fsToken.toString());
+            logger.info("Token kind: " + fsToken.getKind());
+            logger.info("Token id: " + Arrays.toString(fsToken.getIdentifier()));
+            logger.info("Token service: " + fsToken.getService());
+
+            cred.addToken(fsToken.getService(), fsToken);
+
+            // getting additional name nodes tokens
+            String otherNamenodes = props.get(OTHER_NAMENODES_TO_GET_TOKEN);
+            if ((otherNamenodes != null) && (otherNamenodes.length() > 0)) {
+              logger.info(OTHER_NAMENODES_TO_GET_TOKEN + ": '" + otherNamenodes
+                  + "'");
+              String[] nameNodeArr = otherNamenodes.split(",");
+              Path[] ps = new Path[nameNodeArr.length];
+              for (int i = 0; i < ps.length; i++) {
+                ps[i] = new Path(nameNodeArr[i].trim());
+              }
+              TokenCache.obtainTokensForNamenodes(cred, ps, conf);
+              logger.info("Successfully fetched tokens for: " + otherNamenodes);
+            } else {
+              logger.info(OTHER_NAMENODES_TO_GET_TOKEN + " was not configured");
+            }
+          }
+
+          if (props.getBoolean(OBTAIN_JOBTRACKER_TOKEN, false)) {
+            JobConf jobConf = new JobConf();
+            JobClient jobClient = new JobClient(jobConf);
+            logger.info("Pre-fetching JT token from JobTracker");
+
+            Token<DelegationTokenIdentifier> mrdt =
+                jobClient
+                    .getDelegationToken(getMRTokenRenewerInternal(jobConf));
+            if (mrdt == null) {
+              logger.error("Failed to fetch JT token");
+              throw new HadoopSecurityManagerException(
+                  "Failed to fetch JT token for " + userToProxy);
+            }
+            logger.info("Created JT token: " + mrdt.toString());
+            logger.info("Token kind: " + mrdt.getKind());
+            logger.info("Token id: " + Arrays.toString(mrdt.getIdentifier()));
+            logger.info("Token service: " + mrdt.getService());
+            cred.addToken(mrdt.getService(), mrdt);
+          }
+
+        }
+      });
+
+      FileOutputStream fos = null;
+      DataOutputStream dos = null;
+      try {
+        fos = new FileOutputStream(tokenFile);
+        dos = new DataOutputStream(fos);
+        cred.writeTokenStorageToStream(dos);
+      } finally {
+        if (dos != null) {
+          try {
+            dos.close();
+          } catch (Throwable t) {
+            // best effort
+            logger
+                .error(
+                    "encountered exception while closing DataOutputStream of the tokenFile",
+                    t);
+          }
+        }
+        if (fos != null) {
+          fos.close();
+        }
+      }
+      // stash them to cancel after use.
+
+      logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
+
+    } catch (Exception e) {
+      throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
+          + e.getMessage() + e.getCause(), e);
+    } catch (Throwable t) {
+      throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
+          + t.getMessage() + t.getCause(), t);
+    }
+
+  }
+
+  private Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
+    // Taken from Oozie
+    //
+    // Get the renewer correctly for the JT principal as well, even though
+    // the JT in hadoop 1.x does not have support for renewing/cancelling
+    // tokens.
+    String servicePrincipal =
+        jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
+    Text renewer;
+    if (servicePrincipal != null) {
+      String target =
+          jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
+      if (target == null) {
+        target = jobConf.get(HADOOP_JOB_TRACKER);
+      }
+
+      String addr = NetUtils.createSocketAddr(target).getHostName();
+      renewer =
+          new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
+    } else {
+      // No security
+      renewer = DEFAULT_RENEWER;
+    }
+
+    return renewer;
+  }
+
+  private Token<?> getDelegationTokenFromHS(HSClientProtocol hsProxy)
+      throws IOException, InterruptedException {
+    GetDelegationTokenRequest request =
+        recordFactory.newRecordInstance(GetDelegationTokenRequest.class);
+    request.setRenewer(Master.getMasterPrincipal(conf));
+    org.apache.hadoop.yarn.api.records.Token mrDelegationToken;
+    mrDelegationToken =
+        hsProxy.getDelegationToken(request).getDelegationToken();
+    return ConverterUtils.convertFromYarn(mrDelegationToken,
+        hsProxy.getConnectAddress());
+  }
+
+  private void cancelDelegationTokenFromHS(
+      final org.apache.hadoop.yarn.api.records.Token t, HSClientProtocol hsProxy)
+      throws IOException, InterruptedException {
+    CancelDelegationTokenRequest request =
+        recordFactory.newRecordInstance(CancelDelegationTokenRequest.class);
+    request.setDelegationToken(t);
+    hsProxy.cancelDelegationToken(request);
+  }
+
+}
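End to end, the executor side is expected to build a Props object carrying the proxy credentials and the obtain.* switches defined above, then hand the manager a file into which the delegation tokens are written (and later cancelled). A rough sketch, assuming a hypothetical PrefetchTokensExample driver; the principal, keytab path, and proxied user are placeholders:

import java.io.File;

import org.apache.log4j.Logger;

import azkaban.security.HadoopSecurityManager_H_2_0;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.utils.Props;

// Hypothetical driver; property values are placeholders, not real credentials.
public class PrefetchTokensExample {
  public static void main(String[] args) throws Exception {
    Logger log = Logger.getLogger(PrefetchTokensExample.class);

    Props props = new Props();
    props.put("proxy.user", "azkaban/host@EXAMPLE.COM");           // assumed Azkaban principal
    props.put("proxy.keytab.location", "/path/to/azkaban.keytab"); // assumed keytab path
    props.put("user.to.proxy", "data_user");
    props.put("obtain.namenode.token", "true");
    props.put("obtain.jobtracker.token", "true");
    props.put("obtain.hcat.token", "false");

    HadoopSecurityManager hsm = HadoopSecurityManager_H_2_0.getInstance(props);
    File tokenFile = File.createTempFile("azkaban-tokens", ".token");

    // Writes the requested delegation tokens into tokenFile.
    hsm.prefetchToken(tokenFile, props, log);

    // The launched job would then point mapreduce.job.credentials.binary at tokenFile.

    // Best-effort cleanup once the job is done.
    hsm.cancelTokens(tokenFile, "data_user", log);
  }
}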

settings.gradle (+1 -0)

diff --git a/settings.gradle b/settings.gradle
index 7917a49..fe41b0b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,5 +1,6 @@
 include 'azkaban-common'
 include 'azkaban-execserver'
+include 'azkaban-hadoop-security-plugin'
 include 'azkaban-migration'
 include 'azkaban-soloserver'
 include 'azkaban-sql'