DirectoryYamlFlowLoaderTest.java

219 lines | 10.09 kB Blame History Raw Download
/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the “License”); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package azkaban.project;

import static org.assertj.core.api.Assertions.assertThat;

import azkaban.Constants;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectoryYamlFlowLoaderTest {

  private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoaderTest
      .class);

  private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
  private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
  private static final String RECURSIVE_DIRECTORY_FLOW_YAML_DIR = "recursivedirectoryyamltest";
  private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
  private static final String MULTIPLE_EMBEDDED_FLOW_YAML_DIR = "multipleembeddedflowyamltest";
  private static final String CYCLE_FOUND_YAML_DIR = "cyclefoundyamltest";
  private static final String DUPLICATE_NODENAME_YAML_DIR = "duplicatenodenamesyamltest";
  private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
  private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
  private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
  private static final String INVALID_CONDITION_YAML_DIR = "invalidconditionalflowyamltest";
  private static final String BASIC_FLOW_1 = "basic_flow";
  private static final String BASIC_FLOW_2 = "basic_flow2";
  private static final String EMBEDDED_FLOW = "embedded_flow";
  private static final String EMBEDDED_FLOW_1 = "embedded_flow" + Constants.PATH_DELIMITER +
      "embedded_flow1";
  private static final String EMBEDDED_FLOW_2 =
      "embedded_flow" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
          + "embedded_flow2";
  private static final String EMBEDDED_FLOW_B = "embedded_flow_b";
  private static final String EMBEDDED_FLOW_B1 =
      "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1";
  private static final String EMBEDDED_FLOW_B2 =
      "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
          + "embedded_flow2";
  private static final String DUPLICATE_NODENAME_FLOW_FILE = "duplicate_nodename.flow";
  private static final String DEPENDENCY_UNDEFINED_FLOW_FILE = "dependency_undefined.flow";
  private static final String CYCLE_FOUND_FLOW = "cycle_found";
  private static final String CYCLE_FOUND_ERROR = "Cycles found.";
  private static final String SHELL_PWD = "invalid_jobprops:shell_pwd";
  private Project project;

  @Before
  public void setUp() {
    this.project = new Project(12, "myTestProject");
  }

  @Test
  public void testLoadBasicYamlFile() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(BASIC_FLOW_YAML_DIR));
    checkFlowLoaderProperties(loader, 0, 1, 1);
    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 1, 3, null);
  }

  @Test
  public void testLoadMultipleYamlFiles() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(MULTIPLE_FLOW_YAML_DIR));
    checkFlowLoaderProperties(loader, 0, 2, 2);
    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 1, 3, null);
    checkFlowProperties(loader, BASIC_FLOW_2, 0, 3, 1, 2, null);
  }

  @Test
  public void testLoadYamlFileRecursively() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project,
        ExecutionsTestUtil.getFlowDir(RECURSIVE_DIRECTORY_FLOW_YAML_DIR));
    checkFlowLoaderProperties(loader, 0, 2, 2);
    checkFlowProperties(loader, BASIC_FLOW_1, 0, 3, 1, 2, null);
    checkFlowProperties(loader, BASIC_FLOW_2, 0, 4, 1, 3, null);
  }

  @Test
  public void testLoadEmbeddedFlowYamlFile() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(EMBEDDED_FLOW_YAML_DIR));
    checkFlowLoaderProperties(loader, 0, 3, 3);
    checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 1, 3, null);
    checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 1, 3, null);
    checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, 1, null);
  }

  @Test
  public void testLoadMultipleEmbeddedFlowYamlFiles() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project,
        ExecutionsTestUtil.getFlowDir(MULTIPLE_EMBEDDED_FLOW_YAML_DIR));
    checkFlowLoaderProperties(loader, 0, 6, 6);
    checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 1, 3, null);
    checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 1, 3, null);
    checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, 1, null);
    checkFlowProperties(loader, EMBEDDED_FLOW_B, 0, 4, 1, 3, null);
    checkFlowProperties(loader, EMBEDDED_FLOW_B1, 0, 4, 1, 3, null);
    checkFlowProperties(loader, EMBEDDED_FLOW_B2, 0, 2, 1, 1, null);
  }

  @Test
  public void testLoadInvalidFlowYamlFileWithDuplicateNodeNames() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project,
        ExecutionsTestUtil.getFlowDir(DUPLICATE_NODENAME_YAML_DIR));
    checkFlowLoaderProperties(loader, 1, 0, 0);
    assertThat(loader.getErrors()).containsExactly(
        "Failed to validate nodeBean for " + DUPLICATE_NODENAME_FLOW_FILE
            + ". Duplicate nodes found or dependency undefined.");
  }

  @Test
  public void testLoadInvalidFlowYamlFileWithUndefinedDependency() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project,
        ExecutionsTestUtil.getFlowDir(DEPENDENCY_UNDEFINED_YAML_DIR));
    checkFlowLoaderProperties(loader, 1, 0, 0);
    assertThat(loader.getErrors()).containsExactly(
        "Failed to validate nodeBean for " + DEPENDENCY_UNDEFINED_FLOW_FILE
            + ". Duplicate nodes found or dependency undefined.");
  }

  @Test
  public void testLoadInvalidFlowYamlFileWithCycle() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(CYCLE_FOUND_YAML_DIR));
    checkFlowLoaderProperties(loader, 1, 1, 1);
    checkFlowProperties(loader, CYCLE_FOUND_FLOW, 1, 4, 1, 4, CYCLE_FOUND_ERROR);
  }

  @Test
  public void testLoadFlowYamlFileWithInvalidJobProps() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project,
        ExecutionsTestUtil.getFlowDir(INVALID_JOBPROPS_YAML_DIR));
    checkFlowLoaderProperties(loader, 1, 1, 1);
    assertThat(loader.getErrors()).containsExactly(
        SHELL_PWD + ": Xms value has exceeded the allowed limit (max Xms = "
            + Constants.JobProperties.MAX_XMS_DEFAULT + ")");
  }

  @Test
  public void testLoadNoFlowYamlFile() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(NO_FLOW_YAML_DIR));
    checkFlowLoaderProperties(loader, 0, 0, 0);
  }

  @Test
  public void testFlowYamlFileWithInvalidConditions() {
    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_CONDITION_YAML_DIR));
    checkFlowLoaderProperties(loader, 5, 5, 5);
    Assert.assertTrue(
        loader.getErrors().contains("Invalid condition for jobB: jobC doesn't exist in the flow."));
    Assert.assertTrue(loader.getErrors().contains(
        "Invalid condition for jobA: should not define condition on its descendant node jobD."));
    Assert.assertTrue(
        loader.getErrors().contains("Invalid condition for jobB: operand is an empty string."));
    Assert.assertTrue(loader.getErrors().contains(
        "Invalid condition for jobB: cannot resolve the condition. Please check the syntax for supported conditions."));
    Assert.assertTrue(loader.getErrors().contains(
        "Invalid condition for jobB: cannot combine more than one conditionOnJobStatus macros."));
  }

  private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
      final int numFlowMap, final int numEdgeMap) {
    assertThat(loader.getErrors().size()).isEqualTo(numError);
    assertThat(loader.getFlowMap().size()).isEqualTo(numFlowMap);
    assertThat(loader.getEdgeMap().size()).isEqualTo(numEdgeMap);
  }

  private void checkFlowProperties(final DirectoryYamlFlowLoader loader, final String flowName,
      final int numError, final int numNode, final int numFlowProps, final int numEdge, final
  String edgeError) {
    assertThat(loader.getFlowMap().containsKey(flowName)).isTrue();
    final Flow flow = loader.getFlowMap().get(flowName);
    if (numError != 0) {
      assertThat(flow.getErrors().size()).isEqualTo(numError);
    }
    assertThat(flow.getNodes().size()).isEqualTo(numNode);
    assertThat(flow.getAllFlowProps().size()).isEqualTo(numFlowProps);

    // Verify flow edges
    assertThat(loader.getEdgeMap().get(flowName).size()).isEqualTo(numEdge);
    assertThat(flow.getEdges().size()).isEqualTo(numEdge);
    for (final Edge edge : loader.getEdgeMap().get(flowName)) {
      this.logger.info(flowName + ".flow has edge: " + edge.getId());
      if (edge.getError() != null) {
        assertThat(edge.getError()).isEqualTo(edgeError);
      }
    }
  }
}