diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 9789799..54192ee 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -34,7 +34,8 @@ import azkaban.utils.SystemMemoryInfo;
/**
* A job that runs a simple unix command
*/
-public class ProcessJob extends AbstractProcessJob {
+public class ProcessJob extends AbstractProcessJob
+{
public static final String COMMAND = "command";
private static final long KILL_TIME_MS = 5000;
@@ -43,64 +44,77 @@ public class ProcessJob extends AbstractProcessJob {
private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
public static final String USER_TO_PROXY = "user.to.proxy";
- public static final String KRB5CCNAME = "KRB5CCNAME";
+ public static final String KRB5CCNAME = "KRB5CCNAME";
- public ProcessJob(final String jobId, final Props sysProps,
- final Props jobProps, final Logger log) {
+ public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log)
+ {
super(jobId, sysProps, jobProps, log);
-
+
// this is in line with what other job types (hadoopJava, spark, pig, hive) is doing
jobProps.put(CommonJobProperties.JOB_ID, jobId);
}
@Override
- public void run() throws Exception {
- try {
+ public void run() throws Exception
+ {
+ try
+ {
resolveProps();
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
handleError("Bad property definition! " + e.getMessage(), e);
}
- if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
+ if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true))
+ {
long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
Pair<Long, Long> memPair = getProcMemoryRequirement();
- boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
- if (!isMemGranted) {
+ boolean isMemGranted =
+ SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+ if (!isMemGranted)
+ {
throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
- memPair.getFirst(), memPair.getSecond(), getId()));
+ memPair.getFirst(),
+ memPair.getSecond(),
+ getId()));
}
}
List<String> commands = null;
- try {
+ try
+ {
commands = getCommandList();
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
handleError("Job set up failed " + e.getCause(), e);
}
long startMs = System.currentTimeMillis();
- if (commands == null) {
+ if (commands == null)
+ {
handleError("There are no commands to execute", null);
}
info(commands.size() + " commands to execute.");
File[] propFiles = initPropsFiles();
Map<String, String> envVars = getEnvironmentVariables();
-
- // change krb5ccname env var
- String effectiveUser = getEffectiveUser(jobProps);
- String krb5ccname =
- String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", jobProps.getString(CommonJobProperties.PROJECT_NAME),jobProps.getString(CommonJobProperties.FLOW_ID), jobProps.getString(CommonJobProperties.JOB_ID), jobProps.getString(CommonJobProperties.EXEC_ID), effectiveUser);
- envVars.put(KRB5CCNAME, krb5ccname);
- for (String command : commands) {
+ // change krb5ccname env var so that each job execution gets its own cache
+ envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
+
+ for (String command : commands)
+ {
info("Command: " + command);
AzkabanProcessBuilder builder =
- new AzkabanProcessBuilder(partitionCommandLine(command))
- .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
+ new AzkabanProcessBuilder(partitionCommandLine(command)).setEnv(envVars)
+ .setWorkingDir(getCwd())
+ .setLogger(getLog());
- if (builder.getEnv().size() > 0) {
+ if (builder.getEnv().size() > 0)
+ {
info("Environment variables: " + builder.getEnv());
}
info("Working directory: " + builder.getWorkingDir());
@@ -111,18 +125,22 @@ public class ProcessJob extends AbstractProcessJob {
boolean success = false;
this.process = builder.build();
- try {
+ try
+ {
this.process.run();
success = true;
- } catch (Throwable e) {
+ }
+ catch (Throwable e)
+ {
for (File file : propFiles)
if (file != null && file.exists())
file.delete();
throw new RuntimeException(e);
- } finally {
+ }
+ finally
+ {
this.process = null;
- info("Process completed "
- + (success ? "successfully" : "unsuccessfully") + " in "
+ info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in "
+ ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
}
}
@@ -131,11 +149,40 @@ public class ProcessJob extends AbstractProcessJob {
generateProperties(propFiles[1]);
}
- /**
- * Determines what user id should the process job run as.
- * Logic should be self explanatory
+ /**
+ * <pre>
+ * This method extracts the kerberos ticket cache file name from the jobprops.
+ * This method will ensure that each job execution will have its own kerberos ticket cache file
+ * Given that the code only sets an environmental variable, the number of files created corresponds
+ * to the number of processes that are doing kinit in their flow, which should not be an inordinately
+ * high number.
+ * </pre>
+ *
+ * @return file name: the kerberos ticket cache file to use
+ */
+ private String getKrb5ccname(Props jobProps)
+ {
+ String effectiveUser = getEffectiveUser(jobProps);
+ String projectName = jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
+ String flowId = jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
+ String jobId = jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
+ // execId should be an int and should not have space in it, ever
+ String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
+ String krb5ccname =
+ String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId, jobId, execId, effectiveUser);
+
+ return krb5ccname;
+ }
+
+ /**
+ * <pre>
+ * Determines what user id should the process job run as, in the following order of precedence:
+ * 1. USER_TO_PROXY
+ * 2. SUBMIT_USER
+ * </pre>
+ *
* @param jobProps
- * @return
+ * @return the user that Azkaban is going to execute as
*/
private String getEffectiveUser(Props jobProps)
{
@@ -150,36 +197,43 @@ public class ProcessJob extends AbstractProcessJob {
}
else
{
- throw new RuntimeException("There is no user.to.proxy and submit.user in the jobProps. There is something wrong with the system");
+ throw new RuntimeException("Internal Error: No user.to.proxy or submit.user in the jobProps");
}
info("effective user is: " + effectiveUser);
return effectiveUser;
}
/**
- * This is used to get the min/max memory size requirement by processes.
- * SystemMemoryInfo can use the info to determine if the memory request
- * can be fulfilled. For Java process, this should be Xms/Xmx setting.
+ * This is used to get the min/max memory size requirement by processes. SystemMemoryInfo can use
+ * the info to determine if the memory request can be fulfilled. For Java process, this should be
+ * Xms/Xmx setting.
*
* @return pair of min/max memory size
*/
- protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
+ protected Pair<Long, Long> getProcMemoryRequirement() throws Exception
+ {
return new Pair<Long, Long>(0L, 0L);
}
- protected void handleError(String errorMsg, Exception e) throws Exception {
+ protected void handleError(String errorMsg, Exception e) throws Exception
+ {
error(errorMsg);
- if (e != null) {
+ if (e != null)
+ {
throw new Exception(errorMsg, e);
- } else {
+ }
+ else
+ {
throw new Exception(errorMsg);
}
}
- protected List<String> getCommandList() {
+ protected List<String> getCommandList()
+ {
List<String> commands = new ArrayList<String>();
commands.add(jobProps.getString(COMMAND));
- for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
+ for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++)
+ {
commands.add(jobProps.getString(COMMAND + "." + i));
}
@@ -187,37 +241,43 @@ public class ProcessJob extends AbstractProcessJob {
}
@Override
- public void cancel() throws InterruptedException {
+ public void cancel() throws InterruptedException
+ {
if (process == null)
throw new IllegalStateException("Not started.");
boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
- if (!killed) {
+ if (!killed)
+ {
warn("Kill with signal TERM failed. Killing with KILL signal.");
process.hardKill();
}
}
@Override
- public double getProgress() {
+ public double getProgress()
+ {
return process != null && process.isComplete() ? 1.0 : 0.0;
}
- public int getProcessId() {
+ public int getProcessId()
+ {
return process.getProcessId();
}
- public String getPath() {
+ public String getPath()
+ {
return _jobPath == null ? "" : _jobPath;
}
/**
- * Splits the command into a unix like command line structure. Quotes and
- * single quotes are treated as nested strings.
+ * Splits the command into a unix like command line structure. Quotes and single quotes are
+ * treated as nested strings.
*
* @param command
* @return
*/
- public static String[] partitionCommandLine(final String command) {
+ public static String[] partitionCommandLine(final String command)
+ {
ArrayList<String> commands = new ArrayList<String>();
int index = 0;
@@ -226,43 +286,56 @@ public class ProcessJob extends AbstractProcessJob {
boolean isApos = false;
boolean isQuote = false;
- while (index < command.length()) {
+ while (index < command.length())
+ {
char c = command.charAt(index);
- switch (c) {
- case ' ':
- if (!isQuote && !isApos) {
- String arg = buffer.toString();
- buffer = new StringBuffer(command.length() - index);
- if (arg.length() > 0) {
- commands.add(arg);
+ switch (c)
+ {
+ case ' ':
+ if (!isQuote && !isApos)
+ {
+ String arg = buffer.toString();
+ buffer = new StringBuffer(command.length() - index);
+ if (arg.length() > 0)
+ {
+ commands.add(arg);
+ }
}
- } else {
- buffer.append(c);
- }
- break;
- case '\'':
- if (!isQuote) {
- isApos = !isApos;
- } else {
- buffer.append(c);
- }
- break;
- case '"':
- if (!isApos) {
- isQuote = !isQuote;
- } else {
+ else
+ {
+ buffer.append(c);
+ }
+ break;
+ case '\'':
+ if (!isQuote)
+ {
+ isApos = !isApos;
+ }
+ else
+ {
+ buffer.append(c);
+ }
+ break;
+ case '"':
+ if (!isApos)
+ {
+ isQuote = !isQuote;
+ }
+ else
+ {
+ buffer.append(c);
+ }
+ break;
+ default:
buffer.append(c);
- }
- break;
- default:
- buffer.append(c);
}
index++;
}
- if (buffer.length() > 0) {
+ if (buffer.length() > 0)
+ {
String arg = buffer.toString();
commands.add(arg);
}