azkaban-aplcache

Retry the jobs on executor instead of killing them right away

3/20/2017 6:45:27 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
index 0d7eb28..0a49408 100644
--- a/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
+++ b/azkaban-common/src/main/java/azkaban/constants/ServerInternals.java
@@ -26,5 +26,12 @@ public class ServerInternals {
   public static final String AZKABAN_EXECUTOR_PORT_FILENAME = "executor.port";
 
   public static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
-  
+
+
+  // Interval in ms between memory check retries when the system cannot grant memory
+  public static final long MEMORY_CHECK_INTERVAL = 1000*60*1;
+
+  // Max number of memory check retries
+  public static final int MEMORY_CHECK_RETRY_LIMIT = 720;
+
 }
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index 54854d2..4eef247 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -16,6 +16,7 @@
 
 package azkaban.jobExecutor;
 
+import azkaban.constants.ServerInternals;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
@@ -55,6 +56,8 @@ public class ProcessJob extends AbstractProcessJob {
   public static final String USER_TO_PROXY = "user.to.proxy";
   public static final String KRB5CCNAME = "KRB5CCNAME";
 
+  private volatile boolean killed = false;
+
   public ProcessJob(final String jobId, final Props sysProps,
       final Props jobProps, final Logger log) {
     super(jobId, sysProps, jobProps, log);
@@ -76,18 +79,41 @@ public class ProcessJob extends AbstractProcessJob {
         && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
       long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
       Pair<Long, Long> memPair = getProcMemoryRequirement();
-      boolean isMemGranted =
-          SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(),
-              memPair.getSecond(), freeMemDecrAmt);
+      // Re-check memory at a fixed interval until it is granted, the retry limit is reached, or the job is killed
+      String oomMsg = String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
+          memPair.getFirst(), memPair.getSecond(), getId());
+      int attempt;
+      boolean isMemGranted = true;
+      for(attempt = 1; attempt <= ServerInternals.MEMORY_CHECK_RETRY_LIMIT; attempt++) {
+        isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
+        if (isMemGranted) {
+          info(String.format("Memory granted (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(), memPair.getSecond(), getId()));
+          break;
+        }
+        if (attempt < ServerInternals.MEMORY_CHECK_RETRY_LIMIT) {
+          info(String.format(oomMsg + ", sleep for %s secs and retry, attempt %s of %s", TimeUnit.MILLISECONDS.toSeconds(ServerInternals.MEMORY_CHECK_INTERVAL), attempt, ServerInternals.MEMORY_CHECK_RETRY_LIMIT));
+          synchronized (this) {
+            try {
+              this.wait(ServerInternals.MEMORY_CHECK_INTERVAL);
+            } catch (InterruptedException e) {
+              info(String.format("Job %s interrupted while waiting for memory check retry", getId()));
+            }
+          }
+          if(killed) {
+            info(String.format("Job %s was killed while waiting for memory check retry", getId()));
+            return;
+          }
+        }
+      }
+
       if (!isMemGranted) {
         throw new Exception(
-            String
-                .format(
-                    "Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s",
-                    memPair.getFirst(), memPair.getSecond(), getId()));
+            String.format("Cannot request memory (Xms %d kb, Xmx %d kb) from system for job %s", memPair.getFirst(),
+                memPair.getSecond(), getId()));
       }
     }
 
+
     List<String> commands = null;
     try {
       commands = getCommandList();
@@ -127,6 +153,7 @@ public class ProcessJob extends AbstractProcessJob {
       }
     }
 
+
     for (String command : commands) {
       AzkabanProcessBuilder builder = null;
       if (isExecuteAsUser) {
@@ -261,10 +288,16 @@ public class ProcessJob extends AbstractProcessJob {
 
   @Override
   public void cancel() throws InterruptedException {
+    // Mark the job as killed and wake it up in case it is waiting for a memory check retry
+    synchronized (this) {
+      killed = true;
+      this.notify();
+    }
+
     if (process == null)
       throw new IllegalStateException("Not started.");
-    boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
-    if (!killed) {
+    boolean processkilled = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+    if (!processkilled) {
       warn("Kill with signal TERM failed. Killing with KILL signal.");
       process.hardKill();
     }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 5dbfee1..f421ae3 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -692,7 +692,6 @@ public class JobRunner extends EventHandler implements Runnable {
     try {
       job.run();
     } catch (Throwable e) {
-
       if (props.getBoolean("job.succeed.on.failure", false)) {
         finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
         logError("Job run failed, but will treat it like success.");