ExecutorHealthCheckerTest.java

137 lines | 5.925 kB Blame History Raw Download
/*
* Copyright 2019 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the “License”); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.executor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import azkaban.Constants.ConfigurationKeys;
import azkaban.alert.Alerter;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;

/**
 * Test case for executor health checker.
 */
public class ExecutorHealthCheckerTest {

  private static final int EXECUTION_ID_11 = 11;
  private static final String AZ_ADMIN_ALERT_EMAIL = "az_admin1@foo.com,az_admin2@foo.com";
  private final Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
  private ExecutorHealthChecker executorHealthChecker;
  private Props props;
  private ExecutorLoader loader;
  private ExecutorApiGateway apiGateway;
  private Alerter mailAlerter;
  private AlerterHolder alerterHolder;
  private ExecutableFlow flow1;
  private Executor executor1;

  @Before
  public void setUp() throws Exception {
    this.props = new Props();
    this.props.put(ConfigurationKeys.AZKABAN_EXECUTOR_MAX_FAILURE_COUNT, 2);
    this.props.put(ConfigurationKeys.AZKABAN_ADMIN_ALERT_EMAIL, AZ_ADMIN_ALERT_EMAIL);
    this.loader = mock(ExecutorLoader.class);
    this.mailAlerter = mock(Alerter.class);
    this.alerterHolder = mock(AlerterHolder.class);
    this.apiGateway = mock(ExecutorApiGateway.class);
    this.executorHealthChecker = new ExecutorHealthChecker(this.props, this.loader, this
        .apiGateway, this.alerterHolder);
    this.flow1 = TestUtils.createTestExecutableFlow("exectest1", "exec1");
    this.flow1.setExecutionId(EXECUTION_ID_11);
    this.flow1.setStatus(Status.RUNNING);
    this.executor1 = new Executor(1, "localhost", 12345, true);
    when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
    when(this.alerterHolder.get("email")).thenReturn(this.mailAlerter);
  }

  /**
   * Test running flow is not finalized and alert email is not sent when executor is alive.
   */
  @Test
  public void checkExecutorHealthAlive() throws Exception {
    this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
        new ExecutionReference(EXECUTION_ID_11, this.executor1), this.flow1));
    when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
        ConnectorParams.PING_ACTION, null, null)).thenReturn(ImmutableMap.of(ConnectorParams
        .STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE));
    this.executorHealthChecker.checkExecutorHealth();
    assertThat(this.flow1.getStatus()).isEqualTo(Status.RUNNING);
    verifyZeroInteractions(this.alerterHolder);
  }

  /**
   * Test running flow is finalized when its executor is removed from DB.
   */
  @Test
  public void checkExecutorHealthExecutorIdRemoved() throws Exception {
    this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
        new ExecutionReference(EXECUTION_ID_11, null), this.flow1));
    when(this.loader.fetchExecutableFlow(EXECUTION_ID_11)).thenReturn(this.flow1);
    this.executorHealthChecker.checkExecutorHealth();
    verify(this.loader).updateExecutableFlow(this.flow1);
    assertThat(this.flow1.getStatus()).isEqualTo(Status.FAILED);
  }

  /**
   * Test alert emails are sent when there are consecutive failures to contact the executor.
   */
  @Test
  public void checkExecutorHealthConsecutiveFailures() throws Exception {
    this.activeFlows.put(EXECUTION_ID_11, new Pair<>(
        new ExecutionReference(EXECUTION_ID_11, this.executor1), this.flow1));
    // Failed to ping executor. Failure count (=1) < MAX_FAILURE_COUNT (=2). Do not alert.
    this.executorHealthChecker.checkExecutorHealth();
    verify(this.apiGateway).callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
        ConnectorParams.PING_ACTION, null, null);
    verifyZeroInteractions(this.alerterHolder);

    // Pinged executor successfully. Failure count (=0) < MAX_FAILURE_COUNT (=2). Do not alert.
    when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
        ConnectorParams.PING_ACTION, null, null)).thenReturn(ImmutableMap.of(ConnectorParams
        .STATUS_PARAM, ConnectorParams.RESPONSE_ALIVE));
    this.executorHealthChecker.checkExecutorHealth();
    verifyZeroInteractions(this.alerterHolder);

    // Failed to ping executor. Failure count (=1) < MAX_FAILURE_COUNT (=2). Do not alert.
    when(this.apiGateway.callWithExecutionId(this.executor1.getHost(), this.executor1.getPort(),
        ConnectorParams.PING_ACTION, null, null)).thenReturn(null);
    this.executorHealthChecker.checkExecutorHealth();
    verifyZeroInteractions(this.alerterHolder);

    // Failed to ping executor again. Failure count (=2) = MAX_FAILURE_COUNT (=2). Alert AZ admin.
    this.executorHealthChecker.checkExecutorHealth();
    verify((this.alerterHolder).get("email"))
        .alertOnFailedUpdate(eq(this.executor1), eq(Arrays.asList(this.flow1)),
            any(ExecutorManagerException.class));
    assertThat(this.flow1.getExecutionOptions().getFailureEmails()).isEqualTo
        (Arrays.asList(AZ_ADMIN_ALERT_EMAIL.split(",")));
  }
}