// DirectoryFlowLoader.java
/*
 * Copyright 2012 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.project;

import azkaban.flow.CommonJobProperties;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
import azkaban.jobcallback.JobCallbackValidator;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import azkaban.utils.Utils;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;

/**
 * Loads the job ({@code .job}) and property ({@code .properties}) files found under a project
 * directory, links the jobs through their declared dependencies and registers the resulting
 * {@link Flow}s on a {@link Project}.
 *
 * <p>All intermediate state lives in instance fields that are re-created on every call to
 * {@link #loadProjectFlow(Project, File)}. Load and validation problems detected by this class
 * are collected as strings and exposed through {@link #getErrors()}.
 */
public class DirectoryFlowLoader {

  public static final String JOB_MAX_XMS = "job.max.Xms";
  public static final String MAX_XMS_DEFAULT = "1G";
  public static final String JOB_MAX_XMX = "job.max.Xmx";
  public static final String MAX_XMX_DEFAULT = "2G";
  private static final DirFilter DIR_FILTER = new DirFilter();
  private static final String PROPERTY_SUFFIX = ".properties";
  private static final String JOB_SUFFIX = ".job";
  private static final String XMS = "Xms";
  private static final String XMX = "Xmx";

  private static final Logger logger = Logger.getLogger(DirectoryFlowLoader.class);

  // Loader-level settings; read for the memory limits and handed to the job callback validator.
  private final Props props;

  // Ids of jobs that become flow roots (explicitly flagged or without dependents).
  private HashSet<String> rootNodes;
  // Flow id (id of its root job) -> flow.
  private HashMap<String, Flow> flowMap;
  // Job name -> node.
  private HashMap<String, Node> nodeMap;
  // Job name -> (dependency name -> edge from the dependency to the job).
  private HashMap<String, Map<String, Edge>> nodeDependencies;
  // Job name -> props loaded from its .job file.
  private HashMap<String, Props> jobPropsMap;

  // Flow dependencies for embedded flows: flow id -> names of the flows it embeds.
  private HashMap<String, Set<String>> flowDependencies;

  private ArrayList<FlowProps> flowPropsList;
  private ArrayList<Props> propsList;
  private Set<String> errors;
  // Job names seen more than once; such jobs are dropped from nodeMap and jobPropsMap.
  private Set<String> duplicateJobs;

  /**
   * Creates a new DirectoryFlowLoader.
   *
   * @param props Properties to add.
   */
  public DirectoryFlowLoader(final Props props) {
    this.props = props;
  }

  /**
   * Returns errors caught when loading flows.
   *
   * @return the live set of error strings from the most recent load, or {@code null} if no load
   *     has been started on this instance yet.
   */
  public Set<String> getErrors() {
    return this.errors;
  }

  /**
   * Loads all flows from the directory into the project.
   *
   * <p>Resets every piece of loader state, then loads files, resolves job dependencies, builds
   * the flows and checks embedded-flow references, in that order. The flows, the props list and
   * the job props map are set on the project even when errors were recorded.
   *
   * @param project The project to load flows to.
   * @param baseDirectory The directory to load flows from.
   */
  public void loadProjectFlow(final Project project, final File baseDirectory) {
    this.propsList = new ArrayList<>();
    this.flowPropsList = new ArrayList<>();
    this.jobPropsMap = new HashMap<>();
    this.nodeMap = new HashMap<>();
    this.flowMap = new HashMap<>();
    this.errors = new HashSet<>();
    this.duplicateJobs = new HashSet<>();
    this.nodeDependencies = new HashMap<>();
    this.rootNodes = new HashSet<>();
    this.flowDependencies = new HashMap<>();

    // Load all the props files and create the Node objects
    loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);

    // Create edges and find missing dependencies
    resolveDependencies();

    // Create the flows.
    buildFlowsFromDependencies();

    // Resolve embedded flows
    resolveEmbeddedFlows();

    project.setFlows(this.flowMap);
    project.setPropsList(this.propsList);
    project.setJobPropsMap(this.jobPropsMap);
  }

  /**
   * Loads a project and wraps the collected errors in a validation report.
   *
   * @param project The project to load flows to.
   * @param projectDir The directory to load flows from.
   * @return a report containing every error recorded while loading and checking job properties.
   */
  public ValidationReport loadProject(final Project project, final File projectDir) {
    loadProjectFlow(project, projectDir);
    checkJobProperties(project);
    final ValidationReport report = new ValidationReport();
    report.addErrorMsgs(this.errors);
    return report;
  }

  /**
   * Recursively loads one directory: first its property files, then its job files, then its
   * sub-directories.
   *
   * <p>Property files are applied in sorted order, each successfully loaded file becoming the
   * parent of the next one. The last one loaded is the parent of every job in this directory and
   * is passed down to the sub-directories.
   *
   * @param base path of the project root, used to compute relative source paths.
   * @param dir directory to scan.
   * @param parent props inherited from the enclosing directory, or {@code null} at the root.
   */
  private void loadProjectFromDir(final String base, final File dir, Props parent) {
    for (final File file : listSortedFiles(dir, new SuffixFilter(PROPERTY_SUFFIX))) {
      final String relative = getRelativeFilePath(base, file.getPath());
      try {
        final Props loaded = new Props(parent, file);
        loaded.setSource(relative);
        this.flowPropsList.add(new FlowProps(loaded));

        // Only register props that really loaded; a failed file must not re-add the previous
        // parent (or null) to the list.
        logger.info("Adding " + relative);
        this.propsList.add(loaded);
        parent = loaded;
      } catch (final IOException e) {
        this.errors.add("Error loading properties " + file.getName() + ":"
            + e.getMessage());
      }
    }

    // Load all Job files. If there's a duplicate name, then we don't load
    for (final File file : listSortedFiles(dir, new SuffixFilter(JOB_SUFFIX))) {
      final String jobName = getNameWithoutExtension(file);
      if (this.duplicateJobs.contains(jobName)) {
        // Already reported as a duplicate; every further copy is ignored.
        continue;
      }
      if (this.jobPropsMap.containsKey(jobName)) {
        // Second occurrence: report it and also discard the copy loaded earlier.
        this.errors.add("Duplicate job names found '" + jobName + "'.");
        this.duplicateJobs.add(jobName);
        this.jobPropsMap.remove(jobName);
        this.nodeMap.remove(jobName);
        continue;
      }

      try {
        final Props prop = new Props(parent, file);
        final String relative = getRelativeFilePath(base, file.getPath());
        prop.setSource(relative);

        final Node node = new Node(jobName);
        final String type = prop.getString("type", null);
        if (type == null) {
          this.errors.add("Job doesn't have type set '" + jobName + "'.");
        }

        node.setType(type);

        node.setJobSource(relative);
        if (parent != null) {
          node.setPropsSource(parent.getSource());
        }

        // Force root node
        if (prop.getBoolean(CommonJobProperties.ROOT_NODE, false)) {
          this.rootNodes.add(jobName);
        }

        this.jobPropsMap.put(jobName, prop);
        this.nodeMap.put(jobName, node);
      } catch (final IOException e) {
        this.errors.add("Error loading job file " + file.getName() + ":"
            + e.getMessage());
      }
    }

    for (final File subDir : listSortedFiles(dir, DIR_FILTER)) {
      loadProjectFromDir(base, subDir, parent);
    }
  }

  /**
   * Lists the entries of {@code dir} accepted by {@code filter}, sorted by path.
   *
   * <p>{@link File#listFiles(FileFilter)} returns {@code null} when {@code dir} is not a
   * directory or cannot be read; that case is recorded as an error and treated as empty so the
   * rest of the load can continue.
   *
   * @param dir directory to list.
   * @param filter filter selecting the entries of interest.
   * @return the matching entries in sorted order, never {@code null}.
   */
  private File[] listSortedFiles(final File dir, final FileFilter filter) {
    final File[] files = dir.listFiles(filter);
    if (files == null) {
      this.errors.add("Error listing files in directory " + dir.getPath());
      return new File[0];
    }
    Arrays.sort(files);
    return files;
  }

  /**
   * Validates the embedded-flow references of every flow that embeds at least one other flow.
   */
  private void resolveEmbeddedFlows() {
    for (final String flowId : this.flowDependencies.keySet()) {
      final HashSet<String> visited = new HashSet<>();
      resolveEmbeddedFlow(flowId, visited);
    }
  }

  /**
   * Depth-first walk over the flows embedded by {@code flowId}, recording an error for a cycle
   * or for a reference to a flow that does not exist. The walk of this flow's remaining
   * references stops at the first error found at this level.
   *
   * @param flowId flow whose embedded flows are checked.
   * @param visited ids of the flows on the current traversal path.
   */
  private void resolveEmbeddedFlow(final String flowId, final Set<String> visited) {
    final Set<String> embeddedFlow = this.flowDependencies.get(flowId);
    if (embeddedFlow == null) {
      return;
    }

    visited.add(flowId);
    try {
      for (final String embeddedFlowId : embeddedFlow) {
        if (visited.contains(embeddedFlowId)) {
          this.errors.add("Embedded flow cycle found in " + flowId + "->"
              + embeddedFlowId);
          return;
        } else if (!this.flowMap.containsKey(embeddedFlowId)) {
          this.errors.add("Flow " + flowId + " depends on " + embeddedFlowId
              + " but can't be found.");
          return;
        } else {
          resolveEmbeddedFlow(embeddedFlowId, visited);
        }
      }
    } finally {
      // Always leave the path, including on the early returns above; otherwise a sibling that
      // embeds this flow would be reported as a cycle that does not exist.
      visited.remove(flowId);
    }
  }

  /**
   * Builds the dependency edges of every loaded job.
   *
   * <p>Missing, ambiguous (duplicate job name) and self-referencing dependencies still produce
   * an edge, but the edge carries an error and a message is added to the error set.
   */
  private void resolveDependencies() {
    for (final Node node : this.nodeMap.values()) {
      final String nodeId = node.getId();
      final Props jobProps = this.jobPropsMap.get(nodeId);

      if (jobProps == null) {
        logger.error("Job props not found!! For some reason.");
        continue;
      }

      // The cast selects the overload that takes a default list.
      final List<String> dependencyList =
          jobProps.getStringList(CommonJobProperties.DEPENDENCIES,
              (List<String>) null);
      if (dependencyList == null || this.nodeDependencies.containsKey(nodeId)) {
        continue;
      }

      final Map<String, Edge> dependencies = new HashMap<>();
      for (final String rawName : dependencyList) {
        final String dependencyName = rawName == null ? null : rawName.trim();
        if (dependencyName == null || dependencyName.isEmpty()) {
          continue;
        }

        final Edge edge = new Edge(dependencyName, nodeId);
        final Node dependencyNode = this.nodeMap.get(dependencyName);
        if (dependencyNode == null) {
          if (this.duplicateJobs.contains(dependencyName)) {
            edge.setError("Ambiguous Dependency. Duplicates found.");
            this.errors.add(nodeId + " has ambiguous dependency "
                + dependencyName);
          } else {
            edge.setError("Dependency not found.");
            this.errors.add(nodeId + " cannot find dependency "
                + dependencyName);
          }
        } else if (dependencyNode == node) {
          // We have a self cycle
          edge.setError("Self cycle found.");
          this.errors.add(nodeId + " has a self cycle");
        }
        dependencies.put(dependencyName, edge);
      }

      if (!dependencies.isEmpty()) {
        this.nodeDependencies.put(nodeId, dependencies);
      }
    }
  }

  /**
   * Creates one flow per root job. A job is a root when it was explicitly flagged as one while
   * loading, or when no other job lists it as a dependency.
   */
  private void buildFlowsFromDependencies() {
    // Find all root nodes by finding ones without dependents.
    final HashSet<String> nonRootNodes = new HashSet<>();
    for (final Map<String, Edge> edges : this.nodeDependencies.values()) {
      nonRootNodes.addAll(edges.keySet());
    }

    // Now create flows. Bad flows are marked invalid
    final Set<String> visitedNodes = new HashSet<>();
    for (final Node base : this.nodeMap.values()) {
      final String baseId = base.getId();
      // Root nodes can be discovered when parsing jobs
      if (!this.rootNodes.contains(baseId) && nonRootNodes.contains(baseId)) {
        continue;
      }

      this.rootNodes.add(baseId);
      final Flow flow = new Flow(baseId);
      final Props jobProp = this.jobPropsMap.get(baseId);

      // Dedup with sets
      final Set<String> successEmail =
          lowerCasedEmails(jobProp, CommonJobProperties.SUCCESS_EMAILS);
      final Set<String> failureEmail =
          lowerCasedEmails(jobProp, CommonJobProperties.FAILURE_EMAILS);

      // Notify addresses are informed of both outcomes.
      final Set<String> notifyEmail =
          lowerCasedEmails(jobProp, CommonJobProperties.NOTIFY_EMAILS);
      successEmail.addAll(notifyEmail);
      failureEmail.addAll(notifyEmail);

      flow.addFailureEmails(failureEmail);
      flow.addSuccessEmails(successEmail);

      flow.addAllFlowProperties(this.flowPropsList);
      constructFlow(flow, base, visitedNodes);
      flow.initialize();
      this.flowMap.put(baseId, flow);
    }
  }

  /**
   * Reads a list-valued e-mail property and returns its entries lower-cased and de-duplicated.
   *
   * <p>Lower-casing uses {@link Locale#ROOT} so addresses are not altered by locale-specific
   * case rules of the JVM's default locale.
   *
   * @param jobProp props to read from.
   * @param key name of the property holding the address list.
   * @return a new mutable set of lower-cased addresses; empty when the property is absent.
   */
  private static Set<String> lowerCasedEmails(final Props jobProp, final String key) {
    final Set<String> emails = new HashSet<>();
    for (final String email : jobProp.getStringList(key, Collections.<String>emptyList())) {
      emails.add(email.toLowerCase(Locale.ROOT));
    }
    return emails;
  }

  /**
   * Adds {@code node} and, recursively, everything it depends on to {@code flow}.
   *
   * <p>Edges that already carry an error are added without being followed. An edge leading back
   * to a node on the current path is replaced by an error edge and reported as a cyclical
   * dependency. Nodes of the embedded-flow type register the embedded flow name as a dependency
   * of the flow being built.
   *
   * @param flow flow under construction.
   * @param node node to add.
   * @param visited ids of the nodes on the current traversal path.
   */
  private void constructFlow(final Flow flow, final Node node, final Set<String> visited) {
    visited.add(node.getId());

    flow.addNode(node);
    if (SpecialJobTypes.EMBEDDED_FLOW_TYPE.equals(node.getType())) {
      final Props jobProps = this.jobPropsMap.get(node.getId());
      final String embeddedFlow = jobProps.get(SpecialJobTypes.FLOW_NAME);

      Set<String> embeddedFlows = this.flowDependencies.get(flow.getId());
      if (embeddedFlows == null) {
        embeddedFlows = new HashSet<>();
        this.flowDependencies.put(flow.getId(), embeddedFlows);
      }

      node.setEmbeddedFlowId(embeddedFlow);
      embeddedFlows.add(embeddedFlow);
    }

    final Map<String, Edge> dependencies = this.nodeDependencies.get(node.getId());
    if (dependencies != null) {
      for (final Edge edge : dependencies.values()) {
        if (edge.hasError()) {
          flow.addEdge(edge);
        } else if (visited.contains(edge.getSourceId())) {
          // We have a cycle. We set it as an error edge
          final Edge cycleEdge = new Edge(edge.getSourceId(), node.getId());
          cycleEdge.setError("Cyclical dependencies found.");
          this.errors.add("Cyclical dependency found at " + cycleEdge.getId());
          flow.addEdge(cycleEdge);
        } else {
          // This should not be null: an error-free edge was created for an existing node.
          flow.addEdge(edge);
          final Node sourceNode = this.nodeMap.get(edge.getSourceId());
          constructFlow(flow, sourceNode, visited);
        }
      }
    }

    visited.remove(node.getId());
  }

  /**
   * Checks every loaded job's {@code Xms}/{@code Xmx} settings against the configured maximums
   * and runs the job callback validation. Violations are added to the error set.
   *
   * <p>Nothing is checked, including the job callback properties, when the project is on the
   * memory-check whitelist. Values that are variable replacement patterns are not compared.
   *
   * @param project project whose whitelist status is consulted.
   */
  private void checkJobProperties(final Project project) {
    // if project is in the memory check whitelist, then we don't need to check
    // its memory settings
    if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
        ProjectWhitelist.WhitelistType.MemoryCheck)) {
      return;
    }

    final String maxXms = this.props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
    final String maxXmx = this.props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
    final long sizeMaxXms = Utils.parseMemString(maxXms);
    final long sizeMaxXmx = Utils.parseMemString(maxXmx);

    for (final Map.Entry<String, Props> entry : this.jobPropsMap.entrySet()) {
      final String jobName = entry.getKey();
      final Props jobProps = entry.getValue();

      final String xms = jobProps.getString(XMS, null);
      if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
          && Utils.parseMemString(xms) > sizeMaxXms) {
        this.errors.add(String.format(
            "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
            jobName, maxXms));
      }

      final String xmx = jobProps.getString(XMX, null);
      if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
          && Utils.parseMemString(xmx) > sizeMaxXmx) {
        this.errors.add(String.format(
            "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
            jobName, maxXmx));
      }

      // job callback properties check
      JobCallbackValidator.validate(jobName, this.props, jobProps, this.errors);
    }
  }

  /**
   * Returns the file's name with everything from the last {@code '.'} onwards removed.
   *
   * @param file file whose name is used.
   * @return the name without its extension, or the whole name when it contains no dot.
   */
  private String getNameWithoutExtension(final File file) {
    final String filename = file.getName();
    final int index = filename.lastIndexOf('.');

    return index < 0 ? filename : filename.substring(0, index);
  }

  /**
   * Strips {@code basePath} plus one following character from the front of {@code filePath}.
   *
   * <p>NOTE(review): assumes {@code filePath} starts with {@code basePath} followed by a single
   * separator character; this is not verified here.
   *
   * @param basePath path of the project root.
   * @param filePath path of a file below the project root.
   * @return the remainder of {@code filePath} after the base path and separator.
   */
  private String getRelativeFilePath(final String basePath, final String filePath) {
    return filePath.substring(basePath.length() + 1);
  }

  /** Accepts directories only. */
  private static class DirFilter implements FileFilter {

    @Override
    public boolean accept(final File pathname) {
      return pathname.isDirectory();
    }
  }

  /**
   * Accepts regular, non-hidden files whose name ends with a given suffix and is longer than
   * the suffix itself.
   */
  private static class SuffixFilter implements FileFilter {

    private final String suffix;

    public SuffixFilter(final String suffix) {
      this.suffix = suffix;
    }

    @Override
    public boolean accept(final File pathname) {
      final String name = pathname.getName();

      return pathname.isFile() && !pathname.isHidden()
          && name.length() > this.suffix.length() && name.endsWith(this.suffix);
    }
  }
}