diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 54192ee..18f2262 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -34,20 +34,25 @@ import azkaban.utils.SystemMemoryInfo;
/**
* A job that runs a simple unix command
*/
-public class ProcessJob extends AbstractProcessJob
-{
+public class ProcessJob extends AbstractProcessJob {
public static final String COMMAND = "command";
+
private static final long KILL_TIME_MS = 5000;
+
private volatile AzkabanProcess process;
+
private static final String MEMCHECK_ENABLED = "memCheck.enabled";
+
private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+
public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
+
public static final String USER_TO_PROXY = "user.to.proxy";
+
public static final String KRB5CCNAME = "KRB5CCNAME";
- public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log)
- {
+ public ProcessJob(final String jobId, final Props sysProps, final Props jobProps, final Logger log) {
super(jobId, sysProps, jobProps, log);
// this is in line with what other job types (hadoopJava, spark, pig, hive) is doing
@@ -55,46 +60,36 @@ public class ProcessJob extends AbstractProcessJob
}
@Override
- public void run() throws Exception
- {
- try
- {
+ public void run() throws Exception {
+ try {
resolveProps();
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
handleError("Bad property definition! " + e.getMessage(), e);
}
- if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true))
- {
+ if (sysProps.getBoolean(MEMCHECK_ENABLED, true)
+ && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
Pair<Long, Long> memPair = getProcMemoryRequirement();
- boolean isMemGranted =
- SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
- if (!isMemGranted)
- {
- throw new Exception(String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
- memPair.getFirst(),
- memPair.getSecond(),
- getId()));
+ boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
+ memPair.getSecond(), freeMemDecrAmt);
+ if (!isMemGranted) {
+ throw new Exception(String.format(
+ "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+ memPair.getFirst(), memPair.getSecond(), getId()));
}
}
List<String> commands = null;
- try
- {
+ try {
commands = getCommandList();
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
handleError("Job set up failed " + e.getCause(), e);
}
long startMs = System.currentTimeMillis();
- if (commands == null)
- {
+ if (commands == null) {
handleError("There are no commands to execute", null);
}
@@ -105,16 +100,12 @@ public class ProcessJob extends AbstractProcessJob
// change krb5ccname env var so that each job execution gets its own cache
envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));
- for (String command : commands)
- {
+ for (String command : commands) {
info("Command: " + command);
- AzkabanProcessBuilder builder =
- new AzkabanProcessBuilder(partitionCommandLine(command)).setEnv(envVars)
- .setWorkingDir(getCwd())
- .setLogger(getLog());
+ AzkabanProcessBuilder builder = new AzkabanProcessBuilder(partitionCommandLine(command))
+ .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
- if (builder.getEnv().size() > 0)
- {
+ if (builder.getEnv().size() > 0) {
info("Environment variables: " + builder.getEnv());
}
info("Working directory: " + builder.getWorkingDir());
@@ -125,23 +116,18 @@ public class ProcessJob extends AbstractProcessJob
boolean success = false;
this.process = builder.build();
- try
- {
+ try {
this.process.run();
success = true;
- }
- catch (Throwable e)
- {
+ } catch (Throwable e) {
for (File file : propFiles)
if (file != null && file.exists())
file.delete();
throw new RuntimeException(e);
- }
- finally
- {
+ } finally {
this.process = null;
info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in "
- + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+ + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
}
}
@@ -160,16 +146,15 @@ public class ProcessJob extends AbstractProcessJob
*
* @return file name: the kerberos ticket cache file to use
*/
- private String getKrb5ccname(Props jobProps)
- {
+ private String getKrb5ccname(Props jobProps) {
String effectiveUser = getEffectiveUser(jobProps);
String projectName = jobProps.getString(CommonJobProperties.PROJECT_NAME).replace(" ", "_");
String flowId = jobProps.getString(CommonJobProperties.FLOW_ID).replace(" ", "_");
String jobId = jobProps.getString(CommonJobProperties.JOB_ID).replace(" ", "_");
// execId should be an int and should not have space in it, ever
String execId = jobProps.getString(CommonJobProperties.EXEC_ID);
- String krb5ccname =
- String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId, jobId, execId, effectiveUser);
+ String krb5ccname = String.format("/tmp/krb5cc__%s__%s__%s__%s__%s", projectName, flowId,
+ jobId, execId, effectiveUser);
return krb5ccname;
}
@@ -184,19 +169,13 @@ public class ProcessJob extends AbstractProcessJob
* @param jobProps
* @return the user that Azkaban is going to execute as
*/
- private String getEffectiveUser(Props jobProps)
- {
+ private String getEffectiveUser(Props jobProps) {
String effectiveUser = null;
- if (jobProps.containsKey(USER_TO_PROXY))
- {
+ if (jobProps.containsKey(USER_TO_PROXY)) {
effectiveUser = jobProps.getString(USER_TO_PROXY);
- }
- else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER))
- {
+ } else if (jobProps.containsKey(CommonJobProperties.SUBMIT_USER)) {
effectiveUser = jobProps.getString(CommonJobProperties.SUBMIT_USER);
- }
- else
- {
+ } else {
throw new RuntimeException("Internal Error: No user.to.proxy or submit.user in the jobProps");
}
info("effective user is: " + effectiveUser);
@@ -210,30 +189,23 @@ public class ProcessJob extends AbstractProcessJob
*
* @return pair of min/max memory size
*/
- protected Pair<Long, Long> getProcMemoryRequirement() throws Exception
- {
+ protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
return new Pair<Long, Long>(0L, 0L);
}
- protected void handleError(String errorMsg, Exception e) throws Exception
- {
+ protected void handleError(String errorMsg, Exception e) throws Exception {
error(errorMsg);
- if (e != null)
- {
+ if (e != null) {
throw new Exception(errorMsg, e);
- }
- else
- {
+ } else {
throw new Exception(errorMsg);
}
}
- protected List<String> getCommandList()
- {
+ protected List<String> getCommandList() {
List<String> commands = new ArrayList<String>();
commands.add(jobProps.getString(COMMAND));
- for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++)
- {
+ for (int i = 1; jobProps.containsKey(COMMAND + "." + i); i++) {
commands.add(jobProps.getString(COMMAND + "." + i));
}
@@ -241,31 +213,26 @@ public class ProcessJob extends AbstractProcessJob
}
@Override
- public void cancel() throws InterruptedException
- {
+ public void cancel() throws InterruptedException {
if (process == null)
throw new IllegalStateException("Not started.");
boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
- if (!killed)
- {
+ if (!killed) {
warn("Kill with signal TERM failed. Killing with KILL signal.");
process.hardKill();
}
}
@Override
- public double getProgress()
- {
+ public double getProgress() {
return process != null && process.isComplete() ? 1.0 : 0.0;
}
- public int getProcessId()
- {
+ public int getProcessId() {
return process.getProcessId();
}
- public String getPath()
- {
+ public String getPath() {
return _jobPath == null ? "" : _jobPath;
}
@@ -276,8 +243,7 @@ public class ProcessJob extends AbstractProcessJob
* @param command
* @return
*/
- public static String[] partitionCommandLine(final String command)
- {
+ public static String[] partitionCommandLine(final String command) {
ArrayList<String> commands = new ArrayList<String>();
int index = 0;
@@ -286,44 +252,32 @@ public class ProcessJob extends AbstractProcessJob
boolean isApos = false;
boolean isQuote = false;
- while (index < command.length())
- {
+ while (index < command.length()) {
char c = command.charAt(index);
- switch (c)
- {
+ switch (c) {
case ' ':
- if (!isQuote && !isApos)
- {
+ if (!isQuote && !isApos) {
String arg = buffer.toString();
buffer = new StringBuffer(command.length() - index);
- if (arg.length() > 0)
- {
+ if (arg.length() > 0) {
commands.add(arg);
}
- }
- else
- {
+ } else {
buffer.append(c);
}
break;
case '\'':
- if (!isQuote)
- {
+ if (!isQuote) {
isApos = !isApos;
- }
- else
- {
+ } else {
buffer.append(c);
}
break;
case '"':
- if (!isApos)
- {
+ if (!isApos) {
isQuote = !isQuote;
- }
- else
- {
+ } else {
buffer.append(c);
}
break;
@@ -334,8 +288,7 @@ public class ProcessJob extends AbstractProcessJob
index++;
}
- if (buffer.length() > 0)
- {
+ if (buffer.length() > 0) {
String arg = buffer.toString();
commands.add(arg);
}