azkaban-aplcache

EndDate Expiring Schedule Introduction (#1110) * Implementing

6/5/2017 3:40:41 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 06c982c..2b228ac 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -49,6 +49,9 @@ public class Constants {
   public static final int DEFAULT_SSL_PORT_NUMBER = 8443;
   public static final int DEFAULT_JETTY_MAX_THREAD_COUNT = 20;
 
+  // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
+  public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+
   public static class ConfigurationKeys {
     // These properties are configurable through azkaban.properties
 
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
index d495059..4613a4d 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
@@ -43,6 +43,7 @@ public class Schedule {
   private final String projectName;
   private final String flowName;
   private final long firstSchedTime;
+  private final long endSchedTime;
   private final DateTimeZone timezone;
   private final long lastModifyTime;
   private final ReadablePeriod period;
@@ -56,40 +57,28 @@ public class Schedule {
   private ExecutionOptions executionOptions;
   private List<SlaOption> slaOptions;
 
-  public Schedule(final int scheduleId, final int projectId, final String projectName,
-      final String flowName, final String status, final long firstSchedTime,
-      final DateTimeZone timezone, final ReadablePeriod period, final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser) {
-
-    this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
-        timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser,
-        null, null, null);
-  }
-
-  public Schedule(final int scheduleId, final int projectId, final String projectName,
-      final String flowName, final String status, final long firstSchedTime,
-      final String timezoneId,
-      final String period, final long lastModifyTime, final long nextExecTime,
-      final long submitTime,
-      final String submitUser, final ExecutionOptions executionOptions,
-      final List<SlaOption> slaOptions) {
-    this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
-        DateTimeZone.forID(timezoneId), parsePeriodString(period),
-        lastModifyTime, nextExecTime, submitTime, submitUser, executionOptions,
-        slaOptions, null);
-  }
-
-  public Schedule(final int scheduleId, final int projectId, final String projectName,
-      final String flowName, final String status, final long firstSchedTime,
-      final DateTimeZone timezone, final ReadablePeriod period, final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser,
-      final ExecutionOptions executionOptions, final List<SlaOption> slaOptions,
-      final String cronExpression) {
+  public Schedule(final int scheduleId,
+                  final int projectId,
+                  final String projectName,
+                  final String flowName,
+                  final String status,
+                  final long firstSchedTime,
+                  final long endSchedTime,
+                  final DateTimeZone timezone,
+                  final ReadablePeriod period,
+                  final long lastModifyTime,
+                  final long nextExecTime,
+                  final long submitTime,
+                  final String submitUser,
+                  final ExecutionOptions executionOptions,
+                  final List<SlaOption> slaOptions,
+                  final String cronExpression) {
     this.scheduleId = scheduleId;
     this.projectId = projectId;
     this.projectName = projectName;
     this.flowName = flowName;
     this.firstSchedTime = firstSchedTime;
+    this.endSchedTime = endSchedTime;
     this.timezone = timezone;
     this.lastModifyTime = lastModifyTime;
     this.period = period;
@@ -112,27 +101,27 @@ public class Schedule {
     final int periodInt =
         Integer.parseInt(periodStr.substring(0, periodStr.length() - 1));
     switch (periodUnit) {
-      case 'M':
-        period = Months.months(periodInt);
-        break;
-      case 'w':
-        period = Weeks.weeks(periodInt);
-        break;
-      case 'd':
-        period = Days.days(periodInt);
-        break;
-      case 'h':
-        period = Hours.hours(periodInt);
-        break;
-      case 'm':
-        period = Minutes.minutes(periodInt);
-        break;
-      case 's':
-        period = Seconds.seconds(periodInt);
-        break;
-      default:
-        throw new IllegalArgumentException("Invalid schedule period unit '"
-            + periodUnit);
+    case 'M':
+      period = Months.months(periodInt);
+      break;
+    case 'w':
+      period = Weeks.weeks(periodInt);
+      break;
+    case 'd':
+      period = Days.days(periodInt);
+      break;
+    case 'h':
+      period = Hours.hours(periodInt);
+      break;
+    case 'm':
+      period = Minutes.minutes(periodInt);
+      break;
+    case 's':
+      period = Seconds.seconds(periodInt);
+      break;
+    default:
+      throw new IllegalArgumentException("Invalid schedule period unit '"
+          + periodUnit);
     }
 
     return period;
@@ -192,9 +181,7 @@ public class Schedule {
   public String toString() {
 
     final String underlying =
-        this.projectName + "." + this.flowName + " (" + this.projectId + ")"
-            + " to be run at (starting) "
-            + new DateTime(
+        this.projectName + "." + this.flowName + " (" + this.projectId + ")" + " to be run at (starting) " + new DateTime(
             this.firstSchedTime).toDateTimeISO();
     if (this.period == null && this.cronExpression == null) {
       return underlying + " non-recurring";
@@ -275,8 +262,9 @@ public class Schedule {
     }
 
     if (this.cronExpression != null) {
-      final DateTime nextTime = getNextCronRuntime(this.nextExecTime, this.timezone,
-          Utils.parseCronExpression(this.cronExpression, this.timezone));
+      final DateTime nextTime = getNextCronRuntime(
+          this.nextExecTime, this.timezone, Utils.parseCronExpression(this.cronExpression,
+              this.timezone));
       this.nextExecTime = nextTime.getMillis();
       return true;
     }
@@ -292,7 +280,7 @@ public class Schedule {
   }
 
   private DateTime getNextRuntime(final long scheduleTime, final DateTimeZone timezone,
-      final ReadablePeriod period) {
+                                  final ReadablePeriod period) {
     final DateTime now = new DateTime();
     DateTime date = new DateTime(scheduleTime).withZone(timezone);
     int count = 0;
@@ -315,13 +303,14 @@ public class Schedule {
   }
 
   /**
-   * @param scheduleTime represents the time when Schedule Servlet receives the Cron Schedule API
-   * call.
+   *
+   * @param scheduleTime represents the time when Schedule Servlet receives the Cron Schedule API call.
    * @param timezone is always UTC (after 3.1.0)
+   * @param ce
    * @return the First Scheduled DateTime to run this flow.
    */
   private DateTime getNextCronRuntime(final long scheduleTime, final DateTimeZone timezone,
-      final CronExpression ce) {
+                                      final CronExpression ce) {
 
     Date date = new DateTime(scheduleTime).withZone(timezone).toDate();
     if (ce != null) {
@@ -387,4 +376,7 @@ public class Schedule {
     return this.skipPastOccurrences;
   }
 
+  public long getEndSchedTime() {
+    return this.endSchedTime;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
index 65ae121..6c97b80 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
@@ -24,8 +24,6 @@ public interface ScheduleLoader {
 
   public void updateSchedule(Schedule s) throws ScheduleManagerException;
 
-  public List<Schedule> loadSchedules() throws ScheduleManagerException;
-
   public void removeSchedule(Schedule s) throws ScheduleManagerException;
 
   public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index 60cfe2b..023d1ed 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -41,7 +41,6 @@ import org.joda.time.format.DateTimeFormatter;
  * TODO kunkun-tang: When new AZ quartz Scheduler comes, we will remove this class.
  */
 public class ScheduleManager implements TriggerAgent {
-
   public static final String triggerSource = "SimpleTimeTrigger";
   private static final Logger logger = Logger.getLogger(ScheduleManager.class);
   private final DateTimeFormatter _dateFormat = DateTimeFormat
@@ -56,6 +55,7 @@ public class ScheduleManager implements TriggerAgent {
   /**
    * Give the schedule manager a loader class that will properly load the
    * schedule.
+   *
    */
   public ScheduleManager(final ScheduleLoader loader) {
     this.loader = loader;
@@ -95,6 +95,7 @@ public class ScheduleManager implements TriggerAgent {
 
   /**
    * Retrieves a copy of the list of schedules.
+   *
    */
   public synchronized List<Schedule> getSchedules()
       throws ScheduleManagerException {
@@ -105,6 +106,7 @@ public class ScheduleManager implements TriggerAgent {
 
   /**
    * Returns the scheduled flow for the flow name
+   *
    */
   public Schedule getSchedule(final int projectId, final String flowId)
       throws ScheduleManagerException {
@@ -126,6 +128,7 @@ public class ScheduleManager implements TriggerAgent {
 
   /**
    * Removes the flow from the schedule if it exists.
+   *
    */
   public synchronized void removeSchedule(final Schedule sched) {
     final Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
@@ -145,43 +148,50 @@ public class ScheduleManager implements TriggerAgent {
   }
 
   public Schedule scheduleFlow(final int scheduleId,
-      final int projectId,
-      final String projectName,
-      final String flowName,
-      final String status,
-      final long firstSchedTime,
-      final DateTimeZone timezone,
-      final ReadablePeriod period,
-      final long lastModifyTime,
-      final long nextExecTime,
-      final long submitTime,
-      final String submitUser,
-      final ExecutionOptions execOptions,
-      final List<SlaOption> slaOptions) {
-    final Schedule sched =
-        new Schedule(scheduleId, projectId, projectName, flowName, status,
-            firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
-            submitTime, submitUser, execOptions, slaOptions, null);
+                               final int projectId,
+                               final String projectName,
+                               final String flowName,
+                               final String status,
+                               final long firstSchedTime,
+                               final long endSchedTime,
+                               final DateTimeZone timezone,
+                               final ReadablePeriod period,
+                               final long lastModifyTime,
+                               final long nextExecTime,
+                               final long submitTime,
+                               final String submitUser,
+                               final ExecutionOptions execOptions,
+                               final List<SlaOption> slaOptions) {
+    final Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status,
+        firstSchedTime, endSchedTime, timezone, period, lastModifyTime, nextExecTime,
+        submitTime, submitUser, execOptions, slaOptions, null);
     logger
         .info("Scheduling flow '" + sched.getScheduleName() + "' for "
-            + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null
-            ? "(non-recurring)"
+            + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)"
             : period));
 
     insertSchedule(sched);
     return sched;
   }
 
-  public Schedule cronScheduleFlow(final int scheduleId, final int projectId,
-      final String projectName, final String flowName, final String status,
-      final long firstSchedTime, final DateTimeZone timezone,
-      final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser,
-      final ExecutionOptions execOptions, final List<SlaOption> slaOptions,
-      final String cronExpression) {
+  public Schedule cronScheduleFlow(final int scheduleId,
+                                   final int projectId,
+                                   final String projectName,
+                                   final String flowName,
+                                   final String status,
+                                   final long firstSchedTime,
+                                   final long endSchedTime,
+                                   final DateTimeZone timezone,
+                                   final long lastModifyTime,
+                                   final long nextExecTime,
+                                   final long submitTime,
+                                   final String submitUser,
+                                   final ExecutionOptions execOptions,
+                                   final List<SlaOption> slaOptions,
+                                   final String cronExpression) {
     final Schedule sched =
         new Schedule(scheduleId, projectId, projectName, flowName, status,
-            firstSchedTime, timezone, null, lastModifyTime, nextExecTime,
+            firstSchedTime, endSchedTime, timezone, null, lastModifyTime, nextExecTime,
             submitTime, submitUser, execOptions, slaOptions, cronExpression);
     logger
         .info("Scheduling flow '" + sched.getScheduleName() + "' for "
@@ -190,7 +200,6 @@ public class ScheduleManager implements TriggerAgent {
     insertSchedule(sched);
     return sched;
   }
-
   /**
    * Schedules the flow, but doesn't save the schedule afterwards.
    */
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 7483169..f32b900 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -16,6 +16,7 @@
 
 package azkaban.scheduler;
 
+import azkaban.Constants;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
@@ -43,7 +44,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
   private long lastUpdateTime = -1;
 
   public TriggerBasedScheduleLoader(final TriggerManager triggerManager,
-      final String triggerSource) {
+                                    final String triggerSource) {
     this.triggerManager = triggerManager;
     this.triggerSource = triggerSource;
   }
@@ -95,18 +96,14 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
     return cond;
   }
 
-  // if failed to trigger, auto expire?
   private Condition createExpireCondition(final Schedule s) {
-    final Map<String, ConditionChecker> checkers =
-        new HashMap<>();
-    final ConditionChecker checker =
-        new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(),
-            s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(),
-            s.getPeriod(), s.getCronExpression());
+    final Map<String, ConditionChecker> checkers = new HashMap<>();
+    final ConditionChecker checker = new BasicTimeChecker("EndTimeChecker_1", s.getFirstSchedTime(),
+        s.getTimezone(), s.getEndSchedTime(), false, false,
+        null, null);
     checkers.put(checker.getId(), checker);
     final String expr = checker.getId() + ".eval()";
-    final Condition cond = new Condition(checkers, expr);
-    return cond;
+    return new Condition(checkers, expr);
   }
 
   @Override
@@ -130,33 +127,11 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
     }
   }
 
-  // TODO may need to add logic to filter out skip runs
-  @Override
-  public synchronized List<Schedule> loadSchedules()
-      throws ScheduleManagerException {
-    final List<Trigger> triggers = this.triggerManager.getTriggers(this.triggerSource);
-    final List<Schedule> schedules = new ArrayList<>();
-    for (final Trigger t : triggers) {
-      this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime());
-      final Schedule s = triggerToSchedule(t);
-      schedules.add(s);
-      System.out.println("loaded schedule for "
-          + s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
-    }
-    return schedules;
+  private Schedule triggerToSchedule(final Trigger t) throws ScheduleManagerException {
 
-  }
+    final BasicTimeChecker triggerTimeChecker = getBasicTimeChecker(t.getTriggerCondition().getCheckers());
+    final BasicTimeChecker endTimeChecker = getBasicTimeChecker(t.getExpireCondition().getCheckers());
 
-  private Schedule triggerToSchedule(final Trigger t) throws ScheduleManagerException {
-    final Condition triggerCond = t.getTriggerCondition();
-    final Map<String, ConditionChecker> checkers = triggerCond.getCheckers();
-    BasicTimeChecker ck = null;
-    for (final ConditionChecker checker : checkers.values()) {
-      if (checker.getType().equals(BasicTimeChecker.type)) {
-        ck = (BasicTimeChecker) checker;
-        break;
-      }
-    }
     final List<TriggerAction> actions = t.getActions();
     ExecuteFlowAction act = null;
     for (final TriggerAction action : actions) {
@@ -165,15 +140,23 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
         break;
       }
     }
-    if (ck != null && act != null) {
-      final Schedule s =
-          new Schedule(t.getTriggerId(), act.getProjectId(),
-              act.getProjectName(), act.getFlowName(),
-              t.getStatus().toString(), ck.getFirstCheckTime(),
-              ck.getTimeZone(), ck.getPeriod(), t.getLastModifyTime(),
-              ck.getNextCheckTime(), t.getSubmitTime(), t.getSubmitUser(),
-              act.getExecutionOptions(), act.getSlaOptions(), ck.getCronExpression());
-      return s;
+    if (triggerTimeChecker != null && act != null) {
+      return new Schedule(t.getTriggerId(),
+          act.getProjectId(),
+          act.getProjectName(),
+          act.getFlowName(),
+          t.getStatus().toString(),
+          triggerTimeChecker.getFirstCheckTime(),
+          endTimeChecker == null? Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME: endTimeChecker.getNextCheckTime(),
+          triggerTimeChecker.getTimeZone(),
+          triggerTimeChecker.getPeriod(),
+          t.getLastModifyTime(),
+          triggerTimeChecker.getNextCheckTime(),
+          t.getSubmitTime(),
+          t.getSubmitUser(),
+          act.getExecutionOptions(),
+          act.getSlaOptions(),
+          triggerTimeChecker.getCronExpression());
     } else {
       logger.error("Failed to parse schedule from trigger!");
       throw new ScheduleManagerException(
@@ -181,6 +164,16 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
     }
   }
 
+  // expirecheckers or triggerCheckers only have BasicTimeChecker today. This should be refactored in future.
+  private BasicTimeChecker getBasicTimeChecker(final Map<String, ConditionChecker> checkers) {
+    for (final ConditionChecker checker : checkers.values()) {
+      if (checker.getType().equals(BasicTimeChecker.type)) {
+        return (BasicTimeChecker) checker;
+      }
+    }
+    return null;
+  }
+
   @Override
   public void removeSchedule(final Schedule s) throws ScheduleManagerException {
     try {
@@ -212,7 +205,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
       this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime());
       final Schedule s = triggerToSchedule(t);
       schedules.add(s);
-      System.out.println("loaded schedule for "
+      logger.info("loaded schedule for "
           + s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
     }
     return schedules;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 05b7316..ad71790 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -28,7 +28,6 @@ import org.quartz.CronExpression;
 
 public class BasicTimeChecker implements ConditionChecker {
 
-
   public static final String type = "BasicTimeChecker";
   private final String id;
   private final long firstCheckTime;
@@ -41,8 +40,8 @@ public class BasicTimeChecker implements ConditionChecker {
   private boolean skipPastChecks = true;
 
   public BasicTimeChecker(final String id, final long firstCheckTime,
-      final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
-      final ReadablePeriod period, final String cronExpression) {
+                          final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
+                          final ReadablePeriod period, final String cronExpression) {
     this.id = id;
     this.firstCheckTime = firstCheckTime;
     this.timezone = timezone;
@@ -56,8 +55,8 @@ public class BasicTimeChecker implements ConditionChecker {
   }
 
   public BasicTimeChecker(final String id, final long firstCheckTime,
-      final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
-      final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
+                          final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
+                          final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
     this.id = id;
     this.firstCheckTime = firstCheckTime;
     this.timezone = timezone;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 4490721..beb9820 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -30,7 +30,7 @@ public class Condition {
 
   private static final Logger logger = Logger.getLogger(Condition.class);
 
-  private static JexlEngine jexl = new JexlEngine();
+  private static final JexlEngine jexl = new JexlEngine();
   private static CheckerTypeLoader checkerLoader = null;
   private final MapContext context = new MapContext();
   private Expression expression;
@@ -45,20 +45,12 @@ public class Condition {
   }
 
   public Condition(final Map<String, ConditionChecker> checkers, final String expr,
-      final long nextCheckTime) {
+                   final long nextCheckTime) {
     this.nextCheckTime = nextCheckTime;
     setCheckers(checkers);
     this.expression = jexl.createExpression(expr);
   }
 
-  public synchronized static void setJexlEngine(final JexlEngine jexl) {
-    Condition.jexl = jexl;
-  }
-
-  protected static CheckerTypeLoader getCheckerLoader() {
-    return checkerLoader;
-  }
-
   public synchronized static void setCheckerLoader(final CheckerTypeLoader loader) {
     Condition.checkerLoader = loader;
   }
@@ -98,12 +90,6 @@ public class Condition {
     return cond;
   }
 
-  protected void registerChecker(final ConditionChecker checker) {
-    this.checkers.put(checker.getId(), checker);
-    this.context.set(checker.getId(), checker);
-    updateNextCheckTime();
-  }
-
   public long getNextCheckTime() {
     return this.nextCheckTime;
   }
@@ -112,7 +98,7 @@ public class Condition {
     return this.checkers;
   }
 
-  public void setCheckers(final Map<String, ConditionChecker> checkers) {
+  private void setCheckers(final Map<String, ConditionChecker> checkers) {
     this.checkers = checkers;
     for (final ConditionChecker checker : checkers.values()) {
       this.context.set(checker.getId(), checker);
@@ -120,12 +106,6 @@ public class Condition {
     updateNextCheckTime();
   }
 
-  public void updateCheckTime(final Long ct) {
-    if (this.nextCheckTime < ct) {
-      this.nextCheckTime = ct;
-    }
-  }
-
   private void updateNextCheckTime() {
     long time = Long.MAX_VALUE;
     for (final ConditionChecker checker : this.checkers.values()) {
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
index c88cd82..8b4014f 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
@@ -36,10 +36,8 @@ public class Trigger {
   private final String source;
   private final List<TriggerAction> actions;
   private final List<TriggerAction> expireActions;
-  
   private Condition expireCondition;
   private Condition triggerCondition;
-  
   private int triggerId = -1;
   private long lastModifyTime;
   private TriggerStatus status = TriggerStatus.READY;
@@ -50,16 +48,15 @@ public class Trigger {
 
   private long nextCheckTime = -1;
 
-  @SuppressWarnings("unused")
   private Trigger() throws TriggerManagerException {
     throw new TriggerManagerException("Triggers should always be specified");
   }
 
-  private Trigger(int triggerId, long lastModifyTime, long submitTime,
-      String submitUser, String source, Condition triggerCondition,
-      Condition expireCondition, List<TriggerAction> actions,
-      List<TriggerAction> expireActions, Map<String, Object> info,
-      Map<String, Object> context) {
+  private Trigger(final int triggerId, final long lastModifyTime, final long submitTime,
+                  final String submitUser, final String source, final Condition triggerCondition,
+                  final Condition expireCondition, final List<TriggerAction> actions,
+                  final List<TriggerAction> expireActions, final Map<String, Object> info,
+                  final Map<String, Object> context) {
     requireNonNull(submitUser);
     requireNonNull(source);
     requireNonNull(triggerCondition);
@@ -84,75 +81,72 @@ public class Trigger {
     return actionTypeLoader;
   }
 
-  public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
+  public static synchronized void setActionTypeLoader(final ActionTypeLoader loader) {
     Trigger.actionTypeLoader = loader;
   }
 
-  @SuppressWarnings("unchecked")
-  public static Trigger fromJson(Object obj) throws Exception {
+  public static Trigger fromJson(final Object obj) throws Exception {
 
     if (actionTypeLoader == null) {
       throw new Exception("Trigger Action Type loader not initialized.");
     }
-
-    Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+    final Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
 
     Trigger trigger = null;
     try {
       logger.info("Decoding for " + JSONUtils.toJSON(obj));
-      Condition triggerCond =
-          Condition.fromJson(jsonObj.get("triggerCondition"));
-      Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
-      List<TriggerAction> actions = new ArrayList<>();
-      List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
-      for (Object actObj : actionsJson) {
-        Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
-        String type = (String) oneActionJson.get("type");
-        TriggerAction act =
+      final Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
+      final Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
+      final List<TriggerAction> actions = new ArrayList<>();
+      final List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
+      for (final Object actObj : actionsJson) {
+        final Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
+        final String type = (String) oneActionJson.get("type");
+        final TriggerAction act =
             actionTypeLoader.createActionFromJson(type,
                 oneActionJson.get("actionJson"));
         actions.add(act);
       }
-      List<TriggerAction> expireActions = new ArrayList<>();
-      List<Object> expireActionsJson =
+      final List<TriggerAction> expireActions = new ArrayList<>();
+      final List<Object> expireActionsJson =
           (List<Object>) jsonObj.get("expireActions");
-      for (Object expireActObj : expireActionsJson) {
-        Map<String, Object> oneExpireActionJson =
+      for (final Object expireActObj : expireActionsJson) {
+        final Map<String, Object> oneExpireActionJson =
             (HashMap<String, Object>) expireActObj;
-        String type = (String) oneExpireActionJson.get("type");
-        TriggerAction expireAct =
+        final String type = (String) oneExpireActionJson.get("type");
+        final TriggerAction expireAct =
             actionTypeLoader.createActionFromJson(type,
                 oneExpireActionJson.get("actionJson"));
         expireActions.add(expireAct);
       }
-      boolean resetOnTrigger =
+      final boolean resetOnTrigger =
           Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
-      boolean resetOnExpire =
+      final boolean resetOnExpire =
           Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
-      String submitUser = (String) jsonObj.get("submitUser");
-      String source = (String) jsonObj.get("source");
-      long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
-      long lastModifyTime =
+      final String submitUser = (String) jsonObj.get("submitUser");
+      final String source = (String) jsonObj.get("source");
+      final long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
+      final long lastModifyTime =
           Long.valueOf((String) jsonObj.get("lastModifyTime"));
-      int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
-      TriggerStatus status =
+      final int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
+      final TriggerStatus status =
           TriggerStatus.valueOf((String) jsonObj.get("status"));
-      Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
+      final Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
       Map<String, Object> context =
           (Map<String, Object>) jsonObj.get("context");
       if (context == null) {
         context = new HashMap<>();
       }
-      for (ConditionChecker checker : triggerCond.getCheckers().values()) {
+      for (final ConditionChecker checker : triggerCond.getCheckers().values()) {
         checker.setContext(context);
       }
-      for (ConditionChecker checker : expireCond.getCheckers().values()) {
+      for (final ConditionChecker checker : expireCond.getCheckers().values()) {
         checker.setContext(context);
       }
-      for (TriggerAction action : actions) {
+      for (final TriggerAction action : actions) {
         action.setContext(context);
       }
-      for (TriggerAction action : expireActions) {
+      for (final TriggerAction action : expireActions) {
         action.setContext(context);
       }
 
@@ -172,7 +166,7 @@ public class Trigger {
       trigger.setResetOnExpire(resetOnExpire);
       trigger.setResetOnTrigger(resetOnTrigger);
       trigger.setStatus(status);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       logger.error("Failed to decode the trigger.", e);
       throw new Exception("Failed to decode the trigger.", e);
@@ -182,183 +176,182 @@ public class Trigger {
   }
 
   public void updateNextCheckTime() {
-    this.nextCheckTime =
-        Math.min(triggerCondition.getNextCheckTime(),
-            expireCondition.getNextCheckTime());
+    this.nextCheckTime = Math.min(this.triggerCondition.getNextCheckTime(),
+        this.expireCondition.getNextCheckTime());
   }
 
   public long getNextCheckTime() {
-    return nextCheckTime;
+    return this.nextCheckTime;
   }
 
-  public void setNextCheckTime(long nct) {
+  public void setNextCheckTime(final long nct) {
     this.nextCheckTime = nct;
   }
 
   public long getSubmitTime() {
-    return submitTime;
+    return this.submitTime;
   }
 
   public String getSubmitUser() {
-    return submitUser;
+    return this.submitUser;
   }
 
   public TriggerStatus getStatus() {
-    return status;
+    return this.status;
   }
 
-  public void setStatus(TriggerStatus status) {
+  public void setStatus(final TriggerStatus status) {
     this.status = status;
   }
 
   public Condition getTriggerCondition() {
-    return triggerCondition;
+    return this.triggerCondition;
   }
 
-  public void setTriggerCondition(Condition triggerCondition) {
+  public void setTriggerCondition(final Condition triggerCondition) {
     this.triggerCondition = triggerCondition;
   }
 
   public Condition getExpireCondition() {
-    return expireCondition;
+    return this.expireCondition;
   }
 
-  public void setExpireCondition(Condition expireCondition) {
+ public void setExpireCondition(final Condition expireCondition) {
     this.expireCondition = expireCondition;
-  }
+ }
 
   public List<TriggerAction> getActions() {
-    return actions;
+    return this.actions;
   }
 
   public List<TriggerAction> getExpireActions() {
-    return expireActions;
+    return this.expireActions;
   }
 
   public Map<String, Object> getInfo() {
-    return info;
+    return this.info;
   }
 
-  public void setInfo(Map<String, Object> info) {
+  public void setInfo(final Map<String, Object> info) {
     this.info = info;
   }
 
   public Map<String, Object> getContext() {
-    return context;
+    return this.context;
   }
 
-  public void setContext(Map<String, Object> context) {
+  public void setContext(final Map<String, Object> context) {
     this.context = context;
   }
 
   public boolean isResetOnTrigger() {
-    return resetOnTrigger;
+    return this.resetOnTrigger;
   }
 
-  public void setResetOnTrigger(boolean resetOnTrigger) {
+  public void setResetOnTrigger(final boolean resetOnTrigger) {
     this.resetOnTrigger = resetOnTrigger;
   }
 
   public boolean isResetOnExpire() {
-    return resetOnExpire;
+    return this.resetOnExpire;
   }
 
-  public void setResetOnExpire(boolean resetOnExpire) {
+  public void setResetOnExpire(final boolean resetOnExpire) {
     this.resetOnExpire = resetOnExpire;
   }
 
   public long getLastModifyTime() {
-    return lastModifyTime;
+    return this.lastModifyTime;
   }
 
-  public void setLastModifyTime(long lastModifyTime) {
+  public void setLastModifyTime(final long lastModifyTime) {
     this.lastModifyTime = lastModifyTime;
   }
 
   public int getTriggerId() {
-    return triggerId;
+    return this.triggerId;
   }
 
-  public void setTriggerId(int id) {
+  public void setTriggerId(final int id) {
     this.triggerId = id;
   }
 
   public boolean triggerConditionMet() {
-    return triggerCondition.isMet();
+    return this.triggerCondition.isMet();
   }
 
   public boolean expireConditionMet() {
-    return expireCondition.isMet();
+    return this.expireCondition.isMet();
   }
 
   public void resetTriggerConditions() {
-    triggerCondition.resetCheckers();
+    this.triggerCondition.resetCheckers();
     updateNextCheckTime();
   }
 
   public void resetExpireCondition() {
-    expireCondition.resetCheckers();
+    this.expireCondition.resetCheckers();
     updateNextCheckTime();
   }
 
   public List<TriggerAction> getTriggerActions() {
-    return actions;
+    return this.actions;
   }
 
   public Map<String, Object> toJson() {
-    Map<String, Object> jsonObj = new HashMap<>();
-    jsonObj.put("triggerCondition", triggerCondition.toJson());
-    jsonObj.put("expireCondition", expireCondition.toJson());
-    List<Object> actionsJson = new ArrayList<>();
-    for (TriggerAction action : actions) {
-      Map<String, Object> oneActionJson = new HashMap<>();
+    final Map<String, Object> jsonObj = new HashMap<>();
+    jsonObj.put("triggerCondition", this.triggerCondition.toJson());
+    jsonObj.put("expireCondition", this.expireCondition.toJson());
+    final List<Object> actionsJson = new ArrayList<>();
+    for (final TriggerAction action : this.actions) {
+      final Map<String, Object> oneActionJson = new HashMap<>();
       oneActionJson.put("type", action.getType());
       oneActionJson.put("actionJson", action.toJson());
       actionsJson.add(oneActionJson);
     }
     jsonObj.put("actions", actionsJson);
-    List<Object> expireActionsJson = new ArrayList<>();
-    for (TriggerAction expireAction : expireActions) {
-      Map<String, Object> oneExpireActionJson = new HashMap<>();
+    final List<Object> expireActionsJson = new ArrayList<>();
+    for (final TriggerAction expireAction : this.expireActions) {
+      final Map<String, Object> oneExpireActionJson = new HashMap<>();
       oneExpireActionJson.put("type", expireAction.getType());
       oneExpireActionJson.put("actionJson", expireAction.toJson());
       expireActionsJson.add(oneExpireActionJson);
     }
     jsonObj.put("expireActions", expireActionsJson);
 
-    jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
-    jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
-    jsonObj.put("submitUser", submitUser);
-    jsonObj.put("source", source);
-    jsonObj.put("submitTime", String.valueOf(submitTime));
-    jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
-    jsonObj.put("triggerId", String.valueOf(triggerId));
-    jsonObj.put("status", status.toString());
-    jsonObj.put("info", info);
-    jsonObj.put("context", context);
+    jsonObj.put("resetOnTrigger", String.valueOf(this.resetOnTrigger));
+    jsonObj.put("resetOnExpire", String.valueOf(this.resetOnExpire));
+    jsonObj.put("submitUser", this.submitUser);
+    jsonObj.put("source", this.source);
+    jsonObj.put("submitTime", String.valueOf(this.submitTime));
+    jsonObj.put("lastModifyTime", String.valueOf(this.lastModifyTime));
+    jsonObj.put("triggerId", String.valueOf(this.triggerId));
+    jsonObj.put("status", this.status.toString());
+    jsonObj.put("info", this.info);
+    jsonObj.put("context", this.context);
     return jsonObj;
   }
 
   public String getSource() {
-    return source;
+    return this.source;
   }
 
   public String getDescription() {
-    StringBuffer actionsString = new StringBuffer();
-    for (TriggerAction act : actions) {
+    final StringBuffer actionsString = new StringBuffer();
+    for (final TriggerAction act : this.actions) {
       actionsString.append(", ");
       actionsString.append(act.getDescription());
     }
     return "Trigger from " + getSource() + " with trigger condition of "
-        + triggerCondition.getExpression() + " and expire condition of "
-        + expireCondition.getExpression() + actionsString;
+        + this.triggerCondition.getExpression() + " and expire condition of "
+        + this.expireCondition.getExpression() + actionsString;
   }
 
   public void stopCheckers() {
-    for (ConditionChecker checker : triggerCondition.getCheckers().values()) {
+    for (final ConditionChecker checker : this.triggerCondition.getCheckers().values()) {
       checker.stopChecker();
     }
-    for (ConditionChecker checker : expireCondition.getCheckers().values()) {
+    for (final ConditionChecker checker : this.expireCondition.getCheckers().values()) {
       checker.stopChecker();
     }
   }
@@ -383,63 +376,63 @@ public class Trigger {
     private Map<String, Object> info = new HashMap<>();
     private Map<String, Object> context = new HashMap<>();
 
-    public TriggerBuilder(String submitUser,
-                          String source,
-                          Condition triggerCondition,
-                          Condition expireCondition,
-                          List<TriggerAction> actions) {
+    public TriggerBuilder(final String submitUser,
+                          final String source,
+                          final Condition triggerCondition,
+                          final Condition expireCondition,
+                          final List<TriggerAction> actions) {
       this.submitUser = submitUser;
       this.source = source;
       this.triggerCondition = triggerCondition;
       this.actions = actions;
       this.expireCondition = expireCondition;
-      long now = DateTime.now().getMillis();
+      final long now = DateTime.now().getMillis();
       this.submitTime = now;
       this.lastModifyTime = now;
     }
 
-    public TriggerBuilder setId(int id) {
+    public TriggerBuilder setId(final int id) {
       this.triggerId = id;
       return this;
     }
 
-    public TriggerBuilder setSubmitTime(long time) {
+    public TriggerBuilder setSubmitTime(final long time) {
       this.submitTime = time;
       return this;
     }
 
-    public TriggerBuilder setLastModifyTime(long time) {
+    public TriggerBuilder setLastModifyTime(final long time) {
       this.lastModifyTime = time;
       return this;
     }
 
-    public TriggerBuilder setExpireActions(List<TriggerAction> actions) {
+    public TriggerBuilder setExpireActions(final List<TriggerAction> actions) {
       this.expireActions = actions;
       return this;
     }
 
-    public TriggerBuilder setInfo(Map<String, Object> info) {
+    public TriggerBuilder setInfo(final Map<String, Object> info) {
       this.info = info;
       return this;
     }
 
-    public TriggerBuilder setContext(Map<String, Object> context) {
+    public TriggerBuilder setContext(final Map<String, Object> context) {
       this.context = context;
       return this;
     }
 
     public Trigger build() {
-      return new Trigger(triggerId,
-                        lastModifyTime,
-                        submitTime,
-                        submitUser,
-                        source,
-                        triggerCondition,
-                        expireCondition,
-                        actions,
-                        expireActions,
-                        info,
-                        context);
+      return new Trigger(this.triggerId,
+          this.lastModifyTime,
+          this.submitTime,
+          this.submitUser,
+          this.source,
+          this.triggerCondition,
+          this.expireCondition,
+          this.actions,
+          this.expireActions,
+          this.info,
+          this.context);
     }
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 2051117..19a0af6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -16,7 +16,6 @@
 
 package azkaban.trigger;
 
-import azkaban.ServiceProvider;
 import azkaban.event.Event;
 import azkaban.event.Event.Type;
 import azkaban.event.EventHandler;
@@ -24,6 +23,7 @@ import azkaban.event.EventListener;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
 import azkaban.utils.Props;
+import com.google.inject.Inject;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -38,7 +38,6 @@ import org.apache.log4j.Logger;
 
 public class TriggerManager extends EventHandler implements
     TriggerManagerAdapter {
-
   public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
   private static final Logger logger = Logger.getLogger(TriggerManager.class);
   private static final Map<Integer, Trigger> triggerIdMap =
@@ -55,32 +54,30 @@ public class TriggerManager extends EventHandler implements
   private long runnerThreadIdleTime = -1;
   private String scannerStage = "";
 
-  // TODO kunkun-tang: Before apply guice to this class, we should make
-  // ExecutorManager guiceable.
-  public TriggerManager(final Props props, final TriggerLoader triggerLoader,
-      final ExecutorManager executorManager) throws TriggerManagerException {
+  @Inject
+  public TriggerManager(Props props, TriggerLoader triggerLoader,
+      ExecutorManager executorManager) throws TriggerManagerException {
 
-    // TODO kunkun-tang: Doing hack here to allow calling new azkaban-db code. Should fix in future.
-    this.triggerLoader = ServiceProvider.SERVICE_PROVIDER.getInstance(TriggerLoader.class);
+    this.triggerLoader = triggerLoader;
 
-    final long scannerInterval =
+    long scannerInterval =
         props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
-    this.runnerThread = new TriggerScannerThread(scannerInterval);
+    runnerThread = new TriggerScannerThread(scannerInterval);
 
-    this.checkerTypeLoader = new CheckerTypeLoader();
-    this.actionTypeLoader = new ActionTypeLoader();
+    checkerTypeLoader = new CheckerTypeLoader();
+    actionTypeLoader = new ActionTypeLoader();
 
     try {
-      this.checkerTypeLoader.init(props);
-      this.actionTypeLoader.init(props);
-    } catch (final Exception e) {
+      checkerTypeLoader.init(props);
+      actionTypeLoader.init(props);
+    } catch (Exception e) {
       throw new TriggerManagerException(e);
     }
 
-    Condition.setCheckerLoader(this.checkerTypeLoader);
-    Trigger.setActionTypeLoader(this.actionTypeLoader);
+    Condition.setCheckerLoader(checkerTypeLoader);
+    Trigger.setActionTypeLoader(actionTypeLoader);
 
-    executorManager.addListener(this.listener);
+    executorManager.addListener(listener);
 
     logger.info("TriggerManager loaded.");
   }
@@ -90,68 +87,68 @@ public class TriggerManager extends EventHandler implements
 
     try {
       // expect loader to return valid triggers
-      final List<Trigger> triggers = this.triggerLoader.loadTriggers();
-      for (final Trigger t : triggers) {
-        this.runnerThread.addTrigger(t);
+      List<Trigger> triggers = triggerLoader.loadTriggers();
+      for (Trigger t : triggers) {
+        runnerThread.addTrigger(t);
         triggerIdMap.put(t.getTriggerId(), t);
       }
-    } catch (final Exception e) {
+    } catch (Exception e) {
       logger.error(e);
       throw new TriggerManagerException(e);
     }
 
-    this.runnerThread.start();
+    runnerThread.start();
   }
 
   protected CheckerTypeLoader getCheckerLoader() {
-    return this.checkerTypeLoader;
+    return checkerTypeLoader;
   }
 
   protected ActionTypeLoader getActionLoader() {
-    return this.actionTypeLoader;
+    return actionTypeLoader;
   }
 
-  public void insertTrigger(final Trigger t) throws TriggerManagerException {
+  public void insertTrigger(Trigger t) throws TriggerManagerException {
     logger.info("Inserting trigger " + t + " in TriggerManager");
-    synchronized (this.syncObj) {
+    synchronized (syncObj) {
       try {
-        this.triggerLoader.addTrigger(t);
-      } catch (final TriggerLoaderException e) {
+        triggerLoader.addTrigger(t);
+      } catch (TriggerLoaderException e) {
         throw new TriggerManagerException(e);
       }
-      this.runnerThread.addTrigger(t);
+      runnerThread.addTrigger(t);
       triggerIdMap.put(t.getTriggerId(), t);
     }
   }
 
-  public void removeTrigger(final int id) throws TriggerManagerException {
+  public void removeTrigger(int id) throws TriggerManagerException {
     logger.info("Removing trigger with id: " + id + " from TriggerManager");
-    synchronized (this.syncObj) {
-      final Trigger t = triggerIdMap.get(id);
+    synchronized (syncObj) {
+      Trigger t = triggerIdMap.get(id);
       if (t != null) {
         removeTrigger(triggerIdMap.get(id));
       }
     }
   }
 
-  public void updateTrigger(final Trigger t) throws TriggerManagerException {
+  public void updateTrigger(Trigger t) throws TriggerManagerException {
     logger.info("Updating trigger " + t + " in TriggerManager");
-    synchronized (this.syncObj) {
-      this.runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
-      this.runnerThread.addTrigger(t);
+    synchronized (syncObj) {
+      runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+      runnerThread.addTrigger(t);
       triggerIdMap.put(t.getTriggerId(), t);
     }
   }
 
-  public void removeTrigger(final Trigger t) throws TriggerManagerException {
+  public void removeTrigger(Trigger t) throws TriggerManagerException {
     logger.info("Removing trigger " + t + " from TriggerManager");
-    synchronized (this.syncObj) {
-      this.runnerThread.deleteTrigger(t);
+    synchronized (syncObj) {
+      runnerThread.deleteTrigger(t);
       triggerIdMap.remove(t.getTriggerId());
       try {
         t.stopCheckers();
-        this.triggerLoader.removeTrigger(t);
-      } catch (final TriggerLoaderException e) {
+        triggerLoader.removeTrigger(t);
+      } catch (TriggerLoaderException e) {
         throw new TriggerManagerException(e);
       }
     }
@@ -162,24 +159,24 @@ public class TriggerManager extends EventHandler implements
   }
 
   public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
-    return this.checkerTypeLoader.getSupportedCheckers();
+    return checkerTypeLoader.getSupportedCheckers();
   }
 
-  public Trigger getTrigger(final int triggerId) {
-    synchronized (this.syncObj) {
+  public Trigger getTrigger(int triggerId) {
+    synchronized (syncObj) {
       return triggerIdMap.get(triggerId);
     }
   }
 
-  public void expireTrigger(final int triggerId) {
-    final Trigger t = getTrigger(triggerId);
+  public void expireTrigger(int triggerId) {
+    Trigger t = getTrigger(triggerId);
     t.setStatus(TriggerStatus.EXPIRED);
   }
 
   @Override
-  public List<Trigger> getTriggers(final String triggerSource) {
-    final List<Trigger> triggers = new ArrayList<>();
-    for (final Trigger t : triggerIdMap.values()) {
+  public List<Trigger> getTriggers(String triggerSource) {
+    List<Trigger> triggers = new ArrayList<>();
+    for (Trigger t : triggerIdMap.values()) {
       if (t.getSource().equals(triggerSource)) {
         triggers.add(t);
       }
@@ -188,10 +185,10 @@ public class TriggerManager extends EventHandler implements
   }
 
   @Override
-  public List<Trigger> getTriggerUpdates(final String triggerSource,
-      final long lastUpdateTime) throws TriggerManagerException {
-    final List<Trigger> triggers = new ArrayList<>();
-    for (final Trigger t : triggerIdMap.values()) {
+  public List<Trigger> getTriggerUpdates(String triggerSource,
+      long lastUpdateTime) throws TriggerManagerException {
+    List<Trigger> triggers = new ArrayList<>();
+    for (Trigger t : triggerIdMap.values()) {
       if (t.getSource().equals(triggerSource)
           && t.getLastModifyTime() > lastUpdateTime) {
         triggers.add(t);
@@ -201,10 +198,10 @@ public class TriggerManager extends EventHandler implements
   }
 
   @Override
-  public List<Trigger> getAllTriggerUpdates(final long lastUpdateTime)
+  public List<Trigger> getAllTriggerUpdates(long lastUpdateTime)
       throws TriggerManagerException {
-    final List<Trigger> triggers = new ArrayList<>();
-    for (final Trigger t : triggerIdMap.values()) {
+    List<Trigger> triggers = new ArrayList<>();
+    for (Trigger t : triggerIdMap.values()) {
       if (t.getLastModifyTime() > lastUpdateTime) {
         triggers.add(t);
       }
@@ -213,25 +210,25 @@ public class TriggerManager extends EventHandler implements
   }
 
   @Override
-  public void insertTrigger(final Trigger t, final String user)
+  public void insertTrigger(Trigger t, String user)
       throws TriggerManagerException {
     insertTrigger(t);
   }
 
   @Override
-  public void removeTrigger(final int id, final String user) throws TriggerManagerException {
+  public void removeTrigger(int id, String user) throws TriggerManagerException {
     removeTrigger(id);
   }
 
   @Override
-  public void updateTrigger(final Trigger t, final String user)
+  public void updateTrigger(Trigger t, String user)
       throws TriggerManagerException {
     updateTrigger(t);
   }
 
   @Override
   public void shutdown() {
-    this.runnerThread.shutdown();
+    runnerThread.shutdown();
   }
 
   @Override
@@ -240,89 +237,88 @@ public class TriggerManager extends EventHandler implements
   }
 
   @Override
-  public void registerCheckerType(final String name,
-      final Class<? extends ConditionChecker> checker) {
-    this.checkerTypeLoader.registerCheckerType(name, checker);
+  public void registerCheckerType(String name,
+      Class<? extends ConditionChecker> checker) {
+    checkerTypeLoader.registerCheckerType(name, checker);
   }
 
   @Override
-  public void registerActionType(final String name,
-      final Class<? extends TriggerAction> action) {
-    this.actionTypeLoader.registerActionType(name, action);
+  public void registerActionType(String name,
+      Class<? extends TriggerAction> action) {
+    actionTypeLoader.registerActionType(name, action);
   }
 
   private class TriggerScannerThread extends Thread {
-
     private final long scannerInterval;
     private final BlockingQueue<Trigger> triggers;
     private final Map<Integer, ExecutableFlow> justFinishedFlows;
     private boolean shutdown = false;
 
-    public TriggerScannerThread(final long scannerInterval) {
-      this.triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
-      this.justFinishedFlows = new ConcurrentHashMap<>();
+    public TriggerScannerThread(long scannerInterval) {
+      triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
+      justFinishedFlows = new ConcurrentHashMap<>();
       this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
       this.scannerInterval = scannerInterval;
     }
 
     public void shutdown() {
       logger.error("Shutting down trigger manager thread " + this.getName());
-      this.shutdown = true;
+      shutdown = true;
       this.interrupt();
     }
 
-    public void addJustFinishedFlow(final ExecutableFlow flow) {
-      synchronized (TriggerManager.this.syncObj) {
-        this.justFinishedFlows.put(flow.getExecutionId(), flow);
+    public void addJustFinishedFlow(ExecutableFlow flow) {
+      synchronized (syncObj) {
+        justFinishedFlows.put(flow.getExecutionId(), flow);
       }
     }
 
-    public void addTrigger(final Trigger t) {
-      synchronized (TriggerManager.this.syncObj) {
+    public void addTrigger(Trigger t) {
+      synchronized (syncObj) {
         t.updateNextCheckTime();
-        this.triggers.add(t);
+        triggers.add(t);
       }
     }
 
-    public void deleteTrigger(final Trigger t) {
-      this.triggers.remove(t);
+    public void deleteTrigger(Trigger t) {
+      triggers.remove(t);
     }
 
     @Override
     public void run() {
-      while (!this.shutdown) {
-        synchronized (TriggerManager.this.syncObj) {
+      while (!shutdown) {
+        synchronized (syncObj) {
           try {
-            TriggerManager.this.lastRunnerThreadCheckTime = System.currentTimeMillis();
+            lastRunnerThreadCheckTime = System.currentTimeMillis();
 
-            TriggerManager.this.scannerStage =
+            scannerStage =
                 "Ready to start a new scan cycle at "
-                    + TriggerManager.this.lastRunnerThreadCheckTime;
+                    + lastRunnerThreadCheckTime;
 
             try {
               checkAllTriggers();
-              this.justFinishedFlows.clear();
-            } catch (final Exception e) {
+              justFinishedFlows.clear();
+            } catch (Exception e) {
               e.printStackTrace();
               logger.error(e.getMessage());
-            } catch (final Throwable t) {
+            } catch (Throwable t) {
               t.printStackTrace();
               logger.error(t.getMessage());
             }
 
-            TriggerManager.this.scannerStage = "Done flipping all triggers.";
+            scannerStage = "Done flipping all triggers.";
 
-            TriggerManager.this.runnerThreadIdleTime =
-                this.scannerInterval
-                    - (System.currentTimeMillis() - TriggerManager.this.lastRunnerThreadCheckTime);
+            runnerThreadIdleTime =
+                scannerInterval
+                    - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
 
-            if (TriggerManager.this.runnerThreadIdleTime < 0) {
+            if (runnerThreadIdleTime < 0) {
               logger.error("Trigger manager thread " + this.getName()
                   + " is too busy!");
             } else {
-              TriggerManager.this.syncObj.wait(TriggerManager.this.runnerThreadIdleTime);
+              syncObj.wait(runnerThreadIdleTime);
             }
-          } catch (final InterruptedException e) {
+          } catch (InterruptedException e) {
             logger.info("Interrupted. Probably to shut down.");
           }
         }
@@ -331,15 +327,22 @@ public class TriggerManager extends EventHandler implements
 
     private void checkAllTriggers() throws TriggerManagerException {
       // sweep through the rest of them
-      for (final Trigger t : this.triggers) {
+      for (Trigger t : triggers) {
         try {
-          TriggerManager.this.scannerStage = "Checking for trigger " + t.getTriggerId();
+          scannerStage = "Checking for trigger " + t.getTriggerId();
 
           if (t.getStatus().equals(TriggerStatus.READY)) {
-            if (t.triggerConditionMet()) {
+
+            /**
+             * Prior to this change, expiration condition should never be called though
+             * we have some related code here. ExpireCondition used the same BasicTimeChecker
+             * as triggerCondition do. As a consequence, we need to figure out a way to distinguish
+             * the previous ExpireCondition and this commit's ExpireCondition.
+             */
+            if (t.getExpireCondition().getExpression().contains("EndTimeChecker") && t.expireConditionMet()) {
+              onTriggerPause(t);
+            } else if (t.triggerConditionMet()) {
               onTriggerTrigger(t);
-            } else if (t.expireConditionMet()) {
-              onTriggerExpire(t);
             }
           }
           if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
@@ -347,69 +350,64 @@ public class TriggerManager extends EventHandler implements
           } else {
             t.updateNextCheckTime();
           }
-        } catch (final Throwable th) {
+        } catch (Throwable th) {
           //skip this trigger, moving on to the next one
           logger.error("Failed to process trigger with id : " + t, th);
         }
       }
     }
 
-    private void onTriggerTrigger(final Trigger t) throws TriggerManagerException {
-      final List<TriggerAction> actions = t.getTriggerActions();
-      for (final TriggerAction action : actions) {
+    private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
+      List<TriggerAction> actions = t.getTriggerActions();
+      for (TriggerAction action : actions) {
         try {
           logger.info("Doing trigger actions " + action.getDescription() + " for " + t);
           action.doAction();
-        } catch (final Exception e) {
+        } catch (Exception e) {
           logger.error("Failed to do action " + action.getDescription() + " for " + t, e);
-        } catch (final Throwable th) {
+        } catch (Throwable th) {
           logger.error("Failed to do action " + action.getDescription() + " for " + t, th);
         }
       }
+
       if (t.isResetOnTrigger()) {
         t.resetTriggerConditions();
-        t.resetExpireCondition();
       } else {
         t.setStatus(TriggerStatus.EXPIRED);
       }
       try {
-        TriggerManager.this.triggerLoader.updateTrigger(t);
-      } catch (final TriggerLoaderException e) {
+        triggerLoader.updateTrigger(t);
+      } catch (TriggerLoaderException e) {
         throw new TriggerManagerException(e);
       }
     }
 
-    private void onTriggerExpire(final Trigger t) throws TriggerManagerException {
-      final List<TriggerAction> expireActions = t.getExpireActions();
-      for (final TriggerAction action : expireActions) {
+    private void onTriggerPause(Trigger t) throws TriggerManagerException {
+      List<TriggerAction> expireActions = t.getExpireActions();
+      for (TriggerAction action : expireActions) {
         try {
-          logger.info("Doing expire actions for " + action.getDescription() + " for " + t);
+          logger.info("Doing expire actions for "+ action.getDescription() + " for " + t);
           action.doAction();
-        } catch (final Exception e) {
+        } catch (Exception e) {
           logger.error("Failed to do expire action " + action.getDescription() + " for " + t, e);
-        } catch (final Throwable th) {
+        } catch (Throwable th) {
           logger.error("Failed to do expire action " + action.getDescription() + " for " + t, th);
         }
       }
-      if (t.isResetOnExpire()) {
-        t.resetTriggerConditions();
-        t.resetExpireCondition();
-      } else {
-        t.setStatus(TriggerStatus.EXPIRED);
-      }
+      logger.info("Pausing Trigger " + t.getDescription());
+      t.setStatus(TriggerStatus.PAUSED);
       try {
-        TriggerManager.this.triggerLoader.updateTrigger(t);
-      } catch (final TriggerLoaderException e) {
+        triggerLoader.updateTrigger(t);
+      } catch (TriggerLoaderException e) {
         throw new TriggerManagerException(e);
       }
     }
 
     private class TriggerComparator implements Comparator<Trigger> {
-
       @Override
-      public int compare(final Trigger arg0, final Trigger arg1) {
-        final long first = arg1.getNextCheckTime();
-        final long second = arg0.getNextCheckTime();
+      public int compare(Trigger arg0, Trigger arg1) {
+        long first = arg1.getNextCheckTime();
+        long second = arg0.getNextCheckTime();
 
         if (first == second) {
           return 0;
@@ -425,12 +423,12 @@ public class TriggerManager extends EventHandler implements
 
     @Override
     public long getLastRunnerThreadCheckTime() {
-      return TriggerManager.this.lastRunnerThreadCheckTime;
+      return lastRunnerThreadCheckTime;
     }
 
     @Override
     public boolean isRunnerThreadActive() {
-      return TriggerManager.this.runnerThread.isAlive();
+      return runnerThread.isAlive();
     }
 
     @Override
@@ -445,8 +443,8 @@ public class TriggerManager extends EventHandler implements
 
     @Override
     public String getTriggerSources() {
-      final Set<String> sources = new HashSet<>();
-      for (final Trigger t : triggerIdMap.values()) {
+      Set<String> sources = new HashSet<>();
+      for (Trigger t : triggerIdMap.values()) {
         sources.add(t.getSource());
       }
       return sources.toString();
@@ -459,7 +457,7 @@ public class TriggerManager extends EventHandler implements
 
     @Override
     public long getScannerIdleTime() {
-      return TriggerManager.this.runnerThreadIdleTime;
+      return runnerThreadIdleTime;
     }
 
     @Override
@@ -469,24 +467,23 @@ public class TriggerManager extends EventHandler implements
 
     @Override
     public String getScannerThreadStage() {
-      return TriggerManager.this.scannerStage;
+      return scannerStage;
     }
 
   }
 
   private class ExecutorManagerEventListener implements EventListener {
-
     public ExecutorManagerEventListener() {
     }
 
     @Override
-    public void handleEvent(final Event event) {
+    public void handleEvent(Event event) {
       // this needs to be fixed for perf
-      synchronized (TriggerManager.this.syncObj) {
-        final ExecutableFlow flow = (ExecutableFlow) event.getRunner();
+      synchronized (syncObj) {
+        ExecutableFlow flow = (ExecutableFlow) event.getRunner();
         if (event.getType() == Type.FLOW_FINISHED) {
           logger.info("Flow finish event received. " + flow.getExecutionId());
-          TriggerManager.this.runnerThread.addJustFinishedFlow(flow);
+          runnerThread.addJustFinishedFlow(flow);
         }
       }
     }
diff --git a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
index dec5fe5..50122a4 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
@@ -80,31 +80,25 @@ public class ThresholdChecker implements ConditionChecker {
 
   @Override
   public Object getNum() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public Object toJson() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public void stopChecker() {
     return;
-
   }
 
   @Override
   public void setContext(final Map<String, Object> context) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public long getNextCheckTime() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index b3940bf..fd3958f 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -17,140 +17,185 @@
 package azkaban.trigger;
 
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 
+import azkaban.executor.ExecutorManager;
+import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.utils.Props;
+import azkaban.utils.Utils;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.joda.time.DateTimeZone;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TriggerManagerTest {
 
-  private TriggerLoader triggerLoader;
+  private static TriggerLoader triggerLoader;
+  private static ExecutorManager executorManager;
+  private TriggerManager triggerManager;
+
+  @BeforeClass
+  public static void prepare() {
+    triggerLoader = new MockTriggerLoader();
+    executorManager = mock(ExecutorManager.class);
+    doNothing().when(executorManager).addListener(anyObject());
+  }
 
   @Before
   public void setup() throws TriggerException, TriggerManagerException {
-    this.triggerLoader = new MockTriggerLoader();
-
+    final Props props = new Props();
+    props.put("trigger.scan.interval", 300);
+    this.triggerManager = new TriggerManager(props, triggerLoader, executorManager);
+    this.triggerManager.registerCheckerType(ThresholdChecker.type,
+        ThresholdChecker.class);
+    this.triggerManager.registerActionType(DummyTriggerAction.type,
+        DummyTriggerAction.class);
+    this.triggerManager.start();
   }
 
   @After
   public void tearDown() {
-
+    this.triggerManager.shutdown();
   }
 
-  @Ignore
   @Test
-  public void triggerManagerSimpleTest() throws TriggerManagerException {
-    final Props props = new Props();
-    props.put("trigger.scan.interval", 4000);
-    final TriggerManager triggerManager =
-        new TriggerManager(props, this.triggerLoader, null);
-
-    triggerManager.registerCheckerType(ThresholdChecker.type,
-        ThresholdChecker.class);
-    triggerManager.registerActionType(DummyTriggerAction.type,
-        DummyTriggerAction.class);
-
-    ThresholdChecker.setVal(1);
+  public void neverExpireTriggerTest() throws TriggerManagerException {
 
-    triggerManager.insertTrigger(
-        createDummyTrigger("test1", "triggerLoader", 10), "testUser");
-    List<Trigger> triggers = triggerManager.getTriggers();
-    assertTrue(triggers.size() == 1);
-    final Trigger t1 = triggers.get(0);
+    final Trigger t1 = createNeverExpireTrigger("triggerLoader", 10);
+    this.triggerManager.insertTrigger(t1);
     t1.setResetOnTrigger(false);
-    triggerManager.updateTrigger(t1, "testUser");
-    final ThresholdChecker checker1 =
+    final ThresholdChecker triggerChecker =
         (ThresholdChecker) t1.getTriggerCondition().getCheckers().values()
             .toArray()[0];
-    assertTrue(t1.getSource().equals("triggerLoader"));
 
-    final Trigger t2 =
-        createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
-    triggerManager.insertTrigger(t2, "testUser");
-    final ThresholdChecker checker2 =
-        (ThresholdChecker) t2.getTriggerCondition().getCheckers().values()
+    final BasicTimeChecker expireChecker =
+        (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
             .toArray()[0];
 
     ThresholdChecker.setVal(15);
-    try {
-      Thread.sleep(2000);
-    } catch (final InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    sleep(300);
+    sleep(300);
+    assertTrue(triggerChecker.isCheckerMet() == true);
+    assertTrue(expireChecker.eval() == false);
 
-    assertTrue(checker1.isCheckerMet() == false);
-    assertTrue(checker2.isCheckerMet() == false);
-    assertTrue(checker1.isCheckerReset() == false);
-    assertTrue(checker2.isCheckerReset() == false);
+    ThresholdChecker.setVal(25);
+    sleep(300);
+    assertTrue(triggerChecker.isCheckerMet() == true);
+    assertTrue(expireChecker.eval() == false);
+  }
 
-    try {
-      Thread.sleep(2000);
-    } catch (final InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
 
-    assertTrue(checker1.isCheckerMet() == true);
-    assertTrue(checker2.isCheckerMet() == false);
-    assertTrue(checker1.isCheckerReset() == false);
-    assertTrue(checker2.isCheckerReset() == false);
+  @Test
+  public void timeCheckerAndExpireTriggerTest() throws TriggerManagerException {
+
+    final long curr = System.currentTimeMillis();
+    final Trigger t1 = createPeriodAndEndCheckerTrigger(curr);
+    this.triggerManager.insertTrigger(t1);
+    t1.setResetOnTrigger(true);
+    final BasicTimeChecker expireChecker =
+        (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
+            .toArray()[0];
+
+    sleep(1000);
 
-    ThresholdChecker.setVal(25);
+    assertTrue(expireChecker.eval() == false);
+    assertTrue(t1.getStatus() == TriggerStatus.READY);
+
+    sleep(1000);
+    sleep(1000);
+    sleep(1000);
+    assertTrue(expireChecker.eval() == true);
+    assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
+
+    sleep(1000);
+    assertTrue(expireChecker.eval() == true);
+    assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
+  }
+
+  private void sleep (final long millis) {
     try {
-      Thread.sleep(4000);
+      Thread.sleep(millis);
     } catch (final InterruptedException e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
     }
+  }
+
+  private Trigger createNeverExpireTrigger(final String source, final int threshold) {
+    final Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+    final Map<String, ConditionChecker> expireCheckers = new HashMap<>();
+    final ConditionChecker triggerChecker = new ThresholdChecker(ThresholdChecker.type, threshold);
+    final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeCheck_1", 111L,
+        DateTimeZone.UTC, 2536871155000L,false, false,
+        null, null);
+    triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+    expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
 
-    assertTrue(checker1.isCheckerMet() == true);
-    assertTrue(checker1.isCheckerReset() == false);
-    assertTrue(checker2.isCheckerReset() == true);
+    final String triggerExpr = triggerChecker.getId() + ".eval()";
+    final String expireExpr = endTimeChecker.getId() + ".eval()";
 
-    triggers = triggerManager.getTriggers();
-    assertTrue(triggers.size() == 1);
+    final Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+    final Condition expireCond = new Condition(expireCheckers, expireExpr);
 
+    final Trigger fakeTrigger = new Trigger.TriggerBuilder("azkaban",
+        source,
+        triggerCond,
+        expireCond,
+        getTriggerActions()).build();
+
+    fakeTrigger.setResetOnTrigger(false);
+    fakeTrigger.setResetOnExpire(true);
+    return fakeTrigger;
   }
 
-  private Trigger createDummyTrigger(final String message, final String source,
-      final int threshold) {
+  private Trigger createPeriodAndEndCheckerTrigger(final long currMillis) {
+    final Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+    final Map<String, ConditionChecker> expireCheckers = new HashMap<>();
 
-    final Map<String, ConditionChecker> checkers =
-        new HashMap<>();
-    final ConditionChecker checker =
-        new ThresholdChecker(ThresholdChecker.type, threshold);
-    checkers.put(checker.getId(), checker);
+    // TODO kunkun-tang: 1 second is the minimum unit for {@link org.joda.time.ReadablePeriod}.
+    // In future, we should use some smaller alternative.
+    final ConditionChecker triggerChecker = new BasicTimeChecker("BasicTimeChecker_1",
+        currMillis, DateTimeZone.UTC, true, true,
+        Utils.parsePeriodString("1s"), null);
 
-    final List<TriggerAction> actions = new ArrayList<>();
-    final TriggerAction act = new DummyTriggerAction(message);
-    actions.add(act);
+    // End time is 3 seconds past now.
+    final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeChecker_1", 111L,
+        DateTimeZone.UTC, currMillis + 3000L,false, false,
+        null, null);
+    triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+    expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
 
-    final String expr = checker.getId() + ".eval()";
+    final String triggerExpr = triggerChecker.getId() + ".eval()";
+    final String expireExpr = endTimeChecker.getId() + ".eval()";
 
-    final Condition triggerCond = new Condition(checkers, expr);
-    final Condition expireCond = new Condition(checkers, expr);
+    final Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+    final Condition expireCond = new Condition(expireCheckers, expireExpr);
 
-    final Trigger fakeTrigger = new Trigger.TriggerBuilder("azkaban",
-        source,
+    final Trigger timeTrigger = new Trigger.TriggerBuilder("azkaban",
+        "",
         triggerCond,
         expireCond,
-        actions).build();
+        getTriggerActions()).build();
 
-    fakeTrigger.setResetOnTrigger(true);
-    fakeTrigger.setResetOnExpire(true);
+    timeTrigger.setResetOnTrigger(false);
+    timeTrigger.setResetOnExpire(true);
+    return timeTrigger;
+  }
 
-    return fakeTrigger;
+  private List<TriggerAction> getTriggerActions() {
+    final List<TriggerAction> actions = new ArrayList<>();
+    final TriggerAction act = new DummyTriggerAction("");
+    actions.add(act);
+    return actions;
   }
 
   public static class MockTriggerLoader implements TriggerLoader {
-
     private final Map<Integer, Trigger> triggers = new HashMap<>();
     private int idIndex = 0;
 
@@ -188,22 +233,5 @@ public class TriggerManagerTest {
       // TODO Auto-generated method stub
       return null;
     }
-
   }
-
-  // public class MockCheckerLoader extends CheckerTypeLoader{
-  //
-  // @Override
-  // public void init(Props props) {
-  // checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
-  // }
-  // }
-  //
-  // public class MockActionLoader extends ActionTypeLoader {
-  // @Override
-  // public void init(Props props) {
-  // actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
-  // }
-  // }
-
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index c6158b2..0b3b4fd 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,31 @@
 
 package azkaban.webapp.servlet;
 
+import azkaban.Constants;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.flow.Node;
+import azkaban.project.Project;
+import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.project.ProjectManager;
+import azkaban.scheduler.Schedule;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.server.HttpRequestUtils;
+import azkaban.server.session.Session;
+import azkaban.sla.SlaOption;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.SplitterOutputStream;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.SchedulerStatistics;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -29,12 +54,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
@@ -44,31 +67,6 @@ import org.joda.time.Minutes;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.flow.Flow;
-import azkaban.flow.Node;
-import azkaban.project.Project;
-import azkaban.project.ProjectLogEvent.EventType;
-import azkaban.project.ProjectManager;
-import azkaban.scheduler.Schedule;
-import azkaban.scheduler.ScheduleManager;
-import azkaban.scheduler.ScheduleManagerException;
-import azkaban.server.session.Session;
-import azkaban.server.HttpRequestUtils;
-import azkaban.sla.SlaOption;
-import azkaban.user.Permission;
-import azkaban.user.Permission.Type;
-import azkaban.user.User;
-import azkaban.user.UserManager;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.SplitterOutputStream;
-import azkaban.utils.Utils;
-import azkaban.webapp.AzkabanWebServer;
-import azkaban.webapp.SchedulerStatistics;
-
 public class ScheduleServlet extends LoginAbstractAzkabanServlet {
   private static final long serialVersionUID = 1L;
   private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
@@ -77,17 +75,17 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
   private UserManager userManager;
 
   @Override
-  public void init(ServletConfig config) throws ServletException {
+  public void init(final ServletConfig config) throws ServletException {
     super.init(config);
-    AzkabanWebServer server = (AzkabanWebServer) getApplication();
-    userManager = server.getUserManager();
-    projectManager = server.getProjectManager();
-    scheduleManager = server.getScheduleManager();
+    final AzkabanWebServer server = (AzkabanWebServer) getApplication();
+    this.userManager = server.getUserManager();
+    this.projectManager = server.getProjectManager();
+    this.scheduleManager = server.getScheduleManager();
   }
 
   @Override
-  protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
-      Session session) throws ServletException, IOException {
+  protected void handleGet(final HttpServletRequest req, final HttpServletResponse resp,
+                           final Session session) throws ServletException, IOException {
     if (hasParam(req, "ajax")) {
       handleAJAXAction(req, resp, session);
     } else if (hasParam(req, "calendar")) {
@@ -97,11 +95,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
-  private void handleAJAXAction(HttpServletRequest req,
-      HttpServletResponse resp, Session session) throws ServletException,
+  private void handleAJAXAction(final HttpServletRequest req,
+                                final HttpServletResponse resp, final Session session) throws ServletException,
       IOException {
-    HashMap<String, Object> ret = new HashMap<String, Object>();
-    String ajaxName = getParam(req, "ajax");
+    HashMap<String, Object> ret = new HashMap<>();
+    final String ajaxName = getParam(req, "ajax");
 
     if (ajaxName.equals("slaInfo")) {
       ajaxSlaInfo(req, ret, session.getUser());
@@ -125,11 +123,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
-  private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret,
-      User user) {
+  private void ajaxSetSla(final HttpServletRequest req, final HashMap<String, Object> ret,
+                          final User user) {
     try {
-      int scheduleId = getIntParam(req, "scheduleId");
-      Schedule sched = scheduleManager.getSchedule(scheduleId);
+      final int scheduleId = getIntParam(req, "scheduleId");
+      final Schedule sched = this.scheduleManager.getSchedule(scheduleId);
       if (sched == null) {
         ret.put("error",
                 "Error loading schedule. Schedule " + scheduleId
@@ -137,25 +135,25 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
         return;
       }
 
-      Project project = projectManager.getProject(sched.getProjectId());
+      final Project project = this.projectManager.getProject(sched.getProjectId());
       if (!hasPermission(project, user, Permission.Type.SCHEDULE)) {
         ret.put("error", "User " + user
             + " does not have permission to set SLA for this flow.");
         return;
       }
 
-      String emailStr = getParam(req, "slaEmails");
-      String[] emailSplit = emailStr.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-      List<String> slaEmails = Arrays.asList(emailSplit);
+      final String emailStr = getParam(req, "slaEmails");
+      final String[] emailSplit = emailStr.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+      final List<String> slaEmails = Arrays.asList(emailSplit);
 
-      Map<String, String> settings = getParamGroup(req, "settings");
+      final Map<String, String> settings = getParamGroup(req, "settings");
 
-      List<SlaOption> slaOptions = new ArrayList<SlaOption>();
-      for (String set : settings.keySet()) {
-        SlaOption sla;
+      final List<SlaOption> slaOptions = new ArrayList<>();
+      for (final String set : settings.keySet()) {
+        final SlaOption sla;
         try {
           sla = parseSlaSetting(settings.get(set));
-        } catch (Exception e) {
+        } catch (final Exception e) {
           throw new ServletException(e);
         }
         if (sla != null) {
@@ -166,35 +164,35 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       }
 
       sched.setSlaOptions(slaOptions);
-      scheduleManager.insertSchedule(sched);
+      this.scheduleManager.insertSchedule(sched);
 
       if (slaOptions != null) {
-        projectManager.postProjectEvent(project, EventType.SLA,
+        this.projectManager.postProjectEvent(project, EventType.SLA,
             user.getUserId(), "SLA for flow " + sched.getFlowName()
                 + " has been added/changed.");
       }
 
-    } catch (ServletException e) {
+    } catch (final ServletException e) {
       ret.put("error", e.getMessage());
-    } catch (ScheduleManagerException e) {
+    } catch (final ScheduleManagerException e) {
       logger.error(e.getMessage(), e);
       ret.put("error", e.getMessage());
     }
 
   }
 
-  private SlaOption parseSlaSetting(String set) throws ScheduleManagerException {
+  private SlaOption parseSlaSetting(final String set) throws ScheduleManagerException {
     logger.info("Tryint to set sla with the following set: " + set);
 
-    String slaType;
-    List<String> slaActions = new ArrayList<String>();
-    Map<String, Object> slaInfo = new HashMap<String, Object>();
-    String[] parts = set.split(",", -1);
-    String id = parts[0];
-    String rule = parts[1];
-    String duration = parts[2];
-    String emailAction = parts[3];
-    String killAction = parts[4];
+    final String slaType;
+    final List<String> slaActions = new ArrayList<>();
+    final Map<String, Object> slaInfo = new HashMap<>();
+    final String[] parts = set.split(",", -1);
+    final String id = parts[0];
+    final String rule = parts[1];
+    final String duration = parts[2];
+    final String emailAction = parts[3];
+    final String killAction = parts[4];
     if (emailAction.equals("true") || killAction.equals("true")) {
       if (emailAction.equals("true")) {
         slaActions.add(SlaOption.ACTION_ALERT);
@@ -218,16 +216,16 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
         }
       }
 
-      ReadablePeriod dur;
+      final ReadablePeriod dur;
       try {
         dur = parseDuration(duration);
-      } catch (Exception e) {
+      } catch (final Exception e) {
         throw new ScheduleManagerException(
             "Unable to parse duration for a SLA that needs to take actions!", e);
       }
 
       slaInfo.put(SlaOption.INFO_DURATION, Utils.createPeriodString(dur));
-      SlaOption r = new SlaOption(slaType, slaActions, slaInfo);
+      final SlaOption r = new SlaOption(slaType, slaActions, slaInfo);
       logger.info("Parsing sla as id:" + id + " type:" + slaType + " rule:"
           + rule + " Duration:" + duration + " actions:" + slaActions);
       return r;
@@ -235,22 +233,22 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     return null;
   }
 
-  private ReadablePeriod parseDuration(String duration) {
-    int hour = Integer.parseInt(duration.split(":")[0]);
-    int min = Integer.parseInt(duration.split(":")[1]);
+  private ReadablePeriod parseDuration(final String duration) {
+    final int hour = Integer.parseInt(duration.split(":")[0]);
+    final int min = Integer.parseInt(duration.split(":")[1]);
     return Minutes.minutes(min + hour * 60).toPeriod();
   }
 
-  private void ajaxFetchSchedule(HttpServletRequest req,
-      HashMap<String, Object> ret, User user) throws ServletException {
+  private void ajaxFetchSchedule(final HttpServletRequest req,
+                                 final HashMap<String, Object> ret, final User user) throws ServletException {
 
-    int projectId = getIntParam(req, "projectId");
-    String flowId = getParam(req, "flowId");
+    final int projectId = getIntParam(req, "projectId");
+    final String flowId = getParam(req, "flowId");
     try {
-      Schedule schedule = scheduleManager.getSchedule(projectId, flowId);
+      final Schedule schedule = this.scheduleManager.getSchedule(projectId, flowId);
 
       if (schedule != null) {
-        Map<String, Object> jsonObj = new HashMap<String, Object>();
+        final Map<String, Object> jsonObj = new HashMap<>();
         jsonObj.put("scheduleId", Integer.toString(schedule.getScheduleId()));
         jsonObj.put("submitUser", schedule.getSubmitUser());
         jsonObj.put("firstSchedTime",
@@ -262,18 +260,18 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
         jsonObj.put("executionOptions", schedule.getExecutionOptions());
         ret.put("schedule", jsonObj);
       }
-    } catch (ScheduleManagerException e) {
+    } catch (final ScheduleManagerException e) {
       logger.error(e.getMessage(), e);
       ret.put("error", e);
     }
   }
 
-  private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret,
-      User user) {
-    int scheduleId;
+  private void ajaxSlaInfo(final HttpServletRequest req, final HashMap<String, Object> ret,
+                           final User user) {
+    final int scheduleId;
     try {
       scheduleId = getIntParam(req, "scheduleId");
-      Schedule sched = scheduleManager.getSchedule(scheduleId);
+      final Schedule sched = this.scheduleManager.getSchedule(scheduleId);
       if (sched == null) {
         ret.put("error",
                 "Error loading schedule. Schedule " + scheduleId
@@ -281,7 +279,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
         return;
       }
 
-      Project project =
+      final Project project =
           getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
       if (project == null) {
         ret.put("error",
@@ -290,58 +288,58 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
         return;
       }
 
-      Flow flow = project.getFlow(sched.getFlowName());
+      final Flow flow = project.getFlow(sched.getFlowName());
       if (flow == null) {
         ret.put("error", "Error loading flow. Flow " + sched.getFlowName()
             + " doesn't exist in " + sched.getProjectId());
         return;
       }
 
-      List<SlaOption> slaOptions = sched.getSlaOptions();
-      ExecutionOptions flowOptions = sched.getExecutionOptions();
+      final List<SlaOption> slaOptions = sched.getSlaOptions();
+      final ExecutionOptions flowOptions = sched.getExecutionOptions();
 
       if (slaOptions != null && slaOptions.size() > 0) {
         ret.put("slaEmails",
             slaOptions.get(0).getInfo().get(SlaOption.INFO_EMAIL_LIST));
 
-        List<Object> setObj = new ArrayList<Object>();
-        for (SlaOption sla : slaOptions) {
+        final List<Object> setObj = new ArrayList<>();
+        for (final SlaOption sla : slaOptions) {
           setObj.add(sla.toWebObject());
         }
         ret.put("settings", setObj);
       } else if (flowOptions != null) {
         if (flowOptions.getFailureEmails() != null) {
-          List<String> emails = flowOptions.getFailureEmails();
+          final List<String> emails = flowOptions.getFailureEmails();
           if (emails.size() > 0) {
             ret.put("slaEmails", emails);
           }
         }
       } else {
         if (flow.getFailureEmails() != null) {
-          List<String> emails = flow.getFailureEmails();
+          final List<String> emails = flow.getFailureEmails();
           if (emails.size() > 0) {
             ret.put("slaEmails", emails);
           }
         }
       }
 
-      List<String> allJobs = new ArrayList<String>();
-      for (Node n : flow.getNodes()) {
+      final List<String> allJobs = new ArrayList<>();
+      for (final Node n : flow.getNodes()) {
         allJobs.add(n.getId());
       }
 
       ret.put("allJobNames", allJobs);
-    } catch (ServletException e) {
+    } catch (final ServletException e) {
       ret.put("error", e);
-    } catch (ScheduleManagerException e) {
+    } catch (final ScheduleManagerException e) {
       logger.error(e.getMessage(), e);
       ret.put("error", e);
     }
   }
 
-  protected Project getProjectAjaxByPermission(Map<String, Object> ret,
-      int projectId, User user, Permission.Type type) {
-    Project project = projectManager.getProject(projectId);
+  protected Project getProjectAjaxByPermission(final Map<String, Object> ret,
+                                               final int projectId, final User user, final Permission.Type type) {
+    final Project project = this.projectManager.getProject(projectId);
 
     if (project == null) {
       ret.put("error", "Project '" + project + "' not found.");
@@ -356,36 +354,36 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     return null;
   }
 
-  private void handleGetAllSchedules(HttpServletRequest req,
-      HttpServletResponse resp, Session session) throws ServletException,
+  private void handleGetAllSchedules(final HttpServletRequest req,
+                                     final HttpServletResponse resp, final Session session) throws ServletException,
       IOException {
 
-    Page page =
+    final Page page =
         newPage(req, resp, session,
             "azkaban/webapp/servlet/velocity/scheduledflowpage.vm");
 
-    List<Schedule> schedules;
+    final List<Schedule> schedules;
     try {
-      schedules = scheduleManager.getSchedules();
-    } catch (ScheduleManagerException e) {
+      schedules = this.scheduleManager.getSchedules();
+    } catch (final ScheduleManagerException e) {
       throw new ServletException(e);
     }
     page.add("schedules", schedules);
     page.render();
   }
 
-  private void handleGetScheduleCalendar(HttpServletRequest req,
-      HttpServletResponse resp, Session session) throws ServletException,
+  private void handleGetScheduleCalendar(final HttpServletRequest req,
+                                         final HttpServletResponse resp, final Session session) throws ServletException,
       IOException {
 
-    Page page =
+    final Page page =
         newPage(req, resp, session,
             "azkaban/webapp/servlet/velocity/scheduledflowcalendarpage.vm");
 
-    List<Schedule> schedules;
+    final List<Schedule> schedules;
     try {
-      schedules = scheduleManager.getSchedules();
-    } catch (ScheduleManagerException e) {
+      schedules = this.scheduleManager.getSchedules();
+    } catch (final ScheduleManagerException e) {
       throw new ServletException(e);
     }
     page.add("schedules", schedules);
@@ -393,14 +391,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
   }
 
   @Override
-  protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
-      Session session) throws ServletException, IOException {
+  protected void handlePost(final HttpServletRequest req, final HttpServletResponse resp,
+                            final Session session) throws ServletException, IOException {
     if (hasParam(req, "ajax")) {
       handleAJAXAction(req, resp, session);
     } else {
-      HashMap<String, Object> ret = new HashMap<String, Object>();
+      final HashMap<String, Object> ret = new HashMap<>();
       if (hasParam(req, "action")) {
-        String action = getParam(req, "action");
+        final String action = getParam(req, "action");
         if (action.equals("scheduleFlow")) {
           ajaxScheduleFlow(req, ret, session.getUser());
         } else if (action.equals("scheduleCronFlow")) {
@@ -419,44 +417,44 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
-  private void ajaxLoadFlows(HttpServletRequest req,
-      HashMap<String, Object> ret, User user) throws ServletException {
-    List<Schedule> schedules;
+  private void ajaxLoadFlows(final HttpServletRequest req,
+                             final HashMap<String, Object> ret, final User user) throws ServletException {
+    final List<Schedule> schedules;
     try {
-      schedules = scheduleManager.getSchedules();
-    } catch (ScheduleManagerException e) {
+      schedules = this.scheduleManager.getSchedules();
+    } catch (final ScheduleManagerException e) {
       throw new ServletException(e);
     }
     // See if anything is scheduled
     if (schedules.size() <= 0)
       return;
 
-    List<HashMap<String, Object>> output =
-        new ArrayList<HashMap<String, Object>>();
+    final List<HashMap<String, Object>> output =
+        new ArrayList<>();
     ret.put("items", output);
 
-    for (Schedule schedule : schedules) {
+    for (final Schedule schedule : schedules) {
       try {
         writeScheduleData(output, schedule);
-      } catch (ScheduleManagerException e) {
+      } catch (final ScheduleManagerException e) {
         throw new ServletException(e);
       }
     }
   }
 
-  private void writeScheduleData(List<HashMap<String, Object>> output,
-      Schedule schedule) throws ScheduleManagerException {
-    Map<String, Object> stats =
+  private void writeScheduleData(final List<HashMap<String, Object>> output,
+                                 final Schedule schedule) throws ScheduleManagerException {
+    final Map<String, Object> stats =
         SchedulerStatistics.getStatistics(schedule.getScheduleId(),
             (AzkabanWebServer) getApplication());
-    HashMap<String, Object> data = new HashMap<String, Object>();
+    final HashMap<String, Object> data = new HashMap<>();
     data.put("scheduleid", schedule.getScheduleId());
     data.put("flowname", schedule.getFlowName());
     data.put("projectname", schedule.getProjectName());
     data.put("time", schedule.getFirstSchedTime());
     data.put("cron", schedule.getCronExpression());
 
-    DateTime time = DateTime.now();
+    final DateTime time = DateTime.now();
     long period = 0;
     if (schedule.getPeriod() != null) {
       period = time.plus(schedule.getPeriod()).getMillis() - time.getMillis();
@@ -475,26 +473,26 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     output.add(data);
   }
 
-  private void ajaxLoadHistory(HttpServletRequest req,
-      HttpServletResponse resp, User user) throws ServletException, IOException {
+  private void ajaxLoadHistory(final HttpServletRequest req,
+                               final HttpServletResponse resp, final User user) throws ServletException, IOException {
     resp.setContentType(JSON_MIME_TYPE);
-    long today = DateTime.now().withTime(0, 0, 0, 0).getMillis();
+    final long today = DateTime.now().withTime(0, 0, 0, 0).getMillis();
     long startTime = getLongParam(req, "startTime");
-    DateTime start = new DateTime(startTime);
+    final DateTime start = new DateTime(startTime);
     // Ensure start time is 12:00 AM
     startTime = start.withTime(0, 0, 0, 0).getMillis();
     boolean useCache = false;
     if (startTime < today) {
       useCache = true;
     }
-    long endTime = startTime + 24 * 3600 * 1000;
-    int loadAll = getIntParam(req, "loadAll");
+    final long endTime = startTime + 24 * 3600 * 1000;
+    final int loadAll = getIntParam(req, "loadAll");
 
     // Cache file
-    String cacheDir =
+    final String cacheDir =
         getApplication().getServerProps().getString("cache.directory", "cache");
-    File cacheDirFile = new File(cacheDir, "schedule-history");
-    File cache = new File(cacheDirFile, startTime + ".cache");
+    final File cacheDirFile = new File(cacheDir, "schedule-history");
+    final File cache = new File(cacheDirFile, startTime + ".cache");
     cache.getParentFile().mkdirs();
 
     if (useCache) {
@@ -505,7 +503,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       }
       if (cacheExists) {
         // Send the cache instead
-        InputStream cacheInput =
+        final InputStream cacheInput =
             new BufferedInputStream(new FileInputStream(cache));
         try {
           IOUtils.copy(cacheInput, resp.getOutputStream());
@@ -519,20 +517,20 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     // Load data if not cached
     List<ExecutableFlow> history = null;
     try {
-      AzkabanWebServer server = (AzkabanWebServer) getApplication();
-      ExecutorManagerAdapter executorManager = server.getExecutorManager();
+      final AzkabanWebServer server = (AzkabanWebServer) getApplication();
+      final ExecutorManagerAdapter executorManager = server.getExecutorManager();
       history =
           executorManager.getExecutableFlows(null, null, null, 0, startTime,
               endTime, -1, -1);
-    } catch (ExecutorManagerException e) {
+    } catch (final ExecutorManagerException e) {
       logger.error(e.getMessage(), e);
     }
 
-    HashMap<String, Object> ret = new HashMap<String, Object>();
-    List<HashMap<String, Object>> output =
-        new ArrayList<HashMap<String, Object>>();
+    final HashMap<String, Object> ret = new HashMap<>();
+    final List<HashMap<String, Object>> output =
+        new ArrayList<>();
     ret.put("items", output);
-    for (ExecutableFlow historyItem : history) {
+    for (final ExecutableFlow historyItem : history) {
       // Check if it is an scheduled execution
       if (historyItem.getScheduleId() >= 0 || loadAll != 0) {
         writeHistoryData(output, historyItem);
@@ -548,12 +546,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     }
 
     // Create cache file
-    File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
+    final File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
     cacheTemp.createNewFile();
-    OutputStream cacheOutput =
+    final OutputStream cacheOutput =
         new BufferedOutputStream(new FileOutputStream(cacheTemp));
     try {
-      OutputStream outputStream =
+      final OutputStream outputStream =
           new SplitterOutputStream(cacheOutput, resp.getOutputStream());
       // Write to both the cache file and web output
       JSONUtils.toJSON(ret, outputStream, false);
@@ -566,12 +564,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     }
   }
 
-  private void writeHistoryData(List<HashMap<String, Object>> output,
-      ExecutableFlow history) {
-    HashMap<String, Object> data = new HashMap<String, Object>();
+  private void writeHistoryData(final List<HashMap<String, Object>> output,
+                                final ExecutableFlow history) {
+    final HashMap<String, Object> data = new HashMap<>();
 
     data.put("scheduleid", history.getScheduleId());
-    Project project = projectManager.getProject(history.getProjectId());
+    final Project project = this.projectManager.getProject(history.getProjectId());
     data.put("flowname", history.getFlowId());
     data.put("projectname", project.getName());
     data.put("time", history.getStartTime());
@@ -587,13 +585,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     output.add(data);
   }
 
-  private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret,
-      User user) throws ServletException {
-    int scheduleId = getIntParam(req, "scheduleId");
-    Schedule sched;
+  private void ajaxRemoveSched(final HttpServletRequest req, final Map<String, Object> ret,
+                               final User user) throws ServletException {
+    final int scheduleId = getIntParam(req, "scheduleId");
+    final Schedule sched;
     try {
-      sched = scheduleManager.getSchedule(scheduleId);
-    } catch (ScheduleManagerException e) {
+      sched = this.scheduleManager.getSchedule(scheduleId);
+    } catch (final ScheduleManagerException e) {
       throw new ServletException(e);
     }
     if (sched == null) {
@@ -602,7 +600,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    Project project = projectManager.getProject(sched.getProjectId());
+    final Project project = this.projectManager.getProject(sched.getProjectId());
 
     if (project == null) {
       ret.put("message", "Project " + sched.getProjectId() + " does not exist");
@@ -617,10 +615,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    scheduleManager.removeSchedule(sched);
+    this.scheduleManager.removeSchedule(sched);
     logger.info("User '" + user.getUserId() + " has removed schedule "
         + sched.getScheduleName());
-    projectManager
+    this.projectManager
         .postProjectEvent(project, EventType.SCHEDULE, user.getUserId(),
             "Schedule " + sched.toString() + " has been removed.");
 
@@ -630,13 +628,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     return;
   }
 
-  private void ajaxScheduleFlow(HttpServletRequest req,
-      HashMap<String, Object> ret, User user) throws ServletException {
-    String projectName = getParam(req, "projectName");
-    String flowName = getParam(req, "flow");
-    int projectId = getIntParam(req, "projectId");
+  private void ajaxScheduleFlow(final HttpServletRequest req,
+                                final HashMap<String, Object> ret, final User user) throws ServletException {
+    final String projectName = getParam(req, "projectName");
+    final String flowName = getParam(req, "flow");
+    final int projectId = getIntParam(req, "projectId");
 
-    Project project = projectManager.getProject(projectId);
+    final Project project = this.projectManager.getProject(projectId);
 
     if (project == null) {
       ret.put("message", "Project " + projectName + " does not exist");
@@ -650,7 +648,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    Flow flow = project.getFlow(flowName);
+    final Flow flow = project.getFlow(flowName);
     if (flow == null) {
       ret.put("status", "error");
       ret.put("message", "Flow " + flowName + " cannot be found in project "
@@ -658,46 +656,54 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    String scheduleTime = getParam(req, "scheduleTime");
-    String scheduleDate = getParam(req, "scheduleDate");
-    DateTime firstSchedTime;
+    final String scheduleTime = getParam(req, "scheduleTime");
+    final String scheduleDate = getParam(req, "scheduleDate");
+    final DateTime firstSchedTime;
     try {
       firstSchedTime = parseDateTime(scheduleDate, scheduleTime);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       ret.put("error", "Invalid date and/or time '" + scheduleDate + " "
           + scheduleTime);
       return;
     }
 
+    final long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+    try {
+      // Todo kunkun-tang: Need to verify if passed end time is valid.
+    } catch (final Exception e) {
+      ret.put("error", "Invalid date and time: " + endSchedTime);
+      return;
+    }
+
     ReadablePeriod thePeriod = null;
     try {
       if (hasParam(req, "is_recurring")
           && getParam(req, "is_recurring").equals("on")) {
         thePeriod = Schedule.parsePeriodString(getParam(req, "period"));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       ret.put("error", e.getMessage());
     }
 
     ExecutionOptions flowOptions = null;
     try {
       flowOptions = HttpRequestUtils.parseFlowOptions(req);
-      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
-    } catch (Exception e) {
+      HttpRequestUtils.filterAdminOnlyFlowParams(this.userManager, flowOptions, user);
+    } catch (final Exception e) {
       ret.put("error", e.getMessage());
     }
 
-    List<SlaOption> slaOptions = null;
+    final List<SlaOption> slaOptions = null;
 
-    Schedule schedule =
-        scheduleManager.scheduleFlow(-1, projectId, projectName, flowName,
-            "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+    final Schedule schedule =
+        this.scheduleManager.scheduleFlow(-1, projectId, projectName, flowName,
+            "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
             thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(),
             firstSchedTime.getMillis(), user.getUserId(), flowOptions,
             slaOptions);
     logger.info("User '" + user.getUserId() + "' has scheduled " + "["
         + projectName + flowName + " (" + projectId + ")" + "].");
-    projectManager.postProjectEvent(project, EventType.SCHEDULE,
+    this.projectManager.postProjectEvent(project, EventType.SCHEDULE,
         user.getUserId(), "Schedule " + schedule.toString()
             + " has been added.");
 
@@ -711,19 +717,19 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
    * This method is in charge of doing cron scheduling.
    * @throws ServletException
    */
-  private void ajaxScheduleCronFlow(HttpServletRequest req,
-      HashMap<String, Object> ret, User user) throws ServletException {
-    String projectName = getParam(req, "projectName");
-    String flowName = getParam(req, "flow");
+  private void ajaxScheduleCronFlow(final HttpServletRequest req,
+                                    final HashMap<String, Object> ret, final User user) throws ServletException {
+    final String projectName = getParam(req, "projectName");
+    final String flowName = getParam(req, "flow");
 
-    Project project = projectManager.getProject(projectName);
+    final Project project = this.projectManager.getProject(projectName);
 
     if (project == null) {
       ret.put("message", "Project " + projectName + " does not exist");
       ret.put("status", "error");
       return;
     }
-    int projectId = project.getId();
+    final int projectId = project.getId();
 
     if (!hasPermission(project, user, Type.SCHEDULE)) {
       ret.put("status", "error");
@@ -731,7 +737,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    Flow flow = project.getFlow(flowName);
+    final Flow flow = project.getFlow(flowName);
     if (flow == null) {
       ret.put("status", "error");
       ret.put("message", "Flow " + flowName + " cannot be found in project "
@@ -739,8 +745,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    DateTimeZone timezone = DateTimeZone.getDefault();
-    DateTime firstSchedTime = getPresentTimeByTimezone(timezone);
+    final DateTimeZone timezone = DateTimeZone.getDefault();
+    final DateTime firstSchedTime = getPresentTimeByTimezone(timezone);
 
     String cronExpression = null;
     try {
@@ -755,30 +761,39 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       }
       if(cronExpression == null)
         throw new Exception("Cron expression must exist.");
-    } catch (Exception e) {
+    } catch (final Exception e) {
       ret.put("error", e.getMessage());
     }
 
+    final long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+    try {
+      // Todo kunkun-tang: Need to verify if passed end time is valid.
+    } catch (final Exception e) {
+      ret.put("error", "Invalid date and time: " + endSchedTime);
+      return;
+    }
+
     ExecutionOptions flowOptions = null;
     try {
       flowOptions = HttpRequestUtils.parseFlowOptions(req);
-      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
-    } catch (Exception e) {
+      HttpRequestUtils.filterAdminOnlyFlowParams(this.userManager, flowOptions, user);
+    } catch (final Exception e) {
       ret.put("error", e.getMessage());
     }
 
-    List<SlaOption> slaOptions = null;
+    final List<SlaOption> slaOptions = null;
 
     // Because either cronExpression or recurrence exists, we build schedule in the below way.
-    Schedule schedule = scheduleManager.cronScheduleFlow(-1, projectId, projectName, flowName,
-            "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+    final Schedule schedule = this.scheduleManager
+        .cronScheduleFlow(-1, projectId, projectName, flowName,
+            "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
             DateTime.now().getMillis(), firstSchedTime.getMillis(),
             firstSchedTime.getMillis(), user.getUserId(), flowOptions,
             slaOptions, cronExpression);
 
     logger.info("User '" + user.getUserId() + "' has scheduled " + "["
         + projectName + flowName + " (" + projectId + ")" + "].");
-    projectManager.postProjectEvent(project, EventType.SCHEDULE,
+    this.projectManager.postProjectEvent(project, EventType.SCHEDULE,
         user.getUserId(), "Schedule " + schedule.toString()
             + " has been added.");
 
@@ -787,14 +802,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     ret.put("message", projectName + "." + flowName + " scheduled.");
   }
 
-  private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
+  private DateTime parseDateTime(final String scheduleDate, final String scheduleTime) {
     // scheduleTime: 12,00,pm,PDT
-    String[] parts = scheduleTime.split(",", -1);
+    final String[] parts = scheduleTime.split(",", -1);
     int hour = Integer.parseInt(parts[0]);
-    int minutes = Integer.parseInt(parts[1]);
-    boolean isPm = parts[2].equalsIgnoreCase("pm");
+    final int minutes = Integer.parseInt(parts[1]);
+    final boolean isPm = parts[2].equalsIgnoreCase("pm");
 
-    DateTimeZone timezone =
+    final DateTimeZone timezone =
         parts[3].equals("UTC") ? DateTimeZone.UTC : DateTimeZone.getDefault();
 
     // scheduleDate: 02/10/2013
@@ -811,7 +826,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
     if (isPm)
       hour += 12;
 
-    DateTime firstSchedTime =
+    final DateTime firstSchedTime =
         day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
 
     return firstSchedTime;
@@ -821,14 +836,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
    * @param cronTimezone represents the timezone from remote API call
    * @return if the string is equal to UTC, we return UTC; otherwise, we always return default timezone.
    */
-  private DateTimeZone parseTimeZone(String cronTimezone) {
+  private DateTimeZone parseTimeZone(final String cronTimezone) {
     if(cronTimezone != null && cronTimezone.equals("UTC"))
       return DateTimeZone.UTC;
 
     return DateTimeZone.getDefault();
   }
 
-  private DateTime getPresentTimeByTimezone(DateTimeZone timezone) {
+  private DateTime getPresentTimeByTimezone(final DateTimeZone timezone) {
     return new DateTime(timezone);
   }
 }