FlowTriggerServiceTest.java

249 lines | 10.825 kB Blame History Raw Download
/*
 * Copyright 2018 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.flowtrigger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerException;
import azkaban.flow.Flow;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
import azkaban.flowtrigger.testplugin.TestDependencyCheck;
import azkaban.flowtrigger.util.TestUtil;
import azkaban.project.FlowTrigger;
import azkaban.project.FlowTriggerDependency;
import azkaban.project.Project;
import azkaban.utils.Emailer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;


public class FlowTriggerServiceTest {

  private static final FlowTriggerInstanceLoader flowTriggerInstanceLoader = new
      MockFlowTriggerInstanceLoader();
  private static TestDependencyCheck testDepCheck;
  private static FlowTriggerService flowTriggerService;
  private static ExecutorManager executorManager;

  @BeforeClass
  public static void setup() throws Exception {
    testDepCheck = new TestDependencyCheck();
    final FlowTriggerDependencyPluginManager pluginManager = mock(FlowTriggerDependencyPluginManager
        .class);
    when(pluginManager.getDependencyCheck(ArgumentMatchers.eq("TestDependencyCheck")))
        .thenReturn(testDepCheck);

    executorManager = mock(ExecutorManager.class);
    when(executorManager.submitExecutableFlow(any(), anyString())).thenReturn("return");

    final Emailer emailer = mock(Emailer.class);
    Mockito.doNothing().when(emailer).sendEmail(any(), anyString(), anyString());

    final TriggerInstanceProcessor triggerInstProcessor = new TriggerInstanceProcessor(
        executorManager,
        flowTriggerInstanceLoader, emailer);
    final DependencyInstanceProcessor depInstProcessor = new DependencyInstanceProcessor
        (flowTriggerInstanceLoader);

    final FlowTriggerExecutionCleaner executionCleaner = new FlowTriggerExecutionCleaner(
        flowTriggerInstanceLoader);

    flowTriggerService = new FlowTriggerService(pluginManager, triggerInstProcessor,
        depInstProcessor, flowTriggerInstanceLoader, executionCleaner);
    flowTriggerService.start();
  }

  @Before
  public void cleanup() {
    ((MockFlowTriggerInstanceLoader) flowTriggerInstanceLoader).clear();
    reset(executorManager);
  }

  private Project createProject() {
    final Project project = new Project(1, "project1");
    project.setVersion(1);
    final Flow flow = new Flow("testflow");
    final Map<String, Flow> flowMap = new HashMap<>();
    flowMap.put("testflow", flow);
    project.setFlows(flowMap);
    return project;
  }

  @Test
  public void testStartTriggerCancelledByTimeout() throws InterruptedException {

    final List<FlowTriggerDependency> deps = new ArrayList<>();
    deps.add(TestUtil.createTestDependency("2secs", 2, false));
    deps.add(TestUtil.createTestDependency("8secs", 8, false));
    deps.add(TestUtil.createTestDependency("9secs", 9, false));
    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(5));
    for (int i = 0; i < 30; i++) {
      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
    }
    Thread.sleep(Duration.ofSeconds(6).toMillis());
    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
    assertThat(triggerInstances).hasSize(30);
    for (final TriggerInstance inst : triggerInstances) {
      assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
      for (final DependencyInstance depInst : inst.getDepInstances()) {
        if (depInst.getDepName().equals("2secs")) {
          assertThat(depInst.getStatus()).isEqualTo(Status.SUCCEEDED);
        } else if (depInst.getDepName().equals("8secs")) {
          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.TIMEOUT);
        } else if (depInst.getDepName().equals("9secs")) {
          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.TIMEOUT);
        }
      }
    }
  }

  @Test
  public void testStartTriggerCancelledManually() throws InterruptedException {
    final List<FlowTriggerDependency> deps = new ArrayList<>();
    deps.add(TestUtil.createTestDependency("2secs", 2, false));
    deps.add(TestUtil.createTestDependency("8secs", 8, false));
    deps.add(TestUtil.createTestDependency("9secs", 9, false));
    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(5));
    for (int i = 0; i < 30; i++) {
      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
    }

    Thread.sleep(Duration.ofMillis(500).toMillis());
    for (final TriggerInstance runningTrigger : flowTriggerService.getRunningTriggers()) {
      flowTriggerService.cancelTriggerInstance(runningTrigger, CancellationCause.MANUAL);
    }
    Thread.sleep(Duration.ofMillis(500).toMillis());
    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
    assertThat(triggerInstances).hasSize(30);
    for (final TriggerInstance inst : triggerInstances) {
      assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
      for (final DependencyInstance depInst : inst.getDepInstances()) {
        assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
        assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.MANUAL);
      }
    }
  }

  @Test
  public void testStartTriggerCancelledByFailure() throws InterruptedException {
    final List<FlowTriggerDependency> deps = new ArrayList<>();
    deps.add(TestUtil.createTestDependency("2secs", 2, true));
    deps.add(TestUtil.createTestDependency("8secs", 8, false));
    deps.add(TestUtil.createTestDependency("9secs", 9, false));
    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
    for (int i = 0; i < 30; i++) {
      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
    }
    Thread.sleep(Duration.ofSeconds(1).toMillis());
    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
    assertThat(triggerInstances).hasSize(30);
    for (final TriggerInstance inst : triggerInstances) {
      assertThat(inst.getStatus()).isEqualTo(Status.CANCELLED);
      for (final DependencyInstance depInst : inst.getDepInstances()) {
        if (depInst.getDepName().equals("2secs")) {
          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.FAILURE);
        } else {
          assertThat(depInst.getStatus()).isEqualTo(Status.CANCELLED);
          assertThat(depInst.getCancellationCause()).isEqualTo(CancellationCause.CASCADING);
        }
      }
    }
  }

  @Test
  public void testStartTriggerSuccess() throws InterruptedException {
    final List<FlowTriggerDependency> deps = new ArrayList<>();
    deps.add(TestUtil.createTestDependency("2secs", 2, false));
    deps.add(TestUtil.createTestDependency("3secs", 3, false));
    deps.add(TestUtil.createTestDependency("4secs", 4, false));
    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
    for (int i = 0; i < 30; i++) {
      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
    }
    Thread.sleep(Duration.ofSeconds(5).toMillis());
    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
    assertThat(triggerInstances).hasSize(30);
    for (final TriggerInstance inst : triggerInstances) {
      assertThat(inst.getStatus()).isEqualTo(Status.SUCCEEDED);
    }
  }

  @Test
  public void testStartZeroDependencyTrigger()
      throws InterruptedException, ExecutorManagerException {
    final List<FlowTriggerDependency> deps = new ArrayList<>();
    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
    for (int i = 0; i < 30; i++) {
      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
    }
    Thread.sleep(Duration.ofSeconds(1).toMillis());
    // zero dependency trigger will launch associated flow immediately
    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRunningTriggers();
    assertThat(triggerInstances).isEmpty();
    verify(executorManager, times(30)).submitExecutableFlow(any(), anyString());
  }

  @Test
  public void testRecovery() throws Exception {
    final List<FlowTriggerDependency> deps = new ArrayList<>();
    deps.add(TestUtil.createTestDependency("2secs", 2, false));
    deps.add(TestUtil.createTestDependency("3secs", 3, false));
    deps.add(TestUtil.createTestDependency("4secs", 4, false));
    final FlowTrigger flowTrigger = TestUtil.createTestFlowTrigger(deps, Duration.ofSeconds(10));
    for (int i = 0; i < 30; i++) {
      flowTriggerService.startTrigger(flowTrigger, "testflow", 1, "test", createProject());
    }
    Thread.sleep(Duration.ofSeconds(1).toMillis());
    flowTriggerService.shutdown();
    setup();
    Thread.sleep(Duration.ofSeconds(5).toMillis());
    assertThat(flowTriggerService.getRunningTriggers()).isEmpty();
    final Collection<TriggerInstance> triggerInstances = flowTriggerService.getRecentlyFinished();
    assertThat(triggerInstances).hasSize(30);
    for (final TriggerInstance inst : triggerInstances) {
      assertThat(inst.getStatus()).isEqualTo(Status.SUCCEEDED);
    }
  }
}