azkaban-aplcache

Add a test to run a v2 flow file with the new DAG engine (#1840) *

7/9/2018 7:14:20 PM
3.49.0

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
index de4d7aa..b73ab6a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
@@ -28,7 +28,7 @@ import java.util.List;
  * <p>Most of the methods in this class should remain package private. Code outside of this
  * package should mainly interact with the {@link DagService}.
  */
-class Dag {
+public class Dag {
 
   private final String name;
   private final DagProcessor dagProcessor;
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
index ccef79c..8f2f643 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
@@ -35,40 +35,84 @@ import java.util.Set;
  */
 public class DagBuilder {
 
-  private final String name;
-  private final DagProcessor dagProcessor;
+  private final Dag dag;
+  private final Map<String, Node> nameToNodeMap = new HashMap<>();
 
-  private final List<NodeBuilder> builders = new ArrayList<>();
-  private final Set<String> nodeNamesSet = new HashSet<>();
+  // The builder can only be used to build a DAG once to prevent modifying an existing DAG after it
+  // is built.
+  private boolean isBuilt = false;
 
+  /**
+   * A builder for building a DAG.
+   *
+   * @param name name of the DAG
+   * @param dagProcessor the associated DagProcessor
+   */
   public DagBuilder(final String name, final DagProcessor dagProcessor) {
     requireNonNull(name, "The name of the DagBuilder can't be null");
-    this.name = name;
-    requireNonNull(name, "The dagProcessor of the DagBuilder can't be null");
-    this.dagProcessor = dagProcessor;
+    requireNonNull(dagProcessor, "The dagProcessor of the DagBuilder can't be null");
+    this.dag = new Dag(name, dagProcessor);
   }
 
   /**
-   * Creates a new node builder and add it to the DagBuilder.
+   * Creates a new node and adds it to the DagBuilder.
    *
    * @param name name of the node
    * @param nodeProcessor node processor associated with this node
-   * @return a new NodeBuilder associated with this node
+   * @return a new node
    * @throws DagException if the name is not unique in the DAG.
    */
-  public NodeBuilder createNode(final String name, final NodeProcessor nodeProcessor) {
-    if (this.nodeNamesSet.contains(name)) {
+  public Node createNode(final String name, final NodeProcessor nodeProcessor) {
+    checkIsBuilt();
+
+    if (this.nameToNodeMap.get(name) != null) {
       throw new DagException(String.format("Node names in %s need to be unique. The name "
           + "(%s) already exists.", this, name));
     }
-    final NodeBuilder builder = new NodeBuilder(name, nodeProcessor, this);
-    this.builders.add(builder);
-    this.nodeNamesSet.add(name);
+    final Node node = new Node(name, nodeProcessor, this.dag);
+    this.nameToNodeMap.put(name, node);
 
-    return builder;
+    return node;
+  }
+
+  /**
+   * Throws an exception if the {@link DagBuilder#build()} method has been called.
+   */
+  private void checkIsBuilt() {
+    if (this.isBuilt) {
+      final String msg = String
+          .format("The DAG (%s) is built already. Can't create new nodes.", this);
+      throw new DagException(msg);
+    }
   }
 
   /**
+   * Add a parent node to a child node. All the names should have been registered with this builder
+   * with the {@link DagBuilder#createNode(String, NodeProcessor)} call.
+   *
+   * @param childNodeName name of the child node
+   * @param parentNodeName name of the parent node
+   */
+  public void addParentNode(final String childNodeName, final String parentNodeName) {
+    checkIsBuilt();
+
+    final Node child = this.nameToNodeMap.get(childNodeName);
+    if (child == null) {
+      throw new DagException(String.format("Unknown child node (%s). Did you create the node?",
+          childNodeName));
+    }
+
+    final Node parent = this.nameToNodeMap.get(parentNodeName);
+    if (parent == null) {
+      throw new DagException(
+          String.format("Unknown parent node (%s). Did you create the node?", parentNodeName));
+    }
+
+    child.addParent(parent);
+  }
+
+
+  /**
    * Builds the dag.
    *
    * <p>Once this method is called, subsequent calls via NodeBuilder to modify the nodes's
@@ -78,11 +122,10 @@ public class DagBuilder {
    * @return the Dag reflecting the current state of the DagBuilder
    */
   public Dag build() {
+    checkIsBuilt();
     checkCircularDependencies();
-    final Dag dag = new Dag(this.name, this.dagProcessor);
-    final Map<NodeBuilder, Node> builderNodeMap = createBuilderToNodeMap(dag);
-    updateNodesRelationships(builderNodeMap);
-    return dag;
+    this.isBuilt = true;
+    return this.dag;
   }
 
   /**
@@ -98,16 +141,16 @@ public class DagBuilder {
     class CircularDependencyChecker {
 
       // The nodes that need to be visited
-      private final Set<NodeBuilder> toVisit = new HashSet<>(DagBuilder.this.builders);
+      private final Set<Node> toVisit = new HashSet<>(DagBuilder.this.nameToNodeMap.values());
 
       // The nodes that have finished traversing all their parent nodes
-      private final Set<NodeBuilder> finished = new HashSet<>();
+      private final Set<Node> finished = new HashSet<>();
 
       // The nodes that are waiting for their parent nodes to finish visit.
-      private final Set<NodeBuilder> ongoing = new HashSet<>();
+      private final Set<Node> ongoing = new HashSet<>();
 
       // One sample of nodes that form a circular dependency
-      private final List<NodeBuilder> sampleCircularNodes = new ArrayList<>();
+      private final List<Node> sampleCircularNodes = new ArrayList<>();
 
       /**
        * Checks if the builder contains nodes that form a circular dependency ring.
@@ -116,7 +159,7 @@ public class DagBuilder {
        */
       private void check() {
         while (!this.toVisit.isEmpty()) {
-          final NodeBuilder node = removeOneNodeFromToVisitSet();
+          final Node node = removeOneNodeFromToVisitSet();
           if (checkNode(node)) {
             final String msg = String.format("Circular dependency detected. Sample: %s",
                 this.sampleCircularNodes);
@@ -130,9 +173,9 @@ public class DagBuilder {
        *
        * @return a node
        */
-      private NodeBuilder removeOneNodeFromToVisitSet() {
-        final Iterator<NodeBuilder> iterator = this.toVisit.iterator();
-        final NodeBuilder node = iterator.next();
+      private Node removeOneNodeFromToVisitSet() {
+        final Iterator<Node> iterator = this.toVisit.iterator();
+        final Node node = iterator.next();
         iterator.remove();
         return node;
       }
@@ -145,7 +188,7 @@ public class DagBuilder {
        * @param node node to check
        * @return true if it is
        */
-      private boolean checkNode(final NodeBuilder node) {
+      private boolean checkNode(final Node node) {
         if (this.finished.contains(node)) {
           return false;
         }
@@ -155,7 +198,7 @@ public class DagBuilder {
         }
         this.toVisit.remove(node);
         this.ongoing.add(node);
-        for (final NodeBuilder parent : node.getParents()) {
+        for (final Node parent : node.getParents()) {
           if (checkNode(parent)) {
             this.sampleCircularNodes.add(node);
             return true;
@@ -171,47 +214,8 @@ public class DagBuilder {
     checker.check();
   }
 
-  /**
-   * Creates nodes using information stored in the current list of builders.
-   *
-   * <p>New nodes are created here to ensure they don't change even if their corresponding
-   * NodeBuilders are modified after the {@link DagBuilder#build()} is called.
-   *
-   * @param dag the dag to associate the nodes with
-   * @return the map from NodeBuilder to Node
-   */
-  private Map<NodeBuilder, Node> createBuilderToNodeMap(final Dag dag) {
-    final Map<NodeBuilder, Node> builderNodeMap = new HashMap<>();
-    for (final NodeBuilder builder : this.builders) {
-      final Node node = builder.build(dag);
-      builderNodeMap.put(builder, node);
-    }
-    return builderNodeMap;
-  }
-
-  private void updateNodesRelationships(final Map<NodeBuilder, Node> builderNodeMap) {
-    for (final NodeBuilder builder : this.builders) {
-      addParentNodes(builder, builderNodeMap);
-    }
-  }
-
-  /**
-   * Adds parent nodes to the node associated with the builder.
-   */
-  private void addParentNodes(final NodeBuilder builder,
-      final Map<NodeBuilder, Node> builderToNodeMap) {
-    final Node node = builderToNodeMap.get(builder);
-    for (final NodeBuilder parentBuilder : builder.getParents()) {
-      final Node parentNode = builderToNodeMap.get(parentBuilder);
-
-      // The NodeBuilders should have checked if the NodeBuilders belong to the same DagBuilder.
-      assert (parentNode != null);
-      node.addParent(parentNode);
-    }
-  }
-
   @Override
   public String toString() {
-    return String.format("DagBuilder (%s)", this.name);
+    return String.format("DagBuilder (%s)", this.dag.getName());
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
index 0088c36..5514138 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings("FutureReturnValueIgnored")
 @Singleton
-class DagService {
+public class DagService {
 
   private static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(10);
   private static final Logger logger = LoggerFactory.getLogger(DagService.class);
@@ -45,7 +45,7 @@ class DagService {
   private final ExecutorService executorService;
 
   @Inject
-  DagService(final ExecutorServiceUtils executorServiceUtils) {
+  public DagService(final ExecutorServiceUtils executorServiceUtils) {
     // Give the thread a name to make debugging easier.
     this.executorServiceUtils = executorServiceUtils;
     final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
@@ -53,42 +53,42 @@ class DagService {
     this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
   }
 
-  void startDag(final Dag dag) {
+  public void startDag(final Dag dag) {
     this.executorService.submit(dag::start);
   }
 
   /**
    * Transitions the node to the success state.
    */
-  void markNodeSuccess(final Node node) {
+  public void markNodeSuccess(final Node node) {
     this.executorService.submit(node::markSuccess);
   }
 
   /**
    * Transitions the node from the killing state to the killed state.
    */
-  void markNodeKilled(final Node node) {
+  public void markNodeKilled(final Node node) {
     this.executorService.submit(node::markKilled);
   }
 
   /**
    * Transitions the node to the failure state.
    */
-  void markNodeFailed(final Node node) {
+  public void markNodeFailed(final Node node) {
     this.executorService.submit(node::markFailed);
   }
 
   /**
    * Kills a DAG.
    */
-  void killDag(final Dag dag) {
+  public void killDag(final Dag dag) {
     this.executorService.submit(dag::kill);
   }
 
   /**
    * Shuts down the service and waits for the tasks to finish.
    */
-  void shutdownAndAwaitTermination() throws InterruptedException {
+  public void shutdownAndAwaitTermination() throws InterruptedException {
     logger.info("DagService is shutting down.");
     this.executorServiceUtils.gracefulShutdown(this.executorService, SHUTDOWN_WAIT_TIMEOUT);
   }
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
index 41c6579..d283c61 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
@@ -25,7 +25,7 @@ import java.util.List;
 /**
  * Node in a DAG: Directed acyclic graph.
  */
-class Node {
+public class Node {
 
   private final String name;
 
@@ -48,6 +48,7 @@ class Node {
     this.name = name;
     requireNonNull(dag, "The dag of the node can't be null");
     this.dag = dag;
+    dag.addNode(this);
   }
 
   Dag getDag() {
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
index f3f9778..ae9b643 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
@@ -18,7 +18,7 @@ package azkaban.dag;
 
 import com.google.common.collect.ImmutableSet;
 
-enum Status {
+public enum Status {
   READY, // ready to run
   DISABLED, // disabled by users. Treat as the node has the status of success
   BLOCKED, // temporarily blocked. Need to be unblocked by another external event
@@ -35,7 +35,7 @@ enum Status {
   static final ImmutableSet<Status> TERMINAL_STATES = ImmutableSet.of(DISABLED, SUCCESS, FAILURE,
       CANCELED, KILLED);
 
-  boolean isTerminal() {
+  public boolean isTerminal() {
     return TERMINAL_STATES.contains(this);
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
index 2bc1b29..85c935e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.Mockito.mock;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.junit.Test;
 
@@ -32,23 +32,23 @@ public class DagBuilderTest {
   public void create_nodes_with_same_name_should_throw_an_exception() {
     final String name = "nb";
     // given
-    final NodeBuilder nodeBuilder1 = createNodeBuilder(name);
+    createNode(name);
 
     // when
-    final Throwable thrown = catchThrowable(() -> {
-      createNodeBuilder(name);
-    });
+    final Throwable thrown = catchThrowable(() -> createNode(name));
 
     // then
-    assertThat(thrown).isInstanceOf(DagException.class);
+    assertThrownIsDagBuilderException(thrown);
   }
 
   @Test
   public void build_should_return_expected_dag() {
     // given
-    final NodeBuilder nodeBuilder1 = createNodeBuilder("nb1");
-    final NodeBuilder nodeBuilder2 = createNodeBuilder("nb2");
-    nodeBuilder1.addChildren(nodeBuilder2);
+    final String parentNodeName = "nb1";
+    final String childNodeName = "nb2";
+    createNode(parentNodeName);
+    createNode(childNodeName);
+    addParentNode(childNodeName, parentNodeName);
 
     // when
     final Dag dag = this.dagBuilder.build();
@@ -58,6 +58,13 @@ public class DagBuilderTest {
     assertDagNodes(dag);
   }
 
+  /**
+   * Adds parent as the child's parent node.
+   */
+  private void addParentNode(final String childName, final String parentName) {
+    this.dagBuilder.addParentNode(childName, parentName);
+  }
+
   private void assertDagNodes(final Dag dag) {
     final List<Node> nodes = dag.getNodes();
     assertThat(nodes.size()).isEqualTo(2);
@@ -68,49 +75,78 @@ public class DagBuilderTest {
     assertThat(node2.getName()).isEqualTo("nb2");
 
     assertThat(node1.hasParent()).isFalse();
-    assertThat(node1.getChildren()).isEqualTo(Arrays.asList(node2));
+    assertThat(node1.getChildren()).isEqualTo(Collections.singletonList(node2));
 
     assertThat(node2.hasParent()).isTrue();
     assertThat(node2.getChildren()).isEmpty();
-    assertThat(node2.getParents()).isEqualTo(Arrays.asList(node1));
+    assertThat(node2.getParents()).isEqualTo(Collections.singletonList(node1));
   }
 
-  private NodeBuilder createNodeBuilder(final String name) {
+  private Node createNode(final String name) {
     return this.dagBuilder.createNode(name, mock(NodeProcessor.class));
   }
 
   @Test
   public void build_should_throw_exception_when_circular_dependency_is_detected() {
     // given
-    final NodeBuilder nodeBuilder1 = createNodeBuilder("nb1");
-    final NodeBuilder nodeBuilder2 = createNodeBuilder("nb2");
-    final NodeBuilder nodeBuilder3 = createNodeBuilder("nb3");
-    nodeBuilder2.addParents(nodeBuilder1);
-    nodeBuilder3.addParents(nodeBuilder2);
-    nodeBuilder1.addParents(nodeBuilder3);
+    final String n1Name = "nb1";
+    final String n2Name = "nb2";
+    final String n3Name = "nb3";
+    createNode(n1Name);
+    createNode(n2Name);
+    createNode(n3Name);
+
+    addParentNode(n2Name, n1Name);
+    addParentNode(n3Name, n2Name);
+    addParentNode(n1Name, n3Name);
 
     // when
-    final Throwable thrown = catchThrowable(() -> {
-      this.dagBuilder.build();
-    });
+    final Throwable thrown = catchThrowable(this.dagBuilder::build);
 
     // then
     // Expect the exception message to show the loop: nb1 -> nb2 -> nb3 -> nb1.
     System.out.println("Expect exception: " + thrown);
-    assertThat(thrown).isInstanceOf(DagException.class);
+    assertThrownIsDagBuilderException(thrown);
   }
 
   @Test
-  public void add_dependency_should_not_affect_dag_already_built() {
+  public void can_not_call_createNode_after_dag_already_built() {
     // given
-    final Dag dag = this.dagBuilder.build();
+    this.dagBuilder.build();
 
     // when
-    createNodeBuilder("a");
+    final Throwable thrown = catchThrowable(() -> createNode("a"));
 
     // then
-    final List<Node> nodes = dag.getNodes();
-    assertThat(nodes).hasSize(0);
+    assertThrownIsDagBuilderException(thrown);
+  }
+
+  @Test
+  public void can_not_call_addParentNode_after_dag_already_built() {
+    // given
+    this.dagBuilder.build();
+
+    // when
+    final Throwable thrown = catchThrowable(() -> addParentNode("n1", "n2"));
+
+    // then
+    assertThrownIsDagBuilderException(thrown);
+  }
+
+  @Test
+  public void can_not_call_build_after_dag_already_built() {
+    // given
+    this.dagBuilder.build();
+
+    // when
+    final Throwable thrown = catchThrowable(this.dagBuilder::build);
+
+    // then
+    assertThrownIsDagBuilderException(thrown);
+  }
+
+  private void assertThrownIsDagBuilderException(final Throwable thrown) {
+    assertThat(thrown).isInstanceOf(DagException.class);
   }
 
   @Test
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
index 33cbcb7..4807c34 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -97,9 +97,9 @@ public class DagServiceTest {
    */
   @Test
   public void twoNodesSuccess() throws Exception {
-    final NodeBuilder aBuilder = createNodeInTestDag("a");
-    final NodeBuilder bBuilder = createNodeInTestDag("b");
-    bBuilder.addParents(aBuilder);
+    createNodeInTestDag("a");
+    createNodeInTestDag("b");
+    this.dagBuilder.addParentNode("b", "a");
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
     addToExpectedSequence("a", Status.SUCCESS);
@@ -120,10 +120,11 @@ public class DagServiceTest {
    */
   @Test
   public void threeNodesSuccess() throws Exception {
-    final NodeBuilder aBuilder = createNodeInTestDag("a");
-    final NodeBuilder bBuilder = createNodeInTestDag("b");
-    final NodeBuilder cNode = createNodeInTestDag("c");
-    aBuilder.addChildren(bBuilder, cNode);
+    createNodeInTestDag("a");
+    createNodeInTestDag("b");
+    createNodeInTestDag("c");
+    this.dagBuilder.addParentNode("b", "a");
+    this.dagBuilder.addParentNode("c", "a");
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
@@ -143,8 +144,8 @@ public class DagServiceTest {
    */
   @Test
   public void oneNodeFailure() throws Exception {
-    final NodeBuilder aBuilder = createNodeInTestDag("a");
-    this.nodesToFail.add(aBuilder.getName());
+    createNodeInTestDag("a");
+    this.nodesToFail.add("a");
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
     addToExpectedSequence("a", Status.FAILURE);
@@ -164,10 +165,10 @@ public class DagServiceTest {
    */
   @Test
   public void twoNodesFailFirst() throws Exception {
-    final NodeBuilder aBuilder = createNodeInTestDag("a");
-    final NodeBuilder bBuilder = createNodeInTestDag("b");
-    bBuilder.addParents(aBuilder);
-    this.nodesToFail.add(aBuilder.getName());
+    createNodeInTestDag("a");
+    createNodeInTestDag("b");
+    this.dagBuilder.addParentNode("b", "a");
+    this.nodesToFail.add("a");
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
@@ -191,12 +192,12 @@ public class DagServiceTest {
    */
   @Test
   public void threeNodesFailSecond() throws Exception {
-    final NodeBuilder aBuilder = createNodeInTestDag("a");
-    final NodeBuilder bBuilder = createNodeInTestDag("b");
-    final NodeBuilder cBuilder = createNodeInTestDag("c");
-    aBuilder.addChildren(bBuilder, cBuilder);
-
-    this.nodesToFail.add(bBuilder.getName());
+    createNodeInTestDag("a");
+    createNodeInTestDag("b");
+    createNodeInTestDag("c");
+    this.dagBuilder.addParentNode("b", "a");
+    this.dagBuilder.addParentNode("c", "a");
+    this.nodesToFail.add("b");
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
@@ -228,28 +229,29 @@ public class DagServiceTest {
     final TestSubDagProcessor testSubDagProcessor = new TestSubDagProcessor
         (this.dagService, this.statusChangeRecorder);
     final DagBuilder subDagBuilder = new DagBuilder("fb", testSubDagProcessor);
-    createNodeInDag("a", subDagBuilder);
-    createNodeInDag("b", subDagBuilder);
+    subDagBuilder.createNode("a", this.nodeProcessor);
+    subDagBuilder.createNode("b", this.nodeProcessor);
     final Dag bDag = subDagBuilder.build();
 
     final TestSubDagNodeProcessor testSubDagNodeProcessor = new TestSubDagNodeProcessor
         (this.dagService, this.statusChangeRecorder, bDag, testSubDagProcessor);
-    final NodeBuilder subDagNodeBuilder = this.dagBuilder
-        .createNode("sfb", testSubDagNodeProcessor);
+    final String SUB_DAG_NAME = "sfb";
+    this.dagBuilder.createNode(SUB_DAG_NAME, testSubDagNodeProcessor);
+
+    createNodeInTestDag("c");
 
-    final NodeBuilder cBuilder = createNodeInTestDag("c");
-    cBuilder.addParents(subDagNodeBuilder);
+    this.dagBuilder.addParentNode("c", SUB_DAG_NAME);
     final Dag dag = this.dagBuilder.build();
 
     addToExpectedSequence("fa", Status.RUNNING);
-    addToExpectedSequence("sfb", Status.RUNNING);
+    addToExpectedSequence(SUB_DAG_NAME, Status.RUNNING);
     addToExpectedSequence("fb", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
     addToExpectedSequence("b", Status.RUNNING);
     addToExpectedSequence("a", Status.SUCCESS);
     addToExpectedSequence("b", Status.SUCCESS);
     addToExpectedSequence("fb", Status.SUCCESS);
-    addToExpectedSequence("sfb", Status.SUCCESS);
+    addToExpectedSequence(SUB_DAG_NAME, Status.SUCCESS);
     addToExpectedSequence("c", Status.RUNNING);
     addToExpectedSequence("c", Status.SUCCESS);
     addToExpectedSequence("fa", Status.SUCCESS);
@@ -258,10 +260,6 @@ public class DagServiceTest {
 
   }
 
-  private void createNodeInDag(final String name, final DagBuilder subDagBuilder) {
-    subDagBuilder.createNode(name, this.nodeProcessor);
-  }
-
   /**
    * Tests killing a dag.
    */
@@ -321,8 +319,8 @@ public class DagServiceTest {
   /**
    * Creates a node and add to the test dag.
    */
-  private NodeBuilder createNodeInTestDag(final String name) {
-    return this.dagBuilder.createNode(name, this.nodeProcessor);
+  private void createNodeInTestDag(final String name) {
+    this.dagBuilder.createNode(name, this.nodeProcessor);
   }
 }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunner2Test.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunner2Test.java
new file mode 100644
index 0000000..6e72356
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunner2Test.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.dag.Dag;
+import azkaban.dag.DagBuilder;
+import azkaban.dag.DagProcessor;
+import azkaban.dag.DagService;
+import azkaban.dag.Node;
+import azkaban.dag.NodeProcessor;
+import azkaban.dag.Status;
+import azkaban.project.NodeBean;
+import azkaban.project.NodeBeanLoader;
+import azkaban.utils.ExecutorServiceUtils;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Tests for running flows.
+ */
+public class FlowRunner2Test {
+
+  private final DagService dagService = new DagService(new ExecutorServiceUtils());
+  private final CountDownLatch flowFinishedLatch = new CountDownLatch(1);
+
+  // The recorded event sequence.
+  private final List<String> eventSequence = new ArrayList<>();
+
+  @Test
+
+  public void runSimpleV2Flow() throws Exception {
+    final NodeBean flowNode = loadFlowNode();
+    final Dag dag = createDag(flowNode);
+    this.dagService.startDag(dag);
+    this.flowFinishedLatch.await(2, TimeUnit.SECONDS);
+    assertThat(this.eventSequence).isEqualTo(Arrays.asList("n1", "n2"));
+    this.dagService.shutdownAndAwaitTermination();
+  }
+
+  private NodeBean loadFlowNode() throws Exception {
+    final File flowFile = loadFlowFileFromResource();
+    final NodeBeanLoader beanLoader = new NodeBeanLoader();
+    return beanLoader.load(flowFile);
+  }
+
+  private Dag createDag(final NodeBean flowNode) {
+    final DagCreator creator = new DagCreator(flowNode);
+    return creator.create();
+
+  }
+
+  private class DagCreator {
+
+    private final NodeBean flowNode;
+    private final DagBuilder dagBuilder;
+
+    DagCreator(final NodeBean flowNode) {
+      final String flowName = flowNode.getName();
+      this.flowNode = flowNode;
+      this.dagBuilder = new DagBuilder(flowName, new SimpleDagProcessor());
+    }
+
+    Dag create() {
+      createNodes();
+      linkNodes();
+      return this.dagBuilder.build();
+    }
+
+    private void createNodes() {
+      for (final NodeBean node : this.flowNode.getNodes()) {
+        createNode(node);
+      }
+    }
+
+    private void createNode(final NodeBean node) {
+      final String nodeName = node.getName();
+      final SimpleNodeProcessor nodeProcessor = new SimpleNodeProcessor(nodeName, node.getConfig());
+      this.dagBuilder.createNode(nodeName, nodeProcessor);
+    }
+
+    private void linkNodes() {
+      for (final NodeBean node : this.flowNode.getNodes()) {
+        linkNode(node);
+      }
+    }
+
+    private void linkNode(final NodeBean node) {
+      final String name = node.getName();
+      final List<String> parents = node.getDependsOn();
+      if (parents == null) {
+        return;
+      }
+      for (final String parentNodeName : parents) {
+        this.dagBuilder.addParentNode(name, parentNodeName);
+      }
+    }
+  }
+
+  private File loadFlowFileFromResource() {
+    final ClassLoader loader = getClass().getClassLoader();
+    return new File(loader.getResource("hello_world_flow.flow").getFile());
+  }
+
+  class SimpleDagProcessor implements DagProcessor {
+
+    @Override
+    public void changeStatus(final Dag dag, final Status status) {
+      System.out.println(dag + " status changed to " + status);
+      if (status.isTerminal()) {
+        FlowRunner2Test.this.flowFinishedLatch.countDown();
+      }
+    }
+  }
+
+  class SimpleNodeProcessor implements NodeProcessor {
+
+    private final String name;
+    private final Map<String, String> config;
+
+    SimpleNodeProcessor(final String name, final Map<String, String> config) {
+      this.name = name;
+      this.config = config;
+    }
+
+    @Override
+    public void changeStatus(final Node node, final Status status) {
+      System.out.println(node + " status changed to " + status);
+      switch (status) {
+        case RUNNING:
+          System.out.println(String.format("Running with config: %s", this.config));
+          FlowRunner2Test.this.dagService.markNodeSuccess(node);
+          FlowRunner2Test.this.eventSequence.add(this.name);
+          break;
+        default:
+          break;
+      }
+    }
+  }
+}
diff --git a/azkaban-exec-server/src/test/resources/hello_world_flow.flow b/azkaban-exec-server/src/test/resources/hello_world_flow.flow
new file mode 100644
index 0000000..d73b1e2
--- /dev/null
+++ b/azkaban-exec-server/src/test/resources/hello_world_flow.flow
@@ -0,0 +1,11 @@
+nodes:
+  - name: n1
+    type: command
+    config:
+      command: echo "Hello World."
+  - name: n2
+    dependsOn:
+      - n1
+    type: command
+    config:
+      command: echo "This is the second job."