diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index d43fc68..480f43a 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -16,8 +16,8 @@
package azkaban.jobExecutor.utils.process;
+import azkaban.utils.LogGobbler;
import com.google.common.base.Joiner;
-import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
@@ -96,6 +97,17 @@ public class AzkabanProcess {
this.startupLatch.countDown();
+ final LogGobbler outputGobbler =
+ new LogGobbler(
+ new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8),
+ this.logger, Level.INFO, 30);
+ final LogGobbler errorGobbler =
+ new LogGobbler(
+ new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8),
+ this.logger, Level.ERROR, 30);
+
+ outputGobbler.start();
+ errorGobbler.start();
int exitCode = -1;
try {
exitCode = this.process.waitFor();
@@ -105,8 +117,17 @@ public class AzkabanProcess {
this.completeLatch.countDown();
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.awaitCompletion(5000);
+ errorGobbler.awaitCompletion(5000);
+
if (exitCode != 0) {
- throw new ProcessFailureException();
+ final String output =
+ new StringBuilder().append("Stdout:\n")
+ .append(outputGobbler.getRecentLog()).append("\n\n")
+ .append("Stderr:\n").append(errorGobbler.getRecentLog())
+ .append("\n").toString();
+ throw new ProcessFailureException(exitCode, output);
}
} finally {
@@ -157,13 +178,21 @@ public class AzkabanProcess {
throws InterruptedException {
checkStarted();
if (this.processId != 0 && isStarted()) {
- String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
- if (this.isExecuteAsUser) {
- cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
+ try {
+ if (this.isExecuteAsUser) {
+ final String cmd =
+ String.format("%s %s %s %d", this.executeAsUserBinary,
+ this.effectiveUser, KILL_COMMAND, this.processId);
+ Runtime.getRuntime().exec(cmd);
+ } else {
+ final String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
+ Runtime.getRuntime().exec(cmd);
+ }
+ return this.completeLatch.await(time, unit);
+ } catch (final IOException e) {
+ this.logger.error("Kill attempt failed.", e);
}
- executeAndWaitForKillCmd(cmd);
- return this.completeLatch.await(time, unit);
-
+ return false;
}
return false;
}
@@ -175,38 +204,24 @@ public class AzkabanProcess {
checkStarted();
if (isRunning()) {
if (this.processId != 0) {
- String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
- if (this.isExecuteAsUser) {
- cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
+ try {
+ if (this.isExecuteAsUser) {
+ final String cmd =
+ String.format("%s %s %s -9 %d", this.executeAsUserBinary,
+ this.effectiveUser, KILL_COMMAND, this.processId);
+ Runtime.getRuntime().exec(cmd);
+ } else {
+ final String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
+ Runtime.getRuntime().exec(cmd);
+ }
+ } catch (final IOException e) {
+ this.logger.error("Kill attempt failed.", e);
}
- executeAndWaitForKillCmd(cmd);
}
this.process.destroy();
}
}
- private void executeAndWaitForKillCmd(final String cmd) {
- try {
- final Process p = Runtime.getRuntime().exec(cmd);
- // kill should return immediately since it only sends a signal
- final int exitCode = p.waitFor();
- if (exitCode != 0) {
- this.logger.error("Kill terminated with exit code " + exitCode);
- final BufferedReader b = new BufferedReader(
- new InputStreamReader(p.getErrorStream(), StandardCharsets.UTF_8));
- String line;
- while ((line = b.readLine()) != null) {
- // log why kill failed
- this.logger.error(line);
- }
- }
- } catch (final IOException e) {
- this.logger.error("Kill attempt failed.", e);
- } catch (final InterruptedException e) {
- this.logger.error("Interrupted while waiting for kill command to complete.", e);
- }
- }
-
/**
* Attempt to get the process id for this process
*
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
index 36d9d9d..473dace 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
@@ -20,7 +20,30 @@ public class ProcessFailureException extends RuntimeException {
private static final long serialVersionUID = 1;
- public ProcessFailureException() {
+ private final int exitCode;
+ private final String logSnippet;
+
+ /**
+ * Creates an exception for a process that finished with a failing exit code.
+ *
+ * @param exitCode the exit code reported by the process
+ * @param logSnippet log text supplied by the caller for diagnostics
+ */
+ public ProcessFailureException(final int exitCode, final String logSnippet) {
+ // Give the exception a message so getMessage() and stack traces show the exit code.
+ super("Process exited with code " + exitCode);
+ this.exitCode = exitCode;
+ this.logSnippet = logSnippet;
+ }
+
+ /** Returns the exit code passed to the constructor. */
+ public int getExitCode() {
+ return this.exitCode;
+ }
+
+ /** Returns the log text passed to the constructor. */
+ public String getLogSnippet() {
+ return this.logSnippet;
}
}