azkaban-aplcache

Set flow status to killed when killed by violating job-level

8/29/2017 7:09:17 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index ef460ff..4c8139e 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -427,7 +427,8 @@ public class FlowRunner extends EventHandler implements Runnable {
         // The job cannot be retried or has run out of retry attempts. We will
         // fail the job and its flow now.
         if (!retryJobIfPossible(node)) {
-          propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
+          propagateStatus(node.getParentFlow(),
+              node.getStatus() == Status.KILLED ? Status.KILLED : Status.FAILED_FINISHING);
           if (this.failureAction == FailureAction.CANCEL_ALL) {
             this.kill();
           }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 00d3a09..6d1f35a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -1035,6 +1035,31 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
   }
 
   /**
+   * Tests the case when a job is killed by SLA causing a flow to fail. The flow should be in
+   * "killed" status.
+   */
+  @Test
+  public void testFlowKilledByJobLevelSLA() throws Exception {
+    final EventCollectorListener eventCollector = new EventCollectorListener();
+    this.runner = createFlowRunner(eventCollector,
+        FailureAction.CANCEL_ALL);
+
+    runFlowRunnerInThread(this.runner);
+    assertStatus("joba", Status.RUNNING);
+    assertStatus("joba1", Status.RUNNING);
+
+    for (final JobRunner jobRunner : this.runner.getActiveJobRunners()) {
+      if (jobRunner.getJobId().equals("joba")) {
+        jobRunner.killBySLA();
+        break;
+      }
+    }
+
+    assertFlowStatus(Status.KILLED);
+    assertThreadShutDown();
+  }
+
+  /**
    * Tests the case when a flow is paused and a failure causes a kill. The flow should die
    * immediately regardless of the 'paused' status.
    */