azkaban-aplcache

move project json out of execution_dependencies table (#1704) The

3/22/2018 7:42:01 PM

Details

diff --git a/azkaban-db/src/main/sql/create.execution_dependencies.sql b/azkaban-db/src/main/sql/create.execution_dependencies.sql
index 2535eb7..adc966b 100644
--- a/azkaban-db/src/main/sql/create.execution_dependencies.sql
+++ b/azkaban-db/src/main/sql/create.execution_dependencies.sql
@@ -10,7 +10,6 @@ CREATE TABLE execution_dependencies(
   project_version INT not null,
   flow_id varchar(128) not null,
   flow_version INT not null,
-  project_json MEDIUMTEXT not null,
   flow_exec_id INT not null,
   primary key(trigger_instance_id, dep_name)
 );
\ No newline at end of file
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
index 150274c..342e61e 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/database/JdbcFlowTriggerInstanceLoaderImpl.java
@@ -19,7 +19,6 @@ package azkaban.flowtrigger.database;
 import azkaban.Constants;
 import azkaban.db.DatabaseOperator;
 import azkaban.db.SQLTransaction;
-import azkaban.flow.FlowUtils;
 import azkaban.flowtrigger.CancellationCause;
 import azkaban.flowtrigger.DependencyException;
 import azkaban.flowtrigger.DependencyInstance;
@@ -29,6 +28,7 @@ import azkaban.project.FlowLoaderUtils;
 import azkaban.project.FlowTrigger;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManager;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
@@ -60,7 +60,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
 
   private static final String[] DEPENDENCY_EXECUTIONS_COLUMNS = {"trigger_instance_id", "dep_name",
       "starttime", "endtime", "dep_status", "cancelleation_cause", "project_id", "project_version",
-      "flow_id", "flow_version", "project_json", "flow_exec_id"};
+      "flow_id", "flow_version", "flow_exec_id"};
 
   private static final String DEPENDENCY_EXECUTION_TABLE = "execution_dependencies";
 
@@ -103,7 +103,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
   private static final String SELECT_RECENTLY_FINISHED = String.format(
       "SELECT execution_dependencies.trigger_instance_id,dep_name,starttime,endtime,dep_status,"
           + "cancelleation_cause,project_id,"
-          + "project_version,flow_id,flow_version,project_json, flow_exec_id \n"
+          + "project_version,flow_id,flow_version, flow_exec_id \n"
           + "FROM execution_dependencies JOIN (\n"
           + "SELECT distinct(trigger_instance_id), max(endtime) FROM execution_dependencies "
           + "WHERE dep_status = %s or dep_status = %s\n"
@@ -120,12 +120,14 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
 
   private final DatabaseOperator dbOperator;
   private final ProjectLoader projectLoader;
+  private final ProjectManager projectManager;
 
   @Inject
   public JdbcFlowTriggerInstanceLoaderImpl(final DatabaseOperator databaseOperator,
-      final ProjectLoader projectLoader) {
+      final ProjectLoader projectLoader, final ProjectManager projectManager) {
     this.dbOperator = databaseOperator;
     this.projectLoader = projectLoader;
+    this.projectManager = projectManager;
   }
 
   @Override
@@ -229,7 +231,6 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
                 triggerInst.getProject().getVersion(),
                 triggerInst.getFlowId(),
                 triggerInst.getFlowVersion(),
-                FlowUtils.toJson(triggerInst.getProject()),
                 triggerInst.getFlowExecId());
       }
       return null;
@@ -318,8 +319,69 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
     return triggerInstance;
   }
 
+  public static class FlowConfigID {
+
+    private final int projectId;
+    private final int projectVerison;
+    private final String flowId;
+    private final int flowVersion;
+
+    public FlowConfigID(final int projectId, final int projectVerison, final String flowId,
+        final int flowVersion) {
+      this.projectId = projectId;
+      this.projectVerison = projectVerison;
+      this.flowId = flowId;
+      this.flowVersion = flowVersion;
+    }
+
+    public int getProjectId() {
+      return this.projectId;
+    }
+
+    public int getProjectVersion() {
+      return this.projectVerison;
+    }
+
+    public String getFlowId() {
+      return this.flowId;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      final FlowConfigID that = (FlowConfigID) o;
+
+      return new EqualsBuilder()
+          .append(this.projectId, that.projectId)
+          .append(this.projectVerison, that.projectVerison)
+          .append(this.flowVersion, that.flowVersion)
+          .append(this.flowId, that.flowId)
+          .isEquals();
+    }
 
-  private static class TriggerInstanceHandler implements
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder(17, 37)
+          .append(this.projectId)
+          .append(this.projectVerison)
+          .append(this.flowId)
+          .append(this.flowVersion)
+          .toHashCode();
+    }
+
+    public int getFlowVersion() {
+      return this.flowVersion;
+    }
+  }
+
+  private class TriggerInstanceHandler implements
       ResultSetHandler<Collection<TriggerInstance>> {
 
     public TriggerInstanceHandler() {
@@ -341,9 +403,9 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
         final int projVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[7]);
         final String flowId = rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[8]);
         final int flowVersion = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[9]);
-        final Project project = FlowUtils
-            .toProject(rs.getString(DEPENDENCY_EXECUTIONS_COLUMNS[10]));
-        final int flowExecId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[11]);
+        final Project project = JdbcFlowTriggerInstanceLoaderImpl.this.projectManager
+            .getProject(projId);
+        final int flowExecId = rs.getInt(DEPENDENCY_EXECUTIONS_COLUMNS[10]);
 
         final TriggerInstKey key = new TriggerInstKey(triggerInstId, project.getLastModifiedUser(),
             projId, projVersion, flowId, flowVersion, flowExecId, project);
@@ -380,7 +442,7 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
       return res;
     }
 
-    private static class TriggerInstKey {
+    private class TriggerInstKey {
 
       String triggerInstId;
       FlowConfigID flowConfigID;
@@ -425,66 +487,4 @@ public class JdbcFlowTriggerInstanceLoaderImpl implements FlowTriggerInstanceLoa
       }
     }
   }
-
-  public static class FlowConfigID {
-
-    private final int projectId;
-    private final int projectVerison;
-    private final String flowId;
-    private final int flowVersion;
-
-    public FlowConfigID(final int projectId, final int projectVerison, final String flowId,
-        final int flowVersion) {
-      this.projectId = projectId;
-      this.projectVerison = projectVerison;
-      this.flowId = flowId;
-      this.flowVersion = flowVersion;
-    }
-
-    public int getProjectId() {
-      return this.projectId;
-    }
-
-    public int getProjectVersion() {
-      return this.projectVerison;
-    }
-
-    public String getFlowId() {
-      return this.flowId;
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-      if (this == o) {
-        return true;
-      }
-
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      final FlowConfigID that = (FlowConfigID) o;
-
-      return new EqualsBuilder()
-          .append(this.projectId, that.projectId)
-          .append(this.projectVerison, that.projectVerison)
-          .append(this.flowVersion, that.flowVersion)
-          .append(this.flowId, that.flowId)
-          .isEquals();
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder(17, 37)
-          .append(this.projectId)
-          .append(this.projectVerison)
-          .append(this.flowId)
-          .append(this.flowVersion)
-          .toHashCode();
-    }
-
-    public int getFlowVersion() {
-      return this.flowVersion;
-    }
-  }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
index e0eab9f..75ca642 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
@@ -16,10 +16,10 @@
 
 package azkaban.flowtrigger.quartz;
 
-import azkaban.flow.FlowUtils;
 import azkaban.flowtrigger.FlowTriggerService;
 import azkaban.project.FlowTrigger;
 import azkaban.project.Project;
+import azkaban.project.ProjectManager;
 import azkaban.scheduler.AbstractQuartzJob;
 import javax.inject.Inject;
 import org.quartz.JobDataMap;
@@ -29,24 +29,26 @@ import org.quartz.JobExecutionContext;
 public class FlowTriggerQuartzJob extends AbstractQuartzJob {
 
   public static final String SUBMIT_USER = "SUBMIT_USER";
-  public static final String PROJECT = "PROJECT";
+  public static final String PROJECT_ID = "PROJECT_ID";
   public static final String FLOW_TRIGGER = "FLOW_TRIGGER";
   public static final String FLOW_ID = "FLOW_ID";
   public static final String FLOW_VERSION = "FLOW_VERSION";
 
   private final FlowTriggerService triggerService;
+  private final ProjectManager projectManager;
 
   @Inject
-  public FlowTriggerQuartzJob(final FlowTriggerService service) {
+  public FlowTriggerQuartzJob(final FlowTriggerService service,
+      final ProjectManager projectManager) {
     this.triggerService = service;
+    this.projectManager = projectManager;
   }
 
   @Override
   public void execute(final JobExecutionContext context) {
     final JobDataMap data = context.getMergedJobDataMap();
-    final String projectJson = data.getString(PROJECT);
-    final Project project = FlowUtils.toProject(projectJson);
-
+    final int projectId = data.getInt(PROJECT_ID);
+    final Project project = this.projectManager.getProject(projectId);
     final String flowId = data.getString(FLOW_ID);
     final int flowVersion = data.getInt(FLOW_VERSION);
     final String submitUser = data.getString(SUBMIT_USER);
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index 04fae28..09ea6b8 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -19,11 +19,11 @@ package azkaban.flowtrigger.quartz;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.flow.Flow;
-import azkaban.flow.FlowUtils;
 import azkaban.project.FlowLoaderUtils;
 import azkaban.project.FlowTrigger;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManager;
 import azkaban.project.ProjectManagerException;
 import azkaban.scheduler.QuartzJobDescription;
 import azkaban.scheduler.QuartzScheduler;
@@ -51,11 +51,14 @@ public class FlowTriggerScheduler {
   private static final Logger logger = LoggerFactory.getLogger(FlowTriggerScheduler.class);
   private final ProjectLoader projectLoader;
   private final QuartzScheduler scheduler;
+  private final ProjectManager projectManager;
 
   @Inject
-  public FlowTriggerScheduler(final ProjectLoader projectLoader, final QuartzScheduler scheduler) {
+  public FlowTriggerScheduler(final ProjectLoader projectLoader, final QuartzScheduler scheduler,
+      final ProjectManager projectManager) {
     this.projectLoader = requireNonNull(projectLoader);
     this.scheduler = requireNonNull(scheduler);
+    this.projectManager = requireNonNull(projectManager);
   }
 
   /**
@@ -86,13 +89,12 @@ public class FlowTriggerScheduler {
           final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);
 
           if (flowTrigger != null) {
-            final String projectJson = FlowUtils.toJson(project);
             final Map<String, Object> contextMap = ImmutableMap
                 .of(FlowTriggerQuartzJob.SUBMIT_USER, submitUser,
                     FlowTriggerQuartzJob.FLOW_TRIGGER, flowTrigger,
                     FlowTriggerQuartzJob.FLOW_ID, flow.getId(),
                     FlowTriggerQuartzJob.FLOW_VERSION, latestFlowVersion,
-                    FlowTriggerQuartzJob.PROJECT, projectJson);
+                    FlowTriggerQuartzJob.PROJECT_ID, project.getId());
             logger.info("scheduling flow " + flow.getProjectId() + "." + flow.getId());
             this.scheduler
                 .registerJob(flowTrigger.getSchedule().getCronExpression(), new QuartzJobDescription
@@ -124,15 +126,13 @@ public class FlowTriggerScheduler {
           final JobDataMap jobDataMap = job.getJobDataMap();
 
           final String flowId = jobDataMap.getString(FlowTriggerQuartzJob.FLOW_ID);
-
-          final String projectJson = jobDataMap.getString(FlowTriggerQuartzJob.PROJECT);
-          final Project project = FlowUtils.toProject(projectJson);
+          final int projectId = jobDataMap.getInt(FlowTriggerQuartzJob.PROJECT_ID);
           final FlowTrigger flowTrigger = (FlowTrigger) jobDataMap
               .get(FlowTriggerQuartzJob.FLOW_TRIGGER);
           final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
           final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
           scheduledFlowTrigger = new ScheduledFlowTrigger(
-              project.getName(),
+              this.projectManager.getProject(projectId).getName(),
               flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
               : quartzTriggers.get(0));
         } catch (final Exception ex) {
diff --git a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
index 7e8e205..35ca072 100644
--- a/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/flowtrigger/FlowTriggerInstanceLoaderTest.java
@@ -16,6 +16,8 @@
 package azkaban.flowtrigger;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import azkaban.Constants;
 import azkaban.db.DatabaseOperator;
@@ -29,6 +31,7 @@ import azkaban.project.JdbcProjectImpl;
 import azkaban.project.JdbcProjectImplTest;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectManager;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
@@ -67,6 +70,7 @@ public class FlowTriggerInstanceLoaderTest {
   private static FlowTrigger flowTrigger;
   private static FlowTriggerInstanceLoader triggerInstLoader;
   private static Project project;
+  private static ProjectManager projManager;
 
   @AfterClass
   public static void destroyDB() {
@@ -83,8 +87,10 @@ public class FlowTriggerInstanceLoaderTest {
   public static void setup() throws Exception {
     dbOperator = Utils.initTestDB();
     projLoader = new JdbcProjectImpl(props, dbOperator);
-    triggerInstLoader = new JdbcFlowTriggerInstanceLoaderImpl(dbOperator, projLoader);
+    projManager = mock(ProjectManager.class);
+    triggerInstLoader = new JdbcFlowTriggerInstanceLoaderImpl(dbOperator, projLoader, projManager);
     project = new Project(project_id, project_name);
+
     final DirectoryYamlFlowLoader yamlFlowLoader = new DirectoryYamlFlowLoader(new Props());
     yamlFlowLoader
         .loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(test_project_zip_dir));
@@ -95,6 +101,8 @@ public class FlowTriggerInstanceLoaderTest {
     final File flowFile = new File(JdbcProjectImplTest.class.getClassLoader().getResource
         (test_flow_file).getFile());
 
+    when(projManager.getProject(project_id)).thenReturn(project);
+
     projLoader
         .uploadFlowFile(project_id, project_version, flowFile, flow_version);
     flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(flowFile);