azkaban-aplcache

incorporating feedback: Azkaban #495 1. modified line 106 2.

9/21/2015 4:01:01 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 9789799..54192ee 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -34,7 +34,8 @@ import azkaban.utils.SystemMemoryInfo;
 /**
  * A job that runs a simple unix command
  */
-public class ProcessJob extends AbstractProcessJob {
+public class ProcessJob extends AbstractProcessJob
+{
 
   public static final String COMMAND = "command";
   private static final long KILL_TIME_MS = 5000;
@@ -43,64 +44,77 @@ public class ProcessJob extends AbstractProcessJob {
   private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
   public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
   public static final String USER_TO_PROXY = "user.to.proxy";
-  public static final String KRB5CCNAME = "KRB5CCNAME";  
+  public static final String KRB5CCNAME = "KRB5CCNAME";
 
-  public ProcessJob(final String jobId, final Props sysProps,
-      final Props jobProps, final Logger log) {
+  public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log)
+  {
     super(jobId, sysProps, jobProps, log);
-    
+
     // this is in line with what other job types (hadoopJava, spark, pig, hive) is doing
     jobProps.put(CommonJobProperties.JOB_ID, jobId);
   }
 
   @Override
-  public void run() throws Exception {
-    try {
+  public void run() throws Exception
+  {
+    try
+    {
       resolveProps();
-    } catch (Exception e) {
+    }
+    catch (Exception e)
+    {
       handleError("Bad property definition! " + e.getMessage(), e);
     }
 
-    if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
+    if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true))
+    {
       long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
       Pair<Long, Long> memPair = getProcMemoryRequirement();
-      boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
-      if (!isMemGranted) {
+      boolean isMemGranted =
+          SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+      if (!isMemGranted)
+      {
         throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
-                memPair.getFirst(), memPair.getSecond(), getId()));
+                                          memPair.getFirst(),
+                                          memPair.getSecond(),
+                                          getId()));
       }
     }
 
     List<String> commands = null;
-    try {
+    try
+    {
       commands = getCommandList();
-    } catch (Exception e) {
+    }
+    catch (Exception e)
+    {
       handleError("Job set up failed " + e.getCause(), e);
     }
 
     long startMs = System.currentTimeMillis();
 
-    if (commands == null) {
+    if (commands == null)
+    {
       handleError("There are no commands to execute", null);
     }
 
     info(commands.size() + " commands to execute.");
     File[] propFiles = initPropsFiles();
     Map<String, String> envVars = getEnvironmentVariables();
-    
-    // change krb5ccname env var    
-    String effectiveUser = getEffectiveUser(jobProps);
-    String krb5ccname =
-        String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", jobProps.getString(CommonJobProperties.PROJECT_NAME),jobProps.getString(CommonJobProperties.FLOW_ID), jobProps.getString(CommonJobProperties.JOB_ID), jobProps.getString(CommonJobProperties.EXEC_ID), effectiveUser);
-    envVars.put(KRB5CCNAME, krb5ccname);
 
-    for (String command : commands) {
+    // change krb5ccname env var so that each job execution gets its own cache
+    envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
+
+    for (String command : commands)
+    {
       info("Command: " + command);
       AzkabanProcessBuilder builder =
-          new AzkabanProcessBuilder(partitionCommandLine(command))
-              .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+          new AzkabanProcessBuilder(partitionCommandLine(command)).setEnv(envVars)
+                                                                  .setWorkingDir(getCwd())
+                                                                  .setLogger(getLog());
 
-      if (builder.getEnv().size() > 0) {
+      if (builder.getEnv().size() > 0)
+      {
         info("Environment variables: " + builder.getEnv());
       }
       info("Working directory: " + builder.getWorkingDir());
@@ -111,18 +125,22 @@ public class ProcessJob extends AbstractProcessJob {
       boolean success = false;
       this.process = builder.build();
 
-      try {
+      try
+      {
         this.process.run();
         success = true;
-      } catch (Throwable e) {
+      }
+      catch (Throwable e)
+      {
         for (File file : propFiles)
           if (file != null && file.exists())
             file.delete();
         throw new RuntimeException(e);
-      } finally {
+      }
+      finally
+      {
         this.process = null;
-        info("Process completed "
-            + (success ? "successfully" : "unsuccessfully") + " in "
+        info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in "
             + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
       }
     }
@@ -131,11 +149,40 @@ public class ProcessJob extends AbstractProcessJob {
     generateProperties(propFiles[1]);
   }
 
-  /** 
-   * Determines what user id should the process job run as.
-   * Logic should be self explanatory
+  /**
+   * <pre>
+   * This method extracts the Kerberos ticket cache file name from the jobProps.
+   * This method will ensure that each job execution will have its own Kerberos ticket cache file.
+   * Given that the code only sets an environment variable, the number of files created corresponds
+   * to the number of processes that are doing kinit in their flow, which should not be an inordinately
+   * high number.
+   * </pre>
+   * 
+   * @return file name: the kerberos ticket cache file to use
+   */
+  private String getKrb5ccname(Props jobProps)
+  {
+    String effectiveUser = getEffectiveUser(jobProps);
+    String projectName = jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
+    String flowId = jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
+    String jobId = jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
+    // execId should be an int and should not have space in it, ever
+    String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
+    String krb5ccname =
+        String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId, jobId, execId, effectiveUser);
+
+    return krb5ccname;
+  }
+
+  /**
+   * <pre>
+   * Determines what user id the process job should run as, in the following order of precedence:
+   * 1. USER_TO_PROXY
+   * 2. SUBMIT_USER
+   * </pre>
+   * 
    * @param jobProps
-   * @return
+   * @return the user that Azkaban is going to execute as
    */
   private String getEffectiveUser(Props jobProps)
   {
@@ -150,36 +197,43 @@ public class ProcessJob extends AbstractProcessJob {
     }
     else
     {
-      throw new RuntimeException("There is no user.to.proxy and submit.user in the jobProps.  There is something wrong with the system");
+      throw new RuntimeException("Internal Error: No user.to.proxy or submit.user in the jobProps");
     }
     info("effective user is: " + effectiveUser);
     return effectiveUser;
   }
 
   /**
-   * This is used to get the min/max memory size requirement by processes.
-   * SystemMemoryInfo can use the info to determine if the memory request
-   * can be fulfilled. For Java process, this should be Xms/Xmx setting.
+   * This is used to get the min/max memory size requirement by processes. SystemMemoryInfo can use
+   * the info to determine if the memory request can be fulfilled. For Java process, this should be
+   * Xms/Xmx setting.
    *
    * @return pair of min/max memory size
    */
-  protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
+  protected Pair<Long, Long> getProcMemoryRequirement() throws Exception
+  {
     return new Pair<Long, Long>(0L, 0L);
   }
 
-  protected void handleError(String errorMsg, Exception e) throws Exception {
+  protected void handleError(String errorMsg, Exception e) throws Exception
+  {
     error(errorMsg);
-    if (e != null) {
+    if (e != null)
+    {
       throw new Exception(errorMsg, e);
-    } else {
+    }
+    else
+    {
       throw new Exception(errorMsg);
     }
   }
 
-  protected List<String> getCommandList() {
+  protected List<String> getCommandList()
+  {
     List<String> commands = new ArrayList<String>();
     commands.add(jobProps.getString(COMMAND));
-    for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
+    for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++)
+    {
       commands.add(jobProps.getString(COMMAND + "." + i));
     }
 
@@ -187,37 +241,43 @@ public class ProcessJob extends AbstractProcessJob {
   }
 
   @Override
-  public void cancel() throws InterruptedException {
+  public void cancel() throws InterruptedException
+  {
     if (process == null)
       throw new IllegalStateException("Not started.");
     boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
-    if (!killed) {
+    if (!killed)
+    {
       warn("Kill with signal TERM failed. Killing with KILL signal.");
       process.hardKill();
     }
   }
 
   @Override
-  public double getProgress() {
+  public double getProgress()
+  {
     return process != null && process.isComplete() ? 1.0 : 0.0;
   }
 
-  public int getProcessId() {
+  public int getProcessId()
+  {
     return process.getProcessId();
   }
 
-  public String getPath() {
+  public String getPath()
+  {
     return _jobPath == null ? "" : _jobPath;
   }
 
   /**
-   * Splits the command into a unix like command line structure. Quotes and
-   * single quotes are treated as nested strings.
+   * Splits the command into a unix like command line structure. Quotes and single quotes are
+   * treated as nested strings.
    *
    * @param command
    * @return
    */
-  public static String[] partitionCommandLine(final String command) {
+  public static String[] partitionCommandLine(final String command)
+  {
     ArrayList<String> commands = new ArrayList<String>();
 
     int index = 0;
@@ -226,43 +286,56 @@ public class ProcessJob extends AbstractProcessJob {
 
     boolean isApos = false;
     boolean isQuote = false;
-    while (index < command.length()) {
+    while (index < command.length())
+    {
       char c = command.charAt(index);
 
-      switch (c) {
-      case ' ':
-        if (!isQuote && !isApos) {
-          String arg = buffer.toString();
-          buffer = new StringBuffer(command.length() - index);
-          if (arg.length() > 0) {
-            commands.add(arg);
+      switch (c)
+      {
+        case ' ':
+          if (!isQuote && !isApos)
+          {
+            String arg = buffer.toString();
+            buffer = new StringBuffer(command.length() - index);
+            if (arg.length() > 0)
+            {
+              commands.add(arg);
+            }
           }
-        } else {
-          buffer.append(c);
-        }
-        break;
-      case '\'':
-        if (!isQuote) {
-          isApos = !isApos;
-        } else {
-          buffer.append(c);
-        }
-        break;
-      case '"':
-        if (!isApos) {
-          isQuote = !isQuote;
-        } else {
+          else
+          {
+            buffer.append(c);
+          }
+          break;
+        case '\'':
+          if (!isQuote)
+          {
+            isApos = !isApos;
+          }
+          else
+          {
+            buffer.append(c);
+          }
+          break;
+        case '"':
+          if (!isApos)
+          {
+            isQuote = !isQuote;
+          }
+          else
+          {
+            buffer.append(c);
+          }
+          break;
+        default:
           buffer.append(c);
-        }
-        break;
-      default:
-        buffer.append(c);
       }
 
       index++;
     }
 
-    if (buffer.length() > 0) {
+    if (buffer.length() > 0)
+    {
       String arg = buffer.toString();
       commands.add(arg);
     }
diff --git a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
index d72f358..57f4505 100644
--- a/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobExecutor/ProcessJobTest.java
@@ -68,6 +68,37 @@ public class ProcessJobTest {
     job.run();
 
   }
+  
+  /**
+   * this job should run fine if the props contain user.to.proxy
+   * @throws Exception
+   */
+  @Test
+  public void testOneUnixCommandWithProxyUserInsteadOfSubmitUser() throws Exception {
+    
+    // Initialize the Props
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);
+    props.put("user.to.proxy", "test_user");
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
+  
+  /**
+   * this job should fail because there is no user.to.proxy and no CommonJobProperties.SUBMIT_USER
+   * @throws Exception
+   */
+  @Test (expected=RuntimeException.class)
+  public void testOneUnixCommandWithNoUser() throws Exception {
+    
+    // Initialize the Props
+    props.removeLocal(CommonJobProperties.SUBMIT_USER);    
+    props.put(ProcessJob.COMMAND, "ls -al");
+    
+    job.run();
+
+  }
 
   @Test
   public void testFailedUnixCommand() throws Exception {