azkaban-aplcache
Changes
azkaban-exec-server/build.gradle 17(+17 -0)
Details
azkaban-exec-server/build.gradle 17(+17 -0)
diff --git a/azkaban-exec-server/build.gradle b/azkaban-exec-server/build.gradle
index 8749c19..ef672e1 100644
--- a/azkaban-exec-server/build.gradle
+++ b/azkaban-exec-server/build.gradle
@@ -1,9 +1,26 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
apply plugin: 'distribution'
dependencies {
compile(project(':az-core'))
compile(project(':azkaban-common'))
+ compile deps.guava
compile deps.jsr305
compile deps.kafkaLog4jAppender
runtime(project(':azkaban-hadoop-security-plugin'))
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
new file mode 100644
index 0000000..8029ee2
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Dag.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A DAG (Directed acyclic graph) consists of {@link Node}s.
+ */
+class Dag {
+
+ private final String name;
+ private final DagProcessor dagProcessor;
+ private final List<Node> nodes = new ArrayList<>();
+ private Status status = Status.READY;
+
+ Dag(final String name, final DagProcessor dagProcessor) {
+ this.name = name;
+ requireNonNull(dagProcessor, "The dagProcessor parameter can't be null.");
+ this.dagProcessor = dagProcessor;
+ }
+
+ void addNode(final Node node) {
+ node.setDag(this);
+ this.nodes.add(node);
+ }
+
+ void start() {
+ assert (this.status == Status.READY);
+ changeStatus(Status.RUNNING);
+ for (final Node node : this.nodes) {
+ node.runIfAllowed();
+ }
+ // It's possible that all nodes are disabled. In this rare case the dag should be
+ // marked success. Otherwise it will be stuck in the the running state.
+ updateDagStatus();
+ }
+
+ void kill() {
+ if (this.status.isTerminal() || this.status == Status.KILLING) {
+ // It is possible that a kill is issued after a dag has finished or multiple kill requests
+ // are received. Without this check, this method will make duplicate calls to the
+ // DagProcessor.
+ return;
+ }
+ changeStatus(Status.KILLING);
+ for (final Node node : this.nodes) {
+ node.kill();
+ }
+ updateDagStatus();
+ }
+
+ /**
+ * Update the final dag status when all nodes are done.
+ *
+ * <p>If any node has not reached its terminal state, this method will simply return.
+ */
+ void updateDagStatus() {
+ // A dag may have nodes that are disabled. It's safer to scan all the nodes.
+ // Assume the overhead is minimal. If it is not the case, we can optimize later.
+ boolean failed = false;
+ for (final Node node : this.nodes) {
+ final Status nodeStatus = node.getStatus();
+ if (!nodeStatus.isTerminal()) {
+ return;
+ }
+ if (nodeStatus == Status.FAILURE) {
+ failed = true;
+ }
+ }
+
+ // Update the dag status only after all nodes have reached terminal states.
+ updateDagStatusInternal(failed);
+ }
+
+ /**
+ * Update the final dag status.
+ *
+ * @param failed true if any of the jobs has failed
+ */
+ private void updateDagStatusInternal(final boolean failed) {
+ if (this.status == Status.KILLING) {
+ /*
+ It's possible that some nodes have failed when the dag is killed.
+ Since killing a dag signals an intent from an operator, it is more important to make
+ the dag status reflect the result of that explict intent. e.g. if the killing is a
+ result of handing a job failure, users more likely want to know that someone has taken
+ an action rather than that a job has failed. Operators can still see the individual job
+ status.
+ */
+ changeStatus(Status.KILLED);
+ } else if (failed) {
+ changeStatus(Status.FAILURE);
+ } else {
+ changeStatus(Status.SUCCESS);
+ }
+ }
+
+ private void changeStatus(final Status status) {
+ this.status = status;
+ this.dagProcessor.changeStatus(this, status);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("dag (%s), status (%s)", this.name, this.status);
+ }
+
+ String getName() {
+ return this.name;
+ }
+
+ Status getStatus() {
+ return this.status;
+ }
+
+ @VisibleForTesting
+ void setStatus(final Status status) {
+ this.status = status;
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagProcessor.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagProcessor.java
new file mode 100644
index 0000000..1951344
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+public interface DagProcessor {
+
+ /**
+ * Changes the status of the dag.
+ *
+ * @param dag the dag to change
+ * @param status the new status
+ */
+ void changeStatus(Dag dag, Status status);
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
new file mode 100644
index 0000000..be2fda1
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/DagService.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread safe and non blocking service for DAG processing.
+ *
+ * <p>Allow external inputs to be given to a dag or node to allow the dag to transition states
+ * . Since only one thread is used to progress the DAG, thread synchronization is avoided.
+ */
+@SuppressWarnings("FutureReturnValueIgnored")
+@Singleton
+class DagService {
+
+ private static final long SHUTDOWN_WAIT_TIMEOUT = 60;
+ private static final Logger logger = LoggerFactory.getLogger(DagService.class);
+
+ private final ExecutorService executorService;
+
+ DagService() {
+ // Give the thread a name to make debugging easier.
+ final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("Dag-service").build();
+ this.executorService = Executors.newSingleThreadExecutor(namedThreadFactory);
+ }
+
+ void startDag(final Dag dag) {
+ this.executorService.submit(dag::start);
+ }
+
+ /**
+ * Transitions the node to the success state.
+ */
+ void markNodeSuccess(final Node node) {
+ this.executorService.submit(node::markSuccess);
+ }
+
+ /**
+ * Transitions the node from the killing state to the killed state.
+ */
+ void markNodeKilled(final Node node) {
+ this.executorService.submit(node::markKilled);
+ }
+
+ /**
+ * Transitions the node to the failure state.
+ */
+ void markNodeFailed(final Node node) {
+ this.executorService.submit(node::markFailed);
+ }
+
+ /**
+ * Kills a DAG.
+ */
+ void killDag(final Dag dag) {
+ this.executorService.submit(dag::kill);
+ }
+
+ /**
+ * Shuts down the service and waits for the tasks to finish.
+ *
+ * <p>Adopted from
+ * <a href="https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html">
+ * the Oracle JAVA Documentation.
+ * </a>
+ */
+ void shutdownAndAwaitTermination() {
+ this.executorService.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
+ this.executorService.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!this.executorService.awaitTermination(SHUTDOWN_WAIT_TIMEOUT, TimeUnit.SECONDS)) {
+ logger.error("The DagService did not terminate.");
+ }
+ }
+ } catch (final InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ this.executorService.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Node.java b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
new file mode 100644
index 0000000..b1a9f26
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Node.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Node in a DAG: Directed acyclic graph.
+ */
+class Node {
+
+ private final String name;
+
+ private final NodeProcessor nodeProcessor;
+
+ // The nodes that this node depends on.
+ private final List<Node> parents = new ArrayList<>();
+
+ // The nodes that depend on this node.
+ private final List<Node> children = new ArrayList<>();
+
+ private Status status = Status.READY;
+
+ private Dag dag;
+
+ Node(final String name, final NodeProcessor nodeProcessor) {
+ this.name = name;
+ requireNonNull(nodeProcessor, "The nodeProcessor parameter can't be null.");
+ this.nodeProcessor = nodeProcessor;
+ }
+
+ public Dag getDag() {
+ return this.dag;
+ }
+
+ public void setDag(final Dag dag) {
+ this.dag = dag;
+ }
+
+ private void addParent(final Node node) {
+ this.parents.add(node);
+ }
+
+ void addChild(final Node node) {
+ this.children.add(node);
+ node.addParent(this);
+ }
+
+ void addChildren(final Node... nodes) {
+ for (final Node node : nodes) {
+ addChild(node);
+ }
+ }
+
+ boolean hasParent() {
+ return !this.parents.isEmpty();
+ }
+
+ /**
+ * Checks if the node is ready to run.
+ *
+ * @return true if the node is ready to run
+ */
+ private boolean isReady() {
+ if (this.status != Status.READY) {
+ // e.g. if the node is disabled, it is not ready to run.
+ return false;
+ }
+ for (final Node parent : this.parents) {
+ if (!parent.status.isSuccessEffectively()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Transitions the node to the success state.
+ */
+ void markSuccess() {
+ // It's possible that the dag is killed before this method is called.
+ assertRunningOrKilling();
+ changeStatus(Status.SUCCESS);
+ for (final Node child : this.children) {
+ child.runIfAllowed();
+ }
+ this.dag.updateDagStatus();
+ }
+
+ /**
+ * Checks if all the dependencies are met and run if they are.
+ */
+ void runIfAllowed() {
+ if (isReady()) {
+ changeStatus(Status.RUNNING);
+ }
+ }
+
+ /**
+ * Transitions the node to the failure state.
+ */
+ void markFailed() {
+ // It's possible that the dag is killed before this method is called.
+ assertRunningOrKilling();
+ changeStatus(Status.FAILURE);
+ for (final Node child : this.children) {
+ child.cancel();
+ }
+ //todo: HappyRay support failure options "Finish Current Running" and "Cancel All"
+ this.dag.updateDagStatus();
+ }
+
+ private void cancel() {
+ // The node shouldn't have started.
+ assert (this.status.isPreRunState());
+ if (this.status != Status.DISABLED) {
+ changeStatus(Status.CANCELED);
+ }
+ for (final Node node : this.children) {
+ node.cancel();
+ }
+ }
+
+ /**
+ * Asserts that the state is running or killing.
+ */
+ private void assertRunningOrKilling() {
+ assert (this.status == Status.RUNNING || this.status == Status.KILLING);
+ }
+
+ private void changeStatus(final Status status) {
+ this.status = status;
+ this.nodeProcessor.changeStatus(this, this.status);
+ }
+
+ /**
+ * Kills a node.
+ *
+ * <p>A node is not designed to be killed individually. This method expects {@link Dag#kill()}
+ * method to kill all nodes. Thus this method itself doesn't need to propagate the kill signal to
+ * the node's children nodes.
+ */
+ void kill() {
+ assert (this.dag.getStatus() == Status.KILLING);
+ if (this.status == Status.READY || this.status == Status.BLOCKED) {
+ // If the node is disabled, keep the status as disabled.
+ changeStatus(Status.CANCELED);
+ } else if (this.status == Status.RUNNING) {
+ changeStatus(Status.KILLING);
+ }
+ // If the node has finished, leave the status intact.
+ }
+
+ /**
+ * Transition the node from the killing state to the killed state.
+ */
+ void markKilled() {
+ assert (this.status == Status.KILLING);
+ changeStatus(Status.KILLED);
+ this.dag.updateDagStatus();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Node (%s) status (%s) in %s", this.name, this.status, this.dag);
+ }
+
+ Status getStatus() {
+ return this.status;
+ }
+
+ @VisibleForTesting
+ void setStatus(final Status status) {
+ this.status = status;
+ }
+
+ String getName() {
+ return this.name;
+ }
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/NodeProcessor.java b/azkaban-exec-server/src/main/java/azkaban/dag/NodeProcessor.java
new file mode 100644
index 0000000..423b0f7
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/NodeProcessor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+public interface NodeProcessor {
+
+ /**
+ * Changes the status of the node.
+ *
+ * <p>Typically a processor implementation should handle the RUNNING and KILLING status by
+ * starting or killing a unit of work and call the {@link DagService} to transition the node
+ * to the next status.
+ *
+ * <p>The call will be made in the context of the DagService's one and only thread. Thus a
+ * processor should limit the time it takes to process the call. For lengthy operations such as
+ * I/O operations, consider offloading them to other threads.
+ *
+ * @param node the node to change
+ * @param status the new status
+ */
+ void changeStatus(Node node, Status status);
+}
diff --git a/azkaban-exec-server/src/main/java/azkaban/dag/Status.java b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
new file mode 100644
index 0000000..c3f3a62
--- /dev/null
+++ b/azkaban-exec-server/src/main/java/azkaban/dag/Status.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import com.google.common.collect.ImmutableSet;
+
+enum Status {
+ READY, // ready to run
+ DISABLED, // disabled by users. Treat as the node has the status of success
+ BLOCKED, // temporarily blocked. Need to be unblocked by another external event
+ RUNNING,
+ SUCCESS,
+ FAILURE,
+
+ // doesn't run because one of the nodes it depends on fails or is killed. Applies to a node only.
+ CANCELED,
+ KILLING, // in the process of killing a running job
+ KILLED; // explicitly killed by a user
+
+ // The states that will not transition to other states
+ private static final ImmutableSet TERMINAL_STATES = ImmutableSet.of(DISABLED, SUCCESS, FAILURE,
+ CANCELED, KILLED);
+
+ boolean isTerminal() {
+ return TERMINAL_STATES.contains(this);
+ }
+
+ // The states that are considered as success effectively
+ private static final ImmutableSet EFFECTIVE_SUCCESS_STATES = ImmutableSet.of(DISABLED, SUCCESS);
+
+ boolean isSuccessEffectively() {
+ return EFFECTIVE_SUCCESS_STATES.contains(this);
+ }
+
+ // The states that are possible before a node ever starts to run or be killed or canceled
+ private static final ImmutableSet PRE_RUN_STATES = ImmutableSet.of(DISABLED, READY, BLOCKED);
+
+ boolean isPreRunState() {
+ return PRE_RUN_STATES.contains(this);
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
new file mode 100644
index 0000000..ddcb068
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagServiceTest.java
@@ -0,0 +1,315 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javafx.util.Pair;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests {@link DagService}
+ *
+ * <p>Naming conventions: nodes are named with letters such as a, b. Dags are named with 'f'
+ * prefix. e.g. "fa". A sub DAG has the prefix "sf" such as "sfb". A sub DAG is a node within the
+ * parent DAG.
+ */
+public class DagServiceTest {
+
+ private final DagService dagService = new DagService();
+ private final StatusChangeRecorder statusChangeRecorder = new StatusChangeRecorder();
+ private final Set<Node> nodesToFail = new HashSet<>();
+ private final TestNodeProcessor nodeProcessor = new TestNodeProcessor(this.dagService,
+ this.statusChangeRecorder, this.nodesToFail);
+ private final CountDownLatch dagFinishedLatch = new CountDownLatch(1);
+ private final DagProcessor dagProcessor = new TestDagProcessor(this.dagFinishedLatch,
+ this.statusChangeRecorder);
+ private final Dag testDag = createDag();
+ private final List<Pair<String, Status>> expectedSequence = new ArrayList<>();
+
+
+ @After
+ public void tearDown() {
+ this.dagService.shutdownAndAwaitTermination();
+ }
+
+ /**
+ * Tests a DAG with one node which will run successfully.
+ */
+ @Test
+ public void oneNodeSuccess() throws Exception {
+ createNodeAndAddToTestDag("a");
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("a", Status.SUCCESS);
+ addToExpectedSequence("fa", Status.SUCCESS);
+
+ runAndVerify();
+ }
+
+ /**
+ * Tests a DAG with two nodes which will run successfully.
+ * a
+ * |
+ * b
+ */
+ @Test
+ public void twoNodesSuccess() throws Exception {
+ final Node aNode = createNodeAndAddToTestDag("a");
+ final Node bNode = createNodeAndAddToTestDag("b");
+ aNode.addChild(bNode);
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("a", Status.SUCCESS);
+ addToExpectedSequence("b", Status.RUNNING);
+ addToExpectedSequence("b", Status.SUCCESS);
+ addToExpectedSequence("fa", Status.SUCCESS);
+
+ runAndVerify();
+ }
+
+ /**
+ * Tests a DAG with three nodes which will run successfully.
+ * <pre>
+ * a
+ * / \
+ * b c
+ * </pre>
+ */
+ @Test
+ public void threeNodesSuccess() throws Exception {
+ final Node aNode = createNodeAndAddToTestDag("a");
+ final Node bNode = createNodeAndAddToTestDag("b");
+ final Node cNode = createNodeAndAddToTestDag("c");
+ aNode.addChildren(bNode, cNode);
+
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("a", Status.SUCCESS);
+ addToExpectedSequence("b", Status.RUNNING);
+ addToExpectedSequence("c", Status.RUNNING);
+ addToExpectedSequence("b", Status.SUCCESS);
+ addToExpectedSequence("c", Status.SUCCESS);
+ addToExpectedSequence("fa", Status.SUCCESS);
+
+ runAndVerify();
+
+ }
+
+ /**
+ * Tests a DAG with one node which will fail.
+ */
+ @Test
+ public void oneNodeFailure() throws Exception {
+ final Node aNode = createNodeAndAddToTestDag("a");
+ this.nodesToFail.add(aNode);
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("a", Status.FAILURE);
+ addToExpectedSequence("fa", Status.FAILURE);
+
+ runAndVerify();
+ }
+
+ /**
+ * Tests a DAG with two nodes, fails the first one.
+ *
+ * Expects the child node to be marked canceled.
+ *
+ * a (fail)
+ * |
+ * b
+ */
+ @Test
+ public void twoNodesFailFirst() throws Exception {
+ final Node aNode = createNodeAndAddToTestDag("a");
+ final Node bNode = createNodeAndAddToTestDag("b");
+ aNode.addChild(bNode);
+ this.nodesToFail.add(aNode);
+
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("a", Status.FAILURE);
+ addToExpectedSequence("b", Status.CANCELED);
+ addToExpectedSequence("fa", Status.FAILURE);
+
+ runAndVerify();
+ }
+
+ /**
+ * Tests a DAG with three nodes with one failure.
+ *
+ * Expects the sibling nodes to finish.
+ *
+ * <pre>
+ * a
+ * / \
+ * b (fail) c
+ * </pre>
+ */
+ @Test
+ public void threeNodesFailSecond() throws Exception {
+ final Node aNode = createNodeAndAddToTestDag("a");
+ final Node bNode = createNodeAndAddToTestDag("b");
+ final Node cNode = createNodeAndAddToTestDag("c");
+ aNode.addChildren(bNode, cNode);
+
+ this.nodesToFail.add(bNode);
+
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("a", Status.SUCCESS);
+ addToExpectedSequence("b", Status.RUNNING);
+ addToExpectedSequence("c", Status.RUNNING);
+ addToExpectedSequence("b", Status.FAILURE);
+ addToExpectedSequence("c", Status.SUCCESS);
+ addToExpectedSequence("fa", Status.FAILURE);
+
+ runAndVerify();
+
+ }
+
+ /**
+ * Tests a DAG with one subDag, all successful.
+ *
+ * <pre>
+ * sfb
+ * |
+ * c
+ *
+ * subDag: fb
+ * a b
+ * </pre>
+ */
+ @Test
+ public void simple_subdag_success_case() throws Exception {
+ final TestSubDagProcessor testSubDagProcessor = new TestSubDagProcessor
+ (this.dagService, this.statusChangeRecorder);
+ final Dag bDag = new Dag("fb", testSubDagProcessor);
+ createNodeAndAddToDag("a", bDag);
+ createNodeAndAddToDag("b", bDag);
+
+ final TestSubDagNodeProcessor testSubDagNodeProcessor = new TestSubDagNodeProcessor
+ (this.dagService, this.statusChangeRecorder, bDag);
+ final Node subDagNode = new Node("sfb", testSubDagNodeProcessor);
+ testSubDagProcessor.setNode(subDagNode);
+ this.testDag.addNode(subDagNode);
+
+ final Node cNode = createNodeAndAddToTestDag("c");
+ subDagNode.addChild(cNode);
+
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("sfb", Status.RUNNING);
+ addToExpectedSequence("fb", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("b", Status.RUNNING);
+ addToExpectedSequence("a", Status.SUCCESS);
+ addToExpectedSequence("b", Status.SUCCESS);
+ addToExpectedSequence("fb", Status.SUCCESS);
+ addToExpectedSequence("sfb", Status.SUCCESS);
+ addToExpectedSequence("c", Status.RUNNING);
+ addToExpectedSequence("c", Status.SUCCESS);
+ addToExpectedSequence("fa", Status.SUCCESS);
+
+ runAndVerify();
+
+ }
+
+ /**
+ * Tests killing a dag.
+ */
+ @Test
+ public void kill_a_node() throws Exception {
+ final CountDownLatch nodeRunningLatch = new CountDownLatch(1);
+ final TestKillNodeProcessor killNodeProcessor = new TestKillNodeProcessor(this.dagService,
+ this.statusChangeRecorder, nodeRunningLatch);
+ final Node aNode = new Node("a", killNodeProcessor);
+ this.testDag.addNode(aNode);
+
+ addToExpectedSequence("fa", Status.RUNNING);
+ addToExpectedSequence("a", Status.RUNNING);
+ addToExpectedSequence("fa", Status.KILLING);
+ addToExpectedSequence("a", Status.KILLING);
+ addToExpectedSequence("a", Status.KILLED);
+ addToExpectedSequence("fa", Status.KILLED);
+
+ this.dagService.startDag(this.testDag);
+
+ // Make sure the node is running before killing the DAG.
+ nodeRunningLatch.await(120, TimeUnit.SECONDS);
+ this.dagService.killDag(this.testDag);
+
+ final boolean isWaitSuccessful = this.dagFinishedLatch.await(120, TimeUnit.SECONDS);
+ // Make sure the dag finishes.
+ assertThat(isWaitSuccessful).isTrue();
+ verifyStatusSequence();
+ }
+
+
+ private void addToExpectedSequence(final String name, final Status status) {
+ this.expectedSequence.add(new Pair<>(name, status));
+ }
+
+ private void runDag() throws InterruptedException {
+ this.dagService.startDag(this.testDag);
+ final boolean isWaitSuccessful = this.dagFinishedLatch.await(2, TimeUnit.SECONDS);
+
+ // Make sure the dag finishes.
+ assertThat(isWaitSuccessful).isTrue();
+ }
+
+ private void verifyStatusSequence() {
+ this.statusChangeRecorder.verifySequence(this.expectedSequence);
+ }
+
+ private void runAndVerify() throws InterruptedException {
+ runDag();
+ verifyStatusSequence();
+ }
+
+ /**
+ * Creates a node and add to the test dag.
+ *
+ * @param name node name
+ * @return Node object
+ */
+ private Node createNode(final String name) {
+ return new Node(name, this.nodeProcessor);
+ }
+
+ private Node createNodeAndAddToDag(final String name, final Dag flow) {
+ final Node node = createNode(name);
+ flow.addNode(node);
+ return node;
+ }
+
+ private Node createNodeAndAddToTestDag(final String name) {
+ return createNodeAndAddToDag(name, this.testDag);
+ }
+
+ private Dag createDag() {
+ return new Dag("fa", this.dagProcessor);
+ }
+}
+
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
new file mode 100644
index 0000000..522beb2
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/DagTest.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Test;
+
+/**
+ * Tests the dag state ( including its nodes' states) transitions.
+ *
+ * Focuses on how the dag state changes in response to one external request.
+ */
+public class DagTest {
+
+ private final Dag testFlow = new Dag("fa", mock(DagProcessor.class));
+
+ @Test
+ public void dag_finish_with_only_disabled_nodes() {
+ final Node aNode = createAndAddNode("a");
+ aNode.setStatus(Status.DISABLED);
+ this.testFlow.start();
+ assertThat(aNode.getStatus()).isEqualTo(Status.DISABLED);
+ assertThat(this.testFlow.getStatus()).isEqualTo(Status.SUCCESS);
+ }
+
+ @Test
+ public void running_nodes_can_be_killed() {
+ final Node aNode = createAndAddNode("a");
+ aNode.setStatus(Status.RUNNING);
+ this.testFlow.setStatus(Status.RUNNING);
+ this.testFlow.kill();
+ assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+ assertThat(this.testFlow.getStatus()).isEqualTo(Status.KILLING);
+ }
+
+ /**
+ * Tests ready nodes are canceled when the dag is killed.
+ */
+ @Test
+ public void waiting_nodes_are_canceled_when_killed() {
+ final Node aNode = createAndAddNode("a");
+ aNode.setStatus(Status.RUNNING);
+ final Node bNode = createAndAddNode("b");
+ aNode.addChild(bNode);
+ this.testFlow.setStatus(Status.RUNNING);
+ this.testFlow.kill();
+ assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+ assertThat(bNode.getStatus()).isEqualTo(Status.CANCELED);
+ assertThat(this.testFlow.getStatus()).isEqualTo(Status.KILLING);
+ }
+
+ /**
+ * Tests multiple ready nodes are canceled when the dag is killed.
+ * <pre>
+ * a (running)
+ * / \
+ * b c
+ * \
+ * d
+ * </pre>
+ */
+ @Test
+ public void multiple_waiting_nodes_are_canceled_when_killed() {
+ final Node aNode = createAndAddNode("a");
+ aNode.setStatus(Status.RUNNING);
+ final Node bNode = createAndAddNode("b");
+ aNode.addChild(bNode);
+ final Node cNode = createAndAddNode("c");
+ aNode.addChild(cNode);
+ final Node dNode = createAndAddNode("d");
+ cNode.addChild(dNode);
+
+ this.testFlow.setStatus(Status.RUNNING);
+ this.testFlow.kill();
+ assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+ assertThat(bNode.getStatus()).isEqualTo(Status.CANCELED);
+ assertThat(dNode.getStatus()).isEqualTo(Status.CANCELED);
+ assertThat(dNode.getStatus()).isEqualTo(Status.CANCELED);
+ assertThat(this.testFlow.getStatus()).isEqualTo(Status.KILLING);
+ }
+
+ /**
+ * Tests blocked nodes are canceled when the dag is killed.
+ */
+ @Test
+ public void blocked_nodes_are_canceled_when_killed() {
+ final Node aNode = createAndAddNode("a");
+ aNode.setStatus(Status.RUNNING);
+ final Node bNode = createAndAddNode("b");
+ aNode.addChild(bNode);
+ bNode.setStatus(Status.BLOCKED);
+ this.testFlow.setStatus(Status.RUNNING);
+ this.testFlow.kill();
+ assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+ assertThat(bNode.getStatus()).isEqualTo(Status.CANCELED);
+ }
+
+ /**
+ * Tests success nodes' states remain the same when the dag is killed.
+ * <pre>
+ * a (success)
+ * /
+ * b (running)
+ * </pre>
+ */
+ @Test
+ public void success_node_state_remain_the_same_when_killed() {
+ final Node aNode = createAndAddNode("a");
+ aNode.setStatus(Status.SUCCESS);
+ final Node bNode = createAndAddNode("b");
+ bNode.setStatus(Status.RUNNING);
+ aNode.addChild(bNode);
+ this.testFlow.kill();
+ assertThat(aNode.getStatus()).isEqualTo(Status.SUCCESS);
+ assertThat(bNode.getStatus()).isEqualTo(Status.KILLING);
+ }
+
+ /**
+ * Tests failed nodes' states remain the same when the dag is killed.
+ * This can happen when running jobs are allowed to finish when a node fails.
+ *
+ * <pre>
+ * a (running) b (failure)
+ * </pre>
+ */
+ @Test
+ public void failed_node_state_remain_the_same_when_killed() {
+ final Node aNode = createAndAddNode("a");
+ aNode.setStatus(Status.RUNNING);
+ final Node bNode = createAndAddNode("b");
+ bNode.setStatus(Status.FAILURE);
+ this.testFlow.kill();
+ assertThat(aNode.getStatus()).isEqualTo(Status.KILLING);
+ assertThat(bNode.getStatus()).isEqualTo(Status.FAILURE);
+ }
+
+ /**
+ * Creates a node and add to the test dag.
+ *
+ * @param name node name
+ * @return Node object
+ */
+ private Node createAndAddNode(final String name) {
+ final Node node = TestUtil.createNodeWithNullProcessor(name);
+ this.testFlow.addNode(node);
+ return node;
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
new file mode 100644
index 0000000..d871624
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/NodeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class NodeTest {
+
+ @Test
+ public void hasParent() {
+ final Node node = TestUtil.createNodeWithNullProcessor("a");
+ final Node parentNode = TestUtil.createNodeWithNullProcessor("parent");
+ parentNode.addChild(node);
+ final boolean hasParent = node.hasParent();
+ assertThat(hasParent).isTrue();
+ }
+
+ @Test
+ public void hasParentNegative() {
+ final Node node = TestUtil.createNodeWithNullProcessor("a");
+ final boolean hasParent = node.hasParent();
+ assertThat(hasParent).isFalse();
+ }
+
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/StatusChangeRecorder.java b/azkaban-exec-server/src/test/java/azkaban/dag/StatusChangeRecorder.java
new file mode 100644
index 0000000..9890942
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/StatusChangeRecorder.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import javafx.util.Pair;
+
+/**
+ * Records the sequence of nodes and dag status change.
+ */
+class StatusChangeRecorder {
+
+ private final List<Pair<String, Status>> sequence = new ArrayList<>();
+
+ void recordNode(final Node node) {
+ this.sequence.add(new Pair<>(node.getName(), node.getStatus()));
+ }
+
+ void recordDag(final Dag dag) {
+ this.sequence.add(new Pair<>(dag.getName(), dag.getStatus()));
+ }
+
+ void verifySequence(final List<Pair<String, Status>> expectedSequence) {
+ assertThat(this.sequence).isEqualTo(expectedSequence);
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/StatusTest.java b/azkaban-exec-server/src/test/java/azkaban/dag/StatusTest.java
new file mode 100644
index 0000000..68e76fa
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/StatusTest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class StatusTest {
+
+ //todo HappyRay: add tests for other status
+ @Test
+ public void disabled_is_terminal() {
+ assertThat(Status.DISABLED.isTerminal()).isTrue();
+ }
+
+ @Test
+ public void running_is_not_terminal() {
+ assertThat(Status.RUNNING.isTerminal()).isFalse();
+ }
+
+ @Test
+ public void diabled_is_effectively_success() {
+ assertThat(Status.DISABLED.isSuccessEffectively()).isTrue();
+ }
+
+ @Test
+ public void failure_is_not_effectively_success() {
+ assertThat(Status.FAILURE.isSuccessEffectively()).isFalse();
+ }
+
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestDagProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestDagProcessor.java
new file mode 100644
index 0000000..79af3fe
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestDagProcessor.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import java.util.concurrent.CountDownLatch;
+
+public class TestDagProcessor implements DagProcessor {
+
+ private final StatusChangeRecorder statusChangeRecorder;
+ private final CountDownLatch dagFinishedLatch;
+
+
+ TestDagProcessor(final CountDownLatch dagFinishedLatch,
+ final StatusChangeRecorder statusChangeRecorder) {
+ this.dagFinishedLatch = dagFinishedLatch;
+ this.statusChangeRecorder = statusChangeRecorder;
+ }
+
+ @Override
+ public void changeStatus(final Dag dag, final Status status) {
+ System.out.println(dag);
+ this.statusChangeRecorder.recordDag(dag);
+ if (status.isTerminal()) {
+ this.dagFinishedLatch.countDown();
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestKillNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestKillNodeProcessor.java
new file mode 100644
index 0000000..cbcd0f2
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestKillNodeProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import java.util.concurrent.CountDownLatch;
+
+public class TestKillNodeProcessor implements NodeProcessor {
+
+ private final DagService dagService;
+ private final StatusChangeRecorder statusChangeRecorder;
+ private final CountDownLatch nodeRunningLatch;
+
+ /**
+ * A node processor that tests killing a node.
+ *
+ * @param nodeRunningLatch signal that the node has started running
+ */
+ TestKillNodeProcessor(final DagService dagService,
+ final StatusChangeRecorder statusChangeRecorder, final CountDownLatch nodeRunningLatch) {
+ this.dagService = dagService;
+ this.statusChangeRecorder = statusChangeRecorder;
+ this.nodeRunningLatch = nodeRunningLatch;
+ }
+
+ @Override
+ public void changeStatus(final Node node, final Status status) {
+ System.out.println(node);
+ this.statusChangeRecorder.recordNode(node);
+ switch (node.getStatus()) {
+ case RUNNING:
+ // Don't mark the job finished. Simulate a long running job.
+ this.nodeRunningLatch.countDown();
+ break;
+ case KILLING:
+ this.dagService.markNodeKilled(node);
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
new file mode 100644
index 0000000..f0b1751
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestNodeProcessor.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import java.util.Set;
+
+public class TestNodeProcessor implements NodeProcessor {
+
+ private final DagService dagService;
+ private final StatusChangeRecorder statusChangeRecorder;
+ private final Set<Node> nodesToFail;
+
+ TestNodeProcessor(final DagService dagService,
+ final StatusChangeRecorder statusChangeRecorder, final Set<Node> nodesToFail) {
+ this.dagService = dagService;
+ this.statusChangeRecorder = statusChangeRecorder;
+ this.nodesToFail = nodesToFail;
+ }
+
+ @Override
+ public void changeStatus(final Node node, final Status status) {
+ System.out.println(node);
+ this.statusChangeRecorder.recordNode(node);
+
+ switch (status) {
+ case RUNNING:
+ if (this.nodesToFail.contains(node)) {
+ this.dagService.markNodeFailed(node);
+ } else {
+ this.dagService.markNodeSuccess(node);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagNodeProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagNodeProcessor.java
new file mode 100644
index 0000000..288cec0
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagNodeProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+/**
+ * A NodeProcessor that bridges the sub DAG and the parent DAG.
+ */
+public class TestSubDagNodeProcessor implements NodeProcessor {
+
+ private final DagService dagService;
+ private final StatusChangeRecorder statusChangeRecorder;
+ private final Dag dag;
+
+
+ TestSubDagNodeProcessor(final DagService dagService,
+ final StatusChangeRecorder statusChangeRecorder,
+ final Dag dag
+ ) {
+ this.dagService = dagService;
+ this.statusChangeRecorder = statusChangeRecorder;
+ this.dag = dag;
+ }
+
+
+ /**
+ * Triggers the sub DAG state change when the sub DAG node in the parent DAG's status changes.
+ *
+ * @param node the node to change
+ * @param status the new status
+ */
+ @Override
+ public void changeStatus(final Node node, final Status status) {
+ System.out.println(node);
+ this.statusChangeRecorder.recordNode(node);
+
+ switch (status) {
+ case RUNNING:
+ this.dagService.startDag(this.dag);
+ break;
+ case KILLING:
+ this.dagService.killDag(this.dag);
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagProcessor.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagProcessor.java
new file mode 100644
index 0000000..0b7a018
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestSubDagProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A DAG processor that brides the sub DAG and the parent DAG.
+ */
+public class TestSubDagProcessor implements DagProcessor {
+
+ private final DagService dagService;
+ private final StatusChangeRecorder statusChangeRecorder;
+ private Node node;
+
+
+ TestSubDagProcessor(final DagService dagService,
+ final StatusChangeRecorder statusChangeRecorder
+ ) {
+ this.dagService = dagService;
+ this.statusChangeRecorder = statusChangeRecorder;
+ }
+
+
+ /**
+ * Transfers the node state in the parent DAG when the sub DAG status changes.
+ *
+ * @param dag the dag to change
+ * @param status the new status
+ */
+ @Override
+ public void changeStatus(final Dag dag, final Status status) {
+ System.out.println(dag);
+ this.statusChangeRecorder.recordDag(dag);
+ requireNonNull(this.node, "Node for the subDag in the parent DAG can't be null.");
+ switch (status) {
+ case SUCCESS:
+ this.dagService.markNodeSuccess(this.node);
+ break;
+ case FAILURE:
+ this.dagService.markNodeFailed(this.node);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Sets the node that this subflow belongs.
+ * <p>
+ * Can't pass this information in the constructor since it will cause a circular dependency
+ * problem.
+ *
+ * @param node the node as part of the parent flow
+ */
+ public void setNode(final Node node) {
+ this.node = node;
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
new file mode 100644
index 0000000..29642c6
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/dag/TestUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.dag;
+
+import static org.mockito.Mockito.mock;
+
+class TestUtil {
+
+ /**
+ * Creates a node with a processor that does nothing.
+ *
+ * @param name node name
+ */
+ static Node createNodeWithNullProcessor(final String name) {
+ return new Node(name, mock(NodeProcessor.class));
+ }
+}