// ExecutorManagerTest.java
//
// 540 lines | 22.685 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.executor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.metrics.CommonMetrics;
import azkaban.metrics.MetricsManager;
import azkaban.user.User;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Test class for executor manager
 */
public class ExecutorManagerTest {

  // Active flows keyed by execution id. Populated by testSetUpForRunningFlows() and returned by
  // the mocked loader's fetchActiveFlows().
  private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
  // Metrics backed by a fresh registry created for this test instance.
  private final CommonMetrics commonMetrics = new CommonMetrics(
      new MetricsManager(new MetricRegistry()));
  // Assigned only by testSetUpForRunningFlows(); tearDown() shuts it down when it is non-null.
  // Tests that declare a local "manager" variable do not set this field.
  private ExecutorManager manager;
  // A MockExecutorLoader after setup(); replaced by a Mockito mock in testSetUpForRunningFlows().
  private ExecutorLoader loader;
  // Configuration handed to the ExecutorManager under test; recreated in setup().
  private Props props;
  // Test user; assigned only by testSetUpForRunningFlows().
  private User user;
  // Flows with execution ids 1 and 2 created by testSetUpForRunningFlows(). Several tests declare
  // locals with the same names, which shadow these fields.
  private ExecutableFlow flow1;
  private ExecutableFlow flow2;
  // Mock holder that returns mailAlerter for the "email" alerter type (see setup()).
  private AlerterHolder alertHolder;
  // Assigned only by testSetUpForRunningFlows(); null in tests that do not call it.
  private ExecutorApiGateway apiGateway;
  // Mock alerter used to verify alert calls.
  private Alerter mailAlerter;
  private RunningExecutions runningExecutions;
  private ExecutorManagerUpdaterStage updaterStage;

  /**
   * Creates fresh collaborators before every test: empty props, a {@link MockExecutorLoader}, new
   * running-executions / updater-stage holders and a mocked alerter holder that serves a mocked
   * "email" alerter.
   */
  @Before
  public void setup() {
    this.props = new Props();

    // Build the alerter mocks locally, wire them together, then publish them to the fields.
    final Alerter emailAlerter = mock(Alerter.class);
    final AlerterHolder holder = mock(AlerterHolder.class);
    when(holder.get("email")).thenReturn(emailAlerter);
    this.mailAlerter = emailAlerter;
    this.alertHolder = holder;

    this.loader = new MockExecutorLoader();
    this.runningExecutions = new RunningExecutions();
    this.updaterStage = new ExecutorManagerUpdaterStage();
  }

  /**
   * Shuts down the manager stored in the {@code manager} field, if a test created one.
   */
  @After
  public void tearDown() {
    if (this.manager == null) {
      // Nothing was stored in the field by this test.
      return;
    }
    this.manager.shutdown();
  }

  /**
   * Helper that registers two local executors (ports 12345 and 12346) with the loader, enables
   * multi-executor mode, disables queue processing and then builds an ExecutorManager.
   */
  private ExecutorManager createMultiExecutorManagerInstance() throws Exception {
    this.props.put(ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
    this.props.put(ConfigurationKeys.QUEUEPROCESSING_ENABLED, "false");

    this.loader.addExecutor("localhost", 12345);
    this.loader.addExecutor("localhost", 12346);

    final ExecutorManager multiExecutorManager = createExecutorManager();
    return multiExecutorManager;
  }

  /**
   * Creating an executor manager when the loader has no executors at all (local or remote) must
   * fail with an ExecutorManagerException.
   */
  @Test(expected = ExecutorManagerException.class)
  public void testNoExecutorScenario() throws Exception {
    this.props.put(ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
    // The returned instance is irrelevant; creation itself is expected to throw.
    createExecutorManager();
  }

  /**
   * Configuring only a local executor port (without enabling multiple executors) must be rejected
   * with an IllegalArgumentException carrying the "single executor mode" message.
   */
  @Test
  public void testLocalExecutorScenario() {
    this.props.put(ConfigurationKeys.EXECUTOR_PORT, 12345);

    final Throwable failure = catchThrowable(this::createExecutorManager);

    assertThat(failure)
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessage(
            "azkaban.use.multiple.executors must be true. Single executor mode is not supported any more.");
  }

  /**
   * Verifies that an executor manager created in multi-executor mode reports exactly the
   * executors that were registered with the loader.
   */
  @Test
  public void testMultipleExecutorScenario() throws Exception {
    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
    final Executor executor1 = this.loader.addExecutor("localhost", 12345);
    final Executor executor2 = this.loader.addExecutor("localhost", 12346);

    final ExecutorManager manager = createExecutorManager();
    final Set<Executor> activeExecutors = new HashSet<>(manager.getAllActiveExecutors());

    // A HashSet has no defined iteration order, so compare contents without relying on the
    // order in which toArray() would happen to return the elements.
    assertThat(activeExecutors).containsExactlyInAnyOrder(executor1, executor2);
  }

  /**
   * Wires an ExecutorManager from the current test fixtures (props, loader, api gateway, running
   * executions, updater stage, alerter holder) and initializes it.
   *
   * @return the initialized manager
   * @throws ExecutorManagerException propagated from construction or initialization
   */
  private ExecutorManager createExecutorManager()
      throws ExecutorManagerException {
    // TODO rename this test to ExecutorManagerIntegrationTest & create separate unit tests as well?
    final ActiveExecutors executors = new ActiveExecutors(this.loader);
    final ExecutionFinalizer finalizer = new ExecutionFinalizer(this.loader,
        this.updaterStage, this.alertHolder, this.runningExecutions);

    final RunningExecutionsUpdater updater = new RunningExecutionsUpdater(
        this.updaterStage, this.alertHolder, this.commonMetrics, this.apiGateway,
        this.runningExecutions, finalizer);
    final RunningExecutionsUpdaterThread thread =
        new RunningExecutionsUpdaterThread(updater, this.runningExecutions);
    // Zero wait times: presumably so the updater thread never sleeps between rounds in tests.
    thread.waitTimeIdleMs = 0;
    thread.waitTimeMs = 0;

    final ExecutorManager created = new ExecutorManager(this.props, this.loader,
        this.commonMetrics, this.apiGateway, this.runningExecutions, executors,
        this.updaterStage, finalizer, thread);
    created.setSleepAfterDispatchFailure(Duration.ZERO);
    created.initialize();
    return created;
  }

  /**
   * Verifies that {@code setupExecutors()} reloads the active executors: after the original
   * executor is deactivated and two new ones are added, only the new ones are reported.
   */
  @Test
  public void testSetupExecutorsSucess() throws Exception {
    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
    final Executor executor1 = this.loader.addExecutor("localhost", 12345);
    final ExecutorManager manager = createExecutorManager();
    // JUnit expects (expected, actual); keeping that order makes failure messages accurate.
    Assert.assertArrayEquals(new Executor[]{executor1},
        manager.getAllActiveExecutors().toArray());

    // mark older executor as inactive
    executor1.setActive(false);
    this.loader.updateExecutor(executor1);
    final Executor executor2 = this.loader.addExecutor("localhost", 12346);
    final Executor executor3 = this.loader.addExecutor("localhost", 12347);
    manager.setupExecutors();

    Assert.assertArrayEquals(new Executor[]{executor2, executor3},
        manager.getAllActiveExecutors().toArray());
  }

  /**
   * Verifies that reloading the active executors fails with an ExecutorManagerException when the
   * reload would leave the manager without any active executor.
   */
  @Test
  public void testSetupExecutorsException() throws Exception {
    this.props.put(Constants.ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
    final Executor executor1 = this.loader.addExecutor("localhost", 12345);
    final ExecutorManager manager = createExecutorManager();
    final Set<Executor> activeExecutors = new HashSet<>(manager.getAllActiveExecutors());
    Assert.assertArrayEquals(new Executor[]{executor1}, activeExecutors.toArray());

    // mark older executor as inactive
    executor1.setActive(false);
    this.loader.updateExecutor(executor1);

    // Only the reload itself may throw; an exception during the setup above must fail the test
    // instead of being mistaken for the expected outcome.
    final Throwable thrown = catchThrowable(manager::setupExecutors);
    assertThat(thrown).isInstanceOf(ExecutorManagerException.class);
  }

  /**
   * Verifies that the queue processor thread reports active after being enabled and inactive
   * after being disabled (disabling pauses dispatching).
   */
  @Test
  public void testDisablingQueueProcessThread() throws Exception {
    final ExecutorManager manager = createMultiExecutorManagerInstance();
    manager.enableQueueProcessorThread();
    Assert.assertTrue(manager.isQueueProcessorThreadActive());
    manager.disableQueueProcessorThread();
    Assert.assertFalse(manager.isQueueProcessorThreadActive());
  }

  /**
   * Verifies that the queue processor thread is inactive when queue processing is disabled in the
   * configuration and becomes active once it is re-enabled (re-enabling restarts dispatching).
   */
  @Test
  public void testEnablingQueueProcessThread() throws Exception {
    final ExecutorManager manager = createMultiExecutorManagerInstance();
    Assert.assertFalse(manager.isQueueProcessorThreadActive());
    manager.enableQueueProcessorThread();
    Assert.assertTrue(manager.isQueueProcessorThreadActive());
  }

  /**
   * Submits two flows while queue processing is disabled and verifies that both are stored as
   * queued flows in the loader, are reported by {@code getRunningFlows()} and are listed by
   * {@code getQueuedFlowIds()}.
   */
  @Test
  public void testQueuedFlows() throws Exception {
    final ExecutorManager manager = createMultiExecutorManagerInstance();
    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    flow1.setExecutionId(1);
    final ExecutableFlow flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
    flow2.setExecutionId(2);

    final User testUser = TestUtils.getTestUser();
    manager.submitExecutableFlow(flow1, testUser.getUserId());
    manager.submitExecutableFlow(flow2, testUser.getUserId());

    final List<Integer> testFlows = Arrays.asList(flow1.getExecutionId(), flow2.getExecutionId());

    final List<Pair<ExecutionReference, ExecutableFlow>> queuedFlowsDB =
        this.loader.fetchQueuedFlows();
    // JUnit expects (expected, actual); the submitted flows are the expectation here.
    Assert.assertEquals(testFlows.size(), queuedFlowsDB.size());
    // Verify things are correctly setup in db
    for (final Pair<ExecutionReference, ExecutableFlow> pair : queuedFlowsDB) {
      Assert.assertTrue(testFlows.contains(pair.getSecond().getExecutionId()));
    }

    // Verify running flows using old definition of "running" flows i.e. a
    // non-dispatched flow is also considered running
    final List<Integer> managerActiveFlows = manager.getRunningFlows()
        .stream().map(ExecutableFlow::getExecutionId).collect(Collectors.toList());
    Assert.assertTrue(managerActiveFlows.containsAll(testFlows)
        && testFlows.containsAll(managerActiveFlows));

    // Verify getQueuedFlowIds method
    Assert.assertEquals("[1, 2]", manager.getQueuedFlowIds());
  }

  /**
   * Verifies that submitting the same flow a second time, while the first instance is still
   * queued and the concurrent option is "skip", is rejected with an ExecutorManagerException.
   */
  @Test
  public void testDuplicateQueuedFlows() throws Exception {
    final ExecutorManager manager = createMultiExecutorManagerInstance();
    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    flow1.getExecutionOptions().setConcurrentOption(
        ExecutionOptions.CONCURRENT_OPTION_SKIP);

    final User testUser = TestUtils.getTestUser();
    // The first submission must succeed; only the duplicate is expected to be rejected.
    manager.submitExecutableFlow(flow1, testUser.getUserId());

    final Throwable thrown = catchThrowable(
        () -> manager.submitExecutableFlow(flow1, testUser.getUserId()));
    assertThat(thrown).isInstanceOf(ExecutorManagerException.class);
  }

  /**
   * Verifies that cancelling a flow which is still queued on the web server side (i.e. not yet
   * dispatched) marks it as FAILED in the loader and removes it from the running flows.
   */
  @Test
  public void testKillQueuedFlow() throws Exception {
    final ExecutorManager manager = createMultiExecutorManagerInstance();
    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    final User testUser = TestUtils.getTestUser();
    manager.submitExecutableFlow(flow1, testUser.getUserId());

    manager.cancelFlow(flow1, testUser.getUserId());
    final ExecutableFlow fetchedFlow =
        this.loader.fetchExecutableFlow(flow1.getExecutionId());
    // JUnit expects (expected, actual).
    Assert.assertEquals(Status.FAILED, fetchedFlow.getStatus());

    Assert.assertFalse(manager.getRunningFlows().contains(flow1));
  }

  /**
   * Flow has been running on an executor but is not any more (for example because of restart):
   * the mocked update response reports "Flow does not exist" and the flow must end up FAILED.
   */
  @Test
  public void testNotFoundFlows() throws Exception {
    testSetUpForRunningFlows();
    this.manager.start();
    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    // The flow is looked up under id -1, presumably the default id of an unsubmitted flow.
    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
    mockFlowDoesNotExist();
    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
    final ExecutableFlow fetchedFlow = waitFlowFinished(flow1);
    // JUnit expects (expected, actual).
    Assert.assertEquals(Status.FAILED, fetchedFlow.getStatus());
  }

  /**
   * Dispatch retry on a different executor after a failure:
   * <ol>
   * <li>The first dispatch attempt throws an exception.</li>
   * <li>ExecutorManager should try the next executor.</li>
   * <li>The second attempt is accepted.</li>
   * </ol>
   * Verifies that both executors (ids 1 and 2) were called and that the executor was unassigned
   * exactly once.
   */
  @Test
  public void testDispatchException() throws Exception {
    testSetUpForRunningFlows();
    this.manager.start();
    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    // The flow is looked up under id -1, presumably the default id of an unsubmitted flow.
    doReturn(flow1).when(this.loader).fetchExecutableFlow(-1);
    // After a successful dispatch the update response reports the flow as missing, which lets
    // waitFlowFinished() below observe a finished status.
    mockFlowDoesNotExist();
    // First EXECUTE call fails, every later one succeeds.
    when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
        .thenThrow(new ExecutorManagerException("Mocked dispatch exception"))
        .thenReturn(null);
    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
    waitFlowFinished(flow1);
    verify(this.apiGateway)
        .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
    verify(this.apiGateway)
        .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
    // One failed attempt => one unassignment.
    verify(this.loader, Mockito.times(1)).unassignExecutor(-1);
  }

  /**
   * ExecutorManager should try to dispatch to all executors & when both fail it should remove the
   * execution from queue and finalize it.
   *
   * <p>Verifies that both executors were tried, that the executor was unassigned twice and that
   * the mail alerter was notified with the "reached azkaban.maxDispatchingErrors" message.
   */
  @Test
  public void testDispatchFailed() throws Exception {
    testSetUpForRunningFlows();
    this.manager.start();
    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    flow1.getExecutionOptions().setFailureEmails(Arrays.asList("test@example.com"));
    // The flow is looked up under id -1, presumably the default id of an unsubmitted flow.
    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);
    // Every EXECUTE call fails.
    when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
        .thenThrow(new ExecutorManagerException("Mocked dispatch exception"));
    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
    waitFlowFinished(flow1);
    verify(this.apiGateway)
        .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
    verify(this.apiGateway)
        .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);
    // Two failed attempts => two unassignments.
    verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
    // "derived-member-data" is presumably the flow id defined in the exec1 test fixture.
    verify(this.mailAlerter).alertOnError(eq(flow1),
        eq("Failed to dispatch queued execution derived-member-data because reached "
            + "azkaban.maxDispatchingErrors (tried 2 executors)"),
        contains("Mocked dispatch exception"));
  }

  /**
   * Stubs the api gateway's update call so that it reports a single updated flow (execution id
   * -1) carrying the error "Flow does not exist".
   */
  private void mockFlowDoesNotExist() throws Exception {
    final Map<String, Object> missingFlow = ImmutableMap.of(
        ConnectorParams.UPDATE_MAP_EXEC_ID, -1,
        "error", "Flow does not exist");
    final List<Map<String, Object>> updatedFlows = Collections.singletonList(missingFlow);
    mockUpdateResponse(ImmutableMap.of(ConnectorParams.RESPONSE_UPDATED_FLOWS, updatedFlows));
  }

  /**
   * Makes every {@code updateExecutions} call on the mocked api gateway return the given map.
   *
   * <p>The "unchecked generic array creation for varargs parameter" warning is suppressed because
   * it cannot be avoided when mocking a method with generic varargs.
   */
  @SuppressWarnings("unchecked")
  private void mockUpdateResponse(
      final Map<String, List<Map<String, Object>>> map) throws Exception {
    final ExecutorApiGateway stubbedGateway = doReturn(map).when(this.apiGateway);
    stubbedGateway.updateExecutions(any(), any());
  }

  /**
   * Added tests for runningFlows: submitting a flow must upload it through the loader and
   * register an active executable reference.
   * TODO: When removing queuedFlows cache, will refactor rest of the ExecutorManager test cases
   */
  @Test
  public void testSubmitFlows() throws Exception {
    testSetUpForRunningFlows();
    final ExecutableFlow submitted = TestUtils.createTestExecutableFlow("exectest1", "exec1");

    this.manager.submitExecutableFlow(submitted, this.user.getUserId());

    verify(this.loader).uploadExecutableFlow(submitted);
    verify(this.loader).addActiveExecutableReference(any());
  }

  /**
   * Too many concurrent runs of the same flow must fail job submission. The fixture allows two
   * concurrent runs per flow, so the first two submissions must succeed and one of the following
   * submissions must be rejected with an ExecutorManagerException.
   */
  @Test
  public void testTooManySubmitFlows() throws Exception {
    testSetUpForRunningFlows();
    final ExecutableFlow flow1 = createBashSleepFlow(101);
    final ExecutableFlow flow2 = createBashSleepFlow(102);
    final ExecutableFlow flow3 = createBashSleepFlow(103);
    final ExecutableFlow flow4 = createBashSleepFlow(104);

    // These two are within the limit; an exception here must fail the test rather than be
    // mistaken for the expected rejection.
    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
    verify(this.loader).uploadExecutableFlow(flow1);
    this.manager.submitExecutableFlow(flow2, this.user.getUserId());
    verify(this.loader).uploadExecutableFlow(flow2);

    final Throwable thrown = catchThrowable(() -> {
      this.manager.submitExecutableFlow(flow3, this.user.getUserId());
      this.manager.submitExecutableFlow(flow4, this.user.getUserId());
    });
    assertThat(thrown).isInstanceOf(ExecutorManagerException.class);
  }

  /** Creates the "bashSleep" YAML test flow and gives it the supplied execution id. */
  private ExecutableFlow createBashSleepFlow(final int executionId) throws Exception {
    final ExecutableFlow flow = TestUtils
        .createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
    flow.setExecutionId(executionId);
    return flow;
  }

  /**
   * (Currently ignored.) Every flow registered in the fixture's active-flow map should be
   * reported by {@code getRunningFlows()}.
   */
  @Ignore
  @Test
  public void testFetchAllActiveFlows() throws Exception {
    testSetUpForRunningFlows();
    final List<ExecutableFlow> running = this.manager.getRunningFlows();
    this.activeFlows.values()
        .forEach(entry -> Assert.assertTrue(running.contains(entry.getSecond())));
  }

  /**
   * (Currently ignored.) Looking up running flows by project id and flow id should return the
   * fixture flow's execution id, and the flow should be reported as running.
   */
  @Ignore
  @Test
  public void testFetchActiveFlowByProject() throws Exception {
    testSetUpForRunningFlows();

    final List<Integer> runningIds = this.manager.getRunningFlows(this.flow1.getProjectId(),
        this.flow1.getFlowId());
    final boolean listed = runningIds.contains(this.flow1.getExecutionId());
    Assert.assertTrue(listed);

    final boolean running =
        this.manager.isFlowRunning(this.flow1.getProjectId(), this.flow1.getFlowId());
    Assert.assertTrue(running);
  }

  /**
   * (Currently ignored.) Both fixture flows should be reported by
   * {@code getActiveFlowsWithExecutor()}, each paired with the executor fetched by its id.
   */
  @Ignore
  @Test
  public void testFetchActiveFlowWithExecutor() throws Exception {
    testSetUpForRunningFlows();
    final List<Pair<ExecutableFlow, Optional<Executor>>> flowsWithExecutor =
        this.manager.getActiveFlowsWithExecutor();

    for (final ExecutableFlow flow : Arrays.asList(this.flow1, this.flow2)) {
      // NOTE(review): the execution id is used as the executor id here; in this fixture both
      // happen to be 1 and 2.
      final Optional<Executor> executor =
          Optional.ofNullable(this.manager.fetchExecutor(flow.getExecutionId()));
      final Pair<ExecutableFlow, Optional<Executor>> expected = new Pair<>(flow, executor);
      Assert.assertTrue(flowsWithExecutor.contains(expected));
    }
  }

  /**
   * The set of active executor server hosts should contain "host:port" for both fixture
   * executors.
   */
  @Test
  public void testFetchAllActiveExecutorServerHosts() throws Exception {
    testSetUpForRunningFlows();
    final Set<String> hosts = this.manager.getAllActiveExecutorServerHosts();

    // NOTE(review): fetchExecutor is called with execution ids; in this fixture they match the
    // executor ids 1 and 2.
    final Executor first = this.manager.fetchExecutor(this.flow1.getExecutionId());
    final Executor second = this.manager.fetchExecutor(this.flow2.getExecutionId());

    final String firstAddress = first.getHost() + ":" + first.getPort();
    Assert.assertTrue(hosts.contains(firstAddress));
    final String secondAddress = second.getHost() + ":" + second.getPort();
    Assert.assertTrue(hosts.contains(secondAddress));
  }

  /**
   * ExecutorManager should try to dispatch to all executors until it succeeds.
   *
   * <p>With up to 4 dispatching errors permitted, the first two dispatch attempts fail and the
   * third succeeds. Verifies three dispatch calls in total, at least one per executor, and two
   * executor unassignments.
   */
  @Test
  public void testDispatchMultipleRetries() throws Exception {
    // Must be set before testSetUpForRunningFlows(), which creates the manager from the props.
    this.props.put(Constants.ConfigurationKeys.MAX_DISPATCHING_ERRORS_PERMITTED, 4);
    testSetUpForRunningFlows();
    this.manager.start();
    final ExecutableFlow flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    flow1.getExecutionOptions().setFailureEmails(Arrays.asList("test@example.com"));
    // The flow is looked up under id -1, presumably the default id of an unsubmitted flow.
    when(this.loader.fetchExecutableFlow(-1)).thenReturn(flow1);

    // fail 2 first dispatch attempts, then succeed
    when(this.apiGateway.callWithExecutable(any(), any(), eq(ConnectorParams.EXECUTE_ACTION)))
        .thenThrow(new ExecutorManagerException("Mocked dispatch exception 1"))
        .thenThrow(new ExecutorManagerException("Mocked dispatch exception 2"))
        .thenReturn(null);

    // this is just to clean up the execution as FAILED after it has been submitted
    mockFlowDoesNotExist();

    this.manager.submitExecutableFlow(flow1, this.user.getUserId());
    waitFlowFinished(flow1);

    // it's random which executor is chosen each time, but both should have been tried at least once
    verify(this.apiGateway, Mockito.atLeast(1))
        .callWithExecutable(flow1, this.manager.fetchExecutor(1), ConnectorParams.EXECUTE_ACTION);
    verify(this.apiGateway, Mockito.atLeast(1))
        .callWithExecutable(flow1, this.manager.fetchExecutor(2), ConnectorParams.EXECUTE_ACTION);

    // verify that there was a 3rd (successful) dispatch call
    verify(this.apiGateway, Mockito.times(3))
        .callWithExecutable(eq(flow1), any(), eq(ConnectorParams.EXECUTE_ACTION));

    // Two failed attempts => two unassignments.
    verify(this.loader, Mockito.times(2)).unassignExecutor(-1);
  }

  /**
   * Fixture for the runningFlows tests: replaces the loader and api gateway with Mockito mocks,
   * enables multi-executor mode and queue processing, allows two concurrent runs per flow,
   * creates the manager with two active executors and registers two active flows (execution ids
   * 1 and 2) with the mocked loader.
   *
   * TODO: will move below method to setUp() and run before every test for both runningFlows and queuedFlows
   */
  private void testSetUpForRunningFlows() throws Exception {
    this.loader = mock(ExecutorLoader.class);
    this.apiGateway = mock(ExecutorApiGateway.class);
    this.user = TestUtils.getTestUser();

    this.props.put(ConfigurationKeys.USE_MULTIPLE_EXECUTORS, "true");
    // To test runningFlows, AZKABAN_QUEUEPROCESSING_ENABLED should be set to true
    // so that flows will be dispatched to executors.
    this.props.put(ConfigurationKeys.QUEUEPROCESSING_ENABLED, "true");
    // allow two concurrent runs of one flow
    this.props.put(ConfigurationKeys.MAX_CONCURRENT_RUNS_ONEFLOW, 2);

    final Executor firstExecutor = new Executor(1, "localhost", 12345, true);
    final Executor secondExecutor = new Executor(2, "localhost", 12346, true);
    final List<Executor> activeExecutorList =
        new ArrayList<>(Arrays.asList(firstExecutor, secondExecutor));
    when(this.loader.fetchActiveExecutors()).thenReturn(activeExecutorList);

    this.manager = createExecutorManager();

    this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    this.flow2 = TestUtils.createTestExecutableFlow("exectest1", "exec2");
    this.flow1.setExecutionId(1);
    this.flow2.setExecutionId(2);

    // Pair each flow with a reference to the executor it is considered to be running on.
    final ExecutionReference firstRef =
        new ExecutionReference(this.flow1.getExecutionId(), firstExecutor);
    final ExecutionReference secondRef =
        new ExecutionReference(this.flow2.getExecutionId(), secondExecutor);
    this.activeFlows.put(this.flow1.getExecutionId(), new Pair<>(firstRef, this.flow1));
    this.activeFlows.put(this.flow2.getExecutionId(), new Pair<>(secondRef, this.flow2));

    // Stubbed after the manager has been created, as in the original ordering.
    when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
  }

  /**
   * Polls until the given flow, as fetched from the loader, reports a finished status and then
   * returns the fetched flow.
   */
  private ExecutableFlow waitFlowFinished(final ExecutableFlow flow) throws Exception {
    azkaban.test.TestUtils.await().untilAsserted(() -> {
      final Status current = getFlowStatus(flow);
      assertThat(current).matches(Status::isStatusFinished, "isStatusFinished");
    });
    final ExecutableFlow finished = fetchFlow(flow);
    return finished;
  }

  /**
   * Returns the status of the given flow as currently stored in the loader.
   *
   * @param flow flow whose execution id is looked up
   * @return the fetched flow's status, or {@code null} when the loader returns no flow
   */
  private Status getFlowStatus(final ExecutableFlow flow) throws Exception {
    // Fetch once: a second lookup could return a different result than the one that was
    // null-checked, and it doubles the calls made on the loader.
    final ExecutableFlow fetched = fetchFlow(flow);
    return fetched != null ? fetched.getStatus() : null;
  }

  /**
   * Looks up the given flow in the loader by its execution id.
   */
  private ExecutableFlow fetchFlow(final ExecutableFlow flow) throws ExecutorManagerException {
    final int executionId = flow.getExecutionId();
    return this.loader.fetchExecutableFlow(executionId);
  }

}