JdbcFlowTriggerInstanceLoaderImpl.java

467 lines | 17.271 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.flowtrigger.db;

import azkaban.Constants;
import azkaban.db.DatabaseOperator;
import azkaban.db.SQLTransaction;
import azkaban.flow.FlowUtils;
import azkaban.flowtrigger.CancellationCause;
import azkaban.flowtrigger.DependencyException;
import azkaban.flowtrigger.DependencyInstance;
import azkaban.flowtrigger.Status;
import azkaban.flowtrigger.TriggerInstance;
import azkaban.project.FlowLoaderUtils;
import azkaban.project.FlowTrigger;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Singleton
public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoader {

  private static final Logger logger = LoggerFactory
      .getLogger(JdbcFlowTriggerInstanceLoaderImpl.class);

  /**
   * Columns of {@link #DEPENDENCY_EXECUTION_TABLE}. The order matters: it is the bind order of
   * {@link #INSERT_DEPENDENCY}, and {@link TriggerInstanceHandler} reads columns by index into this
   * array. "cancelleation_cause" is spelled the same way in every query of this class; it
   * presumably matches the database schema, so do not correct the spelling here alone.
   */
  private static final String[] DEPENDENCY_EXECUTIONS_COLUMNS = {"trigger_instance_id", "dep_name",
      "starttime", "endtime", "dep_status", "cancelleation_cause", "project_id", "project_version",
      "flow_id", "flow_version", "project_json", "flow_exec_id"};

  /** Table holding one row per (trigger instance id, dependency name) pair. */
  private static final String DEPENDENCY_EXECUTION_TABLE = "execution_dependencies";

  /** Inserts a single dependency row; one {@code ?} placeholder per column. */
  private static final String INSERT_DEPENDENCY = String.format("INSERT INTO %s(%s) VALUES(%s);",
      DEPENDENCY_EXECUTION_TABLE, StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
      String.join(",", Collections.nCopies(DEPENDENCY_EXECUTIONS_COLUMNS.length, "?")));

  /** Updates status, end time and cancellation cause of a single dependency row. */
  private static final String UPDATE_DEPENDENCY_STATUS_ENDTIME_AND_CANCELLEATION_CAUSE =
      String.format("UPDATE %s SET dep_status = ?, endtime = ?, cancelleation_cause  = ? "
          + "WHERE trigger_instance_id = ? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);

  /** Selects every dependency row of one trigger instance. */
  private static final String SELECT_EXECUTIONS_BY_INSTANCE_ID =
      String.format("SELECT %s FROM %s WHERE trigger_instance_id = ?",
          StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
          DEPENDENCY_EXECUTION_TABLE);

  /**
   * Selects every dependency row of each trigger instance that has at least one dependency which
   * is RUNNING or CANCELLING, or SUCCEEDED while its flow execution id is still
   * {@link Constants#UNASSIGNED_EXEC_ID}. Statuses are stored as enum ordinals.
   */
  private static final String SELECT_ALL_PENDING_EXECUTIONS = String.format(
      "SELECT %s FROM %s WHERE trigger_instance_id in (SELECT trigger_instance_id FROM %s "
          + "WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))",
      StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
      DEPENDENCY_EXECUTION_TABLE,
      DEPENDENCY_EXECUTION_TABLE,
      Status.RUNNING.ordinal(), Status.CANCELLING.ordinal(),
      Status.SUCCEEDED.ordinal(),
      Constants.UNASSIGNED_EXEC_ID);

  /**
   * Selects every dependency row of the trigger instances with the latest earliest-start-time,
   * excluding instances that have any dependency with dep_status 0 or 4. The {@code %s} is the
   * row limit, substituted with {@link String#format}.
   *
   * <p>NOTE(review): 0 and 4 are hard-coded ordinals, presumably {@code Status.RUNNING} and
   * {@code Status.CANCELLING} as used in {@link #SELECT_ALL_PENDING_EXECUTIONS} — confirm and
   * consider deriving them from the enum so a reordering of {@code Status} cannot break this.
   */
  private static final String SELECT_RECENTLY_FINISHED =
      "SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
          + "cancelleation_cause,project_id,project_version,flow_id,flow_version,"
          + "project_json, flow_exec_id \n"
          + "FROM execution_dependencies JOIN (\n"
          + "SELECT trigger_instance_id FROM execution_dependencies WHERE trigger_instance_id not in (\n"
          + "SELECT distinct(trigger_instance_id)  FROM execution_dependencies WHERE dep_status =  0 or dep_status = 4)\n"
          + "GROUP BY trigger_instance_id\n"
          + "ORDER BY  min(starttime) desc limit %s) temp on execution_dependencies"
          + ".trigger_instance_id in (temp.trigger_instance_id);";

  /** Sets the associated flow execution id on a single dependency row. */
  private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format(
      "UPDATE %s SET flow_exec_id = ? WHERE trigger_instance_id = ? AND dep_name = ? ;",
      DEPENDENCY_EXECUTION_TABLE);

  private final DatabaseOperator dbOperator;
  private final ProjectLoader projectLoader;

  @Inject
  public JdbcFlowTriggerInstanceLoaderImpl(final DatabaseOperator databaseOperator,
      final ProjectLoader projectLoader) {
    this.dbOperator = databaseOperator;
    this.projectLoader = projectLoader;
  }

  /**
   * Retrieves every incomplete trigger instance (see {@link #SELECT_ALL_PENDING_EXECUTIONS}) and
   * back-fills its flow trigger definition. Each distinct flow config is downloaded and parsed only
   * once; instances whose flow file cannot be loaded get a {@code null} flow trigger.
   *
   * @return incomplete trigger instances, sorted by start time ascending (no start time first)
   * @throws DependencyException if the database query fails
   */
  @Override
  public Collection<TriggerInstance> getIncompleteTriggerInstances() {
    Collection<TriggerInstance> unfinished = Collections.emptyList();
    try {
      unfinished = this.dbOperator
          .query(SELECT_ALL_PENDING_EXECUTIONS, new TriggerInstanceHandler());
    } catch (final SQLException ex) {
      handleSQLException(ex);
    }

    // Cache per flow config (null values included) so the same flow file is never downloaded and
    // parsed twice, even when loading it failed the first time.
    final Map<FlowConfigID, FlowTrigger> flowTriggers = new HashMap<>();
    for (final TriggerInstance triggerInst : unfinished) {
      final FlowConfigID flowConfigID = toFlowConfigID(triggerInst);
      if (!flowTriggers.containsKey(flowConfigID)) {
        flowTriggers.put(flowConfigID, loadFlowTrigger(flowConfigID));
      }
      triggerInst.setFlowTrigger(flowTriggers.get(flowConfigID));
    }

    return unfinished;
  }

  /** Builds the flow config key (project id/version, flow id/version) of a trigger instance. */
  private static FlowConfigID toFlowConfigID(final TriggerInstance triggerInst) {
    return new FlowConfigID(triggerInst.getProject().getId(),
        triggerInst.getProject().getVersion(), triggerInst.getFlowId(),
        triggerInst.getFlowVersion());
  }

  /**
   * Downloads the flow file identified by {@code flowConfigID} into a fresh temporary directory
   * and parses its flow trigger. The temporary directory is always removed afterwards.
   *
   * @return the parsed flow trigger, or {@code null} when the flow file is missing, defines no
   *     trigger, or could not be read (failures are logged, not thrown)
   */
  private FlowTrigger loadFlowTrigger(final FlowConfigID flowConfigID) {
    File tempDir = null;
    try {
      // java.nio instead of Guava's deprecated Files.createTempDir(), which may create the
      // directory with permissions readable by other local users (CVE-2020-8908).
      tempDir = java.nio.file.Files.createTempDirectory("flowtrigger").toFile();
      final File flowFile = this.projectLoader
          .getUploadedFlowFile(flowConfigID.getProjectId(), flowConfigID.getProjectVersion(),
              flowConfigID.getFlowId() + ".flow", flowConfigID.getFlowVersion(), tempDir);
      if (flowFile == null) {
        logger.error("Unable to find flow file for {}", flowConfigID);
        return null;
      }
      return FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
    } catch (final IOException ex) {
      logger.error("error in getting flow file for {}", flowConfigID, ex);
      return null;
    } finally {
      if (tempDir != null) {
        FlowLoaderUtils.cleanUpDir(tempDir);
      }
    }
  }

  /**
   * Logs a database failure and rethrows it as a {@link DependencyException}. Never returns
   * normally.
   */
  private void handleSQLException(final SQLException ex)
      throws DependencyException {
    final String error = "exception when accessing db!";
    logger.error(error, ex);
    throw new DependencyException(error, ex);
  }

  /**
   * Writes the trigger instance's flow execution id onto each of its dependency rows, in a single
   * transaction.
   */
  @Override
  public void updateAssociatedFlowExecId(final TriggerInstance triggerInst) {
    final SQLTransaction<Integer> updateFlowExecId = transOperator -> {
      for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
        transOperator
            .update(UPDATE_DEPENDENCY_FLOW_EXEC_ID, triggerInst.getFlowExecId(),
                triggerInst.getId(), depInst.getDepName());
      }
      return null;
    };
    executeTransaction(updateFlowExecId);
  }

  /** Runs a single update statement, converting SQL failures via {@link #handleSQLException}. */
  private void executeUpdate(final String query, final Object... params) {
    try {
      this.dbOperator.update(query, params);
    } catch (final SQLException ex) {
      handleSQLException(ex);
    }
  }

  /** Runs a transaction, converting SQL failures via {@link #handleSQLException}. */
  private void executeTransaction(final SQLTransaction<Integer> tran) {
    try {
      this.dbOperator.transaction(tran);
    } catch (final SQLException ex) {
      handleSQLException(ex);
    }
  }

  /**
   * Inserts one row per dependency instance of {@code triggerInst} in a single transaction. The
   * project is stored serialized via {@link FlowUtils#toJson}; status and cancellation cause are
   * stored as enum ordinals.
   */
  @Override
  public void uploadTriggerInstance(final TriggerInstance triggerInst) {
    final SQLTransaction<Integer> insertTrigger = transOperator -> {
      for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
        transOperator
            .update(INSERT_DEPENDENCY, triggerInst.getId(), depInst.getDepName(),
                depInst.getStartTime(), depInst.getEndTime(), depInst.getStatus().ordinal(),
                depInst.getCancellationCause().ordinal(),
                triggerInst.getProject().getId(),
                triggerInst.getProject().getVersion(),
                triggerInst.getFlowId(),
                triggerInst.getFlowVersion(),
                FlowUtils.toJson(triggerInst.getProject()),
                triggerInst.getFlowExecId());
      }
      return null;
    };

    executeTransaction(insertTrigger);
  }

  /** Persists status, end time and cancellation cause of a single dependency instance. */
  @Override
  public void updateDependencyExecutionStatus(final DependencyInstance depInst) {
    executeUpdate(UPDATE_DEPENDENCY_STATUS_ENDTIME_AND_CANCELLEATION_CAUSE,
        depInst.getStatus().ordinal(),
        depInst.getEndTime(),
        depInst.getCancellationCause().ordinal(),
        depInst.getTriggerInstance().getId(),
        depInst.getDepName());
  }

  /**
   * Retrieve recently finished trigger instances, but flow trigger properties are not populated
   * into the returned trigger instances for efficiency. Flow trigger properties will be
   * retrieved only on request time.
   *
   * @param limit maximum number of trigger instances to return
   */
  @Override
  public Collection<TriggerInstance> getRecentlyFinished(final int limit) {
    // limit is an int, so formatting it into the SQL cannot inject anything.
    final String query = String.format(SELECT_RECENTLY_FINISHED, limit);
    try {
      return this.dbOperator.query(query, new TriggerInstanceHandler());
    } catch (final SQLException ex) {
      handleSQLException(ex);
    }
    return Collections.emptyList();
  }

  /**
   * Retrieve a trigger instance given an instance id. Flow trigger properties will also be
   * populated into the returned trigger instance.
   *
   * @return the trigger instance, or {@code null} if no rows exist for {@code triggerInstanceId}
   */
  @Override
  public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
    TriggerInstance triggerInstance = null;
    try {
      final Collection<TriggerInstance> res = this.dbOperator
          .query(SELECT_EXECUTIONS_BY_INSTANCE_ID, new TriggerInstanceHandler(), triggerInstanceId);
      triggerInstance = res.isEmpty() ? null : res.iterator().next();
    } catch (final SQLException ex) {
      handleSQLException(ex);
    }
    if (triggerInstance != null) {
      final FlowTrigger flowTrigger = loadFlowTrigger(toFlowConfigID(triggerInstance));
      if (flowTrigger != null) {
        triggerInstance.setFlowTrigger(flowTrigger);
      }
    }
    return triggerInstance;
  }


  /**
   * Groups dependency rows into trigger instances. Rows are grouped by trigger instance id plus
   * flow config; the returned list is sorted by start time ascending, instances without a start
   * time first. The flow trigger of each returned instance is left unset.
   */
  private static class TriggerInstanceHandler implements
      ResultSetHandler<Collection<TriggerInstance>> {

    // values() clones the array on every call, so cache it once.
    private static final Status[] STATUSES = Status.values();
    private static final CancellationCause[] CANCELLATION_CAUSES = CancellationCause.values();

    @Override
    public Collection<TriggerInstance> handle(final ResultSet rs) throws SQLException {
      final Map<TriggerInstKey, List<DependencyInstance>> triggerInstMap = new HashMap<>();

      while (rs.next()) {
        final String triggerInstId = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[0]);
        final String depName = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[1]);
        final Date startTime = rs.getTimestamp(DEPENDENCY_EXECUTIONS_COLUMNS[2]);
        final Date endTime = rs.getTimestamp(DEPENDENCY_EXECUTIONS_COLUMNS[3]);
        final Status status = STATUSES[rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[4])];
        final CancellationCause cause =
            CANCELLATION_CAUSES[rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[5])];
        final int projId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[6]);
        final int projVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[7]);
        final String flowId = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[8]);
        final int flowVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[9]);
        final Project project = FlowUtils
            .toProject(rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[10]));
        final int flowExecId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[11]);

        // The submitter is taken from the project's last-modified user stored in project_json.
        final TriggerInstKey key = new TriggerInstKey(triggerInstId, project.getLastModifiedUser(),
            projId, projVersion, flowId, flowVersion, flowExecId, project);
        final DependencyInstance depInst = new DependencyInstance(depName, startTime, endTime,
            null, status, cause);
        triggerInstMap.computeIfAbsent(key, k -> new ArrayList<>()).add(depInst);
      }

      final List<TriggerInstance> res = new ArrayList<>();
      for (final Map.Entry<TriggerInstKey, List<DependencyInstance>> entry :
          triggerInstMap.entrySet()) {
        final TriggerInstKey key = entry.getKey();
        res.add(new TriggerInstance(key.triggerInstId, null, key.flowConfigID.flowId,
            key.flowConfigID.flowVersion, key.submitUser, entry.getValue(), key.flowExecId,
            key.project));
      }

      // Sort on start time in ascending order; instances without a start time come first.
      res.sort((o1, o2) -> {
        if (o1.getStartTime() == null && o2.getStartTime() == null) {
          return 0;
        } else if (o1.getStartTime() != null && o2.getStartTime() != null) {
          return o1.getStartTime().compareTo(o2.getStartTime());
        } else {
          return o1.getStartTime() == null ? -1 : 1;
        }
      });

      return res;
    }

    /**
     * Grouping key for dependency rows. Equality uses only the trigger instance id and flow
     * config; the remaining fields are payload taken from the first row seen for the key.
     */
    private static class TriggerInstKey {

      final String triggerInstId;
      final FlowConfigID flowConfigID;
      final String submitUser;
      final int flowExecId;
      final Project project;

      TriggerInstKey(final String triggerInstId, final String submitUser, final int projId,
          final int projVersion, final String flowId, final int flowVersion, final int flowExecId,
          final Project project) {
        this.triggerInstId = triggerInstId;
        this.flowConfigID = new FlowConfigID(projId, projVersion, flowId, flowVersion);
        this.submitUser = submitUser;
        this.flowExecId = flowExecId;
        this.project = project;
      }

      @Override
      public boolean equals(final Object o) {
        if (this == o) {
          return true;
        }

        if (o == null || getClass() != o.getClass()) {
          return false;
        }

        final TriggerInstKey that = (TriggerInstKey) o;

        return new EqualsBuilder()
            .append(this.triggerInstId, that.triggerInstId)
            .append(this.flowConfigID, that.flowConfigID)
            .isEquals();
      }

      @Override
      public int hashCode() {
        return new HashCodeBuilder(17, 37)
            .append(this.triggerInstId)
            .append(this.flowConfigID)
            .toHashCode();
      }
    }
  }

  /**
   * Immutable identifier of a specific flow definition: project id and version plus flow id and
   * version. Used as a cache key when loading flow files.
   */
  public static class FlowConfigID {

    private final int projectId;
    private final int projectVersion;
    private final String flowId;
    private final int flowVersion;

    public FlowConfigID(final int projectId, final int projectVersion, final String flowId,
        final int flowVersion) {
      this.projectId = projectId;
      this.projectVersion = projectVersion;
      this.flowId = flowId;
      this.flowVersion = flowVersion;
    }

    public int getProjectId() {
      return this.projectId;
    }

    public int getProjectVersion() {
      return this.projectVersion;
    }

    public String getFlowId() {
      return this.flowId;
    }

    public int getFlowVersion() {
      return this.flowVersion;
    }

    @Override
    public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }

      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      final FlowConfigID that = (FlowConfigID) o;

      return new EqualsBuilder()
          .append(this.projectId, that.projectId)
          .append(this.projectVersion, that.projectVersion)
          .append(this.flowVersion, that.flowVersion)
          .append(this.flowId, that.flowId)
          .isEquals();
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder(17, 37)
          .append(this.projectId)
          .append(this.projectVersion)
          .append(this.flowId)
          .append(this.flowVersion)
          .toHashCode();
    }

    /** Human-readable form, used in log messages when a flow file cannot be loaded. */
    @Override
    public String toString() {
      return "FlowConfigID{projectId=" + this.projectId + ", projectVersion=" + this.projectVersion
          + ", flowId=" + this.flowId + ", flowVersion=" + this.flowVersion + "}";
    }
  }
}