azkaban-aplcache

Enable project uploading from restli endpoint schedule flow

7/10/2018 2:55:17 PM
3.50.0

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index 9a3d7b0..59ac61a 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -66,7 +66,6 @@ public class FlowTriggerScheduler {
    */
   public void scheduleAll(final Project project, final String submitUser)
       throws SchedulerException, ProjectManagerException {
-    //todo chengren311: schedule on uploading via CRT
 
     for (final Flow flow : project.getFlows()) {
       //todo chengren311: we should validate embedded flow shouldn't have flow trigger defined.
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index daca46f..c52b9b3 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -1730,6 +1730,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         //unscheduleall/scheduleall should only work with flow which has defined flow trigger
         //unschedule all flows within the old project
         if (this.enableQuartz) {
+          //todo chengren311: should maintain atomicity,
+          // e.g, if uploadProject fails, associated schedule shouldn't be added.
           this.scheduler.unscheduleAll(project);
         }
         final Map<String, ValidationReport> reports =
diff --git a/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java b/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
index b2d2575..f890641 100644
--- a/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
+++ b/azkaban-web-server/src/restli/java/azkaban/restli/ProjectManagerResource.java
@@ -15,6 +15,8 @@
  */
 package azkaban.restli;
 
+import azkaban.Constants.ConfigurationKeys;
+import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.project.ProjectManagerException;
@@ -39,6 +41,7 @@ import java.util.Map;
 import javax.servlet.ServletException;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
+import org.quartz.SchedulerException;
 
 @RestLiActions(name = "project", namespace = "azkaban.restli")
 public class ProjectManagerResource extends ResourceContextHolder {
@@ -55,13 +58,18 @@ public class ProjectManagerResource extends ResourceContextHolder {
       @ActionParam("projectName") final String projectName,
       @ActionParam("packageUrl") final String packageUrl)
       throws ProjectManagerException, RestLiServiceException, UserManagerException,
-      ServletException, IOException {
+      ServletException, IOException, SchedulerException {
     logger.info("Deploy called. {projectName: " + projectName + ", packageUrl:" + packageUrl + "}");
 
     final String ip = ResourceUtils.getRealClientIpAddr(this.getContext());
     final User user = ResourceUtils.getUserFromSessionId(sessionId);
     final ProjectManager projectManager = getAzkaban().getProjectManager();
     final Project project = projectManager.getProject(projectName);
+
+    final FlowTriggerScheduler scheduler = getAzkaban().getScheduler();
+    final boolean enableQuartz = getAzkaban().getServerProps().getBoolean(ConfigurationKeys
+        .ENABLE_QUARTZ, false);
+
     logger.info("Deploy: reference of project " + projectName + " is " + System.identityHashCode
         (project));
     if (project == null) {
@@ -111,16 +119,26 @@ public class ProjectManagerResource extends ResourceContextHolder {
     }
 
     try {
+      if (enableQuartz) {
+        //todo chengren311: should maintain atomicity,
+        // e.g, if uploadProject fails, associated schedule shouldn't be added.
+        scheduler.unscheduleAll(project);
+      }
       // Check if project upload runs into any errors, such as the file
       // having blacklisted jars
       final Props props = new Props();
       final Map<String, ValidationReport> reports = projectManager
           .uploadProject(project, archiveFile, "zip", user, props);
+
+      if (enableQuartz) {
+        scheduler.scheduleAll(project, user.getUserId());
+      }
+
       checkReports(reports);
       logger.info("Deploy: project " + projectName + " version is " + project.getVersion()
           + ", reference is " + System.identityHashCode(project));
       return Integer.toString(project.getVersion());
-    } catch (final ProjectManagerException e) {
+    } catch (final ProjectManagerException | SchedulerException e) {
       final String errorMsg = "Upload of project " + project + " from " + archiveFile + " failed";
       logger.error(errorMsg, e);
       throw e;