Details
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
index de4d7aa..b73ab6a 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
@@ -28,7 +28,7 @@ import java.util.List;
* <p>Most of the methods in this class should remain package private. Code outside of this
* package should mainly interact with the {@link DagService}.
*/
-class Dag {
+public class Dag {
private final String name;
private final DagProcessor dagProcessor;
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
index ccef79c..8f2f643 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagBuilder.java
@@ -35,40 +35,84 @@ import java.util.Set;
*/
public class DagBuilder {
- private final String name;
- private final DagProcessor dagProcessor;
+ private final Dag dag;
+ private final Map<String, Node> nameToNodeMap = new HashMap<>();
- private final List<NodeBuilder> builders = new ArrayList<>();
- private final Set<String> nodeNamesSet = new HashSet<>();
+ // The builder can only be used to build a DAG once to prevent modifying an existing DAG after it
+ // is built.
+ private boolean isBuilt = false;
+ /**
+ * A builder for building a DAG.
+ *
+ * @param name name of the DAG
+ * @param dagProcessor the associated DagProcessor
+ */
public DagBuilder(final String name, final DagProcessor dagProcessor) {
requireNonNull(name, "The name of the DagBuilder can't be null");
- this.name = name;
- requireNonNull(name, "The dagProcessor of the DagBuilder can't be null");
- this.dagProcessor = dagProcessor;
+ requireNonNull(dagProcessor, "The dagProcessor of the DagBuilder can't be null");
+ this.dag = new Dag(name, dagProcessor);
}
/**
- * Creates a new node builder and add it to the DagBuilder.
+ * Creates a new node and adds it to the DagBuilder.
*
* @param name name of the node
* @param nodeProcessor node processor associated with this node
- * @return a new NodeBuilder associated with this node
+ * @return a new node
* @throws DagException if the name is not unique in the DAG.
*/
- public NodeBuilder createNode(final String name, final NodeProcessor nodeProcessor) {
- if (this.nodeNamesSet.contains(name)) {
+ public Node createNode(final String name, final NodeProcessor nodeProcessor) {
+ checkIsBuilt();
+
+ if (this.nameToNodeMap.get(name) != null) {
throw new DagException(String.format("Node names in %s need to be unique. The name "
+ "(%s) already exists.", this, name));
}
- final NodeBuilder builder = new NodeBuilder(name, nodeProcessor, this);
- this.builders.add(builder);
- this.nodeNamesSet.add(name);
+ final Node node = new Node(name, nodeProcessor, this.dag);
+ this.nameToNodeMap.put(name, node);
- return builder;
+ return node;
+ }
+
+ /**
+ * Throws an exception if the {@link DagBuilder#build()} method has been called.
+ */
+ private void checkIsBuilt() {
+ if (this.isBuilt) {
+ final String msg = String
+ .format("The DAG (%s) is built already. Can't create new nodes.", this);
+ throw new DagException(msg);
+ }
}
/**
+ * Add a parent node to a child node. All the names should have been registered with this builder
+ * with the {@link DagBuilder#createNode(String, NodeProcessor)} call.
+ *
+ * @param childNodeName name of the child node
+ * @param parentNodeName name of the parent node
+ */
+ public void addParentNode(final String childNodeName, final String parentNodeName) {
+ checkIsBuilt();
+
+ final Node child = this.nameToNodeMap.get(childNodeName);
+ if (child == null) {
+ throw new DagException(String.format("Unknown child node (%s). Did you create the node?",
+ childNodeName));
+ }
+
+ final Node parent = this.nameToNodeMap.get(parentNodeName);
+ if (parent == null) {
+ throw new DagException(
+ String.format("Unknown parent node (%s). Did you create the node?", parentNodeName));
+ }
+
+ child.addParent(parent);
+ }
+
+
+ /**
* Builds the dag.
*
* <p>Once this method is called, subsequent calls via NodeBuilder to modify the nodes's
@@ -78,11 +122,10 @@ public class DagBuilder {
* @return the Dag reflecting the current state of the DagBuilder
*/
public Dag build() {
+ checkIsBuilt();
checkCircularDependencies();
- final Dag dag = new Dag(this.name, this.dagProcessor);
- final Map<NodeBuilder, Node> builderNodeMap = createBuilderToNodeMap(dag);
- updateNodesRelationships(builderNodeMap);
- return dag;
+ this.isBuilt = true;
+ return this.dag;
}
/**
@@ -98,16 +141,16 @@ public class DagBuilder {
class CircularDependencyChecker {
// The nodes that need to be visited
- private final Set<NodeBuilder> toVisit = new HashSet<>(DagBuilder.this.builders);
+ private final Set<Node> toVisit = new HashSet<>(DagBuilder.this.nameToNodeMap.values());
// The nodes that have finished traversing all their parent nodes
- private final Set<NodeBuilder> finished = new HashSet<>();
+ private final Set<Node> finished = new HashSet<>();
// The nodes that are waiting for their parent nodes to finish visit.
- private final Set<NodeBuilder> ongoing = new HashSet<>();
+ private final Set<Node> ongoing = new HashSet<>();
// One sample of nodes that form a circular dependency
- private final List<NodeBuilder> sampleCircularNodes = new ArrayList<>();
+ private final List<Node> sampleCircularNodes = new ArrayList<>();
/**
* Checks if the builder contains nodes that form a circular dependency ring.
@@ -116,7 +159,7 @@ public class DagBuilder {
*/
private void check() {
while (!this.toVisit.isEmpty()) {
- final NodeBuilder node = removeOneNodeFromToVisitSet();
+ final Node node = removeOneNodeFromToVisitSet();
if (checkNode(node)) {
final String msg = String.format("Circular dependency detected. Sample: %s",
this.sampleCircularNodes);
@@ -130,9 +173,9 @@ public class DagBuilder {
*
* @return a node
*/
- private NodeBuilder removeOneNodeFromToVisitSet() {
- final Iterator<NodeBuilder> iterator = this.toVisit.iterator();
- final NodeBuilder node = iterator.next();
+ private Node removeOneNodeFromToVisitSet() {
+ final Iterator<Node> iterator = this.toVisit.iterator();
+ final Node node = iterator.next();
iterator.remove();
return node;
}
@@ -145,7 +188,7 @@ public class DagBuilder {
* @param node node to check
* @return true if it is
*/
- private boolean checkNode(final NodeBuilder node) {
+ private boolean checkNode(final Node node) {
if (this.finished.contains(node)) {
return false;
}
@@ -155,7 +198,7 @@ public class DagBuilder {
}
this.toVisit.remove(node);
this.ongoing.add(node);
- for (final NodeBuilder parent : node.getParents()) {
+ for (final Node parent : node.getParents()) {
if (checkNode(parent)) {
this.sampleCircularNodes.add(node);
return true;
@@ -171,47 +214,8 @@ public class DagBuilder {
checker.check();
}
- /**
- * Creates nodes using information stored in the current list of builders.
- *
- * <p>New nodes are created here to ensure they don't change even if their corresponding
- * NodeBuilders are modified after the {@link DagBuilder#build()} is called.
- *
- * @param dag the dag to associate the nodes with
- * @return the map from NodeBuilder to Node
- */
- private Map<NodeBuilder, Node> createBuilderToNodeMap(final Dag dag) {
- final Map<NodeBuilder, Node> builderNodeMap = new HashMap<>();
- for (final NodeBuilder builder : this.builders) {
- final Node node = builder.build(dag);
- builderNodeMap.put(builder, node);
- }
- return builderNodeMap;
- }
-
- private void updateNodesRelationships(final Map<NodeBuilder, Node> builderNodeMap) {
- for (final NodeBuilder builder : this.builders) {
- addParentNodes(builder, builderNodeMap);
- }
- }
-
- /**
- * Adds parent nodes to the node associated with the builder.
- */
- private void addParentNodes(final NodeBuilder builder,
- final Map<NodeBuilder, Node> builderToNodeMap) {
- final Node node = builderToNodeMap.get(builder);
- for (final NodeBuilder parentBuilder : builder.getParents()) {
- final Node parentNode = builderToNodeMap.get(parentBuilder);
-
- // The NodeBuilders should have checked if the NodeBuilders belong to the same DagBuilder.
- assert (parentNode != null);
- node.addParent(parentNode);
- }
- }
-
@Override
public String toString() {
- return String.format("DagBuilder (%s)", this.name);
+ return String.format("DagBuilder (%s)", this.dag.getName());
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
index 0088c36..5514138 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings("FutureReturnValueIgnored")
@Singleton
-class DagService {
+public class DagService {
private static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(10);
private static final Logger logger = LoggerFactory.getLogger(DagService.class);
@@ -45,7 +45,7 @@ class DagService {
private final ExecutorService executorService;
@Inject
- DagService(final ExecutorServiceUtils executorServiceUtils) {
+ public DagService(final ExecutorServiceUtils executorServiceUtils) {
// Give the thread a name to make debugging easier.
this.executorServiceUtils = executorServiceUtils;
final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
@@ -53,42 +53,42 @@ class DagService {
this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
}
- void startDag(final Dag dag) {
+ public void startDag(final Dag dag) {
this.executorService.submit(dag::start);
}
/**
* Transitions the node to the success state.
*/
- void markNodeSuccess(final Node node) {
+ public void markNodeSuccess(final Node node) {
this.executorService.submit(node::markSuccess);
}
/**
* Transitions the node from the killing state to the killed state.
*/
- void markNodeKilled(final Node node) {
+ public void markNodeKilled(final Node node) {
this.executorService.submit(node::markKilled);
}
/**
* Transitions the node to the failure state.
*/
- void markNodeFailed(final Node node) {
+ public void markNodeFailed(final Node node) {
this.executorService.submit(node::markFailed);
}
/**
* Kills a DAG.
*/
- void killDag(final Dag dag) {
+ public void killDag(final Dag dag) {
this.executorService.submit(dag::kill);
}
/**
* Shuts down the service and waits for the tasks to finish.
*/
- void shutdownAndAwaitTermination() throws InterruptedException {
+ public void shutdownAndAwaitTermination() throws InterruptedException {
logger.info("DagService is shutting down.");
this.executorServiceUtils.gracefulShutdown(this.executorService, SHUTDOWN_WAIT_TIMEOUT);
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
index 41c6579..d283c61 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
@@ -25,7 +25,7 @@ import java.util.List;
/**
* Node in a DAG: Directed acyclic graph.
*/
-class Node {
+public class Node {
private final String name;
@@ -48,6 +48,7 @@ class Node {
this.name = name;
requireNonNull(dag, "The dag of the node can't be null");
this.dag = dag;
+ dag.addNode(this);
}
Dag getDag() {
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
index f3f9778..ae9b643 100644
--- a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
@@ -18,7 +18,7 @@ package azkaban.dag;
import com.google.common.collect.ImmutableSet;
-enum Status {
+public enum Status {
READY, // ready to run
DISABLED, // disabled by users. Treat as the node has the status of success
BLOCKED, // temporarily blocked. Need to be unblocked by another external event
@@ -35,7 +35,7 @@ enum Status {
static final ImmutableSet<Status> TERMINAL_STATES = ImmutableSet.of(DISABLED, SUCCESS, FAILURE,
CANCELED, KILLED);
- boolean isTerminal() {
+ public boolean isTerminal() {
return TERMINAL_STATES.contains(this);
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
index 2bc1b29..85c935e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagBuilderTest.java
@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.Mockito.mock;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.junit.Test;
@@ -32,23 +32,23 @@ public class DagBuilderTest {
public void create_nodes_with_same_name_should_throw_an_exception() {
final String name = "nb";
// given
- final NodeBuilder nodeBuilder1 = createNodeBuilder(name);
+ createNode(name);
// when
- final Throwable thrown = catchThrowable(() -> {
- createNodeBuilder(name);
- });
+ final Throwable thrown = catchThrowable(() -> createNode(name));
// then
- assertThat(thrown).isInstanceOf(DagException.class);
+ assertThrownIsDagBuilderException(thrown);
}
@Test
public void build_should_return_expected_dag() {
// given
- final NodeBuilder nodeBuilder1 = createNodeBuilder("nb1");
- final NodeBuilder nodeBuilder2 = createNodeBuilder("nb2");
- nodeBuilder1.addChildren(nodeBuilder2);
+ final String parentNodeName = "nb1";
+ final String childNodeName = "nb2";
+ createNode(parentNodeName);
+ createNode(childNodeName);
+ addParentNode(childNodeName, parentNodeName);
// when
final Dag dag = this.dagBuilder.build();
@@ -58,6 +58,13 @@ public class DagBuilderTest {
assertDagNodes(dag);
}
+ /**
+ * Adds parent as the child's parent node.
+ */
+ private void addParentNode(final String childName, final String parentName) {
+ this.dagBuilder.addParentNode(childName, parentName);
+ }
+
private void assertDagNodes(final Dag dag) {
final List<Node> nodes = dag.getNodes();
assertThat(nodes.size()).isEqualTo(2);
@@ -68,49 +75,78 @@ public class DagBuilderTest {
assertThat(node2.getName()).isEqualTo("nb2");
assertThat(node1.hasParent()).isFalse();
- assertThat(node1.getChildren()).isEqualTo(Arrays.asList(node2));
+ assertThat(node1.getChildren()).isEqualTo(Collections.singletonList(node2));
assertThat(node2.hasParent()).isTrue();
assertThat(node2.getChildren()).isEmpty();
- assertThat(node2.getParents()).isEqualTo(Arrays.asList(node1));
+ assertThat(node2.getParents()).isEqualTo(Collections.singletonList(node1));
}
- private NodeBuilder createNodeBuilder(final String name) {
+ private Node createNode(final String name) {
return this.dagBuilder.createNode(name, mock(NodeProcessor.class));
}
@Test
public void build_should_throw_exception_when_circular_dependency_is_detected() {
// given
- final NodeBuilder nodeBuilder1 = createNodeBuilder("nb1");
- final NodeBuilder nodeBuilder2 = createNodeBuilder("nb2");
- final NodeBuilder nodeBuilder3 = createNodeBuilder("nb3");
- nodeBuilder2.addParents(nodeBuilder1);
- nodeBuilder3.addParents(nodeBuilder2);
- nodeBuilder1.addParents(nodeBuilder3);
+ final String n1Name = "nb1";
+ final String n2Name = "nb2";
+ final String n3Name = "nb3";
+ createNode(n1Name);
+ createNode(n2Name);
+ createNode(n3Name);
+
+ addParentNode(n2Name, n1Name);
+ addParentNode(n3Name, n2Name);
+ addParentNode(n1Name, n3Name);
// when
- final Throwable thrown = catchThrowable(() -> {
- this.dagBuilder.build();
- });
+ final Throwable thrown = catchThrowable(this.dagBuilder::build);
// then
// Expect the exception message to show the loop: nb1 -> nb2 -> nb3 -> nb1.
System.out.println("Expect exception: " + thrown);
- assertThat(thrown).isInstanceOf(DagException.class);
+ assertThrownIsDagBuilderException(thrown);
}
@Test
- public void add_dependency_should_not_affect_dag_already_built() {
+ public void can_not_call_createNode_after_dag_already_built() {
// given
- final Dag dag = this.dagBuilder.build();
+ this.dagBuilder.build();
// when
- createNodeBuilder("a");
+ final Throwable thrown = catchThrowable(() -> createNode("a"));
// then
- final List<Node> nodes = dag.getNodes();
- assertThat(nodes).hasSize(0);
+ assertThrownIsDagBuilderException(thrown);
+ }
+
+ @Test
+ public void can_not_call_addParentNode_after_dag_already_built() {
+ // given
+ this.dagBuilder.build();
+
+ // when
+ final Throwable thrown = catchThrowable(() -> addParentNode("n1", "n2"));
+
+ // then
+ assertThrownIsDagBuilderException(thrown);
+ }
+
+ @Test
+ public void can_not_call_build_after_dag_already_built() {
+ // given
+ this.dagBuilder.build();
+
+ // when
+ final Throwable thrown = catchThrowable(this.dagBuilder::build);
+
+ // then
+ assertThrownIsDagBuilderException(thrown);
+ }
+
+ private void assertThrownIsDagBuilderException(final Throwable thrown) {
+ assertThat(thrown).isInstanceOf(DagException.class);
}
@Test
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
index 33cbcb7..4807c34 100644
--- a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -97,9 +97,9 @@ public class DagServiceTest {
*/
@Test
public void twoNodesSuccess() throws Exception {
- final NodeBuilder aBuilder = createNodeInTestDag("a");
- final NodeBuilder bBuilder = createNodeInTestDag("b");
- bBuilder.addParents(aBuilder);
+ createNodeInTestDag("a");
+ createNodeInTestDag("b");
+ this.dagBuilder.addParentNode("b", "a");
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
addToExpectedSequence("a", Status.SUCCESS);
@@ -120,10 +120,11 @@ public class DagServiceTest {
*/
@Test
public void threeNodesSuccess() throws Exception {
- final NodeBuilder aBuilder = createNodeInTestDag("a");
- final NodeBuilder bBuilder = createNodeInTestDag("b");
- final NodeBuilder cNode = createNodeInTestDag("c");
- aBuilder.addChildren(bBuilder, cNode);
+ createNodeInTestDag("a");
+ createNodeInTestDag("b");
+ createNodeInTestDag("c");
+ this.dagBuilder.addParentNode("b", "a");
+ this.dagBuilder.addParentNode("c", "a");
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
@@ -143,8 +144,8 @@ public class DagServiceTest {
*/
@Test
public void oneNodeFailure() throws Exception {
- final NodeBuilder aBuilder = createNodeInTestDag("a");
- this.nodesToFail.add(aBuilder.getName());
+ createNodeInTestDag("a");
+ this.nodesToFail.add("a");
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
addToExpectedSequence("a", Status.FAILURE);
@@ -164,10 +165,10 @@ public class DagServiceTest {
*/
@Test
public void twoNodesFailFirst() throws Exception {
- final NodeBuilder aBuilder = createNodeInTestDag("a");
- final NodeBuilder bBuilder = createNodeInTestDag("b");
- bBuilder.addParents(aBuilder);
- this.nodesToFail.add(aBuilder.getName());
+ createNodeInTestDag("a");
+ createNodeInTestDag("b");
+ this.dagBuilder.addParentNode("b", "a");
+ this.nodesToFail.add("a");
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
@@ -191,12 +192,12 @@ public class DagServiceTest {
*/
@Test
public void threeNodesFailSecond() throws Exception {
- final NodeBuilder aBuilder = createNodeInTestDag("a");
- final NodeBuilder bBuilder = createNodeInTestDag("b");
- final NodeBuilder cBuilder = createNodeInTestDag("c");
- aBuilder.addChildren(bBuilder, cBuilder);
-
- this.nodesToFail.add(bBuilder.getName());
+ createNodeInTestDag("a");
+ createNodeInTestDag("b");
+ createNodeInTestDag("c");
+ this.dagBuilder.addParentNode("b", "a");
+ this.dagBuilder.addParentNode("c", "a");
+ this.nodesToFail.add("b");
addToExpectedSequence("fa", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
@@ -228,28 +229,29 @@ public class DagServiceTest {
final TestSubDagProcessor testSubDagProcessor = new TestSubDagProcessor
(this.dagService, this.statusChangeRecorder);
final DagBuilder subDagBuilder = new DagBuilder("fb", testSubDagProcessor);
- createNodeInDag("a", subDagBuilder);
- createNodeInDag("b", subDagBuilder);
+ subDagBuilder.createNode("a", this.nodeProcessor);
+ subDagBuilder.createNode("b", this.nodeProcessor);
final Dag bDag = subDagBuilder.build();
final TestSubDagNodeProcessor testSubDagNodeProcessor = new TestSubDagNodeProcessor
(this.dagService, this.statusChangeRecorder, bDag, testSubDagProcessor);
- final NodeBuilder subDagNodeBuilder = this.dagBuilder
- .createNode("sfb", testSubDagNodeProcessor);
+ final String SUB_DAG_NAME = "sfb";
+ this.dagBuilder.createNode(SUB_DAG_NAME, testSubDagNodeProcessor);
+
+ createNodeInTestDag("c");
- final NodeBuilder cBuilder = createNodeInTestDag("c");
- cBuilder.addParents(subDagNodeBuilder);
+ this.dagBuilder.addParentNode("c", SUB_DAG_NAME);
final Dag dag = this.dagBuilder.build();
addToExpectedSequence("fa", Status.RUNNING);
- addToExpectedSequence("sfb", Status.RUNNING);
+ addToExpectedSequence(SUB_DAG_NAME, Status.RUNNING);
addToExpectedSequence("fb", Status.RUNNING);
addToExpectedSequence("a", Status.RUNNING);
addToExpectedSequence("b", Status.RUNNING);
addToExpectedSequence("a", Status.SUCCESS);
addToExpectedSequence("b", Status.SUCCESS);
addToExpectedSequence("fb", Status.SUCCESS);
- addToExpectedSequence("sfb", Status.SUCCESS);
+ addToExpectedSequence(SUB_DAG_NAME, Status.SUCCESS);
addToExpectedSequence("c", Status.RUNNING);
addToExpectedSequence("c", Status.SUCCESS);
addToExpectedSequence("fa", Status.SUCCESS);
@@ -258,10 +260,6 @@ public class DagServiceTest {
}
- private void createNodeInDag(final String name, final DagBuilder subDagBuilder) {
- subDagBuilder.createNode(name, this.nodeProcessor);
- }
-
/**
* Tests killing a dag.
*/
@@ -321,8 +319,8 @@ public class DagServiceTest {
/**
* Creates a node and add to the test dag.
*/
- private NodeBuilder createNodeInTestDag(final String name) {
- return this.dagBuilder.createNode(name, this.nodeProcessor);
+ private void createNodeInTestDag(final String name) {
+ this.dagBuilder.createNode(name, this.nodeProcessor);
}
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunner2Test.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunner2Test.java
new file mode 100644
index 0000000..6e72356
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunner2Test.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.dag.Dag;
+import azkaban.dag.DagBuilder;
+import azkaban.dag.DagProcessor;
+import azkaban.dag.DagService;
+import azkaban.dag.Node;
+import azkaban.dag.NodeProcessor;
+import azkaban.dag.Status;
+import azkaban.project.NodeBean;
+import azkaban.project.NodeBeanLoader;
+import azkaban.utils.ExecutorServiceUtils;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Tests for running flows.
+ */
+public class FlowRunner2Test {
+
+ private final DagService dagService = new DagService(new ExecutorServiceUtils());
+ private final CountDownLatch flowFinishedLatch = new CountDownLatch(1);
+
+ // The recorded event sequence.
+ private final List<String> eventSequence = new ArrayList<>();
+
+ @Test
+
+ public void runSimpleV2Flow() throws Exception {
+ final NodeBean flowNode = loadFlowNode();
+ final Dag dag = createDag(flowNode);
+ this.dagService.startDag(dag);
+ this.flowFinishedLatch.await(2, TimeUnit.SECONDS);
+ assertThat(this.eventSequence).isEqualTo(Arrays.asList("n1", "n2"));
+ this.dagService.shutdownAndAwaitTermination();
+ }
+
+ private NodeBean loadFlowNode() throws Exception {
+ final File flowFile = loadFlowFileFromResource();
+ final NodeBeanLoader beanLoader = new NodeBeanLoader();
+ return beanLoader.load(flowFile);
+ }
+
+ private Dag createDag(final NodeBean flowNode) {
+ final DagCreator creator = new DagCreator(flowNode);
+ return creator.create();
+
+ }
+
+ private class DagCreator {
+
+ private final NodeBean flowNode;
+ private final DagBuilder dagBuilder;
+
+ DagCreator(final NodeBean flowNode) {
+ final String flowName = flowNode.getName();
+ this.flowNode = flowNode;
+ this.dagBuilder = new DagBuilder(flowName, new SimpleDagProcessor());
+ }
+
+ Dag create() {
+ createNodes();
+ linkNodes();
+ return this.dagBuilder.build();
+ }
+
+ private void createNodes() {
+ for (final NodeBean node : this.flowNode.getNodes()) {
+ createNode(node);
+ }
+ }
+
+ private void createNode(final NodeBean node) {
+ final String nodeName = node.getName();
+ final SimpleNodeProcessor nodeProcessor = new SimpleNodeProcessor(nodeName, node.getConfig());
+ this.dagBuilder.createNode(nodeName, nodeProcessor);
+ }
+
+ private void linkNodes() {
+ for (final NodeBean node : this.flowNode.getNodes()) {
+ linkNode(node);
+ }
+ }
+
+ private void linkNode(final NodeBean node) {
+ final String name = node.getName();
+ final List<String> parents = node.getDependsOn();
+ if (parents == null) {
+ return;
+ }
+ for (final String parentNodeName : parents) {
+ this.dagBuilder.addParentNode(name, parentNodeName);
+ }
+ }
+ }
+
+ private File loadFlowFileFromResource() {
+ final ClassLoader loader = getClass().getClassLoader();
+ return new File(loader.getResource("hello_world_flow.flow").getFile());
+ }
+
+ class SimpleDagProcessor implements DagProcessor {
+
+ @Override
+ public void changeStatus(final Dag dag, final Status status) {
+ System.out.println(dag + " status changed to " + status);
+ if (status.isTerminal()) {
+ FlowRunner2Test.this.flowFinishedLatch.countDown();
+ }
+ }
+ }
+
+ class SimpleNodeProcessor implements NodeProcessor {
+
+ private final String name;
+ private final Map<String, String> config;
+
+ SimpleNodeProcessor(final String name, final Map<String, String> config) {
+ this.name = name;
+ this.config = config;
+ }
+
+ @Override
+ public void changeStatus(final Node node, final Status status) {
+ System.out.println(node + " status changed to " + status);
+ switch (status) {
+ case RUNNING:
+ System.out.println(String.format("Running with config: %s", this.config));
+ FlowRunner2Test.this.dagService.markNodeSuccess(node);
+ FlowRunner2Test.this.eventSequence.add(this.name);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/test/resources/hello_world_flow.flow b/azkaban-exec-server/src/test/resources/hello_world_flow.flow
new file mode 100644
index 0000000..d73b1e2
--- /dev/null
+++ b/azkaban-exec-server/src/test/resources/hello_world_flow.flow
@@ -0,0 +1,11 @@
+nodes:
+ - name: n1
+ type: command
+ config:
+ command: echo "Hello World."
+ - name: n2
+ dependsOn:
+ - n1
+ type: command
+ config:
+ command: echo "This is the second job."