Details
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
index 75ca642..24d4a81 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
@@ -34,6 +34,8 @@ public class FlowTriggerQuartzJob extends AbstractQuartzJob {
public static final String FLOW_ID = "FLOW_ID";
public static final String FLOW_VERSION = "FLOW_VERSION";
+ public static final String JOB_NAME = "FLOW_TRIGGER";
+
private final FlowTriggerService triggerService;
private final ProjectManager projectManager;
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index 8ae50cb..9a3d7b0 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -98,10 +98,12 @@ public class FlowTriggerScheduler {
logger.info("scheduling flow " + flow.getProjectId() + "." + flow.getId());
this.scheduler
.registerJob(flowTrigger.getSchedule().getCronExpression(), new QuartzJobDescription
- (FlowTriggerQuartzJob.class, generateGroupName(flow), contextMap));
+ (FlowTriggerQuartzJob.class, FlowTriggerQuartzJob.JOB_NAME,
+ generateGroupName(flow), contextMap));
}
} catch (final Exception ex) {
- logger.error("error in getting flow file", ex);
+ logger.error(String.format("error in registering flow [project: %s, flow: %s]", project
+ .getName(), flow.getId()), ex);
} finally {
FlowLoaderUtils.cleanUpDir(tempDir);
}
@@ -112,13 +114,14 @@ public class FlowTriggerScheduler {
public void pauseFlowTrigger(final int projectId, final String flowId) throws SchedulerException {
logger.info(String.format("pausing flow trigger for [projectId:%s, flowId:%s]", projectId,
flowId));
- this.scheduler.pauseJob(generateGroupName(projectId, flowId));
+ this.scheduler.pauseJob(FlowTriggerQuartzJob.JOB_NAME, generateGroupName(projectId, flowId));
}
public void resumeFlowTrigger(final int projectId, final String flowId) throws
SchedulerException {
- logger.info(String.format("resuming flow trigger for [projectId:%s, flowId:%s]", projectId, flowId));
- this.scheduler.resumeJob(generateGroupName(projectId, flowId));
+ logger.info(
+ String.format("resuming flow trigger for [projectId:%s, flowId:%s]", projectId, flowId));
+ this.scheduler.resumeJob(FlowTriggerQuartzJob.JOB_NAME, generateGroupName(projectId, flowId));
}
/**
@@ -131,7 +134,7 @@ public class FlowTriggerScheduler {
final List<ScheduledFlowTrigger> flowTriggerJobDetails = new ArrayList<>();
for (final String groupName : groupNames) {
- final JobKey jobKey = new JobKey(QuartzScheduler.DEFAULT_JOB_NAME, groupName);
+ final JobKey jobKey = new JobKey(FlowTriggerQuartzJob.JOB_NAME, groupName);
ScheduledFlowTrigger scheduledFlowTrigger = null;
try {
final JobDetail job = quartzScheduler.getJobDetail(jobKey);
@@ -143,7 +146,8 @@ public class FlowTriggerScheduler {
.get(FlowTriggerQuartzJob.FLOW_TRIGGER);
final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
- final boolean isPaused = this.scheduler.isJobPaused(groupName);
+ final boolean isPaused = this.scheduler
+ .isJobPaused(FlowTriggerQuartzJob.JOB_NAME, groupName);
scheduledFlowTrigger = new ScheduledFlowTrigger(projectId,
this.projectManager.getProject(projectId).getName(),
flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
@@ -170,7 +174,11 @@ public class FlowTriggerScheduler {
logger.info("unscheduling flow" + flow.getProjectId() + "." + flow.getId() + " if it has "
+ " schedule");
if (!flow.isEmbeddedFlow()) {
- this.scheduler.unregisterJob(generateGroupName(flow));
+ try {
+ this.scheduler.unregisterJob(FlowTriggerQuartzJob.JOB_NAME, generateGroupName(flow));
+ } catch (final Exception ex) {
+ logger.info("error when unregistering job", ex);
+ }
}
}
}
@@ -214,7 +222,7 @@ public class FlowTriggerScheduler {
}
public boolean isPaused() {
- return isPaused;
+ return this.isPaused;
}
public int getProjectId() {
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
index 12ea3f1..9f90772 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
@@ -26,11 +26,11 @@ import java.util.Map;
public class QuartzJobDescription<T extends AbstractQuartzJob> {
private final String groupName;
+ private final String jobName;
private final Class<T> jobClass;
private final Map<String, ? extends Serializable> contextMap;
-
public QuartzJobDescription(final Class<T> jobClass,
- final String groupName,
+ final String jobName, final String groupName,
final Map<String, ? extends Serializable> contextMap) {
/**
@@ -41,23 +41,18 @@ public class QuartzJobDescription<T extends AbstractQuartzJob> {
throw new ClassCastException("jobClass must extend AbstractQuartzJob class");
}
this.jobClass = jobClass;
+ this.jobName = jobName;
this.groupName = groupName;
this.contextMap = contextMap;
}
public QuartzJobDescription(final Class<T> jobClass,
- final String groupName) {
+ final String jobName, final String groupName) {
+ this(jobClass, jobName, groupName, new HashMap<String, String>());
+ }
- /**
- * This check is necessary for raw type. Please see test
- * {@link QuartzJobDescriptionTest#testCreateQuartzJobDescription2}
- */
- if (jobClass.getSuperclass() != AbstractQuartzJob.class) {
- throw new ClassCastException("jobClass must extend AbstractQuartzJob class");
- }
- this.jobClass = jobClass;
- this.groupName = groupName;
- this.contextMap = new HashMap<String, String>();
+ public String getJobName() {
+ return jobName;
}
public Class<? extends AbstractQuartzJob> getJobClass() {
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index 4b4de62..fb37741 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -22,7 +22,6 @@ import static java.util.Objects.requireNonNull;
import azkaban.Constants.ConfigurationKeys;
import azkaban.utils.Props;
import java.util.List;
-import java.util.Set;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.quartz.CronExpression;
@@ -36,18 +35,20 @@ import org.quartz.Trigger;
import org.quartz.Trigger.TriggerState;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages Quartz schedules. Azkaban regards QuartzJob and QuartzTrigger as an one-to-one mapping.
+ * Manages Quartz schedules. Azkaban regards QuartzJob and QuartzTrigger as an one-to-one
+ * mapping.
+ * Quartz job key naming standard:
+ * Job key is composed of job name and group name. Job type denotes job name. Project id+flow
+ * name denotes group name.
+ * E.x FLOW_TRIGGER as job name, 1.flow1 as group name
*/
@Singleton
public class QuartzScheduler {
- //Unless specified, all Quartz jobs's identities comes with the default job name.
- public static final String DEFAULT_JOB_NAME = "job1";
private static final Logger logger = LoggerFactory.getLogger(QuartzScheduler.class);
private Scheduler scheduler = null;
@@ -113,20 +114,28 @@ public class QuartzScheduler {
}
}
+ private void checkJobExistence(final String jobName, final String groupName)
+ throws SchedulerException {
+ if (!ifJobExist(jobName, groupName)) {
+ throw new SchedulerException(String.format("can not find job with job name: %s and group "
+ + "name %s: in quartz.", jobName, groupName));
+ }
+ }
+
+
/**
* pause a job given the groupname. since pausing request might be issued concurrently,
* so synchronized is added to ensure thread safety.
*/
- public synchronized void pauseJob(final String groupName) throws SchedulerException {
- if (!ifJobExist(groupName)) {
- throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
- } else {
- this.scheduler.pauseJob(new JobKey(DEFAULT_JOB_NAME, groupName));
- }
+ public synchronized void pauseJob(final String jobName, final String groupName)
+ throws SchedulerException {
+ checkJobExistence(jobName, groupName);
+ this.scheduler.pauseJob(new JobKey(jobName, groupName));
}
- public synchronized boolean isJobPaused(final String groupName) throws SchedulerException {
- final JobKey jobKey = new JobKey(DEFAULT_JOB_NAME, groupName);
+ public synchronized boolean isJobPaused(final String jobName, final String groupName)
+ throws SchedulerException {
+ final JobKey jobKey = new JobKey(jobName, groupName);
final JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
final List<? extends Trigger> triggers = this.scheduler.getTriggersOfJob(jobDetail.getKey());
for (final Trigger trigger : triggers) {
@@ -142,24 +151,20 @@ public class QuartzScheduler {
* resume a paused job given the groupname. since resuming request might be issued concurrently,
* so synchronized is added to ensure thread safety.
*/
- public synchronized void resumeJob(final String groupName) throws SchedulerException {
- if (!ifJobExist(groupName)) {
- throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
- } else {
- this.scheduler.resumeJob(new JobKey(DEFAULT_JOB_NAME, groupName));
- }
+ public synchronized void resumeJob(final String jobName, final String groupName)
+ throws SchedulerException {
+ checkJobExistence(jobName, groupName);
+ this.scheduler.resumeJob(new JobKey(jobName, groupName));
}
/**
* Unregister a job given the groupname. Since unregister might be called when
* concurrently removing projects, so synchronized is added to ensure thread safety.
*/
- public synchronized void unregisterJob(final String groupName) throws SchedulerException {
- if (!ifJobExist(groupName)) {
- throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
- } else {
- this.scheduler.deleteJob(new JobKey(DEFAULT_JOB_NAME, groupName));
- }
+ public synchronized void unregisterJob(final String jobName, final String groupName) throws
+ SchedulerException {
+ checkJobExistence(jobName, groupName);
+ this.scheduler.deleteJob(new JobKey(jobName, groupName));
}
/**
@@ -181,10 +186,9 @@ public class QuartzScheduler {
requireNonNull(jobDescription, "jobDescription is null");
- // Not allowed to register duplicate job name.
- if (ifJobExist(jobDescription.getGroupName())) {
- throw new SchedulerException(
- "can not register existing job " + jobDescription.getGroupName());
+ if (ifJobExist(jobDescription.getJobName(), jobDescription.getGroupName())) {
+ throw new SchedulerException(String.format("can not register existing job with job name: "
+ + "%s and group name: %s", jobDescription.getJobName(), jobDescription.getGroupName()));
}
if (!CronExpression.isValidExpression(cronExpression)) {
@@ -194,7 +198,7 @@ public class QuartzScheduler {
// TODO kunkun-tang: we will modify this when we start supporting multi schedules per flow.
final JobDetail job = JobBuilder.newJob(jobDescription.getJobClass())
- .withIdentity(DEFAULT_JOB_NAME, jobDescription.getGroupName()).build();
+ .withIdentity(jobDescription.getJobName(), jobDescription.getGroupName()).build();
// Add external dependencies to Job Data Map.
job.getJobDataMap().putAll(jobDescription.getContextMap());
@@ -215,9 +219,9 @@ public class QuartzScheduler {
}
- public boolean ifJobExist(final String groupName) throws SchedulerException {
- final Set<JobKey> jobKeySet = this.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName));
- return jobKeySet != null && jobKeySet.size() > 0;
+ public boolean ifJobExist(final String jobName, final String groupName)
+ throws SchedulerException {
+ return this.scheduler.getJobDetail(new JobKey(jobName, groupName)) != null;
}
public Scheduler getScheduler() {
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
index 2c94d2d..cd714b0 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
@@ -26,19 +26,18 @@ import org.junit.Test;
public class QuartzJobDescriptionTest {
@Test
- public void testCreateQuartzJobDescription() throws Exception{
+ public void testCreateQuartzJobDescription() throws Exception {
final Map<String, SampleService> contextMap = new HashMap<>();
assertThatCode(() -> {
- new QuartzJobDescription<>(SampleQuartzJob.class,
- "SampleService",
- contextMap);
+ new QuartzJobDescription<>(SampleQuartzJob.class, "SampleJob",
+ "SampleService", contextMap);
}).doesNotThrowAnyException();
}
@Test
public void testCreateQuartzJobDescriptionRawType2() throws Exception {
assertThatThrownBy(
- () -> new QuartzJobDescription(SampleService.class, "SampleService"))
+ () -> new QuartzJobDescription(SampleService.class, "SampleJob", "SampleService"))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("jobClass must extend AbstractQuartzJob class");
}
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
index 776554d..fd27ec6 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
@@ -94,7 +94,7 @@ public class QuartzSchedulerTest {
@Test
public void testCreateScheduleAndRun() throws Exception {
scheduler.registerJob("* * * * * ?", createJobDescription());
- assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(true);
+ assertThat(scheduler.ifJobExist("SampleJob", "SampleService")).isEqualTo(true);
TestUtils.await().untilAsserted(() -> assertThat(SampleQuartzJob.COUNT_EXECUTION)
.isNotNull().isGreaterThan(1));
}
@@ -119,9 +119,27 @@ public class QuartzSchedulerTest {
@Test
public void testUnregisterSchedule() throws Exception {
scheduler.registerJob("* * * * * ?", createJobDescription());
- assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(true);
- scheduler.unregisterJob("SampleService");
- assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(false);
+ assertThat(scheduler.ifJobExist("SampleJob", "SampleService")).isEqualTo(true);
+ scheduler.unregisterJob("SampleJob", "SampleService");
+ assertThat(scheduler.ifJobExist("SampleJob", "SampleService")).isEqualTo(false);
+ }
+
+ @Test
+ public void testPauseSchedule() throws Exception {
+ scheduler.registerJob("* * * * * ?", createJobDescription());
+ scheduler.pauseJob("SampleJob", "SampleService");
+ assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(true);
+ scheduler.resumeJob("SampleJob", "SampleService");
+ assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(false);
+
+ // test pausing a paused job
+ scheduler.pauseJob("SampleJob", "SampleService");
+ scheduler.pauseJob("SampleJob", "SampleService");
+ assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(true);
+ // test resuming a non-paused job
+ scheduler.resumeJob("SampleJob", "SampleService");
+ scheduler.resumeJob("SampleJob", "SampleService");
+ assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(false);
}
@Ignore("Flaky test, slow too. Don't use Thread.sleep in unit tests.")
@@ -138,6 +156,6 @@ public class QuartzSchedulerTest {
}
private QuartzJobDescription createJobDescription() {
- return new QuartzJobDescription<>(SampleQuartzJob.class, "SampleService");
+ return new QuartzJobDescription<>(SampleQuartzJob.class, "SampleJob", "SampleService");
}
}