FlowRunnerTest.java

405 lines | 14.208 kB Blame History Raw Download
/*
 * Copyright 2014 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.execapp;

import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.when;

import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class FlowRunnerTest extends FlowRunnerTestBase {

  private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
  private File workingDir;
  private JobTypeManager jobtypeManager;

  @Mock
  private ProjectLoader fakeProjectLoader;

  @Mock
  private ExecutorLoader loader;

  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
    System.out.println("Create temp dir");
    this.workingDir = new File("build/tmp/_AzkabanTestDir_" + System.currentTimeMillis());
    if (this.workingDir.exists()) {
      FileUtils.deleteDirectory(this.workingDir);
    }
    this.workingDir.mkdirs();
    this.jobtypeManager =
        new JobTypeManager(null, null, this.getClass().getClassLoader());
    final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
    pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
    pluginSet.addPluginClass("test", InteractiveTestJob.class);
    Utils.initServiceProvider();
    JmxJobMBeanManager.getInstance().initialize(new Props());

    InteractiveTestJob.clearTestJobs();
  }

  @After
  public void tearDown() throws IOException {
    System.out.println("Teardown temp dir");
    synchronized (this) {
      if (this.workingDir != null) {
        FileUtils.deleteDirectory(this.workingDir);
        this.workingDir = null;
      }
    }
  }

  @Test
  public void exec1Normal() throws Exception {
    final EventCollectorListener eventCollector = new EventCollectorListener();
    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
    this.runner = createFlowRunner(this.loader, eventCollector, "exec1");

    startThread(this.runner);
    succeedJobs("job3", "job4", "job6");

    assertFlowStatus(Status.SUCCEEDED);
    assertThreadShutDown();
    compareFinishedRuntime(this.runner);

    assertStatus("job1", Status.SUCCEEDED);
    assertStatus("job2", Status.SUCCEEDED);
    assertStatus("job3", Status.SUCCEEDED);
    assertStatus("job4", Status.SUCCEEDED);
    assertStatus("job5", Status.SUCCEEDED);
    assertStatus("job6", Status.SUCCEEDED);
    assertStatus("job7", Status.SUCCEEDED);
    assertStatus("job8", Status.SUCCEEDED);
    assertStatus("job10", Status.SUCCEEDED);

    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
  }

  @Test
  public void exec1Disabled() throws Exception {
    final EventCollectorListener eventCollector = new EventCollectorListener();
    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
    final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, "exec1", 1);

    // Disable couple in the middle and at the end.
    exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
    exFlow.getExecutableNode("job6").setStatus(Status.DISABLED);
    exFlow.getExecutableNode("job5").setStatus(Status.DISABLED);
    exFlow.getExecutableNode("job10").setStatus(Status.DISABLED);

    this.runner = createFlowRunner(exFlow, this.loader, eventCollector);

    Assert.assertTrue(!this.runner.isKilled());
    assertFlowStatus(Status.READY);

    startThread(this.runner);
    succeedJobs("job3", "job4");

    assertThreadShutDown();
    compareFinishedRuntime(this.runner);

    assertFlowStatus(Status.SUCCEEDED);

    assertStatus("job1", Status.SKIPPED);
    assertStatus("job2", Status.SUCCEEDED);
    assertStatus("job3", Status.SUCCEEDED);
    assertStatus("job4", Status.SUCCEEDED);
    assertStatus("job5", Status.SKIPPED);
    assertStatus("job6", Status.SKIPPED);
    assertStatus("job7", Status.SUCCEEDED);
    assertStatus("job8", Status.SUCCEEDED);
    assertStatus("job10", Status.SKIPPED);

    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
  }

  @Test
  public void exec1Failed() throws Exception {
    final EventCollectorListener eventCollector = new EventCollectorListener();
    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);

    this.runner = createFlowRunner(flow, this.loader, eventCollector);

    startThread(this.runner);
    succeedJobs("job6");

    Assert.assertTrue(!this.runner.isKilled());
    assertFlowStatus(Status.FAILED);

    assertStatus("job1", Status.SUCCEEDED);
    assertStatus("job2d", Status.FAILED);
    assertStatus("job3", Status.CANCELLED);
    assertStatus("job4", Status.CANCELLED);
    assertStatus("job5", Status.CANCELLED);
    assertStatus("job6", Status.SUCCEEDED);
    assertStatus("job7", Status.CANCELLED);
    assertStatus("job8", Status.CANCELLED);
    assertStatus("job9", Status.CANCELLED);
    assertStatus("job10", Status.CANCELLED);
    assertThreadShutDown();

    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
  }

  @Test
  public void exec1FailedKillAll() throws Exception {
    final EventCollectorListener eventCollector = new EventCollectorListener();
    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
    flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);

    this.runner = createFlowRunner(flow, this.loader, eventCollector);

    startThread(this.runner);
    assertThreadShutDown();

    Assert.assertTrue(this.runner.isKilled());

    assertFlowStatus(Status.KILLED);

    assertStatus("job1", Status.SUCCEEDED);
    assertStatus("job2d", Status.FAILED);
    assertStatus("job3", Status.CANCELLED);
    assertStatus("job4", Status.CANCELLED);
    assertStatus("job5", Status.CANCELLED);
    assertStatus("job6", Status.KILLED);
    assertStatus("job7", Status.CANCELLED);
    assertStatus("job8", Status.CANCELLED);
    assertStatus("job9", Status.CANCELLED);
    assertStatus("job10", Status.CANCELLED);

    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
  }

  @Test
  public void exec1FailedFinishRest() throws Exception {
    final EventCollectorListener eventCollector = new EventCollectorListener();
    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec3", 1);
    flow.getExecutionOptions().setFailureAction(
        FailureAction.FINISH_ALL_POSSIBLE);
    this.runner = createFlowRunner(flow, this.loader, eventCollector);

    startThread(this.runner);
    succeedJobs("job3");

    assertFlowStatus(Status.FAILED);

    assertStatus("job1", Status.SUCCEEDED);
    assertStatus("job2d", Status.FAILED);
    assertStatus("job3", Status.SUCCEEDED);
    assertStatus("job4", Status.CANCELLED);
    assertStatus("job5", Status.CANCELLED);
    assertStatus("job6", Status.CANCELLED);
    assertStatus("job7", Status.SUCCEEDED);
    assertStatus("job8", Status.SUCCEEDED);
    assertStatus("job9", Status.SUCCEEDED);
    assertStatus("job10", Status.CANCELLED);
    assertThreadShutDown();

    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
  }

  @Test
  public void execAndCancel() throws Exception {
    final EventCollectorListener eventCollector = new EventCollectorListener();
    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
    this.runner = createFlowRunner(this.loader, eventCollector, "exec1");

    startThread(this.runner);

    assertStatus("job1", Status.SUCCEEDED);
    assertStatus("job2", Status.SUCCEEDED);
    waitJobsStarted(this.runner, "job3", "job4", "job6");

    this.runner.kill("me");
    Assert.assertTrue(this.runner.isKilled());

    assertStatus("job5", Status.CANCELLED);
    assertStatus("job7", Status.CANCELLED);
    assertStatus("job8", Status.CANCELLED);
    assertStatus("job10", Status.CANCELLED);
    assertStatus("job3", Status.KILLED);
    assertStatus("job4", Status.KILLED);
    assertStatus("job6", Status.KILLED);
    assertThreadShutDown();

    assertFlowStatus(Status.KILLED);

    eventCollector.assertEvents(Type.FLOW_STARTED, Type.FLOW_FINISHED);
  }

  @Test
  public void execRetries() throws Exception {
    final EventCollectorListener eventCollector = new EventCollectorListener();
    eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
        Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
    this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");

    startThread(this.runner);
    assertThreadShutDown();

    assertStatus("job-retry", Status.SUCCEEDED);
    assertStatus("job-pass", Status.SUCCEEDED);
    assertStatus("job-retry-fail", Status.FAILED);
    assertAttempts("job-retry", 3);
    assertAttempts("job-pass", 0);
    assertAttempts("job-retry-fail", 2);

    assertFlowStatus(Status.FAILED);
  }

  private void startThread(final FlowRunner runner) {
    Assert.assertTrue(!runner.isKilled());
    final Thread thread = new Thread(runner);
    thread.start();
  }

  private void assertAttempts(final String name, final int attempt) {
    final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
    if (node.getAttempt() != attempt) {
      Assert.fail("Expected " + attempt + " got " + node.getAttempt()
          + " attempts " + name);
    }
  }

  private ExecutableFlow prepareExecDir(final File execDir, final String flowName,
      final int execId) throws IOException {
    FileUtils.copyDirectory(execDir, this.workingDir);

    final File jsonFlowFile = new File(this.workingDir, flowName + ".flow");
    final HashMap<String, Object> flowObj =
        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);

    final Project project = new Project(1, "myproject");
    project.setVersion(2);

    final Flow flow = Flow.flowFromObject(flowObj);
    final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
    execFlow.setExecutionId(execId);
    execFlow.setExecutionPath(this.workingDir.getPath());
    return execFlow;
  }

  private void compareFinishedRuntime(final FlowRunner runner) throws Exception {
    final ExecutableFlow flow = runner.getExecutableFlow();
    for (final String flowName : flow.getStartNodes()) {
      final ExecutableNode node = flow.getExecutableNode(flowName);
      compareStartFinishTimes(flow, node, 0);
    }
  }

  private void compareStartFinishTimes(final ExecutableFlow flow,
      final ExecutableNode node, final long previousEndTime) throws Exception {
    final long startTime = node.getStartTime();
    final long endTime = node.getEndTime();

    // If start time is < 0, so will the endtime.
    if (startTime <= 0) {
      Assert.assertTrue(endTime <= 0);
      return;
    }

    Assert.assertTrue("Checking start and end times", startTime > 0
        && endTime >= startTime);
    Assert.assertTrue("Start time for " + node.getId() + " is " + startTime
        + " and less than " + previousEndTime, startTime >= previousEndTime);

    for (final String outNode : node.getOutNodes()) {
      final ExecutableNode childNode = flow.getExecutableNode(outNode);
      compareStartFinishTimes(flow, childNode, endTime);
    }
  }

  private FlowRunner createFlowRunner(final ExecutableFlow flow,
      final ExecutorLoader loader, final EventCollectorListener eventCollector) throws Exception {
    return createFlowRunner(flow, loader, eventCollector, new Props());
  }

  private FlowRunner createFlowRunner(final ExecutableFlow flow,
      final ExecutorLoader loader, final EventCollectorListener eventCollector,
      final Props azkabanProps)
      throws Exception {

    loader.uploadExecutableFlow(flow);
    final FlowRunner runner =
        new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);

    runner.addListener(eventCollector);

    return runner;
  }

  private FlowRunner createFlowRunner(final ExecutorLoader loader,
      final EventCollectorListener eventCollector, final String flowName) throws Exception {
    return createFlowRunner(loader, eventCollector, flowName, new Props());
  }

  private FlowRunner createFlowRunner(final ExecutorLoader loader,
      final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
      throws Exception {
    final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, flowName, 1);

    loader.uploadExecutableFlow(exFlow);

    final FlowRunner runner =
        new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps);

    runner.addListener(eventCollector);

    return runner;
  }
}