azkaban-developers
Changes
src/java/azkaban/executor/FlowRunner.java 63(+42 -21)
src/java/azkaban/executor/FlowRunnerManager.java 200(+94 -106)
src/java/azkaban/executor/JobRunner.java 16(+5 -11)
unit/executions/exectest1/job4.job 2(+1 -1)
Details
src/java/azkaban/executor/FlowRunner.java 63(+42 -21)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index dd21207..04f0991 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -114,12 +114,12 @@ public class FlowRunner extends EventHandler implements Runnable {
flowAppender.close();
}
- public synchronized void cancel(String user) {
- logger.info("Cancel called by " + user);
+ private synchronized void cancel() {
cancelled = true;
- executorService.shutdownNow();
-
+ executorService.shutdown();
+ jobsToRun.clear();
+
if (pausedJobsToRun.size() > 0) {
logger.info("Cancelling... Clearing paused jobs queue of size " + pausedJobsToRun.size());
pausedJobsToRun.clear();
@@ -149,17 +149,17 @@ public class FlowRunner extends EventHandler implements Runnable {
default:
}
}
-
- logger.info("Flow cancelled.");
- if (flow.getStatus() == Status.FAILED_FINISHING) {
- flow.setStatus(Status.FAILED);
- }
- else if (flow.getStatus() != Status.FAILED) {
- flow.setStatus(Status.KILLED);
- }
+
long time = System.currentTimeMillis();
- flow.setStartTime(time);
flow.setEndTime(time);
+ setStatus(flow, Status.FAILED);
+
+ }
+
+ public synchronized void cancel(String user) {
+ logger.info("Cancel called by " + user);
+ setStatus(flow, Status.KILLED);
+ cancel();
}
public synchronized void pause(String user) {
@@ -192,8 +192,7 @@ public class FlowRunner extends EventHandler implements Runnable {
int count = commitCount.getAndIncrement();
try {
- ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow,
- count);
+ ExecutableFlowLoader.writeExecutableFlowFile(this.basePath, flow, count);
} catch (ExecutorManagerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -298,6 +297,7 @@ public class FlowRunner extends EventHandler implements Runnable {
}
commitFlow();
+ System.out.println("Reached flow finished");
this.fireEventListeners(Event.create(this, Type.FLOW_FINISHED));
closeLogger();
}
@@ -443,7 +443,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private void handleFailedJob(ExecutableNode node) {
System.err.println("Job " + node.getId() + " failed.");
- if (flow.getStatus() != Status.FAILED_FINISHING && flow.getStatus() != Status.FAILED) {
+ if (flow.getStatus() != Status.FAILED_FINISHING && flow.getStatus() != Status.FAILED && flow.getStatus() != Status.KILLED) {
this.fireEventListeners(Event.create(this, Type.FLOW_FAILED_FINISHING));
}
@@ -451,10 +451,9 @@ public class FlowRunner extends EventHandler implements Runnable {
// We finish running current jobs and then fail. Do not accept new jobs.
case FINISH_CURRENTLY_RUNNING:
logger.info("Failure Action: Finish up remaining running jobs.");
- flow.setStatus(Status.FAILED_FINISHING);
+ setStatus(flow, Status.FAILED_FINISHING);
runningJobs.clear();
- executorService.shutdown();
// Go through and mark everything else killed.
long endTime = System.currentTimeMillis();
@@ -473,18 +472,40 @@ public class FlowRunner extends EventHandler implements Runnable {
// We kill all running jobs and fail immediately
case CANCEL_ALL:
logger.info("Failure Action: Kill flow immediately.");
- flow.setStatus(Status.FAILED);
- this.cancel("azkaban");
+ setStatus(flow, Status.FAILED);
+ this.cancel();
break;
default:
logger.info("Failure Action: Finishing accessible jobs.");
- flow.setStatus(Status.FAILED_FINISHING);
+ setStatus(flow, Status.FAILED_FINISHING);
queueNextJobs(node);
}
runningJobs.remove(node.getId());
}
+ // Applies a status change to the flow only if the new status outranks the flow's current one.
+ private void setStatus(ExecutableFlow flow, Status status) {
+ // Precedence: KILLED and finished states are never overwritten; FAILED_FINISHING yields only to KILLED/FAILED.
+ if (flow.getStatus() == Status.KILLED) {
+ // Killed overrides everything.
+ return;
+ }
+ else if (flow.getStatus() == Status.FAILED_FINISHING ) {
+ if (status == Status.KILLED || status == Status.FAILED) {
+ // Only override if it's KILLED or FAILED.
+ flow.setStatus(status);
+ }
+ }
+ else if (flow.getStatus() == Status.FAILED || flow.getStatus() == Status.SUCCEEDED) {
+ // Will not override a finished flow
+ return;
+ }
+ else {
+ flow.setStatus(status);
+ }
+ }
+
private class JobRunnerEventListener implements EventListener {
private FlowRunner flowRunner;
src/java/azkaban/executor/FlowRunnerManager.java 200(+94 -106)
diff --git a/src/java/azkaban/executor/FlowRunnerManager.java b/src/java/azkaban/executor/FlowRunnerManager.java
index 3107491..164479c 100644
--- a/src/java/azkaban/executor/FlowRunnerManager.java
+++ b/src/java/azkaban/executor/FlowRunnerManager.java
@@ -24,12 +24,12 @@ import azkaban.utils.Props;
/**
* Execution manager for the server side execution.
- *
+ *
*/
public class FlowRunnerManager {
private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
private File basePath;
-
+
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
private ConcurrentHashMap<String, FlowRunner> runningFlows = new ConcurrentHashMap<String, FlowRunner>();
private LinkedBlockingQueue<FlowRunner> queue = new LinkedBlockingQueue<FlowRunner>();
@@ -43,206 +43,194 @@ public class FlowRunnerManager {
private String senderAddress;
private String clientHostname;
private String clientPortNumber;
-
+
private Props globalProps;
-
+
public FlowRunnerManager(Props props, Props globalProps, Mailman mailer) {
this.mailer = mailer;
-
+
this.senderAddress = props.getString("mail.sender");
this.clientHostname = props.getString("jetty.hostname", "localhost");
this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
-
+
basePath = new File(props.getString("execution.directory"));
numThreads = props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
executorService = Executors.newFixedThreadPool(numThreads);
eventListener = new FlowRunnerEventListener(this);
-
+
submitterThread = new SubmitterThread(queue);
submitterThread.start();
}
-
+
public void submitFlow(String id, String path) throws ExecutorManagerException {
// Load file and submit
logger.info("Flow " + id + " submitted with path " + path);
-
+
File dir = new File(path);
ExecutableFlow flow = ExecutableFlowLoader.loadExecutableFlowFromDir(dir);
flow.setExecutionPath(path);
-
+
FlowRunner runner = new FlowRunner(flow);
runningFlows.put(id, runner);
runner.setGlobalProps(globalProps);
runner.addListener(eventListener);
executorService.submit(runner);
}
-
+
public void cancelFlow(String id, String user) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(id);
if (runner != null) {
runner.cancel(user);
}
}
-
+
public void pauseFlow(String id, String user) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(id);
if (runner != null) {
runner.pause(user);
}
}
-
+
public void resumeFlow(String id, String user) throws ExecutorManagerException {
FlowRunner runner = runningFlows.get(id);
if (runner != null) {
runner.resume(user);
}
}
-
+
public FlowRunner getFlowRunner(String id) {
return runningFlows.get(id);
}
-
+
public ExecutableFlow getExecutableFlow(String id) {
FlowRunner runner = runningFlows.get(id);
if (runner == null) {
return null;
}
-
+
return runner.getFlow();
}
-
+
private class SubmitterThread extends Thread {
private BlockingQueue<FlowRunner> queue;
private boolean shutdown = false;
-
+
public SubmitterThread(BlockingQueue<FlowRunner> queue) {
this.queue = queue;
}
-
+
public void shutdown() {
shutdown = true;
this.interrupt();
}
-
+
public void run() {
- while(!shutdown) {
+ while (!shutdown) {
try {
FlowRunner flowRunner = queue.take();
executorService.submit(flowRunner);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
}
}
}
}
-
+
private class FlowRunnerEventListener implements EventListener {
private FlowRunnerManager manager;
-
+
public FlowRunnerEventListener(FlowRunnerManager manager) {
this.manager = manager;
}
@Override
public synchronized void handleEvent(Event event) {
- FlowRunner runner = (FlowRunner)event.getRunner();
+ FlowRunner runner = (FlowRunner) event.getRunner();
ExecutableFlow flow = runner.getFlow();
-
-
-
+
System.out.println("Event " + flow.getExecutionId() + " " + flow.getFlowId() + " " + event.getType());
if (event.getType() == Type.FLOW_FINISHED) {
- if(flow.getStatus() == Status.SUCCEEDED)
+ if (flow.getStatus() == Status.SUCCEEDED)
sendSuccessEmail(runner);
- else sendErrorEmail(runner);
-
+ else
+ sendErrorEmail(runner);
+
logger.info("Flow " + flow.getExecutionId() + " has finished.");
runningFlows.remove(flow.getExecutionId());
-
}
}
}
-
- private List<String> getLogURLs(FlowRunner runner)
- {
+
+ private List<String> getLogURLs(FlowRunner runner) {
List<String> logURLs = new ArrayList<String>();
-
+
String flowID = runner.getFlow().getFlowId();
String execID = runner.getFlow().getExecutionId();
List<String> jobIDs = runner.getJobsFinished();
-
- //first construct log URL;
- String logURL = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "#log";
+
+ // first construct log URL;
+ String logURL = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID
+ + "#log";
logURLs.add(logURL);
- //then the individual jobs log URL that actually ran
- for(String jobID : jobIDs) {
- String jobLog = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execID + "&flow=" + flowID + "&job=" + jobID;
+ // then the individual jobs log URL that actually ran
+ for (String jobID : jobIDs) {
+ String jobLog = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid="
+ + execID + "&flow=" + flowID + "&job=" + jobID;
logURLs.add(jobLog);
}
-
+
return logURLs;
}
-
- /*
- * Wrap a single exception with the name of the scheduled job
- */
- private void sendErrorEmail(FlowRunner runner) {
- ExecutableFlow flow = runner.getFlow();
- List<String> emailList = flow.getFailureEmails();
- if(emailList != null && !emailList.isEmpty() && mailer != null) {
-
-
-
-
- try {
-
- String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
- String body = "The Flow '" + flow.getFlowId() + "' failed. \n See logs below: \n" ;
- for(String URL : getLogURLs(runner)) {
- body += (URL + "\n");
- }
-
- mailer.sendEmailIfPossible(senderAddress,
- emailList,
- subject,
- body);
- } catch(UnknownHostException uhe) {
- logger.error(uhe);
- }
- catch (Exception e) {
- logger.error(e);
- }
- }
- }
-
-
- private void sendSuccessEmail(FlowRunner runner) {
-
- ExecutableFlow flow = runner.getFlow();
-
- List<String> emailList = flow.getSuccessEmails();
-
- if(emailList != null && !emailList.isEmpty() && mailer != null) {
- try {
-
- String subject = "Flow '" + flow.getFlowId() + "' has completed on " + InetAddress.getLocalHost().getHostName() + "!";
- String body = "The Flow '" + flow.getFlowId() + "' succeeded. \n See logs below: \n" ;
- for(String URL : getLogURLs(runner)) {
- body += (URL + "\n");
- }
-
- mailer.sendEmailIfPossible(senderAddress,
- emailList,
- subject,
- body);
- } catch(UnknownHostException uhe) {
- logger.error(uhe);
- }
- catch (Exception e) {
- logger.error(e);
- }
- }
- }
-
+
+ /*
+ * Email the flow's failure recipients a notice with links to the execution and job logs
+ */
+ private void sendErrorEmail(FlowRunner runner) {
+ ExecutableFlow flow = runner.getFlow();
+ List<String> emailList = flow.getFailureEmails();
+ if (emailList != null && !emailList.isEmpty() && mailer != null) {
+
+ try {
+
+ String subject = "Flow '" + flow.getFlowId() + "' has completed on "
+ + InetAddress.getLocalHost().getHostName() + "!";
+ String body = "The Flow '" + flow.getFlowId() + "' failed. \n See logs below: \n";
+ for (String URL : getLogURLs(runner)) {
+ body += (URL + "\n");
+ }
+
+ mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+ } catch (UnknownHostException uhe) {
+ logger.error(uhe);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ private void sendSuccessEmail(FlowRunner runner) {
+
+ ExecutableFlow flow = runner.getFlow();
+
+ List<String> emailList = flow.getSuccessEmails();
+
+ if (emailList != null && !emailList.isEmpty() && mailer != null) {
+ try {
+
+ String subject = "Flow '" + flow.getFlowId() + "' has completed on "
+ + InetAddress.getLocalHost().getHostName() + "!";
+ String body = "The Flow '" + flow.getFlowId() + "' succeeded. \n See logs below: \n";
+ for (String URL : getLogURLs(runner)) {
+ body += (URL + "\n");
+ }
+
+ mailer.sendEmailIfPossible(senderAddress, emailList, subject, body);
+ } catch (UnknownHostException uhe) {
+ logger.error(uhe);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+
}
src/java/azkaban/executor/JobRunner.java 16(+5 -11)
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 6af4d3d..a1bdef0 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -22,8 +22,7 @@ import azkaban.jobExecutor.utils.JobWrappingFactory;
import azkaban.utils.Props;
public class JobRunner extends EventHandler implements Runnable {
- private static final Layout DEFAULT_LAYOUT = new PatternLayout(
- "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
private Props props;
private Props outputProps;
@@ -120,6 +119,7 @@ public class JobRunner extends EventHandler implements Runnable {
job.run();
} catch (Throwable e) {
succeeded = false;
+ node.setStatus(Status.FAILED);
logError("Job run failed!");
e.printStackTrace();
}
@@ -132,7 +132,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
this.fireEventListeners(Event.create(this, Type.JOB_SUCCEEDED));
} else {
- node.setStatus(Status.FAILED);
+ System.out.println("Setting FAILED to " + node.getId());
this.fireEventListeners(Event.create(this, Type.JOB_FAILED));
}
logInfo("Finishing job " + node.getId() + " at " + node.getEndTime());
@@ -150,14 +150,8 @@ public class JobRunner extends EventHandler implements Runnable {
try {
job.cancel();
} catch (Exception e) {
- logError("Failed trying to cancel job!");
- e.printStackTrace();
- }
-
- // will just interrupt, I guess, until the code is finished.
- this.notifyAll();
- if (node.getStatus() != Status.FAILED) {
- node.setStatus(Status.KILLED);
+ logError(e.getMessage());
+ logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
}
}
diff --git a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
index f22923d..a4ac15d 100644
--- a/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
+++ b/src/java/azkaban/jobExecutor/utils/process/AzkabanProcess.java
@@ -44,236 +44,230 @@ import com.google.common.base.Joiner;
*/
public class AzkabanProcess {
- private final String workingDir;
- private final List<String> cmd;
- private final Map<String, String> env;
- private final Logger logger;
- private final CountDownLatch startupLatch;
- private final CountDownLatch completeLatch;
- private volatile int processId;
- private volatile Process process;
-
- public AzkabanProcess(final List<String> cmd,
- final Map<String, String> env, final String workingDir,
- final Logger logger) {
- this.cmd = cmd;
- this.env = env;
- this.workingDir = workingDir;
- this.processId = -1;
- this.startupLatch = new CountDownLatch(1);
- this.completeLatch = new CountDownLatch(1);
- this.logger = logger;
- }
-
- /**
- * Execute this process, blocking until it has completed.
- */
- public void run() throws IOException {
- if (this.isStarted() || this.isComplete()) {
- throw new IllegalStateException(
- "The process can only be used once.");
- }
-
- ProcessBuilder builder = new ProcessBuilder(cmd);
- builder.directory(new File(workingDir));
- builder.environment().putAll(env);
- this.process = builder.start();
- this.processId = processId(process);
- if (processId == 0) {
- logger.debug("Spawned thread with unknown process id");
- } else {
- logger.debug("Spawned thread with process id " + processId);
- }
-
- this.startupLatch.countDown();
-
- LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(
- process.getInputStream()), logger, Level.INFO, 30);
- LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(
- process.getErrorStream()), logger, Level.ERROR, 30);
-
- outputGobbler.start();
- errorGobbler.start();
- int exitCode = -1;
- try {
- exitCode = process.waitFor();
- } catch (InterruptedException e) {
- logger.info("Process interrupted.", e);
- }
-
- completeLatch.countDown();
- if (exitCode != 0) {
- throw new ProcessFailureException(exitCode,
- errorGobbler.getRecentLog());
- }
-
- // try to wait for everything to get logged out before exiting
- outputGobbler.awaitCompletion(5000);
- errorGobbler.awaitCompletion(5000);
- }
-
- /**
- * Await the completion of this process
- *
- * @throws InterruptedException
- * if the thread is interrupted while waiting.
- */
- public void awaitCompletion() throws InterruptedException {
- this.completeLatch.await();
- }
-
- /**
- * Await the start of this process
- *
- * @throws InterruptedException
- * if the thread is interrupted while waiting.
- */
- public void awaitStartup() throws InterruptedException {
- this.startupLatch.await();
- }
-
- /**
- * Get the process id for this process, if it has started.
- *
- * @return The process id or -1 if it cannot be fetched
- */
- public int getProcessId() {
- checkStarted();
- return this.processId;
- }
-
- /**
- * Attempt to kill the process, waiting up to the given time for it to die
- *
- * @param time
- * The amount of time to wait
- * @param unit
- * The time unit
- * @return true iff this soft kill kills the process in the given wait time.
- */
- public boolean softKill(final long time, final TimeUnit unit)
- throws InterruptedException {
- checkStarted();
- if (processId != 0 && isStarted()) {
- try {
- Runtime.getRuntime().exec("kill " + processId);
- return completeLatch.await(time, unit);
- } catch (IOException e) {
- logger.error("Kill attempt failed.", e);
- }
- return false;
- }
- return false;
- }
-
- /**
- * Force kill this process
- */
- public void hardKill() {
- checkStarted();
- if (isRunning()) {
- process.destroy();
- }
- }
-
- /**
- * Attempt to get the process id for this process
- *
- * @param process
- * The process to get the id from
- * @return The id of the process
- */
- private int processId(final java.lang.Process process) {
- int processId = 0;
- try {
- Field f = process.getClass().getDeclaredField("pid");
- f.setAccessible(true);
-
- processId = f.getInt(process);
- } catch (Throwable e) {
- e.printStackTrace();
- }
-
- return processId;
- }
-
- /**
- * @return true iff the process has been started
- */
- public boolean isStarted() {
- return startupLatch.getCount() == 0L;
- }
-
- /**
- * @return true iff the process has completed
- */
- public boolean isComplete() {
- return completeLatch.getCount() == 0L;
- }
-
- /**
- * @return true iff the process is currently running
- */
- public boolean isRunning() {
- return isStarted() && !isComplete();
- }
-
- public void checkStarted() {
- if (!isStarted()) {
- throw new IllegalStateException("Process has not yet started.");
- }
- }
-
- @Override
- public String toString() {
- return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
- + ", cwd = " + workingDir + ")";
- }
-
- private static class LogGobbler extends Thread {
-
- private final BufferedReader inputReader;
- private final Logger logger;
- private final Level loggingLevel;
- private final CircularBuffer<String> buffer;
-
- public LogGobbler(final Reader inputReader, final Logger logger,
- final Level level, final int bufferLines) {
- this.inputReader = new BufferedReader(inputReader);
- this.logger = logger;
- this.loggingLevel = level;
- buffer = new CircularBuffer<String>(bufferLines);
- }
-
- @Override
- public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- String line = inputReader.readLine();
- if (line == null) {
- return;
- }
-
- buffer.append(line);
- logger.log(loggingLevel, line);
- }
- } catch (IOException e) {
- logger.error("Error reading from logging stream:", e);
- }
- }
-
- public void awaitCompletion(final long waitMs) {
- try {
- join(waitMs);
- } catch (InterruptedException e) {
- logger.info("I/O thread interrupted.", e);
- }
- }
-
- public String getRecentLog() {
- return Joiner.on(System.getProperty("line.separator")).join(buffer);
- }
-
- }
+ private final String workingDir;
+ private final List<String> cmd;
+ private final Map<String, String> env;
+ private final Logger logger;
+ private final CountDownLatch startupLatch;
+ private final CountDownLatch completeLatch;
+ private volatile int processId;
+ private volatile Process process;
+
+ public AzkabanProcess(final List<String> cmd, final Map<String, String> env, final String workingDir,
+ final Logger logger) {
+ this.cmd = cmd;
+ this.env = env;
+ this.workingDir = workingDir;
+ this.processId = -1;
+ this.startupLatch = new CountDownLatch(1);
+ this.completeLatch = new CountDownLatch(1);
+ this.logger = logger;
+ }
+
+ /**
+ * Execute this process, blocking until it has completed.
+ */
+ public void run() throws IOException {
+ if (this.isStarted() || this.isComplete()) {
+ throw new IllegalStateException("The process can only be used once.");
+ }
+
+ ProcessBuilder builder = new ProcessBuilder(cmd);
+ builder.directory(new File(workingDir));
+ builder.environment().putAll(env);
+ this.process = builder.start();
+ this.processId = processId(process);
+ if (processId == 0) {
+ logger.debug("Spawned thread with unknown process id");
+ } else {
+ logger.debug("Spawned thread with process id " + processId);
+ }
+
+ this.startupLatch.countDown();
+
+ LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(process.getInputStream()), logger, Level.INFO,
+ 30);
+ LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process.getErrorStream()), logger, Level.ERROR,
+ 30);
+
+ outputGobbler.start();
+ errorGobbler.start();
+ int exitCode = -1;
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ logger.info("Process interrupted. Exit code is " + exitCode, e);
+ }
+
+ completeLatch.countDown();
+ if (exitCode != 0) {
+ throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
+ }
+
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.awaitCompletion(5000);
+ errorGobbler.awaitCompletion(5000);
+ }
+
+ /**
+ * Await the completion of this process
+ *
+ * @throws InterruptedException
+ * if the thread is interrupted while waiting.
+ */
+ public void awaitCompletion() throws InterruptedException {
+ this.completeLatch.await();
+ }
+
+ /**
+ * Await the start of this process
+ *
+ * @throws InterruptedException
+ * if the thread is interrupted while waiting.
+ */
+ public void awaitStartup() throws InterruptedException {
+ this.startupLatch.await();
+ }
+
+ /**
+ * Get the process id for this process, if it has started.
+ *
+ * @return The process id, or 0 if it could not be determined
+ */
+ public int getProcessId() {
+ checkStarted();
+ return this.processId;
+ }
+
+ /**
+ * Attempt to kill the process, waiting up to the given time for it to die
+ *
+ * @param time
+ * The amount of time to wait
+ * @param unit
+ * The time unit
+ * @return true iff this soft kill kills the process in the given wait time.
+ */
+ public boolean softKill(final long time, final TimeUnit unit) throws InterruptedException {
+ checkStarted();
+ if (processId != 0 && isStarted()) {
+ try {
+ Runtime.getRuntime().exec("kill " + processId);
+ return completeLatch.await(time, unit);
+ } catch (IOException e) {
+ logger.error("Kill attempt failed.", e);
+ }
+ return false;
+ }
+ return false;
+ }
+
+ /**
+ * Force kill this process
+ */
+ public void hardKill() {
+ checkStarted();
+ if (isRunning()) {
+ process.destroy();
+ }
+ }
+
+ /**
+ * Attempt to get the process id for this process
+ *
+ * @param process
+ * The process to get the id from
+ * @return The id of the process
+ */
+ private int processId(final java.lang.Process process) {
+ int processId = 0;
+ try {
+ Field f = process.getClass().getDeclaredField("pid");
+ f.setAccessible(true);
+
+ processId = f.getInt(process);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+
+ return processId;
+ }
+
+ /**
+ * @return true iff the process has been started
+ */
+ public boolean isStarted() {
+ return startupLatch.getCount() == 0L;
+ }
+
+ /**
+ * @return true iff the process has completed
+ */
+ public boolean isComplete() {
+ return completeLatch.getCount() == 0L;
+ }
+
+ /**
+ * @return true iff the process is currently running
+ */
+ public boolean isRunning() {
+ return isStarted() && !isComplete();
+ }
+
+ public void checkStarted() {
+ if (!isStarted()) {
+ throw new IllegalStateException("Process has not yet started.");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env + ", cwd = " + workingDir + ")";
+ }
+
+ private static class LogGobbler extends Thread {
+
+ private final BufferedReader inputReader;
+ private final Logger logger;
+ private final Level loggingLevel;
+ private final CircularBuffer<String> buffer;
+
+ public LogGobbler(final Reader inputReader, final Logger logger, final Level level, final int bufferLines) {
+ this.inputReader = new BufferedReader(inputReader);
+ this.logger = logger;
+ this.loggingLevel = level;
+ buffer = new CircularBuffer<String>(bufferLines);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ String line = inputReader.readLine();
+ if (line == null) {
+ return;
+ }
+
+ buffer.append(line);
+ logger.log(loggingLevel, line);
+ }
+ } catch (IOException e) {
+ logger.error("Error reading from logging stream:", e);
+ }
+ }
+
+ public void awaitCompletion(final long waitMs) {
+ try {
+ join(waitMs);
+ } catch (InterruptedException e) {
+ logger.info("I/O thread interrupted.", e);
+ }
+ }
+
+ public String getRecentLog() {
+ return Joiner.on(System.getProperty("line.separator")).join(buffer);
+ }
+
+ }
}
unit/executions/exectest1/job4.job 2(+1 -1)
diff --git a/unit/executions/exectest1/job4.job b/unit/executions/exectest1/job4.job
index 1cbac6f..1eccb73 100644
--- a/unit/executions/exectest1/job4.job
+++ b/unit/executions/exectest1/job4.job
@@ -1,5 +1,5 @@
type=java
job.class=azkaban.test.executor.SleepJavaJob
dependencies=job2
-seconds=4
+seconds=8
fail=false
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
index ab0970f..7bdc413 100644
--- a/unit/java/azkaban/test/executor/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -195,6 +195,62 @@ public class FlowRunnerTest {
}
}
+ @Test
+ public void execAndCancel() throws Exception {
+ File testDir = new File("unit/executions/exectest1");
+ ExecutableFlow exFlow = prepareExecDir(testDir, "exec1");
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = new FlowRunner(exFlow);
+ runner.addListener(eventCollector);
+
+ Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(exFlow.getStatus() == Status.READY);
+ Thread thread = new Thread(runner);
+ thread.start();
+
+ synchronized(this) {
+ try {
+ wait(4500);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ runner.cancel("me");
+ }
+ synchronized(this) {
+ // Wait for cleanup.
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ testStatus(exFlow, "job5", Status.KILLED);
+ testStatus(exFlow, "job7", Status.KILLED);
+ testStatus(exFlow, "job8", Status.KILLED);
+ testStatus(exFlow, "job9", Status.KILLED);
+ testStatus(exFlow, "job10", Status.KILLED);
+ testStatus(exFlow, "job1", Status.SUCCEEDED);
+ testStatus(exFlow, "job2", Status.SUCCEEDED);
+ testStatus(exFlow, "job3", Status.FAILED);
+ testStatus(exFlow, "job4", Status.FAILED);
+ testStatus(exFlow, "job6", Status.FAILED);
+
+ Assert.assertTrue("Expected KILLED status instead got " + exFlow.getStatus(),exFlow.getStatus() == Status.KILLED);
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FINISHED});
+ }
+ catch (Exception e) {
+ System.out.println(e.getMessage());
+ eventCollector.writeAllEvents();
+ Assert.fail(e.getMessage());
+ }
+ }
+
private void testStatus(ExecutableFlow flow, String name, Status status) {
ExecutableNode node = flow.getExecutableNode(name);
diff --git a/unit/java/azkaban/test/executor/JobRunnerTest.java b/unit/java/azkaban/test/executor/JobRunnerTest.java
index 7804d21..cb2d432 100644
--- a/unit/java/azkaban/test/executor/JobRunnerTest.java
+++ b/unit/java/azkaban/test/executor/JobRunnerTest.java
@@ -200,12 +200,12 @@ public class JobRunnerTest {
eventCollector.handleEvent(Event.create(null, Type.JOB_STARTED));
Thread thread = new Thread(runner);
- thread.run();
+ thread.start();
eventCollector.handleEvent(Event.create(null, Type.JOB_KILLED));
synchronized(this) {
try {
- wait(1000);
+ wait(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -214,7 +214,7 @@ public class JobRunnerTest {
}
Assert.assertTrue(runner.getStatus() == node.getStatus());
- Assert.assertTrue(node.getStatus() == Status.KILLED);
+ Assert.assertTrue(node.getStatus() == Status.FAILED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
// The cancelled job must end within 3000 ms of its start time.
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);