azkaban-aplcache

Logs kill command failures and removes unused code (#2075) *

1/7/2019 5:07:05 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 480f43a..d43fc68 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -16,8 +16,8 @@
 
 package azkaban.jobExecutor.utils.process;
 
-import azkaban.utils.LogGobbler;
 import com.google.common.base.Joiner;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
@@ -97,17 +96,6 @@ public class AzkabanProcess {
 
       this.startupLatch.countDown();
 
-      final LogGobbler outputGobbler =
-          new LogGobbler(
-              new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8),
-              this.logger, Level.INFO, 30);
-      final LogGobbler errorGobbler =
-          new LogGobbler(
-              new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8),
-              this.logger, Level.ERROR, 30);
-
-      outputGobbler.start();
-      errorGobbler.start();
       int exitCode = -1;
       try {
         exitCode = this.process.waitFor();
@@ -117,17 +105,8 @@ public class AzkabanProcess {
 
       this.completeLatch.countDown();
 
-      // try to wait for everything to get logged out before exiting
-      outputGobbler.awaitCompletion(5000);
-      errorGobbler.awaitCompletion(5000);
-
       if (exitCode != 0) {
-        final String output =
-            new StringBuilder().append("Stdout:\n")
-                .append(outputGobbler.getRecentLog()).append("\n\n")
-                .append("Stderr:\n").append(errorGobbler.getRecentLog())
-                .append("\n").toString();
-        throw new ProcessFailureException(exitCode, output);
+        throw new ProcessFailureException();
       }
 
     } finally {
@@ -178,21 +157,13 @@ public class AzkabanProcess {
       throws InterruptedException {
     checkStarted();
     if (this.processId != 0 && isStarted()) {
-      try {
-        if (this.isExecuteAsUser) {
-          final String cmd =
-              String.format("%s %s %s %d", this.executeAsUserBinary,
-                  this.effectiveUser, KILL_COMMAND, this.processId);
-          Runtime.getRuntime().exec(cmd);
-        } else {
-          final String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
-          Runtime.getRuntime().exec(cmd);
-        }
-        return this.completeLatch.await(time, unit);
-      } catch (final IOException e) {
-        this.logger.error("Kill attempt failed.", e);
+      String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
+      if (this.isExecuteAsUser) {
+        cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
       }
-      return false;
+      executeAndWaitForKillCmd(cmd);
+      return this.completeLatch.await(time, unit);
+
     }
     return false;
   }
@@ -204,24 +175,38 @@ public class AzkabanProcess {
     checkStarted();
     if (isRunning()) {
       if (this.processId != 0) {
-        try {
-          if (this.isExecuteAsUser) {
-            final String cmd =
-                String.format("%s %s %s -9 %d", this.executeAsUserBinary,
-                    this.effectiveUser, KILL_COMMAND, this.processId);
-            Runtime.getRuntime().exec(cmd);
-          } else {
-            final String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
-            Runtime.getRuntime().exec(cmd);
-          }
-        } catch (final IOException e) {
-          this.logger.error("Kill attempt failed.", e);
+        String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
+        if (this.isExecuteAsUser) {
+          cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
         }
+        executeAndWaitForKillCmd(cmd);
       }
       this.process.destroy();
     }
   }
 
+  private void executeAndWaitForKillCmd(final String cmd) {
+    try {
+      final Process p = Runtime.getRuntime().exec(cmd);
+      // kill should return immediately since it only sends a signal
+      final int exitCode = p.waitFor();
+      if (exitCode != 0) {
+        this.logger.error("Kill terminated with exit code " + exitCode);
+        final BufferedReader b = new BufferedReader(
+            new InputStreamReader(p.getErrorStream(), StandardCharsets.UTF_8));
+        String line;
+        while ((line = b.readLine()) != null) {
+          // log why kill failed
+          this.logger.error(line);
+        }
+      }
+    } catch (final IOException e) {
+      this.logger.error("Kill attempt failed.", e);
+    } catch (final InterruptedException e) {
+      this.logger.error("Interrupted while waiting for kill command to complete.", e);
+    }
+  }
+
   /**
    * Attempt to get the process id for this process
    *
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
index 473dace..36d9d9d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
@@ -20,20 +20,7 @@ public class ProcessFailureException extends RuntimeException {
 
   private static final long serialVersionUID = 1;
 
-  private final int exitCode;
-  private final String logSnippet;
-
-  public ProcessFailureException(final int exitCode, final String logSnippet) {
-    this.exitCode = exitCode;
-    this.logSnippet = logSnippet;
-  }
-
-  public int getExitCode() {
-    return this.exitCode;
-  }
-
-  public String getLogSnippet() {
-    return this.logSnippet;
+  public ProcessFailureException() {
   }
 
 }