DagServiceTest.java

316 lines | 9.304 kB Blame History Raw Download
/*
 * Copyright 2018 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.dag;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javafx.util.Pair;
import org.junit.After;
import org.junit.Test;

/**
 * Tests {@link DagService}
 *
 * <p>Naming conventions: nodes are named with letters such as a, b. Dags are named with 'f'
 * prefix. e.g. "fa". A sub DAG has the prefix "sf" such as "sfb". A sub DAG is a node within the
 * parent DAG.
 */
public class DagServiceTest {

  private final DagService dagService = new DagService();
  private final StatusChangeRecorder statusChangeRecorder = new StatusChangeRecorder();
  private final Set<Node> nodesToFail = new HashSet<>();
  private final TestNodeProcessor nodeProcessor = new TestNodeProcessor(this.dagService,
      this.statusChangeRecorder, this.nodesToFail);
  private final CountDownLatch dagFinishedLatch = new CountDownLatch(1);
  private final DagProcessor dagProcessor = new TestDagProcessor(this.dagFinishedLatch,
      this.statusChangeRecorder);
  private final Dag testDag = createDag();
  private final List<Pair<String, Status>> expectedSequence = new ArrayList<>();


  @After
  public void tearDown() {
    this.dagService.shutdownAndAwaitTermination();
  }

  /**
   * Tests a DAG with one node which will run successfully.
   */
  @Test
  public void oneNodeSuccess() throws Exception {
    createNodeAndAddToTestDag("a");
    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("a", Status.SUCCESS);
    addToExpectedSequence("fa", Status.SUCCESS);

    runAndVerify();
  }

  /**
   * Tests a DAG with two nodes which will run successfully.
   * a
   * |
   * b
   */
  @Test
  public void twoNodesSuccess() throws Exception {
    final Node aNode = createNodeAndAddToTestDag("a");
    final Node bNode = createNodeAndAddToTestDag("b");
    aNode.addChild(bNode);
    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("a", Status.SUCCESS);
    addToExpectedSequence("b", Status.RUNNING);
    addToExpectedSequence("b", Status.SUCCESS);
    addToExpectedSequence("fa", Status.SUCCESS);

    runAndVerify();
  }

  /**
   * Tests a DAG with three nodes which will run successfully.
   * <pre>
   *    a
   *  /  \
   * b    c
   * </pre>
   */
  @Test
  public void threeNodesSuccess() throws Exception {
    final Node aNode = createNodeAndAddToTestDag("a");
    final Node bNode = createNodeAndAddToTestDag("b");
    final Node cNode = createNodeAndAddToTestDag("c");
    aNode.addChildren(bNode, cNode);

    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("a", Status.SUCCESS);
    addToExpectedSequence("b", Status.RUNNING);
    addToExpectedSequence("c", Status.RUNNING);
    addToExpectedSequence("b", Status.SUCCESS);
    addToExpectedSequence("c", Status.SUCCESS);
    addToExpectedSequence("fa", Status.SUCCESS);

    runAndVerify();

  }

  /**
   * Tests a DAG with one node which will fail.
   */
  @Test
  public void oneNodeFailure() throws Exception {
    final Node aNode = createNodeAndAddToTestDag("a");
    this.nodesToFail.add(aNode);
    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("a", Status.FAILURE);
    addToExpectedSequence("fa", Status.FAILURE);

    runAndVerify();
  }

  /**
   * Tests a DAG with two nodes, fails the first one.
   *
   * Expects the child node to be marked canceled.
   *
   * a (fail)
   * |
   * b
   */
  @Test
  public void twoNodesFailFirst() throws Exception {
    final Node aNode = createNodeAndAddToTestDag("a");
    final Node bNode = createNodeAndAddToTestDag("b");
    aNode.addChild(bNode);
    this.nodesToFail.add(aNode);

    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("a", Status.FAILURE);
    addToExpectedSequence("b", Status.CANCELED);
    addToExpectedSequence("fa", Status.FAILURE);

    runAndVerify();
  }

  /**
   * Tests a DAG with three nodes with one failure.
   *
   * Expects the sibling nodes to finish.
   *
   * <pre>
   *       a
   *   /      \
   * b (fail)    c
   * </pre>
   */
  @Test
  public void threeNodesFailSecond() throws Exception {
    final Node aNode = createNodeAndAddToTestDag("a");
    final Node bNode = createNodeAndAddToTestDag("b");
    final Node cNode = createNodeAndAddToTestDag("c");
    aNode.addChildren(bNode, cNode);

    this.nodesToFail.add(bNode);

    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("a", Status.SUCCESS);
    addToExpectedSequence("b", Status.RUNNING);
    addToExpectedSequence("c", Status.RUNNING);
    addToExpectedSequence("b", Status.FAILURE);
    addToExpectedSequence("c", Status.SUCCESS);
    addToExpectedSequence("fa", Status.FAILURE);

    runAndVerify();

  }

  /**
   * Tests a DAG with one subDag, all successful.
   *
   * <pre>
   *   sfb
   *   |
   *   c
   *
   * subDag: fb
   * a b
   * </pre>
   */
  @Test
  public void simple_subdag_success_case() throws Exception {
    final TestSubDagProcessor testSubDagProcessor = new TestSubDagProcessor
        (this.dagService, this.statusChangeRecorder);
    final Dag bDag = new Dag("fb", testSubDagProcessor);
    createNodeAndAddToDag("a", bDag);
    createNodeAndAddToDag("b", bDag);

    final TestSubDagNodeProcessor testSubDagNodeProcessor = new TestSubDagNodeProcessor
        (this.dagService, this.statusChangeRecorder, bDag);
    final Node subDagNode = new Node("sfb", testSubDagNodeProcessor);
    testSubDagProcessor.setNode(subDagNode);
    this.testDag.addNode(subDagNode);

    final Node cNode = createNodeAndAddToTestDag("c");
    subDagNode.addChild(cNode);

    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("sfb", Status.RUNNING);
    addToExpectedSequence("fb", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("b", Status.RUNNING);
    addToExpectedSequence("a", Status.SUCCESS);
    addToExpectedSequence("b", Status.SUCCESS);
    addToExpectedSequence("fb", Status.SUCCESS);
    addToExpectedSequence("sfb", Status.SUCCESS);
    addToExpectedSequence("c", Status.RUNNING);
    addToExpectedSequence("c", Status.SUCCESS);
    addToExpectedSequence("fa", Status.SUCCESS);

    runAndVerify();

  }

  /**
   * Tests killing a dag.
   */
  @Test
  public void kill_a_node() throws Exception {
    final CountDownLatch nodeRunningLatch = new CountDownLatch(1);
    final TestKillNodeProcessor killNodeProcessor = new TestKillNodeProcessor(this.dagService,
        this.statusChangeRecorder, nodeRunningLatch);
    final Node aNode = new Node("a", killNodeProcessor);
    this.testDag.addNode(aNode);

    addToExpectedSequence("fa", Status.RUNNING);
    addToExpectedSequence("a", Status.RUNNING);
    addToExpectedSequence("fa", Status.KILLING);
    addToExpectedSequence("a", Status.KILLING);
    addToExpectedSequence("a", Status.KILLED);
    addToExpectedSequence("fa", Status.KILLED);

    this.dagService.startDag(this.testDag);

    // Make sure the node is running before killing the DAG.
    nodeRunningLatch.await(120, TimeUnit.SECONDS);
    this.dagService.killDag(this.testDag);

    final boolean isWaitSuccessful = this.dagFinishedLatch.await(120, TimeUnit.SECONDS);
    // Make sure the dag finishes.
    assertThat(isWaitSuccessful).isTrue();
    verifyStatusSequence();
  }


  private void addToExpectedSequence(final String name, final Status status) {
    this.expectedSequence.add(new Pair<>(name, status));
  }

  private void runDag() throws InterruptedException {
    this.dagService.startDag(this.testDag);
    final boolean isWaitSuccessful = this.dagFinishedLatch.await(2, TimeUnit.SECONDS);

    // Make sure the dag finishes.
    assertThat(isWaitSuccessful).isTrue();
  }

  private void verifyStatusSequence() {
    this.statusChangeRecorder.verifySequence(this.expectedSequence);
  }

  private void runAndVerify() throws InterruptedException {
    runDag();
    verifyStatusSequence();
  }

  /**
   * Creates a node and add to the test dag.
   *
   * @param name node name
   * @return Node object
   */
  private Node createNode(final String name) {
    return new Node(name, this.nodeProcessor);
  }

  private Node createNodeAndAddToDag(final String name, final Dag flow) {
    final Node node = createNode(name);
    flow.addNode(node);
    return node;
  }

  private Node createNodeAndAddToTestDag(final String name) {
    return createNodeAndAddToDag(name, this.testDag);
  }

  private Dag createDag() {
    return new Dag("fa", this.dagProcessor);
  }
}