FlowRunnerYamlTest.java

129 lines | 5.47 kB Blame History Raw Download
/*
* Copyright 2018 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the “License”); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.execapp;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
import azkaban.project.Project;
import azkaban.test.executions.ExecutionsTestUtil;
import java.io.File;
import java.util.HashMap;
import org.junit.Ignore;
import org.junit.Test;

public class FlowRunnerYamlTest extends FlowRunnerTestBase {

  private static final String BASIC_FLOW_YAML_DIR = "basicflowwithoutendnode";
  private static final String FAIL_BASIC_FLOW_YAML_DIR = "failbasicflowwithoutendnode";
  private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowwithoutendnode";
  private static final String BASIC_FLOW_NAME = "basic_flow";
  private static final String BASIC_FLOW_YAML_FILE = BASIC_FLOW_NAME + ".flow";
  private static final String FAIL_BASIC_FLOW_NAME = "fail_basic_flow";
  private static final String FAIL_BASIC_FLOW_YAML_FILE = FAIL_BASIC_FLOW_NAME + ".flow";
  private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
  private static final String EMBEDDED_FLOW_YAML_FILE = EMBEDDED_FLOW_NAME + ".flow";
  private FlowRunnerTestUtil testUtil;

  @Test
  public void testBasicFlowWithoutEndNode() throws Exception {
    setUp(BASIC_FLOW_YAML_DIR, BASIC_FLOW_YAML_FILE);
    final HashMap<String, String> flowProps = new HashMap<>();
    this.runner = this.testUtil.createFromFlowMap(BASIC_FLOW_NAME, flowProps);
    final ExecutableFlow flow = this.runner.getExecutableFlow();
    FlowRunnerTestUtil.startThread(this.runner);
    assertStatus("jobA", Status.SUCCEEDED);
    assertStatus("jobB", Status.SUCCEEDED);
    assertFlowStatus(flow, Status.RUNNING);
    InteractiveTestJob.getTestJob("jobC").succeedJob();
    assertStatus("jobC", Status.SUCCEEDED);
    assertFlowStatus(flow, Status.SUCCEEDED);
  }

  /**
   * There seems to be an actual race condition bug in the runtime code. See: issue #1921: Flaky
   * test FlowRunnerTestYaml & issue #1311: Potential race condition between flowRunner thread and
   * jetty killing thread. Disable this test until the potential bug is fixed or new DAG engine
   * code is ready.
   */
  @Ignore
  @Test
  public void testKillBasicFlowWithoutEndNode() throws Exception {
    setUp(BASIC_FLOW_YAML_DIR, BASIC_FLOW_YAML_FILE);
    final HashMap<String, String> flowProps = new HashMap<>();
    this.runner = this.testUtil.createFromFlowMap(BASIC_FLOW_NAME, flowProps);
    final ExecutableFlow flow = this.runner.getExecutableFlow();
    FlowRunnerTestUtil.startThread(this.runner);
    assertStatus("jobA", Status.SUCCEEDED);
    assertStatus("jobB", Status.SUCCEEDED);
    this.runner.kill();
    assertStatus("jobC", Status.KILLED);
    assertFlowStatus(flow, Status.KILLED);
  }

  @Ignore
  @Test
  public void testFailBasicFlowWithoutEndNode() throws Exception {
    setUp(FAIL_BASIC_FLOW_YAML_DIR, FAIL_BASIC_FLOW_YAML_FILE);
    final HashMap<String, String> flowProps = new HashMap<>();
    this.runner = this.testUtil.createFromFlowMap(FAIL_BASIC_FLOW_NAME, flowProps);
    final ExecutableFlow flow = this.runner.getExecutableFlow();
    FlowRunnerTestUtil.startThread(this.runner);
    InteractiveTestJob.getTestJob("jobC").failJob();
    assertStatus("jobC", Status.FAILED);
    InteractiveTestJob.getTestJob("jobB").succeedJob();
    assertStatus("jobB", Status.SUCCEEDED);
    InteractiveTestJob.getTestJob("jobA").succeedJob();
    assertStatus("jobA", Status.SUCCEEDED);
    assertStatus("jobD", Status.CANCELLED);
    assertFlowStatus(flow, Status.FAILED);
  }

  @Test
  public void testEmbeddedFlowWithoutEndNode() throws Exception {
    setUp(EMBEDDED_FLOW_YAML_DIR, EMBEDDED_FLOW_YAML_FILE);
    final HashMap<String, String> flowProps = new HashMap<>();
    this.runner = this.testUtil.createFromFlowMap(EMBEDDED_FLOW_NAME, flowProps);
    final ExecutableFlow flow = this.runner.getExecutableFlow();
    FlowRunnerTestUtil.startThread(this.runner);
    assertStatus("jobA", Status.SUCCEEDED);
    assertStatus("embedded_flow1:jobB", Status.SUCCEEDED);
    assertStatus("embedded_flow1:jobC", Status.SUCCEEDED);
    assertStatus("embedded_flow1", Status.SUCCEEDED);
    assertStatus("jobD", Status.SUCCEEDED);
    assertFlowStatus(flow, Status.SUCCEEDED);
  }

  private void setUp(final String projectDir, final String flowYamlFile) throws Exception {
    this.testUtil = new FlowRunnerTestUtil(projectDir, this.temporaryFolder);
    final Project project = this.testUtil.getProject();
    when(this.testUtil.getProjectLoader()
        .getLatestFlowVersion(project.getId(), project.getVersion(), flowYamlFile))
        .thenReturn(1);
    when(this.testUtil.getProjectLoader()
        .getUploadedFlowFile(eq(project.getId()), eq(project.getVersion()),
            eq(flowYamlFile),
            eq(1), any(File.class)))
        .thenReturn(
            ExecutionsTestUtil.getFlowFile(projectDir, flowYamlFile));
  }

}