Dag.java

172 lines | 5.06 kB Blame History Raw Download
/*
 * Copyright 2018 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.dag;

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A DAG (Directed acyclic graph) consists of {@link Node}s.
 *
 * <p>Most of the methods in this class should remain package private. Code outside of this
 * package should mainly interact with the {@link DagService}.
 *
 * <p>Every status transition made through this class is reported to the {@link DagProcessor}
 * supplied at construction time.
 */
class Dag {

  private final String name;
  private final DagProcessor dagProcessor;
  // Nodes in the order they were added.
  private final List<Node> nodes = new ArrayList<>();
  // Index of the same nodes keyed by node name, used for lookups by name.
  private final Map<String, Node> nameToNodeMap = new HashMap<>();
  private Status status = Status.READY;

  /**
   * Creates an empty dag in the {@link Status#READY} state.
   *
   * @param name the name of the dag, must not be null
   * @param dagProcessor the processor that is notified of every status change, must not be null
   * @throws NullPointerException if either argument is null
   */
  Dag(final String name, final DagProcessor dagProcessor) {
    requireNonNull(name, "The name of the Dag can't be null");
    this.name = name;
    requireNonNull(dagProcessor, "The dagProcessor parameter can't be null.");
    this.dagProcessor = dagProcessor;
  }

  /**
   * Adds a node to the current dag.
   *
   * <p>It's important NOT to expose this method as public. The design relies on this to ensure
   * correctness. The DAG's structure shouldn't change after it is created.
   *
   * @param node a node to add; it must belong to this dag and have a name that is not already
   *     used by another node of this dag
   * @throws NullPointerException if the node is null
   */
  void addNode(final Node node) {
    requireNonNull(node, "The node parameter can't be null.");
    // Check all invariants before touching any state so that a failed check never leaves the
    // node list and the name index out of sync.
    assert (node.getDag() == this);
    assert (!this.nameToNodeMap.containsKey(node.getName()));
    this.nodes.add(node);
    this.nameToNodeMap.put(node.getName(), node);
  }

  /**
   * Gets the node associated with the name.
   *
   * @param name node name
   * @return node. null if the node with this name doesn't exist
   */
  Node getNodeByName(final String name) {
    return this.nameToNodeMap.get(name);
  }

  /**
   * Starts the dag: moves it to {@link Status#RUNNING} and asks every node to run if it is
   * allowed to.
   *
   * <p>The dag is expected to be in the {@link Status#READY} state when this method is called.
   */
  void start() {
    assert (this.status == Status.READY);
    changeStatus(Status.RUNNING);
    for (final Node node : this.nodes) {
      node.runIfAllowed();
    }
    // It's possible that all nodes are disabled. In this rare case the dag should be
    // marked success. Otherwise it will be stuck in the running state.
    updateDagStatus();
  }

  /**
   * Kills the dag: moves it to {@link Status#KILLING} and kills every node.
   *
   * <p>The call is a no-op if the dag has already reached a terminal state or is already being
   * killed.
   */
  void kill() {
    if (this.status.isTerminal() || this.status == Status.KILLING) {
      // It is possible that a kill is issued after a dag has finished or multiple kill requests
      // are received. Without this check, this method will make duplicate calls to the
      // DagProcessor.
      return;
    }
    changeStatus(Status.KILLING);
    for (final Node node : this.nodes) {
      node.kill();
    }
    // All nodes may already be in terminal states at this point, in which case the dag can be
    // finalized right away.
    updateDagStatus();
  }

  /**
   * Update the final dag status when all nodes are done.
   *
   * <p>If any node has not reached its terminal state, this method will simply return.
   */
  void updateDagStatus() {
    // A dag may have nodes that are disabled. It's safer to scan all the nodes.
    // Assume the overhead is minimal. If it is not the case, we can optimize later.
    boolean failed = false;
    for (final Node node : this.nodes) {
      final Status nodeStatus = node.getStatus();
      if (!nodeStatus.isTerminal()) {
        return;
      }
      if (nodeStatus == Status.FAILURE) {
        failed = true;
      }
    }

    // Update the dag status only after all nodes have reached terminal states.
    updateDagStatusInternal(failed);
  }

  /**
   * Update the final dag status.
   *
   * @param failed true if any of the jobs has failed
   */
  private void updateDagStatusInternal(final boolean failed) {
    if (this.status == Status.KILLING) {
      /*
      It's possible that some nodes have failed when the dag is killed.
      Since killing a dag signals an intent from an operator, it is more important to make
      the dag status reflect the result of that explicit intent. e.g. if the killing is a
      result of handling a job failure, users more likely want to know that someone has taken
      an action rather than that a job has failed. Operators can still see the individual job
      status.
      */
      changeStatus(Status.KILLED);
    } else if (failed) {
      changeStatus(Status.FAILURE);
    } else {
      changeStatus(Status.SUCCESS);
    }
  }

  /**
   * Records the new status and notifies the {@link DagProcessor} of the change.
   *
   * @param status the status to switch to
   */
  private void changeStatus(final Status status) {
    this.status = status;
    this.dagProcessor.changeStatus(this, status);
  }

  @Override
  public String toString() {
    return String.format("dag (%s), status (%s)", this.name, this.status);
  }

  /**
   * @return the name of this dag
   */
  String getName() {
    return this.name;
  }

  /**
   * @return the current status of this dag
   */
  Status getStatus() {
    return this.status;
  }

  /**
   * Overwrites the status directly, without notifying the {@link DagProcessor}.
   *
   * @param status the status to set
   */
  @VisibleForTesting
  void setStatus(final Status status) {
    this.status = status;
  }

  /**
   * Returns the nodes of this dag in the order they were added.
   *
   * <p>The returned list is a read-only view: attempts to modify it throw
   * {@link UnsupportedOperationException}. This keeps callers from changing the structure of the
   * dag behind its back and leaving the name lookup out of sync with the node list.
   *
   * @return an unmodifiable view of the nodes
   */
  @VisibleForTesting
  public List<Node> getNodes() {
    return Collections.unmodifiableList(this.nodes);
  }
}