diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
index c3f3a62..f3f9778 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
@@ -32,7 +32,7 @@ enum Status {
KILLED; // explicitly killed by a user
// The states that will not transition to other states
- private static final ImmutableSet TERMINAL_STATES = ImmutableSet.of(DISABLED, SUCCESS, FAILURE,
+ static final ImmutableSet<Status> TERMINAL_STATES = ImmutableSet.of(DISABLED, SUCCESS, FAILURE,
CANCELED, KILLED);
boolean isTerminal() {
@@ -40,14 +40,16 @@ enum Status {
}
// The states that are considered as success effectively
- private static final ImmutableSet EFFECTIVE_SUCCESS_STATES = ImmutableSet.of(DISABLED, SUCCESS);
+ private static final ImmutableSet<Status> EFFECTIVE_SUCCESS_STATES = ImmutableSet.of(DISABLED,
+ SUCCESS);
boolean isSuccessEffectively() {
return EFFECTIVE_SUCCESS_STATES.contains(this);
}
// The states that are possible before a node ever starts to run or be killed or canceled
- private static final ImmutableSet PRE_RUN_STATES = ImmutableSet.of(DISABLED, READY, BLOCKED);
+ private static final ImmutableSet<Status> PRE_RUN_STATES = ImmutableSet
+ .of(DISABLED, READY, BLOCKED);
boolean isPreRunState() {
return PRE_RUN_STATES.contains(this);
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
index 790c68a..54fb6f0 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
@@ -17,7 +17,10 @@
package azkaban.dag;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import org.junit.Test;
@@ -28,7 +31,8 @@ import org.junit.Test;
*/
public class DagTest {
- private final Dag testFlow = new Dag("fa", mock(DagProcessor.class));
+ private final DagProcessor mockDagProcessor = mock(DagProcessor.class);
+ private final Dag testFlow = new Dag("fa", this.mockDagProcessor);
@Test
public void dag_finish_with_only_disabled_nodes() {
@@ -49,6 +53,30 @@ public class DagTest {
assertThat(this.testFlow.getStatus()).isEqualTo(Status.KILLING);
}
+ @Test
+ public void kill_node_in_terminal_state_should_have_no_effect() {
+ for (final Status status : Status.TERMINAL_STATES) {
+ kill_dag_in_this_state_should_have_no_effect(status);
+ }
+ }
+
+ @Test
+ public void kill_node_in_killing_state_should_have_no_effect() {
+ kill_dag_in_this_state_should_have_no_effect(Status.KILLING);
+ }
+
+ private void kill_dag_in_this_state_should_have_no_effect(final Status status) {
+ // given
+ this.testFlow.setStatus(status);
+
+ // when
+ this.testFlow.kill();
+
+ // then
+ assertThat(this.testFlow.getStatus()).isEqualTo(status);
+ verify(this.mockDagProcessor, never()).changeStatus(any(), any());
+ }
+
/**
* Tests ready nodes are canceled when the dag is killed.
*/