diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 150274c..342e61e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -19,7 +19,6 @@ package azkaban.flowtrigger.database;
import azkaban.Constants;
import azkaban.db.DatabaseOperator;
import azkaban.db.SQLTransaction;
-import azkaban.flow.FlowUtils;
import azkaban.flowtrigger.CancellationCause;
import azkaban.flowtrigger.DependencyException;
import azkaban.flowtrigger.DependencyInstance;
@@ -29,6 +28,7 @@ import azkaban.project.FlowLoaderUtils;
import azkaban.project.FlowTrigger;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManager;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
@@ -60,7 +60,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
private static final String[] DEPENDENCY_EXECUTIONS_COLUMNS = {"trigger_instance_id", "dep_name",
"starttime", "endtime", "dep_status", "cancelleation_cause", "project_id", "project_version",
- "flow_id", "flow_version", "project_json", "flow_exec_id"};
+ "flow_id", "flow_version", "flow_exec_id"};
private static final String DEPENDENCY_EXECUTION_TABLE = "execution_dependencies";
@@ -103,7 +103,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
private static final String SELECT_RECENTLY_FINISHED = String.format(
"SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
+ "cancelleation_cause,project_id,"
- + "project_version,flow_id,flow_version,project_json, flow_exec_id \n"
+ + "project_version,flow_id,flow_version, flow_exec_id \n"
+ "FROM execution_dependencies JOIN (\n"
+ "SELECT distinct(trigger_instance_id), max(endtime) FROM execution_dependencies "
+ "WHERE dep_status = %s or dep_status = %s\n"
@@ -120,12 +120,14 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
private final DatabaseOperator dbOperator;
private final ProjectLoader projectLoader;
+ private final ProjectManager projectManager;
@Inject
public JdbcFlowTriggerInstanceLoaderImpl(final DatabaseOperator databaseOperator,
- final ProjectLoader projectLoader) {
+ final ProjectLoader projectLoader, final ProjectManager projectManager) {
this.dbOperator = databaseOperator;
this.projectLoader = projectLoader;
+ this.projectManager = projectManager;
}
@Override
@@ -229,7 +231,6 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
triggerInst.getProject().getVersion(),
triggerInst.getFlowId(),
triggerInst.getFlowVersion(),
- FlowUtils.toJson(triggerInst.getProject()),
triggerInst.getFlowExecId());
}
return null;
@@ -318,8 +319,69 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
return triggerInstance;
}
+ public static class FlowConfigID {
+
+ private final int projectId;
+ private final int projectVerison;
+ private final String flowId;
+ private final int flowVersion;
+
+ public FlowConfigID(final int projectId, final int projectVerison, final String flowId,
+ final int flowVersion) {
+ this.projectId = projectId;
+ this.projectVerison = projectVerison;
+ this.flowId = flowId;
+ this.flowVersion = flowVersion;
+ }
+
+ public int getProjectId() {
+ return this.projectId;
+ }
+
+ public int getProjectVersion() {
+ return this.projectVerison;
+ }
+
+ public String getFlowId() {
+ return this.flowId;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final FlowConfigID that = (FlowConfigID) o;
+
+ return new EqualsBuilder()
+ .append(this.projectId, that.projectId)
+ .append(this.projectVerison, that.projectVerison)
+ .append(this.flowVersion, that.flowVersion)
+ .append(this.flowId, that.flowId)
+ .isEquals();
+ }
- private static class TriggerInstanceHandler implements
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(this.projectId)
+ .append(this.projectVerison)
+ .append(this.flowId)
+ .append(this.flowVersion)
+ .toHashCode();
+ }
+
+ public int getFlowVersion() {
+ return this.flowVersion;
+ }
+ }
+
+ private class TriggerInstanceHandler implements
ResultSetHandler<Collection<TriggerInstance>> {
public TriggerInstanceHandler() {
@@ -341,9 +403,9 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
final int projVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[7]);
final String flowId = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[8]);
final int flowVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[9]);
- final Project project = FlowUtils
- .toProject(rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[10]));
- final int flowExecId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[11]);
+ final Project project = JdbcFlowTriggerInstanceLoaderImpl.this.projectManager
+ .getProject(projId);
+ final int flowExecId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[10]);
final TriggerInstKey key = new TriggerInstKey(triggerInstId, project.getLastModifiedUser(),
projId, projVersion, flowId, flowVersion, flowExecId, project);
@@ -380,7 +442,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
return res;
}
- private static class TriggerInstKey {
+ private class TriggerInstKey {
String triggerInstId;
FlowConfigID flowConfigID;
@@ -425,66 +487,4 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
}
}
}
-
- public static class FlowConfigID {
-
- private final int projectId;
- private final int projectVerison;
- private final String flowId;
- private final int flowVersion;
-
- public FlowConfigID(final int projectId, final int projectVerison, final String flowId,
- final int flowVersion) {
- this.projectId = projectId;
- this.projectVerison = projectVerison;
- this.flowId = flowId;
- this.flowVersion = flowVersion;
- }
-
- public int getProjectId() {
- return this.projectId;
- }
-
- public int getProjectVersion() {
- return this.projectVerison;
- }
-
- public String getFlowId() {
- return this.flowId;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final FlowConfigID that = (FlowConfigID) o;
-
- return new EqualsBuilder()
- .append(this.projectId, that.projectId)
- .append(this.projectVerison, that.projectVerison)
- .append(this.flowVersion, that.flowVersion)
- .append(this.flowId, that.flowId)
- .isEquals();
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder(17, 37)
- .append(this.projectId)
- .append(this.projectVerison)
- .append(this.flowId)
- .append(this.flowVersion)
- .toHashCode();
- }
-
- public int getFlowVersion() {
- return this.flowVersion;
- }
- }
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index 04fae28..09ea6b8 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -19,11 +19,11 @@ package azkaban.flowtrigger.quartz;
import static java.util.Objects.requireNonNull;
import azkaban.flow.Flow;
-import azkaban.flow.FlowUtils;
import azkaban.project.FlowLoaderUtils;
import azkaban.project.FlowTrigger;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
import azkaban.scheduler.QuartzJobDescription;
import azkaban.scheduler.QuartzScheduler;
@@ -51,11 +51,14 @@ public class FlowTriggerScheduler {
private static final Logger logger = LoggerFactory.getLogger(FlowTriggerScheduler.class);
private final ProjectLoader projectLoader;
private final QuartzScheduler scheduler;
+ private final ProjectManager projectManager;
@Inject
- public FlowTriggerScheduler(final ProjectLoader projectLoader, final QuartzScheduler scheduler) {
+ public FlowTriggerScheduler(final ProjectLoader projectLoader, final QuartzScheduler scheduler,
+ final ProjectManager projectManager) {
this.projectLoader = requireNonNull(projectLoader);
this.scheduler = requireNonNull(scheduler);
+ this.projectManager = requireNonNull(projectManager);
}
/**
@@ -86,13 +89,12 @@ public class FlowTriggerScheduler {
final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
if (flowTrigger != null) {
- final String projectJson = FlowUtils.toJson(project);
final Map<String, Object> contextMap = ImmutableMap
.of(FlowTriggerQuartzJob.SUBMIT_USER, submitUser,
FlowTriggerQuartzJob.FLOW_TRIGGER, flowTrigger,
FlowTriggerQuartzJob.FLOW_ID, flow.getId(),
FlowTriggerQuartzJob.FLOW_VERSION, latestFlowVersion,
- FlowTriggerQuartzJob.PROJECT, projectJson);
+ FlowTriggerQuartzJob.PROJECT_ID, project.getId());
logger.info("scheduling flow " + flow.getProjectId() + "." + flow.getId());
this.scheduler
.registerJob(flowTrigger.getSchedule().getCronExpression(), new QuartzJobDescription
@@ -124,15 +126,13 @@ public class FlowTriggerScheduler {
final JobDataMap jobDataMap = job.getJobDataMap();
final String flowId = jobDataMap.getString(FlowTriggerQuartzJob.FLOW_ID);
-
- final String projectJson = jobDataMap.getString(FlowTriggerQuartzJob.PROJECT);
- final Project project = FlowUtils.toProject(projectJson);
+ final int projectId = jobDataMap.getInt(FlowTriggerQuartzJob.PROJECT_ID);
final FlowTrigger flowTrigger = (FlowTrigger) jobDataMap
.get(FlowTriggerQuartzJob.FLOW_TRIGGER);
final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
scheduledFlowTrigger = new ScheduledFlowTrigger(
- project.getName(),
+ this.projectManager.getProject(projectId).getName(),
flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
: quartzTriggers.get(0));
} catch (final Exception ex) {
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 7e8e205..35ca072 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -16,6 +16,8 @@
package azkaban.flowtrigger;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import azkaban.Constants;
import azkaban.db.DatabaseOperator;
@@ -29,6 +31,7 @@ import azkaban.project.JdbcProjectImpl;
import azkaban.project.JdbcProjectImplTest;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManager;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
@@ -67,6 +70,7 @@ public class FlowTriggerInstanceLoaderTest {
private static FlowTrigger flowTrigger;
private static FlowTriggerInstanceLoader triggerInstLoader;
private static Project project;
+ private static ProjectManager projManager;
@AfterClass
public static void destroyDB() {
@@ -83,8 +87,10 @@ public class FlowTriggerInstanceLoaderTest {
public static void setup() throws Exception {
dbOperator = Utils.initTestDB();
projLoader = new JdbcProjectImpl(props, dbOperator);
- triggerInstLoader = new JdbcFlowTriggerInstanceLoaderImpl(dbOperator, projLoader);
+ projManager = mock(ProjectManager.class);
+ triggerInstLoader = new JdbcFlowTriggerInstanceLoaderImpl(dbOperator, projLoader, projManager);
project = new Project(project_id, project_name);
+
final DirectoryYamlFlowLoader yamlFlowLoader = new DirectoryYamlFlowLoader(new Props());
yamlFlowLoader
.loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(test_project_zip_dir));
@@ -95,6 +101,8 @@ public class FlowTriggerInstanceLoaderTest {
final File flowFile = new File(JdbcProjectImplTest.class.getClassLoader().getResource
(test_flow_file).getFile());
+ when(projManager.getProject(project_id)).thenReturn(project);
+
projLoader
.uploadFlowFile(project_id, project_version, flowFile, flow_version);
flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);