Details
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 7c7fc2e..04bdc5f 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -44,6 +44,9 @@ public class Constants {
// Max number of memory check retry
public static final int MEMORY_CHECK_RETRY_LIMIT = 720;
+ // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
+ public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+
public static class ConfigurationKeys {
// These properties are configurable through azkaban.properties
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
index 27d3f30..7e189e3 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
@@ -46,6 +46,7 @@ public class Schedule {
private String projectName;
private String flowName;
private long firstSchedTime;
+ private long endSchedTime;
private DateTimeZone timezone;
private long lastModifyTime;
private ReadablePeriod period;
@@ -60,37 +61,28 @@ public class Schedule {
private ExecutionOptions executionOptions;
private List<SlaOption> slaOptions;
- public Schedule(int scheduleId, int projectId, String projectName,
- String flowName, String status, long firstSchedTime,
- DateTimeZone timezone, ReadablePeriod period, long lastModifyTime,
- long nextExecTime, long submitTime, String submitUser) {
-
- this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
- timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser,
- null, null, null);
- }
-
- public Schedule(int scheduleId, int projectId, String projectName,
- String flowName, String status, long firstSchedTime, String timezoneId,
- String period, long lastModifyTime, long nextExecTime, long submitTime,
- String submitUser, ExecutionOptions executionOptions,
- List<SlaOption> slaOptions) {
- this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
- DateTimeZone.forID(timezoneId), parsePeriodString(period),
- lastModifyTime, nextExecTime, submitTime, submitUser, executionOptions,
- slaOptions, null);
- }
-
- public Schedule(int scheduleId, int projectId, String projectName,
- String flowName, String status, long firstSchedTime,
- DateTimeZone timezone, ReadablePeriod period, long lastModifyTime,
- long nextExecTime, long submitTime, String submitUser,
- ExecutionOptions executionOptions, List<SlaOption> slaOptions, String cronExpression) {
+ public Schedule(int scheduleId,
+ int projectId,
+ String projectName,
+ String flowName,
+ String status,
+ long firstSchedTime,
+ long endSchedTime,
+ DateTimeZone timezone,
+ ReadablePeriod period,
+ long lastModifyTime,
+ long nextExecTime,
+ long submitTime,
+ String submitUser,
+ ExecutionOptions executionOptions,
+ List<SlaOption> slaOptions,
+ String cronExpression) {
this.scheduleId = scheduleId;
this.projectId = projectId;
this.projectName = projectName;
this.flowName = flowName;
this.firstSchedTime = firstSchedTime;
+ this.endSchedTime = endSchedTime;
this.timezone = timezone;
this.lastModifyTime = lastModifyTime;
this.period = period;
@@ -385,4 +377,7 @@ public class Schedule {
return skipPastOccurrences;
}
+ public long getEndSchedTime() {
+ return endSchedTime;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
index 65ae121..6c97b80 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
@@ -24,8 +24,6 @@ public interface ScheduleLoader {
public void updateSchedule(Schedule s) throws ScheduleManagerException;
- public List<Schedule> loadSchedules() throws ScheduleManagerException;
-
public void removeSchedule(Schedule s) throws ScheduleManagerException;
public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index fbfef13..8b1a848 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -50,9 +50,9 @@ public class ScheduleManager implements TriggerAgent {
private ScheduleLoader loader;
private Map<Integer, Schedule> scheduleIDMap =
- new LinkedHashMap<Integer, Schedule>();
+ new LinkedHashMap<>();
private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap =
- new LinkedHashMap<Pair<Integer, String>, Schedule>();
+ new LinkedHashMap<>();
/**
* Give the schedule manager a loader class that will properly load the
@@ -66,22 +66,6 @@ public class ScheduleManager implements TriggerAgent {
@Override
public void start() throws ScheduleManagerException {
- List<Schedule> scheduleList = null;
- try {
- scheduleList = loader.loadSchedules();
- } catch (ScheduleManagerException e) {
- logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
- e.printStackTrace();
- }
-
- for (Schedule sched : scheduleList) {
- if (sched.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
- onScheduleExpire(sched);
- } else {
- internalSchedule(sched);
- }
- }
-
}
// only do this when using external runner
@@ -124,7 +108,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Returns the scheduled flow for the flow name
*
- * @param id
* @return
* @throws ScheduleManagerException
*/
@@ -139,7 +122,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Returns the scheduled flow for the scheduleId
*
- * @param id
* @return
* @throws ScheduleManagerException
*/
@@ -151,7 +133,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Removes the flow from the schedule if it exists.
*
- * @param id
* @throws ScheduleManagerException
*/
@@ -166,7 +147,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Removes the flow from the schedule if it exists.
*
- * @param id
*/
public synchronized void removeSchedule(Schedule sched) {
Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
@@ -185,44 +165,50 @@ public class ScheduleManager implements TriggerAgent {
}
}
- public Schedule scheduleFlow(final int scheduleId, final int projectId,
- final String projectName, final String flowName, final String status,
- final long firstSchedTime, final DateTimeZone timezone,
- final ReadablePeriod period, final long lastModifyTime,
- final long nextExecTime, final long submitTime, final String submitUser) {
- return scheduleFlow(scheduleId, projectId, projectName, flowName, status,
- firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
- submitTime, submitUser, null, null);
- }
-
- public Schedule scheduleFlow(final int scheduleId, final int projectId,
- final String projectName, final String flowName, final String status,
- final long firstSchedTime, final DateTimeZone timezone,
- final ReadablePeriod period, final long lastModifyTime,
- final long nextExecTime, final long submitTime, final String submitUser,
- ExecutionOptions execOptions, List<SlaOption> slaOptions) {
- Schedule sched =
- new Schedule(scheduleId, projectId, projectName, flowName, status,
- firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
- submitTime, submitUser, execOptions, slaOptions, null);
+ public Schedule scheduleFlow(final int scheduleId,
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final long endSchedTime,
+ final DateTimeZone timezone,
+ final ReadablePeriod period,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ ExecutionOptions execOptions, List<SlaOption> slaOptions) {
+ Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status,
+ firstSchedTime, endSchedTime, timezone, period, lastModifyTime, nextExecTime,
+ submitTime, submitUser, execOptions, slaOptions, null);
logger
.info("Scheduling flow '" + sched.getScheduleName() + "' for "
- + _dateFormat.print(firstSchedTime) + " with a period of " + period == null ? "(non-recurring)"
- : period);
+ + _dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)"
+ : period));
insertSchedule(sched);
return sched;
}
- public Schedule cronScheduleFlow(final int scheduleId, final int projectId,
- final String projectName, final String flowName, final String status,
- final long firstSchedTime, final DateTimeZone timezone,
- final long lastModifyTime,
- final long nextExecTime, final long submitTime, final String submitUser,
- ExecutionOptions execOptions, List<SlaOption> slaOptions, String cronExpression) {
+ public Schedule cronScheduleFlow(final int scheduleId,
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final long endSchedTime,
+ final DateTimeZone timezone,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ ExecutionOptions execOptions,
+ List<SlaOption> slaOptions,
+ String cronExpression) {
Schedule sched =
new Schedule(scheduleId, projectId, projectName, flowName, status,
- firstSchedTime, timezone, null, lastModifyTime, nextExecTime,
+ firstSchedTime, endSchedTime, timezone, null, lastModifyTime, nextExecTime,
submitTime, submitUser, execOptions, slaOptions, cronExpression);
logger
.info("Scheduling flow '" + sched.getScheduleName() + "' for "
@@ -233,8 +219,6 @@ public class ScheduleManager implements TriggerAgent {
}
/**
* Schedules the flow, but doesn't save the schedule afterwards.
- *
- * @param flow
*/
private synchronized void internalSchedule(Schedule s) {
scheduleIDMap.put(s.getScheduleId(), s);
@@ -244,7 +228,6 @@ public class ScheduleManager implements TriggerAgent {
/**
* Adds a flow to the schedule.
*
- * @param flow
*/
public synchronized void insertSchedule(Schedule s) {
Schedule exist = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index e1bcf81..306c729 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -16,6 +16,7 @@
package azkaban.scheduler;
+import azkaban.Constants;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -45,7 +46,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private long lastUpdateTime = -1;
public TriggerBasedScheduleLoader(TriggerManager triggerManager,
- String triggerSource) {
+ String triggerSource) {
this.triggerManager = triggerManager;
this.triggerSource = triggerSource;
}
@@ -79,7 +80,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private Condition createTriggerCondition(Schedule s) {
Map<String, ConditionChecker> checkers =
- new HashMap<String, ConditionChecker>();
+ new HashMap<>();
ConditionChecker checker =
new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(),
s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(),
@@ -90,18 +91,14 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
return cond;
}
- // if failed to trigger, auto expire?
private Condition createExpireCondition(Schedule s) {
- Map<String, ConditionChecker> checkers =
- new HashMap<String, ConditionChecker>();
- ConditionChecker checker =
- new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(),
- s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(),
- s.getPeriod(), s.getCronExpression());
+ Map<String, ConditionChecker> checkers = new HashMap<>();
+ ConditionChecker checker = new BasicTimeChecker("EndTimeChecker_1", s.getFirstSchedTime(),
+ s.getTimezone(), s.getEndSchedTime(), false, false,
+ null, null);
checkers.put(checker.getId(), checker);
String expr = checker.getId() + ".eval()";
- Condition cond = new Condition(checkers, expr);
- return cond;
+ return new Condition(checkers, expr);
}
@Override
@@ -125,33 +122,11 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
}
}
- // TODO may need to add logic to filter out skip runs
- @Override
- public synchronized List<Schedule> loadSchedules()
- throws ScheduleManagerException {
- List<Trigger> triggers = triggerManager.getTriggers(triggerSource);
- List<Schedule> schedules = new ArrayList<Schedule>();
- for (Trigger t : triggers) {
- lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
- Schedule s = triggerToSchedule(t);
- schedules.add(s);
- System.out.println("loaded schedule for "
- + s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
- }
- return schedules;
+ private Schedule triggerToSchedule(Trigger t) throws ScheduleManagerException {
- }
+ BasicTimeChecker triggerTimeChecker = getBasicTimeChecker(t.getTriggerCondition().getCheckers());
+ BasicTimeChecker endTimeChecker = getBasicTimeChecker(t.getExpireCondition().getCheckers());
- private Schedule triggerToSchedule(Trigger t) throws ScheduleManagerException {
- Condition triggerCond = t.getTriggerCondition();
- Map<String, ConditionChecker> checkers = triggerCond.getCheckers();
- BasicTimeChecker ck = null;
- for (ConditionChecker checker : checkers.values()) {
- if (checker.getType().equals(BasicTimeChecker.type)) {
- ck = (BasicTimeChecker) checker;
- break;
- }
- }
List<TriggerAction> actions = t.getActions();
ExecuteFlowAction act = null;
for (TriggerAction action : actions) {
@@ -160,15 +135,23 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
break;
}
}
- if (ck != null && act != null) {
- Schedule s =
- new Schedule(t.getTriggerId(), act.getProjectId(),
- act.getProjectName(), act.getFlowName(),
- t.getStatus().toString(), ck.getFirstCheckTime(),
- ck.getTimeZone(), ck.getPeriod(), t.getLastModifyTime(),
- ck.getNextCheckTime(), t.getSubmitTime(), t.getSubmitUser(),
- act.getExecutionOptions(), act.getSlaOptions(), ck.getCronExpression());
- return s;
+ if (triggerTimeChecker != null && act != null) {
+ return new Schedule(t.getTriggerId(),
+ act.getProjectId(),
+ act.getProjectName(),
+ act.getFlowName(),
+ t.getStatus().toString(),
+ triggerTimeChecker.getFirstCheckTime(),
+ endTimeChecker == null? Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME: endTimeChecker.getNextCheckTime(),
+ triggerTimeChecker.getTimeZone(),
+ triggerTimeChecker.getPeriod(),
+ t.getLastModifyTime(),
+ triggerTimeChecker.getNextCheckTime(),
+ t.getSubmitTime(),
+ t.getSubmitUser(),
+ act.getExecutionOptions(),
+ act.getSlaOptions(),
+ triggerTimeChecker.getCronExpression());
} else {
logger.error("Failed to parse schedule from trigger!");
throw new ScheduleManagerException(
@@ -176,6 +159,16 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
}
}
+ // expirecheckers or triggerCheckers only have BasicTimeChecker today. This should be refactored in future.
+ private BasicTimeChecker getBasicTimeChecker(Map<String, ConditionChecker> checkers) {
+ for (ConditionChecker checker : checkers.values()) {
+ if (checker.getType().equals(BasicTimeChecker.type)) {
+ return (BasicTimeChecker) checker;
+ }
+ }
+ return null;
+ }
+
@Override
public void removeSchedule(Schedule s) throws ScheduleManagerException {
try {
@@ -207,7 +200,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime());
Schedule s = triggerToSchedule(t);
schedules.add(s);
- System.out.println("loaded schedule for "
+ logger.info("loaded schedule for "
+ s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
}
return schedules;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 52ff2e0..0a69b2d 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -23,8 +23,6 @@ import java.util.Date;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
-import org.apache.log4j.Logger;
-
import org.quartz.CronExpression;
import azkaban.trigger.ConditionChecker;
@@ -32,8 +30,6 @@ import azkaban.utils.Utils;
public class BasicTimeChecker implements ConditionChecker {
- private static Logger logger = Logger.getLogger(BasicTimeChecker.class);
-
public static final String type = "BasicTimeChecker";
private long firstCheckTime;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 5e8e7b6..275c567 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -35,7 +35,7 @@ public class Condition {
private static CheckerTypeLoader checkerLoader = null;
private Expression expression;
private Map<String, ConditionChecker> checkers =
- new HashMap<String, ConditionChecker>();
+ new HashMap<>();
private MapContext context = new MapContext();
private Long nextCheckTime = -1L;
@@ -52,24 +52,10 @@ public class Condition {
this.expression = jexl.createExpression(expr);
}
- public synchronized static void setJexlEngine(JexlEngine jexl) {
- Condition.jexl = jexl;
- }
-
public synchronized static void setCheckerLoader(CheckerTypeLoader loader) {
Condition.checkerLoader = loader;
}
- protected static CheckerTypeLoader getCheckerLoader() {
- return checkerLoader;
- }
-
- protected void registerChecker(ConditionChecker checker) {
- checkers.put(checker.getId(), checker);
- context.set(checker.getId(), checker);
- updateNextCheckTime();
- }
-
public long getNextCheckTime() {
return nextCheckTime;
}
@@ -78,7 +64,7 @@ public class Condition {
return this.checkers;
}
- public void setCheckers(Map<String, ConditionChecker> checkers) {
+ private void setCheckers(Map<String, ConditionChecker> checkers) {
this.checkers = checkers;
for (ConditionChecker checker : checkers.values()) {
this.context.set(checker.getId(), checker);
@@ -86,12 +72,6 @@ public class Condition {
updateNextCheckTime();
}
- public void updateCheckTime(Long ct) {
- if (nextCheckTime < ct) {
- nextCheckTime = ct;
- }
- }
-
private void updateNextCheckTime() {
long time = Long.MAX_VALUE;
for (ConditionChecker checker : checkers.values()) {
@@ -125,12 +105,12 @@ public class Condition {
}
public Object toJson() {
- Map<String, Object> jsonObj = new HashMap<String, Object>();
+ Map<String, Object> jsonObj = new HashMap<>();
jsonObj.put("expression", expression.getExpression());
- List<Object> checkersJson = new ArrayList<Object>();
+ List<Object> checkersJson = new ArrayList<>();
for (ConditionChecker checker : checkers.values()) {
- Map<String, Object> oneChecker = new HashMap<String, Object>();
+ Map<String, Object> oneChecker = new HashMap<>();
oneChecker.put("type", checker.getType());
oneChecker.put("checkerJson", checker.toJson());
checkersJson.add(oneChecker);
@@ -152,7 +132,7 @@ public class Condition {
try {
Map<String, ConditionChecker> checkers =
- new HashMap<String, ConditionChecker>();
+ new HashMap<>();
List<Object> checkersJson = (List<Object>) jsonObj.get("checkers");
for (Object oneCheckerJson : checkersJson) {
Map<String, Object> oneChecker =
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
index edde6d3..6d4312d 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
@@ -59,8 +59,7 @@ public class Trigger {
}
public void updateNextCheckTime() {
- this.nextCheckTime =
- Math.min(triggerCondition.getNextCheckTime(),
+ this.nextCheckTime = Math.min(triggerCondition.getNextCheckTime(),
expireCondition.getNextCheckTime());
}
@@ -346,8 +345,7 @@ public class Trigger {
Trigger trigger = null;
try {
logger.info("Decoding for " + JSONUtils.toJSON(obj));
- Condition triggerCond =
- Condition.fromJson(jsonObj.get("triggerCondition"));
+ Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
List<TriggerAction> actions = new ArrayList<TriggerAction>();
List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 9f75544..47ea193 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -282,10 +282,17 @@ public class TriggerManager extends EventHandler implements
}
if (t.getStatus().equals(TriggerStatus.READY)) {
- if (t.triggerConditionMet()) {
+
+ /**
+ * Prior to this change, expiration condition should never be called though
+ * we have some related code here. ExpireCondition used the same BasicTimeChecker
+ * as triggerCondition do. As a consequence, we need to figure out a way to distinguish
+ * the previous ExpireCondition and this commit's ExpireCondition.
+ */
+ if (t.getExpireCondition().getExpression().contains("EndTimeChecker") && t.expireConditionMet()) {
+ onTriggerPause(t);
+ } else if (t.triggerConditionMet()) {
onTriggerTrigger(t);
- } else if (t.expireConditionMet()) {
- onTriggerExpire(t);
}
}
if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
@@ -312,9 +319,9 @@ public class TriggerManager extends EventHandler implements
logger.error("Failed to do action " + action.getDescription(), th);
}
}
+
if (t.isResetOnTrigger()) {
t.resetTriggerConditions();
- t.resetExpireCondition();
} else {
t.setStatus(TriggerStatus.EXPIRED);
}
@@ -325,7 +332,7 @@ public class TriggerManager extends EventHandler implements
}
}
- private void onTriggerExpire(Trigger t) throws TriggerManagerException {
+ private void onTriggerPause(Trigger t) throws TriggerManagerException {
List<TriggerAction> expireActions = t.getExpireActions();
for (TriggerAction action : expireActions) {
try {
@@ -339,12 +346,8 @@ public class TriggerManager extends EventHandler implements
th);
}
}
- if (t.isResetOnExpire()) {
- t.resetTriggerConditions();
- t.resetExpireCondition();
- } else {
- t.setStatus(TriggerStatus.EXPIRED);
- }
+ logger.info("Pausing Trigger " + t.getDescription());
+ t.setStatus(TriggerStatus.PAUSED);
try {
triggerLoader.updateTrigger(t);
} catch (TriggerLoaderException e) {
diff --git a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
index 279c329..0d65bf2 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
@@ -86,31 +86,25 @@ public class ThresholdChecker implements ConditionChecker {
@Override
public Object getNum() {
- // TODO Auto-generated method stub
return null;
}
@Override
public Object toJson() {
- // TODO Auto-generated method stub
return null;
}
@Override
public void stopChecker() {
return;
-
}
@Override
public void setContext(Map<String, Object> context) {
- // TODO Auto-generated method stub
-
}
@Override
public long getNextCheckTime() {
- // TODO Auto-generated method stub
return 0;
}
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index 4021cc7..a6865fc 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -16,115 +16,114 @@
package azkaban.trigger;
+import azkaban.executor.ExecutorManager;
+
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.joda.time.DateTime;
-
+import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import azkaban.utils.Props;
+import static org.mockito.Mockito.*;
+
public class TriggerManagerTest {
- private TriggerLoader triggerLoader;
+ private static TriggerLoader triggerLoader;
+ private static ExecutorManager executorManager;
+ private TriggerManager triggerManager;
- @Before
- public void setup() throws TriggerException, TriggerManagerException {
+ @BeforeClass
+ public static void prepare() {
triggerLoader = new MockTriggerLoader();
-
+ executorManager = mock(ExecutorManager.class);
+ doNothing().when(executorManager).addListener(anyObject());
}
- @After
- public void tearDown() {
-
- }
-
- @Ignore @Test
- public void triggerManagerSimpleTest() throws TriggerManagerException {
+ @Before
+ public void setup() throws TriggerException, TriggerManagerException {
Props props = new Props();
- props.put("trigger.scan.interval", 4000);
- TriggerManager triggerManager =
- new TriggerManager(props, triggerLoader, null);
-
+ props.put("trigger.scan.interval", 300);
+ triggerManager = new TriggerManager(props, triggerLoader, executorManager);
triggerManager.registerCheckerType(ThresholdChecker.type,
ThresholdChecker.class);
triggerManager.registerActionType(DummyTriggerAction.type,
DummyTriggerAction.class);
+ triggerManager.start();
+ }
- ThresholdChecker.setVal(1);
+ @After
+ public void tearDown() {
+ triggerManager.shutdown();
+ }
+
+ @Test
+ public void neverExpireTriggerTest() throws TriggerManagerException {
- triggerManager.insertTrigger(
- createDummyTrigger("test1", "triggerLoader", 10), "testUser");
- List<Trigger> triggers = triggerManager.getTriggers();
- assertTrue(triggers.size() == 1);
- Trigger t1 = triggers.get(0);
+ Trigger t1 = createNeverExpireTrigger("triggerLoader", 10);
+ triggerManager.insertTrigger(t1);
t1.setResetOnTrigger(false);
- triggerManager.updateTrigger(t1, "testUser");
- ThresholdChecker checker1 =
+ ThresholdChecker triggerChecker =
(ThresholdChecker) t1.getTriggerCondition().getCheckers().values()
.toArray()[0];
- assertTrue(t1.getSource().equals("triggerLoader"));
- Trigger t2 =
- createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
- triggerManager.insertTrigger(t2, "testUser");
- ThresholdChecker checker2 =
- (ThresholdChecker) t2.getTriggerCondition().getCheckers().values()
+ BasicTimeChecker expireChecker =
+ (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
.toArray()[0];
ThresholdChecker.setVal(15);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ sleep(300);
+ sleep(300);
+ assertTrue(triggerChecker.isCheckerMet() == true);
+ assertTrue(expireChecker.eval() == false);
- assertTrue(checker1.isCheckerMet() == false);
- assertTrue(checker2.isCheckerMet() == false);
- assertTrue(checker1.isCheckerReset() == false);
- assertTrue(checker2.isCheckerReset() == false);
+ ThresholdChecker.setVal(25);
+ sleep(300);
+ assertTrue(triggerChecker.isCheckerMet() == true);
+ assertTrue(expireChecker.eval() == false);
+ }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- assertTrue(checker1.isCheckerMet() == true);
- assertTrue(checker2.isCheckerMet() == false);
- assertTrue(checker1.isCheckerReset() == false);
- assertTrue(checker2.isCheckerReset() == false);
+ @Test
+ public void timeCheckerAndExpireTriggerTest() throws TriggerManagerException {
- ThresholdChecker.setVal(25);
- try {
- Thread.sleep(4000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ long curr = System.currentTimeMillis();
+ Trigger t1 = createPeriodAndEndCheckerTrigger(curr);
+ triggerManager.insertTrigger(t1);
+ t1.setResetOnTrigger(true);
+ BasicTimeChecker expireChecker =
+ (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
+ .toArray()[0];
+
+ sleep(1000);
- assertTrue(checker1.isCheckerMet() == true);
- assertTrue(checker1.isCheckerReset() == false);
- assertTrue(checker2.isCheckerReset() == true);
+ assertTrue(expireChecker.eval() == false);
+ assertTrue(t1.getStatus() == TriggerStatus.READY);
- triggers = triggerManager.getTriggers();
- assertTrue(triggers.size() == 1);
+ sleep(1000);
+ sleep(1000);
+ sleep(1000);
+ assertTrue(expireChecker.eval() == true);
+ assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
+ sleep(1000);
+ assertTrue(expireChecker.eval() == true);
+ assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
}
- public class MockTriggerLoader implements TriggerLoader {
- private Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+ public static class MockTriggerLoader implements TriggerLoader {
+ private Map<Integer, Trigger> triggers = new HashMap<>();
private int idIndex = 0;
@Override
@@ -146,7 +145,7 @@ public class TriggerManagerTest {
@Override
public List<Trigger> loadTriggers() {
- return new ArrayList<Trigger>(triggers.values());
+ return new ArrayList<>(triggers.values());
}
@Override
@@ -161,49 +160,81 @@ public class TriggerManagerTest {
// TODO Auto-generated method stub
return null;
}
-
}
- private Trigger createDummyTrigger(String message, String source,
- int threshold) {
-
- Map<String, ConditionChecker> checkers =
- new HashMap<String, ConditionChecker>();
- ConditionChecker checker =
- new ThresholdChecker(ThresholdChecker.type, threshold);
- checkers.put(checker.getId(), checker);
-
- List<TriggerAction> actions = new ArrayList<TriggerAction>();
- TriggerAction act = new DummyTriggerAction(message);
- actions.add(act);
-
- String expr = checker.getId() + ".eval()";
-
- Condition triggerCond = new Condition(checkers, expr);
- Condition expireCond = new Condition(checkers, expr);
+ private void sleep (long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
- Trigger fakeTrigger =
- new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(),
- "azkaban", source, triggerCond, expireCond, actions);
- fakeTrigger.setResetOnTrigger(true);
+ private Trigger createNeverExpireTrigger(String source, int threshold) {
+ Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+ Map<String, ConditionChecker> expireCheckers = new HashMap<>();
+ ConditionChecker triggerChecker = new ThresholdChecker(ThresholdChecker.type, threshold);
+ ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeCheck_1", 111L,
+ DateTimeZone.UTC, 2536871155000L,false, false,
+ null, null);
+ triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+ expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
+
+ String triggerExpr = triggerChecker.getId() + ".eval()";
+ String expireExpr = endTimeChecker.getId() + ".eval()";
+
+ Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+ Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+ Trigger fakeTrigger = new Trigger("azkaban",
+ source,
+ triggerCond,
+ expireCond,
+ getTriggerActions());
+
+ fakeTrigger.setResetOnTrigger(false);
fakeTrigger.setResetOnExpire(true);
-
return fakeTrigger;
}
- // public class MockCheckerLoader extends CheckerTypeLoader{
- //
- // @Override
- // public void init(Props props) {
- // checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
- // }
- // }
- //
- // public class MockActionLoader extends ActionTypeLoader {
- // @Override
- // public void init(Props props) {
- // actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
- // }
- // }
-
-}
+ private Trigger createPeriodAndEndCheckerTrigger(long currMillis) {
+ Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+ Map<String, ConditionChecker> expireCheckers = new HashMap<>();
+
+ // TODO kunkun-tang: 1 second is the minimum unit for {@link org.joda.time.ReadablePeriod}.
+ // In future, we should use some smaller alternative.
+ ConditionChecker triggerChecker = new BasicTimeChecker("BasicTimeChecker_1",
+ currMillis, DateTimeZone.UTC, true, true,
+ Utils.parsePeriodString("1s"), null);
+
+ // End time is 3 seconds past now.
+ ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeChecker_1", 111L,
+ DateTimeZone.UTC, currMillis + 3000L,false, false,
+ null, null);
+ triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+ expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
+
+ String triggerExpr = triggerChecker.getId() + ".eval()";
+ String expireExpr = endTimeChecker.getId() + ".eval()";
+
+ Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+ Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+ Trigger timeTrigger = new Trigger("azkaban",
+ "",
+ triggerCond,
+ expireCond,
+ getTriggerActions());
+
+ timeTrigger.setResetOnTrigger(false);
+ timeTrigger.setResetOnExpire(true);
+ return timeTrigger;
+ }
+
+ private List<TriggerAction> getTriggerActions() {
+ List<TriggerAction> actions = new ArrayList<>();
+ TriggerAction act = new DummyTriggerAction("");
+ actions.add(act);
+ return actions;
+ }
+}
\ No newline at end of file
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index f013451..c25963c 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,7 @@
package azkaban.webapp.servlet;
+import azkaban.Constants;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -655,6 +656,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
+ long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+ try {
+ // Todo kunkun-tang: Need to verify if passed end time is valid.
+ } catch (Exception e) {
+ ret.put("error", "Invalid date and time: " + endSchedTime);
+ return;
+ }
+
ReadablePeriod thePeriod = null;
try {
if (hasParam(req, "is_recurring")
@@ -677,7 +686,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
Schedule schedule =
scheduleManager.scheduleFlow(-1, projectId, projectName, flowName,
- "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+ "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(),
firstSchedTime.getMillis(), user.getUserId(), flowOptions,
slaOptions);
@@ -745,6 +754,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ret.put("error", e.getMessage());
}
+ long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+ try {
+ // Todo kunkun-tang: Need to verify if passed end time is valid.
+ } catch (Exception e) {
+ ret.put("error", "Invalid date and time: " + endSchedTime);
+ return;
+ }
+
ExecutionOptions flowOptions = null;
try {
flowOptions = HttpRequestUtils.parseFlowOptions(req);
@@ -757,7 +774,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
// Because either cronExpression or recurrence exists, we build schedule in the below way.
Schedule schedule = scheduleManager.cronScheduleFlow(-1, projectId, projectName, flowName,
- "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+ "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
DateTime.now().getMillis(), firstSchedTime.getMillis(),
firstSchedTime.getMillis(), user.getUserId(), flowOptions,
slaOptions, cronExpression);