azkaban-aplcache

New DAG engine initial checkin (#1755) Initial DAG engine

5/11/2018 5:42:37 PM

Details

diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 8749c19..ef672e1 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -1,9 +1,26 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 apply plugin: 'distribution'
 
 dependencies {
     compile(project(':az-core'))
     compile(project(':azkaban-common'))
 
+    compile deps.guava
     compile deps.jsr305
     compile deps.kafkaLog4jAppender
     runtime(project(':azkaban-hadoop-security-plugin'))
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
new file mode 100644
index 0000000..8029ee2
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A DAG (Directed acyclic graph) consists of {@link Node}s.
+ */
+class Dag {
+
+  private final String name;
+  private final DagProcessor dagProcessor;
+  private final List<Node> nodes = new ArrayList<>();
+  private Status status = Status.READY;
+
+  Dag(final String name, final DagProcessor dagProcessor) {
+    this.name = name;
+    requireNonNull(dagProcessor, "The dagProcessor parameter can't be null.");
+    this.dagProcessor = dagProcessor;
+  }
+
+  void addNode(final Node node) {
+    node.setDag(this);
+    this.nodes.add(node);
+  }
+
+  void start() {
+    assert (this.status == Status.READY);
+    changeStatus(Status.RUNNING);
+    for (final Node node : this.nodes) {
+      node.runIfAllowed();
+    }
+    // It's possible that all nodes are disabled. In this rare case the dag should be
+    // marked success. Otherwise it will be stuck in the the running state.
+    updateDagStatus();
+  }
+
+  void kill() {
+    if (this.status.isTerminal() || this.status == Status.KILLING) {
+      // It is possible that a kill is issued after a dag has finished or multiple kill requests
+      // are received. Without this check, this method will make duplicate calls to the
+      // DagProcessor.
+      return;
+    }
+    changeStatus(Status.KILLING);
+    for (final Node node : this.nodes) {
+      node.kill();
+    }
+    updateDagStatus();
+  }
+
+  /**
+   * Update the final dag status when all nodes are done.
+   *
+   * <p>If any node has not reached its terminal state, this method will simply return.
+   */
+  void updateDagStatus() {
+    // A dag may have nodes that are disabled. It's safer to scan all the nodes.
+    // Assume the overhead is minimal. If it is not the case, we can optimize later.
+    boolean failed = false;
+    for (final Node node : this.nodes) {
+      final Status nodeStatus = node.getStatus();
+      if (!nodeStatus.isTerminal()) {
+        return;
+      }
+      if (nodeStatus == Status.FAILURE) {
+        failed = true;
+      }
+    }
+
+    // Update the dag status only after all nodes have reached terminal states.
+    updateDagStatusInternal(failed);
+  }
+
+  /**
+   * Update the final dag status.
+   *
+   * @param failed true if any of the jobs has failed
+   */
+  private void updateDagStatusInternal(final boolean failed) {
+    if (this.status == Status.KILLING) {
+      /*
+      It's possible that some nodes have failed when the dag is killed.
+      Since killing a dag signals an intent from an operator, it is more important to make
+      the dag status reflect the result of that explict intent. e.g. if the killing is a
+      result of handing a job failure, users more likely want to know that someone has taken
+      an action rather than that a job has failed. Operators can still see the individual job
+      status.
+      */
+      changeStatus(Status.KILLED);
+    } else if (failed) {
+      changeStatus(Status.FAILURE);
+    } else {
+      changeStatus(Status.SUCCESS);
+    }
+  }
+
+  private void changeStatus(final Status status) {
+    this.status = status;
+    this.dagProcessor.changeStatus(this, status);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("dag (%s), status (%s)", this.name, this.status);
+  }
+
+  String getName() {
+    return this.name;
+  }
+
+  Status getStatus() {
+    return this.status;
+  }
+
+  @VisibleForTesting
+  void setStatus(final Status status) {
+    this.status = status;
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagProcessor.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagProcessor.java
new file mode 100644
index 0000000..1951344
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+public interface DagProcessor {
+
+  /**
+   * Changes the status of the dag.
+   *
+   * @param dag the dag to change
+   * @param status the new status
+   */
+  void changeStatus(Dag dag, Status status);
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
new file mode 100644
index 0000000..be2fda1
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread safe and non blocking service for DAG processing.
+ *
+ * <p>Allow external inputs to be given to a dag or node to allow the dag to transition states
+ * . Since only one thread is used to progress the DAG, thread synchronization is avoided.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+@Singleton
+class DagService {
+
+  private static final long SHUTDOWN_WAIT_TIMEOUT = 60;
+  private static final Logger logger = LoggerFactory.getLogger(DagService.class);
+
+  private final ExecutorService executorService;
+
+  DagService() {
+    // Give the thread a name to make debugging easier.
+    final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Dag-service").build();
+    this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+  }
+
+  void startDag(final Dag dag) {
+    this.executorService.submit(dag::start);
+  }
+
+  /**
+   * Transitions the node to the success state.
+   */
+  void markNodeSuccess(final Node node) {
+    this.executorService.submit(node::markSuccess);
+  }
+
+  /**
+   * Transitions the node from the killing state to the killed state.
+   */
+  void markNodeKilled(final Node node) {
+    this.executorService.submit(node::markKilled);
+  }
+
+  /**
+   * Transitions the node to the failure state.
+   */
+  void markNodeFailed(final Node node) {
+    this.executorService.submit(node::markFailed);
+  }
+
+  /**
+   * Kills a DAG.
+   */
+  void killDag(final Dag dag) {
+    this.executorService.submit(dag::kill);
+  }
+
+  /**
+   * Shuts down the service and waits for the tasks to finish.
+   *
+   * <p>Adopted from
+   * <a href="https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html">
+   *   the Oracle JAVA Documentation.
+   * </a>
+   */
+  void shutdownAndAwaitTermination() {
+    this.executorService.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
+        this.executorService.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
+          logger.error("The DagService did not terminate.");
+        }
+      }
+    } catch (final InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      this.executorService.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
new file mode 100644
index 0000000..b1a9f26
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Node in a DAG: Directed acyclic graph.
+ */
+class Node {
+
+  private final String name;
+
+  private final NodeProcessor nodeProcessor;
+
+  // The nodes that this node depends on.
+  private final List<Node> parents = new ArrayList<>();
+
+  // The nodes that depend on this node.
+  private final List<Node> children = new ArrayList<>();
+
+  private Status status = Status.READY;
+
+  private Dag dag;
+
+  Node(final String name, final NodeProcessor nodeProcessor) {
+    this.name = name;
+    requireNonNull(nodeProcessor, "The nodeProcessor parameter can't be null.");
+    this.nodeProcessor = nodeProcessor;
+  }
+
+  public Dag getDag() {
+    return this.dag;
+  }
+
+  public void setDag(final Dag dag) {
+    this.dag = dag;
+  }
+
+  private void addParent(final Node node) {
+    this.parents.add(node);
+  }
+
+  void addChild(final Node node) {
+    this.children.add(node);
+    node.addParent(this);
+  }
+
+  void addChildren(final Node... nodes) {
+    for (final Node node : nodes) {
+      addChild(node);
+    }
+  }
+
+  boolean hasParent() {
+    return !this.parents.isEmpty();
+  }
+
+  /**
+   * Checks if the node is ready to run.
+   *
+   * @return true if the node is ready to run
+   */
+  private boolean isReady() {
+    if (this.status != Status.READY) {
+      // e.g. if the node is disabled, it is not ready to run.
+      return false;
+    }
+    for (final Node parent : this.parents) {
+      if (!parent.status.isSuccessEffectively()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Transitions the node to the success state.
+   */
+  void markSuccess() {
+    // It's possible that the dag is killed before this method is called.
+    assertRunningOrKilling();
+    changeStatus(Status.SUCCESS);
+    for (final Node child : this.children) {
+      child.runIfAllowed();
+    }
+    this.dag.updateDagStatus();
+  }
+
+  /**
+   * Checks if all the dependencies are met and run if they are.
+   */
+  void runIfAllowed() {
+    if (isReady()) {
+      changeStatus(Status.RUNNING);
+    }
+  }
+
+  /**
+   * Transitions the node to the failure state.
+   */
+  void markFailed() {
+    // It's possible that the dag is killed before this method is called.
+    assertRunningOrKilling();
+    changeStatus(Status.FAILURE);
+    for (final Node child : this.children) {
+      child.cancel();
+    }
+    //todo: HappyRay support failure options "Finish Current Running" and "Cancel All"
+    this.dag.updateDagStatus();
+  }
+
+  private void cancel() {
+    // The node shouldn't have started.
+    assert (this.status.isPreRunState());
+    if (this.status != Status.DISABLED) {
+      changeStatus(Status.CANCELED);
+    }
+    for (final Node node : this.children) {
+      node.cancel();
+    }
+  }
+
+  /**
+   * Asserts that the state is running or killing.
+   */
+  private void assertRunningOrKilling() {
+    assert (this.status == Status.RUNNING || this.status == Status.KILLING);
+  }
+
+  private void changeStatus(final Status status) {
+    this.status = status;
+    this.nodeProcessor.changeStatus(this, this.status);
+  }
+
+  /**
+   * Kills a node.
+   *
+   * <p>A node is not designed to be killed individually. This method expects {@link Dag#kill()}
+   * method to kill all nodes. Thus this method itself doesn't need to propagate the kill signal to
+   * the node's children nodes.
+   */
+  void kill() {
+    assert (this.dag.getStatus() == Status.KILLING);
+    if (this.status == Status.READY || this.status == Status.BLOCKED) {
+      // If the node is disabled, keep the status as disabled.
+      changeStatus(Status.CANCELED);
+    } else if (this.status == Status.RUNNING) {
+      changeStatus(Status.KILLING);
+    }
+    // If the node has finished, leave the status intact.
+  }
+
+  /**
+   * Transition the node from the killing state to the killed state.
+   */
+  void markKilled() {
+    assert (this.status == Status.KILLING);
+    changeStatus(Status.KILLED);
+    this.dag.updateDagStatus();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Node (%s) status (%s) in %s", this.name, this.status, this.dag);
+  }
+
+  Status getStatus() {
+    return this.status;
+  }
+
+  @VisibleForTesting
+  void setStatus(final Status status) {
+    this.status = status;
+  }
+
+  String getName() {
+    return this.name;
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/NodeProcessor.java b/azkaban-exec-server/src/main/java/azkaban/dag/NodeProcessor.java
new file mode 100644
index 0000000..423b0f7
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/NodeProcessor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+public interface NodeProcessor {
+
+  /**
+   * Changes the status of the node.
+   *
+   * <p>Typically a processor implementation should handle the RUNNING and KILLING status by
+   * starting or killing a unit of work and call the {@link DagService} to transition the node
+   * to the next status.
+   *
+   * <p>The call will be made in the context of the DagService's one and only thread. Thus a
+   * processor should limit the time it takes to process the call. For lengthy operations such as
+   * I/O operations, consider offloading them to other threads.
+   *
+   * @param node the node to change
+   * @param status the new status
+   */
+  void changeStatus(Node node, Status status);
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
new file mode 100644
index 0000000..c3f3a62
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import com.google.common.collect.ImmutableSet;
+
+enum Status {
+  READY, // ready to run
+  DISABLED, // disabled by users. Treat as the node has the status of success
+  BLOCKED, // temporarily blocked. Need to be unblocked by another external event
+  RUNNING,
+  SUCCESS,
+  FAILURE,
+
+  // doesn't run because one of the nodes it depends on fails or is killed. Applies to a node only.
+  CANCELED,
+  KILLING, // in the process of killing a running job
+  KILLED; // explicitly killed by a user
+
+  // The states that will not transition to other states
+  private static final ImmutableSet TERMINAL_STATES = ImmutableSet.of(DISABLED, SUCCESS, FAILURE,
+      CANCELED, KILLED);
+
+  boolean isTerminal() {
+    return TERMINAL_STATES.contains(this);
+  }
+
+  // The states that are considered as success effectively
+  private static final ImmutableSet EFFECTIVE_SUCCESS_STATES = ImmutableSet.of(DISABLED, SUCCESS);
+
+  boolean isSuccessEffectively() {
+    return EFFECTIVE_SUCCESS_STATES.contains(this);
+  }
+
+  // The states that are possible before a node ever starts to run or be killed or canceled
+  private static final ImmutableSet PRE_RUN_STATES = ImmutableSet.of(DISABLED, READY, BLOCKED);
+
+  boolean isPreRunState() {
+    return PRE_RUN_STATES.contains(this);
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
new file mode 100644
index 0000000..ddcb068
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -0,0 +1,315 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javafx.util.Pair;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests {@link DagService}
+ *
+ * <p>Naming conventions: nodes are named with letters such as a, b. Dags are named with 'f'
+ * prefix. e.g. "fa". A sub DAG has the prefix "sf" such as "sfb". A sub DAG is a node within the
+ * parent DAG.
+ */
+public class DagServiceTest {
+
+  private final DagService dagService = new DagService();
+  private final StatusChangeRecorder statusChangeRecorder = new StatusChangeRecorder();
+  private final Set<Node> nodesToFail = new HashSet<>();
+  private final TestNodeProcessor nodeProcessor = new TestNodeProcessor(this.dagService,
+      this.statusChangeRecorder, this.nodesToFail);
+  private final CountDownLatch dagFinishedLatch = new CountDownLatch(1);
+  private final DagProcessor dagProcessor = new TestDagProcessor(this.dagFinishedLatch,
+      this.statusChangeRecorder);
+  private final Dag testDag = createDag();
+  private final List<Pair<String, Status>> expectedSequence = new ArrayList<>();
+
+
+  @After
+  public void tearDown() {
+    this.dagService.shutdownAndAwaitTermination();
+  }
+
+  /**
+   * Tests a DAG with one node which will run successfully.
+   */
+  @Test
+  public void oneNodeSuccess() throws Exception {
+    createNodeAndAddToTestDag("a");
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("a", Status.SUCCESS);
+    addToExpectedSequence("fa", Status.SUCCESS);
+
+    runAndVerify();
+  }
+
+  /**
+   * Tests a DAG with two nodes which will run successfully.
+   * a
+   * |
+   * b
+   */
+  @Test
+  public void twoNodesSuccess() throws Exception {
+    final Node aNode = createNodeAndAddToTestDag("a");
+    final Node bNode = createNodeAndAddToTestDag("b");
+    aNode.addChild(bNode);
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("a", Status.SUCCESS);
+    addToExpectedSequence("b", Status.RUNNING);
+    addToExpectedSequence("b", Status.SUCCESS);
+    addToExpectedSequence("fa", Status.SUCCESS);
+
+    runAndVerify();
+  }
+
+  /**
+   * Tests a DAG with three nodes which will run successfully.
+   * <pre>
+   *    a
+   *  /  \
+   * b    c
+   * </pre>
+   */
+  @Test
+  public void threeNodesSuccess() throws Exception {
+    final Node aNode = createNodeAndAddToTestDag("a");
+    final Node bNode = createNodeAndAddToTestDag("b");
+    final Node cNode = createNodeAndAddToTestDag("c");
+    aNode.addChildren(bNode, cNode);
+
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("a", Status.SUCCESS);
+    addToExpectedSequence("b", Status.RUNNING);
+    addToExpectedSequence("c", Status.RUNNING);
+    addToExpectedSequence("b", Status.SUCCESS);
+    addToExpectedSequence("c", Status.SUCCESS);
+    addToExpectedSequence("fa", Status.SUCCESS);
+
+    runAndVerify();
+
+  }
+
+  /**
+   * Tests a DAG with one node which will fail.
+   */
+  @Test
+  public void oneNodeFailure() throws Exception {
+    final Node aNode = createNodeAndAddToTestDag("a");
+    this.nodesToFail.add(aNode);
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("a", Status.FAILURE);
+    addToExpectedSequence("fa", Status.FAILURE);
+
+    runAndVerify();
+  }
+
+  /**
+   * Tests a DAG with two nodes, fails the first one.
+   *
+   * Expects the child node to be marked canceled.
+   *
+   * a (fail)
+   * |
+   * b
+   */
+  @Test
+  public void twoNodesFailFirst() throws Exception {
+    final Node aNode = createNodeAndAddToTestDag("a");
+    final Node bNode = createNodeAndAddToTestDag("b");
+    aNode.addChild(bNode);
+    this.nodesToFail.add(aNode);
+
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("a", Status.FAILURE);
+    addToExpectedSequence("b", Status.CANCELED);
+    addToExpectedSequence("fa", Status.FAILURE);
+
+    runAndVerify();
+  }
+
+  /**
+   * Tests a DAG with three nodes with one failure.
+   *
+   * Expects the sibling nodes to finish.
+   *
+   * <pre>
+   *       a
+   *   /      \
+   * b (fail)    c
+   * </pre>
+   */
+  @Test
+  public void threeNodesFailSecond() throws Exception {
+    final Node aNode = createNodeAndAddToTestDag("a");
+    final Node bNode = createNodeAndAddToTestDag("b");
+    final Node cNode = createNodeAndAddToTestDag("c");
+    aNode.addChildren(bNode, cNode);
+
+    this.nodesToFail.add(bNode);
+
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("a", Status.SUCCESS);
+    addToExpectedSequence("b", Status.RUNNING);
+    addToExpectedSequence("c", Status.RUNNING);
+    addToExpectedSequence("b", Status.FAILURE);
+    addToExpectedSequence("c", Status.SUCCESS);
+    addToExpectedSequence("fa", Status.FAILURE);
+
+    runAndVerify();
+
+  }
+
+  /**
+   * Tests a DAG with one subDag, all successful.
+   *
+   * <pre>
+   *   sfb
+   *   |
+   *   c
+   *
+   * subDag: fb
+   * a b
+   * </pre>
+   */
+  @Test
+  public void simple_subdag_success_case() throws Exception {
+    final TestSubDagProcessor testSubDagProcessor = new TestSubDagProcessor
+        (this.dagService, this.statusChangeRecorder);
+    final Dag bDag = new Dag("fb", testSubDagProcessor);
+    createNodeAndAddToDag("a", bDag);
+    createNodeAndAddToDag("b", bDag);
+
+    final TestSubDagNodeProcessor testSubDagNodeProcessor = new TestSubDagNodeProcessor
+        (this.dagService, this.statusChangeRecorder, bDag);
+    final Node subDagNode = new Node("sfb", testSubDagNodeProcessor);
+    testSubDagProcessor.setNode(subDagNode);
+    this.testDag.addNode(subDagNode);
+
+    final Node cNode = createNodeAndAddToTestDag("c");
+    subDagNode.addChild(cNode);
+
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("sfb", Status.RUNNING);
+    addToExpectedSequence("fb", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("b", Status.RUNNING);
+    addToExpectedSequence("a", Status.SUCCESS);
+    addToExpectedSequence("b", Status.SUCCESS);
+    addToExpectedSequence("fb", Status.SUCCESS);
+    addToExpectedSequence("sfb", Status.SUCCESS);
+    addToExpectedSequence("c", Status.RUNNING);
+    addToExpectedSequence("c", Status.SUCCESS);
+    addToExpectedSequence("fa", Status.SUCCESS);
+
+    runAndVerify();
+
+  }
+
+  /**
+   * Tests killing a dag.
+   */
+  @Test
+  public void kill_a_node() throws Exception {
+    final CountDownLatch nodeRunningLatch = new CountDownLatch(1);
+    final TestKillNodeProcessor killNodeProcessor = new TestKillNodeProcessor(this.dagService,
+        this.statusChangeRecorder, nodeRunningLatch);
+    final Node aNode = new Node("a", killNodeProcessor);
+    this.testDag.addNode(aNode);
+
+    addToExpectedSequence("fa", Status.RUNNING);
+    addToExpectedSequence("a", Status.RUNNING);
+    addToExpectedSequence("fa", Status.KILLING);
+    addToExpectedSequence("a", Status.KILLING);
+    addToExpectedSequence("a", Status.KILLED);
+    addToExpectedSequence("fa", Status.KILLED);
+
+    this.dagService.startDag(this.testDag);
+
+    // Make sure the node is running before killing the DAG.
+    nodeRunningLatch.await(120, TimeUnit.SECONDS);
+    this.dagService.killDag(this.testDag);
+
+    final boolean isWaitSuccessful = this.dagFinishedLatch.await(120, TimeUnit.SECONDS);
+    // Make sure the dag finishes.
+    assertThat(isWaitSuccessful).isTrue();
+    verifyStatusSequence();
+  }
+
+
+  private void addToExpectedSequence(final String name, final Status status) {
+    this.expectedSequence.add(new Pair<>(name, status));
+  }
+
+  private void runDag() throws InterruptedException {
+    this.dagService.startDag(this.testDag);
+    final boolean isWaitSuccessful = this.dagFinishedLatch.await(2, TimeUnit.SECONDS);
+
+    // Make sure the dag finishes.
+    assertThat(isWaitSuccessful).isTrue();
+  }
+
+  private void verifyStatusSequence() {
+    this.statusChangeRecorder.verifySequence(this.expectedSequence);
+  }
+
+  private void runAndVerify() throws InterruptedException {
+    runDag();
+    verifyStatusSequence();
+  }
+
+  /**
+   * Creates a node and add to the test dag.
+   *
+   * @param name node name
+   * @return Node object
+   */
+  private Node createNode(final String name) {
+    return new Node(name, this.nodeProcessor);
+  }
+
+  private Node createNodeAndAddToDag(final String name, final Dag flow) {
+    final Node node = createNode(name);
+    flow.addNode(node);
+    return node;
+  }
+
+  private Node createNodeAndAddToTestDag(final String name) {
+    return createNodeAndAddToDag(name, this.testDag);
+  }
+
+  private Dag createDag() {
+    return new Dag("fa", this.dagProcessor);
+  }
+}
+
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
new file mode 100644
index 0000000..522beb2
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Test;
+
+/**
+ * Tests the dag state ( including its nodes' states) transitions.
+ *
+ * Focuses on how the dag state changes in response to one external request.
+ */
+public class DagTest {
+
+  private final Dag testFlow = new Dag("fa", mock(DagProcessor.class));
+
+  @Test
+  public void dag_finish_with_only_disabled_nodes() {
+    final Node aNode = createAndAddNode("a");
+    aNode.setStatus(Status.DISABLED);
+    this.testFlow.start();
+    assertThat(aNode.getStatus()).isEqualTo(Status.DISABLED);
+    assertThat(this.testFlow.getStatus()).isEqualTo(Status.SUCCESS);
+  }
+
+  @Test
+  public void running_nodes_can_be_killed() {
+    final Node aNode = createAndAddNode("a");
+    aNode.setStatus(Status.RUNNING);
+    this.testFlow.setStatus(Status.RUNNING);
+    this.testFlow.kill();
+    assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+    assertThat(this.testFlow.getStatus()).isEqualTo(Status.KILLING);
+  }
+
+  /**
+   * Tests ready nodes are canceled when the dag is killed.
+   */
+  @Test
+  public void waiting_nodes_are_canceled_when_killed() {
+    final Node aNode = createAndAddNode("a");
+    aNode.setStatus(Status.RUNNING);
+    final Node bNode = createAndAddNode("b");
+    aNode.addChild(bNode);
+    this.testFlow.setStatus(Status.RUNNING);
+    this.testFlow.kill();
+    assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+    assertThat(bNode.getStatus()).isEqualTo(Status.CANCELED);
+    assertThat(this.testFlow.getStatus()).isEqualTo(Status.KILLING);
+  }
+
+  /**
+   * Tests multiple ready nodes are canceled when the dag is killed.
+   * <pre>
+   *     a (running)
+   *    / \
+   *   b   c
+   *        \
+   *         d
+   * </pre>
+   */
+  @Test
+  public void multiple_waiting_nodes_are_canceled_when_killed() {
+    final Node aNode = createAndAddNode("a");
+    aNode.setStatus(Status.RUNNING);
+    final Node bNode = createAndAddNode("b");
+    aNode.addChild(bNode);
+    final Node cNode = createAndAddNode("c");
+    aNode.addChild(cNode);
+    final Node dNode = createAndAddNode("d");
+    cNode.addChild(dNode);
+
+    this.testFlow.setStatus(Status.RUNNING);
+    this.testFlow.kill();
+    assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+    assertThat(bNode.getStatus()).isEqualTo(Status.CANCELED);
+    assertThat(dNode.getStatus()).isEqualTo(Status.CANCELED);
+    assertThat(dNode.getStatus()).isEqualTo(Status.CANCELED);
+    assertThat(this.testFlow.getStatus()).isEqualTo(Status.KILLING);
+  }
+
+  /**
+   * Tests blocked nodes are canceled when the dag is killed.
+   */
+  @Test
+  public void blocked_nodes_are_canceled_when_killed() {
+    final Node aNode = createAndAddNode("a");
+    aNode.setStatus(Status.RUNNING);
+    final Node bNode = createAndAddNode("b");
+    aNode.addChild(bNode);
+    bNode.setStatus(Status.BLOCKED);
+    this.testFlow.setStatus(Status.RUNNING);
+    this.testFlow.kill();
+    assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+    assertThat(bNode.getStatus()).isEqualTo(Status.CANCELED);
+  }
+
+  /**
+   * Tests success nodes' states remain the same when the dag is killed.
+   * <pre>
+   *     a (success)
+   *    /
+   *   b (running)
+   * </pre>
+   */
+  @Test
+  public void success_node_state_remain_the_same_when_killed() {
+    final Node aNode = createAndAddNode("a");
+    aNode.setStatus(Status.SUCCESS);
+    final Node bNode = createAndAddNode("b");
+    bNode.setStatus(Status.RUNNING);
+    aNode.addChild(bNode);
+    this.testFlow.kill();
+    assertThat(aNode.getStatus()).isEqualTo(Status.SUCCESS);
+    assertThat(bNode.getStatus()).isEqualTo(Status.KILLING);
+  }
+
+  /**
+   * Tests failed nodes' states remain the same when the dag is killed.
+   * This can happen when running jobs are allowed to finish when a node fails.
+   *
+   * <pre>
+   *  a (running)   b (failure)
+   * </pre>
+   */
+  @Test
+  public void failed_node_state_remain_the_same_when_killed() {
+    final Node aNode = createAndAddNode("a");
+    aNode.setStatus(Status.RUNNING);
+    final Node bNode = createAndAddNode("b");
+    bNode.setStatus(Status.FAILURE);
+    this.testFlow.kill();
+    assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+    assertThat(bNode.getStatus()).isEqualTo(Status.FAILURE);
+  }
+
+  /**
+   * Creates a node and add to the test dag.
+   *
+   * @param name node name
+   * @return Node object
+   */
+  private Node createAndAddNode(final String name) {
+    final Node node = TestUtil.createNodeWithNullProcessor(name);
+    this.testFlow.addNode(node);
+    return node;
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
new file mode 100644
index 0000000..d871624
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class NodeTest {
+
+  @Test
+  public void hasParent() {
+    final Node node = TestUtil.createNodeWithNullProcessor("a");
+    final Node parentNode = TestUtil.createNodeWithNullProcessor("parent");
+    parentNode.addChild(node);
+    final boolean hasParent = node.hasParent();
+    assertThat(hasParent).isTrue();
+  }
+
+  @Test
+  public void hasParentNegative() {
+    final Node node = TestUtil.createNodeWithNullProcessor("a");
+    final boolean hasParent = node.hasParent();
+    assertThat(hasParent).isFalse();
+  }
+
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/StatusChangeRecorder.java b/azkaban-exec-server/src/test/java/azkaban/dag/StatusChangeRecorder.java
new file mode 100644
index 0000000..9890942
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/StatusChangeRecorder.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import javafx.util.Pair;
+
+/**
+ * Records the sequence of nodes and dag status change.
+ */
+class StatusChangeRecorder {
+
+  private final List<Pair<String, Status>> sequence = new ArrayList<>();
+
+  void recordNode(final Node node) {
+    this.sequence.add(new Pair<>(node.getName(), node.getStatus()));
+  }
+
+  void recordDag(final Dag dag) {
+    this.sequence.add(new Pair<>(dag.getName(), dag.getStatus()));
+  }
+
+  void verifySequence(final List<Pair<String, Status>> expectedSequence) {
+    assertThat(this.sequence).isEqualTo(expectedSequence);
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/StatusTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/StatusTest.java
new file mode 100644
index 0000000..68e76fa
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/StatusTest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class StatusTest {
+
+  //todo HappyRay: add tests for other status
+  @Test
+  public void disabled_is_terminal() {
+    assertThat(Status.DISABLED.isTerminal()).isTrue();
+  }
+
+  @Test
+  public void running_is_not_terminal() {
+    assertThat(Status.RUNNING.isTerminal()).isFalse();
+  }
+
+  @Test
+  public void diabled_is_effectively_success() {
+    assertThat(Status.DISABLED.isSuccessEffectively()).isTrue();
+  }
+
+  @Test
+  public void failure_is_not_effectively_success() {
+    assertThat(Status.FAILURE.isSuccessEffectively()).isFalse();
+  }
+
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestDagProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestDagProcessor.java
new file mode 100644
index 0000000..79af3fe
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestDagProcessor.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import java.util.concurrent.CountDownLatch;
+
+public class TestDagProcessor implements DagProcessor {
+
+  private final StatusChangeRecorder statusChangeRecorder;
+  private final CountDownLatch dagFinishedLatch;
+
+
+  TestDagProcessor(final CountDownLatch dagFinishedLatch,
+      final StatusChangeRecorder statusChangeRecorder) {
+    this.dagFinishedLatch = dagFinishedLatch;
+    this.statusChangeRecorder = statusChangeRecorder;
+  }
+
+  @Override
+  public void changeStatus(final Dag dag, final Status status) {
+    System.out.println(dag);
+    this.statusChangeRecorder.recordDag(dag);
+    if (status.isTerminal()) {
+      this.dagFinishedLatch.countDown();
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestKillNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestKillNodeProcessor.java
new file mode 100644
index 0000000..cbcd0f2
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestKillNodeProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import java.util.concurrent.CountDownLatch;
+
+public class TestKillNodeProcessor implements NodeProcessor {
+
+  private final DagService dagService;
+  private final StatusChangeRecorder statusChangeRecorder;
+  private final CountDownLatch nodeRunningLatch;
+
+  /**
+   * A node processor that tests killing a node.
+   *
+   * @param nodeRunningLatch signal that the node has started running
+   */
+  TestKillNodeProcessor(final DagService dagService,
+      final StatusChangeRecorder statusChangeRecorder, final CountDownLatch nodeRunningLatch) {
+    this.dagService = dagService;
+    this.statusChangeRecorder = statusChangeRecorder;
+    this.nodeRunningLatch = nodeRunningLatch;
+  }
+
+  @Override
+  public void changeStatus(final Node node, final Status status) {
+    System.out.println(node);
+    this.statusChangeRecorder.recordNode(node);
+    switch (node.getStatus()) {
+      case RUNNING:
+        // Don't mark the job finished. Simulate a long running job.
+        this.nodeRunningLatch.countDown();
+        break;
+      case KILLING:
+        this.dagService.markNodeKilled(node);
+        break;
+      default:
+        break;
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
new file mode 100644
index 0000000..f0b1751
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import java.util.Set;
+
+public class TestNodeProcessor implements NodeProcessor {
+
+  private final DagService dagService;
+  private final StatusChangeRecorder statusChangeRecorder;
+  private final Set<Node> nodesToFail;
+
+  TestNodeProcessor(final DagService dagService,
+      final StatusChangeRecorder statusChangeRecorder, final Set<Node> nodesToFail) {
+    this.dagService = dagService;
+    this.statusChangeRecorder = statusChangeRecorder;
+    this.nodesToFail = nodesToFail;
+  }
+
+  @Override
+  public void changeStatus(final Node node, final Status status) {
+    System.out.println(node);
+    this.statusChangeRecorder.recordNode(node);
+
+    switch (status) {
+      case RUNNING:
+        if (this.nodesToFail.contains(node)) {
+          this.dagService.markNodeFailed(node);
+        } else {
+          this.dagService.markNodeSuccess(node);
+        }
+        break;
+      default:
+        break;
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagNodeProcessor.java
new file mode 100644
index 0000000..288cec0
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagNodeProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+/**
+ * A NodeProcessor that bridges the sub DAG and the parent DAG.
+ */
+public class TestSubDagNodeProcessor implements NodeProcessor {
+
+  private final DagService dagService;
+  private final StatusChangeRecorder statusChangeRecorder;
+  private final Dag dag;
+
+
+  TestSubDagNodeProcessor(final DagService dagService,
+      final StatusChangeRecorder statusChangeRecorder,
+      final Dag dag
+  ) {
+    this.dagService = dagService;
+    this.statusChangeRecorder = statusChangeRecorder;
+    this.dag = dag;
+  }
+
+
+  /**
+   * Triggers the sub DAG state change when the sub DAG node in the parent DAG's status changes.
+   *
+   * @param node the node to change
+   * @param status the new status
+   */
+  @Override
+  public void changeStatus(final Node node, final Status status) {
+    System.out.println(node);
+    this.statusChangeRecorder.recordNode(node);
+
+    switch (status) {
+      case RUNNING:
+        this.dagService.startDag(this.dag);
+        break;
+      case KILLING:
+        this.dagService.killDag(this.dag);
+        break;
+      default:
+        break;
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagProcessor.java
new file mode 100644
index 0000000..0b7a018
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A DAG processor that brides the sub DAG and the parent DAG.
+ */
+public class TestSubDagProcessor implements DagProcessor {
+
+  private final DagService dagService;
+  private final StatusChangeRecorder statusChangeRecorder;
+  private Node node;
+
+
+  TestSubDagProcessor(final DagService dagService,
+      final StatusChangeRecorder statusChangeRecorder
+  ) {
+    this.dagService = dagService;
+    this.statusChangeRecorder = statusChangeRecorder;
+  }
+
+
+  /**
+   * Transfers the node state in the parent DAG when the sub DAG status changes.
+   *
+   * @param dag the dag to change
+   * @param status the new status
+   */
+  @Override
+  public void changeStatus(final Dag dag, final Status status) {
+    System.out.println(dag);
+    this.statusChangeRecorder.recordDag(dag);
+    requireNonNull(this.node, "Node for the subDag in the parent DAG can't be null.");
+    switch (status) {
+      case SUCCESS:
+        this.dagService.markNodeSuccess(this.node);
+        break;
+      case FAILURE:
+        this.dagService.markNodeFailed(this.node);
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Sets the node that this subflow belongs.
+   * <p>
+   * Can't pass this information in the constructor since it will cause a circular dependency
+   * problem.
+   *
+   * @param node the node as part of the parent flow
+   */
+  public void setNode(final Node node) {
+    this.node = node;
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
new file mode 100644
index 0000000..29642c6
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.mockito.Mockito.mock;
+
+class TestUtil {
+
+  /**
+   * Creates a node with a processor that does nothing.
+   *
+   * @param name node name
+   */
+  static Node createNodeWithNullProcessor(final String name) {
+    return new Node(name, mock(NodeProcessor.class));
+  }
+}