azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 7c7fc2e..04bdc5f 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -44,6 +44,9 @@ public class Constants {
   // Max number of memory check retry
   public static final int MEMORY_CHECK_RETRY_LIMIT = 720;
 
+  // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
+  public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+
   public static class ConfigurationKeys {
     // These properties are configurable through azkaban.properties
 
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
index 27d3f30..7e189e3 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
@@ -46,6 +46,7 @@ public class Schedule {
   private String projectName;
   private String flowName;
   private long firstSchedTime;
+  private long endSchedTime;
   private DateTimeZone timezone;
   private long lastModifyTime;
   private ReadablePeriod period;
@@ -60,37 +61,28 @@ public class Schedule {
   private ExecutionOptions executionOptions;
   private List<SlaOption> slaOptions;
 
-  public Schedule(int scheduleId, int projectId, String projectName,
-      String flowName, String status, long firstSchedTime,
-      DateTimeZone timezone, ReadablePeriod period, long lastModifyTime,
-      long nextExecTime, long submitTime, String submitUser) {
-
-    this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
-        timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser,
-        null, null, null);
-  }
-
-  public Schedule(int scheduleId, int projectId, String projectName,
-      String flowName, String status, long firstSchedTime, String timezoneId,
-      String period, long lastModifyTime, long nextExecTime, long submitTime,
-      String submitUser, ExecutionOptions executionOptions,
-      List<SlaOption> slaOptions) {
-    this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
-        DateTimeZone.forID(timezoneId), parsePeriodString(period),
-        lastModifyTime, nextExecTime, submitTime, submitUser, executionOptions,
-        slaOptions, null);
-  }
-
-  public Schedule(int scheduleId, int projectId, String projectName,
-      String flowName, String status, long firstSchedTime,
-      DateTimeZone timezone, ReadablePeriod period, long lastModifyTime,
-      long nextExecTime, long submitTime, String submitUser,
-      ExecutionOptions executionOptions, List<SlaOption> slaOptions, String cronExpression) {
+  public Schedule(int scheduleId,
+                  int projectId,
+                  String projectName,
+                  String flowName,
+                  String status,
+                  long firstSchedTime,
+                  long endSchedTime,
+                  DateTimeZone timezone,
+                  ReadablePeriod period,
+                  long lastModifyTime,
+                  long nextExecTime,
+                  long submitTime,
+                  String submitUser,
+                  ExecutionOptions executionOptions,
+                  List<SlaOption> slaOptions,
+                  String cronExpression) {
     this.scheduleId = scheduleId;
     this.projectId = projectId;
     this.projectName = projectName;
     this.flowName = flowName;
     this.firstSchedTime = firstSchedTime;
+    this.endSchedTime = endSchedTime;
     this.timezone = timezone;
     this.lastModifyTime = lastModifyTime;
     this.period = period;
@@ -385,4 +377,7 @@ public class Schedule {
     return skipPastOccurrences;
   }
 
+  public long getEndSchedTime() {
+    return endSchedTime;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
index 65ae121..6c97b80 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
@@ -24,8 +24,6 @@ public interface ScheduleLoader {
 
   public void updateSchedule(Schedule s) throws ScheduleManagerException;
 
-  public List<Schedule> loadSchedules() throws ScheduleManagerException;
-
   public void removeSchedule(Schedule s) throws ScheduleManagerException;
 
   public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index fbfef13..8b1a848 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -50,9 +50,9 @@ public class ScheduleManager implements TriggerAgent {
   private ScheduleLoader loader;
 
   private Map<Integer, Schedule> scheduleIDMap =
-      new LinkedHashMap<Integer, Schedule>();
+      new LinkedHashMap<>();
   private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap =
-      new LinkedHashMap<Pair<Integer, String>, Schedule>();
+      new LinkedHashMap<>();
 
   /**
    * Give the schedule manager a loader class that will properly load the
@@ -66,22 +66,6 @@ public class ScheduleManager implements TriggerAgent {
 
   @Override
   public void start() throws ScheduleManagerException {
-    List<Schedule> scheduleList = null;
-    try {
-      scheduleList = loader.loadSchedules();
-    } catch (ScheduleManagerException e) {
-      logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
-      e.printStackTrace();
-    }
-
-    for (Schedule sched : scheduleList) {
-      if (sched.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
-        onScheduleExpire(sched);
-      } else {
-        internalSchedule(sched);
-      }
-    }
-
   }
 
   // only do this when using external runner
@@ -124,7 +108,6 @@ public class ScheduleManager implements TriggerAgent {
   /**
    * Returns the scheduled flow for the flow name
    *
-   * @param id
    * @return
    * @throws ScheduleManagerException
    */
@@ -139,7 +122,6 @@ public class ScheduleManager implements TriggerAgent {
   /**
    * Returns the scheduled flow for the scheduleId
    *
-   * @param id
    * @return
    * @throws ScheduleManagerException
    */
@@ -151,7 +133,6 @@ public class ScheduleManager implements TriggerAgent {
   /**
    * Removes the flow from the schedule if it exists.
    *
-   * @param id
    * @throws ScheduleManagerException
    */
 
@@ -166,7 +147,6 @@ public class ScheduleManager implements TriggerAgent {
   /**
    * Removes the flow from the schedule if it exists.
    *
-   * @param id
    */
   public synchronized void removeSchedule(Schedule sched) {
     Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
@@ -185,44 +165,50 @@ public class ScheduleManager implements TriggerAgent {
     }
   }
 
-  public Schedule scheduleFlow(final int scheduleId, final int projectId,
-      final String projectName, final String flowName, final String status,
-      final long firstSchedTime, final DateTimeZone timezone,
-      final ReadablePeriod period, final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser) {
-    return scheduleFlow(scheduleId, projectId, projectName, flowName, status,
-        firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
-        submitTime, submitUser, null, null);
-  }
-
-  public Schedule scheduleFlow(final int scheduleId, final int projectId,
-      final String projectName, final String flowName, final String status,
-      final long firstSchedTime, final DateTimeZone timezone,
-      final ReadablePeriod period, final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser,
-      ExecutionOptions execOptions, List<SlaOption> slaOptions) {
-    Schedule sched =
-        new Schedule(scheduleId, projectId, projectName, flowName, status,
-            firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
-            submitTime, submitUser, execOptions, slaOptions, null);
+  public Schedule scheduleFlow(final int scheduleId,
+                               final int projectId,
+                               final String projectName,
+                               final String flowName,
+                               final String status,
+                               final long firstSchedTime,
+                               final long endSchedTime,
+                               final DateTimeZone timezone,
+                               final ReadablePeriod period,
+                               final long lastModifyTime,
+                               final long nextExecTime,
+                               final long submitTime,
+                               final String submitUser,
+                               ExecutionOptions execOptions, List<SlaOption> slaOptions) {
+    Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status,
+        firstSchedTime, endSchedTime, timezone, period, lastModifyTime, nextExecTime,
+        submitTime, submitUser, execOptions, slaOptions, null);
     logger
         .info("Scheduling flow '" + sched.getScheduleName() + "' for "
-            + _dateFormat.print(firstSchedTime) + " with a period of " + period == null ? "(non-recurring)"
-            : period);
+            + _dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)"
+            : period));
 
     insertSchedule(sched);
     return sched;
   }
 
-  public Schedule cronScheduleFlow(final int scheduleId, final int projectId,
-      final String projectName, final String flowName, final String status,
-      final long firstSchedTime, final DateTimeZone timezone,
-      final long lastModifyTime,
-      final long nextExecTime, final long submitTime, final String submitUser,
-      ExecutionOptions execOptions, List<SlaOption> slaOptions, String cronExpression) {
+  public Schedule cronScheduleFlow(final int scheduleId,
+                                   final int projectId,
+                                   final String projectName,
+                                   final String flowName,
+                                   final String status,
+                                   final long firstSchedTime,
+                                   final long endSchedTime,
+                                   final DateTimeZone timezone,
+                                   final long lastModifyTime,
+                                   final long nextExecTime,
+                                   final long submitTime,
+                                   final String submitUser,
+                                   ExecutionOptions execOptions,
+                                   List<SlaOption> slaOptions,
+                                   String cronExpression) {
     Schedule sched =
         new Schedule(scheduleId, projectId, projectName, flowName, status,
-            firstSchedTime, timezone, null, lastModifyTime, nextExecTime,
+            firstSchedTime, endSchedTime, timezone, null, lastModifyTime, nextExecTime,
             submitTime, submitUser, execOptions, slaOptions, cronExpression);
     logger
         .info("Scheduling flow '" + sched.getScheduleName() + "' for "
@@ -233,8 +219,6 @@ public class ScheduleManager implements TriggerAgent {
   }
   /**
    * Schedules the flow, but doesn't save the schedule afterwards.
-   *
-   * @param flow
    */
   private synchronized void internalSchedule(Schedule s) {
     scheduleIDMap.put(s.getScheduleId(), s);
@@ -244,7 +228,6 @@ public class ScheduleManager implements TriggerAgent {
   /**
    * Adds a flow to the schedule.
    *
-   * @param flow
    */
   public synchronized void insertSchedule(Schedule s) {
     Schedule exist = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index e1bcf81..306c729 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -16,6 +16,7 @@
 
 package azkaban.scheduler;
 
+import azkaban.Constants;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -45,7 +46,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
   private long lastUpdateTime = -1;
 
   public TriggerBasedScheduleLoader(TriggerManager triggerManager,
-      String triggerSource) {
+                                    String triggerSource) {
     this.triggerManager = triggerManager;
     this.triggerSource = triggerSource;
   }
@@ -79,7 +80,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 
   private Condition createTriggerCondition(Schedule s) {
     Map<String, ConditionChecker> checkers =
-        new HashMap<String, ConditionChecker>();
+        new HashMap<>();
     ConditionChecker checker =
         new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(),
             s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(),
@@ -90,18 +91,14 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
     return cond;
   }
 
-  // if failed to trigger, auto expire?
   private Condition createExpireCondition(Schedule s) {
-    Map<String, ConditionChecker> checkers =
-        new HashMap<String, ConditionChecker>();
-    ConditionChecker checker =
-        new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(),
-            s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(),
-            s.getPeriod(), s.getCronExpression());
+    Map<String, ConditionChecker> checkers = new HashMap<>();
+    ConditionChecker checker = new BasicTimeChecker("EndTimeChecker_1", s.getFirstSchedTime(),
+        s.getTimezone(), s.getEndSchedTime(), false, false,
+        null, null);
     checkers.put(checker.getId(), checker);
     String expr = checker.getId() + ".eval()";
-    Condition cond = new Condition(checkers, expr);
-    return cond;
+    return new Condition(checkers, expr);
   }
 
   @Override
@@ -125,33 +122,11 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
     }
   }
 
-  // TODO may need to add logic to filter out skip runs
-  @Override
-  public synchronized List<Schedule> loadSchedules()
-      throws ScheduleManagerException {
-    List<Trigger> triggers = triggerManager.getTriggers(triggerSource);
-    List<Schedule> schedules = new ArrayList<Schedule>();
-    for (Trigger t : triggers) {
-      lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
-      Schedule s = triggerToSchedule(t);
-      schedules.add(s);
-      System.out.println("loaded schedule for "
-          + s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
-    }
-    return schedules;
+  private Schedule triggerToSchedule(Trigger t) throws ScheduleManagerException {
 
-  }
+    BasicTimeChecker triggerTimeChecker = getBasicTimeChecker(t.getTriggerCondition().getCheckers());
+    BasicTimeChecker endTimeChecker = getBasicTimeChecker(t.getExpireCondition().getCheckers());
 
-  private Schedule triggerToSchedule(Trigger t) throws ScheduleManagerException {
-    Condition triggerCond = t.getTriggerCondition();
-    Map<String, ConditionChecker> checkers = triggerCond.getCheckers();
-    BasicTimeChecker ck = null;
-    for (ConditionChecker checker : checkers.values()) {
-      if (checker.getType().equals(BasicTimeChecker.type)) {
-        ck = (BasicTimeChecker) checker;
-        break;
-      }
-    }
     List<TriggerAction> actions = t.getActions();
     ExecuteFlowAction act = null;
     for (TriggerAction action : actions) {
@@ -160,15 +135,23 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
         break;
       }
     }
-    if (ck != null && act != null) {
-      Schedule s =
-          new Schedule(t.getTriggerId(), act.getProjectId(),
-              act.getProjectName(), act.getFlowName(),
-              t.getStatus().toString(), ck.getFirstCheckTime(),
-              ck.getTimeZone(), ck.getPeriod(), t.getLastModifyTime(),
-              ck.getNextCheckTime(), t.getSubmitTime(), t.getSubmitUser(),
-              act.getExecutionOptions(), act.getSlaOptions(), ck.getCronExpression());
-      return s;
+    if (triggerTimeChecker != null && act != null) {
+      return new Schedule(t.getTriggerId(),
+          act.getProjectId(),
+          act.getProjectName(),
+          act.getFlowName(),
+          t.getStatus().toString(),
+          triggerTimeChecker.getFirstCheckTime(),
+          endTimeChecker == null? Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME: endTimeChecker.getNextCheckTime(),
+          triggerTimeChecker.getTimeZone(),
+          triggerTimeChecker.getPeriod(),
+          t.getLastModifyTime(),
+          triggerTimeChecker.getNextCheckTime(),
+          t.getSubmitTime(),
+          t.getSubmitUser(),
+          act.getExecutionOptions(),
+          act.getSlaOptions(),
+          triggerTimeChecker.getCronExpression());
     } else {
       logger.error("Failed to parse schedule from trigger!");
       throw new ScheduleManagerException(
@@ -176,6 +159,16 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
     }
   }
 
+  // expirecheckers or triggerCheckers only have BasicTimeChecker today. This should be refactored in future.
+  private BasicTimeChecker getBasicTimeChecker(Map<String, ConditionChecker> checkers) {
+    for (ConditionChecker checker : checkers.values()) {
+      if (checker.getType().equals(BasicTimeChecker.type)) {
+        return (BasicTimeChecker) checker;
+      }
+    }
+    return null;
+  }
+
   @Override
   public void removeSchedule(Schedule s) throws ScheduleManagerException {
     try {
@@ -207,7 +200,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
       lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
       Schedule s = triggerToSchedule(t);
       schedules.add(s);
-      System.out.println("loaded schedule for "
+      logger.info("loaded schedule for "
           + s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
     }
     return schedules;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 52ff2e0..0a69b2d 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -23,8 +23,6 @@ import java.util.Date;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.ReadablePeriod;
-import org.apache.log4j.Logger;
-
 import org.quartz.CronExpression;
 
 import azkaban.trigger.ConditionChecker;
@@ -32,8 +30,6 @@ import azkaban.utils.Utils;
 
 public class BasicTimeChecker implements ConditionChecker {
 
-  private static Logger logger = Logger.getLogger(BasicTimeChecker.class);
-
   public static final String type = "BasicTimeChecker";
 
   private long firstCheckTime;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 5e8e7b6..275c567 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -35,7 +35,7 @@ public class Condition {
   private static CheckerTypeLoader checkerLoader = null;
   private Expression expression;
   private Map<String, ConditionChecker> checkers =
-      new HashMap<String, ConditionChecker>();
+      new HashMap<>();
   private MapContext context = new MapContext();
   private Long nextCheckTime = -1L;
 
@@ -52,24 +52,10 @@ public class Condition {
     this.expression = jexl.createExpression(expr);
   }
 
-  public synchronized static void setJexlEngine(JexlEngine jexl) {
-    Condition.jexl = jexl;
-  }
-
   public synchronized static void setCheckerLoader(CheckerTypeLoader loader) {
     Condition.checkerLoader = loader;
   }
 
-  protected static CheckerTypeLoader getCheckerLoader() {
-    return checkerLoader;
-  }
-
-  protected void registerChecker(ConditionChecker checker) {
-    checkers.put(checker.getId(), checker);
-    context.set(checker.getId(), checker);
-    updateNextCheckTime();
-  }
-
   public long getNextCheckTime() {
     return nextCheckTime;
   }
@@ -78,7 +64,7 @@ public class Condition {
     return this.checkers;
   }
 
-  public void setCheckers(Map<String, ConditionChecker> checkers) {
+  private void setCheckers(Map<String, ConditionChecker> checkers) {
     this.checkers = checkers;
     for (ConditionChecker checker : checkers.values()) {
       this.context.set(checker.getId(), checker);
@@ -86,12 +72,6 @@ public class Condition {
     updateNextCheckTime();
   }
 
-  public void updateCheckTime(Long ct) {
-    if (nextCheckTime < ct) {
-      nextCheckTime = ct;
-    }
-  }
-
   private void updateNextCheckTime() {
     long time = Long.MAX_VALUE;
     for (ConditionChecker checker : checkers.values()) {
@@ -125,12 +105,12 @@ public class Condition {
   }
 
   public Object toJson() {
-    Map<String, Object> jsonObj = new HashMap<String, Object>();
+    Map<String, Object> jsonObj = new HashMap<>();
     jsonObj.put("expression", expression.getExpression());
 
-    List<Object> checkersJson = new ArrayList<Object>();
+    List<Object> checkersJson = new ArrayList<>();
     for (ConditionChecker checker : checkers.values()) {
-      Map<String, Object> oneChecker = new HashMap<String, Object>();
+      Map<String, Object> oneChecker = new HashMap<>();
       oneChecker.put("type", checker.getType());
       oneChecker.put("checkerJson", checker.toJson());
       checkersJson.add(oneChecker);
@@ -152,7 +132,7 @@ public class Condition {
 
     try {
       Map<String, ConditionChecker> checkers =
-          new HashMap<String, ConditionChecker>();
+          new HashMap<>();
       List<Object> checkersJson = (List<Object>) jsonObj.get("checkers");
       for (Object oneCheckerJson : checkersJson) {
         Map<String, Object> oneChecker =
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
index edde6d3..6d4312d 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
@@ -59,8 +59,7 @@ public class Trigger {
   }
 
   public void updateNextCheckTime() {
-    this.nextCheckTime =
-        Math.min(triggerCondition.getNextCheckTime(),
+    this.nextCheckTime = Math.min(triggerCondition.getNextCheckTime(),
             expireCondition.getNextCheckTime());
   }
 
@@ -346,8 +345,7 @@ public class Trigger {
     Trigger trigger = null;
     try {
       logger.info("Decoding for " + JSONUtils.toJSON(obj));
-      Condition triggerCond =
-          Condition.fromJson(jsonObj.get("triggerCondition"));
+      Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
       Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
       List<TriggerAction> actions = new ArrayList<TriggerAction>();
       List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 9f75544..47ea193 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -282,10 +282,17 @@ public class TriggerManager extends EventHandler implements
           }
 
           if (t.getStatus().equals(TriggerStatus.READY)) {
-            if (t.triggerConditionMet()) {
+
+            /**
+             * Prior to this change, expiration condition should never be called though
+             * we have some related code here. ExpireCondition used the same BasicTimeChecker
+             * as triggerCondition do. As a consequence, we need to figure out a way to distinguish
+             * the previous ExpireCondition and this commit's ExpireCondition.
+             */
+            if (t.getExpireCondition().getExpression().contains("EndTimeChecker") && t.expireConditionMet()) {
+              onTriggerPause(t);
+            } else if (t.triggerConditionMet()) {
               onTriggerTrigger(t);
-            } else if (t.expireConditionMet()) {
-              onTriggerExpire(t);
             }
           }
           if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
@@ -312,9 +319,9 @@ public class TriggerManager extends EventHandler implements
           logger.error("Failed to do action " + action.getDescription(), th);
         }
       }
+
       if (t.isResetOnTrigger()) {
         t.resetTriggerConditions();
-        t.resetExpireCondition();
       } else {
         t.setStatus(TriggerStatus.EXPIRED);
       }
@@ -325,7 +332,7 @@ public class TriggerManager extends EventHandler implements
       }
     }
 
-    private void onTriggerExpire(Trigger t) throws TriggerManagerException {
+    private void onTriggerPause(Trigger t) throws TriggerManagerException {
       List<TriggerAction> expireActions = t.getExpireActions();
       for (TriggerAction action : expireActions) {
         try {
@@ -339,12 +346,8 @@ public class TriggerManager extends EventHandler implements
               th);
         }
       }
-      if (t.isResetOnExpire()) {
-        t.resetTriggerConditions();
-        t.resetExpireCondition();
-      } else {
-        t.setStatus(TriggerStatus.EXPIRED);
-      }
+      logger.info("Pausing Trigger " + t.getDescription());
+      t.setStatus(TriggerStatus.PAUSED);
       try {
         triggerLoader.updateTrigger(t);
       } catch (TriggerLoaderException e) {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
index 279c329..0d65bf2 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
@@ -86,31 +86,25 @@ public class ThresholdChecker implements ConditionChecker {
 
   @Override
   public Object getNum() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public Object toJson() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public void stopChecker() {
     return;
-
   }
 
   @Override
   public void setContext(Map<String, Object> context) {
-    // TODO Auto-generated method stub
-
   }
 
   @Override
   public long getNextCheckTime() {
-    // TODO Auto-generated method stub
     return 0;
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index 4021cc7..a6865fc 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -16,115 +16,114 @@
 
 package azkaban.trigger;
 
+import azkaban.executor.ExecutorManager;
+
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.utils.Utils;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.joda.time.DateTime;
-
+import org.joda.time.DateTimeZone;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 import azkaban.utils.Props;
 
+import static org.mockito.Mockito.*;
+
 public class TriggerManagerTest {
 
-  private TriggerLoader triggerLoader;
+  private static TriggerLoader triggerLoader;
+  private static ExecutorManager executorManager;
+  private TriggerManager triggerManager;
 
-  @Before
-  public void setup() throws TriggerException, TriggerManagerException {
+  @BeforeClass
+  public static void prepare() {
     triggerLoader = new MockTriggerLoader();
-
+    executorManager = mock(ExecutorManager.class);
+    doNothing().when(executorManager).addListener(anyObject());
   }
 
-  @After
-  public void tearDown() {
-
-  }
-
-  @Ignore @Test
-  public void triggerManagerSimpleTest() throws TriggerManagerException {
+  @Before
+  public void setup() throws TriggerException, TriggerManagerException {
     Props props = new Props();
-    props.put("trigger.scan.interval", 4000);
-    TriggerManager triggerManager =
-        new TriggerManager(props, triggerLoader, null);
-
+    props.put("trigger.scan.interval", 300);
+    triggerManager = new TriggerManager(props, triggerLoader, executorManager);
     triggerManager.registerCheckerType(ThresholdChecker.type,
         ThresholdChecker.class);
     triggerManager.registerActionType(DummyTriggerAction.type,
         DummyTriggerAction.class);
+    triggerManager.start();
+  }
 
-    ThresholdChecker.setVal(1);
+  @After
+  public void tearDown() {
+    triggerManager.shutdown();
+  }
+
+  @Test
+  public void neverExpireTriggerTest() throws TriggerManagerException {
 
-    triggerManager.insertTrigger(
-        createDummyTrigger("test1", "triggerLoader", 10), "testUser");
-    List<Trigger> triggers = triggerManager.getTriggers();
-    assertTrue(triggers.size() == 1);
-    Trigger t1 = triggers.get(0);
+    Trigger t1 = createNeverExpireTrigger("triggerLoader", 10);
+    triggerManager.insertTrigger(t1);
     t1.setResetOnTrigger(false);
-    triggerManager.updateTrigger(t1, "testUser");
-    ThresholdChecker checker1 =
+    ThresholdChecker triggerChecker =
         (ThresholdChecker) t1.getTriggerCondition().getCheckers().values()
             .toArray()[0];
-    assertTrue(t1.getSource().equals("triggerLoader"));
 
-    Trigger t2 =
-        createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
-    triggerManager.insertTrigger(t2, "testUser");
-    ThresholdChecker checker2 =
-        (ThresholdChecker) t2.getTriggerCondition().getCheckers().values()
+    BasicTimeChecker expireChecker =
+        (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
             .toArray()[0];
 
     ThresholdChecker.setVal(15);
-    try {
-      Thread.sleep(2000);
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    sleep(300);
+    sleep(300);
+    assertTrue(triggerChecker.isCheckerMet() == true);
+    assertTrue(expireChecker.eval() == false);
 
-    assertTrue(checker1.isCheckerMet() == false);
-    assertTrue(checker2.isCheckerMet() == false);
-    assertTrue(checker1.isCheckerReset() == false);
-    assertTrue(checker2.isCheckerReset() == false);
+    ThresholdChecker.setVal(25);
+    sleep(300);
+    assertTrue(triggerChecker.isCheckerMet() == true);
+    assertTrue(expireChecker.eval() == false);
+  }
 
-    try {
-      Thread.sleep(2000);
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
 
-    assertTrue(checker1.isCheckerMet() == true);
-    assertTrue(checker2.isCheckerMet() == false);
-    assertTrue(checker1.isCheckerReset() == false);
-    assertTrue(checker2.isCheckerReset() == false);
+  @Test
+  public void timeCheckerAndExpireTriggerTest() throws TriggerManagerException {
 
-    ThresholdChecker.setVal(25);
-    try {
-      Thread.sleep(4000);
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    long curr = System.currentTimeMillis();
+    Trigger t1 = createPeriodAndEndCheckerTrigger(curr);
+    triggerManager.insertTrigger(t1);
+    t1.setResetOnTrigger(true);
+    BasicTimeChecker expireChecker =
+        (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
+            .toArray()[0];
+
+    sleep(1000);
 
-    assertTrue(checker1.isCheckerMet() == true);
-    assertTrue(checker1.isCheckerReset() == false);
-    assertTrue(checker2.isCheckerReset() == true);
+    assertTrue(expireChecker.eval() == false);
+    assertTrue(t1.getStatus() == TriggerStatus.READY);
 
-    triggers = triggerManager.getTriggers();
-    assertTrue(triggers.size() == 1);
+    sleep(1000);
+    sleep(1000);
+    sleep(1000);
+    assertTrue(expireChecker.eval() == true);
+    assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
 
+    sleep(1000);
+    assertTrue(expireChecker.eval() == true);
+    assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
   }
 
-  public class MockTriggerLoader implements TriggerLoader {
 
-    private Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+  public static class MockTriggerLoader implements TriggerLoader {
+    private Map<Integer, Trigger> triggers = new HashMap<>();
     private int idIndex = 0;
 
     @Override
@@ -146,7 +145,7 @@ public class TriggerManagerTest {
 
     @Override
     public List<Trigger> loadTriggers() {
-      return new ArrayList<Trigger>(triggers.values());
+      return new ArrayList<>(triggers.values());
     }
 
     @Override
@@ -161,49 +160,81 @@ public class TriggerManagerTest {
       // TODO Auto-generated method stub
       return null;
     }
-
   }
 
-  private Trigger createDummyTrigger(String message, String source,
-      int threshold) {
-
-    Map<String, ConditionChecker> checkers =
-        new HashMap<String, ConditionChecker>();
-    ConditionChecker checker =
-        new ThresholdChecker(ThresholdChecker.type, threshold);
-    checkers.put(checker.getId(), checker);
-
-    List<TriggerAction> actions = new ArrayList<TriggerAction>();
-    TriggerAction act = new DummyTriggerAction(message);
-    actions.add(act);
-
-    String expr = checker.getId() + ".eval()";
-
-    Condition triggerCond = new Condition(checkers, expr);
-    Condition expireCond = new Condition(checkers, expr);
+  private void sleep (long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
 
-    Trigger fakeTrigger =
-        new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(),
-            "azkaban", source, triggerCond, expireCond, actions);
-    fakeTrigger.setResetOnTrigger(true);
+  private Trigger createNeverExpireTrigger(String source, int threshold) {
+    Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+    Map<String, ConditionChecker> expireCheckers = new HashMap<>();
+    ConditionChecker triggerChecker = new ThresholdChecker(ThresholdChecker.type, threshold);
+    ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeCheck_1", 111L,
+        DateTimeZone.UTC, 2536871155000L,false, false,
+        null, null);
+    triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+    expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
+
+    String triggerExpr = triggerChecker.getId() + ".eval()";
+    String expireExpr = endTimeChecker.getId() + ".eval()";
+
+    Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+    Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+    Trigger fakeTrigger = new Trigger("azkaban",
+        source,
+        triggerCond,
+        expireCond,
+        getTriggerActions());
+
+    fakeTrigger.setResetOnTrigger(false);
     fakeTrigger.setResetOnExpire(true);
-
     return fakeTrigger;
   }
 
-  // public class MockCheckerLoader extends CheckerTypeLoader{
-  //
-  // @Override
-  // public void init(Props props) {
-  // checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
-  // }
-  // }
-  //
-  // public class MockActionLoader extends ActionTypeLoader {
-  // @Override
-  // public void init(Props props) {
-  // actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
-  // }
-  // }
-
-}
+  private Trigger createPeriodAndEndCheckerTrigger(long currMillis) {
+    Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+    Map<String, ConditionChecker> expireCheckers = new HashMap<>();
+
+    // TODO kunkun-tang: 1 second is the minimum unit for {@link org.joda.time.ReadablePeriod}.
+    // In future, we should use some smaller alternative.
+    ConditionChecker triggerChecker = new BasicTimeChecker("BasicTimeChecker_1",
+        currMillis, DateTimeZone.UTC, true, true,
+        Utils.parsePeriodString("1s"), null);
+
+    // End time is 3 seconds past now.
+    ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeChecker_1", 111L,
+        DateTimeZone.UTC, currMillis + 3000L,false, false,
+        null, null);
+    triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+    expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
+
+    String triggerExpr = triggerChecker.getId() + ".eval()";
+    String expireExpr = endTimeChecker.getId() + ".eval()";
+
+    Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+    Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+    Trigger timeTrigger = new Trigger("azkaban",
+        "",
+        triggerCond,
+        expireCond,
+        getTriggerActions());
+
+    timeTrigger.setResetOnTrigger(false);
+    timeTrigger.setResetOnExpire(true);
+    return timeTrigger;
+  }
+
+  private List<TriggerAction> getTriggerActions() {
+    List<TriggerAction> actions = new ArrayList<>();
+    TriggerAction act = new DummyTriggerAction("");
+    actions.add(act);
+    return actions;
+  }
+}
\ No newline at end of file
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index f013451..c25963c 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,7 @@
 
 package azkaban.webapp.servlet;
 
+import azkaban.Constants;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -655,6 +656,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
+    long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+    try {
+      // Todo kunkun-tang: Need to verify if passed end time is valid.
+    } catch (Exception e) {
+      ret.put("error", "Invalid date and time: " + endSchedTime);
+      return;
+    }
+
     ReadablePeriod thePeriod = null;
     try {
       if (hasParam(req, "is_recurring")
@@ -677,7 +686,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 
     Schedule schedule =
         scheduleManager.scheduleFlow(-1, projectId, projectName, flowName,
-            "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+            "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
             thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(),
             firstSchedTime.getMillis(), user.getUserId(), flowOptions,
             slaOptions);
@@ -745,6 +754,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
       ret.put("error", e.getMessage());
     }
 
+    long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+    try {
+      // Todo kunkun-tang: Need to verify if passed end time is valid.
+    } catch (Exception e) {
+      ret.put("error", "Invalid date and time: " + endSchedTime);
+      return;
+    }
+
     ExecutionOptions flowOptions = null;
     try {
       flowOptions = HttpRequestUtils.parseFlowOptions(req);
@@ -757,7 +774,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 
     // Because either cronExpression or recurrence exists, we build schedule in the below way.
     Schedule schedule = scheduleManager.cronScheduleFlow(-1, projectId, projectName, flowName,
-            "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+            "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
             DateTime.now().getMillis(), firstSchedTime.getMillis(),
             firstSchedTime.getMillis(), user.getUserId(), flowOptions,
             slaOptions, cronExpression);