azkaban-aplcache
Changes
azkaban-web-server/src/main/java/azkaban/flowtrigger/db/JdbcFlowTriggerInstanceLoaderImpl.java 466(+466 -0)
Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index e8326be..037c947 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -79,6 +79,9 @@ public class Constants {
// One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+ // The flow exec id for a flow trigger instance which hasn't started a flow yet
+ public static final int UNASSIGNED_EXEC_ID = -1;
+
public static class ConfigurationKeys {
// Configures Azkaban Flow Version in project YAML file
@@ -179,7 +182,7 @@ public class Constants {
public static final String AZKABAN_STORAGE_ARTIFACT_MAX_RETENTION = "azkaban.storage.artifact.max.retention";
// enable Quartz Scheduler if true.
- public static final String ENABLE_QUARTZ= "azkaban.server.schedule.enable_quartz";
+ public static final String ENABLE_QUARTZ = "azkaban.server.schedule.enable_quartz";
public static final String CUSTOM_CREDENTIAL_NAME = "azkaban.security.credential";
}
diff --git a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
index 9aaad84..cdf951e 100644
--- a/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
+++ b/azkaban-common/src/main/java/azkaban/flow/FlowUtils.java
@@ -25,6 +25,7 @@ import azkaban.executor.Status;
import azkaban.project.Project;
import azkaban.project.ProjectManager;
import azkaban.utils.Props;
+import com.google.gson.Gson;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -116,4 +117,15 @@ public class FlowUtils {
exflow.addAllProxyUsers(project.getProxyUsers());
return exflow;
}
+
+ public static String toJson(final Project proj) {
+ final Gson gson = new Gson();
+ final String jsonStr = gson.toJson(proj);
+ return jsonStr;
+ }
+
+ public static Project toProject(final String json) {
+ final Gson gson = new Gson();
+ return gson.fromJson(json, Project.class);
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
index 9875b9a..6e74471 100644
--- a/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
+++ b/azkaban-common/src/main/java/azkaban/project/CronSchedule.java
@@ -18,6 +18,8 @@ package azkaban.project;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
/**
* FlowTriggerSchedule is the logical representation of a cron-based schedule.
@@ -40,4 +42,28 @@ public class CronSchedule {
public String getCronExpression() {
return this.cronExpression;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final CronSchedule that = (CronSchedule) o;
+
+ return new EqualsBuilder()
+ .append(this.cronExpression, that.cronExpression)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(this.cronExpression)
+ .toHashCode();
+ }
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/FlowTriggerInstanceLoader.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/FlowTriggerInstanceLoader.java
new file mode 100644
index 0000000..38f1618
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/FlowTriggerInstanceLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.db;
+
+import azkaban.flowtrigger.DependencyInstance;
+import azkaban.flowtrigger.TriggerInstance;
+import java.util.Collection;
+
+public interface FlowTriggerInstanceLoader {
+
+ /**
+ * Upload a trigger instance into db
+ */
+ void uploadTriggerInstance(TriggerInstance triggerInstance);
+
+ /**
+ * Update dependency status, cancellation cause and end time
+ */
+ void updateDependencyExecutionStatus(DependencyInstance depInst);
+
+ /**
+ * Retrieve trigger instances not in done state(cancelling, running, or succeeded but associated
+ * flow hasn't been triggered yet). This is used when recovering unfinished
+ * trigger instance during web server restarts.
+ */
+ Collection<TriggerInstance> getIncompleteTriggerInstances();
+
+ /**
+ * Update associated flow execution id for a trigger instance. This will be called when a trigger
+ * instance successfully starts a flow.
+ */
+ void updateAssociatedFlowExecId(TriggerInstance triggerInst);
+
+ /**
+ * Retrieve recently finished trigger instances.
+ *
+ * @param limit number of trigger instances to retrieve
+ */
+ Collection<TriggerInstance> getRecentlyFinished(int limit);
+
+ TriggerInstance getTriggerInstanceById(String triggerInstanceId);
+
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/JdbcFlowTriggerInstanceLoaderImpl.java
new file mode 100644
index 0000000..6ae50a8
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/db/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -0,0 +1,466 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger.db;
+
+import azkaban.Constants;
+import azkaban.db.DatabaseOperator;
+import azkaban.db.SQLTransaction;
+import azkaban.flow.FlowUtils;
+import azkaban.flowtrigger.CancellationCause;
+import azkaban.flowtrigger.DependencyException;
+import azkaban.flowtrigger.DependencyInstance;
+import azkaban.flowtrigger.Status;
+import azkaban.flowtrigger.TriggerInstance;
+import azkaban.project.FlowLoaderUtils;
+import azkaban.project.FlowTrigger;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Singleton
+public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoader {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(JdbcFlowTriggerInstanceLoaderImpl.class);
+
+ private static final String[] DEPENDENCY_EXECUTIONS_COLUMNS = {"trigger_instance_id", "dep_name",
+ "starttime", "endtime", "dep_status", "cancelleation_cause", "project_id", "project_version",
+ "flow_id", "flow_version", "project_json", "flow_exec_id"};
+
+ private static final String DEPENDENCY_EXECUTION_TABLE = "execution_dependencies";
+
+ private static final String INSERT_DEPENDENCY = String.format("INSERT INTO %s(%s) VALUES(%s);"
+ + "", DEPENDENCY_EXECUTION_TABLE, StringUtils.join
+ (DEPENDENCY_EXECUTIONS_COLUMNS, ","), String.join(",", Collections.nCopies
+ (DEPENDENCY_EXECUTIONS_COLUMNS.length, "?")));
+
+ private static final String UPDATE_DEPENDENCY_STATUS_ENDTIME_AND_CANCELLEATION_CAUSE = String
+ .format
+ ("UPDATE %s SET dep_status = ?, endtime = ?, cancelleation_cause = ? WHERE trigger_instance_id = "
+ + "? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
+
+
+ private static final String SELECT_EXECUTIONS_BY_INSTANCE_ID =
+ String.format("SELECT %s FROM %s WHERE trigger_instance_id = ?",
+ StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+ DEPENDENCY_EXECUTION_TABLE);
+
+ private static final String SELECT_ALL_PENDING_EXECUTIONS =
+ String
+ .format(
+ "SELECT %s FROM %s WHERE trigger_instance_id in (SELECT trigger_instance_id FROM %s "
+ + "WHERE "
+ + "dep_status = %s or dep_status = %s or (dep_status = %s and "
+ + "flow_exec_id = %s))",
+ StringUtils.join(DEPENDENCY_EXECUTIONS_COLUMNS, ","),
+ DEPENDENCY_EXECUTION_TABLE,
+ DEPENDENCY_EXECUTION_TABLE,
+ Status.RUNNING.ordinal(), Status.CANCELLING.ordinal(),
+ Status.SUCCEEDED.ordinal(),
+ Constants.UNASSIGNED_EXEC_ID);
+
+ private static final String SELECT_RECENTLY_FINISHED =
+ "SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
+ + "cancelleation_cause,project_id,project_version,flow_id,flow_version,"
+ + "project_json, flow_exec_id \n"
+ + "FROM execution_dependencies JOIN (\n"
+ + "SELECT trigger_instance_id FROM execution_dependencies WHERE trigger_instance_id not in (\n"
+ + "SELECT distinct(trigger_instance_id) FROM execution_dependencies WHERE dep_status = 0 or dep_status = 4)\n"
+ + "GROUP BY trigger_instance_id\n"
+ + "ORDER BY min(starttime) desc limit %s) temp on execution_dependencies"
+ + ".trigger_instance_id in (temp.trigger_instance_id);";
+
+ private static final String UPDATE_DEPENDENCY_FLOW_EXEC_ID = String.format("UPDATE %s SET "
+ + "flow_exec_id "
+ + "= ? WHERE trigger_instance_id = ? AND dep_name = ? ;", DEPENDENCY_EXECUTION_TABLE);
+
+ private final DatabaseOperator dbOperator;
+ private final ProjectLoader projectLoader;
+
+ @Inject
+ public JdbcFlowTriggerInstanceLoaderImpl(final DatabaseOperator databaseOperator,
+ final ProjectLoader projectLoader) {
+ this.dbOperator = databaseOperator;
+ this.projectLoader = projectLoader;
+ }
+
+ @Override
+ public Collection<TriggerInstance> getIncompleteTriggerInstances() {
+ Collection<TriggerInstance> unfinished = Collections.EMPTY_LIST;
+ try {
+ unfinished = this.dbOperator
+ .query(SELECT_ALL_PENDING_EXECUTIONS, new TriggerInstanceHandler());
+
+ // backfilling flow trigger for unfinished trigger instances
+ // dedup flow config id with a set to avoid downloading/parsing same flow file multiple times
+
+ final Set<FlowConfigID> flowConfigIDSet = unfinished.stream()
+ .map(triggerInstance -> new FlowConfigID(triggerInstance.getProject().getId(),
+ triggerInstance.getProject().getVersion(), triggerInstance.getFlowId(),
+ triggerInstance.getFlowVersion())).collect(Collectors.toSet());
+
+ final Map<FlowConfigID, FlowTrigger> flowTriggers = new HashMap<>();
+ for (final FlowConfigID flowConfigID : flowConfigIDSet) {
+ final File tempDir = Files.createTempDir();
+ try {
+ final File flowFile = this.projectLoader
+ .getUploadedFlowFile(flowConfigID.getProjectId(), flowConfigID.getProjectVersion(),
+ flowConfigID.getFlowId() + ".flow", flowConfigID.getFlowVersion(), tempDir);
+
+ if (flowFile != null) {
+ final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+ if (flowTrigger != null) {
+ flowTriggers.put(flowConfigID, flowTrigger);
+ }
+ } else {
+ logger.error("Unable to find flow file for " + flowConfigID);
+ }
+ } catch (final IOException ex) {
+ logger.error("error in getting flow file", ex);
+ } finally {
+ FlowLoaderUtils.cleanUpDir(tempDir);
+ }
+ }
+
+ for (final TriggerInstance triggerInst : unfinished) {
+ triggerInst.setFlowTrigger(flowTriggers.get(new FlowConfigID(triggerInst.getProject()
+ .getId(), triggerInst.getProject().getVersion(), triggerInst.getFlowId(),
+ triggerInst.getFlowVersion())));
+ }
+
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+
+ return unfinished;
+ }
+
+ private void handleSQLException(final SQLException ex)
+ throws DependencyException {
+ final String error = "exception when accessing db!";
+ logger.error(error, ex);
+ throw new DependencyException(error, ex);
+ }
+
+ @Override
+ public void updateAssociatedFlowExecId(final TriggerInstance triggerInst) {
+ final SQLTransaction<Integer> insertTrigger = transOperator -> {
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ transOperator
+ .update(UPDATE_DEPENDENCY_FLOW_EXEC_ID, triggerInst.getFlowExecId(),
+ triggerInst.getId(), depInst.getDepName());
+ }
+ return null;
+ };
+ executeTransaction(insertTrigger);
+ }
+
+ private void executeUpdate(final String query, final Object... params) {
+ try {
+ this.dbOperator.update(query, params);
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ }
+
+ private void executeTransaction(final SQLTransaction<Integer> tran) {
+ try {
+ this.dbOperator.transaction(tran);
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ }
+
+ @Override
+ public void uploadTriggerInstance(final TriggerInstance triggerInst) {
+ final SQLTransaction<Integer> insertTrigger = transOperator -> {
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ transOperator
+ .update(INSERT_DEPENDENCY, triggerInst.getId(), depInst.getDepName(),
+ depInst.getStartTime(), depInst.getEndTime(), depInst.getStatus().ordinal(),
+ depInst.getCancellationCause().ordinal(),
+ triggerInst.getProject().getId(),
+ triggerInst.getProject().getVersion(),
+ triggerInst.getFlowId(),
+ triggerInst.getFlowVersion(),
+ FlowUtils.toJson(triggerInst.getProject()),
+ triggerInst.getFlowExecId());
+ }
+ return null;
+ };
+
+ executeTransaction(insertTrigger);
+ }
+
+ @Override
+ public void updateDependencyExecutionStatus(final DependencyInstance depInst) {
+ executeUpdate(UPDATE_DEPENDENCY_STATUS_ENDTIME_AND_CANCELLEATION_CAUSE, depInst.getStatus()
+ .ordinal(),
+ depInst.getEndTime(), depInst.getCancellationCause().ordinal(),
+ depInst.getTriggerInstance().getId(),
+ depInst.getDepName());
+ }
+
+ /**
+ * Retrieve recently finished trigger instances, but flow trigger properties are not populated
+ * into the returned trigger instances for efficiency. Flow trigger properties will be
+ * retrieved only on request time.
+ */
+ @Override
+ public Collection<TriggerInstance> getRecentlyFinished(final int limit) {
+ final String query = String.format(SELECT_RECENTLY_FINISHED, limit);
+ try {
+ return this.dbOperator.query(query, new TriggerInstanceHandler());
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Retrieve a trigger instance given an instance id. Flow trigger properties will also be
+ * populated into the returned trigger instance.
+ */
+ @Override
+ public TriggerInstance getTriggerInstanceById(final String triggerInstanceId) {
+ TriggerInstance triggerInstance = null;
+ try {
+ final Collection<TriggerInstance> res = this.dbOperator
+ .query(SELECT_EXECUTIONS_BY_INSTANCE_ID, new TriggerInstanceHandler(), triggerInstanceId);
+ triggerInstance = !res.isEmpty() ? res.iterator().next() : null;
+ } catch (final SQLException ex) {
+ handleSQLException(ex);
+ }
+ if (triggerInstance != null) {
+ final int projectId = triggerInstance.getProject().getId();
+ final int projectVersion = triggerInstance.getProject().getVersion();
+ final String flowFileName = triggerInstance.getFlowId() + ".flow";
+ final int flowVersion = triggerInstance.getFlowVersion();
+ final File tempDir = Files.createTempDir();
+ try {
+ final File flowFile = this.projectLoader
+ .getUploadedFlowFile(projectId, projectVersion, flowFileName, flowVersion, tempDir);
+
+ if (flowFile != null) {
+ final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+ if (flowTrigger != null) {
+ triggerInstance.setFlowTrigger(flowTrigger);
+ }
+ } else {
+ logger.error("Unable to find flow file for " + triggerInstanceId);
+ }
+ } catch (final IOException ex) {
+ logger.error("error in getting flow file", ex);
+ } finally {
+ FlowLoaderUtils.cleanUpDir(tempDir);
+ }
+ }
+ return triggerInstance;
+ }
+
+
+ private static class TriggerInstanceHandler implements
+ ResultSetHandler<Collection<TriggerInstance>> {
+
+ public TriggerInstanceHandler() {
+ }
+
+ @Override
+ public Collection<TriggerInstance> handle(final ResultSet rs) throws SQLException {
+ final Map<TriggerInstKey, List<DependencyInstance>> triggerInstMap = new HashMap<>();
+
+ while (rs.next()) {
+ final String triggerInstId = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[0]);
+ final String depName = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[1]);
+ final Date startTime = rs.getTimestamp(DEPENDENCY_EXECUTIONS_COLUMNS[2]);
+ final Date endTime = rs.getTimestamp(DEPENDENCY_EXECUTIONS_COLUMNS[3]);
+ final Status status = Status.values()[rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[4])];
+ final CancellationCause cause = CancellationCause.values()[rs.getInt
+ (DEPENDENCY_EXECUTIONS_COLUMNS[5])];
+ final int projId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[6]);
+ final int projVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[7]);
+ final String flowId = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[8]);
+ final int flowVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[9]);
+ final Project project = FlowUtils
+ .toProject(rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[10]));
+ final int flowExecId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[11]);
+
+ final TriggerInstKey key = new TriggerInstKey(triggerInstId, project.getLastModifiedUser(),
+ projId, projVersion, flowId, flowVersion, flowExecId, project);
+ List<DependencyInstance> dependencyInstanceList = triggerInstMap.get(key);
+ final DependencyInstance depInst = new DependencyInstance(depName, startTime, endTime,
+ null, status, cause);
+ if (dependencyInstanceList == null) {
+ dependencyInstanceList = new ArrayList<>();
+ triggerInstMap.put(key, dependencyInstanceList);
+ }
+
+ dependencyInstanceList.add(depInst);
+ }
+
+ final List<TriggerInstance> res = new ArrayList<>();
+ for (final Map.Entry<TriggerInstKey, List<DependencyInstance>> entry : triggerInstMap
+ .entrySet()) {
+ res.add(new TriggerInstance(entry.getKey().triggerInstId, null, entry.getKey()
+ .flowConfigID.flowId, entry.getKey().flowConfigID.flowVersion, entry.getKey()
+ .submitUser, entry.getValue(), entry.getKey().flowExecId, entry.getKey().project));
+ }
+
+ //sort on start time in ascending order
+ Collections.sort(res, (o1, o2) -> {
+ if (o1.getStartTime() == null && o2.getStartTime() == null) {
+ return 0;
+ } else if (o1.getStartTime() != null && o2.getStartTime() != null) {
+ return o1.getStartTime().compareTo(o2.getStartTime());
+ } else {
+ return o1.getStartTime() == null ? -1 : 1;
+ }
+ });
+
+ return res;
+ }
+
+ private static class TriggerInstKey {
+
+ String triggerInstId;
+ FlowConfigID flowConfigID;
+ String submitUser;
+ int flowExecId;
+ Project project;
+
+ public TriggerInstKey(final String triggerInstId, final String submitUser, final int projId,
+ final int projVersion, final String flowId, final int flowVerion, final int flowExecId,
+ final Project project) {
+ this.triggerInstId = triggerInstId;
+ this.flowConfigID = new FlowConfigID(projId, projVersion, flowId, flowVerion);
+ this.submitUser = submitUser;
+ this.flowExecId = flowExecId;
+ this.project = project;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final TriggerInstKey that = (TriggerInstKey) o;
+
+ return new EqualsBuilder()
+ .append(this.triggerInstId, that.triggerInstId)
+ .append(this.flowConfigID, that.flowConfigID)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(this.triggerInstId)
+ .append(this.flowConfigID)
+ .toHashCode();
+ }
+ }
+ }
+
+ public static class FlowConfigID {
+
+ private final int projectId;
+ private final int projectVerison;
+ private final String flowId;
+ private final int flowVersion;
+
+ public FlowConfigID(final int projectId, final int projectVerison, final String flowId,
+ final int flowVersion) {
+ this.projectId = projectId;
+ this.projectVerison = projectVerison;
+ this.flowId = flowId;
+ this.flowVersion = flowVersion;
+ }
+
+ public int getProjectId() {
+ return this.projectId;
+ }
+
+ public int getProjectVersion() {
+ return this.projectVerison;
+ }
+
+ public String getFlowId() {
+ return this.flowId;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final FlowConfigID that = (FlowConfigID) o;
+
+ return new EqualsBuilder()
+ .append(this.projectId, that.projectId)
+ .append(this.projectVerison, that.projectVerison)
+ .append(this.flowVersion, that.flowVersion)
+ .append(this.flowId, that.flowId)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(this.projectId)
+ .append(this.projectVerison)
+ .append(this.flowId)
+ .append(this.flowVersion)
+ .toHashCode();
+ }
+
+ public int getFlowVersion() {
+ return this.flowVersion;
+ }
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyException.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyException.java
new file mode 100644
index 0000000..002e440
--- /dev/null
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyException.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+import azkaban.spi.AzkabanException;
+
+public class DependencyException extends AzkabanException {
+
+ private static final long serialVersionUID = 1L;
+
+ public DependencyException(final String message) {
+ super(message);
+ }
+
+ public DependencyException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java
index d03968c..e175d21 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/DependencyInstance.java
@@ -40,6 +40,18 @@ public class DependencyInstance {
this.cause = cause;
}
+ @Override
+ public String toString() {
+ return "DependencyInstance{" +
+ "startTime=" + this.startTime.getTime() +
+ ", depName='" + this.depName + '\'' +
+ ", context=" + this.context +
+ ", endTime=" + (this.endTime == null ? null : this.endTime.getTime()) +
+ ", status=" + this.status +
+ ", cause=" + this.cause +
+ '}';
+ }
+
public CancellationCause getCancellationCause() {
return this.cause;
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
index 592aa55..d63c7c1 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/TriggerInstance.java
@@ -54,6 +54,20 @@ public class TriggerInstance {
}
}
+ @Override
+ public String toString() {
+ return "TriggerInstance{" +
+ "depInstances=" + depInstances +
+ ", id='" + id + '\'' +
+ ", submitUser='" + submitUser + '\'' +
+ ", project=" + project +
+ ", flowId='" + flowId + '\'' +
+ ", flowVersion=" + flowVersion +
+ ", flowTrigger=" + flowTrigger +
+ ", flowExecId=" + flowExecId +
+ '}';
+ }
+
public Project getProject() {
return this.project;
}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
new file mode 100644
index 0000000..71045b0
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.flowtrigger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.Constants;
+import azkaban.db.DatabaseOperator;
+import azkaban.flowtrigger.db.FlowTriggerInstanceLoader;
+import azkaban.flowtrigger.db.JdbcFlowTriggerInstanceLoaderImpl;
+import azkaban.project.DirectoryYamlFlowLoader;
+import azkaban.project.FlowLoaderUtils;
+import azkaban.project.FlowTrigger;
+import azkaban.project.FlowTriggerDependency;
+import azkaban.project.JdbcProjectImpl;
+import azkaban.project.JdbcProjectImplTest;
+import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import java.io.File;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FlowTriggerInstanceLoaderTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowTriggerInstanceLoaderTest.class);
+ private static final String test_project_zip_dir = "flowtriggeryamltest";
+ private static final String test_flow_file = "flow_trigger.flow";
+ private static final int project_id = 123;
+ private static final String project_name = "test";
+ private static final int project_version = 3;
+ private static final String flow_id = "flow_trigger";
+ private static final int flow_version = 1;
+ private static final Props props = new Props();
+ private static final String submitUser = "uploadUser1";
+ private static DatabaseOperator dbOperator;
+ private static ProjectLoader projLoader;
+ private static FlowTrigger flowTrigger;
+ private static FlowTriggerInstanceLoader triggerInstLoader;
+ private static Project project;
+
+ @AfterClass
+ public static void destroyDB() {
+ try {
+ dbOperator.update("SHUTDOWN");
+ dbOperator.update("DROP ALL OBJECTS");
+ dbOperator.update("SHUTDOWN");
+ } catch (final SQLException e) {
+ logger.error("unable to destroy db", e);
+ }
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ dbOperator = Utils.initTestDB();
+ projLoader = new JdbcProjectImpl(props, dbOperator);
+ triggerInstLoader = new JdbcFlowTriggerInstanceLoaderImpl(dbOperator, projLoader);
+ project = new Project(project_id, project_name);
+ final DirectoryYamlFlowLoader yamlFlowLoader = new DirectoryYamlFlowLoader(new Props());
+ yamlFlowLoader
+ .loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(test_project_zip_dir));
+ project.setVersion(project_version);
+ project.setFlows(yamlFlowLoader.getFlowMap());
+ project.setLastModifiedUser(submitUser);
+
+ final File flowFile = new File(JdbcProjectImplTest.class.getClassLoader().getResource
+ (test_flow_file).getFile());
+
+ projLoader
+ .uploadFlowFile(project_id, project_version, flowFile, flow_version);
+ flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
+ }
+
+
+ private TriggerInstance createTriggerInstance(final FlowTrigger flowTrigger, final String flowId,
+ final int flowVersion, final String submitUser, final Project project, final long startTime) {
+ final String triggerInstId = UUID.randomUUID().toString();
+ final List<DependencyInstance> depInstList = new ArrayList<>();
+ for (final FlowTriggerDependency dep : flowTrigger.getDependencies()) {
+ final String depName = dep.getName();
+ final Date startDate = new Date(startTime);
+ final DependencyInstanceContext context = new TestDependencyInstanceContext(null, null, null);
+ final Status status = Status.RUNNING;
+ final CancellationCause cause = CancellationCause.NONE;
+ final Date endTime = null;
+ final DependencyInstance depInst = new DependencyInstance(depName, startDate, endTime,
+ context, status, cause);
+ depInstList.add(depInst);
+ }
+
+ final int flowExecId = Constants.UNASSIGNED_EXEC_ID;
+ final TriggerInstance triggerInstance = new TriggerInstance(triggerInstId, flowTrigger,
+ flowId, flowVersion, submitUser, depInstList, flowExecId, project);
+
+ return triggerInstance;
+ }
+
+
+ @Test
+ public void testUploadTriggerInstance() {
+ final TriggerInstance expectedTriggerInst = this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis());
+
+ this.triggerInstLoader.uploadTriggerInstance(expectedTriggerInst);
+
+ final TriggerInstance actualTriggerInst = this.triggerInstLoader
+ .getTriggerInstanceById(expectedTriggerInst.getId());
+
+ assertThat(expectedTriggerInst.getFlowTrigger().toString())
+ .isEqualToIgnoringWhitespace(actualTriggerInst.getFlowTrigger().toString());
+
+ assertThat(expectedTriggerInst).isEqualToIgnoringGivenFields(actualTriggerInst,
+ "depInstances", "flowTrigger");
+
+ assertThat(expectedTriggerInst.getDepInstances())
+ .usingElementComparatorIgnoringFields("triggerInstance", "context")
+ .containsAll(actualTriggerInst.getDepInstances())
+ .hasSameSizeAs(actualTriggerInst.getDepInstances());
+ }
+
+ private void assertTriggerInstancesEqual(final TriggerInstance actual,
+ final TriggerInstance expected, final boolean ignoreFlowTrigger) {
+ if (!ignoreFlowTrigger) {
+ if (actual.getFlowTrigger() != null && expected.getFlowTrigger() != null) {
+ assertThat(actual.getFlowTrigger().toString())
+ .isEqualToIgnoringWhitespace(expected.getFlowTrigger().toString());
+ } else {
+ assertThat(actual.getFlowTrigger()).isNull();
+ assertThat(expected.getFlowTrigger()).isNull();
+ }
+ }
+
+ assertThat(actual).isEqualToIgnoringGivenFields(expected, "depInstances", "flowTrigger");
+
+ assertThat(actual.getDepInstances())
+ .usingComparatorForElementFieldsWithType((d1, d2) -> {
+ if (d1 == null && d2 == null) {
+ return 0;
+ } else if (d1 != null && d2 != null && d1.getTime() == d2.getTime()) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }, Date.class)
+ .usingElementComparatorIgnoringFields("triggerInstance", "context")
+ .containsExactlyInAnyOrder(expected.getDepInstances()
+ .toArray(new DependencyInstance[expected.getDepInstances().size()]));
+ }
+
+ @Test
+ public void testUpdateDependencyExecutionStatus() {
+ final TriggerInstance expectedTriggerInst = this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis());
+
+ this.triggerInstLoader.uploadTriggerInstance(expectedTriggerInst);
+ for (final DependencyInstance depInst : expectedTriggerInst.getDepInstances()) {
+ depInst.setStatus(Status.CANCELLED);
+ depInst.setEndTime(new Date());
+ depInst.setCancellationCause(CancellationCause.MANUAL);
+ this.triggerInstLoader.updateDependencyExecutionStatus(depInst);
+ }
+
+ final TriggerInstance actualTriggerInst = this.triggerInstLoader
+ .getTriggerInstanceById(expectedTriggerInst.getId());
+ assertTriggerInstancesEqual(actualTriggerInst, expectedTriggerInst, false);
+ }
+
+ private void finalizeTriggerInstanceWithSuccess(final TriggerInstance triggerInst, final int
+ associateFlowExecId) {
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ depInst.setStatus(Status.SUCCEEDED);
+ depInst.getTriggerInstance().setFlowExecId(associateFlowExecId);
+ }
+ }
+
+ private void finalizeTriggerInstanceWithCancelled(final TriggerInstance triggerInst) {
+ for (final DependencyInstance depInst : triggerInst.getDepInstances()) {
+ depInst.setStatus(Status.CANCELLED);
+ depInst.setCancellationCause(CancellationCause.TIMEOUT);
+ depInst.setEndTime(new Date());
+ }
+ }
+
+ @Test
+ public void testGetIncompleteTriggerInstancesReturnsEmpty() {
+ final List<TriggerInstance> all = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ all.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
+ if (i <= 2) {
+ finalizeTriggerInstanceWithCancelled(all.get(i));
+ } else {
+ finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+ }
+ }
+
+ all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+ final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
+ .getIncompleteTriggerInstances());
+ all.sort(Comparator.comparing(TriggerInstance::getId));
+ actual.sort(Comparator.comparing(TriggerInstance::getId));
+
+ assertThat(actual).isEmpty();
+ }
+
+ @Test
+ public void testGetIncompleteTriggerInstances() {
+ final List<TriggerInstance> allInstances = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ allInstances.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
+ }
+
+ finalizeTriggerInstanceWithCancelled(allInstances.get(0));
+ finalizeTriggerInstanceWithSuccess(allInstances.get(1), 1000);
+ // this trigger instance should still count as incomplete one since no flow execution has
+ // been started
+ finalizeTriggerInstanceWithSuccess(allInstances.get(2), -1);
+
+ allInstances.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+ final List<TriggerInstance> expected = allInstances.subList(2, allInstances.size());
+ final List<TriggerInstance> actual = new ArrayList<>(this.triggerInstLoader
+ .getIncompleteTriggerInstances());
+ assertTwoTriggerInstanceListsEqual(actual, expected, false, false);
+ }
+
+ private void assertTwoTriggerInstanceListsEqual(final List<TriggerInstance> actual,
+ final List<TriggerInstance> expected, final boolean ignoreFlowTrigger,
+ final boolean keepOriginalOrder) {
+ if (!keepOriginalOrder) {
+ expected.sort(Comparator.comparing(TriggerInstance::getId));
+ actual.sort(Comparator.comparing(TriggerInstance::getId));
+ }
+
+ assertThat(actual).hasSameSizeAs(expected);
+ final Iterator<TriggerInstance> it1 = actual.iterator();
+ final Iterator<TriggerInstance> it2 = expected.iterator();
+ while (it1.hasNext() && it2.hasNext()) {
+ //8bfafb89-ac79-45a0-a049-b55038b0886b
+ assertTriggerInstancesEqual(it1.next(), it2.next(), ignoreFlowTrigger);
+ }
+ }
+
+ @Test
+ public void testUpdateAssociatedFlowExecId() {
+ final TriggerInstance expectedTriggerInst = this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis());
+ this.triggerInstLoader.uploadTriggerInstance(expectedTriggerInst);
+ finalizeTriggerInstanceWithSuccess(expectedTriggerInst, 1000);
+
+ expectedTriggerInst.getDepInstances()
+ .forEach(depInst -> this.triggerInstLoader.updateDependencyExecutionStatus(depInst));
+
+ this.triggerInstLoader.updateAssociatedFlowExecId(expectedTriggerInst);
+
+ final TriggerInstance actualTriggerInst = this.triggerInstLoader
+ .getTriggerInstanceById(expectedTriggerInst.getId());
+
+ assertTriggerInstancesEqual(actualTriggerInst, expectedTriggerInst, false);
+ }
+
+ @Test
+ public void testGetRecentlyFinishedReturnsEmpty() {
+ final List<TriggerInstance> all = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ all.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()));
+ }
+
+ all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+ final Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
+ .getRecentlyFinished(10);
+ assertThat(recentlyFinished).isEmpty();
+ }
+
+ @Test
+ public void testGetRecentlyFinished() {
+
+ final List<TriggerInstance> all = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ all.add(this.createTriggerInstance(this.flowTrigger, this
+ .flow_id, this.flow_version, this.submitUser, this.project, System.currentTimeMillis()
+ + i * 10000));
+ if (i <= 3) {
+ finalizeTriggerInstanceWithCancelled(all.get(i));
+ } else if (i <= 6) {
+ finalizeTriggerInstanceWithSuccess(all.get(i), 1000);
+ }
+ }
+
+ all.forEach(triggerInst -> this.triggerInstLoader.uploadTriggerInstance(triggerInst));
+
+ final List<TriggerInstance> expected = all.subList(0, 7);
+ expected.sort(Comparator.comparing(TriggerInstance::getStartTime));
+
+ final Collection<TriggerInstance> recentlyFinished = this.triggerInstLoader
+ .getRecentlyFinished(10);
+ assertTwoTriggerInstanceListsEqual(new ArrayList<>(recentlyFinished), expected, true, true);
+ }
+
+ @After
+ public void cleanDB() {
+ try {
+ dbOperator.update("TRUNCATE TABLE execution_dependencies");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/TestDependencyInstanceContext.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TestDependencyInstanceContext.java
new file mode 100644
index 0000000..9ca54a5
--- /dev/null
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/TestDependencyInstanceContext.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.flowtrigger;
+
+public class TestDependencyInstanceContext implements DependencyInstanceContext {
+
+ private final DependencyInstanceCallback callback;
+
+ public TestDependencyInstanceContext(final DependencyInstanceConfig config,
+ final DependencyInstanceRuntimeProps runtimeProps,
+ final DependencyInstanceCallback callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void cancel() {
+ this.callback.onCancel(this);
+ }
+}
diff --git a/azkaban-web-server/src/test/resources/flow_trigger.flow b/azkaban-web-server/src/test/resources/flow_trigger.flow
new file mode 100644
index 0000000..4157128
--- /dev/null
+++ b/azkaban-web-server/src/test/resources/flow_trigger.flow
@@ -0,0 +1,59 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 1
+ schedule:
+ type: cron
+ value: 0/5 * * * * ?
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali
+ params:
+ view: another dataset
+ delay: 1
+ window: 7
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+ failure.emails: chren@linkedin.com
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: sleep 10