azkaban-aplcache

setup up individual krb5cc files for each individual job. prevents

9/18/2015 7:11:53 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
index 0c98ecb..8c208a5 100644
--- a/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
+++ b/azkaban-common/src/main/java/azkaban/flow/CommonJobProperties.java
@@ -133,6 +133,11 @@ public class CommonJobProperties {
    * hotspot occurs.
    */
   public static final String PROJECT_VERSION = "azkaban.flow.projectversion";
+  
+  /**
+   * Find out who is the submit user, in addition to the user.to.proxy (they may be different)
+   */
+  public static final String SUBMIT_USER = "azkaban.flow.submituser";
 
   /**
    * A uuid assigned to every execution
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 4e268a0..9789799 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
+import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.utils.process.AzkabanProcess;
 import azkaban.jobExecutor.utils.process.AzkabanProcessBuilder;
 import azkaban.utils.Pair;
@@ -41,10 +42,15 @@ public class ProcessJob extends AbstractProcessJob {
   private static final String MEMCHECK_ENABLED = "memCheck.enabled";
   private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
   public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
+  public static final String USER_TO_PROXY = "user.to.proxy";
+  public static final String KRB5CCNAME = "KRB5CCNAME";  
 
   public ProcessJob(final String jobId, final Props sysProps,
       final Props jobProps, final Logger log) {
     super(jobId, sysProps, jobProps, log);
+    
+    // this is in line with what other job types (hadoopJava, spark, pig, hive) is doing
+    jobProps.put(CommonJobProperties.JOB_ID, jobId);
   }
 
   @Override
@@ -81,6 +87,12 @@ public class ProcessJob extends AbstractProcessJob {
     info(commands.size() + " commands to execute.");
     File[] propFiles = initPropsFiles();
     Map<String, String> envVars = getEnvironmentVariables();
+    
+    // change krb5ccname env var    
+    String effectiveUser = getEffectiveUser(jobProps);
+    String krb5ccname =
+        String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", jobProps.getString(CommonJobProperties.PROJECT_NAME),jobProps.getString(CommonJobProperties.FLOW_ID), jobProps.getString(CommonJobProperties.JOB_ID), jobProps.getString(CommonJobProperties.EXEC_ID), effectiveUser);
+    envVars.put(KRB5CCNAME, krb5ccname);
 
     for (String command : commands) {
       info("Command: " + command);
@@ -119,6 +131,31 @@ public class ProcessJob extends AbstractProcessJob {
     generateProperties(propFiles[1]);
   }
 
+  /** 
+   * Determines what user id should the process job run as.
+   * Logic should be self explanatory
+   * @param jobProps
+   * @return
+   */
+  private String getEffectiveUser(Props jobProps)
+  {
+    String effectiveUser = null;
+    if (jobProps.containsKey(USER_TO_PROXY))
+    {
+      effectiveUser = jobProps.getString(USER_TO_PROXY);
+    }
+    else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER))
+    {
+      effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
+    }
+    else
+    {
+      throw new RuntimeException("There is no user.to.proxy and submit.user in the jobProps.  There is something wrong with the system");
+    }
+    info("effective user is: " + effectiveUser);
+    return effectiveUser;
+  }
+
   /**
    * This is used to get the min/max memory size requirement by processes.
    * SystemMemoryInfo can use the info to determine if the memory request
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 596e8b3..3412425 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -292,6 +292,7 @@ public class PropsUtils {
     props.put(CommonJobProperties.FLOW_UUID, UUID.randomUUID().toString());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_BY, flow.getLastModifiedByUser());
     props.put(CommonJobProperties.PROJECT_LAST_CHANGED_DATE, flow.getLastModifiedTimestamp());
+    props.put(CommonJobProperties.SUBMIT_USER, flow.getExecutableFlow().getSubmitUser());  
 
     DateTime loadTime = new DateTime();