// RunningExecutionsUpdaterTest.java

package azkaban.executor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import azkaban.alert.Alerter;
import azkaban.metrics.CommonMetrics;
import azkaban.utils.Pair;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for {@link RunningExecutionsUpdater}.
 *
 * <p>Every collaborator handed to the updater is a Mockito mock. Before each test a single
 * execution (id 77) assigned to one active executor is registered in {@link RunningExecutions};
 * the individual tests then stub the executor update API and verify how the updater reacts.
 */
public class RunningExecutionsUpdaterTest {

  private static final int EXECUTION_ID_77 = 77;

  /** Exception thrown by the mocked update API in the "update call fails" scenario. */
  private static final ExecutorManagerException API_CALL_EXCEPTION =
      new ExecutorManagerException("Mocked API timeout");

  @Mock
  private ExecutorManagerUpdaterStage updaterStage;
  @Mock
  private AlerterHolder alerterHolder;
  @Mock
  private CommonMetrics commonMetrics;
  @Mock
  private ExecutorApiGateway apiGateway;
  @Mock
  private ExecutionFinalizer executionFinalizer;
  @Mock
  private Alerter mailAlerter;

  private ExecutableFlow execution;
  private RunningExecutions runningExecutions;
  private Executor activeExecutor;

  private RunningExecutionsUpdater updater;

  /**
   * Initializes the mocks, registers execution 77 as running on {@code activeExecutor}, builds the
   * updater under test and makes the alerter holder return the mocked mail alerter for "email".
   */
  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);

    this.execution = new ExecutableFlow();
    this.execution.setExecutionId(EXECUTION_ID_77);
    this.activeExecutor = new Executor(1, "activeExecutor-1", 9999, true);

    this.runningExecutions = new RunningExecutions();
    final ExecutionReference reference =
        new ExecutionReference(EXECUTION_ID_77, this.activeExecutor);
    this.runningExecutions.get().put(EXECUTION_ID_77, new Pair<>(reference, this.execution));

    this.updater = new RunningExecutionsUpdater(this.updaterStage, this.alerterHolder,
        this.commonMetrics, this.apiGateway, this.runningExecutions, this.executionFinalizer);
    when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
  }

  /** Switches Joda-Time back to the system clock in case a test fixed the current time. */
  @After
  public void tearDown() {
    DateTimeUtils.setCurrentMillisSystem();
  }

  /**
   * The update response lists the execution without any status or error: the update API must be
   * called and the finalizer must not be touched.
   */
  @Test
  public void updateExecutionsStillRunning() throws Exception {
    mockFlowStillRunning();
    this.updater.updateExecutions();
    verifyCallUpdateApi();
    verifyZeroInteractions(this.executionFinalizer);
  }

  /**
   * The update response reports status {@code SUCCEEDED} for the execution: the update API must be
   * called and the flow must be finalized.
   */
  @Test
  public void updateExecutionsSucceeded() throws Exception {
    mockFlowSucceeded();
    this.updater.updateExecutions();
    verifyCallUpdateApi();
    verifyFinalizeFlow();
  }

  /** The execution reference has no executor ({@code null}): the flow must be finalized. */
  @Test
  public void updateExecutionsExecutorDoesNotExist() throws Exception {
    mockExecutorDoesNotExist();
    this.updater.updateExecutions();
    verifyFinalizeFlow();
  }

  /**
   * The update response carries an error entry for the execution: the update API must be called
   * and the flow must be finalized.
   */
  @Test
  public void updateExecutionsFlowDoesNotExist() throws Exception {
    mockFlowDoesNotExist();
    this.updater.updateExecutions();
    verifyCallUpdateApi();
    verifyFinalizeFlow();
  }

  /**
   * The update API throws on every call. The updater is run
   * {@code numErrorsBeforeUnresponsiveEmail} times, with the fixed clock moved forward by more than
   * {@code errorThreshold} after each run; afterwards exactly one failed-update alert is expected
   * on the mail alerter, and the flow must not have been finalized.
   */
  @Test
  public void updateExecutionsUpdateCallFails() throws Exception {
    mockUpdateCallFails();
    // Freeze the clock so that the time between update attempts is fully controlled by the test.
    DateTimeUtils.setCurrentMillisFixed(System.currentTimeMillis());
    final int attempts = this.updater.numErrorsBeforeUnresponsiveEmail;
    for (int attempt = 0; attempt < attempts; attempt++) {
      this.updater.updateExecutions();
      final long nextNow = DateTimeUtils.currentTimeMillis() + this.updater.errorThreshold + 1L;
      DateTimeUtils.setCurrentMillisFixed(nextNow);
    }
    verify(this.mailAlerter).alertOnFailedUpdate(
        this.activeExecutor, Collections.singletonList(this.execution), API_CALL_EXCEPTION);

    // TODO change to checking if executor exist in the DB any more. Once that is in place this
    // test is expected to verify a finalizeFlow call on the finalizer instead of zero interactions.
    verifyZeroInteractions(this.executionFinalizer);
  }

  /** Stubs an update response that lists the execution with no status and no error. */
  private void mockFlowStillRunning() throws Exception {
    mockUpdateResponse();
  }

  /** Stubs an update response that reports status {@code SUCCEEDED} for the execution. */
  private void mockFlowSucceeded() throws Exception {
    final Map<String, Object> flowUpdate = mockUpdateResponse();
    flowUpdate.put(ExecutableNode.STATUS_PARAM, Status.SUCCEEDED.getNumVal());
  }

  /** Re-registers the execution with a reference whose executor is {@code null}. */
  private void mockExecutorDoesNotExist() {
    final ExecutionReference orphanReference = new ExecutionReference(EXECUTION_ID_77, null);
    this.runningExecutions.get()
        .put(EXECUTION_ID_77, new Pair<>(orphanReference, this.execution));
  }

  /** Makes every call to the update API throw {@link #API_CALL_EXCEPTION}. */
  private void mockUpdateCallFails() throws ExecutorManagerException {
    doThrow(API_CALL_EXCEPTION).when(this.apiGateway).updateExecutions(any(), any());
  }

  /** Verifies one update API call for the active executor with exactly the test execution. */
  private void verifyCallUpdateApi() throws ExecutorManagerException {
    verify(this.apiGateway).updateExecutions(
        this.activeExecutor, Collections.singletonList(this.execution));
  }

  /** Stubs an update response that carries an error entry for the execution. */
  private void mockFlowDoesNotExist() throws Exception {
    final Map<String, Object> flowUpdate = mockUpdateResponse();
    flowUpdate.put(ConnectorParams.RESPONSE_ERROR, "Flow does not exist");
  }

  /**
   * Stubs the update API to return a response with a single updated-flow entry for execution 77.
   *
   * @return the mutable per-flow map that is embedded in the stubbed response; entries added to it
   *     after this call are visible to the updater because the response holds the same instance
   */
  private Map<String, Object> mockUpdateResponse() throws Exception {
    final Map<String, Object> flowUpdate = new HashMap<>();
    flowUpdate.put(ConnectorParams.UPDATE_MAP_EXEC_ID, EXECUTION_ID_77);

    final Map<String, List<Map<String, Object>>> response = ImmutableMap.of(
        ConnectorParams.RESPONSE_UPDATED_FLOWS, Collections.singletonList(flowUpdate));
    doReturn(response).when(this.apiGateway).updateExecutions(any(), any());
    return flowUpdate;
  }

  /** Verifies that the test execution was finalized with the "not running" reason. */
  private void verifyFinalizeFlow() {
    verify(this.executionFinalizer).finalizeFlow(this.execution,
        "Not running on the assigned executor (any more)", null);
  }

}