azkaban-aplcache

Fix race condition between Job#run & Job#kill (#1330) There's

8/18/2017 5:07:23 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index cfc0388..1efe1bb 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -91,7 +91,7 @@ public class JobRunner extends EventHandler implements Runnable {
   private int jobLogBackupIndex;
 
   private long delayStartMs = 0;
-  private boolean killed = false;
+  private volatile boolean killed = false;
   private BlockingStatus currentBlockStatus = null;
 
   public JobRunner(final ExecutableNode node, final File workingDir, final ExecutorLoader loader,
@@ -394,32 +394,34 @@ public class JobRunner extends EventHandler implements Runnable {
    * anything.
    */
   private boolean handleNonReadyStatus() {
-    Status nodeStatus = this.node.getStatus();
-    boolean quickFinish = false;
-    final long time = System.currentTimeMillis();
-
-    if (Status.isStatusFinished(nodeStatus)) {
-      quickFinish = true;
-    } else if (nodeStatus == Status.DISABLED) {
-      nodeStatus = changeStatus(Status.SKIPPED, time);
-      quickFinish = true;
-    } else if (this.isKilled()) {
-      nodeStatus = changeStatus(Status.KILLED, time);
-      quickFinish = true;
-    }
-
-    if (quickFinish) {
-      this.node.setStartTime(time);
-      fireEvent(
-          Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus, this.node.getNestedId())));
-      this.node.setEndTime(time);
-      fireEvent(
-          Event
-              .create(this, Type.JOB_FINISHED, new EventData(nodeStatus, this.node.getNestedId())));
-      return true;
-    }
+    synchronized (this.syncObject) {
+      Status nodeStatus = this.node.getStatus();
+      boolean quickFinish = false;
+      final long time = System.currentTimeMillis();
+
+      if (Status.isStatusFinished(nodeStatus)) {
+        quickFinish = true;
+      } else if (nodeStatus == Status.DISABLED) {
+        nodeStatus = changeStatus(Status.SKIPPED, time);
+        quickFinish = true;
+      } else if (this.isKilled()) {
+        nodeStatus = changeStatus(Status.KILLED, time);
+        quickFinish = true;
+      }
 
-    return false;
+      if (quickFinish) {
+        this.node.setStartTime(time);
+        fireEvent(
+            Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus, this.node.getNestedId())));
+        this.node.setEndTime(time);
+        fireEvent(
+            Event
+                .create(this, Type.JOB_FINISHED, new EventData(nodeStatus, this.node.getNestedId())));
+        return true;
+      }
+
+      return false;
+    }
   }
 
   /**
@@ -743,19 +745,21 @@ public class JobRunner extends EventHandler implements Runnable {
     try {
       this.job.run();
     } catch (final Throwable e) {
-      if (this.props.getBoolean("job.succeed.on.failure", false)) {
-        finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
-        logError("Job run failed, but will treat it like success.");
-        logError(e.getMessage() + " cause: " + e.getCause(), e);
-      } else {
-        if (isKilled() || this.node.getStatus() == Status.KILLED) {
-          finalStatus = Status.KILLED;
-          logError("Job run killed!", e);
+      synchronized (this.syncObject) {
+        if (this.props.getBoolean("job.succeed.on.failure", false)) {
+          finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
+          logError("Job run failed, but will treat it like success.");
+          logError(e.getMessage() + " cause: " + e.getCause(), e);
         } else {
-          finalStatus = changeStatus(Status.FAILED);
-          logError("Job run failed!", e);
+          if (isKilled() || this.node.getStatus() == Status.KILLED) {
+            finalStatus = Status.KILLED;
+            logError("Job run killed!", e);
+          } else {
+            finalStatus = changeStatus(Status.FAILED);
+            logError("Job run failed!", e);
+          }
+          logError(e.getMessage() + " cause: " + e.getCause());
         }
-        logError(e.getMessage() + " cause: " + e.getCause());
       }
     }
 
@@ -763,9 +767,11 @@ public class JobRunner extends EventHandler implements Runnable {
       this.node.setOutputProps(this.job.getJobGeneratedProperties());
     }
 
-    // If the job is still running, set the status to Success.
-    if (!Status.isStatusFinished(finalStatus)) {
-      finalStatus = changeStatus(Status.SUCCEEDED);
+    synchronized (this.syncObject) {
+      // If the job is still running, set the status to Success.
+      if (!Status.isStatusFinished(finalStatus) && !isKilled()) {
+        finalStatus = changeStatus(Status.SUCCEEDED);
+      }
     }
     return finalStatus;
   }