azkaban-aplcache

Separate dag creation from dag execution (#1759) * Separate

5/16/2018 2:58:52 PM

Details

diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
index 8029ee2..2ce6979 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
@@ -20,27 +20,54 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A DAG (Directed acyclic graph) consists of {@link Node}s.
+ *
+ * <p>Most of the methods in this class should remain package private. Code outside of this
+ * package should mainly interact with the {@link DagService}.
  */
 class Dag {
 
   private final String name;
   private final DagProcessor dagProcessor;
   private final List<Node> nodes = new ArrayList<>();
+  private final Map<String, Node> nameToNodeMap = new HashMap<>();
   private Status status = Status.READY;
 
   Dag(final String name, final DagProcessor dagProcessor) {
+    requireNonNull(name, "The name of the Dag can't be null");
     this.name = name;
     requireNonNull(dagProcessor, "The dagProcessor parameter can't be null.");
     this.dagProcessor = dagProcessor;
   }
 
+  /**
+   * Adds a node to the current dag.
+   *
+   * <p>It's important NOT to expose this method as public. The design relies on this to ensure
+   * correctness. The DAG's structure shouldn't change after it is created.
+   *
+   * @param node a node to add
+   */
   void addNode(final Node node) {
-    node.setDag(this);
+    assert (node.getDag() == this);
     this.nodes.add(node);
+    assert (!this.nameToNodeMap.containsKey(node.getName()));
+    this.nameToNodeMap.put(node.getName(), node);
+  }
+
+  /**
+   * Gets the node associated with the name.
+   *
+   * @param name node name
+   * @return node. null if the node with this name doesn't exist
+   */
+  Node getNodeByName(final String name) {
+    return this.nameToNodeMap.get(name);
   }
 
   void start() {
@@ -136,4 +163,9 @@ class Dag {
   void setStatus(final Status status) {
     this.status = status;
   }
+
+  @VisibleForTesting
+  public List<Node> getNodes() {
+    return this.nodes;
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
new file mode 100644
index 0000000..fed5a75
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A builder to build DAGs.
+ *
+ * <p>Use the {@link DagBuilder#createNode} method to create NodeBuilder instances. Call
+ * methods on NodeBuilder to add dependencies among them. Call the {@link DagBuilder#build()} method
+ * to build a Dag.
+ */
+public class DagBuilder {
+
+  private final String name;
+  private final DagProcessor dagProcessor;
+
+  private final List<NodeBuilder> builders = new ArrayList<>();
+  private final Set<String> nodeNamesSet = new HashSet<>();
+
+  public DagBuilder(final String name, final DagProcessor dagProcessor) {
+    requireNonNull(name, "The name of the DagBuilder can't be null");
+    this.name = name;
+    requireNonNull(name, "The dagProcessor of the DagBuilder can't be null");
+    this.dagProcessor = dagProcessor;
+  }
+
+  public NodeBuilder createNode(final String name, final NodeProcessor nodeProcessor) {
+    final NodeBuilder builder = new NodeBuilder(name, nodeProcessor, this);
+    this.builders.add(builder);
+    if (this.nodeNamesSet.contains(name)) {
+      throw new DagException(String.format("Node names in %s need to be unique. The name "
+          + "(%s) already exists.", this, name));
+    }
+    this.nodeNamesSet.add(name);
+
+    return builder;
+  }
+
+  /**
+   * Builds the dag.
+   *
+   * <p>Once this method is called, subsequent calls via NodeBuilder to modify the nodes's
+   * relationships in the dag will have no effect on the returned Dag object.
+   * </p>
+   *
+   * @return the Dag reflecting the current state of the DagBuilder
+   */
+  public Dag build() {
+    final Dag dag = new Dag(this.name, this.dagProcessor);
+    final Map<NodeBuilder, Node> builderNodeMap = createBuilderToNodeMap(dag);
+    updateNodesRelationships(builderNodeMap);
+    // todo HappyRay: circular dependency detection.
+    return dag;
+  }
+
+  /**
+   * Creates nodes using information stored in the current list of builders.
+   *
+   * <p>New nodes are created here to ensure they don't change even if their corresponding
+   * NodeBuilders are modified after the {@link DagBuilder#build()} is called.
+   *
+   * @param dag the dag to associate the nodes with
+   * @return the map from NodeBuilder to Node
+   */
+  private Map<NodeBuilder, Node> createBuilderToNodeMap(final Dag dag) {
+    final Map<NodeBuilder, Node> builderNodeMap = new HashMap<>();
+    for (final NodeBuilder builder : this.builders) {
+      final Node node = builder.build(dag);
+      builderNodeMap.put(builder, node);
+    }
+    return builderNodeMap;
+  }
+
+  private void updateNodesRelationships(final Map<NodeBuilder, Node> builderNodeMap) {
+    for (final NodeBuilder builder : this.builders) {
+      addParentNodes(builder, builderNodeMap);
+    }
+  }
+
+  /**
+   * Adds parent nodes to the node associated with the builder.
+   */
+  private void addParentNodes(final NodeBuilder builder,
+      final Map<NodeBuilder, Node> builderToNodeMap) {
+    final Node node = builderToNodeMap.get(builder);
+    for (final NodeBuilder parentBuilder : builder.getParents()) {
+      final Node parentNode = builderToNodeMap.get(parentBuilder);
+
+      // The NodeBuilders should have checked if the NodeBuilders belong to the same DagBuilder.
+      assert (parentNode != null);
+      node.addParent(parentNode);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("DagBuilder (%s)", this.name);
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagException.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagException.java
new file mode 100644
index 0000000..d6b0589
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagException.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+public class DagException extends RuntimeException {
+
+  public DagException(final String message) {
+    super(message);
+  }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
index b1a9f26..41c6579 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
@@ -39,35 +39,34 @@ class Node {
 
   private Status status = Status.READY;
 
-  private Dag dag;
+  private final Dag dag;
 
-  Node(final String name, final NodeProcessor nodeProcessor) {
-    this.name = name;
+  Node(final String name, final NodeProcessor nodeProcessor, final Dag dag) {
     requireNonNull(nodeProcessor, "The nodeProcessor parameter can't be null.");
     this.nodeProcessor = nodeProcessor;
+    requireNonNull(name, "The name of the node can't be null");
+    this.name = name;
+    requireNonNull(dag, "The dag of the node can't be null");
+    this.dag = dag;
   }
 
-  public Dag getDag() {
+  Dag getDag() {
     return this.dag;
   }
 
-  public void setDag(final Dag dag) {
-    this.dag = dag;
-  }
-
-  private void addParent(final Node node) {
+  /**
+   * Adds the node as the current node's parent i.e. the current node depends on the given node.
+   *
+   * <p>It's important NOT to expose this method as public. The design relies on this to ensure
+   * correctness. The DAG's structure shouldn't change after it is created.
+   */
+  void addParent(final Node node) {
     this.parents.add(node);
+    node.addChild(this);
   }
 
-  void addChild(final Node node) {
+  private void addChild(final Node node) {
     this.children.add(node);
-    node.addParent(this);
-  }
-
-  void addChildren(final Node... nodes) {
-    for (final Node node : nodes) {
-      addChild(node);
-    }
   }
 
   boolean hasParent() {
@@ -195,4 +194,14 @@ class Node {
   String getName() {
     return this.name;
   }
+
+  @VisibleForTesting
+  List<Node> getChildren() {
+    return this.children;
+  }
+
+  @VisibleForTesting
+  List<Node> getParents() {
+    return this.parents;
+  }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/NodeBuilder.java b/azkaban-exec-server/src/main/java/azkaban/dag/NodeBuilder.java
new file mode 100644
index 0000000..15960c6
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/NodeBuilder.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class NodeBuilder {
+
+  private final String name;
+  private final NodeProcessor nodeProcessor;
+
+  private final DagBuilder dagBuilder;
+
+  // The nodes that this node depends on
+  private final Set<NodeBuilder> parents = new HashSet<>();
+
+  public NodeBuilder(final String name, final NodeProcessor nodeProcessor,
+      final DagBuilder dagBuilder) {
+    requireNonNull(name, "The name of the NodeBuilder can't be null");
+    this.name = name;
+    requireNonNull(nodeProcessor, "The nodeProcessor of the NodeBuilder can't be null");
+    this.nodeProcessor = nodeProcessor;
+    requireNonNull(dagBuilder, "The dagBuilder of the NodeBuilder can't be null");
+    this.dagBuilder = dagBuilder;
+  }
+
+  /**
+   * Adds the given builder as a parent of this builder.
+   *
+   * <p>If the same builder is added multiple times to this builder, this builder will retain
+   * only one reference to it.</p>
+   */
+  private void addParent(final NodeBuilder builder) {
+    checkBuildersBelongToSameDag(builder);
+
+    // Add the relationship to the data structure internal to the builder instead of changing
+    // the associated node directly. This is done to prevent users of this method to change the
+    // structure of the dag after the DagBuilder::build method is called.
+    this.parents.add(builder);
+  }
+
+  /**
+   * Checks if the given NodeBuilder belongs to the same DagBuilder as the current NodeBuilder.
+   */
+  private void checkBuildersBelongToSameDag(final NodeBuilder builder) {
+    if (builder.dagBuilder != this.dagBuilder) {
+      throw new DagException(String.format("Can't add a dependency from %s to %s since they "
+          + "belong to different DagBuilders.", builder, this));
+    }
+  }
+
+  /**
+   * Add builders as parents of this builder.
+   *
+   * <p>This method handles de-duplication of builders.</p>
+   *
+   * @throws DagException if builders are not created from the same DagBuilder
+   */
+  public void addParents(final NodeBuilder... builders) {
+    for (final NodeBuilder builder : builders) {
+      addParent(builder);
+    }
+  }
+
+  /**
+   * Add builders as children of this builder.
+   *
+   * <p>This method handles de-duplication of builders.</p>
+   *
+   * @throws DagException if builders are not created from the same DagBuilder
+   */
+  public void addChildren(final NodeBuilder... builders) {
+    for (final NodeBuilder builder : builders) {
+      builder.addParent(this);
+    }
+  }
+
+  Set<NodeBuilder> getParents() {
+    return this.parents;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("NodeBuilder (%s) in %s", this.name, this.dagBuilder);
+  }
+
+  String getName() {
+    return this.name;
+  }
+
+  /**
+   * Builds a Node and adds it to the given Dag.
+   *
+   * @param dag Dag to associate this node with
+   * @return a node
+   */
+  public Node build(final Dag dag) {
+    final Node node = new Node(this.name, this.nodeProcessor, dag);
+    dag.addNode(node);
+    return node;
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
new file mode 100644
index 0000000..132aa83
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+public class DagBuilderTest {
+
+  private final DagBuilder dagBuilder = new DagBuilder("dag builder", mock(DagProcessor.class));
+
+  @Test
+  public void create_nodes_with_same_name_should_throw_an_exception() {
+    final String name = "nb";
+    // given
+    final NodeBuilder nodeBuilder1 = createNodeBuilder(name);
+
+    // when
+    final Throwable thrown = catchThrowable(() -> {
+      createNodeBuilder(name);
+    });
+
+    // then
+    assertThat(thrown).isInstanceOf(DagException.class);
+  }
+
+  @Test
+  public void build_should_return_expected_dag() {
+    // given
+    final NodeBuilder nodeBuilder1 = createNodeBuilder("nb1");
+    final NodeBuilder nodeBuilder2 = createNodeBuilder("nb2");
+    nodeBuilder1.addChildren(nodeBuilder2);
+
+    // when
+    final Dag dag = this.dagBuilder.build();
+
+    // then
+    assertThat(dag.getName()).isEqualTo("dag builder");
+    assertDagNodes(dag);
+  }
+
+  private void assertDagNodes(final Dag dag) {
+    final List<Node> nodes = dag.getNodes();
+    assertThat(nodes.size()).isEqualTo(2);
+    final Node node1 = nodes.get(0);
+    final Node node2 = nodes.get(1);
+
+    assertThat(node1.getName()).isEqualTo("nb1");
+    assertThat(node2.getName()).isEqualTo("nb2");
+
+    assertThat(node1.hasParent()).isFalse();
+    assertThat(node1.getChildren()).isEqualTo(Arrays.asList(node2));
+
+    assertThat(node2.hasParent()).isTrue();
+    assertThat(node2.getChildren()).isEmpty();
+    assertThat(node2.getParents()).isEqualTo(Arrays.asList(node1));
+  }
+
+  private NodeBuilder createNodeBuilder(final String name) {
+    return this.dagBuilder.createNode(name, mock(NodeProcessor.class));
+  }
+
+  @Test
+  public void add_dependency_should_not_affect_dag_already_built() {
+    // given
+    final Dag dag = this.dagBuilder.build();
+
+    // when
+    createNodeBuilder("a");
+
+    // then
+    final List<Node> nodes = dag.getNodes();
+    assertThat(nodes).hasSize(0);
+  }
+
+  @Test
+  public void test_toString() {
+    // given
+
+    // when
+    final String stringRepresentation = this.dagBuilder.toString();
+
+    // then
+    assertThat(stringRepresentation).isEqualTo("DagBuilder (dag builder)");
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
index ddcb068..c757d31 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -39,13 +39,15 @@ public class DagServiceTest {
 
   private final DagService dagService = new DagService();
   private final StatusChangeRecorder statusChangeRecorder = new StatusChangeRecorder();
-  private final Set<Node> nodesToFail = new HashSet<>();
+
+  // The names of the nodes that are supposed to fail.
+  private final Set<String> nodesToFail = new HashSet<>();
   private final TestNodeProcessor nodeProcessor = new TestNodeProcessor(this.dagService,
       this.statusChangeRecorder, this.nodesToFail);
   private final CountDownLatch dagFinishedLatch = new CountDownLatch(1);
   private final DagProcessor dagProcessor = new TestDagProcessor(this.dagFinishedLatch,
       this.statusChangeRecorder);
-  private final Dag testDag = createDag();
+  private final DagBuilder dagBuilder = new DagBuilder("fa", this.dagProcessor);
   private final List<Pair<String, Status>> expectedSequence = new ArrayList<>();
 
 
@@ -59,13 +61,13 @@ public class DagServiceTest {
    */
   @Test
   public void oneNodeSuccess() throws Exception {
-    createNodeAndAddToTestDag("a");
+    createNodeInTestDag("a");
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
     addToExpectedSequence("a", Status.SUCCESS);
     addToExpectedSequence("fa", Status.SUCCESS);
 
-    runAndVerify();
+    buildDagRunAndVerify();
   }
 
   /**
@@ -76,9 +78,9 @@ public class DagServiceTest {
    */
   @Test
   public void twoNodesSuccess() throws Exception {
-    final Node aNode = createNodeAndAddToTestDag("a");
-    final Node bNode = createNodeAndAddToTestDag("b");
-    aNode.addChild(bNode);
+    final NodeBuilder aBuilder = createNodeInTestDag("a");
+    final NodeBuilder bBuilder = createNodeInTestDag("b");
+    bBuilder.addParents(aBuilder);
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
     addToExpectedSequence("a", Status.SUCCESS);
@@ -86,7 +88,7 @@ public class DagServiceTest {
     addToExpectedSequence("b", Status.SUCCESS);
     addToExpectedSequence("fa", Status.SUCCESS);
 
-    runAndVerify();
+    buildDagRunAndVerify();
   }
 
   /**
@@ -99,10 +101,10 @@ public class DagServiceTest {
    */
   @Test
   public void threeNodesSuccess() throws Exception {
-    final Node aNode = createNodeAndAddToTestDag("a");
-    final Node bNode = createNodeAndAddToTestDag("b");
-    final Node cNode = createNodeAndAddToTestDag("c");
-    aNode.addChildren(bNode, cNode);
+    final NodeBuilder aBuilder = createNodeInTestDag("a");
+    final NodeBuilder bBuilder = createNodeInTestDag("b");
+    final NodeBuilder cNode = createNodeInTestDag("c");
+    aBuilder.addChildren(bBuilder, cNode);
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
@@ -113,7 +115,7 @@ public class DagServiceTest {
     addToExpectedSequence("c", Status.SUCCESS);
     addToExpectedSequence("fa", Status.SUCCESS);
 
-    runAndVerify();
+    buildDagRunAndVerify();
 
   }
 
@@ -122,14 +124,14 @@ public class DagServiceTest {
    */
   @Test
   public void oneNodeFailure() throws Exception {
-    final Node aNode = createNodeAndAddToTestDag("a");
-    this.nodesToFail.add(aNode);
+    final NodeBuilder aBuilder = createNodeInTestDag("a");
+    this.nodesToFail.add(aBuilder.getName());
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
     addToExpectedSequence("a", Status.FAILURE);
     addToExpectedSequence("fa", Status.FAILURE);
 
-    runAndVerify();
+    buildDagRunAndVerify();
   }
 
   /**
@@ -143,10 +145,10 @@ public class DagServiceTest {
    */
   @Test
   public void twoNodesFailFirst() throws Exception {
-    final Node aNode = createNodeAndAddToTestDag("a");
-    final Node bNode = createNodeAndAddToTestDag("b");
-    aNode.addChild(bNode);
-    this.nodesToFail.add(aNode);
+    final NodeBuilder aBuilder = createNodeInTestDag("a");
+    final NodeBuilder bBuilder = createNodeInTestDag("b");
+    bBuilder.addParents(aBuilder);
+    this.nodesToFail.add(aBuilder.getName());
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
@@ -154,7 +156,7 @@ public class DagServiceTest {
     addToExpectedSequence("b", Status.CANCELED);
     addToExpectedSequence("fa", Status.FAILURE);
 
-    runAndVerify();
+    buildDagRunAndVerify();
   }
 
   /**
@@ -170,12 +172,12 @@ public class DagServiceTest {
    */
   @Test
   public void threeNodesFailSecond() throws Exception {
-    final Node aNode = createNodeAndAddToTestDag("a");
-    final Node bNode = createNodeAndAddToTestDag("b");
-    final Node cNode = createNodeAndAddToTestDag("c");
-    aNode.addChildren(bNode, cNode);
+    final NodeBuilder aBuilder = createNodeInTestDag("a");
+    final NodeBuilder bBuilder = createNodeInTestDag("b");
+    final NodeBuilder cBuilder = createNodeInTestDag("c");
+    aBuilder.addChildren(bBuilder, cBuilder);
 
-    this.nodesToFail.add(bNode);
+    this.nodesToFail.add(bBuilder.getName());
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
@@ -186,7 +188,7 @@ public class DagServiceTest {
     addToExpectedSequence("c", Status.SUCCESS);
     addToExpectedSequence("fa", Status.FAILURE);
 
-    runAndVerify();
+    buildDagRunAndVerify();
 
   }
 
@@ -203,21 +205,24 @@ public class DagServiceTest {
    * </pre>
    */
   @Test
-  public void simple_subdag_success_case() throws Exception {
+  public void simple_sub_dag_success_case() throws Exception {
     final TestSubDagProcessor testSubDagProcessor = new TestSubDagProcessor
         (this.dagService, this.statusChangeRecorder);
-    final Dag bDag = new Dag("fb", testSubDagProcessor);
-    createNodeAndAddToDag("a", bDag);
-    createNodeAndAddToDag("b", bDag);
+    final DagBuilder subDagBuilder = new DagBuilder("fb", testSubDagProcessor);
+    createNodeInDag("a", subDagBuilder);
+    createNodeInDag("b", subDagBuilder);
+    final Dag bDag = subDagBuilder.build();
 
     final TestSubDagNodeProcessor testSubDagNodeProcessor = new TestSubDagNodeProcessor
         (this.dagService, this.statusChangeRecorder, bDag);
-    final Node subDagNode = new Node("sfb", testSubDagNodeProcessor);
-    testSubDagProcessor.setNode(subDagNode);
-    this.testDag.addNode(subDagNode);
+    final NodeBuilder subDagNodeBuilder = this.dagBuilder
+        .createNode("sfb", testSubDagNodeProcessor);
+
+    final NodeBuilder cBuilder = createNodeInTestDag("c");
+    cBuilder.addParents(subDagNodeBuilder);
+    final Dag dag = this.dagBuilder.build();
 
-    final Node cNode = createNodeAndAddToTestDag("c");
-    subDagNode.addChild(cNode);
+    testSubDagProcessor.setNode(dag.getNodeByName(subDagNodeBuilder.getName()));
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("sfb", Status.RUNNING);
@@ -232,8 +237,12 @@ public class DagServiceTest {
     addToExpectedSequence("c", Status.SUCCESS);
     addToExpectedSequence("fa", Status.SUCCESS);
 
-    runAndVerify();
+    runAndVerify(dag);
+
+  }
 
+  private void createNodeInDag(final String name, final DagBuilder subDagBuilder) {
+    subDagBuilder.createNode(name, this.nodeProcessor);
   }
 
   /**
@@ -244,8 +253,8 @@ public class DagServiceTest {
     final CountDownLatch nodeRunningLatch = new CountDownLatch(1);
     final TestKillNodeProcessor killNodeProcessor = new TestKillNodeProcessor(this.dagService,
         this.statusChangeRecorder, nodeRunningLatch);
-    final Node aNode = new Node("a", killNodeProcessor);
-    this.testDag.addNode(aNode);
+    this.dagBuilder.createNode("a", killNodeProcessor);
+    final Dag dag = this.dagBuilder.build();
 
     addToExpectedSequence("fa", Status.RUNNING);
     addToExpectedSequence("a", Status.RUNNING);
@@ -254,25 +263,24 @@ public class DagServiceTest {
     addToExpectedSequence("a", Status.KILLED);
     addToExpectedSequence("fa", Status.KILLED);
 
-    this.dagService.startDag(this.testDag);
+    this.dagService.startDag(dag);
 
     // Make sure the node is running before killing the DAG.
-    nodeRunningLatch.await(120, TimeUnit.SECONDS);
-    this.dagService.killDag(this.testDag);
+    nodeRunningLatch.await(1, TimeUnit.SECONDS);
+    this.dagService.killDag(dag);
 
-    final boolean isWaitSuccessful = this.dagFinishedLatch.await(120, TimeUnit.SECONDS);
+    final boolean isWaitSuccessful = this.dagFinishedLatch.await(1, TimeUnit.SECONDS);
     // Make sure the dag finishes.
     assertThat(isWaitSuccessful).isTrue();
     verifyStatusSequence();
   }
 
-
   private void addToExpectedSequence(final String name, final Status status) {
     this.expectedSequence.add(new Pair<>(name, status));
   }
 
-  private void runDag() throws InterruptedException {
-    this.dagService.startDag(this.testDag);
+  private void runDag(final Dag dag) throws InterruptedException {
+    this.dagService.startDag(dag);
     final boolean isWaitSuccessful = this.dagFinishedLatch.await(2, TimeUnit.SECONDS);
 
     // Make sure the dag finishes.
@@ -283,33 +291,21 @@ public class DagServiceTest {
     this.statusChangeRecorder.verifySequence(this.expectedSequence);
   }
 
-  private void runAndVerify() throws InterruptedException {
-    runDag();
+  private void buildDagRunAndVerify() throws InterruptedException {
+    final Dag dag = this.dagBuilder.build();
+    runAndVerify(dag);
+  }
+
+  private void runAndVerify(final Dag dag) throws InterruptedException {
+    runDag(dag);
     verifyStatusSequence();
   }
 
   /**
    * Creates a node and add to the test dag.
-   *
-   * @param name node name
-   * @return Node object
    */
-  private Node createNode(final String name) {
-    return new Node(name, this.nodeProcessor);
-  }
-
-  private Node createNodeAndAddToDag(final String name, final Dag flow) {
-    final Node node = createNode(name);
-    flow.addNode(node);
-    return node;
-  }
-
-  private Node createNodeAndAddToTestDag(final String name) {
-    return createNodeAndAddToDag(name, this.testDag);
-  }
-
-  private Dag createDag() {
-    return new Dag("fa", this.dagProcessor);
+  private NodeBuilder createNodeInTestDag(final String name) {
+    return this.dagBuilder.createNode(name, this.nodeProcessor);
   }
 }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
index 522beb2..6bd7f13 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
@@ -57,7 +57,7 @@ public class DagTest {
     final Node aNode = createAndAddNode("a");
     aNode.setStatus(Status.RUNNING);
     final Node bNode = createAndAddNode("b");
-    aNode.addChild(bNode);
+    bNode.addParent(aNode);
     this.testFlow.setStatus(Status.RUNNING);
     this.testFlow.kill();
     assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
@@ -80,11 +80,11 @@ public class DagTest {
     final Node aNode = createAndAddNode("a");
     aNode.setStatus(Status.RUNNING);
     final Node bNode = createAndAddNode("b");
-    aNode.addChild(bNode);
+    bNode.addParent(aNode);
     final Node cNode = createAndAddNode("c");
-    aNode.addChild(cNode);
+    cNode.addParent(aNode);
     final Node dNode = createAndAddNode("d");
-    cNode.addChild(dNode);
+    dNode.addParent(cNode);
 
     this.testFlow.setStatus(Status.RUNNING);
     this.testFlow.kill();
@@ -103,7 +103,7 @@ public class DagTest {
     final Node aNode = createAndAddNode("a");
     aNode.setStatus(Status.RUNNING);
     final Node bNode = createAndAddNode("b");
-    aNode.addChild(bNode);
+    bNode.addParent(aNode);
     bNode.setStatus(Status.BLOCKED);
     this.testFlow.setStatus(Status.RUNNING);
     this.testFlow.kill();
@@ -125,7 +125,7 @@ public class DagTest {
     aNode.setStatus(Status.SUCCESS);
     final Node bNode = createAndAddNode("b");
     bNode.setStatus(Status.RUNNING);
-    aNode.addChild(bNode);
+    bNode.addParent(aNode);
     this.testFlow.kill();
     assertThat(aNode.getStatus()).isEqualTo(Status.SUCCESS);
     assertThat(bNode.getStatus()).isEqualTo(Status.KILLING);
@@ -157,7 +157,7 @@ public class DagTest {
    * @return Node object
    */
   private Node createAndAddNode(final String name) {
-    final Node node = TestUtil.createNodeWithNullProcessor(name);
+    final Node node = TestUtil.createNodeWithNullProcessor(name, this.testFlow);
     this.testFlow.addNode(node);
     return node;
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/NodeBuilderTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/NodeBuilderTest.java
new file mode 100644
index 0000000..1c15a02
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/NodeBuilderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+
+public class NodeBuilderTest {
+
+  private final DagBuilder dagBuilder = mock(DagBuilder.class);
+  private final NodeBuilder builder = createBuilder("builder");
+
+  @Test
+  public void addChildren() {
+    // given
+    final NodeBuilder builder2 = createBuilder("builder2");
+    final NodeBuilder builder3 = createBuilder("builder3");
+
+    // when
+    this.builder.addChildren(builder2, builder3);
+    assertParentMatch(builder2);
+    assertParentMatch(builder3);
+  }
+
+  /**
+   * Asserts that the parent of the given node is the test builder node.
+   */
+  private void assertParentMatch(final NodeBuilder builder) {
+    final Set<NodeBuilder> parents = builder.getParents();
+    assertThat(parents).isEqualTo(new HashSet<>(Arrays.asList(this
+        .builder)));
+  }
+
+  @Test
+  public void addParents() {
+    // given
+    final NodeBuilder builder2 = createBuilder("builder2");
+    final NodeBuilder builder3 = createBuilder("builder3");
+
+    // when
+    this.builder.addParents(builder2, builder3);
+    final Set<NodeBuilder> parents = this.builder.getParents();
+
+    // then
+    assertThat(parents).isEqualTo(new HashSet<>(Arrays.asList(builder2, builder3)));
+  }
+
+  private NodeBuilder createBuilder(final String name) {
+    return new NodeBuilder(name, mock(NodeProcessor.class), this.dagBuilder);
+  }
+
+  @Test
+  public void depend_on_node_in_a_different_dag_should_throw_exception() {
+    // given
+    final NodeBuilder builderInAnotherDag = new NodeBuilder("builder from another dag", mock
+        (NodeProcessor.class), mock
+        (DagBuilder.class));
+
+    // when
+    final Throwable thrown = catchThrowable(() -> {
+      this.builder.addChildren(builderInAnotherDag);
+    });
+
+    // then
+    assertThat(thrown).isInstanceOf(DagException.class);
+  }
+
+  @Test
+  public void toStringTest() {
+    // given
+    final DagBuilder dagBuilder = new DagBuilder("dag", mock(DagProcessor.class));
+    final NodeBuilder nodeBuilder = new NodeBuilder("node", mock(NodeProcessor.class),
+        dagBuilder);
+
+    // when
+    final String stringRepresentation = nodeBuilder.toString();
+
+    // then
+    assertThat(stringRepresentation).isEqualTo("NodeBuilder (node) in DagBuilder (dag)");
+  }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
index d871624..e0ba3e8 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
@@ -18,6 +18,7 @@ package azkaban.dag;
 
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
 
 import org.junit.Test;
 
@@ -25,18 +26,23 @@ public class NodeTest {
 
   @Test
   public void hasParent() {
-    final Node node = TestUtil.createNodeWithNullProcessor("a");
-    final Node parentNode = TestUtil.createNodeWithNullProcessor("parent");
-    parentNode.addChild(node);
+    final Node node = createTestNode("a");
+    final Node parentNode = createTestNode("parent");
+    node.addParent(parentNode);
     final boolean hasParent = node.hasParent();
     assertThat(hasParent).isTrue();
   }
 
   @Test
   public void hasParentNegative() {
-    final Node node = TestUtil.createNodeWithNullProcessor("a");
+    final Node node = createTestNode("a");
     final boolean hasParent = node.hasParent();
     assertThat(hasParent).isFalse();
   }
 
+  private Node createTestNode(final String name) {
+    return TestUtil.createNodeWithNullProcessor(name, mock(Dag.class));
+  }
+
+
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
index f0b1751..cc50834 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
@@ -22,10 +22,10 @@ public class TestNodeProcessor implements NodeProcessor {
 
   private final DagService dagService;
   private final StatusChangeRecorder statusChangeRecorder;
-  private final Set<Node> nodesToFail;
+  private final Set<String> nodesToFail;
 
   TestNodeProcessor(final DagService dagService,
-      final StatusChangeRecorder statusChangeRecorder, final Set<Node> nodesToFail) {
+      final StatusChangeRecorder statusChangeRecorder, final Set<String> nodesToFail) {
     this.dagService = dagService;
     this.statusChangeRecorder = statusChangeRecorder;
     this.nodesToFail = nodesToFail;
@@ -38,7 +38,7 @@ public class TestNodeProcessor implements NodeProcessor {
 
     switch (status) {
       case RUNNING:
-        if (this.nodesToFail.contains(node)) {
+        if (this.nodesToFail.contains(node.getName())) {
           this.dagService.markNodeFailed(node);
         } else {
           this.dagService.markNodeSuccess(node);
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
index 29642c6..0f7dbcd 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
@@ -25,7 +25,7 @@ class TestUtil {
    *
    * @param name node name
    */
-  static Node createNodeWithNullProcessor(final String name) {
-    return new Node(name, mock(NodeProcessor.class));
+  static Node createNodeWithNullProcessor(final String name, final Dag dag) {
+    return new Node(name, mock(NodeProcessor.class), dag);
   }
 }