azkaban-developers

reformatting ProcessJob

9/21/2015 9:24:42 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 54192ee..18f2262 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -34,20 +34,25 @@ import azkaban.utils.SystemMemoryInfo;
 /**
  * A job that runs a simple unix command
  */
-public class ProcessJob extends AbstractProcessJob
-{
+public class ProcessJob extends AbstractProcessJob {
 
   public static final String COMMAND = "command";
+
   private static final long KILL_TIME_MS = 5000;
+
   private volatile AzkabanProcess process;
+
   private static final String MEMCHECK_ENABLED = "memCheck.enabled";
+
   private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+
   public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
+
   public static final String USER_TO_PROXY = "user.to.proxy";
+
   public static final String KRB5CCNAME = "KRB5CCNAME";
 
-  public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log)
-  {
+  public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log) {
     super(jobId, sysProps, jobProps, log);
 
     // this is in line with what other job types (hadoopJava, spark, pig, hive) is doing
@@ -55,46 +60,36 @@ public class ProcessJob extends AbstractProcessJob
   }
 
   @Override
-  public void run() throws Exception
-  {
-    try
-    {
+  public void run() throws Exception {
+    try {
       resolveProps();
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       handleError("Bad property definition! " + e.getMessage(), e);
     }
 
-    if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true))
-    {
+    if (sysProps.getBoolean(MEMCHECK_ENABLED, true)
+            && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
       long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
       Pair<Long, Long> memPair = getProcMemoryRequirement();
-      boolean isMemGranted =
-          SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
-      if (!isMemGranted)
-      {
-        throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
-                                          memPair.getFirst(),
-                                          memPair.getSecond(),
-                                          getId()));
+      boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
+              memPair.getSecond(), freeMemDecrAmt);
+      if (!isMemGranted) {
+        throw new Exception(String.format(
+                "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+                memPair.getFirst(), memPair.getSecond(), getId()));
       }
     }
 
     List<String> commands = null;
-    try
-    {
+    try {
       commands = getCommandList();
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       handleError("Job set up failed " + e.getCause(), e);
     }
 
     long startMs = System.currentTimeMillis();
 
-    if (commands == null)
-    {
+    if (commands == null) {
       handleError("There are no commands to execute", null);
     }
 
@@ -105,16 +100,12 @@ public class ProcessJob extends AbstractProcessJob
     // change krb5ccname env var so that each job execution gets its own cache
     envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
 
-    for (String command : commands)
-    {
+    for (String command : commands) {
       info("Command: " + command);
-      AzkabanProcessBuilder builder =
-          new AzkabanProcessBuilder(partitionCommandLine(command)).setEnv(envVars)
-                                                                  .setWorkingDir(getCwd())
-                                                                  .setLogger(getLog());
+      AzkabanProcessBuilder builder = new AzkabanProcessBuilder(partitionCommandLine(command))
+              .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
 
-      if (builder.getEnv().size() > 0)
-      {
+      if (builder.getEnv().size() > 0) {
         info("Environment variables: " + builder.getEnv());
       }
       info("Working directory: " + builder.getWorkingDir());
@@ -125,23 +116,18 @@ public class ProcessJob extends AbstractProcessJob
       boolean success = false;
       this.process = builder.build();
 
-      try
-      {
+      try {
         this.process.run();
         success = true;
-      }
-      catch (Throwable e)
-      {
+      } catch (Throwable e) {
         for (File file : propFiles)
           if (file != null && file.exists())
             file.delete();
         throw new RuntimeException(e);
-      }
-      finally
-      {
+      } finally {
         this.process = null;
         info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in "
-            + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+                + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
       }
     }
 
@@ -160,16 +146,15 @@ public class ProcessJob extends AbstractProcessJob
    * 
    * @return file name: the kerberos ticket cache file to use
    */
-  private String getKrb5ccname(Props jobProps)
-  {
+  private String getKrb5ccname(Props jobProps) {
     String effectiveUser = getEffectiveUser(jobProps);
     String projectName = jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
     String flowId = jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
     String jobId = jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
     // execId should be an int and should not have space in it, ever
     String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
-    String krb5ccname =
-        String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId, jobId, execId, effectiveUser);
+    String krb5ccname = String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId,
+            jobId, execId, effectiveUser);
 
     return krb5ccname;
   }
@@ -184,19 +169,13 @@ public class ProcessJob extends AbstractProcessJob
    * @param jobProps
    * @return the user that Azkaban is going to execute as
    */
-  private String getEffectiveUser(Props jobProps)
-  {
+  private String getEffectiveUser(Props jobProps) {
     String effectiveUser = null;
-    if (jobProps.containsKey(USER_TO_PROXY))
-    {
+    if (jobProps.containsKey(USER_TO_PROXY)) {
       effectiveUser = jobProps.getString(USER_TO_PROXY);
-    }
-    else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER))
-    {
+    } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
       effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
-    }
-    else
-    {
+    } else {
       throw new RuntimeException("Internal Error: No user.to.proxy or submit.user in the jobProps");
     }
     info("effective user is: " + effectiveUser);
@@ -210,30 +189,23 @@ public class ProcessJob extends AbstractProcessJob
    *
    * @return pair of min/max memory size
    */
-  protected Pair<Long, Long> getProcMemoryRequirement() throws Exception
-  {
+  protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
     return new Pair<Long, Long>(0L, 0L);
   }
 
-  protected void handleError(String errorMsg, Exception e) throws Exception
-  {
+  protected void handleError(String errorMsg, Exception e) throws Exception {
     error(errorMsg);
-    if (e != null)
-    {
+    if (e != null) {
       throw new Exception(errorMsg, e);
-    }
-    else
-    {
+    } else {
       throw new Exception(errorMsg);
     }
   }
 
-  protected List<String> getCommandList()
-  {
+  protected List<String> getCommandList() {
     List<String> commands = new ArrayList<String>();
     commands.add(jobProps.getString(COMMAND));
-    for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++)
-    {
+    for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
       commands.add(jobProps.getString(COMMAND + "." + i));
     }
 
@@ -241,31 +213,26 @@ public class ProcessJob extends AbstractProcessJob
   }
 
   @Override
-  public void cancel() throws InterruptedException
-  {
+  public void cancel() throws InterruptedException {
     if (process == null)
       throw new IllegalStateException("Not started.");
     boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
-    if (!killed)
-    {
+    if (!killed) {
       warn("Kill with signal TERM failed. Killing with KILL signal.");
       process.hardKill();
     }
   }
 
   @Override
-  public double getProgress()
-  {
+  public double getProgress() {
     return process != null && process.isComplete() ? 1.0 : 0.0;
   }
 
-  public int getProcessId()
-  {
+  public int getProcessId() {
     return process.getProcessId();
   }
 
-  public String getPath()
-  {
+  public String getPath() {
     return _jobPath == null ? "" : _jobPath;
   }
 
@@ -276,8 +243,7 @@ public class ProcessJob extends AbstractProcessJob
    * @param command
    * @return
    */
-  public static String[] partitionCommandLine(final String command)
-  {
+  public static String[] partitionCommandLine(final String command) {
     ArrayList<String> commands = new ArrayList<String>();
 
     int index = 0;
@@ -286,44 +252,32 @@ public class ProcessJob extends AbstractProcessJob
 
     boolean isApos = false;
     boolean isQuote = false;
-    while (index < command.length())
-    {
+    while (index < command.length()) {
       char c = command.charAt(index);
 
-      switch (c)
-      {
+      switch (c) {
         case ' ':
-          if (!isQuote && !isApos)
-          {
+          if (!isQuote && !isApos) {
             String arg = buffer.toString();
             buffer = new StringBuffer(command.length() - index);
-            if (arg.length() > 0)
-            {
+            if (arg.length() > 0) {
               commands.add(arg);
             }
-          }
-          else
-          {
+          } else {
             buffer.append(c);
           }
           break;
         case '\'':
-          if (!isQuote)
-          {
+          if (!isQuote) {
             isApos = !isApos;
-          }
-          else
-          {
+          } else {
             buffer.append(c);
           }
           break;
         case '"':
-          if (!isApos)
-          {
+          if (!isApos) {
             isQuote = !isQuote;
-          }
-          else
-          {
+          } else {
             buffer.append(c);
           }
           break;
@@ -334,8 +288,7 @@ public class ProcessJob extends AbstractProcessJob
       index++;
     }
 
-    if (buffer.length() > 0)
-    {
+    if (buffer.length() > 0) {
       String arg = buffer.toString();
       commands.add(arg);
     }