FlowTriggerInstanceLoaderTest.java

341 lines | 13.495 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package azkaban.flowtrigger;

import static org.assertj.core.api.Assertions.assertThat;

import azkaban.Constants;
import azkaban.db.DatabaseOperator;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.flowtrigger.database.JdbcFlowTriggerInstanceLoaderImpl;
import azkaban.project.DirectoryYamlFlowLoader;
import azkaban.project.FlowLoaderUtils;
import azkaban.project.FlowTrigger;
import azkaban.project.FlowTriggerDependency;
import azkaban.project.JdbcProjectImpl;
import azkaban.project.JdbcProjectImplTest;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class FlowTriggerInstanceLoaderTest {

  private static final Logger logger = LoggerFactory.getLogger(FlowTriggerInstanceLoaderTest.class);
  private static final String test_project_zip_dir = "flowtriggeryamltest";
  private static final String test_flow_file = "flow_trigger.flow";
  private static final int project_id = 123;
  private static final String project_name = "test";
  private static final int project_version = 3;
  private static final String flow_id = "flow_trigger";
  private static final int flow_version = 1;
  private static final Props props = new Props();
  private static final String submitUser = "uploadUser1";
  private static DatabaseOperator dbOperator;
  private static ProjectLoader projLoader;
  private static FlowTrigger flowTrigger;
  private static FlowTriggerInstanceLoader triggerInstLoader;
  private static Project project;

  @AfterClass
  public static void destroyDB() {
    try {
      dbOperator.update("SHUTDOWN");
      dbOperator.update("DROP ALL OBJECTS");
      dbOperator.update("SHUTDOWN");
    } catch (final SQLException e) {
      logger.error("unable to destroy db", e);
    }
  }

  @BeforeClass
  public static void setup() throws Exception {
    dbOperator = Utils.initTestDB();
    projLoader = new JdbcProjectImpl(props, dbOperator);
    triggerInstLoader = new JdbcFlowTriggerInstanceLoaderImpl(dbOperator, projLoader);
    project = new Project(project_id, project_name);
    final DirectoryYamlFlowLoader yamlFlowLoader = new DirectoryYamlFlowLoader(new Props());
    yamlFlowLoader
        .loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(test_project_zip_dir));
    project.setVersion(project_version);
    project.setFlows(yamlFlowLoader.getFlowMap());
    project.setLastModifiedUser(submitUser);

    final File flowFile = new File(JdbcProjectImplTest.class.getClassLoader().getResource
        (test_flow_file).getFile());

    projLoader
        .uploadFlowFile(project_id, project_version, flowFile, flow_version);
    flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
  }


  private TriggerInstance createTriggerInstance(final FlowTrigger flowTrigger, final String flowId,
      final int flowVersion, final String submitUser, final Project project, final long startTime) {
    final String triggerInstId = UUID.randomUUID().toString();
    final List<DependencyInstance> depInstList = new ArrayList<>();
    for (final FlowTriggerDependency dep : flowTrigger.getDependencies()) {
      final String depName = dep.getName();
      final Date startDate = new Date(startTime);
      final DependencyInstanceContext context = new TestDependencyInstanceContext(null, null, null);
      final Status status = Status.RUNNING;
      final CancellationCause cause = CancellationCause.NONE;
      final Date endTime = null;
      final DependencyInstance depInst = new DependencyInstance(depName, startDate, endTime,
          context, status, cause);
      depInstList.add(depInst);
    }

    final int flowExecId = Constants.UNASSIGNED_EXEC_ID;
    final TriggerInstance triggerInstance = new TriggerInstance(triggerInstId, flowTrigger,
        flowId, flowVersion, submitUser, depInstList, flowExecId, project);

    return triggerInstance;
  }


  @Test
  public void testUploadTriggerInstance() {
    final TriggerInstance expectedTriggerInst = this.createTriggerInstance(this.flowTrigger, this
        .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis());

    this.triggerInstLoader.uploadTriggerInstance(expectedTriggerInst);

    final TriggerInstance actualTriggerInst = this.triggerInstLoader
        .getTriggerInstanceById(expectedTriggerInst.getId());

    assertThat(expectedTriggerInst.getFlowTrigger().toString())
        .isEqualToIgnoringWhitespace(actualTriggerInst.getFlowTrigger().toString());

    assertThat(expectedTriggerInst).isEqualToIgnoringGivenFields(actualTriggerInst,
        "depInstances", "flowTrigger");

    assertThat(expectedTriggerInst.getDepInstances())
        .usingElementComparatorIgnoringFields("triggerInstance", "context")
        .containsAll(actualTriggerInst.getDepInstances())
        .hasSameSizeAs(actualTriggerInst.getDepInstances());
  }

  private void assertTriggerInstancesEqual(final TriggerInstance actual,
      final TriggerInstance expected, final boolean ignoreFlowTrigger) {
    if (!ignoreFlowTrigger) {
      if (actual.getFlowTrigger() != null && expected.getFlowTrigger() != null) {
        assertThat(actual.getFlowTrigger().toString())
            .isEqualToIgnoringWhitespace(expected.getFlowTrigger().toString());
      } else {
        assertThat(actual.getFlowTrigger()).isNull();
        assertThat(expected.getFlowTrigger()).isNull();
      }
    }

    assertThat(actual).isEqualToIgnoringGivenFields(expected, "depInstances", "flowTrigger");

    assertThat(actual.getDepInstances())
        .usingComparatorForElementFieldsWithType((d1, d2) -> {
          if (d1 == null && d2 == null) {
            return 0;
          } else if (d1 != null && d2 != null && d1.getTime() == d2.getTime()) {
            return 0;
          } else {
            return -1;
          }
        }, Date.class)
        .usingElementComparatorIgnoringFields("triggerInstance", "context")
        .containsExactlyInAnyOrder(expected.getDepInstances()
            .toArray(new DependencyInstance[expected.getDepInstances().size()]));
  }

  @Test
  public void testUpdateDependencyExecutionStatus() {
    final TriggerInstance expectedTriggerInst = this.createTriggerInstance(this.flowTrigger, this
        .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis());

    this.triggerInstLoader.uploadTriggerInstance(expectedTriggerInst);
    for (final DependencyInstance depInst : expectedTriggerInst.getDepInstances()) {
      depInst.setStatus(Status.CANCELLED);
      depInst.setEndTime(new Date());
      depInst.setCancellationCause(CancellationCause.MANUAL);
      this.triggerInstLoader.updateDependencyExecutionStatus(depInst);
    }

    final TriggerInstance actualTriggerInst = this.triggerInstLoader
        .getTriggerInstanceById(expectedTriggerInst.getId());
    assertTriggerInstancesEqual(actualTriggerInst, expectedTriggerInst, false);
  }

  private void finalizeTriggerInstanceWithSuccess(final TriggerInstance triggerInst, final int
      associateFlowExecId) {
    for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
      depInst.setStatus(Status.SUCCEEDED);
      depInst.getTriggerInstance().setFlowExecId(associateFlowExecId);
    }
  }

  private void finalizeTriggerInstanceWithCancelled(final TriggerInstance triggerInst) {
    for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
      depInst.setStatus(Status.CANCELLED);
      depInst.setCancellationCause(CancellationCause.TIMEOUT);
      depInst.setEndTime(new Date());
    }
  }

  @Test
  public void testGetIncompleteTriggerInstancesReturnsEmpty() {
    final List<TriggerInstance> all = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      all.add(this.createTriggerInstance(this.flowTrigger, this
          .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
      if (i <= 2) {
        finalizeTriggerInstanceWithCancelled(all.get(i));
      } else {
        finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
      }
    }

    all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));

    final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
        .getIncompleteTriggerInstances());
    all.sort(Comparator.comparing(TriggerInstance::getId));
    actual.sort(Comparator.comparing(TriggerInstance::getId));

    assertThat(actual).isEmpty();
  }

  @Test
  public void testGetIncompleteTriggerInstances() {
    final List<TriggerInstance> allInstances = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      allInstances.add(this.createTriggerInstance(this.flowTrigger, this
          .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
    }

    finalizeTriggerInstanceWithCancelled(allInstances.get(0));
    finalizeTriggerInstanceWithSuccess(allInstances.get(1), 1000);
    // this trigger instance should still count as incomplete one since no flow execution has
    // been started
    finalizeTriggerInstanceWithSuccess(allInstances.get(2), -1);

    allInstances.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));

    final List<TriggerInstance> expected = allInstances.subList(2, allInstances.size());
    final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
        .getIncompleteTriggerInstances());
    assertTwoTriggerInstanceListsEqual(actual, expected, false, false);
  }

  private void assertTwoTriggerInstanceListsEqual(final List<TriggerInstance> actual,
      final List<TriggerInstance> expected, final boolean ignoreFlowTrigger,
      final boolean keepOriginalOrder) {
    if (!keepOriginalOrder) {
      expected.sort(Comparator.comparing(TriggerInstance::getId));
      actual.sort(Comparator.comparing(TriggerInstance::getId));
    }

    assertThat(actual).hasSameSizeAs(expected);
    final Iterator<TriggerInstance> it1 = actual.iterator();
    final Iterator<TriggerInstance> it2 = expected.iterator();
    while (it1.hasNext() && it2.hasNext()) {
      //8bfafb89-ac79-45a0-a049-b55038b0886b
      assertTriggerInstancesEqual(it1.next(), it2.next(), ignoreFlowTrigger);
    }
  }

  @Test
  public void testUpdateAssociatedFlowExecId() {
    final TriggerInstance expectedTriggerInst = this.createTriggerInstance(this.flowTrigger, this
        .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis());
    this.triggerInstLoader.uploadTriggerInstance(expectedTriggerInst);
    finalizeTriggerInstanceWithSuccess(expectedTriggerInst, 1000);

    expectedTriggerInst.getDepInstances()
        .forEach(depInst -> this.triggerInstLoader.updateDependencyExecutionStatus(depInst));

    this.triggerInstLoader.updateAssociatedFlowExecId(expectedTriggerInst);

    final TriggerInstance actualTriggerInst = this.triggerInstLoader
        .getTriggerInstanceById(expectedTriggerInst.getId());

    assertTriggerInstancesEqual(actualTriggerInst, expectedTriggerInst, false);
  }

  @Test
  public void testGetRecentlyFinishedReturnsEmpty() {
    final List<TriggerInstance> all = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      all.add(this.createTriggerInstance(this.flowTrigger, this
          .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
    }

    all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));

    final Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
        .getRecentlyFinished(10);
    assertThat(recentlyFinished).isEmpty();
  }

  @Test
  public void testGetRecentlyFinished() {

    final List<TriggerInstance> all = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      all.add(this.createTriggerInstance(this.flowTrigger, this
          .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
          + i * 10000));
      if (i <= 3) {
        finalizeTriggerInstanceWithCancelled(all.get(i));
      } else if (i <= 6) {
        finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
      }
    }

    all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));

    final List<TriggerInstance> expected = all.subList(0, 7);
    expected.sort(Comparator.comparing(TriggerInstance::getStartTime));

    final Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
        .getRecentlyFinished(10);
    assertTwoTriggerInstanceListsEqual(new ArrayList<>(recentlyFinished), expected, true, true);
  }

  @After
  public void cleanDB() {
    try {
      dbOperator.update("TRUNCATE TABLE execution_dependencies");
    } catch (final SQLException e) {
      e.printStackTrace();
    }
  }
}