azkaban-developers

Rollback changes on PR #2075: (#2094) Changes made on PR (#2075)

1/16/2019 3:42:55 PM
3.67.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index d43fc68..480f43a 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -16,8 +16,8 @@
 
 package azkaban.jobExecutor.utils.process;
 
+import azkaban.utils.LogGobbler;
 import com.google.common.base.Joiner;
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
@@ -96,6 +97,17 @@ public class AzkabanProcess {
 
       this.startupLatch.countDown();
 
+      final LogGobbler outputGobbler =
+          new LogGobbler(
+              new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8),
+              this.logger, Level.INFO, 30);
+      final LogGobbler errorGobbler =
+          new LogGobbler(
+              new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8),
+              this.logger, Level.ERROR, 30);
+
+      outputGobbler.start();
+      errorGobbler.start();
       int exitCode = -1;
       try {
         exitCode = this.process.waitFor();
@@ -105,8 +117,17 @@ public class AzkabanProcess {
 
       this.completeLatch.countDown();
 
+      // try to wait for everything to get logged out before exiting
+      outputGobbler.awaitCompletion(5000);
+      errorGobbler.awaitCompletion(5000);
+
       if (exitCode != 0) {
-        throw new ProcessFailureException();
+        final String output =
+            new StringBuilder().append("Stdout:\n")
+                .append(outputGobbler.getRecentLog()).append("\n\n")
+                .append("Stderr:\n").append(errorGobbler.getRecentLog())
+                .append("\n").toString();
+        throw new ProcessFailureException(exitCode, output);
       }
 
     } finally {
@@ -157,13 +178,21 @@ public class AzkabanProcess {
       throws InterruptedException {
     checkStarted();
     if (this.processId != 0 && isStarted()) {
-      String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
-      if (this.isExecuteAsUser) {
-        cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
+      try {
+        if (this.isExecuteAsUser) {
+          final String cmd =
+              String.format("%s %s %s %d", this.executeAsUserBinary,
+                  this.effectiveUser, KILL_COMMAND, this.processId);
+          Runtime.getRuntime().exec(cmd);
+        } else {
+          final String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
+          Runtime.getRuntime().exec(cmd);
+        }
+        return this.completeLatch.await(time, unit);
+      } catch (final IOException e) {
+        this.logger.error("Kill attempt failed.", e);
       }
-      executeAndWaitForKillCmd(cmd);
-      return this.completeLatch.await(time, unit);
-
+      return false;
     }
     return false;
   }
@@ -175,38 +204,24 @@ public class AzkabanProcess {
     checkStarted();
     if (isRunning()) {
       if (this.processId != 0) {
-        String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
-        if (this.isExecuteAsUser) {
-          cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
+        try {
+          if (this.isExecuteAsUser) {
+            final String cmd =
+                String.format("%s %s %s -9 %d", this.executeAsUserBinary,
+                    this.effectiveUser, KILL_COMMAND, this.processId);
+            Runtime.getRuntime().exec(cmd);
+          } else {
+            final String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
+            Runtime.getRuntime().exec(cmd);
+          }
+        } catch (final IOException e) {
+          this.logger.error("Kill attempt failed.", e);
         }
-        executeAndWaitForKillCmd(cmd);
       }
       this.process.destroy();
     }
   }
 
-  private void executeAndWaitForKillCmd(final String cmd) {
-    try {
-      final Process p = Runtime.getRuntime().exec(cmd);
-      // kill should return immediately since it only sends a signal
-      final int exitCode = p.waitFor();
-      if (exitCode != 0) {
-        this.logger.error("Kill terminated with exit code " + exitCode);
-        final BufferedReader b = new BufferedReader(
-            new InputStreamReader(p.getErrorStream(), StandardCharsets.UTF_8));
-        String line;
-        while ((line = b.readLine()) != null) {
-          // log why kill failed
-          this.logger.error(line);
-        }
-      }
-    } catch (final IOException e) {
-      this.logger.error("Kill attempt failed.", e);
-    } catch (final InterruptedException e) {
-      this.logger.error("Interrupted while waiting for kill command to complete.", e);
-    }
-  }
-
   /**
    * Attempt to get the process id for this process
    *
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
index 36d9d9d..473dace 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
@@ -20,7 +20,20 @@ public class ProcessFailureException extends RuntimeException {
 
   private static final long serialVersionUID = 1;
 
-  public ProcessFailureException() {
+  private final int exitCode;
+  private final String logSnippet;
+
+  public ProcessFailureException(final int exitCode, final String logSnippet) {
+    this.exitCode = exitCode;
+    this.logSnippet = logSnippet;
+  }
+
+  public int getExitCode() {
+    return this.exitCode;
+  }
+
+  public String getLogSnippet() {
+    return this.logSnippet;
   }
 
 }