azkaban-developers

Merge pull request #495 from johnyu0520/krb5_hotfix setup

9/21/2015 9:39:59 PM
2.7.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 0c98ecb..8c208a5 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -133,6 +133,11 @@ public class CommonJobProperties {
    * hotspot occurs.
    */
   public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+  
+  /**
+   * The user who submitted the flow, recorded in addition to user.to.proxy (the two may differ)
+   */
+  public static final String SUBMIT_USER = "azkaban.flow.submituser";
 
   /**
    * A uuid assigned to every execution
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 4e268a0..3ba6d86 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.utils.process.AzkabanProcess;
 import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
 import azkaban.utils.Pair;
@@ -36,15 +37,29 @@ import azkaban.utils.SystemMemoryInfo;
 public class ProcessJob extends AbstractProcessJob {
 
   public static final String COMMAND = "command";
+
   private static final long KILL_TIME_MS = 5000;
+
   private volatile AzkabanProcess process;
+
   private static final String MEMCHECK_ENABLED = "memCheck.enabled";
-  private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+
+  private static final String MEMCHECK_FREEMEMDECRAMT =
+      "memCheck.freeMemDecrAmt";
+
   public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
 
+  public static final String USER_TO_PROXY = "user.to.proxy";
+
+  public static final String KRB5CCNAME = "KRB5CCNAME";
+
   public ProcessJob(final String jobId, final Props sysProps,
       final Props jobProps, final Logger log) {
     super(jobId, sysProps, jobProps, log);
+
+    // Store the job id in jobProps, in line with what other job types
+    // (hadoopJava, spark, pig, hive) are doing.
+    jobProps.put(CommonJobProperties.JOB_ID, jobId);
   }
 
   @Override
@@ -55,13 +70,19 @@ public class ProcessJob extends AbstractProcessJob {
       handleError("Bad property definition! " + e.getMessage(), e);
     }
 
-    if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
+    if (sysProps.getBoolean(MEMCHECK_ENABLED, true)
+        && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
       long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
       Pair<Long, Long> memPair = getProcMemoryRequirement();
-      boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+      boolean isMemGranted =
+          SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
+              memPair.getSecond(), freeMemDecrAmt);
       if (!isMemGranted) {
-        throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
-                memPair.getFirst(), memPair.getSecond(), getId()));
+        throw new Exception(
+            String
+                .format(
+                    "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+                    memPair.getFirst(), memPair.getSecond(), getId()));
       }
     }
 
@@ -82,6 +103,9 @@ public class ProcessJob extends AbstractProcessJob {
     File[] propFiles = initPropsFiles();
     Map<String, String> envVars = getEnvironmentVariables();
 
+    // change krb5ccname env var so that each job execution gets its own cache
+    envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
+
     for (String command : commands) {
       info("Command: " + command);
       AzkabanProcessBuilder builder =
@@ -120,9 +144,61 @@ public class ProcessJob extends AbstractProcessJob {
   }
 
   /**
+   * <pre>
+   * Builds the kerberos ticket cache file name from the jobprops, so that each
+   * job execution will have its own kerberos ticket cache file.
+   * The code only sets an environment variable, so the number of files created
+   * corresponds to the number of processes doing kinit in their flow (expected to be small).
+   * </pre>
+   *
+   * @param jobProps read for the project name, flow id, job id, exec id and user
+   * @return file name: the kerberos ticket cache file to use
+   */
+  private String getKrb5ccname(Props jobProps) {
+    String effectiveUser = getEffectiveUser(jobProps);
+    String projectName =
+        jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
+    String flowId =
+        jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
+    String jobId =
+        jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
+    // execId should be an int and should never contain a space; it is used as-is
+    String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
+    String krb5ccname =
+        String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId,
+            jobId, execId, effectiveUser);
+
+    return krb5ccname;
+  }
+
+  /**
+   * <pre>
+   * Determines the effective user of the process job, in order of precedence:
+   * 1. USER_TO_PROXY
+   * 2. SUBMIT_USER
+   * </pre>
+   * @param jobProps the job properties searched for the two user keys
+   * @return the user that Azkaban is going to execute as; also logged via info()
+   * @throws RuntimeException if jobProps has neither user.to.proxy nor azkaban.flow.submituser
+   */
+  private String getEffectiveUser(Props jobProps) {
+    String effectiveUser = null;
+    if (jobProps.containsKey(USER_TO_PROXY)) {
+      effectiveUser = jobProps.getString(USER_TO_PROXY);
+    } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
+      effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
+    } else {
+      throw new RuntimeException(
+          "Internal Error: No user.to.proxy or submit.user in the jobProps");
+    }
+    info("effective user is: " + effectiveUser);
+    return effectiveUser;
+  }
+
+  /**
    * This is used to get the min/max memory size requirement by processes.
-   * SystemMemoryInfo can use the info to determine if the memory request
-   * can be fulfilled. For Java process, this should be Xms/Xmx setting.
+   * SystemMemoryInfo can use the info to determine if the memory request can be
+   * fulfilled. For Java process, this should be Xms/Xmx setting.
    *
    * @return pair of min/max memory size
    */
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 596e8b3..3412425 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -292,6 +292,7 @@ public class PropsUtils {
     props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_BY, flow.getLastModifiedByUser());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_DATE, flow.getLastModifiedTimestamp());
+    props.put(CommonJobProperties.SUBMIT_USER, flow.getExecutableFlow().getSubmitUser());  
 
     DateTime loadTime = new DateTime();
 
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
index 05a0a28..068935f 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/JavaProcessJobTest.java
@@ -22,7 +22,6 @@ import java.util.Date;
 import java.util.Properties;
 
 import org.apache.log4j.Logger;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -33,6 +32,7 @@ import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 
 public class JavaProcessJobTest {
@@ -108,6 +108,14 @@ public class JavaProcessJobTest {
     props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
     props.put("type", "java");
     props.put("fullPath", ".");
+    
+    props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+    props.put(CommonJobProperties.FLOW_ID, "test_flow");
+    props.put(CommonJobProperties.JOB_ID, "test_job");
+    props.put(CommonJobProperties.EXEC_ID, "123");
+    props.put(CommonJobProperties.SUBMIT_USER, "test_user");
+    
+
     job = new JavaProcessJob("testJavaProcess", props, props, log);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index 974895b..57f4505 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -20,7 +20,6 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -28,6 +27,7 @@ import org.junit.Test;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.utils.Props;
 
 public class ProcessJobTest {
@@ -46,6 +46,12 @@ public class ProcessJobTest {
     props.put(AbstractProcessJob.WORKING_DIR, workingDir.getCanonicalPath());
     props.put("type", "command");
     props.put("fullPath", ".");
+    
+    props.put(CommonJobProperties.PROJECT_NAME, "test_project");
+    props.put(CommonJobProperties.FLOW_ID, "test_flow");
+    props.put(CommonJobProperties.JOB_ID, "test_job");
+    props.put(CommonJobProperties.EXEC_ID, "123");
+    props.put(CommonJobProperties.SUBMIT_USER, "test_user");
 
     job = new ProcessJob("TestProcess", props, props, log);
   }
@@ -62,6 +68,37 @@ public class ProcessJobTest {
     job.run();
 
   }
+  
+  /**
+   * Runs "ls -al" with SUBMIT_USER removed from the props and user.to.proxy set instead;
+   * the job should run fine, i.e. job.run() is expected not to throw.
+   */
+  @Test
+  public void testOneUnixCommandWithProxyUserInsteadOfSubmitUser() throws Exception {
+    
+    // Replace the submit user with a proxy user, then set the command to run
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);
+    props.put("user.to.proxy", "test_user");
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
+  
+  /**
+   * This job should fail because there is no user.to.proxy and no CommonJobProperties.SUBMIT_USER;
+   * the test passes only if job.run() throws a RuntimeException.
+   */
+  @Test (expected=RuntimeException.class)
+  public void testOneUnixCommandWithNoUser() throws Exception {
+    
+    // Remove the submit user without adding a proxy user, then set the command
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);    
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
 
   @Test
   public void testFailedUnixCommand() throws Exception {