diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index cfc0388..1efe1bb 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -91,7 +91,7 @@ public class JobRunner extends EventHandler implements Runnable {
private int jobLogBackupIndex;
private long delayStartMs = 0;
- private boolean killed = false;
+ private volatile boolean killed = false;
private BlockingStatus currentBlockStatus = null;
public JobRunner(final ExecutableNode node, final File workingDir, final ExecutorLoader loader,
@@ -394,32 +394,34 @@ public class JobRunner extends EventHandler implements Runnable {
* anything.
*/
private boolean handleNonReadyStatus() {
- Status nodeStatus = this.node.getStatus();
- boolean quickFinish = false;
- final long time = System.currentTimeMillis();
-
- if (Status.isStatusFinished(nodeStatus)) {
- quickFinish = true;
- } else if (nodeStatus == Status.DISABLED) {
- nodeStatus = changeStatus(Status.SKIPPED, time);
- quickFinish = true;
- } else if (this.isKilled()) {
- nodeStatus = changeStatus(Status.KILLED, time);
- quickFinish = true;
- }
-
- if (quickFinish) {
- this.node.setStartTime(time);
- fireEvent(
- Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus, this.node.getNestedId())));
- this.node.setEndTime(time);
- fireEvent(
- Event
- .create(this, Type.JOB_FINISHED, new EventData(nodeStatus, this.node.getNestedId())));
- return true;
- }
+ synchronized (this.syncObject) {
+ Status nodeStatus = this.node.getStatus();
+ boolean quickFinish = false;
+ final long time = System.currentTimeMillis();
+
+ if (Status.isStatusFinished(nodeStatus)) {
+ quickFinish = true;
+ } else if (nodeStatus == Status.DISABLED) {
+ nodeStatus = changeStatus(Status.SKIPPED, time);
+ quickFinish = true;
+ } else if (this.isKilled()) {
+ nodeStatus = changeStatus(Status.KILLED, time);
+ quickFinish = true;
+ }
- return false;
+ if (quickFinish) {
+ this.node.setStartTime(time);
+ fireEvent(
+ Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus, this.node.getNestedId())));
+ this.node.setEndTime(time);
+ fireEvent(
+ Event
+ .create(this, Type.JOB_FINISHED, new EventData(nodeStatus, this.node.getNestedId())));
+ return true;
+ }
+
+ return false;
+ }
}
/**
@@ -743,19 +745,21 @@ public class JobRunner extends EventHandler implements Runnable {
try {
this.job.run();
} catch (final Throwable e) {
- if (this.props.getBoolean("job.succeed.on.failure", false)) {
- finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
- logError("Job run failed, but will treat it like success.");
- logError(e.getMessage() + " cause: " + e.getCause(), e);
- } else {
- if (isKilled() || this.node.getStatus() == Status.KILLED) {
- finalStatus = Status.KILLED;
- logError("Job run killed!", e);
+ synchronized (this.syncObject) {
+ if (this.props.getBoolean("job.succeed.on.failure", false)) {
+ finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
+ logError("Job run failed, but will treat it like success.");
+ logError(e.getMessage() + " cause: " + e.getCause(), e);
} else {
- finalStatus = changeStatus(Status.FAILED);
- logError("Job run failed!", e);
+ if (isKilled() || this.node.getStatus() == Status.KILLED) {
+ finalStatus = Status.KILLED;
+ logError("Job run killed!", e);
+ } else {
+ finalStatus = changeStatus(Status.FAILED);
+ logError("Job run failed!", e);
+ }
+ logError(e.getMessage() + " cause: " + e.getCause());
}
- logError(e.getMessage() + " cause: " + e.getCause());
}
}
@@ -763,9 +767,11 @@ public class JobRunner extends EventHandler implements Runnable {
this.node.setOutputProps(this.job.getJobGeneratedProperties());
}
- // If the job is still running, set the status to Success.
- if (!Status.isStatusFinished(finalStatus)) {
- finalStatus = changeStatus(Status.SUCCEEDED);
+ synchronized (this.syncObject) {
+ // If the job is still running, set the status to Success.
+ if (!Status.isStatusFinished(finalStatus) && !isKilled()) {
+ finalStatus = changeStatus(Status.SUCCEEDED);
+ }
}
return finalStatus;
}