diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index 480f43a..d43fc68 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -16,8 +16,8 @@
package azkaban.jobExecutor.utils.process;
-import azkaban.utils.LogGobbler;
import com.google.common.base.Joiner;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
@@ -97,17 +96,6 @@ public class AzkabanProcess {
this.startupLatch.countDown();
- final LogGobbler outputGobbler =
- new LogGobbler(
- new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8),
- this.logger, Level.INFO, 30);
- final LogGobbler errorGobbler =
- new LogGobbler(
- new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8),
- this.logger, Level.ERROR, 30);
-
- outputGobbler.start();
- errorGobbler.start();
int exitCode = -1;
try {
exitCode = this.process.waitFor();
@@ -117,17 +105,8 @@ public class AzkabanProcess {
this.completeLatch.countDown();
- // try to wait for everything to get logged out before exiting
- outputGobbler.awaitCompletion(5000);
- errorGobbler.awaitCompletion(5000);
-
if (exitCode != 0) {
- final String output =
- new StringBuilder().append("Stdout:\n")
- .append(outputGobbler.getRecentLog()).append("\n\n")
- .append("Stderr:\n").append(errorGobbler.getRecentLog())
- .append("\n").toString();
- throw new ProcessFailureException(exitCode, output);
+ throw new ProcessFailureException();
}
} finally {
@@ -178,21 +157,13 @@ public class AzkabanProcess {
throws InterruptedException {
checkStarted();
if (this.processId != 0 && isStarted()) {
- try {
- if (this.isExecuteAsUser) {
- final String cmd =
- String.format("%s %s %s %d", this.executeAsUserBinary,
- this.effectiveUser, KILL_COMMAND, this.processId);
- Runtime.getRuntime().exec(cmd);
- } else {
- final String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
- Runtime.getRuntime().exec(cmd);
- }
- return this.completeLatch.await(time, unit);
- } catch (final IOException e) {
- this.logger.error("Kill attempt failed.", e);
+ String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
+ if (this.isExecuteAsUser) {
+ cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
}
- return false;
+ executeAndWaitForKillCmd(cmd);
+ return this.completeLatch.await(time, unit);
+
}
return false;
}
@@ -204,24 +175,38 @@ public class AzkabanProcess {
checkStarted();
if (isRunning()) {
if (this.processId != 0) {
- try {
- if (this.isExecuteAsUser) {
- final String cmd =
- String.format("%s %s %s -9 %d", this.executeAsUserBinary,
- this.effectiveUser, KILL_COMMAND, this.processId);
- Runtime.getRuntime().exec(cmd);
- } else {
- final String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
- Runtime.getRuntime().exec(cmd);
- }
- } catch (final IOException e) {
- this.logger.error("Kill attempt failed.", e);
+ String cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
+ if (this.isExecuteAsUser) {
+ cmd = String.format("%s %s %s", this.executeAsUserBinary, this.effectiveUser, cmd);
}
+ executeAndWaitForKillCmd(cmd);
}
this.process.destroy();
}
}
+ private void executeAndWaitForKillCmd(final String cmd) {
+ try {
+ final Process p = Runtime.getRuntime().exec(cmd);
+ // kill should return immediately since it only sends a signal
+ final int exitCode = p.waitFor();
+ if (exitCode != 0) {
+ this.logger.error("Kill terminated with exit code " + exitCode);
+ final BufferedReader b = new BufferedReader(
+ new InputStreamReader(p.getErrorStream(), StandardCharsets.UTF_8));
+ String line;
+ while ((line = b.readLine()) != null) {
+ // log why kill failed
+ this.logger.error(line);
+ }
+ }
+ } catch (final IOException e) {
+ this.logger.error("Kill attempt failed.", e);
+ } catch (final InterruptedException e) {
+ this.logger.error("Interrupted while waiting for kill command to complete.", e);
+ }
+ }
+
/**
* Attempt to get the process id for this process
*
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
index 473dace..36d9d9d 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/utils/process/ProcessFailureException.java
@@ -20,20 +20,7 @@ public class ProcessFailureException extends RuntimeException {
private static final long serialVersionUID = 1;
- private final int exitCode;
- private final String logSnippet;
-
- public ProcessFailureException(final int exitCode, final String logSnippet) {
- this.exitCode = exitCode;
- this.logSnippet = logSnippet;
- }
-
- public int getExitCode() {
- return this.exitCode;
- }
-
- public String getLogSnippet() {
- return this.logSnippet;
+ public ProcessFailureException() {
}
}