azkaban-aplcache

job name should be customized for different types of quartz

5/9/2018 7:29:16 PM

Details

diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
index 75ca642..24d4a81 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerQuartzJob.java
@@ -34,6 +34,8 @@ public class FlowTriggerQuartzJob extends AbstractQuartzJob {
   public static final String FLOW_ID = "FLOW_ID";
   public static final String FLOW_VERSION = "FLOW_VERSION";
 
+  public static final String JOB_NAME = "FLOW_TRIGGER";
+
   private final FlowTriggerService triggerService;
   private final ProjectManager projectManager;
 
diff --git a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
index 8ae50cb..9a3d7b0 100644
--- a/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/flowtrigger/quartz/FlowTriggerScheduler.java
@@ -98,10 +98,12 @@ public class FlowTriggerScheduler {
             logger.info("scheduling flow " + flow.getProjectId() + "." + flow.getId());
             this.scheduler
                 .registerJob(flowTrigger.getSchedule().getCronExpression(), new QuartzJobDescription
-                    (FlowTriggerQuartzJob.class, generateGroupName(flow), contextMap));
+                    (FlowTriggerQuartzJob.class, FlowTriggerQuartzJob.JOB_NAME,
+                        generateGroupName(flow), contextMap));
           }
         } catch (final Exception ex) {
-          logger.error("error in getting flow file", ex);
+          logger.error(String.format("error in registering flow [project: %s, flow: %s]", project
+              .getName(), flow.getId()), ex);
         } finally {
           FlowLoaderUtils.cleanUpDir(tempDir);
         }
@@ -112,13 +114,14 @@ public class FlowTriggerScheduler {
   public void pauseFlowTrigger(final int projectId, final String flowId) throws SchedulerException {
     logger.info(String.format("pausing flow trigger for [projectId:%s, flowId:%s]", projectId,
         flowId));
-    this.scheduler.pauseJob(generateGroupName(projectId, flowId));
+    this.scheduler.pauseJob(FlowTriggerQuartzJob.JOB_NAME, generateGroupName(projectId, flowId));
   }
 
   public void resumeFlowTrigger(final int projectId, final String flowId) throws
       SchedulerException {
-    logger.info(String.format("resuming flow trigger for [projectId:%s, flowId:%s]", projectId, flowId));
-    this.scheduler.resumeJob(generateGroupName(projectId, flowId));
+    logger.info(
+        String.format("resuming flow trigger for [projectId:%s, flowId:%s]", projectId, flowId));
+    this.scheduler.resumeJob(FlowTriggerQuartzJob.JOB_NAME, generateGroupName(projectId, flowId));
   }
 
   /**
@@ -131,7 +134,7 @@ public class FlowTriggerScheduler {
 
       final List<ScheduledFlowTrigger> flowTriggerJobDetails = new ArrayList<>();
       for (final String groupName : groupNames) {
-        final JobKey jobKey = new JobKey(QuartzScheduler.DEFAULT_JOB_NAME, groupName);
+        final JobKey jobKey = new JobKey(FlowTriggerQuartzJob.JOB_NAME, groupName);
         ScheduledFlowTrigger scheduledFlowTrigger = null;
         try {
           final JobDetail job = quartzScheduler.getJobDetail(jobKey);
@@ -143,7 +146,8 @@ public class FlowTriggerScheduler {
               .get(FlowTriggerQuartzJob.FLOW_TRIGGER);
           final String submitUser = jobDataMap.getString(FlowTriggerQuartzJob.SUBMIT_USER);
           final List<? extends Trigger> quartzTriggers = quartzScheduler.getTriggersOfJob(jobKey);
-          final boolean isPaused = this.scheduler.isJobPaused(groupName);
+          final boolean isPaused = this.scheduler
+              .isJobPaused(FlowTriggerQuartzJob.JOB_NAME, groupName);
           scheduledFlowTrigger = new ScheduledFlowTrigger(projectId,
               this.projectManager.getProject(projectId).getName(),
               flowId, flowTrigger, submitUser, quartzTriggers.isEmpty() ? null
@@ -170,7 +174,11 @@ public class FlowTriggerScheduler {
       logger.info("unscheduling flow" + flow.getProjectId() + "." + flow.getId() + " if it has "
           + " schedule");
       if (!flow.isEmbeddedFlow()) {
-        this.scheduler.unregisterJob(generateGroupName(flow));
+        try {
+          this.scheduler.unregisterJob(FlowTriggerQuartzJob.JOB_NAME, generateGroupName(flow));
+        } catch (final Exception ex) {
+          logger.info("error when unregistering job", ex);
+        }
       }
     }
   }
@@ -214,7 +222,7 @@ public class FlowTriggerScheduler {
     }
 
     public boolean isPaused() {
-      return isPaused;
+      return this.isPaused;
     }
 
     public int getProjectId() {
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
index 12ea3f1..9f90772 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzJobDescription.java
@@ -26,11 +26,11 @@ import java.util.Map;
 public class QuartzJobDescription<T extends AbstractQuartzJob> {
 
   private final String groupName;
+  private final String jobName;
   private final Class<T> jobClass;
   private final Map<String, ? extends Serializable> contextMap;
-
   public QuartzJobDescription(final Class<T> jobClass,
-      final String groupName,
+      final String jobName, final String groupName,
       final Map<String, ? extends Serializable> contextMap) {
 
     /**
@@ -41,23 +41,18 @@ public class QuartzJobDescription<T extends AbstractQuartzJob> {
       throw new ClassCastException("jobClass must extend AbstractQuartzJob class");
     }
     this.jobClass = jobClass;
+    this.jobName = jobName;
     this.groupName = groupName;
     this.contextMap = contextMap;
   }
 
   public QuartzJobDescription(final Class<T> jobClass,
-      final String groupName) {
+      final String jobName, final String groupName) {
+    this(jobClass, jobName, groupName, new HashMap<String, String>());
+  }
 
-    /**
-     * This check is necessary for raw type. Please see test
-     * {@link QuartzJobDescriptionTest#testCreateQuartzJobDescription2}
-     */
-    if (jobClass.getSuperclass() != AbstractQuartzJob.class) {
-      throw new ClassCastException("jobClass must extend AbstractQuartzJob class");
-    }
-    this.jobClass = jobClass;
-    this.groupName = groupName;
-    this.contextMap = new HashMap<String, String>();
+  public String getJobName() {
+    return jobName;
   }
 
   public Class<? extends AbstractQuartzJob> getJobClass() {
diff --git a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
index 4b4de62..fb37741 100644
--- a/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
+++ b/azkaban-web-server/src/main/java/azkaban/scheduler/QuartzScheduler.java
@@ -22,7 +22,6 @@ import static java.util.Objects.requireNonNull;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.utils.Props;
 import java.util.List;
-import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.quartz.CronExpression;
@@ -36,18 +35,20 @@ import org.quartz.Trigger;
 import org.quartz.Trigger.TriggerState;
 import org.quartz.TriggerBuilder;
 import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.matchers.GroupMatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Manages Quartz schedules. Azkaban regards QuartzJob and QuartzTrigger as an one-to-one mapping.
+ * Manages Quartz schedules. Azkaban regards QuartzJob and QuartzTrigger as an one-to-one
+ * mapping.
+ * Quartz job key naming standard:
+ * Job key is composed of job name and group name. Job type denotes job name. Project id+flow
+ * name denotes group name.
+ * E.x FLOW_TRIGGER as job name, 1.flow1 as group name
  */
 @Singleton
 public class QuartzScheduler {
 
-  //Unless specified, all Quartz jobs's identities comes with the default job name.
-  public static final String DEFAULT_JOB_NAME = "job1";
   private static final Logger logger = LoggerFactory.getLogger(QuartzScheduler.class);
   private Scheduler scheduler = null;
 
@@ -113,20 +114,28 @@ public class QuartzScheduler {
     }
   }
 
+  private void checkJobExistence(final String jobName, final String groupName)
+      throws SchedulerException {
+    if (!ifJobExist(jobName, groupName)) {
+      throw new SchedulerException(String.format("can not find job with job name: %s and group "
+          + "name %s: in quartz.", jobName, groupName));
+    }
+  }
+
+
   /**
    * pause a job given the groupname. since pausing request might be issued concurrently,
    * so synchronized is added to ensure thread safety.
    */
-  public synchronized void pauseJob(final String groupName) throws SchedulerException {
-    if (!ifJobExist(groupName)) {
-      throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
-    } else {
-      this.scheduler.pauseJob(new JobKey(DEFAULT_JOB_NAME, groupName));
-    }
+  public synchronized void pauseJob(final String jobName, final String groupName)
+      throws SchedulerException {
+    checkJobExistence(jobName, groupName);
+    this.scheduler.pauseJob(new JobKey(jobName, groupName));
   }
 
-  public synchronized boolean isJobPaused(final String groupName) throws SchedulerException {
-    final JobKey jobKey = new JobKey(DEFAULT_JOB_NAME, groupName);
+  public synchronized boolean isJobPaused(final String jobName, final String groupName)
+      throws SchedulerException {
+    final JobKey jobKey = new JobKey(jobName, groupName);
     final JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
     final List<? extends Trigger> triggers = this.scheduler.getTriggersOfJob(jobDetail.getKey());
     for (final Trigger trigger : triggers) {
@@ -142,24 +151,20 @@ public class QuartzScheduler {
    * resume a paused job given the groupname. since resuming request might be issued concurrently,
    * so synchronized is added to ensure thread safety.
    */
-  public synchronized void resumeJob(final String groupName) throws SchedulerException {
-    if (!ifJobExist(groupName)) {
-      throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
-    } else {
-      this.scheduler.resumeJob(new JobKey(DEFAULT_JOB_NAME, groupName));
-    }
+  public synchronized void resumeJob(final String jobName, final String groupName)
+      throws SchedulerException {
+    checkJobExistence(jobName, groupName);
+    this.scheduler.resumeJob(new JobKey(jobName, groupName));
   }
 
   /**
    * Unregister a job given the groupname. Since unregister might be called when
    * concurrently removing projects, so synchronized is added to ensure thread safety.
    */
-  public synchronized void unregisterJob(final String groupName) throws SchedulerException {
-    if (!ifJobExist(groupName)) {
-      throw new SchedulerException("can not find job with group name: " + groupName + " in quartz.");
-    } else {
-      this.scheduler.deleteJob(new JobKey(DEFAULT_JOB_NAME, groupName));
-    }
+  public synchronized void unregisterJob(final String jobName, final String groupName) throws
+      SchedulerException {
+    checkJobExistence(jobName, groupName);
+    this.scheduler.deleteJob(new JobKey(jobName, groupName));
   }
 
   /**
@@ -181,10 +186,9 @@ public class QuartzScheduler {
 
     requireNonNull(jobDescription, "jobDescription is null");
 
-    // Not allowed to register duplicate job name.
-    if (ifJobExist(jobDescription.getGroupName())) {
-      throw new SchedulerException(
-          "can not register existing job " + jobDescription.getGroupName());
+    if (ifJobExist(jobDescription.getJobName(), jobDescription.getGroupName())) {
+      throw new SchedulerException(String.format("can not register existing job with job name: "
+          + "%s and group name: %s", jobDescription.getJobName(), jobDescription.getGroupName()));
     }
 
     if (!CronExpression.isValidExpression(cronExpression)) {
@@ -194,7 +198,7 @@ public class QuartzScheduler {
 
     // TODO kunkun-tang: we will modify this when we start supporting multi schedules per flow.
     final JobDetail job = JobBuilder.newJob(jobDescription.getJobClass())
-        .withIdentity(DEFAULT_JOB_NAME, jobDescription.getGroupName()).build();
+        .withIdentity(jobDescription.getJobName(), jobDescription.getGroupName()).build();
 
     // Add external dependencies to Job Data Map.
     job.getJobDataMap().putAll(jobDescription.getContextMap());
@@ -215,9 +219,9 @@ public class QuartzScheduler {
   }
 
 
-  public boolean ifJobExist(final String groupName) throws SchedulerException {
-    final Set<JobKey> jobKeySet = this.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName));
-    return jobKeySet != null && jobKeySet.size() > 0;
+  public boolean ifJobExist(final String jobName, final String groupName)
+      throws SchedulerException {
+    return this.scheduler.getJobDetail(new JobKey(jobName, groupName)) != null;
   }
 
   public Scheduler getScheduler() {
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
index 2c94d2d..cd714b0 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzJobDescriptionTest.java
@@ -26,19 +26,18 @@ import org.junit.Test;
 public class QuartzJobDescriptionTest {
 
   @Test
-  public void testCreateQuartzJobDescription() throws Exception{
+  public void testCreateQuartzJobDescription() throws Exception {
     final Map<String, SampleService> contextMap = new HashMap<>();
     assertThatCode(() -> {
-          new QuartzJobDescription<>(SampleQuartzJob.class,
-          "SampleService",
-          contextMap);
+      new QuartzJobDescription<>(SampleQuartzJob.class, "SampleJob",
+          "SampleService", contextMap);
     }).doesNotThrowAnyException();
   }
 
   @Test
   public void testCreateQuartzJobDescriptionRawType2() throws Exception {
     assertThatThrownBy(
-        () -> new QuartzJobDescription(SampleService.class, "SampleService"))
+        () -> new QuartzJobDescription(SampleService.class, "SampleJob", "SampleService"))
         .isInstanceOf(RuntimeException.class)
         .hasMessageContaining("jobClass must extend AbstractQuartzJob class");
   }
diff --git a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
index 776554d..fd27ec6 100644
--- a/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
+++ b/azkaban-web-server/src/test/java/azkaban/scheduler/QuartzSchedulerTest.java
@@ -94,7 +94,7 @@ public class QuartzSchedulerTest {
   @Test
   public void testCreateScheduleAndRun() throws Exception {
     scheduler.registerJob("* * * * * ?", createJobDescription());
-    assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(true);
+    assertThat(scheduler.ifJobExist("SampleJob", "SampleService")).isEqualTo(true);
     TestUtils.await().untilAsserted(() -> assertThat(SampleQuartzJob.COUNT_EXECUTION)
         .isNotNull().isGreaterThan(1));
   }
@@ -119,9 +119,27 @@ public class QuartzSchedulerTest {
   @Test
   public void testUnregisterSchedule() throws Exception {
     scheduler.registerJob("* * * * * ?", createJobDescription());
-    assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(true);
-    scheduler.unregisterJob("SampleService");
-    assertThat(scheduler.ifJobExist("SampleService")).isEqualTo(false);
+    assertThat(scheduler.ifJobExist("SampleJob", "SampleService")).isEqualTo(true);
+    scheduler.unregisterJob("SampleJob", "SampleService");
+    assertThat(scheduler.ifJobExist("SampleJob", "SampleService")).isEqualTo(false);
+  }
+
+  @Test
+  public void testPauseSchedule() throws Exception {
+    scheduler.registerJob("* * * * * ?", createJobDescription());
+    scheduler.pauseJob("SampleJob", "SampleService");
+    assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(true);
+    scheduler.resumeJob("SampleJob", "SampleService");
+    assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(false);
+
+    // test pausing a paused job
+    scheduler.pauseJob("SampleJob", "SampleService");
+    scheduler.pauseJob("SampleJob", "SampleService");
+    assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(true);
+    // test resuming a non-paused job
+    scheduler.resumeJob("SampleJob", "SampleService");
+    scheduler.resumeJob("SampleJob", "SampleService");
+    assertThat(scheduler.isJobPaused("SampleJob", "SampleService")).isEqualTo(false);
   }
 
   @Ignore("Flaky test, slow too. Don't use Thread.sleep in unit tests.")
@@ -138,6 +156,6 @@ public class QuartzSchedulerTest {
   }
 
   private QuartzJobDescription createJobDescription() {
-    return new QuartzJobDescription<>(SampleQuartzJob.class, "SampleService");
+    return new QuartzJobDescription<>(SampleQuartzJob.class, "SampleJob", "SampleService");
   }
 }