SlaCheckerTest.java

215 lines | 7.181 kB Blame History Raw Download
package azkaban.trigger.builtin;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.sla.SlaOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;

import static azkaban.sla.SlaOption.TYPE_FLOW_FINISH;
import static azkaban.sla.SlaOption.TYPE_FLOW_SUCCEED;
import static azkaban.sla.SlaOption.TYPE_JOB_SUCCEED;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class SlaCheckerTest {
  private final int RUNNING_FLOW = 1;
  private final int SUCCEEDED_FLOW = 2;
  private final int KILLED_FLOW = 3;
  private final int FAILED_FLOW = 4;

  private ExecutorManagerAdapter executorManager;

  @Before
  public void setUp() throws Exception {
    executorManager = mock(ExecutorManagerAdapter.class);
    SlaChecker.setExecutorManager(executorManager);

    addMockFlow(RUNNING_FLOW, Status.RUNNING);
    addMockFlow(SUCCEEDED_FLOW, Status.SUCCEEDED);
    addMockFlow(KILLED_FLOW, Status.KILLED);
    addMockFlow(FAILED_FLOW, Status.FAILED);
  }

  private ExecutableFlow addMockFlow(int execId, Status status) throws ExecutorManagerException {
    ExecutableFlow flow = mock(ExecutableFlow.class);
    when(flow.getStartTime()).thenReturn(DateTime.now().getMillis());
    when(flow.getStatus()).thenReturn(status);

    when(executorManager.getExecutableFlow(execId)).thenReturn(flow);

    return flow;
  }

  private static SlaOption createSlaOption(String type, String duration) {
    Map<String, Object> info = new HashMap<>();
    info.put(SlaOption.INFO_DURATION, duration);
    return new SlaOption(type, Collections.emptyList(), info);
  }

  private static SlaChecker createSlaChecker(String type, String duration, int execId) {
    SlaOption slaOption = createSlaOption(type, duration);
    return new SlaChecker("MockFlowSlaChecker", slaOption, execId);
  }

  @Test
  public void testFlowRunningExpired() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "0s", RUNNING_FLOW);

    assertFalse((Boolean) slaChecker.isSlaPassed());
    assertTrue((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowRunningNotExpired() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "1d", RUNNING_FLOW);

    assertFalse((Boolean) slaChecker.isSlaPassed());
    assertFalse((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowRunning() throws Exception {
    final int execId = 100;
    ExecutableFlow flow = addMockFlow(execId, Status.RUNNING);
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "2s", execId);

    // Flow hasn't reached terminal state
    assertFalse((Boolean) slaChecker.isSlaPassed());
    // But SLA is still valid. returns false => trigger is not activated.
    assertFalse((Boolean) slaChecker.isSlaFailed());

    Thread.sleep(2000);

    // Flow still hasn't reached terminal state
    assertFalse((Boolean) slaChecker.isSlaPassed());
    // SLA has been missed
    assertTrue((Boolean) slaChecker.isSlaFailed());

    when(flow.getStatus()).thenReturn(Status.SUCCEEDED);

    // Terminal state reached
    assertTrue((Boolean) slaChecker.isSlaPassed());
    // SLA has not been violated
    assertFalse((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowSucceeded1() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", SUCCEEDED_FLOW);

    assertTrue((Boolean) slaChecker.isSlaPassed());
    assertFalse((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowSucceeded2() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", SUCCEEDED_FLOW);

    assertTrue((Boolean) slaChecker.isSlaPassed());
    assertFalse((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowKilledBeforeSla() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", KILLED_FLOW);

    assertFalse((Boolean) slaChecker.isSlaPassed());
    assertTrue((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowFailedBeforeSla() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "1d", FAILED_FLOW);

    assertFalse((Boolean) slaChecker.isSlaPassed());
    assertTrue((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowFailedAfterSla() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", FAILED_FLOW);

    assertFalse((Boolean) slaChecker.isSlaPassed());
    assertTrue((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowKilledAfterSla() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_SUCCEED, "0s", KILLED_FLOW);

    assertFalse((Boolean) slaChecker.isSlaPassed());
    assertTrue((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowKilledBeforeSlaFinish() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "1d", KILLED_FLOW);

    assertTrue((Boolean) slaChecker.isSlaPassed());
    assertFalse((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testFlowFailedAfterSlaFinish() throws Exception {
    SlaChecker slaChecker = createSlaChecker(TYPE_FLOW_FINISH, "0s", FAILED_FLOW);

    assertTrue((Boolean) slaChecker.isSlaPassed());
    assertFalse((Boolean) slaChecker.isSlaFailed());
  }

  @Test
  public void testJobSucceedSla() throws Exception {
    final int execId = 100;
    final String mockJobName = "mockJob";

    Map<String, Object> info = new HashMap<>();
    info.put(SlaOption.INFO_DURATION, "2s");
    info.put(SlaOption.INFO_JOB_NAME, mockJobName);
    SlaOption slaOption = new SlaOption(TYPE_JOB_SUCCEED, Collections.emptyList(), info);

    ExecutableNode mockJob = mock(ExecutableNode.class);
    when(mockJob.getStartTime()).thenReturn(DateTime.now().getMillis());
    when(mockJob.getStatus()).thenReturn(Status.RUNNING);

    ExecutableFlow flow = addMockFlow(execId, Status.RUNNING);
    when(flow.getExecutableNode(mockJobName)).thenReturn(mockJob);

    SlaChecker slaChecker = new SlaChecker("MockJobSlaChecker", slaOption, execId);

    // Flow hasn't reached terminal state
    assertFalse((Boolean) slaChecker.isSlaPassed());
    // But SLA is still valid. returns false => trigger is not activated.
    assertFalse((Boolean) slaChecker.isSlaFailed());

    Thread.sleep(2000);

    // Flow still hasn't reached terminal state
    assertFalse((Boolean) slaChecker.isSlaPassed());
    // SLA has been missed
    assertTrue((Boolean) slaChecker.isSlaFailed());

    when(mockJob.getStatus()).thenReturn(Status.SUCCEEDED);
    // Terminal state reached
    assertTrue((Boolean) slaChecker.isSlaPassed());
    // SLA has not been violated
    assertFalse((Boolean) slaChecker.isSlaFailed());

    when(mockJob.getStatus()).thenReturn(Status.KILLED);
    // Terminal state reached
    assertFalse((Boolean) slaChecker.isSlaPassed());
    // SLA has not been violated
    assertTrue((Boolean) slaChecker.isSlaFailed());
  }

}