azkaban-aplcache

Enable new configuration other_hcat_clusters in AZ to talk to multiple HCat clusters

10/2/2017 9:01:10 PM

Details
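
For reference, a minimal usage sketch (not part of the patch) showing how the new property is parsed by the Props method added in this change. The class name and the property value are illustrative only; the key azkaban.job.hive.other_hcat_clusters and getStringListFromCluster come from the diff below.

import azkaban.utils.Props;
import java.util.List;

public class OtherHcatClustersExample {

  public static void main(final String[] args) {
    final Props props = new Props();
    // Two "clusters" separated by ';'; each cluster is a comma-separated list of
    // thrift URIs that back each other up (example value only).
    props.put("azkaban.job.hive.other_hcat_clusters",
        "thrift://hcat1:port,thrift://hcat2:port;thrift://hcat3:port,thrift://hcat4:port");

    // getStringListFromCluster splits on ';' and drops empty entries,
    // yielding one string per cluster.
    final List<String> clusters =
        props.getStringListFromCluster("azkaban.job.hive.other_hcat_clusters");
    for (final String cluster : clusters) {
      // HadoopSecurityManager_H_2_0 fetches one metastore delegation token per
      // cluster, using the whole comma-separated string as hive.metastore.uris.
      System.out.println("cluster: " + cluster);
    }
  }
}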

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index b21a4fe..b60100f 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -160,6 +160,24 @@ public class Constants {
     // Job property that enables/disables using Kafka logging of user job logs
     public static final String AZKABAN_JOB_LOGGING_KAFKA_ENABLE = "azkaban.job.logging.kafka.enable";
 
+    /*
+     * This parameter replaces EXTRA_HCAT_LOCATION, which could fail when one of the URIs is
+     * not available. EXTRA_HCAT_CLUSTERS has the following format:
+     * other_hcat_clusters = "thrift://hcat1:port,thrift://hcat2:port;thrift://hcat3:port,thrift://hcat4:port"
+     * Each semicolon-separated string is regarded as a "cluster", and a delegation token is
+     * fetched from each cluster. The URIs (HCat servers) within a "cluster" provide HA.
+     */
+    public static final String EXTRA_HCAT_CLUSTERS = "azkaban.job.hive.other_hcat_clusters";
+
+    /*
+     * The user-defined setting listing HCat locations, other than the default one, from which
+     * the system should pre-fetch HCat tokens. Note: multiple thrift URIs are supported; use
+     * commas to separate the values. Values are case insensitive.
+     */
+    // Deprecated: use EXTRA_HCAT_CLUSTERS instead
+    @Deprecated
+    public static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
+
     // Job properties that indicate maximum memory size
     public static final String JOB_MAX_XMS = "job.max.Xms";
     public static final String MAX_XMS_DEFAULT = "1G";
diff --git a/az-core/src/main/java/azkaban/utils/Props.java b/az-core/src/main/java/azkaban/utils/Props.java
index fd0cfd1..7df3c35 100644
--- a/az-core/src/main/java/azkaban/utils/Props.java
+++ b/az-core/src/main/java/azkaban/utils/Props.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -430,6 +431,23 @@ public class Props {
   }
 
   /**
+   * Returns a list of clusters, using the semicolon as the separator of the value.
+   * e.g., for the input string "thrift://hcat1:port,thrift://hcat2:port;thrift://hcat3:port,thrift://hcat4:port;"
+   * the output is ["thrift://hcat1:port,thrift://hcat2:port", "thrift://hcat3:port,thrift://hcat4:port"]
+   */
+  public List<String> getStringListFromCluster(final String key) {
+    final List<String> clusters = getStringList(key, "\\s*;\\s*");
+    // Remove any empty entries, e.g. those produced by stray separators.
+    for (final Iterator<String> iter = clusters.iterator(); iter.hasNext(); ) {
+      if (iter.next().isEmpty()) {
+        iter.remove();
+      }
+    }
+    return clusters;
+  }
+
+  /**
    * Returns a list of strings with the sep as the separator of the value
    */
   public List<String> getStringList(final String key, final String sep) {
diff --git a/az-core/src/test/java/azkaban/utils/PropsTest.java b/az-core/src/test/java/azkaban/utils/PropsTest.java
new file mode 100644
index 0000000..c669b6b
--- /dev/null
+++ b/az-core/src/test/java/azkaban/utils/PropsTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.utils;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for azkaban.utils.Props
+ */
+public class PropsTest {
+
+  private static final String EXTRA_HCAT_CLUSTERS = "other_hcat_clusters";
+
+  private final Props props = new Props();
+
+  /* Tests for getStringListFromCluster(String key) */
+  @Test
+  public void testSplit1() {
+    final String s1 = "thrift://hcat1:port,thrift://hcat2:port;thrift://hcat3:port,thrift://hcat4:port;";
+    this.props.put(EXTRA_HCAT_CLUSTERS, s1);
+    final List<String> expected1 =
+        Arrays.asList("thrift://hcat1:port,thrift://hcat2:port", "thrift://hcat3:port,thrift://hcat4:port");
+    Assert.assertEquals(expected1, this.props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS));
+
+    final String s2 = "thrift://hcat1:port,thrift://hcat2:port     ;      thrift://hcat3:port,thrift://hcat4:port;";
+    this.props.put(EXTRA_HCAT_CLUSTERS, s2);
+    final List<String> expected2 =
+        Arrays.asList("thrift://hcat1:port,thrift://hcat2:port", "thrift://hcat3:port,thrift://hcat4:port");
+    Assert.assertEquals(expected2, this.props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS));
+
+    final String s3 = "thrift://hcat1:port,thrift://hcat2:port";
+    this.props.put(EXTRA_HCAT_CLUSTERS, s3);
+    Assert.assertEquals(Arrays.asList(s3), this.props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS));
+  }
+}
\ No newline at end of file
diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
index 1082847..e2ed7f9 100644
--- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/HadoopSecurityManager_H_2_0.java
@@ -17,6 +17,8 @@
 package azkaban.security;
 
 import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_NATIVE_LIB_FOLDER;
+import static azkaban.Constants.JobProperties.EXTRA_HCAT_CLUSTERS;
+import static azkaban.Constants.JobProperties.EXTRA_HCAT_LOCATION;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
 import azkaban.security.commons.HadoopSecurityManager;
@@ -116,12 +118,6 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
   private static final String FS_HDFS_IMPL_DISABLE_CACHE =
       "fs.hdfs.impl.disable.cache";
   private static final String OTHER_NAMENODES_TO_GET_TOKEN = "other_namenodes";
-  /**
-   * the settings to be defined by user indicating if there are hcat locations other than the
-   * default one the system should pre-fetch hcat token from. Note: Multiple thrift uris are
-   * supported, use comma to separate the values, values are case insensitive.
-   */
-  private static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
   private static final String AZKABAN_KEYTAB_LOCATION = "proxy.keytab.location";
   private static final String AZKABAN_PRINCIPAL = "proxy.user";
   private static final String OBTAIN_JOBHISTORYSERVER_TOKEN =
@@ -484,7 +480,8 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
     // overwrite the value of the service property of the token if the signature
     // override is specified.
-    if (tokenSignatureOverwrite != null
+    // Do not overwrite the token's service field if it is already set
+    if (hcatToken.getService().getLength() <= 0 && tokenSignatureOverwrite != null
         && tokenSignatureOverwrite.trim().length() > 0) {
       hcatToken.setService(new Text(tokenSignatureOverwrite.trim()
           .toLowerCase()));
@@ -525,24 +522,38 @@ public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
 
         cred.addToken(hcatToken.getService(), hcatToken);
 
-        // check and see if user specified the extra hcat locations we need to
-        // look at and fetch token.
-        final List<String> extraHcatLocations =
-            props.getStringList(EXTRA_HCAT_LOCATION);
-        if (Collections.EMPTY_LIST != extraHcatLocations) {
-          logger.info("Need to pre-fetch extra metaStore tokens from hive.");
+        // Check whether the user specified extra HCat clusters to pre-fetch metaStore tokens from.
+        final List<String> extraHcatClusters = props.getStringListFromCluster(EXTRA_HCAT_CLUSTERS);
+        if (Collections.EMPTY_LIST != extraHcatClusters) {
+          logger.info("Need to pre-fetch extra metaStore tokens from extra hive clusters.");
 
           // start to process the user inputs.
-          for (final String thriftUrl : extraHcatLocations) {
-            logger.info("Pre-fetching metaStore token from : " + thriftUrl);
+          for (final String thriftUrls : extraHcatClusters) {
+            logger.info("Pre-fetching metaStore token from cluster : " + thriftUrls);
 
             hiveConf = new HiveConf();
-            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
-            hcatToken =
-                fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
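+            // Each "cluster" value is itself a comma-separated list of thrift URIs; passing the
+            // whole list as hive.metastore.uris lets the metastore client fail over to another
+            // URI in the cluster if one becomes unavailable.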
+            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrls);
+            hcatToken = fetchHcatToken(userToProxy, hiveConf, thriftUrls, logger);
             cred.addToken(hcatToken.getService(), hcatToken);
           }
-
+        } else {
+          // Fall back to the deprecated EXTRA_HCAT_LOCATION only when EXTRA_HCAT_CLUSTERS is not set.
+          final List<String> extraHcatLocations =
+              props.getStringList(EXTRA_HCAT_LOCATION);
+          if (Collections.EMPTY_LIST != extraHcatLocations) {
+            logger.info("Need to pre-fetch extra metaStore tokens from hive.");
+
+            // start to process the user inputs.
+            for (final String thriftUrl : extraHcatLocations) {
+              logger.info("Pre-fetching metaStore token from : " + thriftUrl);
+
+              hiveConf = new HiveConf();
+              hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
+              hcatToken =
+                  fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
+              cred.addToken(hcatToken.getService(), hcatToken);
+            }
+          }
         }
 
       } catch (final Throwable t) {