azkaban-aplcache

flow trigger JDBC loader (#1615) The PR added JDBC loader

1/25/2018 10:34:38 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index e8326be..037c947 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -79,6 +79,9 @@ public class Constants {
   // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
   public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
 
+  // The flow exec id for a flow trigger instance which hasn't started a flow yet
+  public static final int UNASSIGNED_EXEC_ID = -1;
+
   public static class ConfigurationKeys {
 
     // Configures Azkaban Flow Version in project YAML file
@@ -179,7 +182,7 @@ public class Constants {
     public static final String AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION = "azkaban.storage.artifact.max.retention";
 
     // enable Quartz Scheduler if true.
-    public static final String ENABLE_QUARTZ= "azkaban.server.schedule.enable_quartz";
+    public static final String ENABLE_QUARTZ = "azkaban.server.schedule.enable_quartz";
 
     public static final String CUSTOM_CREDENTIAL_NAME = "azkaban.security.credential";
   }
diff --git a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
index 9aaad84..cdf951e 100644
--- a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
+++ b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
@@ -25,6 +25,7 @@ import azkaban.executor.Status;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.utils.Props;
+import com.google.gson.Gson;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -116,4 +117,15 @@ public class FlowUtils {
     exflow.addAllProxyUsers(project.getProxyUsers());
     return exflow;
   }
+
+  /** Serializes {@code proj} to a JSON string with a default-configured Gson. */
+  public static String toJson(final Project proj) {
+    final Gson serializer = new Gson();
+    return serializer.toJson(proj);
+  }
+
+  /** Rebuilds a Project from {@code json} with a default-configured Gson. */
+  public static Project toProject(final String json) {
+    return new Gson().fromJson(json, Project.class);
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
index 9875b9a..6e74471 100644
--- a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
+++ b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
@@ -18,6 +18,8 @@ package azkaban.project;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 
 /**
  * FlowTriggerSchedule is the logical representation of a cron-based schedule.
@@ -40,4 +42,28 @@ public class CronSchedule {
   public String getCronExpression() {
     return this.cronExpression;
   }
+
+  @Override
+  public boolean equals(final Object o) {
+    // Same reference: trivially equal.
+    if (o == this) {
+      return true;
+    }
+    // Null or a different runtime class can never be equal.
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+
+    final CronSchedule other = (CronSchedule) o;
+    final EqualsBuilder comparison = new EqualsBuilder();
+    comparison.append(this.cronExpression, other.cronExpression);
+    return comparison.isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only cronExpression, the sole field compared in equals().
+    final HashCodeBuilder hash = new HashCodeBuilder(17, 37);
+    return hash.append(this.cronExpression).toHashCode();
+  }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/FlowTriggerInstanceLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/FlowTriggerInstanceLoader.java
new file mode 100644
index 0000000..38f1618
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/FlowTriggerInstanceLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.db;
+
+import azkaban.flowtrigger.DependencyInstance;
+import azkaban.flowtrigger.TriggerInstance;
+import java.util.Collection;
+
+public interface FlowTriggerInstanceLoader {
+
+  /**
+   * Upload a trigger instance into the db.
+   */
+  void uploadTriggerInstance(TriggerInstance triggerInstance);
+
+  /**
+   * Update a dependency instance's status, cancellation cause and end time in the db.
+   */
+  void updateDependencyExecutionStatus(DependencyInstance depInst);
+
+  /**
+   * Retrieve trigger instances not in done state (cancelling, running, or succeeded but the
+   * associated flow hasn't been triggered yet). This is used when recovering unfinished
+   * trigger instances during web server restarts.
+   */
+  Collection<TriggerInstance> getIncompleteTriggerInstances();
+
+  /**
+   * Update associated flow execution id for a trigger instance. This will be called when a trigger
+   * instance successfully starts a flow.
+   */
+  void updateAssociatedFlowExecId(TriggerInstance triggerInst);
+
+  /**
+   * Retrieve recently finished trigger instances.
+   *
+   * @param limit number of trigger instances to retrieve
+   */
+  Collection<TriggerInstance> getRecentlyFinished(int limit);
+
+  /** Retrieve the trigger instance with the given id. */
+  TriggerInstance getTriggerInstanceById(String triggerInstanceId);
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/JdbcFlowTriggerInstanceLoaderImpl.java
new file mode 100644
index 0000000..6ae50a8
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -0,0 +1,466 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.db;
+
+import azkaban.Constants;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.SQLTransaction;
+import azkaban.flow.FlowUtils;
+import azkaban.flowtrigger.CancellationCause;
+import azkaban.flowtrigger.DependencyException;
+import azkaban.flowtrigger.DependencyInstance;
+import azkaban.flowtrigger.Status;
+import azkaban.flowtrigger.TriggerInstance;
+import azkaban.project.FlowLoaderUtils;
+import azkaban.project.FlowTrigger;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Singleton
+public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoader {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(JdbcFlowTriggerInstanceLoaderImpl.class);
+  // Order matters: INSERT_DEPENDENCY binds values positionally; the handler indexes this array.
+  private static final String[] DEPENDENCY_EXECUTIONS_COLUMNS = {"trigger_instance_id", "dep_name",
+      "starttime", "endtime", "dep_status", "cancelleation_cause", "project_id", "project_version",
+      "flow_id", "flow_version", "project_json", "flow_exec_id"};
+
+  private static final String DEPENDENCY_EXECUTION_TABLE = "execution_dependencies";
+
+  private static final String INSERT_DEPENDENCY = String.format("INSERT INTO %s(%s) VALUES(%s);"
+      + "", DEPENDENCY_EXECUTION_TABLE, StringUtils.join
+      (DEPENDENCY_EXECUTIONS_COLUMNS, ","), String.join(",", Collections.nCopies
+      (DEPENDENCY_EXECUTIONS_COLUMNS.length, "?")));
+
+  private static final String UPDATE_DEPENDENCY_STATUS_ENDTIME_AND_CANCELLEATION_CAUSE = String
+      .format
+          ("UPDATE %s SET dep_status = ?, endtime = ?, cancelleation_cause  = ? WHERE trigger_instance_id = "
+              + "? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
+
+
+  private static final String SELECT_EXECUTIONS_BY_INSTANCE_ID =
+      String.format("SELECT %s FROM %s WHERE trigger_instance_id = ?",
+          StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+          DEPENDENCY_EXECUTION_TABLE);
+  // Rows of trigger instances with a dep RUNNING/CANCELLING, or SUCCEEDED with unassigned exec id.
+  private static final String SELECT_ALL_PENDING_EXECUTIONS =
+      String
+          .format(
+              "SELECT %s FROM %s WHERE trigger_instance_id in (SELECT trigger_instance_id FROM %s "
+                  + "WHERE "
+                  + "dep_status = %s or dep_status = %s or (dep_status = %s and "
+                  + "flow_exec_id = %s))",
+              StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+              DEPENDENCY_EXECUTION_TABLE,
+              DEPENDENCY_EXECUTION_TABLE,
+              Status.RUNNING.ordinal(), Status.CANCELLING.ordinal(),
+              Status.SUCCEEDED.ordinal(),
+              Constants.UNASSIGNED_EXEC_ID);
+  // NOTE(review): literal dep_status 0 and 4 are presumably Status ordinals - confirm; %s = limit.
+  private static final String SELECT_RECENTLY_FINISHED =
+      "SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
+          + "cancelleation_cause,project_id,project_version,flow_id,flow_version,"
+          + "project_json, flow_exec_id \n"
+          + "FROM execution_dependencies JOIN (\n"
+          + "SELECT trigger_instance_id FROM execution_dependencies WHERE trigger_instance_id not in (\n"
+          + "SELECT distinct(trigger_instance_id)  FROM execution_dependencies WHERE dep_status =  0 or dep_status = 4)\n"
+          + "GROUP BY trigger_instance_id\n"
+          + "ORDER BY  min(starttime) desc limit %s) temp on execution_dependencies"
+          + ".trigger_instance_id in (temp.trigger_instance_id);";
+
+  private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
+      + "flow_exec_id "
+      + "= ? WHERE trigger_instance_id = ? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
+
+  private final DatabaseOperator dbOperator;
+  private final ProjectLoader projectLoader;
+  // dbOperator runs the SQL above; projectLoader fetches uploaded flow files.
+  @Inject
+  public JdbcFlowTriggerInstanceLoaderImpl(final DatabaseOperator databaseOperator,
+      final ProjectLoader projectLoader) {
+    this.dbOperator = databaseOperator;
+    this.projectLoader = projectLoader;
+  }
+
+  /**
+   * Loads unfinished trigger instances and backfills each one's flow trigger from its flow file;
+   * if that file is missing, unreadable or yields no trigger, the flow trigger is set to null.
+   */
+  @Override
+  public Collection<TriggerInstance> getIncompleteTriggerInstances() {
+    Collection<TriggerInstance> unfinished = Collections.emptyList();
+    try {
+      unfinished = this.dbOperator
+          .query(SELECT_ALL_PENDING_EXECUTIONS, new TriggerInstanceHandler());
+      // backfilling flow trigger for unfinished trigger instances
+      // dedup flow config id with a set to avoid downloading/parsing same flow file multiple times
+      final Set<FlowConfigID> flowConfigIDSet = unfinished.stream()
+          .map(triggerInstance -> new FlowConfigID(triggerInstance.getProject().getId(),
+              triggerInstance.getProject().getVersion(), triggerInstance.getFlowId(),
+              triggerInstance.getFlowVersion())).collect(Collectors.toSet());
+
+      final Map<FlowConfigID, FlowTrigger> flowTriggers = new HashMap<>();
+      for (final FlowConfigID flowConfigID : flowConfigIDSet) {
+        final File tempDir = Files.createTempDir();
+        try {
+          final File flowFile = this.projectLoader
+              .getUploadedFlowFile(flowConfigID.getProjectId(), flowConfigID.getProjectVersion(),
+                  flowConfigID.getFlowId() + ".flow", flowConfigID.getFlowVersion(), tempDir);
+          if (flowFile != null) {
+            final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+            if (flowTrigger != null) {
+              flowTriggers.put(flowConfigID, flowTrigger);
+            }
+          } else {
+            logger.error("Unable to find flow file for " + flowConfigID);
+          }
+        } catch (final IOException ex) {
+          logger.error("error in getting flow file", ex);
+        } finally {
+          FlowLoaderUtils.cleanUpDir(tempDir);
+        }
+      }
+
+      for (final TriggerInstance triggerInst : unfinished) {
+        triggerInst.setFlowTrigger(flowTriggers.get(new FlowConfigID(triggerInst.getProject()
+            .getId(), triggerInst.getProject().getVersion(), triggerInst.getFlowId(),
+            triggerInst.getFlowVersion())));
+      }
+    } catch (final SQLException ex) {
+      handleSQLException(ex);
+    }
+
+    return unfinished;
+  }
+
+  // Logs the SQL failure, then rethrows it wrapped in a DependencyException.
+  private void handleSQLException(final SQLException ex) throws DependencyException {
+    final String message = "exception when accessing db!";
+    logger.error(message, ex);
+    throw new DependencyException(message, ex);
+  }
+
+  @Override
+  public void updateAssociatedFlowExecId(final TriggerInstance triggerInst) {
+    // The exec id is stored on every dependency row, so one UPDATE is issued per dependency
+    // of this trigger instance, all submitted through a single dbOperator transaction.
+    executeTransaction(transOperator -> {
+      for (final DependencyInstance dep : triggerInst.getDepInstances()) {
+        transOperator.update(UPDATE_DEPENDENCY_FLOW_EXEC_ID, triggerInst.getFlowExecId(),
+            triggerInst.getId(), dep.getDepName());
+      }
+      return null;
+    });
+  }
+
+  private void executeUpdate(final String query, final Object... params) {
+    try {
+      this.dbOperator.update(query, params);
+    } catch (final SQLException ex) {
+      handleSQLException(ex); // logs the failure and rethrows it as a DependencyException
+    }
+  }
+
+  private void executeTransaction(final SQLTransaction<Integer> tran) {
+    try {
+      this.dbOperator.transaction(tran); // the transaction's result value is discarded
+    } catch (final SQLException ex) {
+      handleSQLException(ex); // logs the failure and rethrows it as a DependencyException
+    }
+  }
+
+  @Override
+  public void uploadTriggerInstance(final TriggerInstance triggerInst) {
+    // One row is inserted per dependency instance; the trigger-level values (project, flow,
+    // exec id) are repeated on each row. Arguments are passed in the order of
+    // DEPENDENCY_EXECUTIONS_COLUMNS.
+    final SQLTransaction<Integer> insertAllDeps = transOperator -> {
+      for (final DependencyInstance dep : triggerInst.getDepInstances()) {
+        transOperator.update(INSERT_DEPENDENCY,
+            triggerInst.getId(), dep.getDepName(),
+            dep.getStartTime(), dep.getEndTime(),
+            dep.getStatus().ordinal(), dep.getCancellationCause().ordinal(),
+            triggerInst.getProject().getId(), triggerInst.getProject().getVersion(),
+            triggerInst.getFlowId(), triggerInst.getFlowVersion(),
+            FlowUtils.toJson(triggerInst.getProject()),
+            triggerInst.getFlowExecId());
+      }
+      return null;
+    };
+    executeTransaction(insertAllDeps);
+  }
+
+  @Override
+  public void updateDependencyExecutionStatus(final DependencyInstance depInst) {
+    // Sets status, end time and cancellation cause on the row matching instance id + dep name.
+    executeUpdate(UPDATE_DEPENDENCY_STATUS_ENDTIME_AND_CANCELLEATION_CAUSE,
+        depInst.getStatus().ordinal(), depInst.getEndTime(),
+        depInst.getCancellationCause().ordinal(),
+        depInst.getTriggerInstance().getId(), depInst.getDepName());
+  }
+
+  /**
+   * Fetch up to {@code limit} recently finished trigger instances. For efficiency their flow
+   * trigger properties are not populated here; they are retrieved only at request time.
+   */
+  @Override
+  public Collection<TriggerInstance> getRecentlyFinished(final int limit) {
+    Collection<TriggerInstance> finished = Collections.emptyList();
+    try {
+      finished = this.dbOperator
+          .query(String.format(SELECT_RECENTLY_FINISHED, limit), new TriggerInstanceHandler());
+    } catch (final SQLException ex) {
+      handleSQLException(ex);
+    }
+    return finished;
+  }
+
+  /**
+   * Look up a single trigger instance by id and populate its flow trigger from the uploaded
+   * flow file. Returns null when no rows exist for the given id.
+   */
+  @Override
+  public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
+    TriggerInstance found = null;
+    try {
+      final Collection<TriggerInstance> matches = this.dbOperator
+          .query(SELECT_EXECUTIONS_BY_INSTANCE_ID, new TriggerInstanceHandler(), triggerInstanceId);
+      found = matches.isEmpty() ? null : matches.iterator().next();
+    } catch (final SQLException ex) {
+      handleSQLException(ex);
+    }
+    if (found == null) {
+      return null;
+    }
+    final int projectId = found.getProject().getId();
+    final int projectVersion = found.getProject().getVersion();
+    final String flowFileName = found.getFlowId() + ".flow";
+    final int flowVersion = found.getFlowVersion();
+    final File tempDir = Files.createTempDir();
+    try {
+      final File flowFile = this.projectLoader
+          .getUploadedFlowFile(projectId, projectVersion, flowFileName, flowVersion, tempDir);
+      if (flowFile == null) {
+        logger.error("Unable to find flow file for " + triggerInstanceId);
+      } else {
+        final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+        if (flowTrigger != null) {
+          found.setFlowTrigger(flowTrigger);
+        }
+      }
+    } catch (final IOException ex) {
+      logger.error("error in getting flow file", ex);
+    } finally {
+      FlowLoaderUtils.cleanUpDir(tempDir);
+    }
+    return found;
+  }
+
+
+  private static class TriggerInstanceHandler implements
+      ResultSetHandler<Collection<TriggerInstance>> {
+
+    // The handler declares no instance fields, so there is nothing to initialize.
+    public TriggerInstanceHandler() {}
+
+    /**
+     * Groups the dependency rows of the result set into trigger instances and returns them
+     * sorted by start time in ascending order, instances without a start time first.
+     */
+    @Override
+    public Collection<TriggerInstance> handle(final ResultSet rs) throws SQLException {
+      // Every row describes one dependency; rows are collected per owning trigger instance.
+      final Map<TriggerInstKey, List<DependencyInstance>> depsByTrigger = new HashMap<>();
+      final String[] cols = DEPENDENCY_EXECUTIONS_COLUMNS;
+      while (rs.next()) {
+        final String triggerInstId = rs.getString(cols[0]);
+        final String depName = rs.getString(cols[1]);
+        final Date startTime = rs.getTimestamp(cols[2]);
+        final Date endTime = rs.getTimestamp(cols[3]);
+        // Status and cancellation cause are read back as indexes into the enum values.
+        final Status status = Status.values()[rs.getInt(cols[4])];
+        final CancellationCause cause = CancellationCause.values()[rs.getInt(cols[5])];
+        final int projId = rs.getInt(cols[6]);
+        final int projVersion = rs.getInt(cols[7]);
+        final String flowId = rs.getString(cols[8]);
+        final int flowVersion = rs.getInt(cols[9]);
+        final Project project = FlowUtils.toProject(rs.getString(cols[10]));
+        final int flowExecId = rs.getInt(cols[11]);
+
+        final TriggerInstKey key = new TriggerInstKey(triggerInstId, project.getLastModifiedUser(),
+            projId, projVersion, flowId, flowVersion, flowExecId, project);
+        final DependencyInstance depInst = new DependencyInstance(depName, startTime, endTime,
+            null, status, cause);
+        depsByTrigger.computeIfAbsent(key, unused -> new ArrayList<>()).add(depInst);
+      }
+
+      // Build one TriggerInstance per key from the dependencies grouped under it.
+      final List<TriggerInstance> instances = new ArrayList<>();
+      for (final Map.Entry<TriggerInstKey, List<DependencyInstance>> grouped
+          : depsByTrigger.entrySet()) {
+        final TriggerInstKey instKey = grouped.getKey();
+        instances.add(new TriggerInstance(instKey.triggerInstId, null,
+            instKey.flowConfigID.flowId, instKey.flowConfigID.flowVersion, instKey.submitUser,
+            grouped.getValue(), instKey.flowExecId, instKey.project));
+      }
+
+      //sort on start time in ascending order
+      instances.sort((left, right) -> {
+        final boolean leftMissing = left.getStartTime() == null;
+        final boolean rightMissing = right.getStartTime() == null;
+        if (leftMissing || rightMissing) {
+          // A missing start time sorts first; two missing start times tie.
+          return leftMissing ? (rightMissing ? 0 : -1) : 1;
+        }
+        return left.getStartTime().compareTo(right.getStartTime());
+      });
+
+      return instances;
+    }
+
+    private static class TriggerInstKey {
+      // Grouping key for result rows. Only triggerInstId and flowConfigID take part in
+      // equals/hashCode; the remaining fields are carried along to build the TriggerInstance.
+      private final String triggerInstId;
+      private final FlowConfigID flowConfigID;
+      private final String submitUser;
+      private final int flowExecId;
+      private final Project project;
+
+      public TriggerInstKey(final String triggerInstId, final String submitUser, final int projId,
+          final int projVersion, final String flowId, final int flowVersion, final int flowExecId,
+          final Project project) {
+        this.triggerInstId = triggerInstId;
+        this.flowConfigID = new FlowConfigID(projId, projVersion, flowId, flowVersion);
+        this.submitUser = submitUser;
+        this.flowExecId = flowExecId;
+        this.project = project;
+      }
+
+      @Override
+      public boolean equals(final Object o) {
+        if (this == o) {
+          return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+          return false;
+        }
+
+        final TriggerInstKey that = (TriggerInstKey) o;
+        return new EqualsBuilder()
+            .append(this.triggerInstId, that.triggerInstId)
+            .append(this.flowConfigID, that.flowConfigID)
+            .isEquals();
+      }
+
+      @Override
+      public int hashCode() {
+        return new HashCodeBuilder(17, 37)
+            .append(this.triggerInstId)
+            .append(this.flowConfigID)
+            .toHashCode();
+      }
+    }
+  }
+
+  /** Value object naming a flow file: project id and version plus flow id and version. */
+  public static class FlowConfigID {
+
+    private final int projectId;
+    private final int projectVersion;
+    private final String flowId;
+    private final int flowVersion;
+
+    public FlowConfigID(final int projectId, final int projectVersion, final String flowId,
+        final int flowVersion) {
+      this.projectId = projectId;
+      this.projectVersion = projectVersion;
+      this.flowId = flowId;
+      this.flowVersion = flowVersion;
+    }
+
+    public int getProjectId() {
+      return this.projectId;
+    }
+
+    public int getProjectVersion() {
+      return this.projectVersion;
+    }
+
+    public String getFlowId() {
+      return this.flowId;
+    }
+
+    public int getFlowVersion() {
+      return this.flowVersion;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      final FlowConfigID that = (FlowConfigID) o;
+      return new EqualsBuilder()
+          .append(this.projectId, that.projectId)
+          .append(this.projectVersion, that.projectVersion)
+          .append(this.flowVersion, that.flowVersion)
+          .append(this.flowId, that.flowId)
+          .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder(17, 37)
+          .append(this.projectId)
+          .append(this.projectVersion)
+          .append(this.flowId)
+          .append(this.flowVersion)
+          .toHashCode();
+    }
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyException.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyException.java
new file mode 100644
index 0000000..002e440
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyException.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.spi.AzkabanException;
+
+public class DependencyException extends AzkabanException {
+  // Signals a failure in flow trigger handling, e.g. the JDBC loader wraps SQLException in it.
+  private static final long serialVersionUID = 1L;
+
+  public DependencyException(final String message) {
+    super(message);
+  }
+
+  public DependencyException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java
index d03968c..e175d21 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java
@@ -40,6 +40,18 @@ public class DependencyInstance {
     this.cause = cause;
   }
 
+  @Override
+  public String toString() {
+    // Times are printed as epoch millis; either one may be null, so both are guarded.
+    return "DependencyInstance{" +
+        "startTime=" + (this.startTime == null ? null : this.startTime.getTime()) +
+        ", depName='" + this.depName + '\'' +
+        ", context=" + this.context +
+        ", endTime=" + (this.endTime == null ? null : this.endTime.getTime()) +
+        ", status=" + this.status + ", cause=" + this.cause +
+        '}';
+  }
+
   public CancellationCause getCancellationCause() {
     return this.cause;
   }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
index 592aa55..d63c7c1 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
@@ -54,6 +54,20 @@ public class TriggerInstance {
     }
   }
 
+  @Override
+  public String toString() {
+    // Debug representation listing every field of the trigger instance.
+    return "TriggerInstance{"
+        + "depInstances=" + this.depInstances
+        + ", id='" + this.id + '\''
+        + ", submitUser='" + this.submitUser + '\''
+        + ", project=" + this.project
+        + ", flowId='" + this.flowId + '\'' + ", flowVersion=" + this.flowVersion
+        + ", flowTrigger=" + this.flowTrigger
+        + ", flowExecId=" + this.flowExecId
+        + '}';
+  }
+
   public Project getProject() {
     return this.project;
   }
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
new file mode 100644
index 0000000..71045b0
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.flowtrigger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.Constants;
+import azkaban.db.DatabaseOperator;
+import azkaban.flowtrigger.db.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.db.JdbcFlowTriggerInstanceLoaderImpl;
+import azkaban.project.DirectoryYamlFlowLoader;
+import azkaban.project.FlowLoaderUtils;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import azkaban.project.JdbcProjectImpl;
+import azkaban.project.JdbcProjectImplTest;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import java.io.File;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FlowTriggerInstanceLoaderTest {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlowTriggerInstanceLoaderTest.class);
+  private static final String test_project_zip_dir = "flowtriggeryamltest";
+  private static final String test_flow_file = "flow_trigger.flow";
+  private static final int project_id = 123;
+  private static final String project_name = "test";
+  private static final int project_version = 3;
+  private static final String flow_id = "flow_trigger";
+  private static final int flow_version = 1;
+  private static final Props props = new Props();
+  private static final String submitUser = "uploadUser1";
+  private static DatabaseOperator dbOperator;
+  private static ProjectLoader projLoader;
+  private static FlowTrigger flowTrigger;
+  private static FlowTriggerInstanceLoader triggerInstLoader;
+  private static Project project;
+
+  @AfterClass
+  public static void destroyDB() {
+    try {
+      // Drop all objects first: SHUTDOWN closes the database, so it has to come last.
+      dbOperator.update("DROP ALL OBJECTS");
+      dbOperator.update("SHUTDOWN");
+    } catch (final SQLException e) {
+      logger.error("unable to destroy db", e);
+    }
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    dbOperator = Utils.initTestDB();
+    projLoader = new JdbcProjectImpl(props, dbOperator);
+    triggerInstLoader = new JdbcFlowTriggerInstanceLoaderImpl(dbOperator, projLoader);
+
+    // Populate the shared project with the flows the YAML loader reads from the test flow dir.
+    project = new Project(project_id, project_name);
+    final DirectoryYamlFlowLoader flowLoader = new DirectoryYamlFlowLoader(new Props());
+    flowLoader.loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(test_project_zip_dir));
+    project.setVersion(project_version);
+    project.setFlows(flowLoader.getFlowMap());
+    project.setLastModifiedUser(submitUser);
+
+    // Upload the flow file, then parse the flow trigger from that same file.
+    final File flowFile = new File(
+        JdbcProjectImplTest.class.getClassLoader().getResource(test_flow_file).getFile());
+    projLoader.uploadFlowFile(project_id, project_version, flowFile, flow_version);
+    flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+  }
+
+
+  /**
+   * Creates a trigger instance with a random id and no flow execution assigned yet.
+   *
+   * <p>Each dependency of {@code flowTrigger} gets one RUNNING dependency instance that starts
+   * at {@code startTime}, has no end time and carries {@code CancellationCause.NONE}.
+   */
+  private TriggerInstance createTriggerInstance(final FlowTrigger flowTrigger, final String flowId,
+      final int flowVersion, final String submitUser, final Project project, final long startTime) {
+    final String instanceId = UUID.randomUUID().toString();
+    final Date notEnded = null;
+
+    final List<DependencyInstance> dependencies = new ArrayList<>();
+    for (final FlowTriggerDependency dependency : flowTrigger.getDependencies()) {
+      // The test context gets no config, runtime props or callback.
+      final DependencyInstanceContext context = new TestDependencyInstanceContext(null, null, null);
+      dependencies.add(new DependencyInstance(dependency.getName(), new Date(startTime), notEnded,
+          context, Status.RUNNING, CancellationCause.NONE));
+    }
+
+    return new TriggerInstance(instanceId, flowTrigger, flowId, flowVersion, submitUser,
+        dependencies, Constants.UNASSIGNED_EXEC_ID, project);
+  }
+
+
+  @Test
+  public void testUploadTriggerInstance() {
+    final TriggerInstance uploaded = createTriggerInstance(flowTrigger, flow_id, flow_version,
+        submitUser, project, System.currentTimeMillis());
+    triggerInstLoader.uploadTriggerInstance(uploaded);
+
+    // Read the instance back by id and compare it with what was uploaded.
+    final TriggerInstance loaded = triggerInstLoader.getTriggerInstanceById(uploaded.getId());
+
+    // Flow triggers are compared through their string form, ignoring whitespace.
+    assertThat(uploaded.getFlowTrigger().toString())
+        .isEqualToIgnoringWhitespace(loaded.getFlowTrigger().toString());
+
+    assertThat(uploaded).isEqualToIgnoringGivenFields(loaded, "depInstances", "flowTrigger");
+
+    // Dependency instances: same elements and same count, ignoring the listed fields.
+    assertThat(uploaded.getDepInstances())
+        .usingElementComparatorIgnoringFields("triggerInstance", "context")
+        .containsAll(loaded.getDepInstances())
+        .hasSameSizeAs(loaded.getDepInstances());
+  }
+
+  private void assertTriggerInstancesEqual(final TriggerInstance actual,
+      final TriggerInstance expected, final boolean ignoreFlowTrigger) {
+    if (!ignoreFlowTrigger) {
+      if (actual.getFlowTrigger() == null || expected.getFlowTrigger() == null) {
+        // When either flow trigger is missing, both are required to be missing.
+        assertThat(actual.getFlowTrigger()).isNull();
+        assertThat(expected.getFlowTrigger()).isNull();
+      } else {
+        assertThat(actual.getFlowTrigger().toString())
+            .isEqualToIgnoringWhitespace(expected.getFlowTrigger().toString());
+      }
+    }
+
+    assertThat(actual).isEqualToIgnoringGivenFields(expected, "depInstances", "flowTrigger");
+
+    // Date fields match when both are null or both hold the same epoch millis;
+    // every other combination is reported as different (-1).
+    final Comparator<Date> sameInstant = (a, b) -> {
+      final boolean bothNull = a == null && b == null;
+      final boolean sameMillis = a != null && b != null && a.getTime() == b.getTime();
+      return (bothNull || sameMillis) ? 0 : -1;
+    };
+    assertThat(actual.getDepInstances())
+        .usingComparatorForElementFieldsWithType(sameInstant, Date.class)
+        .usingElementComparatorIgnoringFields("triggerInstance", "context")
+        .containsExactlyInAnyOrder(expected.getDepInstances()
+            .toArray(new DependencyInstance[expected.getDepInstances().size()]));
+  }
+
+  @Test
+  public void testUpdateDependencyExecutionStatus() {
+    final TriggerInstance uploaded = createTriggerInstance(flowTrigger, flow_id, flow_version,
+        submitUser, project, System.currentTimeMillis());
+    triggerInstLoader.uploadTriggerInstance(uploaded);
+
+    // Cancel every dependency in memory and persist each change right away.
+    uploaded.getDepInstances().forEach(dependency -> {
+      dependency.setStatus(Status.CANCELLED);
+      dependency.setEndTime(new Date());
+      dependency.setCancellationCause(CancellationCause.MANUAL);
+      triggerInstLoader.updateDependencyExecutionStatus(dependency);
+    });
+
+    final TriggerInstance loaded = triggerInstLoader.getTriggerInstanceById(uploaded.getId());
+    assertTriggerInstancesEqual(loaded, uploaded, false);
+  }
+
+  private void finalizeTriggerInstanceWithSuccess(final TriggerInstance triggerInst,
+      final int associateFlowExecId) {
+    triggerInst.getDepInstances().forEach(dependency -> {
+      dependency.setStatus(Status.SUCCEEDED);
+      dependency.getTriggerInstance().setFlowExecId(associateFlowExecId);
+    });
+  }
+
+  private void finalizeTriggerInstanceWithCancelled(final TriggerInstance triggerInst) {
+    triggerInst.getDepInstances().forEach(dependency -> {
+      dependency.setStatus(Status.CANCELLED);
+      dependency.setCancellationCause(CancellationCause.TIMEOUT);
+      dependency.setEndTime(new Date());
+    });
+  }
+
+  @Test
+  public void testGetIncompleteTriggerInstancesReturnsEmpty() {
+    // The first three instances are cancelled and the remaining two succeed with a flow
+    // execution id (1000); the test expects none of them to be reported as incomplete.
+    final List<TriggerInstance> all = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flow_id,
+          flow_version, submitUser, project, System.currentTimeMillis());
+      if (i <= 2) {
+        finalizeTriggerInstanceWithCancelled(triggerInst);
+      } else {
+        finalizeTriggerInstanceWithSuccess(triggerInst, 1000);
+      }
+      all.add(triggerInst);
+    }
+
+    all.forEach(triggerInst -> triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+    final List<TriggerInstance> actual = new ArrayList<>(
+        triggerInstLoader.getIncompleteTriggerInstances());
+    assertThat(actual).isEmpty();
+  }
+
+  @Test
+  public void testGetIncompleteTriggerInstances() {
+    final List<TriggerInstance> allInstances = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      allInstances.add(createTriggerInstance(flowTrigger, flow_id, flow_version, submitUser,
+          project, System.currentTimeMillis()));
+    }
+
+    finalizeTriggerInstanceWithCancelled(allInstances.get(0));
+    finalizeTriggerInstanceWithSuccess(allInstances.get(1), 1000);
+    // this trigger instance should still count as an incomplete one since no flow execution has
+    // been started (its flow exec id stays unassigned)
+    finalizeTriggerInstanceWithSuccess(allInstances.get(2), Constants.UNASSIGNED_EXEC_ID);
+
+    allInstances.forEach(triggerInst -> triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+    final List<TriggerInstance> expected = allInstances.subList(2, allInstances.size());
+    final List<TriggerInstance> actual = new ArrayList<>(
+        triggerInstLoader.getIncompleteTriggerInstances());
+    assertTwoTriggerInstanceListsEqual(actual, expected, false, false);
+  }
+
+  private void assertTwoTriggerInstanceListsEqual(final List<TriggerInstance> actual,
+      final List<TriggerInstance> expected, final boolean ignoreFlowTrigger,
+      final boolean keepOriginalOrder) {
+    if (!keepOriginalOrder) {
+      final Comparator<TriggerInstance> byId = Comparator.comparing(TriggerInstance::getId);
+      expected.sort(byId);
+      actual.sort(byId);
+    }
+    // Sizes must match first; the elements are then compared pairwise, in list order.
+    assertThat(actual).hasSameSizeAs(expected);
+    final Iterator<TriggerInstance> actualIt = actual.iterator();
+    final Iterator<TriggerInstance> expectedIt = expected.iterator();
+    while (actualIt.hasNext() && expectedIt.hasNext()) {
+      assertTriggerInstancesEqual(actualIt.next(), expectedIt.next(), ignoreFlowTrigger);
+    }
+  }
+
+  @Test
+  public void testUpdateAssociatedFlowExecId() {
+    final TriggerInstance uploaded = createTriggerInstance(flowTrigger, flow_id, flow_version,
+        submitUser, project, System.currentTimeMillis());
+    triggerInstLoader.uploadTriggerInstance(uploaded);
+
+    // Finish the instance in memory, then persist the dependency statuses and the exec id.
+    finalizeTriggerInstanceWithSuccess(uploaded, 1000);
+    for (final DependencyInstance dependency : uploaded.getDepInstances()) {
+      triggerInstLoader.updateDependencyExecutionStatus(dependency);
+    }
+    triggerInstLoader.updateAssociatedFlowExecId(uploaded);
+
+    // Reload by id and compare against the locally updated instance.
+    final TriggerInstance loaded = triggerInstLoader.getTriggerInstanceById(uploaded.getId());
+    assertTriggerInstancesEqual(loaded, uploaded, false);
+  }
+
+  @Test
+  public void testGetRecentlyFinishedReturnsEmpty() {
+    // Ten instances are uploaded exactly as created, i.e. with RUNNING dependencies only.
+    final List<TriggerInstance> running = new ArrayList<>();
+    while (running.size() < 10) {
+      running.add(createTriggerInstance(flowTrigger, flow_id, flow_version, submitUser,
+          project, System.currentTimeMillis()));
+    }
+    running.forEach(triggerInst -> triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+    final Collection<TriggerInstance> recentlyFinished = triggerInstLoader
+        .getRecentlyFinished(10);
+    assertThat(recentlyFinished).isEmpty();
+  }
+
+  @Test
+  public void testGetRecentlyFinished() {
+    // Start times are spaced ten seconds apart. Instances 0-3 are cancelled, 4-6 succeed with
+    // a flow execution id, and 7-9 are uploaded as created (dependencies still RUNNING).
+    final List<TriggerInstance> all = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      final long startTime = System.currentTimeMillis() + i * 10000;
+      final TriggerInstance triggerInst = createTriggerInstance(flowTrigger, flow_id,
+          flow_version, submitUser, project, startTime);
+      all.add(triggerInst);
+      if (i <= 3) {
+        finalizeTriggerInstanceWithCancelled(triggerInst);
+      } else if (i <= 6) {
+        finalizeTriggerInstanceWithSuccess(triggerInst, 1000);
+      }
+    }
+    all.forEach(triggerInst -> triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+    // Only the first seven instances are expected back, ordered by start time.
+    final List<TriggerInstance> expected = all.subList(0, 7);
+    expected.sort(Comparator.comparing(TriggerInstance::getStartTime));
+    final Collection<TriggerInstance> recentlyFinished = triggerInstLoader.getRecentlyFinished(10);
+    assertTwoTriggerInstanceListsEqual(new ArrayList<>(recentlyFinished), expected, true, true);
+  }
+
+  @After
+  public void cleanDB() {
+    try {
+      dbOperator.update("TRUNCATE TABLE execution_dependencies"); // reset table between tests
+    } catch (final SQLException e) {
+      logger.error("unable to clean db", e);
+    }
+  }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TestDependencyInstanceContext.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TestDependencyInstanceContext.java
new file mode 100644
index 0000000..9ca54a5
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TestDependencyInstanceContext.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+public class TestDependencyInstanceContext implements DependencyInstanceContext {
+
+  private final DependencyInstanceCallback callback; // notified by cancel()
+
+  public TestDependencyInstanceContext(final DependencyInstanceConfig config,
+      final DependencyInstanceRuntimeProps runtimeProps,
+      final DependencyInstanceCallback callback) {
+    this.callback = callback; // config and runtimeProps are not stored or used here
+  }
+
+  @Override
+  public void cancel() {
+    this.callback.onCancel(this); // NOTE(review): throws NullPointerException for a null callback
+  }
+}
diff --git a/azkaban-web-server/src/test/resources/flow_trigger.flow b/azkaban-web-server/src/test/resources/flow_trigger.flow
new file mode 100644
index 0000000..4157128
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/flow_trigger.flow
@@ -0,0 +1,59 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 1
+  schedule:
+    type: cron
+    value: 0/5 * * * * ?
+
+  triggerDependencies:
+    - name: search-impression # a unique name to identify the dependency
+      type: dali
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali
+      params:
+        view: another dataset
+        delay: 1
+        window: 7
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+  failure.emails: chren@linkedin.com
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: sleep 10