diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 54854d2..4eef247 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -16,6 +16,7 @@
package azkaban.jobExecutor;
+import azkaban.constants.ServerInternals;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@@ -55,6 +56,8 @@ public class ProcessJob extends AbstractProcessJob {
public static final String USER_TO_PROXY = "user.to.proxy";
public static final String KRB5CCNAME = "KRB5CCNAME";
+ private volatile boolean killed = false;
+
public ProcessJob(final String jobId, final Props sysProps,
final Props jobProps, final Logger log) {
super(jobId, sysProps, jobProps, log);
@@ -76,18 +79,41 @@ public class ProcessJob extends AbstractProcessJob {
&& jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
Pair<Long, Long> memPair = getProcMemoryRequirement();
- boolean isMemGranted =
- SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
- memPair.getSecond(), freeMemDecrAmt);
+ // retry backoff in ms
+ String oomMsg = String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+ memPair.getFirst(), memPair.getSecond(), getId());
+ int attempt;
+ boolean isMemGranted = true;
+ for(attempt = 1; attempt <= ServerInternals.MEMORY_CHECK_RETRY_LIMIT; attempt++) {
+ isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+ if (isMemGranted) {
+ info(String.format("Memory granted (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(), memPair.getSecond(), getId()));
+ break;
+ }
+ if (attempt < ServerInternals.MEMORY_CHECK_RETRY_LIMIT) {
+ info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(ServerInternals.MEMORY_CHECK_INTERVAL), attempt, ServerInternals.MEMORY_CHECK_RETRY_LIMIT));
+ synchronized (this) {
+ try {
+ this.wait(ServerInternals.MEMORY_CHECK_INTERVAL);
+ } catch (InterruptedException e) {
+ info(String.format("Job %s interrupted while waiting for memory check retry", getId()));
+ }
+ }
+ if(killed) {
+ info(String.format("Job %s was killed while waiting for memory check retry", getId()));
+ return;
+ }
+ }
+ }
+
if (!isMemGranted) {
throw new Exception(
- String
- .format(
- "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
- memPair.getFirst(), memPair.getSecond(), getId()));
+ String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(),
+ memPair.getSecond(), getId()));
}
}
+
List<String> commands = null;
try {
commands = getCommandList();
@@ -127,6 +153,7 @@ public class ProcessJob extends AbstractProcessJob {
}
}
+
for (String command : commands) {
AzkabanProcessBuilder builder = null;
if (isExecuteAsUser) {
@@ -261,10 +288,16 @@ public class ProcessJob extends AbstractProcessJob {
@Override
public void cancel() throws InterruptedException {
+ // in case the job is waiting
+ synchronized (this) {
+ killed = true;
+ this.notify();
+ }
+
if (process == null)
throw new IllegalStateException("Not started.");
- boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
- if (!killed) {
+ boolean processkilled = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+ if (!processkilled) {
warn("Kill with signal TERM failed. Killing with KILL signal.");
process.hardKill();
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 5dbfee1..f421ae3 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -692,7 +692,6 @@ public class JobRunner extends EventHandler implements Runnable {
try {
job.run();
} catch (Throwable e) {
-
if (props.getBoolean("job.succeed.on.failure", false)) {
finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
logError("Job run failed, but will treat it like success.");