Details
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
index 8029ee2..2ce6979 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
@@ -20,27 +20,54 @@ import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* A DAG (Directed acyclic graph) consists of {@link Node}s.
+ *
+ * <p>Most of the methods in this class should remain package private. Code outside of this
+ * package should mainly interact with the {@link DagService}.
*/
class Dag {
private final String name;
private final DagProcessor dagProcessor;
private final List<Node> nodes = new ArrayList<>();
+ private final Map<String, Node> nameToNodeMap = new HashMap<>();
private Status status = Status.READY;
Dag(final String name, final DagProcessor dagProcessor) {
+ requireNonNull(name, "The name of the Dag can't be null");
this.name = name;
requireNonNull(dagProcessor, "The dagProcessor parameter can't be null.");
this.dagProcessor = dagProcessor;
}
+ /**
+ * Adds a node to the current dag.
+ *
+ * <p>It's important NOT to expose this method as public. The design relies on this to ensure
+ * correctness. The DAG's structure shouldn't change after it is created.
+ *
+ * @param node a node to add
+ */
void addNode(final Node node) {
- node.setDag(this);
+ assert (node.getDag() == this);
this.nodes.add(node);
+ assert (!this.nameToNodeMap.containsKey(node.getName()));
+ this.nameToNodeMap.put(node.getName(), node);
+ }
+
+ /**
+ * Gets the node associated with the name.
+ *
+ * @param name node name
+ * @return node. null if the node with this name doesn't exist
+ */
+ Node getNodeByName(final String name) {
+ return this.nameToNodeMap.get(name);
}
void start() {
@@ -136,4 +163,9 @@ class Dag {
void setStatus(final Status status) {
this.status = status;
}
+
+ @VisibleForTesting
+ public List<Node> getNodes() {
+ return this.nodes;
+ }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
new file mode 100644
index 0000000..fed5a75
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A builder to build DAGs.
+ *
+ * <p>Use the {@link DagBuilder#createNode} method to create NodeBuilder instances. Call
+ * methods on NodeBuilder to add dependencies among them. Call the {@link DagBuilder#build()} method
+ * to build a Dag.
+ */
+public class DagBuilder {
+
+ private final String name;
+ private final DagProcessor dagProcessor;
+
+ private final List<NodeBuilder> builders = new ArrayList<>();
+ private final Set<String> nodeNamesSet = new HashSet<>();
+
+ public DagBuilder(final String name, final DagProcessor dagProcessor) {
+ requireNonNull(name, "The name of the DagBuilder can't be null");
+ this.name = name;
+ requireNonNull(name, "The dagProcessor of the DagBuilder can't be null");
+ this.dagProcessor = dagProcessor;
+ }
+
+ public NodeBuilder createNode(final String name, final NodeProcessor nodeProcessor) {
+ final NodeBuilder builder = new NodeBuilder(name, nodeProcessor, this);
+ this.builders.add(builder);
+ if (this.nodeNamesSet.contains(name)) {
+ throw new DagException(String.format("Node names in %s need to be unique. The name "
+ + "(%s) already exists.", this, name));
+ }
+ this.nodeNamesSet.add(name);
+
+ return builder;
+ }
+
+ /**
+ * Builds the dag.
+ *
+ * <p>Once this method is called, subsequent calls via NodeBuilder to modify the nodes's
+ * relationships in the dag will have no effect on the returned Dag object.
+ * </p>
+ *
+ * @return the Dag reflecting the current state of the DagBuilder
+ */
+ public Dag build() {
+ final Dag dag = new Dag(this.name, this.dagProcessor);
+ final Map<NodeBuilder, Node> builderNodeMap = createBuilderToNodeMap(dag);
+ updateNodesRelationships(builderNodeMap);
+ // todo HappyRay: circular dependency detection.
+ return dag;
+ }
+
+ /**
+ * Creates nodes using information stored in the current list of builders.
+ *
+ * <p>New nodes are created here to ensure they don't change even if their corresponding
+ * NodeBuilders are modified after the {@link DagBuilder#build()} is called.
+ *
+ * @param dag the dag to associate the nodes with
+ * @return the map from NodeBuilder to Node
+ */
+ private Map<NodeBuilder, Node> createBuilderToNodeMap(final Dag dag) {
+ final Map<NodeBuilder, Node> builderNodeMap = new HashMap<>();
+ for (final NodeBuilder builder : this.builders) {
+ final Node node = builder.build(dag);
+ builderNodeMap.put(builder, node);
+ }
+ return builderNodeMap;
+ }
+
+ private void updateNodesRelationships(final Map<NodeBuilder, Node> builderNodeMap) {
+ for (final NodeBuilder builder : this.builders) {
+ addParentNodes(builder, builderNodeMap);
+ }
+ }
+
+ /**
+ * Adds parent nodes to the node associated with the builder.
+ */
+ private void addParentNodes(final NodeBuilder builder,
+ final Map<NodeBuilder, Node> builderToNodeMap) {
+ final Node node = builderToNodeMap.get(builder);
+ for (final NodeBuilder parentBuilder : builder.getParents()) {
+ final Node parentNode = builderToNodeMap.get(parentBuilder);
+
+ // The NodeBuilders should have checked if the NodeBuilders belong to the same DagBuilder.
+ assert (parentNode != null);
+ node.addParent(parentNode);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DagBuilder (%s)", this.name);
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagException.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagException.java
new file mode 100644
index 0000000..d6b0589
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagException.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+public class DagException extends RuntimeException {
+
+ public DagException(final String message) {
+ super(message);
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
index b1a9f26..41c6579 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
@@ -39,35 +39,34 @@ class Node {
private Status status = Status.READY;
- private Dag dag;
+ private final Dag dag;
- Node(final String name, final NodeProcessor nodeProcessor) {
- this.name = name;
+ Node(final String name, final NodeProcessor nodeProcessor, final Dag dag) {
requireNonNull(nodeProcessor, "The nodeProcessor parameter can't be null.");
this.nodeProcessor = nodeProcessor;
+ requireNonNull(name, "The name of the node can't be null");
+ this.name = name;
+ requireNonNull(dag, "The dag of the node can't be null");
+ this.dag = dag;
}
- public Dag getDag() {
+ Dag getDag() {
return this.dag;
}
- public void setDag(final Dag dag) {
- this.dag = dag;
- }
-
- private void addParent(final Node node) {
+ /**
+ * Adds the node as the current node's parent i.e. the current node depends on the given node.
+ *
+ * <p>It's important NOT to expose this method as public. The design relies on this to ensure
+ * correctness. The DAG's structure shouldn't change after it is created.
+ */
+ void addParent(final Node node) {
this.parents.add(node);
+ node.addChild(this);
}
- void addChild(final Node node) {
+ private void addChild(final Node node) {
this.children.add(node);
- node.addParent(this);
- }
-
- void addChildren(final Node... nodes) {
- for (final Node node : nodes) {
- addChild(node);
- }
}
boolean hasParent() {
@@ -195,4 +194,14 @@ class Node {
String getName() {
return this.name;
}
+
+ @VisibleForTesting
+ List<Node> getChildren() {
+ return this.children;
+ }
+
+ @VisibleForTesting
+ List<Node> getParents() {
+ return this.parents;
+ }
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/NodeBuilder.java b/azkaban-exec-server/src/main/java/azkaban/dag/NodeBuilder.java
new file mode 100644
index 0000000..15960c6
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/NodeBuilder.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class NodeBuilder {
+
+ private final String name;
+ private final NodeProcessor nodeProcessor;
+
+ private final DagBuilder dagBuilder;
+
+ // The nodes that this node depends on
+ private final Set<NodeBuilder> parents = new HashSet<>();
+
+ public NodeBuilder(final String name, final NodeProcessor nodeProcessor,
+ final DagBuilder dagBuilder) {
+ requireNonNull(name, "The name of the NodeBuilder can't be null");
+ this.name = name;
+ requireNonNull(nodeProcessor, "The nodeProcessor of the NodeBuilder can't be null");
+ this.nodeProcessor = nodeProcessor;
+ requireNonNull(dagBuilder, "The dagBuilder of the NodeBuilder can't be null");
+ this.dagBuilder = dagBuilder;
+ }
+
+ /**
+ * Adds the given builder as a parent of this builder.
+ *
+ * <p>If the same builder is added multiple times to this builder, this builder will retain
+ * only one reference to it.</p>
+ */
+ private void addParent(final NodeBuilder builder) {
+ checkBuildersBelongToSameDag(builder);
+
+ // Add the relationship to the data structure internal to the builder instead of changing
+ // the associated node directly. This is done to prevent users of this method to change the
+ // structure of the dag after the DagBuilder::build method is called.
+ this.parents.add(builder);
+ }
+
+ /**
+ * Checks if the given NodeBuilder belongs to the same DagBuilder as the current NodeBuilder.
+ */
+ private void checkBuildersBelongToSameDag(final NodeBuilder builder) {
+ if (builder.dagBuilder != this.dagBuilder) {
+ throw new DagException(String.format("Can't add a dependency from %s to %s since they "
+ + "belong to different DagBuilders.", builder, this));
+ }
+ }
+
+ /**
+ * Add builders as parents of this builder.
+ *
+ * <p>This method handles de-duplication of builders.</p>
+ *
+ * @throws DagException if builders are not created from the same DagBuilder
+ */
+ public void addParents(final NodeBuilder... builders) {
+ for (final NodeBuilder builder : builders) {
+ addParent(builder);
+ }
+ }
+
+ /**
+ * Add builders as children of this builder.
+ *
+ * <p>This method handles de-duplication of builders.</p>
+ *
+ * @throws DagException if builders are not created from the same DagBuilder
+ */
+ public void addChildren(final NodeBuilder... builders) {
+ for (final NodeBuilder builder : builders) {
+ builder.addParent(this);
+ }
+ }
+
+ Set<NodeBuilder> getParents() {
+ return this.parents;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("NodeBuilder (%s) in %s", this.name, this.dagBuilder);
+ }
+
+ String getName() {
+ return this.name;
+ }
+
+ /**
+ * Builds a Node and adds it to the given Dag.
+ *
+ * @param dag Dag to associate this node with
+ * @return a node
+ */
+ public Node build(final Dag dag) {
+ final Node node = new Node(this.name, this.nodeProcessor, dag);
+ dag.addNode(node);
+ return node;
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
new file mode 100644
index 0000000..132aa83
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+public class DagBuilderTest {
+
+ private final DagBuilder dagBuilder = new DagBuilder("dag builder", mock(DagProcessor.class));
+
+ @Test
+ public void create_nodes_with_same_name_should_throw_an_exception() {
+ final String name = "nb";
+ // given
+ final NodeBuilder nodeBuilder1 = createNodeBuilder(name);
+
+ // when
+ final Throwable thrown = catchThrowable(() -> {
+ createNodeBuilder(name);
+ });
+
+ // then
+ assertThat(thrown).isInstanceOf(DagException.class);
+ }
+
+ @Test
+ public void build_should_return_expected_dag() {
+ // given
+ final NodeBuilder nodeBuilder1 = createNodeBuilder("nb1");
+ final NodeBuilder nodeBuilder2 = createNodeBuilder("nb2");
+ nodeBuilder1.addChildren(nodeBuilder2);
+
+ // when
+ final Dag dag = this.dagBuilder.build();
+
+ // then
+ assertThat(dag.getName()).isEqualTo("dag builder");
+ assertDagNodes(dag);
+ }
+
+ private void assertDagNodes(final Dag dag) {
+ final List<Node> nodes = dag.getNodes();
+ assertThat(nodes.size()).isEqualTo(2);
+ final Node node1 = nodes.get(0);
+ final Node node2 = nodes.get(1);
+
+ assertThat(node1.getName()).isEqualTo("nb1");
+ assertThat(node2.getName()).isEqualTo("nb2");
+
+ assertThat(node1.hasParent()).isFalse();
+ assertThat(node1.getChildren()).isEqualTo(Arrays.asList(node2));
+
+ assertThat(node2.hasParent()).isTrue();
+ assertThat(node2.getChildren()).isEmpty();
+ assertThat(node2.getParents()).isEqualTo(Arrays.asList(node1));
+ }
+
+ private NodeBuilder createNodeBuilder(final String name) {
+ return this.dagBuilder.createNode(name, mock(NodeProcessor.class));
+ }
+
+ @Test
+ public void add_dependency_should_not_affect_dag_already_built() {
+ // given
+ final Dag dag = this.dagBuilder.build();
+
+ // when
+ createNodeBuilder("a");
+
+ // then
+ final List<Node> nodes = dag.getNodes();
+ assertThat(nodes).hasSize(0);
+ }
+
+ @Test
+ public void test_toString() {
+ // given
+
+ // when
+ final String stringRepresentation = this.dagBuilder.toString();
+
+ // then
+ assertThat(stringRepresentation).isEqualTo("DagBuilder (dag builder)");
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
index ddcb068..c757d31 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -39,13 +39,15 @@ public class DagServiceTest {
private final DagService dagService = new DagService();
private final StatusChangeRecorder statusChangeRecorder = new StatusChangeRecorder();
- private final Set<Node> nodesToFail = new HashSet<>();
+
+ // The names of the nodes that are supposed to fail.
+ private final Set<String> nodesToFail = new HashSet<>();
private final TestNodeProcessor nodeProcessor = new TestNodeProcessor(this.dagService,
this.statusChangeRecorder, this.nodesToFail);
private final CountDownLatch dagFinishedLatch = new CountDownLatch(1);
private final DagProcessor dagProcessor = new TestDagProcessor(this.dagFinishedLatch,
this.statusChangeRecorder);
- private final Dag testDag = createDag();
+ private final DagBuilder dagBuilder = new DagBuilder("fa", this.dagProcessor);
private final List<Pair<String, Status>> expectedSequence = new ArrayList<>();
@@ -59,13 +61,13 @@ public class DagServiceTest {
*/
@Test
public void oneNodeSuccess() throws Exception {
- createNodeAndAddToTestDag("a");
+ createNodeInTestDag("a");
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
addToExpectedSequence("a", Status.SUCCESS);
addToExpectedSequence("fa", Status.SUCCESS);
- runAndVerify();
+ buildDagRunAndVerify();
}
/**
@@ -76,9 +78,9 @@ public class DagServiceTest {
*/
@Test
public void twoNodesSuccess() throws Exception {
- final Node aNode = createNodeAndAddToTestDag("a");
- final Node bNode = createNodeAndAddToTestDag("b");
- aNode.addChild(bNode);
+ final NodeBuilder aBuilder = createNodeInTestDag("a");
+ final NodeBuilder bBuilder = createNodeInTestDag("b");
+ bBuilder.addParents(aBuilder);
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
addToExpectedSequence("a", Status.SUCCESS);
@@ -86,7 +88,7 @@ public class DagServiceTest {
addToExpectedSequence("b", Status.SUCCESS);
addToExpectedSequence("fa", Status.SUCCESS);
- runAndVerify();
+ buildDagRunAndVerify();
}
/**
@@ -99,10 +101,10 @@ public class DagServiceTest {
*/
@Test
public void threeNodesSuccess() throws Exception {
- final Node aNode = createNodeAndAddToTestDag("a");
- final Node bNode = createNodeAndAddToTestDag("b");
- final Node cNode = createNodeAndAddToTestDag("c");
- aNode.addChildren(bNode, cNode);
+ final NodeBuilder aBuilder = createNodeInTestDag("a");
+ final NodeBuilder bBuilder = createNodeInTestDag("b");
+ final NodeBuilder cNode = createNodeInTestDag("c");
+ aBuilder.addChildren(bBuilder, cNode);
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
@@ -113,7 +115,7 @@ public class DagServiceTest {
addToExpectedSequence("c", Status.SUCCESS);
addToExpectedSequence("fa", Status.SUCCESS);
- runAndVerify();
+ buildDagRunAndVerify();
}
@@ -122,14 +124,14 @@ public class DagServiceTest {
*/
@Test
public void oneNodeFailure() throws Exception {
- final Node aNode = createNodeAndAddToTestDag("a");
- this.nodesToFail.add(aNode);
+ final NodeBuilder aBuilder = createNodeInTestDag("a");
+ this.nodesToFail.add(aBuilder.getName());
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
addToExpectedSequence("a", Status.FAILURE);
addToExpectedSequence("fa", Status.FAILURE);
- runAndVerify();
+ buildDagRunAndVerify();
}
/**
@@ -143,10 +145,10 @@ public class DagServiceTest {
*/
@Test
public void twoNodesFailFirst() throws Exception {
- final Node aNode = createNodeAndAddToTestDag("a");
- final Node bNode = createNodeAndAddToTestDag("b");
- aNode.addChild(bNode);
- this.nodesToFail.add(aNode);
+ final NodeBuilder aBuilder = createNodeInTestDag("a");
+ final NodeBuilder bBuilder = createNodeInTestDag("b");
+ bBuilder.addParents(aBuilder);
+ this.nodesToFail.add(aBuilder.getName());
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
@@ -154,7 +156,7 @@ public class DagServiceTest {
addToExpectedSequence("b", Status.CANCELED);
addToExpectedSequence("fa", Status.FAILURE);
- runAndVerify();
+ buildDagRunAndVerify();
}
/**
@@ -170,12 +172,12 @@ public class DagServiceTest {
*/
@Test
public void threeNodesFailSecond() throws Exception {
- final Node aNode = createNodeAndAddToTestDag("a");
- final Node bNode = createNodeAndAddToTestDag("b");
- final Node cNode = createNodeAndAddToTestDag("c");
- aNode.addChildren(bNode, cNode);
+ final NodeBuilder aBuilder = createNodeInTestDag("a");
+ final NodeBuilder bBuilder = createNodeInTestDag("b");
+ final NodeBuilder cBuilder = createNodeInTestDag("c");
+ aBuilder.addChildren(bBuilder, cBuilder);
- this.nodesToFail.add(bNode);
+ this.nodesToFail.add(bBuilder.getName());
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
@@ -186,7 +188,7 @@ public class DagServiceTest {
addToExpectedSequence("c", Status.SUCCESS);
addToExpectedSequence("fa", Status.FAILURE);
- runAndVerify();
+ buildDagRunAndVerify();
}
@@ -203,21 +205,24 @@ public class DagServiceTest {
* </pre>
*/
@Test
- public void simple_subdag_success_case() throws Exception {
+ public void simple_sub_dag_success_case() throws Exception {
final TestSubDagProcessor testSubDagProcessor = new TestSubDagProcessor
(this.dagService, this.statusChangeRecorder);
- final Dag bDag = new Dag("fb", testSubDagProcessor);
- createNodeAndAddToDag("a", bDag);
- createNodeAndAddToDag("b", bDag);
+ final DagBuilder subDagBuilder = new DagBuilder("fb", testSubDagProcessor);
+ createNodeInDag("a", subDagBuilder);
+ createNodeInDag("b", subDagBuilder);
+ final Dag bDag = subDagBuilder.build();
final TestSubDagNodeProcessor testSubDagNodeProcessor = new TestSubDagNodeProcessor
(this.dagService, this.statusChangeRecorder, bDag);
- final Node subDagNode = new Node("sfb", testSubDagNodeProcessor);
- testSubDagProcessor.setNode(subDagNode);
- this.testDag.addNode(subDagNode);
+ final NodeBuilder subDagNodeBuilder = this.dagBuilder
+ .createNode("sfb", testSubDagNodeProcessor);
+
+ final NodeBuilder cBuilder = createNodeInTestDag("c");
+ cBuilder.addParents(subDagNodeBuilder);
+ final Dag dag = this.dagBuilder.build();
- final Node cNode = createNodeAndAddToTestDag("c");
- subDagNode.addChild(cNode);
+ testSubDagProcessor.setNode(dag.getNodeByName(subDagNodeBuilder.getName()));
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("sfb", Status.RUNNING);
@@ -232,8 +237,12 @@ public class DagServiceTest {
addToExpectedSequence("c", Status.SUCCESS);
addToExpectedSequence("fa", Status.SUCCESS);
- runAndVerify();
+ runAndVerify(dag);
+
+ }
+ private void createNodeInDag(final String name, final DagBuilder subDagBuilder) {
+ subDagBuilder.createNode(name, this.nodeProcessor);
}
/**
@@ -244,8 +253,8 @@ public class DagServiceTest {
final CountDownLatch nodeRunningLatch = new CountDownLatch(1);
final TestKillNodeProcessor killNodeProcessor = new TestKillNodeProcessor(this.dagService,
this.statusChangeRecorder, nodeRunningLatch);
- final Node aNode = new Node("a", killNodeProcessor);
- this.testDag.addNode(aNode);
+ this.dagBuilder.createNode("a", killNodeProcessor);
+ final Dag dag = this.dagBuilder.build();
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
@@ -254,25 +263,24 @@ public class DagServiceTest {
addToExpectedSequence("a", Status.KILLED);
addToExpectedSequence("fa", Status.KILLED);
- this.dagService.startDag(this.testDag);
+ this.dagService.startDag(dag);
// Make sure the node is running before killing the DAG.
- nodeRunningLatch.await(120, TimeUnit.SECONDS);
- this.dagService.killDag(this.testDag);
+ nodeRunningLatch.await(1, TimeUnit.SECONDS);
+ this.dagService.killDag(dag);
- final boolean isWaitSuccessful = this.dagFinishedLatch.await(120, TimeUnit.SECONDS);
+ final boolean isWaitSuccessful = this.dagFinishedLatch.await(1, TimeUnit.SECONDS);
// Make sure the dag finishes.
assertThat(isWaitSuccessful).isTrue();
verifyStatusSequence();
}
-
private void addToExpectedSequence(final String name, final Status status) {
this.expectedSequence.add(new Pair<>(name, status));
}
- private void runDag() throws InterruptedException {
- this.dagService.startDag(this.testDag);
+ private void runDag(final Dag dag) throws InterruptedException {
+ this.dagService.startDag(dag);
final boolean isWaitSuccessful = this.dagFinishedLatch.await(2, TimeUnit.SECONDS);
// Make sure the dag finishes.
@@ -283,33 +291,21 @@ public class DagServiceTest {
this.statusChangeRecorder.verifySequence(this.expectedSequence);
}
- private void runAndVerify() throws InterruptedException {
- runDag();
+ private void buildDagRunAndVerify() throws InterruptedException {
+ final Dag dag = this.dagBuilder.build();
+ runAndVerify(dag);
+ }
+
+ private void runAndVerify(final Dag dag) throws InterruptedException {
+ runDag(dag);
verifyStatusSequence();
}
/**
* Creates a node and add to the test dag.
- *
- * @param name node name
- * @return Node object
*/
- private Node createNode(final String name) {
- return new Node(name, this.nodeProcessor);
- }
-
- private Node createNodeAndAddToDag(final String name, final Dag flow) {
- final Node node = createNode(name);
- flow.addNode(node);
- return node;
- }
-
- private Node createNodeAndAddToTestDag(final String name) {
- return createNodeAndAddToDag(name, this.testDag);
- }
-
- private Dag createDag() {
- return new Dag("fa", this.dagProcessor);
+ private NodeBuilder createNodeInTestDag(final String name) {
+ return this.dagBuilder.createNode(name, this.nodeProcessor);
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
index 522beb2..6bd7f13 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
@@ -57,7 +57,7 @@ public class DagTest {
final Node aNode = createAndAddNode("a");
aNode.setStatus(Status.RUNNING);
final Node bNode = createAndAddNode("b");
- aNode.addChild(bNode);
+ bNode.addParent(aNode);
this.testFlow.setStatus(Status.RUNNING);
this.testFlow.kill();
assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
@@ -80,11 +80,11 @@ public class DagTest {
final Node aNode = createAndAddNode("a");
aNode.setStatus(Status.RUNNING);
final Node bNode = createAndAddNode("b");
- aNode.addChild(bNode);
+ bNode.addParent(aNode);
final Node cNode = createAndAddNode("c");
- aNode.addChild(cNode);
+ cNode.addParent(aNode);
final Node dNode = createAndAddNode("d");
- cNode.addChild(dNode);
+ dNode.addParent(cNode);
this.testFlow.setStatus(Status.RUNNING);
this.testFlow.kill();
@@ -103,7 +103,7 @@ public class DagTest {
final Node aNode = createAndAddNode("a");
aNode.setStatus(Status.RUNNING);
final Node bNode = createAndAddNode("b");
- aNode.addChild(bNode);
+ bNode.addParent(aNode);
bNode.setStatus(Status.BLOCKED);
this.testFlow.setStatus(Status.RUNNING);
this.testFlow.kill();
@@ -125,7 +125,7 @@ public class DagTest {
aNode.setStatus(Status.SUCCESS);
final Node bNode = createAndAddNode("b");
bNode.setStatus(Status.RUNNING);
- aNode.addChild(bNode);
+ bNode.addParent(aNode);
this.testFlow.kill();
assertThat(aNode.getStatus()).isEqualTo(Status.SUCCESS);
assertThat(bNode.getStatus()).isEqualTo(Status.KILLING);
@@ -157,7 +157,7 @@ public class DagTest {
* @return Node object
*/
private Node createAndAddNode(final String name) {
- final Node node = TestUtil.createNodeWithNullProcessor(name);
+ final Node node = TestUtil.createNodeWithNullProcessor(name, this.testFlow);
this.testFlow.addNode(node);
return node;
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/NodeBuilderTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/NodeBuilderTest.java
new file mode 100644
index 0000000..1c15a02
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/NodeBuilderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+
+public class NodeBuilderTest {
+
+ private final DagBuilder dagBuilder = mock(DagBuilder.class);
+ private final NodeBuilder builder = createBuilder("builder");
+
+ @Test
+ public void addChildren() {
+ // given
+ final NodeBuilder builder2 = createBuilder("builder2");
+ final NodeBuilder builder3 = createBuilder("builder3");
+
+ // when
+ this.builder.addChildren(builder2, builder3);
+ assertParentMatch(builder2);
+ assertParentMatch(builder3);
+ }
+
+ /**
+ * Asserts that the parent of the given node is the test builder node.
+ */
+ private void assertParentMatch(final NodeBuilder builder) {
+ final Set<NodeBuilder> parents = builder.getParents();
+ assertThat(parents).isEqualTo(new HashSet<>(Arrays.asList(this
+ .builder)));
+ }
+
+ @Test
+ public void addParents() {
+ // given
+ final NodeBuilder builder2 = createBuilder("builder2");
+ final NodeBuilder builder3 = createBuilder("builder3");
+
+ // when
+ this.builder.addParents(builder2, builder3);
+ final Set<NodeBuilder> parents = this.builder.getParents();
+
+ // then
+ assertThat(parents).isEqualTo(new HashSet<>(Arrays.asList(builder2, builder3)));
+ }
+
+ private NodeBuilder createBuilder(final String name) {
+ return new NodeBuilder(name, mock(NodeProcessor.class), this.dagBuilder);
+ }
+
+ @Test
+ public void depend_on_node_in_a_different_dag_should_throw_exception() {
+ // given
+ final NodeBuilder builderInAnotherDag = new NodeBuilder("builder from another dag", mock
+ (NodeProcessor.class), mock
+ (DagBuilder.class));
+
+ // when
+ final Throwable thrown = catchThrowable(() -> {
+ this.builder.addChildren(builderInAnotherDag);
+ });
+
+ // then
+ assertThat(thrown).isInstanceOf(DagException.class);
+ }
+
+ @Test
+ public void toStringTest() {
+ // given
+ final DagBuilder dagBuilder = new DagBuilder("dag", mock(DagProcessor.class));
+ final NodeBuilder nodeBuilder = new NodeBuilder("node", mock(NodeProcessor.class),
+ dagBuilder);
+
+ // when
+ final String stringRepresentation = nodeBuilder.toString();
+
+ // then
+ assertThat(stringRepresentation).isEqualTo("NodeBuilder (node) in DagBuilder (dag)");
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
index d871624..e0ba3e8 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
@@ -18,6 +18,7 @@ package azkaban.dag;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
import org.junit.Test;
@@ -25,18 +26,23 @@ public class NodeTest {
@Test
public void hasParent() {
- final Node node = TestUtil.createNodeWithNullProcessor("a");
- final Node parentNode = TestUtil.createNodeWithNullProcessor("parent");
- parentNode.addChild(node);
+ final Node node = createTestNode("a");
+ final Node parentNode = createTestNode("parent");
+ node.addParent(parentNode);
final boolean hasParent = node.hasParent();
assertThat(hasParent).isTrue();
}
@Test
public void hasParentNegative() {
- final Node node = TestUtil.createNodeWithNullProcessor("a");
+ final Node node = createTestNode("a");
final boolean hasParent = node.hasParent();
assertThat(hasParent).isFalse();
}
+ private Node createTestNode(final String name) {
+ return TestUtil.createNodeWithNullProcessor(name, mock(Dag.class));
+ }
+
+
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
index f0b1751..cc50834 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
@@ -22,10 +22,10 @@ public class TestNodeProcessor implements NodeProcessor {
private final DagService dagService;
private final StatusChangeRecorder statusChangeRecorder;
- private final Set<Node> nodesToFail;
+ private final Set<String> nodesToFail;
TestNodeProcessor(final DagService dagService,
- final StatusChangeRecorder statusChangeRecorder, final Set<Node> nodesToFail) {
+ final StatusChangeRecorder statusChangeRecorder, final Set<String> nodesToFail) {
this.dagService = dagService;
this.statusChangeRecorder = statusChangeRecorder;
this.nodesToFail = nodesToFail;
@@ -38,7 +38,7 @@ public class TestNodeProcessor implements NodeProcessor {
switch (status) {
case RUNNING:
- if (this.nodesToFail.contains(node)) {
+ if (this.nodesToFail.contains(node.getName())) {
this.dagService.markNodeFailed(node);
} else {
this.dagService.markNodeSuccess(node);
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
index 29642c6..0f7dbcd 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
@@ -25,7 +25,7 @@ class TestUtil {
*
* @param name node name
*/
- static Node createNodeWithNullProcessor(final String name) {
- return new Node(name, mock(NodeProcessor.class));
+ static Node createNodeWithNullProcessor(final String name, final Dag dag) {
+ return new Node(name, mock(NodeProcessor.class), dag);
}
}