azkaban-aplcache

allow project admin to pause/resume flow trigger schedule

5/4/2018 12:58:07 AM

Details

diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 05c7b8d..9277c3a 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -267,22 +267,22 @@ public class NodeBeanLoaderTest {
     assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
         .isInstanceOf(IllegalArgumentException.class);
 
-    loader.load(ExecutionsTestUtil.getFlowFile(
+    final NodeBean nodeBean1 = loader.load(ExecutionsTestUtil.getFlowFile(
         TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_second_level_cron_expression1.flow"));
 
-    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean1.getTrigger()))
         .isInstanceOf(IllegalArgumentException.class);
 
-    loader.load(ExecutionsTestUtil.getFlowFile(
+    final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
         TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_second_level_cron_expression2.flow"));
 
-    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
         .isInstanceOf(IllegalArgumentException.class);
 
-    final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+    final NodeBean nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
         TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_schedule.flow"));
 
-    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean3.getTrigger()))
         .isInstanceOf(NullPointerException.class);
   }
 
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index afbfd99..8ae50cb 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -109,6 +109,18 @@ public class FlowTriggerScheduler {
     }
   }
 
+  public void pauseFlowTrigger(final int projectId, final String flowId) throws SchedulerException {
+    logger.info(String.format("pausing flow trigger for [projectId:%s, flowId:%s]", projectId,
+        flowId));
+    this.scheduler.pauseJob(generateGroupName(projectId, flowId));
+  }
+
+  public void resumeFlowTrigger(final int projectId, final String flowId) throws
+      SchedulerException {
+    logger.info(String.format("resuming flow trigger for [projectId:%s, flowId:%s]", projectId, flowId));
+    this.scheduler.resumeJob(generateGroupName(projectId, flowId));
+  }
+
   /**
    * Retrieve the list of scheduled flow triggers from quartz database
    */
@@ -131,10 +143,11 @@ public class FlowTriggerScheduler {
               .get(FlowTriggerQuartzJob.FLOW_TRIGGER);
           final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
           final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
+          final boolean isPaused = this.scheduler.isJobPaused(groupName);
           scheduledFlowTrigger = new ScheduledFlowTrigger(projectId,
               this.projectManager.getProject(projectId).getName(),
               flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
-              : quartzTriggers.get(0));
+              : quartzTriggers.get(0), isPaused);
         } catch (final Exception ex) {
           logger.error(String.format("unable to get flow trigger by job key %s", jobKey), ex);
           scheduledFlowTrigger = null;
@@ -163,7 +176,11 @@ public class FlowTriggerScheduler {
   }
 
   private String generateGroupName(final Flow flow) {
-    return String.valueOf(flow.getProjectId()) + "." + flow.getId();
+    return generateGroupName(flow.getProjectId(), flow.getId());
+  }
+
+  private String generateGroupName(final int projectId, final String flowId) {
+    return String.valueOf(projectId) + "." + flowId;
   }
 
   public void start() {
@@ -182,16 +199,22 @@ public class FlowTriggerScheduler {
     private final FlowTrigger flowTrigger;
     private final Trigger quartzTrigger;
     private final String submitUser;
+    private final boolean isPaused;
 
     public ScheduledFlowTrigger(final int projectId, final String projectName, final String flowId,
         final FlowTrigger flowTrigger, final String submitUser,
-        final Trigger quartzTrigger) {
+        final Trigger quartzTrigger, final boolean isPaused) {
       this.projectId = projectId;
       this.projectName = projectName;
       this.flowId = flowId;
       this.flowTrigger = flowTrigger;
       this.submitUser = submitUser;
       this.quartzTrigger = quartzTrigger;
+      this.isPaused = isPaused;
+    }
+
+    public boolean isPaused() {
+      return isPaused;
     }
 
     public int getProjectId() {
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index 9115ab1..4b4de62 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
 
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.utils.Props;
+import java.util.List;
 import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -32,6 +33,7 @@ import org.quartz.JobKey;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
+import org.quartz.Trigger.TriggerState;
 import org.quartz.TriggerBuilder;
 import org.quartz.impl.StdSchedulerFactory;
 import org.quartz.impl.matchers.GroupMatcher;
@@ -112,12 +114,49 @@ public class QuartzScheduler {
   }
 
   /**
+   * pause a job given the groupname. since pausing request might be issued concurrently,
+   * so synchronized is added to ensure thread safety.
+   */
+  public synchronized void pauseJob(final String groupName) throws SchedulerException {
+    if (!ifJobExist(groupName)) {
+      throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
+    } else {
+      this.scheduler.pauseJob(new JobKey(DEFAULT_JOB_NAME, groupName));
+    }
+  }
+
+  public synchronized boolean isJobPaused(final String groupName) throws SchedulerException {
+    final JobKey jobKey = new JobKey(DEFAULT_JOB_NAME, groupName);
+    final JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
+    final List<? extends Trigger> triggers = this.scheduler.getTriggersOfJob(jobDetail.getKey());
+    for (final Trigger trigger : triggers) {
+      final TriggerState triggerState = this.scheduler.getTriggerState(trigger.getKey());
+      if (TriggerState.PAUSED.equals(triggerState)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * resume a paused job given the groupname. since resuming request might be issued concurrently,
+   * so synchronized is added to ensure thread safety.
+   */
+  public synchronized void resumeJob(final String groupName) throws SchedulerException {
+    if (!ifJobExist(groupName)) {
+      throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
+    } else {
+      this.scheduler.resumeJob(new JobKey(DEFAULT_JOB_NAME, groupName));
+    }
+  }
+
+  /**
    * Unregister a job given the groupname. Since unregister might be called when
    * concurrently removing projects, so synchronized is added to ensure thread safety.
    */
   public synchronized void unregisterJob(final String groupName) throws SchedulerException {
     if (!ifJobExist(groupName)) {
-      logger.warn("can not find job with " + groupName + " in quartz.");
+      throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
     } else {
       this.scheduler.deleteJob(new JobKey(DEFAULT_JOB_NAME, groupName));
     }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
index ceaacf1..22adff9 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
@@ -118,13 +118,11 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
         final Project project = this.projectManager.getProject(projectName);
         if (project == null) {
           ret.put("error", "please specify a valid project name");
-          return;
         }
-        if (!hasPermission(project, session.getUser(), Type.READ)) {
+        else if (!hasPermission(project, session.getUser(), Type.READ)) {
           ret.put("error", "Permission denied. Need READ access.");
-          return;
         }
-        ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
+        else ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
       } else {
         ret.put("error", "please specify project id and flow id");
       }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
index 7ea7bc6..fcfbeff 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
@@ -18,7 +18,10 @@ package azkaban.webapp.servlet;
 
 import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
 import azkaban.flowtrigger.quartz.FlowTriggerScheduler.ScheduledFlowTrigger;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
 import azkaban.server.session.Session;
+import azkaban.user.Permission.Type;
 import azkaban.webapp.AzkabanWebServer;
 import java.io.IOException;
 import java.util.HashMap;
@@ -27,17 +30,20 @@ import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.quartz.SchedulerException;
 
 public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
 
   private static final long serialVersionUID = 1L;
   private FlowTriggerScheduler scheduler;
+  private ProjectManager projectManager;
 
   @Override
   public void init(final ServletConfig config) throws ServletException {
     super.init(config);
     final AzkabanWebServer server = (AzkabanWebServer) getApplication();
     this.scheduler = server.getScheduler();
+    this.projectManager = server.getProjectManager();
   }
 
   @Override
@@ -75,21 +81,50 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
+  private boolean checkProjectIdAndFlowId(HttpServletRequest req) {
+    return hasParam(req, "projectId") && hasParam(req, "flowId");
+  }
+
   private void handleAJAXAction(final HttpServletRequest req,
       final HttpServletResponse resp, final Session session) throws ServletException,
       IOException {
     final HashMap<String, Object> ret = new HashMap<>();
     final String ajaxName = getParam(req, "ajax");
     if (ajaxName.equals("fetchTrigger")) {
-      if (hasParam(req, "projectId") && hasParam(req, "flowId")) {
+      if (checkProjectIdAndFlowId(req)) {
         final int projectId = getIntParam(req, "projectId");
         final String flowId = getParam(req, "flowId");
         ajaxFetchTrigger(projectId, flowId, session, ret);
-        if (ret != null) {
-          this.writeJSON(resp, ret);
+      }
+    } else if (ajaxName.equals("pauseTrigger") || ajaxName.equals("resumeTrigger")) {
+      if (checkProjectIdAndFlowId(req)) {
+        final int projectId = getIntParam(req, "projectId");
+        final String flowId = getParam(req, "flowId");
+        final Project project = this.projectManager.getProject(projectId);
+
+        if (project == null) {
+          ret.put("error", "please specify a valid project id");
+        }
+        else if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
+          ret.put("error", "Permission denied. Need ADMIN access.");
+        }
+        else {
+          try {
+            if(ajaxName.equals("pauseTrigger")) {
+              this.scheduler.pauseFlowTrigger(projectId, flowId);
+            } else {
+              this.scheduler.resumeFlowTrigger(projectId, flowId);
+            }
+            ret.put("status", "success");
+          } catch (SchedulerException ex) {
+            ret.put("error", ex.getMessage());
+          }
         }
       }
     }
+    if (ret != null) {
+      this.writeJSON(resp, ret);
+    }
   }
 
   private void handlePage(final HttpServletRequest req,
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
index ec8c033..608e958 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
@@ -21,6 +21,7 @@
   #parse("azkaban/webapp/servlet/velocity/style.vm")
   #parse("azkaban/webapp/servlet/velocity/javascript.vm")
   <script type="text/javascript" src="${context}/js/azkaban/util/ajax.js"></script>
+  <script type="text/javascript" src="${context}/js/azkaban/view/executions.js"></script>
   <script type="text/javascript" src="${context}/js/jquery/jquery.tablesorter.js"></script>
   <script type="text/javascript">
     var contextURL = "${context}";
@@ -34,6 +35,36 @@
       jobTable.tablesorter();
     });
   </script>
+
+  <script type="text/javascript">
+    function pauseTrigger(projectId, flowId) {
+      var requestURL = document.location.href;
+      var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "pauseTrigger"};
+      var successHandler = function (data) {
+        if (data.error) {
+          showDialog("Error", data.error);
+        }
+        else {
+          window.location = requestURL;
+        }
+      };
+      ajaxCall(requestURL, requestData, successHandler);
+    }
+
+    function resumeTrigger(projectId, flowId) {
+      var requestURL = document.location.href;
+      var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "resumeTrigger"};
+      var successHandler = function (data) {
+        if (data.error) {
+          showDialog("Error", data.error);
+        }
+        else {
+          window.location = requestURL;
+        }
+      };
+      ajaxCall(requestURL, requestData, successHandler);
+    }
+  </script>
 </head>
 <body>
 
@@ -73,6 +104,8 @@
           <th>Schedule</th>
           <th>Max Wait Mins</th>
           <th>Dependencies</th>
+          <th>Is Paused</th>
+          <th>Pause/Resume</th>
         </tr>
         </thead>
         <tbody>
@@ -130,6 +163,21 @@
                 </div>
               </div>
 
+              <td>${trigger.isPaused()}</td>
+
+              ##todo chengren311: add pause/resume button in flow page.
+              <td>
+                #if (${trigger.isPaused()} == "false")
+                  <button type="button" id="pausebtn" class="btn btn-danger btn-sm"
+                          onclick="pauseTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Pause
+                  </button>
+                #else
+                  <button type="button" id="resumebtn" class="btn btn-info btn-sm"
+                          onclick="resumeTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Resume
+                  </button>
+                #end
+              </td>
+
             </tr>
             #end
           #else