azkaban-aplcache

New status: KILLING (#1172)

6/28/2017 2:26:53 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3efc7c9..08d0787 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -1479,7 +1479,7 @@ public class ExecutorManager extends EventHandler implements
         continue;
         // case UNKNOWN:
       case READY:
-        node.setStatus(Status.KILLED);
+        node.setStatus(Status.KILLING);
         break;
       default:
         node.setStatus(Status.FAILED);
diff --git a/azkaban-common/src/main/java/azkaban/executor/Status.java b/azkaban-common/src/main/java/azkaban/executor/Status.java
index 26ae3a5..6f2d409 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Status.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Status.java
@@ -16,12 +16,16 @@
 
 package azkaban.executor;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+
 public enum Status {
   READY(10),
   PREPARING(20),
   RUNNING(30),
   PAUSED(40),
   SUCCEEDED(50),
+  KILLING(55),
   KILLED(60),
   FAILED(70),
   FAILED_FINISHING(80),
@@ -30,6 +34,11 @@ public enum Status {
   QUEUED(110),
   FAILED_SUCCEEDED(120),
   CANCELLED(130);
+  // status is stored as TINYINT; in H2 DB that type only holds values from -128 to 127,
+  // so trying to store CANCELLED (130) in H2 currently fails.
+
+  private static final ImmutableMap<Integer, Status> numValMap = Arrays.stream(Status.values())
+      .collect(ImmutableMap.toImmutableMap(status -> status.getNumVal(), status -> status));
 
   private final int numVal;
 
@@ -38,36 +47,7 @@ public enum Status {
   }
 
   public static Status fromInteger(final int x) {
-    switch (x) {
-      case 10:
-        return READY;
-      case 20:
-        return PREPARING;
-      case 30:
-        return RUNNING;
-      case 40:
-        return PAUSED;
-      case 50:
-        return SUCCEEDED;
-      case 60:
-        return KILLED;
-      case 70:
-        return FAILED;
-      case 80:
-        return FAILED_FINISHING;
-      case 90:
-        return SKIPPED;
-      case 100:
-        return DISABLED;
-      case 110:
-        return QUEUED;
-      case 120:
-        return FAILED_SUCCEEDED;
-      case 130:
-        return CANCELLED;
-      default:
-        return READY;
-    }
+    return numValMap.getOrDefault(x, READY);
   }
 
   public static boolean isStatusFinished(final Status status) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
index 0fe3ae1..e0f7ea7 100644
--- a/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/WebUtils.java
@@ -97,6 +97,8 @@ public class WebUtils {
         return "Paused";
       case SKIPPED:
         return "Skipped";
+      case KILLING:
+        return "Killing";
       default:
     }
     return "Unknown";
diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 6aad32f..a9323cc 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -33,6 +33,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
   private Props generatedProperties = new Props();
   private boolean isWaiting = true;
   private boolean succeed = true;
+  private boolean ignoreCancel = false;
 
   public InteractiveTestJob(final String jobId, final Props sysProps, final Props jobProps,
       final Logger log) {
@@ -62,6 +63,12 @@ public class InteractiveTestJob extends AbstractProcessJob {
     testJobs.clear();
   }
 
+  public static void clearTestJobs(final String... names) {
+    for (final String name : names) {
+      assertNotNull(testJobs.remove(name));
+    }
+  }
+
   @Override
   public void run() throws Exception {
     final String nestedFlowPath =
@@ -137,6 +144,12 @@ public class InteractiveTestJob extends AbstractProcessJob {
     }
   }
 
+  public void ignoreCancel() {
+    synchronized (this) {
+      this.ignoreCancel = true;
+    }
+  }
+
   @Override
   public Props getJobGeneratedProperties() {
     return this.generatedProperties;
@@ -145,12 +158,8 @@ public class InteractiveTestJob extends AbstractProcessJob {
   @Override
   public void cancel() throws InterruptedException {
     info("Killing job");
-    failJob();
-  }
-
-  public static void clearTestJobs(final String... names) {
-    for (String name : names) {
-      assertNotNull(testJobs.remove(name));
+    if (!this.ignoreCancel) {
+      failJob();
     }
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index dfa00c8..0083980 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -112,13 +112,13 @@ public class FlowRunner extends EventHandler implements Runnable {
   private String jobLogFileSize = "5MB";
   private int jobLogNumFiles = 4;
 
-  private boolean flowPaused = false;
-  private boolean flowFailed = false;
-  private boolean flowFinished = false;
-  private boolean flowKilled = false;
+  private volatile boolean flowPaused = false;
+  private volatile boolean flowFailed = false;
+  private volatile boolean flowFinished = false;
+  private volatile boolean flowKilled = false;
 
   // The following is state that will trigger a retry of all failed jobs
-  private boolean retryFailedJobs = false;
+  private volatile boolean retryFailedJobs = false;
 
   /**
    * Constructor. This will create its own ExecutorService for thread pools
@@ -462,8 +462,7 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Instant kill or skip if necessary.
     boolean jobsRun = false;
     for (final ExecutableNode node : nodesToCheck) {
-      if (Status.isStatusFinished(node.getStatus())
-          || Status.isStatusRunning(node.getStatus())) {
+      if (notReadyToRun(node.getStatus())) {
         // Really shouldn't get in here.
         continue;
       }
@@ -479,6 +478,12 @@ public class FlowRunner extends EventHandler implements Runnable {
     return false;
   }
 
+  private boolean notReadyToRun(final Status status) {
+    return Status.isStatusFinished(status)
+        || Status.isStatusRunning(status)
+        || Status.KILLING == status;
+  }
+
   private boolean runReadyJob(final ExecutableNode node) throws IOException {
     if (Status.isStatusFinished(node.getStatus())
         || Status.isStatusRunning(node.getStatus())) {
@@ -541,7 +546,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   }
 
   private void propagateStatus(final ExecutableFlowBase base, final Status status) {
-    if (!Status.isStatusFinished(base.getStatus())) {
+    if (!Status.isStatusFinished(base.getStatus()) && base.getStatus() != Status.KILLING) {
       this.logger.info("Setting " + base.getNestedId() + " to " + status);
       base.setStatus(status);
       if (base.getParentFlow() != null) {
@@ -568,6 +573,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       final ExecutableNode node = flow.getExecutableNode(end);
 
       if (node.getStatus() == Status.KILLED
+          || node.getStatus() == Status.KILLING
           || node.getStatus() == Status.FAILED
           || node.getStatus() == Status.CANCELLED) {
         succeeded = false;
@@ -595,6 +601,11 @@ public class FlowRunner extends EventHandler implements Runnable {
             + durationSec + " seconds");
         flow.setStatus(Status.FAILED);
         break;
+      case KILLING:
+        this.logger
+            .info("Setting flow '" + id + "' status to KILLED in " + durationSec + " seconds");
+        flow.setStatus(Status.KILLED);
+        break;
       case FAILED:
       case KILLED:
       case CANCELLED:
@@ -863,7 +874,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         if (this.flowFailed) {
           this.flow.setStatus(Status.FAILED_FINISHING);
         } else if (this.flowKilled) {
-          this.flow.setStatus(Status.KILLED);
+          this.flow.setStatus(Status.KILLING);
         } else {
           this.flow.setStatus(Status.RUNNING);
         }
@@ -886,7 +897,7 @@ public class FlowRunner extends EventHandler implements Runnable {
         return;
       }
       this.logger.info("Kill has been called on flow " + this.execId);
-      this.flow.setStatus(Status.KILLED);
+      this.flow.setStatus(Status.KILLING);
       // If the flow is paused, then we'll also unpause
       this.flowPaused = false;
       this.flowKilled = true;
@@ -937,6 +948,8 @@ public class FlowRunner extends EventHandler implements Runnable {
         continue;
       } else if (node.getStatus() == Status.RUNNING) {
         continue;
+      } else if (node.getStatus() == Status.KILLING) {
+        continue;
       } else if (node.getStatus() == Status.SKIPPED) {
         node.setStatus(Status.DISABLED);
         node.setEndTime(-1);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
index 7df22ed..e590678 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/JobRunner.java
@@ -92,7 +92,7 @@ public class JobRunner extends EventHandler implements Runnable {
   private int jobLogBackupIndex;
 
   private long delayStartMs = 0;
-  private boolean killed = false;
+  private volatile boolean killed = false;
   private BlockingStatus currentBlockStatus = null;
 
   public JobRunner(final ExecutableNode node, final File workingDir, final ExecutorLoader loader,
@@ -345,7 +345,7 @@ public class JobRunner extends EventHandler implements Runnable {
         this.azkabanProps
             .getString(Constants.ConfigurationKeys.AZKABAN_SERVER_LOGGING_KAFKA_TOPIC));
 
-    final JSONObject layout = LogUtil.createLogPatternLayoutJsonObject(props, jobId);
+    final JSONObject layout = LogUtil.createLogPatternLayoutJsonObject(this.props, this.jobId);
 
     kafkaProducer.setLayout(new PatternLayoutEscaped(layout.toString()));
     kafkaProducer.activateOptions();
@@ -401,7 +401,7 @@ public class JobRunner extends EventHandler implements Runnable {
       nodeStatus = changeStatus(Status.SKIPPED, time);
       quickFinish = true;
     } else if (this.isKilled()) {
-      nodeStatus = changeStatus(Status.KILLED, time);
+      nodeStatus = changeStatus(Status.KILLING, time);
       quickFinish = true;
     }
 
@@ -737,9 +737,10 @@ public class JobRunner extends EventHandler implements Runnable {
   }
 
   private Status runJob() {
-    Status finalStatus = this.node.getStatus();
+    Status finalStatus;
     try {
       this.job.run();
+      finalStatus = this.node.getStatus();
     } catch (final Throwable e) {
       if (this.props.getBoolean("job.succeed.on.failure", false)) {
         finalStatus = changeStatus(Status.FAILED_SUCCEEDED);
@@ -761,8 +762,8 @@ public class JobRunner extends EventHandler implements Runnable {
       this.node.setOutputProps(this.job.getJobGeneratedProperties());
     }
 
-    // If the job is still running, set the status to Success.
-    if (!Status.isStatusFinished(finalStatus)) {
+    // If the job is still running (and is not being killed), set the status to SUCCEEDED.
+    if (!Status.isStatusFinished(finalStatus) && finalStatus != Status.KILLING) {
       finalStatus = changeStatus(Status.SUCCEEDED);
     }
     return finalStatus;
@@ -796,6 +797,7 @@ public class JobRunner extends EventHandler implements Runnable {
         return;
       }
       logError("Kill has been called.");
+      this.changeStatus(Status.KILLING);
       this.killed = true;
 
       final BlockingStatus status = this.currentBlockStatus;
@@ -821,7 +823,6 @@ public class JobRunner extends EventHandler implements Runnable {
             "Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
       }
 
-      this.changeStatus(Status.KILLED);
     }
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 2041e78..9ef446c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -263,7 +263,12 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job2", Status.SUCCEEDED);
     waitJobsStarted(this.runner, "job3", "job4", "job6");
 
+    InteractiveTestJob.getTestJob("job3").ignoreCancel();
     this.runner.kill("me");
+    assertStatus("job3", Status.KILLING);
+    assertFlowStatus(Status.KILLING);
+    InteractiveTestJob.getTestJob("job3").failJob();
+
     Assert.assertTrue(this.runner.isKilled());
 
     assertStatus("job5", Status.CANCELLED);
diff --git a/azkaban-web-server/src/main/less/azkaban-graph.less b/azkaban-web-server/src/main/less/azkaban-graph.less
index ce93658..bcb3817 100644
--- a/azkaban-web-server/src/main/less/azkaban-graph.less
+++ b/azkaban-web-server/src/main/less/azkaban-graph.less
@@ -94,6 +94,15 @@
   fill: #FFF;
 }
 
+.KILLING > g > rect {
+  fill: #FF9999;
+  stroke: #FF9999;
+}
+
+.KILLING > g > text {
+  fill: #FFF;
+}
+
 .CANCELLED > g > rect {
   fill: #FF9999;
   stroke: #FF9999;
diff --git a/azkaban-web-server/src/main/less/flow.less b/azkaban-web-server/src/main/less/flow.less
index 01e7386..1273dce 100644
--- a/azkaban-web-server/src/main/less/flow.less
+++ b/azkaban-web-server/src/main/less/flow.less
@@ -65,6 +65,18 @@
     background-color: @flow-killed-color;
   }
 
+  // #ff9999 = killing vs. #3398cc = running
+  &.KILLING {
+    background-color: @flow-killing-color;
+    background-image: -webkit-gradient(linear, 0 100%, 100% 0, color-stop(0.25, rgba(255, 255, 255, 0.15)), color-stop(0.25, transparent), color-stop(0.5, transparent), color-stop(0.5, rgba(255, 255, 255, 0.15)), color-stop(0.75, rgba(255, 255, 255, 0.15)), color-stop(0.75, transparent), to(transparent));
+    background-image: -webkit-linear-gradient(45deg, rgba(255, 255, 255, 0.15) 25%, transparent 25%, transparent 50%, rgba(255, 255, 255, 0.15) 50%, rgba(255, 255, 255, 0.15) 75%, transparent 75%, transparent);
+    background-image: -moz-linear-gradient(45deg, rgba(255, 255, 255, 0.15) 25%, transparent 25%, transparent 50%, rgba(255, 255, 255, 0.15) 50%, rgba(255, 255, 255, 0.15) 75%, transparent 75%, transparent);
+    background-image: linear-gradient(45deg, rgba(255, 255, 255, 0.15) 25%, transparent 25%, transparent 50%, rgba(255, 255, 255, 0.15) 50%, rgba(255, 255, 255, 0.15) 75%, transparent 75%, transparent);
+    background-size: 40px 40px;
+    -webkit-animation: progress-bar-stripes 2s linear infinite;
+            animation: progress-bar-stripes 2s linear infinite;
+  }
+
   &.RUNNING {
     background-color: @flow-running-color;
     background-image: -webkit-gradient(linear, 0 100%, 100% 0, color-stop(0.25, rgba(255, 255, 255, 0.15)), color-stop(0.25, transparent), color-stop(0.5, transparent), color-stop(0.5, rgba(255, 255, 255, 0.15)), color-stop(0.75, rgba(255, 255, 255, 0.15)), color-stop(0.75, transparent), to(transparent));
@@ -115,6 +127,10 @@ td {
       background-color: @flow-killed-color;
     }
 
+    &.KILLING {
+      background-color: @flow-killing-color;
+    }
+
     &.PAUSED {
       background-color: @flow-paused-color;
     }
@@ -169,6 +185,10 @@ td {
     color: @flow-killed-color;
   }
 
+  &.KILLING {
+    color: @flow-killing-color;
+  }
+
   &.CANCELLED {
     color: @flow-cancelled-color;
   }
@@ -310,6 +330,10 @@ li.tree-list-item {
       background-position: 0px 0px;
     }
 
+    &.KILLING .icon {
+      background-position: 0px 0px;
+    }
+
     &.CANCELLED .icon {
       background-position: 0px 0px;
       opacity: 0.5;
diff --git a/azkaban-web-server/src/main/less/variables.less b/azkaban-web-server/src/main/less/variables.less
index 078d2b2..226ada2 100644
--- a/azkaban-web-server/src/main/less/variables.less
+++ b/azkaban-web-server/src/main/less/variables.less
@@ -3,6 +3,7 @@
 @flow-succeeded-color: #5cb85c;
 @flow-failed-color: #d9534f;
 @flow-killed-color: #d9534f;
+@flow-killing-color: #ff9999;
 @flow-paused-color: #c82123;
 @flow-running-color: #3398cc;
 @flow-failed-finishing-color: #f19153;
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/historypage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/historypage.vm
index b18f52e..58c407f 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/historypage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/historypage.vm
@@ -197,6 +197,7 @@
                       <option value=30>Running</option>
                       <option value=40>Paused</option>
                       <option value=50>Succeed</option>
+                      <option value=55>Killing</option>
                       <option value=60>Killed</option>
                       <option value=70>Failed</option>
                       <option value=80>Failed Finishing</option>
diff --git a/azkaban-web-server/src/web/js/azkaban/util/job-status.js b/azkaban-web-server/src/web/js/azkaban/util/job-status.js
index fd61693..a5f17b6 100644
--- a/azkaban-web-server/src/web/js/azkaban/util/job-status.js
+++ b/azkaban-web-server/src/web/js/azkaban/util/job-status.js
@@ -24,6 +24,7 @@ var statusStringMap = {
   "FAILED_FINISHING": "Running w/Failure",
   "RUNNING": "Running",
   "WAITING": "Waiting",
+  "KILLING": "Killing",
   "KILLED": "Killed",
   "CANCELLED": "Cancelled",
   "DISABLED": "Disabled",
diff --git a/azkaban-web-server/src/web/js/azkaban/view/exflow.js b/azkaban-web-server/src/web/js/azkaban/view/exflow.js
index 026a946..0920b55 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/exflow.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/exflow.js
@@ -205,6 +205,8 @@ azkaban.FlowTabView = Backbone.View.extend({
 		else if (data.status == "KILLED") {
 			$("#executebtn").show();
 		}
+		else if (data.status == "KILLING") {
+		}
 	},
 
 	handleCancelClick: function(evt) {
@@ -447,6 +449,10 @@ var updaterFunction = function() {
 			// 2 min updates
 			setTimeout(function() {updaterFunction();}, 2*60*1000);
 		}
+		else if (data.status == "KILLING") {
+			// 30 s updates - should finish soon now
+			setTimeout(function() {updaterFunction();}, 30*1000);
+		}
 		else if (data.status != "SUCCEEDED" && data.status != "FAILED") {
 			// 2 min updates
 			setTimeout(function() {updaterFunction();}, 2*60*1000);
diff --git a/azkaban-web-server/src/web/js/azkaban/view/time-graph.js b/azkaban-web-server/src/web/js/azkaban/view/time-graph.js
index a74a49c..9c43560 100644
--- a/azkaban-web-server/src/web/js/azkaban/view/time-graph.js
+++ b/azkaban-web-server/src/web/js/azkaban/view/time-graph.js
@@ -93,6 +93,9 @@ azkaban.TimeGraphView = Backbone.View.extend({
       else if (status == 'PAUSED') {
         return '#c92123';
       }
+      else if (status == 'KILLING') {
+        return '#ff9999';
+      }
       else if (status == 'FAILED' ||
           status == 'FAILED_FINISHING' ||
           status == 'KILLED') {