FlowRunnerPropertyResolutionTest.java

195 lines | 8.509 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.execapp;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
import azkaban.project.Project;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Test the property resolution of jobs in a flow.
 *
 * The tests are contained in execpropstest, and should be resolved in the following fashion, where
 * the later props take precedence over the previous ones.
 *
 * <ol>
 * <li>Global props (set in the FlowRunner)</li>
 * <li>Shared job props (depends on job directory)</li>
 * <li>Flow override properties</li>
 * <li>Previous job outputs to the embedded flow (only if contained in an embedded flow)</li>
 * <li>Embedded flow properties (only if contained in an embedded flow)</li>
 * <li>Previous job outputs (if they exist)</li>
 * <li>Job props</li>
 * </ol>
 *
 * The test contains the following structure: job2 -> innerFlow (job1 -> job4 ) -> job3
 *
 * job2 and 4 are in nested directories so should have different shared properties than other jobs.
 */
public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {

  private static final String EXEC_FLOW_DIR = "execpropstest";
  private static final String FLOW_YAML_DIR = "loadpropsflowyamltest";
  private static final String FLOW_NAME = "job3";
  private static final String FLOW_YAML_FILE = FLOW_NAME + ".flow";
  private FlowRunnerTestUtil testUtil;

  /**
   * Runs the property resolution checks against the flow defined in execpropstest.
   */
  @Test
  public void testPropertyResolution() throws Exception {
    this.testUtil = new FlowRunnerTestUtil(EXEC_FLOW_DIR, this.temporaryFolder);
    assertProperties(false);
  }

  /**
   * Runs the property resolution checks against the YAML flow defined in loadpropsflowyamltest.
   *
   * The mocked project loader reports the flow file as uploaded with latest version 1 and hands
   * out a copy of the flow file placed in the temporary folder.
   */
  @Test
  public void testYamlFilePropertyResolution() throws Exception {
    this.testUtil = new FlowRunnerTestUtil(FLOW_YAML_DIR, this.temporaryFolder);
    final Project project = this.testUtil.getProject();

    when(this.testUtil.getProjectLoader().isFlowFileUploaded(project.getId(), project.getVersion()))
        .thenReturn(true);
    when(this.testUtil.getProjectLoader()
        .getLatestFlowVersion(project.getId(), project.getVersion(), FLOW_YAML_FILE))
        .thenReturn(1);
    doAnswer(invocation -> stageFlowYamlFile())
        .when(this.testUtil.getProjectLoader())
        .getUploadedFlowFile(project.getId(), project.getVersion(), 1, FLOW_YAML_FILE);

    assertProperties(true);
  }

  /**
   * Creates a new file named after the flow YAML file in the temporary folder and copies the test
   * flow file into it.
   *
   * @return the freshly created copy
   */
  private File stageFlowYamlFile() throws Exception {
    final File copy = this.temporaryFolder.newFile(FLOW_YAML_FILE);
    FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE), copy);
    return copy;
  }

  /**
   * Drives the flow one job at a time and, at each step, checks the input props the running job
   * received.
   *
   * @param isAzkabanFlowVersion20 value forwarded to the runner's flow version 2.0 switch
   */
  private void assertProperties(final boolean isAzkabanFlowVersion20) throws Exception {
    final HashMap<String, String> flowOverrides = new HashMap<>();
    flowOverrides.put("props5", "flow5");
    flowOverrides.put("props6", "flow6");
    flowOverrides.put("props7", "flow7");

    final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME, flowOverrides);
    // Todo jamiesjc: remove below line after project_flow_files DB change is rolled out.
    runner.setAzkabanFlowVersion20(isAzkabanFlowVersion20);

    final Map<String, ExecutableNode> nodesById = new HashMap<>();
    createNodeMap(runner.getExecutableFlow(), nodesById);
    final ExecutableFlow flow = runner.getExecutableFlow();

    // Step 1: kick off the flow; job2 is the first job to run.
    FlowRunnerTestUtil.startThread(runner);
    assertStatus(flow, "job2", Status.RUNNING);

    // job2 is a plain job, so only the flow overrides and the shared properties
    // sit underneath its own props.
    final Props job2Input = nodesById.get("job2").getInputProps();
    expect(job2Input, "props1", "shared1");
    expect(job2Input, "props2", "job2");
    expect(job2Input, "props3", "moo3");
    expect(job2Input, "props7", "job7");
    expect(job2Input, "props5", "flow5");
    expect(job2Input, "props6", "flow6");
    expect(job2Input, "props4", "shared4");
    expect(job2Input, "props8", "shared8");

    // Step 2: job1 lives inside the embedded flow and in a different directory.
    // Expected priority, highest first:
    // job1 -> innerflow -> job2.output -> flow.overrides -> job1 shared props
    InteractiveTestJob.getTestJob("job2").succeedJob(
        outputProps("props6", "gjob6", "props9", "gjob9", "props10", "gjob10"));
    assertStatus(flow, "innerflow:job1", Status.RUNNING);

    final Props job1Input = nodesById.get("innerflow:job1").getInputProps();
    expect(job1Input, "props1", "job1");
    expect(job1Input, "props2", "job2");
    expect(job1Input, "props8", "job8");
    expect(job1Input, "props9", "gjob9");
    expect(job1Input, "props10", "gjob10");
    expect(job1Input, "props6", "innerflow6");
    expect(job1Input, "props5", "innerflow5");
    expect(job1Input, "props7", "flow7");
    expect(job1Input, "props3", "moo3");
    expect(job1Input, "props4", "moo4");

    // Step 3: job4 is also inside the embedded flow and consumes job1's output.
    // Expected priority, highest first:
    // job4 -> job1.output -> innerflow -> job2.output -> flow.overrides -> job4 shared props
    InteractiveTestJob.getTestJob("innerflow:job1").succeedJob(
        outputProps("props9", "g2job9", "props7", "g2job7"));
    assertStatus(flow, "innerflow:job4", Status.RUNNING);

    final Props job4Input = nodesById.get("innerflow:job4").getInputProps();
    expect(job4Input, "props8", "job8");
    expect(job4Input, "props9", "job9");
    expect(job4Input, "props7", "g2job7");
    expect(job4Input, "props5", "innerflow5");
    expect(job4Input, "props6", "innerflow6");
    expect(job4Input, "props10", "gjob10");
    expect(job4Input, "props4", "shared4");
    expect(job4Input, "props1", "shared1");
    expect(job4Input, "props2", "shared2");
    expect(job4Input, "props3", "moo3");

    // Step 4: job3 is a plain job fed by the embedded flow's output.
    // Expected priority, highest first:
    // job3 -> innerflow.output -> flow.overrides -> job3 shared props
    InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(
        outputProps("props9", "g4job9", "props6", "g4job6"));
    // FLOW_NAME has the same value as the id of the last job, "job3".
    assertStatus(flow, FLOW_NAME, Status.RUNNING);

    final Props job3Input = nodesById.get("job3").getInputProps();
    expect(job3Input, "props3", "job3");
    expect(job3Input, "props6", "g4job6");
    expect(job3Input, "props9", "g4job9");
    expect(job3Input, "props7", "flow7");
    expect(job3Input, "props5", "flow5");
    expect(job3Input, "props1", "shared1");
    expect(job3Input, "props2", "shared2");
    expect(job3Input, "props4", "moo4");
  }

  /**
   * Asserts that {@code props} holds {@code expected} under {@code key}.
   */
  private static void expect(final Props props, final String key, final String expected) {
    Assert.assertEquals(expected, props.get(key));
  }

  /**
   * Builds a {@link Props} from alternating key/value arguments, inserted in the order given.
   *
   * @param keyValuePairs key1, value1, key2, value2, ...
   */
  private static Props outputProps(final String... keyValuePairs) {
    final Props generated = new Props();
    for (int i = 0; i + 1 < keyValuePairs.length; i += 2) {
      generated.put(keyValuePairs[i], keyValuePairs[i + 1]);
    }
    return generated;
  }

  /**
   * Registers every node of {@code flow} in {@code nodeMap} under its nested id, descending into
   * any node that is itself an {@link ExecutableFlowBase}.
   */
  private void createNodeMap(final ExecutableFlowBase flow,
      final Map<String, ExecutableNode> nodeMap) {
    for (final ExecutableNode child : flow.getExecutableNodes()) {
      nodeMap.put(child.getNestedId(), child);

      if (!(child instanceof ExecutableFlowBase)) {
        continue;
      }
      createNodeMap((ExecutableFlowBase) child, nodeMap);
    }
  }
}