azkaban-aplcache

refactor ScheduleManager (#1095) Adding end date feature

5/17/2017 1:08:45 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index c9da698..e6c1923 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -40,6 +40,8 @@ import azkaban.utils.Props;
  * instead and waits until correct loading time for the flow. It will not remove
  * the flow from the schedule when it is run, which can potentially allow the
  * flow to and overlap each other.
+ *
+ * TODO kunkun-tang: When new AZ quartz Scheduler comes, we will remove this class.
  */
 public class ScheduleManager implements TriggerAgent {
   private static Logger logger = Logger.getLogger(ScheduleManager.class);
@@ -50,42 +52,28 @@ public class ScheduleManager implements TriggerAgent {
   private ScheduleLoader loader;
 
   private Map<Integer, Schedule> scheduleIDMap =
-      new LinkedHashMap<Integer, Schedule>();
+      new LinkedHashMap<>();
   private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap =
-      new LinkedHashMap<Pair<Integer, String>, Schedule>();
+      new LinkedHashMap<>();
 
   /**
    * Give the schedule manager a loader class that will properly load the
    * schedule.
    *
-   * @param loader
    */
   public ScheduleManager(ScheduleLoader loader) {
     this.loader = loader;
   }
 
+  // Since ScheduleManager was already replaced by TriggerManager, many methods like start are
+  // never used.
+  @Deprecated
   @Override
   public void start() throws ScheduleManagerException {
-    List<Schedule> scheduleList = null;
-    try {
-      scheduleList = loader.loadSchedules();
-    } catch (ScheduleManagerException e) {
-      logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
-      e.printStackTrace();
-    }
-
-    for (Schedule sched : scheduleList) {
-      if (sched.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
-        onScheduleExpire(sched);
-      } else {
-        internalSchedule(sched);
-      }
-    }
-
   }
 
   // only do this when using external runner
-  public synchronized void updateLocal() throws ScheduleManagerException {
+  private synchronized void updateLocal() throws ScheduleManagerException {
     List<Schedule> updates = loader.loadUpdatedSchedules();
     for (Schedule s : updates) {
       if (s.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
@@ -112,62 +100,39 @@ public class ScheduleManager implements TriggerAgent {
   /**
    * Retrieves a copy of the list of schedules.
    *
-   * @return
-   * @throws ScheduleManagerException
    */
   public synchronized List<Schedule> getSchedules()
       throws ScheduleManagerException {
 
     updateLocal();
-    return new ArrayList<Schedule>(scheduleIDMap.values());
+    return new ArrayList<>(scheduleIDMap.values());
   }
 
   /**
    * Returns the scheduled flow for the flow name
    *
-   * @param id
-   * @return
-   * @throws ScheduleManagerException
    */
-
   public Schedule getSchedule(int projectId, String flowId)
       throws ScheduleManagerException {
     updateLocal();
-    return scheduleIdentityPairMap.get(new Pair<Integer, String>(projectId,
+    return scheduleIdentityPairMap.get(new Pair<>(projectId,
         flowId));
   }
 
   /**
    * Returns the scheduled flow for the scheduleId
    *
-   * @param id
-   * @return
-   * @throws ScheduleManagerException
+   * @param scheduleId Schedule ID
    */
   public Schedule getSchedule(int scheduleId) throws ScheduleManagerException {
     updateLocal();
     return scheduleIDMap.get(scheduleId);
   }
 
-  /**
-   * Removes the flow from the schedule if it exists.
-   *
-   * @param id
-   * @throws ScheduleManagerException
-   */
-
-  public synchronized void removeSchedule(int projectId, String flowId)
-      throws ScheduleManagerException {
-    Schedule sched = getSchedule(projectId, flowId);
-    if (sched != null) {
-      removeSchedule(sched);
-    }
-  }
 
   /**
    * Removes the flow from the schedule if it exists.
    *
-   * @param id
    */
   public synchronized void removeSchedule(Schedule sched) {
     Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
@@ -182,34 +147,32 @@ public class ScheduleManager implements TriggerAgent {
     try {
       loader.removeSchedule(sched);
     } catch (ScheduleManagerException e) {
-      e.printStackTrace();
+      logger.error(e);
     }
   }
 
-  public Schedule scheduleFlow(final int scheduleId, final int projectId,
-      final String projectName, final String flowName, final String status,
-      final long firstSchedTime, final DateTimeZone timezone,
-      final ReadablePeriod period, final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser) {
-    return scheduleFlow(scheduleId, projectId, projectName, flowName, status,
-        firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
-        submitTime, submitUser, null, null);
-  }
-
-  public Schedule scheduleFlow(final int scheduleId, final int projectId,
-      final String projectName, final String flowName, final String status,
-      final long firstSchedTime, final DateTimeZone timezone,
-      final ReadablePeriod period, final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser,
-      ExecutionOptions execOptions, List<SlaOption> slaOptions) {
+  public Schedule scheduleFlow(final int scheduleId,
+                               final int projectId,
+                               final String projectName,
+                               final String flowName,
+                               final String status,
+                               final long firstSchedTime,
+                               final DateTimeZone timezone,
+                               final ReadablePeriod period,
+                               final long lastModifyTime,
+                               final long nextExecTime,
+                               final long submitTime,
+                               final String submitUser,
+                               ExecutionOptions execOptions,
+                               List<SlaOption> slaOptions) {
     Schedule sched =
         new Schedule(scheduleId, projectId, projectName, flowName, status,
             firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
             submitTime, submitUser, execOptions, slaOptions, null);
     logger
         .info("Scheduling flow '" + sched.getScheduleName() + "' for "
-            + _dateFormat.print(firstSchedTime) + " with a period of " + period == null ? "(non-recurring)"
-            : period);
+            + _dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)"
+            : period));
 
     insertSchedule(sched);
     return sched;
@@ -234,8 +197,6 @@ public class ScheduleManager implements TriggerAgent {
   }
   /**
    * Schedules the flow, but doesn't save the schedule afterwards.
-   *
-   * @param flow
    */
   private synchronized void internalSchedule(Schedule s) {
     scheduleIDMap.put(s.getScheduleId(), s);
@@ -244,8 +205,6 @@ public class ScheduleManager implements TriggerAgent {
 
   /**
    * Adds a flow to the schedule.
-   *
-   * @param flow
    */
   public synchronized void insertSchedule(Schedule s) {
     Schedule exist = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
@@ -260,7 +219,7 @@ public class ScheduleManager implements TriggerAgent {
           internalSchedule(s);
         }
       } catch (ScheduleManagerException e) {
-        e.printStackTrace();
+        logger.error(e);
       }
     } else {
       logger