Details
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 05c7b8d..9277c3a 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -267,22 +267,22 @@ public class NodeBeanLoaderTest {
assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
.isInstanceOf(IllegalArgumentException.class);
- loader.load(ExecutionsTestUtil.getFlowFile(
+ final NodeBean nodeBean1 = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_second_level_cron_expression1.flow"));
- assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean1.getTrigger()))
.isInstanceOf(IllegalArgumentException.class);
- loader.load(ExecutionsTestUtil.getFlowFile(
+ final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_second_level_cron_expression2.flow"));
- assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
.isInstanceOf(IllegalArgumentException.class);
- final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+ final NodeBean nodeBean3 = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_schedule.flow"));
- assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean3.getTrigger()))
.isInstanceOf(NullPointerException.class);
}
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index afbfd99..8ae50cb 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -109,6 +109,18 @@ public class FlowTriggerScheduler {
}
}
+ public void pauseFlowTrigger(final int projectId, final String flowId) throws SchedulerException {
+ logger.info(String.format("pausing flow trigger for [projectId:%s, flowId:%s]", projectId,
+ flowId));
+ this.scheduler.pauseJob(generateGroupName(projectId, flowId));
+ }
+
+ public void resumeFlowTrigger(final int projectId, final String flowId) throws
+ SchedulerException {
+ logger.info(String.format("resuming flow trigger for [projectId:%s, flowId:%s]", projectId, flowId));
+ this.scheduler.resumeJob(generateGroupName(projectId, flowId));
+ }
+
/**
* Retrieve the list of scheduled flow triggers from quartz database
*/
@@ -131,10 +143,11 @@ public class FlowTriggerScheduler {
.get(FlowTriggerQuartzJob.FLOW_TRIGGER);
final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
+ final boolean isPaused = this.scheduler.isJobPaused(groupName);
scheduledFlowTrigger = new ScheduledFlowTrigger(projectId,
this.projectManager.getProject(projectId).getName(),
flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
- : quartzTriggers.get(0));
+ : quartzTriggers.get(0), isPaused);
} catch (final Exception ex) {
logger.error(String.format("unable to get flow trigger by job key %s", jobKey), ex);
scheduledFlowTrigger = null;
@@ -163,7 +176,11 @@ public class FlowTriggerScheduler {
}
private String generateGroupName(final Flow flow) {
- return String.valueOf(flow.getProjectId()) + "." + flow.getId();
+ return generateGroupName(flow.getProjectId(), flow.getId());
+ }
+
+ private String generateGroupName(final int projectId, final String flowId) {
+ return String.valueOf(projectId) + "." + flowId;
}
public void start() {
@@ -182,16 +199,22 @@ public class FlowTriggerScheduler {
private final FlowTrigger flowTrigger;
private final Trigger quartzTrigger;
private final String submitUser;
+ private final boolean isPaused;
public ScheduledFlowTrigger(final int projectId, final String projectName, final String flowId,
final FlowTrigger flowTrigger, final String submitUser,
- final Trigger quartzTrigger) {
+ final Trigger quartzTrigger, final boolean isPaused) {
this.projectId = projectId;
this.projectName = projectName;
this.flowId = flowId;
this.flowTrigger = flowTrigger;
this.submitUser = submitUser;
this.quartzTrigger = quartzTrigger;
+ this.isPaused = isPaused;
+ }
+
+ public boolean isPaused() {
+ return isPaused;
}
public int getProjectId() {
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index 9115ab1..4b4de62 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
import azkaban.Constants.ConfigurationKeys;
import azkaban.utils.Props;
+import java.util.List;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -32,6 +33,7 @@ import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
+import org.quartz.Trigger.TriggerState;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
@@ -112,12 +114,49 @@ public class QuartzScheduler {
}
/**
+ * pause a job given the groupname. since pausing request might be issued concurrently,
+ * so synchronized is added to ensure thread safety.
+ */
+ public synchronized void pauseJob(final String groupName) throws SchedulerException {
+ if (!ifJobExist(groupName)) {
+ throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
+ } else {
+ this.scheduler.pauseJob(new JobKey(DEFAULT_JOB_NAME, groupName));
+ }
+ }
+
+ public synchronized boolean isJobPaused(final String groupName) throws SchedulerException {
+ final JobKey jobKey = new JobKey(DEFAULT_JOB_NAME, groupName);
+ final JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
+ final List<? extends Trigger> triggers = this.scheduler.getTriggersOfJob(jobDetail.getKey());
+ for (final Trigger trigger : triggers) {
+ final TriggerState triggerState = this.scheduler.getTriggerState(trigger.getKey());
+ if (TriggerState.PAUSED.equals(triggerState)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * resume a paused job given the groupname. since resuming request might be issued concurrently,
+ * so synchronized is added to ensure thread safety.
+ */
+ public synchronized void resumeJob(final String groupName) throws SchedulerException {
+ if (!ifJobExist(groupName)) {
+ throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
+ } else {
+ this.scheduler.resumeJob(new JobKey(DEFAULT_JOB_NAME, groupName));
+ }
+ }
+
+ /**
* Unregister a job given the groupname. Since unregister might be called when
* concurrently removing projects, so synchronized is added to ensure thread safety.
*/
public synchronized void unregisterJob(final String groupName) throws SchedulerException {
if (!ifJobExist(groupName)) {
- logger.warn("can not find job with " + groupName + " in quartz.");
+ throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
} else {
this.scheduler.deleteJob(new JobKey(DEFAULT_JOB_NAME, groupName));
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
index ceaacf1..22adff9 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerInstanceServlet.java
@@ -118,13 +118,11 @@ public class FlowTriggerInstanceServlet extends LoginAbstractAzkabanServlet {
final Project project = this.projectManager.getProject(projectName);
if (project == null) {
ret.put("error", "please specify a valid project name");
- return;
}
- if (!hasPermission(project, session.getUser(), Type.READ)) {
+ else if (!hasPermission(project, session.getUser(), Type.READ)) {
ret.put("error", "Permission denied. Need READ access.");
- return;
}
- ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
+ else ajaxFetchTriggerInstances(project.getId(), flowId, ret, req);
} else {
ret.put("error", "please specify project id and flow id");
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
index 7ea7bc6..fcfbeff 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/FlowTriggerServlet.java
@@ -18,7 +18,10 @@ package azkaban.webapp.servlet;
import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
import azkaban.flowtrigger.quartz.FlowTriggerScheduler.ScheduledFlowTrigger;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
import azkaban.server.session.Session;
+import azkaban.user.Permission.Type;
import azkaban.webapp.AzkabanWebServer;
import java.io.IOException;
import java.util.HashMap;
@@ -27,17 +30,20 @@ import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.quartz.SchedulerException;
public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private FlowTriggerScheduler scheduler;
+ private ProjectManager projectManager;
@Override
public void init(final ServletConfig config) throws ServletException {
super.init(config);
final AzkabanWebServer server = (AzkabanWebServer) getApplication();
this.scheduler = server.getScheduler();
+ this.projectManager = server.getProjectManager();
}
@Override
@@ -75,21 +81,50 @@ public class FlowTriggerServlet extends LoginAbstractAzkabanServlet {
}
}
+ private boolean checkProjectIdAndFlowId(HttpServletRequest req) {
+ return hasParam(req, "projectId") && hasParam(req, "flowId");
+ }
+
private void handleAJAXAction(final HttpServletRequest req,
final HttpServletResponse resp, final Session session) throws ServletException,
IOException {
final HashMap<String, Object> ret = new HashMap<>();
final String ajaxName = getParam(req, "ajax");
if (ajaxName.equals("fetchTrigger")) {
- if (hasParam(req, "projectId") && hasParam(req, "flowId")) {
+ if (checkProjectIdAndFlowId(req)) {
final int projectId = getIntParam(req, "projectId");
final String flowId = getParam(req, "flowId");
ajaxFetchTrigger(projectId, flowId, session, ret);
- if (ret != null) {
- this.writeJSON(resp, ret);
+ }
+ } else if (ajaxName.equals("pauseTrigger") || ajaxName.equals("resumeTrigger")) {
+ if (checkProjectIdAndFlowId(req)) {
+ final int projectId = getIntParam(req, "projectId");
+ final String flowId = getParam(req, "flowId");
+ final Project project = this.projectManager.getProject(projectId);
+
+ if (project == null) {
+ ret.put("error", "please specify a valid project id");
+ }
+ else if (!hasPermission(project, session.getUser(), Type.ADMIN)) {
+ ret.put("error", "Permission denied. Need ADMIN access.");
+ }
+ else {
+ try {
+ if(ajaxName.equals("pauseTrigger")) {
+ this.scheduler.pauseFlowTrigger(projectId, flowId);
+ } else {
+ this.scheduler.resumeFlowTrigger(projectId, flowId);
+ }
+ ret.put("status", "success");
+ } catch (SchedulerException ex) {
+ ret.put("error", ex.getMessage());
+ }
}
}
}
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
}
private void handlePage(final HttpServletRequest req,
diff --git a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
index ec8c033..608e958 100644
--- a/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
+++ b/azkaban-web-server/src/main/resources/azkaban/webapp/servlet/velocity/flowtriggerspage.vm
@@ -21,6 +21,7 @@
#parse("azkaban/webapp/servlet/velocity/style.vm")
#parse("azkaban/webapp/servlet/velocity/javascript.vm")
<script type="text/javascript" src="${context}/js/azkaban/util/ajax.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban/view/executions.js"></script>
<script type="text/javascript" src="${context}/js/jquery/jquery.tablesorter.js"></script>
<script type="text/javascript">
var contextURL = "${context}";
@@ -34,6 +35,36 @@
jobTable.tablesorter();
});
</script>
+
+ <script type="text/javascript">
+ function pauseTrigger(projectId, flowId) {
+ var requestURL = document.location.href;
+ var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "pauseTrigger"};
+ var successHandler = function (data) {
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ window.location = requestURL;
+ }
+ };
+ ajaxCall(requestURL, requestData, successHandler);
+ }
+
+ function resumeTrigger(projectId, flowId) {
+ var requestURL = document.location.href;
+ var requestData = {"projectId": projectId, "flowId" : flowId, "ajax": "resumeTrigger"};
+ var successHandler = function (data) {
+ if (data.error) {
+ showDialog("Error", data.error);
+ }
+ else {
+ window.location = requestURL;
+ }
+ };
+ ajaxCall(requestURL, requestData, successHandler);
+ }
+ </script>
</head>
<body>
@@ -73,6 +104,8 @@
<th>Schedule</th>
<th>Max Wait Mins</th>
<th>Dependencies</th>
+ <th>Is Paused</th>
+ <th>Pause/Resume</th>
</tr>
</thead>
<tbody>
@@ -130,6 +163,21 @@
</div>
</div>
+ <td>${trigger.isPaused()}</td>
+
+ ##todo chengren311: add pause/resume button in flow page.
+ <td>
+ #if (${trigger.isPaused()} == "false")
+ <button type="button" id="pausebtn" class="btn btn-danger btn-sm"
+ onclick="pauseTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Pause
+ </button>
+ #else
+ <button type="button" id="resumebtn" class="btn btn-info btn-sm"
+ onclick="resumeTrigger('${trigger.getProjectId()}', '${trigger.getFlowId()}')">Resume
+ </button>
+ #end
+ </td>
+
</tr>
#end
#else