azkaban-aplcache
Changes
azkaban-common/src/main/java/azkaban/trigger/Trigger.java 241(+117 -124)
Details
diff --git a/azkaban-common/src/main/java/azkaban/Constants.java b/azkaban-common/src/main/java/azkaban/Constants.java
index 06c982c..2b228ac 100644
--- a/azkaban-common/src/main/java/azkaban/Constants.java
+++ b/azkaban-common/src/main/java/azkaban/Constants.java
@@ -49,6 +49,9 @@ public class Constants {
public static final int DEFAULT_SSL_PORT_NUMBER = 8443;
public static final int DEFAULT_JETTY_MAX_THREAD_COUNT = 20;
+ // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
+ public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+
public static class ConfigurationKeys {
// These properties are configurable through azkaban.properties
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
index d495059..4613a4d 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/Schedule.java
@@ -43,6 +43,7 @@ public class Schedule {
private final String projectName;
private final String flowName;
private final long firstSchedTime;
+ private final long endSchedTime;
private final DateTimeZone timezone;
private final long lastModifyTime;
private final ReadablePeriod period;
@@ -56,40 +57,28 @@ public class Schedule {
private ExecutionOptions executionOptions;
private List<SlaOption> slaOptions;
- public Schedule(final int scheduleId, final int projectId, final String projectName,
- final String flowName, final String status, final long firstSchedTime,
- final DateTimeZone timezone, final ReadablePeriod period, final long lastModifyTime,
- final long nextExecTime, final long submitTime, final String submitUser) {
-
- this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
- timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser,
- null, null, null);
- }
-
- public Schedule(final int scheduleId, final int projectId, final String projectName,
- final String flowName, final String status, final long firstSchedTime,
- final String timezoneId,
- final String period, final long lastModifyTime, final long nextExecTime,
- final long submitTime,
- final String submitUser, final ExecutionOptions executionOptions,
- final List<SlaOption> slaOptions) {
- this(scheduleId, projectId, projectName, flowName, status, firstSchedTime,
- DateTimeZone.forID(timezoneId), parsePeriodString(period),
- lastModifyTime, nextExecTime, submitTime, submitUser, executionOptions,
- slaOptions, null);
- }
-
- public Schedule(final int scheduleId, final int projectId, final String projectName,
- final String flowName, final String status, final long firstSchedTime,
- final DateTimeZone timezone, final ReadablePeriod period, final long lastModifyTime,
- final long nextExecTime, final long submitTime, final String submitUser,
- final ExecutionOptions executionOptions, final List<SlaOption> slaOptions,
- final String cronExpression) {
+ public Schedule(final int scheduleId,
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final long endSchedTime,
+ final DateTimeZone timezone,
+ final ReadablePeriod period,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ final ExecutionOptions executionOptions,
+ final List<SlaOption> slaOptions,
+ final String cronExpression) {
this.scheduleId = scheduleId;
this.projectId = projectId;
this.projectName = projectName;
this.flowName = flowName;
this.firstSchedTime = firstSchedTime;
+ this.endSchedTime = endSchedTime;
this.timezone = timezone;
this.lastModifyTime = lastModifyTime;
this.period = period;
@@ -112,27 +101,27 @@ public class Schedule {
final int periodInt =
Integer.parseInt(periodStr.substring(0, periodStr.length() - 1));
switch (periodUnit) {
- case 'M':
- period = Months.months(periodInt);
- break;
- case 'w':
- period = Weeks.weeks(periodInt);
- break;
- case 'd':
- period = Days.days(periodInt);
- break;
- case 'h':
- period = Hours.hours(periodInt);
- break;
- case 'm':
- period = Minutes.minutes(periodInt);
- break;
- case 's':
- period = Seconds.seconds(periodInt);
- break;
- default:
- throw new IllegalArgumentException("Invalid schedule period unit '"
- + periodUnit);
+ case 'M':
+ period = Months.months(periodInt);
+ break;
+ case 'w':
+ period = Weeks.weeks(periodInt);
+ break;
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '"
+ + periodUnit);
}
return period;
@@ -192,9 +181,7 @@ public class Schedule {
public String toString() {
final String underlying =
- this.projectName + "." + this.flowName + " (" + this.projectId + ")"
- + " to be run at (starting) "
- + new DateTime(
+ this.projectName + "." + this.flowName + " (" + this.projectId + ")" + " to be run at (starting) " + new DateTime(
this.firstSchedTime).toDateTimeISO();
if (this.period == null && this.cronExpression == null) {
return underlying + " non-recurring";
@@ -275,8 +262,9 @@ public class Schedule {
}
if (this.cronExpression != null) {
- final DateTime nextTime = getNextCronRuntime(this.nextExecTime, this.timezone,
- Utils.parseCronExpression(this.cronExpression, this.timezone));
+ final DateTime nextTime = getNextCronRuntime(
+ this.nextExecTime, this.timezone, Utils.parseCronExpression(this.cronExpression,
+ this.timezone));
this.nextExecTime = nextTime.getMillis();
return true;
}
@@ -292,7 +280,7 @@ public class Schedule {
}
private DateTime getNextRuntime(final long scheduleTime, final DateTimeZone timezone,
- final ReadablePeriod period) {
+ final ReadablePeriod period) {
final DateTime now = new DateTime();
DateTime date = new DateTime(scheduleTime).withZone(timezone);
int count = 0;
@@ -315,13 +303,14 @@ public class Schedule {
}
/**
- * @param scheduleTime represents the time when Schedule Servlet receives the Cron Schedule API
- * call.
+ *
+ * @param scheduleTime represents the time when Schedule Servlet receives the Cron Schedule API call.
* @param timezone is always UTC (after 3.1.0)
+ * @param ce
* @return the First Scheduled DateTime to run this flow.
*/
private DateTime getNextCronRuntime(final long scheduleTime, final DateTimeZone timezone,
- final CronExpression ce) {
+ final CronExpression ce) {
Date date = new DateTime(scheduleTime).withZone(timezone).toDate();
if (ce != null) {
@@ -387,4 +376,7 @@ public class Schedule {
return this.skipPastOccurrences;
}
+ public long getEndSchedTime() {
+ return this.endSchedTime;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
index 65ae121..6c97b80 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleLoader.java
@@ -24,8 +24,6 @@ public interface ScheduleLoader {
public void updateSchedule(Schedule s) throws ScheduleManagerException;
- public List<Schedule> loadSchedules() throws ScheduleManagerException;
-
public void removeSchedule(Schedule s) throws ScheduleManagerException;
public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
index 60cfe2b..023d1ed 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/ScheduleManager.java
@@ -41,7 +41,6 @@ import org.joda.time.format.DateTimeFormatter;
* TODO kunkun-tang: When new AZ quartz Scheduler comes, we will remove this class.
*/
public class ScheduleManager implements TriggerAgent {
-
public static final String triggerSource = "SimpleTimeTrigger";
private static final Logger logger = Logger.getLogger(ScheduleManager.class);
private final DateTimeFormatter _dateFormat = DateTimeFormat
@@ -56,6 +55,7 @@ public class ScheduleManager implements TriggerAgent {
/**
* Give the schedule manager a loader class that will properly load the
* schedule.
+ *
*/
public ScheduleManager(final ScheduleLoader loader) {
this.loader = loader;
@@ -95,6 +95,7 @@ public class ScheduleManager implements TriggerAgent {
/**
* Retrieves a copy of the list of schedules.
+ *
*/
public synchronized List<Schedule> getSchedules()
throws ScheduleManagerException {
@@ -105,6 +106,7 @@ public class ScheduleManager implements TriggerAgent {
/**
* Returns the scheduled flow for the flow name
+ *
*/
public Schedule getSchedule(final int projectId, final String flowId)
throws ScheduleManagerException {
@@ -126,6 +128,7 @@ public class ScheduleManager implements TriggerAgent {
/**
* Removes the flow from the schedule if it exists.
+ *
*/
public synchronized void removeSchedule(final Schedule sched) {
final Pair<Integer, String> identityPairMap = sched.getScheduleIdentityPair();
@@ -145,43 +148,50 @@ public class ScheduleManager implements TriggerAgent {
}
public Schedule scheduleFlow(final int scheduleId,
- final int projectId,
- final String projectName,
- final String flowName,
- final String status,
- final long firstSchedTime,
- final DateTimeZone timezone,
- final ReadablePeriod period,
- final long lastModifyTime,
- final long nextExecTime,
- final long submitTime,
- final String submitUser,
- final ExecutionOptions execOptions,
- final List<SlaOption> slaOptions) {
- final Schedule sched =
- new Schedule(scheduleId, projectId, projectName, flowName, status,
- firstSchedTime, timezone, period, lastModifyTime, nextExecTime,
- submitTime, submitUser, execOptions, slaOptions, null);
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final long endSchedTime,
+ final DateTimeZone timezone,
+ final ReadablePeriod period,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ final ExecutionOptions execOptions,
+ final List<SlaOption> slaOptions) {
+ final Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status,
+ firstSchedTime, endSchedTime, timezone, period, lastModifyTime, nextExecTime,
+ submitTime, submitUser, execOptions, slaOptions, null);
logger
.info("Scheduling flow '" + sched.getScheduleName() + "' for "
- + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null
- ? "(non-recurring)"
+ + this._dateFormat.print(firstSchedTime) + " with a period of " + (period == null ? "(non-recurring)"
: period));
insertSchedule(sched);
return sched;
}
- public Schedule cronScheduleFlow(final int scheduleId, final int projectId,
- final String projectName, final String flowName, final String status,
- final long firstSchedTime, final DateTimeZone timezone,
- final long lastModifyTime,
- final long nextExecTime, final long submitTime, final String submitUser,
- final ExecutionOptions execOptions, final List<SlaOption> slaOptions,
- final String cronExpression) {
+ public Schedule cronScheduleFlow(final int scheduleId,
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final long endSchedTime,
+ final DateTimeZone timezone,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ final ExecutionOptions execOptions,
+ final List<SlaOption> slaOptions,
+ final String cronExpression) {
final Schedule sched =
new Schedule(scheduleId, projectId, projectName, flowName, status,
- firstSchedTime, timezone, null, lastModifyTime, nextExecTime,
+ firstSchedTime, endSchedTime, timezone, null, lastModifyTime, nextExecTime,
submitTime, submitUser, execOptions, slaOptions, cronExpression);
logger
.info("Scheduling flow '" + sched.getScheduleName() + "' for "
@@ -190,7 +200,6 @@ public class ScheduleManager implements TriggerAgent {
insertSchedule(sched);
return sched;
}
-
/**
* Schedules the flow, but doesn't save the schedule afterwards.
*/
diff --git a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 7483169..f32b900 100644
--- a/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/azkaban-common/src/main/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -16,6 +16,7 @@
package azkaban.scheduler;
+import azkaban.Constants;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
@@ -43,7 +44,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private long lastUpdateTime = -1;
public TriggerBasedScheduleLoader(final TriggerManager triggerManager,
- final String triggerSource) {
+ final String triggerSource) {
this.triggerManager = triggerManager;
this.triggerSource = triggerSource;
}
@@ -95,18 +96,14 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
return cond;
}
- // if failed to trigger, auto expire?
private Condition createExpireCondition(final Schedule s) {
- final Map<String, ConditionChecker> checkers =
- new HashMap<>();
- final ConditionChecker checker =
- new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(),
- s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(),
- s.getPeriod(), s.getCronExpression());
+ final Map<String, ConditionChecker> checkers = new HashMap<>();
+ final ConditionChecker checker = new BasicTimeChecker("EndTimeChecker_1", s.getFirstSchedTime(),
+ s.getTimezone(), s.getEndSchedTime(), false, false,
+ null, null);
checkers.put(checker.getId(), checker);
final String expr = checker.getId() + ".eval()";
- final Condition cond = new Condition(checkers, expr);
- return cond;
+ return new Condition(checkers, expr);
}
@Override
@@ -130,33 +127,11 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
}
}
- // TODO may need to add logic to filter out skip runs
- @Override
- public synchronized List<Schedule> loadSchedules()
- throws ScheduleManagerException {
- final List<Trigger> triggers = this.triggerManager.getTriggers(this.triggerSource);
- final List<Schedule> schedules = new ArrayList<>();
- for (final Trigger t : triggers) {
- this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime());
- final Schedule s = triggerToSchedule(t);
- schedules.add(s);
- System.out.println("loaded schedule for "
- + s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
- }
- return schedules;
+ private Schedule triggerToSchedule(final Trigger t) throws ScheduleManagerException {
- }
+ final BasicTimeChecker triggerTimeChecker = getBasicTimeChecker(t.getTriggerCondition().getCheckers());
+ final BasicTimeChecker endTimeChecker = getBasicTimeChecker(t.getExpireCondition().getCheckers());
- private Schedule triggerToSchedule(final Trigger t) throws ScheduleManagerException {
- final Condition triggerCond = t.getTriggerCondition();
- final Map<String, ConditionChecker> checkers = triggerCond.getCheckers();
- BasicTimeChecker ck = null;
- for (final ConditionChecker checker : checkers.values()) {
- if (checker.getType().equals(BasicTimeChecker.type)) {
- ck = (BasicTimeChecker) checker;
- break;
- }
- }
final List<TriggerAction> actions = t.getActions();
ExecuteFlowAction act = null;
for (final TriggerAction action : actions) {
@@ -165,15 +140,23 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
break;
}
}
- if (ck != null && act != null) {
- final Schedule s =
- new Schedule(t.getTriggerId(), act.getProjectId(),
- act.getProjectName(), act.getFlowName(),
- t.getStatus().toString(), ck.getFirstCheckTime(),
- ck.getTimeZone(), ck.getPeriod(), t.getLastModifyTime(),
- ck.getNextCheckTime(), t.getSubmitTime(), t.getSubmitUser(),
- act.getExecutionOptions(), act.getSlaOptions(), ck.getCronExpression());
- return s;
+ if (triggerTimeChecker != null && act != null) {
+ return new Schedule(t.getTriggerId(),
+ act.getProjectId(),
+ act.getProjectName(),
+ act.getFlowName(),
+ t.getStatus().toString(),
+ triggerTimeChecker.getFirstCheckTime(),
+ endTimeChecker == null? Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME: endTimeChecker.getNextCheckTime(),
+ triggerTimeChecker.getTimeZone(),
+ triggerTimeChecker.getPeriod(),
+ t.getLastModifyTime(),
+ triggerTimeChecker.getNextCheckTime(),
+ t.getSubmitTime(),
+ t.getSubmitUser(),
+ act.getExecutionOptions(),
+ act.getSlaOptions(),
+ triggerTimeChecker.getCronExpression());
} else {
logger.error("Failed to parse schedule from trigger!");
throw new ScheduleManagerException(
@@ -181,6 +164,16 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
}
}
+ // expirecheckers or triggerCheckers only have BasicTimeChecker today. This should be refactored in future.
+ private BasicTimeChecker getBasicTimeChecker(final Map<String, ConditionChecker> checkers) {
+ for (final ConditionChecker checker : checkers.values()) {
+ if (checker.getType().equals(BasicTimeChecker.type)) {
+ return (BasicTimeChecker) checker;
+ }
+ }
+ return null;
+ }
+
@Override
public void removeSchedule(final Schedule s) throws ScheduleManagerException {
try {
@@ -212,7 +205,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime());
final Schedule s = triggerToSchedule(t);
schedules.add(s);
- System.out.println("loaded schedule for "
+ logger.info("loaded schedule for "
+ s.getProjectName() + " (project_ID: " + s.getProjectId() + ")");
}
return schedules;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 05b7316..ad71790 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -28,7 +28,6 @@ import org.quartz.CronExpression;
public class BasicTimeChecker implements ConditionChecker {
-
public static final String type = "BasicTimeChecker";
private final String id;
private final long firstCheckTime;
@@ -41,8 +40,8 @@ public class BasicTimeChecker implements ConditionChecker {
private boolean skipPastChecks = true;
public BasicTimeChecker(final String id, final long firstCheckTime,
- final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
- final ReadablePeriod period, final String cronExpression) {
+ final DateTimeZone timezone, final boolean isRecurring, final boolean skipPastChecks,
+ final ReadablePeriod period, final String cronExpression) {
this.id = id;
this.firstCheckTime = firstCheckTime;
this.timezone = timezone;
@@ -56,8 +55,8 @@ public class BasicTimeChecker implements ConditionChecker {
}
public BasicTimeChecker(final String id, final long firstCheckTime,
- final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
- final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
+ final DateTimeZone timezone, final long nextCheckTime, final boolean isRecurring,
+ final boolean skipPastChecks, final ReadablePeriod period, final String cronExpression) {
this.id = id;
this.firstCheckTime = firstCheckTime;
this.timezone = timezone;
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Condition.java b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
index 4490721..beb9820 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Condition.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Condition.java
@@ -30,7 +30,7 @@ public class Condition {
private static final Logger logger = Logger.getLogger(Condition.class);
- private static JexlEngine jexl = new JexlEngine();
+ private static final JexlEngine jexl = new JexlEngine();
private static CheckerTypeLoader checkerLoader = null;
private final MapContext context = new MapContext();
private Expression expression;
@@ -45,20 +45,12 @@ public class Condition {
}
public Condition(final Map<String, ConditionChecker> checkers, final String expr,
- final long nextCheckTime) {
+ final long nextCheckTime) {
this.nextCheckTime = nextCheckTime;
setCheckers(checkers);
this.expression = jexl.createExpression(expr);
}
- public synchronized static void setJexlEngine(final JexlEngine jexl) {
- Condition.jexl = jexl;
- }
-
- protected static CheckerTypeLoader getCheckerLoader() {
- return checkerLoader;
- }
-
public synchronized static void setCheckerLoader(final CheckerTypeLoader loader) {
Condition.checkerLoader = loader;
}
@@ -98,12 +90,6 @@ public class Condition {
return cond;
}
- protected void registerChecker(final ConditionChecker checker) {
- this.checkers.put(checker.getId(), checker);
- this.context.set(checker.getId(), checker);
- updateNextCheckTime();
- }
-
public long getNextCheckTime() {
return this.nextCheckTime;
}
@@ -112,7 +98,7 @@ public class Condition {
return this.checkers;
}
- public void setCheckers(final Map<String, ConditionChecker> checkers) {
+ private void setCheckers(final Map<String, ConditionChecker> checkers) {
this.checkers = checkers;
for (final ConditionChecker checker : checkers.values()) {
this.context.set(checker.getId(), checker);
@@ -120,12 +106,6 @@ public class Condition {
updateNextCheckTime();
}
- public void updateCheckTime(final Long ct) {
- if (this.nextCheckTime < ct) {
- this.nextCheckTime = ct;
- }
- }
-
private void updateNextCheckTime() {
long time = Long.MAX_VALUE;
for (final ConditionChecker checker : this.checkers.values()) {
azkaban-common/src/main/java/azkaban/trigger/Trigger.java 241(+117 -124)
diff --git a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
index c88cd82..8b4014f 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/Trigger.java
@@ -36,10 +36,8 @@ public class Trigger {
private final String source;
private final List<TriggerAction> actions;
private final List<TriggerAction> expireActions;
-
private Condition expireCondition;
private Condition triggerCondition;
-
private int triggerId = -1;
private long lastModifyTime;
private TriggerStatus status = TriggerStatus.READY;
@@ -50,16 +48,15 @@ public class Trigger {
private long nextCheckTime = -1;
- @SuppressWarnings("unused")
private Trigger() throws TriggerManagerException {
throw new TriggerManagerException("Triggers should always be specified");
}
- private Trigger(int triggerId, long lastModifyTime, long submitTime,
- String submitUser, String source, Condition triggerCondition,
- Condition expireCondition, List<TriggerAction> actions,
- List<TriggerAction> expireActions, Map<String, Object> info,
- Map<String, Object> context) {
+ private Trigger(final int triggerId, final long lastModifyTime, final long submitTime,
+ final String submitUser, final String source, final Condition triggerCondition,
+ final Condition expireCondition, final List<TriggerAction> actions,
+ final List<TriggerAction> expireActions, final Map<String, Object> info,
+ final Map<String, Object> context) {
requireNonNull(submitUser);
requireNonNull(source);
requireNonNull(triggerCondition);
@@ -84,75 +81,72 @@ public class Trigger {
return actionTypeLoader;
}
- public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
+ public static synchronized void setActionTypeLoader(final ActionTypeLoader loader) {
Trigger.actionTypeLoader = loader;
}
- @SuppressWarnings("unchecked")
- public static Trigger fromJson(Object obj) throws Exception {
+ public static Trigger fromJson(final Object obj) throws Exception {
if (actionTypeLoader == null) {
throw new Exception("Trigger Action Type loader not initialized.");
}
-
- Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ final Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
Trigger trigger = null;
try {
logger.info("Decoding for " + JSONUtils.toJSON(obj));
- Condition triggerCond =
- Condition.fromJson(jsonObj.get("triggerCondition"));
- Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
- List<TriggerAction> actions = new ArrayList<>();
- List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
- for (Object actObj : actionsJson) {
- Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
- String type = (String) oneActionJson.get("type");
- TriggerAction act =
+ final Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
+ final Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
+ final List<TriggerAction> actions = new ArrayList<>();
+ final List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
+ for (final Object actObj : actionsJson) {
+ final Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
+ final String type = (String) oneActionJson.get("type");
+ final TriggerAction act =
actionTypeLoader.createActionFromJson(type,
oneActionJson.get("actionJson"));
actions.add(act);
}
- List<TriggerAction> expireActions = new ArrayList<>();
- List<Object> expireActionsJson =
+ final List<TriggerAction> expireActions = new ArrayList<>();
+ final List<Object> expireActionsJson =
(List<Object>) jsonObj.get("expireActions");
- for (Object expireActObj : expireActionsJson) {
- Map<String, Object> oneExpireActionJson =
+ for (final Object expireActObj : expireActionsJson) {
+ final Map<String, Object> oneExpireActionJson =
(HashMap<String, Object>) expireActObj;
- String type = (String) oneExpireActionJson.get("type");
- TriggerAction expireAct =
+ final String type = (String) oneExpireActionJson.get("type");
+ final TriggerAction expireAct =
actionTypeLoader.createActionFromJson(type,
oneExpireActionJson.get("actionJson"));
expireActions.add(expireAct);
}
- boolean resetOnTrigger =
+ final boolean resetOnTrigger =
Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
- boolean resetOnExpire =
+ final boolean resetOnExpire =
Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
- String submitUser = (String) jsonObj.get("submitUser");
- String source = (String) jsonObj.get("source");
- long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
- long lastModifyTime =
+ final String submitUser = (String) jsonObj.get("submitUser");
+ final String source = (String) jsonObj.get("source");
+ final long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
+ final long lastModifyTime =
Long.valueOf((String) jsonObj.get("lastModifyTime"));
- int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
- TriggerStatus status =
+ final int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
+ final TriggerStatus status =
TriggerStatus.valueOf((String) jsonObj.get("status"));
- Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
+ final Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
Map<String, Object> context =
(Map<String, Object>) jsonObj.get("context");
if (context == null) {
context = new HashMap<>();
}
- for (ConditionChecker checker : triggerCond.getCheckers().values()) {
+ for (final ConditionChecker checker : triggerCond.getCheckers().values()) {
checker.setContext(context);
}
- for (ConditionChecker checker : expireCond.getCheckers().values()) {
+ for (final ConditionChecker checker : expireCond.getCheckers().values()) {
checker.setContext(context);
}
- for (TriggerAction action : actions) {
+ for (final TriggerAction action : actions) {
action.setContext(context);
}
- for (TriggerAction action : expireActions) {
+ for (final TriggerAction action : expireActions) {
action.setContext(context);
}
@@ -172,7 +166,7 @@ public class Trigger {
trigger.setResetOnExpire(resetOnExpire);
trigger.setResetOnTrigger(resetOnTrigger);
trigger.setStatus(status);
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
logger.error("Failed to decode the trigger.", e);
throw new Exception("Failed to decode the trigger.", e);
@@ -182,183 +176,182 @@ public class Trigger {
}
public void updateNextCheckTime() {
- this.nextCheckTime =
- Math.min(triggerCondition.getNextCheckTime(),
- expireCondition.getNextCheckTime());
+ this.nextCheckTime = Math.min(this.triggerCondition.getNextCheckTime(),
+ this.expireCondition.getNextCheckTime());
}
public long getNextCheckTime() {
- return nextCheckTime;
+ return this.nextCheckTime;
}
- public void setNextCheckTime(long nct) {
+ public void setNextCheckTime(final long nct) {
this.nextCheckTime = nct;
}
public long getSubmitTime() {
- return submitTime;
+ return this.submitTime;
}
public String getSubmitUser() {
- return submitUser;
+ return this.submitUser;
}
public TriggerStatus getStatus() {
- return status;
+ return this.status;
}
- public void setStatus(TriggerStatus status) {
+ public void setStatus(final TriggerStatus status) {
this.status = status;
}
public Condition getTriggerCondition() {
- return triggerCondition;
+ return this.triggerCondition;
}
- public void setTriggerCondition(Condition triggerCondition) {
+ public void setTriggerCondition(final Condition triggerCondition) {
this.triggerCondition = triggerCondition;
}
public Condition getExpireCondition() {
- return expireCondition;
+ return this.expireCondition;
}
- public void setExpireCondition(Condition expireCondition) {
+ public void setExpireCondition(final Condition expireCondition) {
this.expireCondition = expireCondition;
- }
+ }
public List<TriggerAction> getActions() {
- return actions;
+ return this.actions;
}
public List<TriggerAction> getExpireActions() {
- return expireActions;
+ return this.expireActions;
}
public Map<String, Object> getInfo() {
- return info;
+ return this.info;
}
- public void setInfo(Map<String, Object> info) {
+ public void setInfo(final Map<String, Object> info) {
this.info = info;
}
public Map<String, Object> getContext() {
- return context;
+ return this.context;
}
- public void setContext(Map<String, Object> context) {
+ public void setContext(final Map<String, Object> context) {
this.context = context;
}
public boolean isResetOnTrigger() {
- return resetOnTrigger;
+ return this.resetOnTrigger;
}
- public void setResetOnTrigger(boolean resetOnTrigger) {
+ public void setResetOnTrigger(final boolean resetOnTrigger) {
this.resetOnTrigger = resetOnTrigger;
}
public boolean isResetOnExpire() {
- return resetOnExpire;
+ return this.resetOnExpire;
}
- public void setResetOnExpire(boolean resetOnExpire) {
+ public void setResetOnExpire(final boolean resetOnExpire) {
this.resetOnExpire = resetOnExpire;
}
public long getLastModifyTime() {
- return lastModifyTime;
+ return this.lastModifyTime;
}
- public void setLastModifyTime(long lastModifyTime) {
+ public void setLastModifyTime(final long lastModifyTime) {
this.lastModifyTime = lastModifyTime;
}
public int getTriggerId() {
- return triggerId;
+ return this.triggerId;
}
- public void setTriggerId(int id) {
+ public void setTriggerId(final int id) {
this.triggerId = id;
}
public boolean triggerConditionMet() {
- return triggerCondition.isMet();
+ return this.triggerCondition.isMet();
}
public boolean expireConditionMet() {
- return expireCondition.isMet();
+ return this.expireCondition.isMet();
}
public void resetTriggerConditions() {
- triggerCondition.resetCheckers();
+ this.triggerCondition.resetCheckers();
updateNextCheckTime();
}
public void resetExpireCondition() {
- expireCondition.resetCheckers();
+ this.expireCondition.resetCheckers();
updateNextCheckTime();
}
public List<TriggerAction> getTriggerActions() {
- return actions;
+ return this.actions;
}
public Map<String, Object> toJson() {
- Map<String, Object> jsonObj = new HashMap<>();
- jsonObj.put("triggerCondition", triggerCondition.toJson());
- jsonObj.put("expireCondition", expireCondition.toJson());
- List<Object> actionsJson = new ArrayList<>();
- for (TriggerAction action : actions) {
- Map<String, Object> oneActionJson = new HashMap<>();
+ final Map<String, Object> jsonObj = new HashMap<>();
+ jsonObj.put("triggerCondition", this.triggerCondition.toJson());
+ jsonObj.put("expireCondition", this.expireCondition.toJson());
+ final List<Object> actionsJson = new ArrayList<>();
+ for (final TriggerAction action : this.actions) {
+ final Map<String, Object> oneActionJson = new HashMap<>();
oneActionJson.put("type", action.getType());
oneActionJson.put("actionJson", action.toJson());
actionsJson.add(oneActionJson);
}
jsonObj.put("actions", actionsJson);
- List<Object> expireActionsJson = new ArrayList<>();
- for (TriggerAction expireAction : expireActions) {
- Map<String, Object> oneExpireActionJson = new HashMap<>();
+ final List<Object> expireActionsJson = new ArrayList<>();
+ for (final TriggerAction expireAction : this.expireActions) {
+ final Map<String, Object> oneExpireActionJson = new HashMap<>();
oneExpireActionJson.put("type", expireAction.getType());
oneExpireActionJson.put("actionJson", expireAction.toJson());
expireActionsJson.add(oneExpireActionJson);
}
jsonObj.put("expireActions", expireActionsJson);
- jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
- jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
- jsonObj.put("submitUser", submitUser);
- jsonObj.put("source", source);
- jsonObj.put("submitTime", String.valueOf(submitTime));
- jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
- jsonObj.put("triggerId", String.valueOf(triggerId));
- jsonObj.put("status", status.toString());
- jsonObj.put("info", info);
- jsonObj.put("context", context);
+ jsonObj.put("resetOnTrigger", String.valueOf(this.resetOnTrigger));
+ jsonObj.put("resetOnExpire", String.valueOf(this.resetOnExpire));
+ jsonObj.put("submitUser", this.submitUser);
+ jsonObj.put("source", this.source);
+ jsonObj.put("submitTime", String.valueOf(this.submitTime));
+ jsonObj.put("lastModifyTime", String.valueOf(this.lastModifyTime));
+ jsonObj.put("triggerId", String.valueOf(this.triggerId));
+ jsonObj.put("status", this.status.toString());
+ jsonObj.put("info", this.info);
+ jsonObj.put("context", this.context);
return jsonObj;
}
public String getSource() {
- return source;
+ return this.source;
}
public String getDescription() {
- StringBuffer actionsString = new StringBuffer();
- for (TriggerAction act : actions) {
+ final StringBuffer actionsString = new StringBuffer();
+ for (final TriggerAction act : this.actions) {
actionsString.append(", ");
actionsString.append(act.getDescription());
}
return "Trigger from " + getSource() + " with trigger condition of "
- + triggerCondition.getExpression() + " and expire condition of "
- + expireCondition.getExpression() + actionsString;
+ + this.triggerCondition.getExpression() + " and expire condition of "
+ + this.expireCondition.getExpression() + actionsString;
}
public void stopCheckers() {
- for (ConditionChecker checker : triggerCondition.getCheckers().values()) {
+ for (final ConditionChecker checker : this.triggerCondition.getCheckers().values()) {
checker.stopChecker();
}
- for (ConditionChecker checker : expireCondition.getCheckers().values()) {
+ for (final ConditionChecker checker : this.expireCondition.getCheckers().values()) {
checker.stopChecker();
}
}
@@ -383,63 +376,63 @@ public class Trigger {
private Map<String, Object> info = new HashMap<>();
private Map<String, Object> context = new HashMap<>();
- public TriggerBuilder(String submitUser,
- String source,
- Condition triggerCondition,
- Condition expireCondition,
- List<TriggerAction> actions) {
+ public TriggerBuilder(final String submitUser,
+ final String source,
+ final Condition triggerCondition,
+ final Condition expireCondition,
+ final List<TriggerAction> actions) {
this.submitUser = submitUser;
this.source = source;
this.triggerCondition = triggerCondition;
this.actions = actions;
this.expireCondition = expireCondition;
- long now = DateTime.now().getMillis();
+ final long now = DateTime.now().getMillis();
this.submitTime = now;
this.lastModifyTime = now;
}
- public TriggerBuilder setId(int id) {
+ public TriggerBuilder setId(final int id) {
this.triggerId = id;
return this;
}
- public TriggerBuilder setSubmitTime(long time) {
+ public TriggerBuilder setSubmitTime(final long time) {
this.submitTime = time;
return this;
}
- public TriggerBuilder setLastModifyTime(long time) {
+ public TriggerBuilder setLastModifyTime(final long time) {
this.lastModifyTime = time;
return this;
}
- public TriggerBuilder setExpireActions(List<TriggerAction> actions) {
+ public TriggerBuilder setExpireActions(final List<TriggerAction> actions) {
this.expireActions = actions;
return this;
}
- public TriggerBuilder setInfo(Map<String, Object> info) {
+ public TriggerBuilder setInfo(final Map<String, Object> info) {
this.info = info;
return this;
}
- public TriggerBuilder setContext(Map<String, Object> context) {
+ public TriggerBuilder setContext(final Map<String, Object> context) {
this.context = context;
return this;
}
public Trigger build() {
- return new Trigger(triggerId,
- lastModifyTime,
- submitTime,
- submitUser,
- source,
- triggerCondition,
- expireCondition,
- actions,
- expireActions,
- info,
- context);
+ return new Trigger(this.triggerId,
+ this.lastModifyTime,
+ this.submitTime,
+ this.submitUser,
+ this.source,
+ this.triggerCondition,
+ this.expireCondition,
+ this.actions,
+ this.expireActions,
+ this.info,
+ this.context);
}
}
diff --git a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
index 2051117..19a0af6 100644
--- a/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
+++ b/azkaban-common/src/main/java/azkaban/trigger/TriggerManager.java
@@ -16,7 +16,6 @@
package azkaban.trigger;
-import azkaban.ServiceProvider;
import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
@@ -24,6 +23,7 @@ import azkaban.event.EventListener;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManager;
import azkaban.utils.Props;
+import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -38,7 +38,6 @@ import org.apache.log4j.Logger;
public class TriggerManager extends EventHandler implements
TriggerManagerAdapter {
-
public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
private static final Logger logger = Logger.getLogger(TriggerManager.class);
private static final Map<Integer, Trigger> triggerIdMap =
@@ -55,32 +54,30 @@ public class TriggerManager extends EventHandler implements
private long runnerThreadIdleTime = -1;
private String scannerStage = "";
- // TODO kunkun-tang: Before apply guice to this class, we should make
- // ExecutorManager guiceable.
- public TriggerManager(final Props props, final TriggerLoader triggerLoader,
- final ExecutorManager executorManager) throws TriggerManagerException {
+ @Inject
+ public TriggerManager(Props props, TriggerLoader triggerLoader,
+ ExecutorManager executorManager) throws TriggerManagerException {
- // TODO kunkun-tang: Doing hack here to allow calling new azkaban-db code. Should fix in future.
- this.triggerLoader = ServiceProvider.SERVICE_PROVIDER.getInstance(TriggerLoader.class);
+ this.triggerLoader = triggerLoader;
- final long scannerInterval =
+ long scannerInterval =
props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
- this.runnerThread = new TriggerScannerThread(scannerInterval);
+ runnerThread = new TriggerScannerThread(scannerInterval);
- this.checkerTypeLoader = new CheckerTypeLoader();
- this.actionTypeLoader = new ActionTypeLoader();
+ checkerTypeLoader = new CheckerTypeLoader();
+ actionTypeLoader = new ActionTypeLoader();
try {
- this.checkerTypeLoader.init(props);
- this.actionTypeLoader.init(props);
- } catch (final Exception e) {
+ checkerTypeLoader.init(props);
+ actionTypeLoader.init(props);
+ } catch (Exception e) {
throw new TriggerManagerException(e);
}
- Condition.setCheckerLoader(this.checkerTypeLoader);
- Trigger.setActionTypeLoader(this.actionTypeLoader);
+ Condition.setCheckerLoader(checkerTypeLoader);
+ Trigger.setActionTypeLoader(actionTypeLoader);
- executorManager.addListener(this.listener);
+ executorManager.addListener(listener);
logger.info("TriggerManager loaded.");
}
@@ -90,68 +87,68 @@ public class TriggerManager extends EventHandler implements
try {
// expect loader to return valid triggers
- final List<Trigger> triggers = this.triggerLoader.loadTriggers();
- for (final Trigger t : triggers) {
- this.runnerThread.addTrigger(t);
+ List<Trigger> triggers = triggerLoader.loadTriggers();
+ for (Trigger t : triggers) {
+ runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
- } catch (final Exception e) {
+ } catch (Exception e) {
logger.error(e);
throw new TriggerManagerException(e);
}
- this.runnerThread.start();
+ runnerThread.start();
}
protected CheckerTypeLoader getCheckerLoader() {
- return this.checkerTypeLoader;
+ return checkerTypeLoader;
}
protected ActionTypeLoader getActionLoader() {
- return this.actionTypeLoader;
+ return actionTypeLoader;
}
- public void insertTrigger(final Trigger t) throws TriggerManagerException {
+ public void insertTrigger(Trigger t) throws TriggerManagerException {
logger.info("Inserting trigger " + t + " in TriggerManager");
- synchronized (this.syncObj) {
+ synchronized (syncObj) {
try {
- this.triggerLoader.addTrigger(t);
- } catch (final TriggerLoaderException e) {
+ triggerLoader.addTrigger(t);
+ } catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
- this.runnerThread.addTrigger(t);
+ runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
}
- public void removeTrigger(final int id) throws TriggerManagerException {
+ public void removeTrigger(int id) throws TriggerManagerException {
logger.info("Removing trigger with id: " + id + " from TriggerManager");
- synchronized (this.syncObj) {
- final Trigger t = triggerIdMap.get(id);
+ synchronized (syncObj) {
+ Trigger t = triggerIdMap.get(id);
if (t != null) {
removeTrigger(triggerIdMap.get(id));
}
}
}
- public void updateTrigger(final Trigger t) throws TriggerManagerException {
+ public void updateTrigger(Trigger t) throws TriggerManagerException {
logger.info("Updating trigger " + t + " in TriggerManager");
- synchronized (this.syncObj) {
- this.runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
- this.runnerThread.addTrigger(t);
+ synchronized (syncObj) {
+ runnerThread.deleteTrigger(triggerIdMap.get(t.getTriggerId()));
+ runnerThread.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
}
}
- public void removeTrigger(final Trigger t) throws TriggerManagerException {
+ public void removeTrigger(Trigger t) throws TriggerManagerException {
logger.info("Removing trigger " + t + " from TriggerManager");
- synchronized (this.syncObj) {
- this.runnerThread.deleteTrigger(t);
+ synchronized (syncObj) {
+ runnerThread.deleteTrigger(t);
triggerIdMap.remove(t.getTriggerId());
try {
t.stopCheckers();
- this.triggerLoader.removeTrigger(t);
- } catch (final TriggerLoaderException e) {
+ triggerLoader.removeTrigger(t);
+ } catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
}
@@ -162,24 +159,24 @@ public class TriggerManager extends EventHandler implements
}
public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
- return this.checkerTypeLoader.getSupportedCheckers();
+ return checkerTypeLoader.getSupportedCheckers();
}
- public Trigger getTrigger(final int triggerId) {
- synchronized (this.syncObj) {
+ public Trigger getTrigger(int triggerId) {
+ synchronized (syncObj) {
return triggerIdMap.get(triggerId);
}
}
- public void expireTrigger(final int triggerId) {
- final Trigger t = getTrigger(triggerId);
+ public void expireTrigger(int triggerId) {
+ Trigger t = getTrigger(triggerId);
t.setStatus(TriggerStatus.EXPIRED);
}
@Override
- public List<Trigger> getTriggers(final String triggerSource) {
- final List<Trigger> triggers = new ArrayList<>();
- for (final Trigger t : triggerIdMap.values()) {
+ public List<Trigger> getTriggers(String triggerSource) {
+ List<Trigger> triggers = new ArrayList<>();
+ for (Trigger t : triggerIdMap.values()) {
if (t.getSource().equals(triggerSource)) {
triggers.add(t);
}
@@ -188,10 +185,10 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public List<Trigger> getTriggerUpdates(final String triggerSource,
- final long lastUpdateTime) throws TriggerManagerException {
- final List<Trigger> triggers = new ArrayList<>();
- for (final Trigger t : triggerIdMap.values()) {
+ public List<Trigger> getTriggerUpdates(String triggerSource,
+ long lastUpdateTime) throws TriggerManagerException {
+ List<Trigger> triggers = new ArrayList<>();
+ for (Trigger t : triggerIdMap.values()) {
if (t.getSource().equals(triggerSource)
&& t.getLastModifyTime() > lastUpdateTime) {
triggers.add(t);
@@ -201,10 +198,10 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public List<Trigger> getAllTriggerUpdates(final long lastUpdateTime)
+ public List<Trigger> getAllTriggerUpdates(long lastUpdateTime)
throws TriggerManagerException {
- final List<Trigger> triggers = new ArrayList<>();
- for (final Trigger t : triggerIdMap.values()) {
+ List<Trigger> triggers = new ArrayList<>();
+ for (Trigger t : triggerIdMap.values()) {
if (t.getLastModifyTime() > lastUpdateTime) {
triggers.add(t);
}
@@ -213,25 +210,25 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public void insertTrigger(final Trigger t, final String user)
+ public void insertTrigger(Trigger t, String user)
throws TriggerManagerException {
insertTrigger(t);
}
@Override
- public void removeTrigger(final int id, final String user) throws TriggerManagerException {
+ public void removeTrigger(int id, String user) throws TriggerManagerException {
removeTrigger(id);
}
@Override
- public void updateTrigger(final Trigger t, final String user)
+ public void updateTrigger(Trigger t, String user)
throws TriggerManagerException {
updateTrigger(t);
}
@Override
public void shutdown() {
- this.runnerThread.shutdown();
+ runnerThread.shutdown();
}
@Override
@@ -240,89 +237,88 @@ public class TriggerManager extends EventHandler implements
}
@Override
- public void registerCheckerType(final String name,
- final Class<? extends ConditionChecker> checker) {
- this.checkerTypeLoader.registerCheckerType(name, checker);
+ public void registerCheckerType(String name,
+ Class<? extends ConditionChecker> checker) {
+ checkerTypeLoader.registerCheckerType(name, checker);
}
@Override
- public void registerActionType(final String name,
- final Class<? extends TriggerAction> action) {
- this.actionTypeLoader.registerActionType(name, action);
+ public void registerActionType(String name,
+ Class<? extends TriggerAction> action) {
+ actionTypeLoader.registerActionType(name, action);
}
private class TriggerScannerThread extends Thread {
-
private final long scannerInterval;
private final BlockingQueue<Trigger> triggers;
private final Map<Integer, ExecutableFlow> justFinishedFlows;
private boolean shutdown = false;
- public TriggerScannerThread(final long scannerInterval) {
- this.triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
- this.justFinishedFlows = new ConcurrentHashMap<>();
+ public TriggerScannerThread(long scannerInterval) {
+ triggers = new PriorityBlockingQueue<>(1, new TriggerComparator());
+ justFinishedFlows = new ConcurrentHashMap<>();
this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
this.scannerInterval = scannerInterval;
}
public void shutdown() {
logger.error("Shutting down trigger manager thread " + this.getName());
- this.shutdown = true;
+ shutdown = true;
this.interrupt();
}
- public void addJustFinishedFlow(final ExecutableFlow flow) {
- synchronized (TriggerManager.this.syncObj) {
- this.justFinishedFlows.put(flow.getExecutionId(), flow);
+ public void addJustFinishedFlow(ExecutableFlow flow) {
+ synchronized (syncObj) {
+ justFinishedFlows.put(flow.getExecutionId(), flow);
}
}
- public void addTrigger(final Trigger t) {
- synchronized (TriggerManager.this.syncObj) {
+ public void addTrigger(Trigger t) {
+ synchronized (syncObj) {
t.updateNextCheckTime();
- this.triggers.add(t);
+ triggers.add(t);
}
}
- public void deleteTrigger(final Trigger t) {
- this.triggers.remove(t);
+ public void deleteTrigger(Trigger t) {
+ triggers.remove(t);
}
@Override
public void run() {
- while (!this.shutdown) {
- synchronized (TriggerManager.this.syncObj) {
+ while (!shutdown) {
+ synchronized (syncObj) {
try {
- TriggerManager.this.lastRunnerThreadCheckTime = System.currentTimeMillis();
+ lastRunnerThreadCheckTime = System.currentTimeMillis();
- TriggerManager.this.scannerStage =
+ scannerStage =
"Ready to start a new scan cycle at "
- + TriggerManager.this.lastRunnerThreadCheckTime;
+ + lastRunnerThreadCheckTime;
try {
checkAllTriggers();
- this.justFinishedFlows.clear();
- } catch (final Exception e) {
+ justFinishedFlows.clear();
+ } catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
- } catch (final Throwable t) {
+ } catch (Throwable t) {
t.printStackTrace();
logger.error(t.getMessage());
}
- TriggerManager.this.scannerStage = "Done flipping all triggers.";
+ scannerStage = "Done flipping all triggers.";
- TriggerManager.this.runnerThreadIdleTime =
- this.scannerInterval
- - (System.currentTimeMillis() - TriggerManager.this.lastRunnerThreadCheckTime);
+ runnerThreadIdleTime =
+ scannerInterval
+ - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
- if (TriggerManager.this.runnerThreadIdleTime < 0) {
+ if (runnerThreadIdleTime < 0) {
logger.error("Trigger manager thread " + this.getName()
+ " is too busy!");
} else {
- TriggerManager.this.syncObj.wait(TriggerManager.this.runnerThreadIdleTime);
+ syncObj.wait(runnerThreadIdleTime);
}
- } catch (final InterruptedException e) {
+ } catch (InterruptedException e) {
logger.info("Interrupted. Probably to shut down.");
}
}
@@ -331,15 +327,22 @@ public class TriggerManager extends EventHandler implements
private void checkAllTriggers() throws TriggerManagerException {
// sweep through the rest of them
- for (final Trigger t : this.triggers) {
+ for (Trigger t : triggers) {
try {
- TriggerManager.this.scannerStage = "Checking for trigger " + t.getTriggerId();
+ scannerStage = "Checking for trigger " + t.getTriggerId();
if (t.getStatus().equals(TriggerStatus.READY)) {
- if (t.triggerConditionMet()) {
+
+ /**
+ * Prior to this change, expiration condition should never be called though
+ * we have some related code here. ExpireCondition used the same BasicTimeChecker
+ * as triggerCondition do. As a consequence, we need to figure out a way to distinguish
+ * the previous ExpireCondition and this commit's ExpireCondition.
+ */
+ if (t.getExpireCondition().getExpression().contains("EndTimeChecker") && t.expireConditionMet()) {
+ onTriggerPause(t);
+ } else if (t.triggerConditionMet()) {
onTriggerTrigger(t);
- } else if (t.expireConditionMet()) {
- onTriggerExpire(t);
}
}
if (t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
@@ -347,69 +350,64 @@ public class TriggerManager extends EventHandler implements
} else {
t.updateNextCheckTime();
}
- } catch (final Throwable th) {
+ } catch (Throwable th) {
//skip this trigger, moving on to the next one
logger.error("Failed to process trigger with id : " + t, th);
}
}
}
- private void onTriggerTrigger(final Trigger t) throws TriggerManagerException {
- final List<TriggerAction> actions = t.getTriggerActions();
- for (final TriggerAction action : actions) {
+ private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
+ List<TriggerAction> actions = t.getTriggerActions();
+ for (TriggerAction action : actions) {
try {
logger.info("Doing trigger actions " + action.getDescription() + " for " + t);
action.doAction();
- } catch (final Exception e) {
+ } catch (Exception e) {
logger.error("Failed to do action " + action.getDescription() + " for " + t, e);
- } catch (final Throwable th) {
+ } catch (Throwable th) {
logger.error("Failed to do action " + action.getDescription() + " for " + t, th);
}
}
+
if (t.isResetOnTrigger()) {
t.resetTriggerConditions();
- t.resetExpireCondition();
} else {
t.setStatus(TriggerStatus.EXPIRED);
}
try {
- TriggerManager.this.triggerLoader.updateTrigger(t);
- } catch (final TriggerLoaderException e) {
+ triggerLoader.updateTrigger(t);
+ } catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
}
- private void onTriggerExpire(final Trigger t) throws TriggerManagerException {
- final List<TriggerAction> expireActions = t.getExpireActions();
- for (final TriggerAction action : expireActions) {
+ private void onTriggerPause(Trigger t) throws TriggerManagerException {
+ List<TriggerAction> expireActions = t.getExpireActions();
+ for (TriggerAction action : expireActions) {
try {
- logger.info("Doing expire actions for " + action.getDescription() + " for " + t);
+ logger.info("Doing expire actions for "+ action.getDescription() + " for " + t);
action.doAction();
- } catch (final Exception e) {
+ } catch (Exception e) {
logger.error("Failed to do expire action " + action.getDescription() + " for " + t, e);
- } catch (final Throwable th) {
+ } catch (Throwable th) {
logger.error("Failed to do expire action " + action.getDescription() + " for " + t, th);
}
}
- if (t.isResetOnExpire()) {
- t.resetTriggerConditions();
- t.resetExpireCondition();
- } else {
- t.setStatus(TriggerStatus.EXPIRED);
- }
+ logger.info("Pausing Trigger " + t.getDescription());
+ t.setStatus(TriggerStatus.PAUSED);
try {
- TriggerManager.this.triggerLoader.updateTrigger(t);
- } catch (final TriggerLoaderException e) {
+ triggerLoader.updateTrigger(t);
+ } catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
}
private class TriggerComparator implements Comparator<Trigger> {
-
@Override
- public int compare(final Trigger arg0, final Trigger arg1) {
- final long first = arg1.getNextCheckTime();
- final long second = arg0.getNextCheckTime();
+ public int compare(Trigger arg0, Trigger arg1) {
+ long first = arg1.getNextCheckTime();
+ long second = arg0.getNextCheckTime();
if (first == second) {
return 0;
@@ -425,12 +423,12 @@ public class TriggerManager extends EventHandler implements
@Override
public long getLastRunnerThreadCheckTime() {
- return TriggerManager.this.lastRunnerThreadCheckTime;
+ return lastRunnerThreadCheckTime;
}
@Override
public boolean isRunnerThreadActive() {
- return TriggerManager.this.runnerThread.isAlive();
+ return runnerThread.isAlive();
}
@Override
@@ -445,8 +443,8 @@ public class TriggerManager extends EventHandler implements
@Override
public String getTriggerSources() {
- final Set<String> sources = new HashSet<>();
- for (final Trigger t : triggerIdMap.values()) {
+ Set<String> sources = new HashSet<>();
+ for (Trigger t : triggerIdMap.values()) {
sources.add(t.getSource());
}
return sources.toString();
@@ -459,7 +457,7 @@ public class TriggerManager extends EventHandler implements
@Override
public long getScannerIdleTime() {
- return TriggerManager.this.runnerThreadIdleTime;
+ return runnerThreadIdleTime;
}
@Override
@@ -469,24 +467,23 @@ public class TriggerManager extends EventHandler implements
@Override
public String getScannerThreadStage() {
- return TriggerManager.this.scannerStage;
+ return scannerStage;
}
}
private class ExecutorManagerEventListener implements EventListener {
-
public ExecutorManagerEventListener() {
}
@Override
- public void handleEvent(final Event event) {
+ public void handleEvent(Event event) {
// this needs to be fixed for perf
- synchronized (TriggerManager.this.syncObj) {
- final ExecutableFlow flow = (ExecutableFlow) event.getRunner();
+ synchronized (syncObj) {
+ ExecutableFlow flow = (ExecutableFlow) event.getRunner();
if (event.getType() == Type.FLOW_FINISHED) {
logger.info("Flow finish event received. " + flow.getExecutionId());
- TriggerManager.this.runnerThread.addJustFinishedFlow(flow);
+ runnerThread.addJustFinishedFlow(flow);
}
}
}
diff --git a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
index dec5fe5..50122a4 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/ThresholdChecker.java
@@ -80,31 +80,25 @@ public class ThresholdChecker implements ConditionChecker {
@Override
public Object getNum() {
- // TODO Auto-generated method stub
return null;
}
@Override
public Object toJson() {
- // TODO Auto-generated method stub
return null;
}
@Override
public void stopChecker() {
return;
-
}
@Override
public void setContext(final Map<String, Object> context) {
- // TODO Auto-generated method stub
-
}
@Override
public long getNextCheckTime() {
- // TODO Auto-generated method stub
return 0;
}
diff --git a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
index b3940bf..fd3958f 100644
--- a/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
+++ b/azkaban-common/src/test/java/azkaban/trigger/TriggerManagerTest.java
@@ -17,140 +17,185 @@
package azkaban.trigger;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import azkaban.executor.ExecutorManager;
+import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.utils.Props;
+import azkaban.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TriggerManagerTest {
- private TriggerLoader triggerLoader;
+ private static TriggerLoader triggerLoader;
+ private static ExecutorManager executorManager;
+ private TriggerManager triggerManager;
+
+ @BeforeClass
+ public static void prepare() {
+ triggerLoader = new MockTriggerLoader();
+ executorManager = mock(ExecutorManager.class);
+ doNothing().when(executorManager).addListener(anyObject());
+ }
@Before
public void setup() throws TriggerException, TriggerManagerException {
- this.triggerLoader = new MockTriggerLoader();
-
+ final Props props = new Props();
+ props.put("trigger.scan.interval", 300);
+ this.triggerManager = new TriggerManager(props, triggerLoader, executorManager);
+ this.triggerManager.registerCheckerType(ThresholdChecker.type,
+ ThresholdChecker.class);
+ this.triggerManager.registerActionType(DummyTriggerAction.type,
+ DummyTriggerAction.class);
+ this.triggerManager.start();
}
@After
public void tearDown() {
-
+ this.triggerManager.shutdown();
}
- @Ignore
@Test
- public void triggerManagerSimpleTest() throws TriggerManagerException {
- final Props props = new Props();
- props.put("trigger.scan.interval", 4000);
- final TriggerManager triggerManager =
- new TriggerManager(props, this.triggerLoader, null);
-
- triggerManager.registerCheckerType(ThresholdChecker.type,
- ThresholdChecker.class);
- triggerManager.registerActionType(DummyTriggerAction.type,
- DummyTriggerAction.class);
-
- ThresholdChecker.setVal(1);
+ public void neverExpireTriggerTest() throws TriggerManagerException {
- triggerManager.insertTrigger(
- createDummyTrigger("test1", "triggerLoader", 10), "testUser");
- List<Trigger> triggers = triggerManager.getTriggers();
- assertTrue(triggers.size() == 1);
- final Trigger t1 = triggers.get(0);
+ final Trigger t1 = createNeverExpireTrigger("triggerLoader", 10);
+ this.triggerManager.insertTrigger(t1);
t1.setResetOnTrigger(false);
- triggerManager.updateTrigger(t1, "testUser");
- final ThresholdChecker checker1 =
+ final ThresholdChecker triggerChecker =
(ThresholdChecker) t1.getTriggerCondition().getCheckers().values()
.toArray()[0];
- assertTrue(t1.getSource().equals("triggerLoader"));
- final Trigger t2 =
- createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
- triggerManager.insertTrigger(t2, "testUser");
- final ThresholdChecker checker2 =
- (ThresholdChecker) t2.getTriggerCondition().getCheckers().values()
+ final BasicTimeChecker expireChecker =
+ (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
.toArray()[0];
ThresholdChecker.setVal(15);
- try {
- Thread.sleep(2000);
- } catch (final InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ sleep(300);
+ sleep(300);
+ assertTrue(triggerChecker.isCheckerMet() == true);
+ assertTrue(expireChecker.eval() == false);
- assertTrue(checker1.isCheckerMet() == false);
- assertTrue(checker2.isCheckerMet() == false);
- assertTrue(checker1.isCheckerReset() == false);
- assertTrue(checker2.isCheckerReset() == false);
+ ThresholdChecker.setVal(25);
+ sleep(300);
+ assertTrue(triggerChecker.isCheckerMet() == true);
+ assertTrue(expireChecker.eval() == false);
+ }
- try {
- Thread.sleep(2000);
- } catch (final InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- assertTrue(checker1.isCheckerMet() == true);
- assertTrue(checker2.isCheckerMet() == false);
- assertTrue(checker1.isCheckerReset() == false);
- assertTrue(checker2.isCheckerReset() == false);
+ @Test
+ public void timeCheckerAndExpireTriggerTest() throws TriggerManagerException {
+
+ final long curr = System.currentTimeMillis();
+ final Trigger t1 = createPeriodAndEndCheckerTrigger(curr);
+ this.triggerManager.insertTrigger(t1);
+ t1.setResetOnTrigger(true);
+ final BasicTimeChecker expireChecker =
+ (BasicTimeChecker) t1.getExpireCondition().getCheckers().values()
+ .toArray()[0];
+
+ sleep(1000);
- ThresholdChecker.setVal(25);
+ assertTrue(expireChecker.eval() == false);
+ assertTrue(t1.getStatus() == TriggerStatus.READY);
+
+ sleep(1000);
+ sleep(1000);
+ sleep(1000);
+ assertTrue(expireChecker.eval() == true);
+ assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
+
+ sleep(1000);
+ assertTrue(expireChecker.eval() == true);
+ assertTrue(t1.getStatus() == TriggerStatus.PAUSED);
+ }
+
+ private void sleep (final long millis) {
try {
- Thread.sleep(4000);
+ Thread.sleep(millis);
} catch (final InterruptedException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
+ }
+
+ private Trigger createNeverExpireTrigger(final String source, final int threshold) {
+ final Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+ final Map<String, ConditionChecker> expireCheckers = new HashMap<>();
+ final ConditionChecker triggerChecker = new ThresholdChecker(ThresholdChecker.type, threshold);
+ final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeCheck_1", 111L,
+ DateTimeZone.UTC, 2536871155000L,false, false,
+ null, null);
+ triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+ expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
- assertTrue(checker1.isCheckerMet() == true);
- assertTrue(checker1.isCheckerReset() == false);
- assertTrue(checker2.isCheckerReset() == true);
+ final String triggerExpr = triggerChecker.getId() + ".eval()";
+ final String expireExpr = endTimeChecker.getId() + ".eval()";
- triggers = triggerManager.getTriggers();
- assertTrue(triggers.size() == 1);
+ final Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+ final Condition expireCond = new Condition(expireCheckers, expireExpr);
+ final Trigger fakeTrigger = new Trigger.TriggerBuilder("azkaban",
+ source,
+ triggerCond,
+ expireCond,
+ getTriggerActions()).build();
+
+ fakeTrigger.setResetOnTrigger(false);
+ fakeTrigger.setResetOnExpire(true);
+ return fakeTrigger;
}
- private Trigger createDummyTrigger(final String message, final String source,
- final int threshold) {
+ private Trigger createPeriodAndEndCheckerTrigger(final long currMillis) {
+ final Map<String, ConditionChecker> triggerCheckers = new HashMap<>();
+ final Map<String, ConditionChecker> expireCheckers = new HashMap<>();
- final Map<String, ConditionChecker> checkers =
- new HashMap<>();
- final ConditionChecker checker =
- new ThresholdChecker(ThresholdChecker.type, threshold);
- checkers.put(checker.getId(), checker);
+ // TODO kunkun-tang: 1 second is the minimum unit for {@link org.joda.time.ReadablePeriod}.
+ // In future, we should use some smaller alternative.
+ final ConditionChecker triggerChecker = new BasicTimeChecker("BasicTimeChecker_1",
+ currMillis, DateTimeZone.UTC, true, true,
+ Utils.parsePeriodString("1s"), null);
- final List<TriggerAction> actions = new ArrayList<>();
- final TriggerAction act = new DummyTriggerAction(message);
- actions.add(act);
+ // End time is 3 seconds past now.
+ final ConditionChecker endTimeChecker = new BasicTimeChecker("EndTimeChecker_1", 111L,
+ DateTimeZone.UTC, currMillis + 3000L,false, false,
+ null, null);
+ triggerCheckers.put(triggerChecker.getId(), triggerChecker);
+ expireCheckers.put(endTimeChecker.getId(), endTimeChecker);
- final String expr = checker.getId() + ".eval()";
+ final String triggerExpr = triggerChecker.getId() + ".eval()";
+ final String expireExpr = endTimeChecker.getId() + ".eval()";
- final Condition triggerCond = new Condition(checkers, expr);
- final Condition expireCond = new Condition(checkers, expr);
+ final Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+ final Condition expireCond = new Condition(expireCheckers, expireExpr);
- final Trigger fakeTrigger = new Trigger.TriggerBuilder("azkaban",
- source,
+ final Trigger timeTrigger = new Trigger.TriggerBuilder("azkaban",
+ "",
triggerCond,
expireCond,
- actions).build();
+ getTriggerActions()).build();
- fakeTrigger.setResetOnTrigger(true);
- fakeTrigger.setResetOnExpire(true);
+ timeTrigger.setResetOnTrigger(false);
+ timeTrigger.setResetOnExpire(true);
+ return timeTrigger;
+ }
- return fakeTrigger;
+ private List<TriggerAction> getTriggerActions() {
+ final List<TriggerAction> actions = new ArrayList<>();
+ final TriggerAction act = new DummyTriggerAction("");
+ actions.add(act);
+ return actions;
}
public static class MockTriggerLoader implements TriggerLoader {
-
private final Map<Integer, Trigger> triggers = new HashMap<>();
private int idIndex = 0;
@@ -188,22 +233,5 @@ public class TriggerManagerTest {
// TODO Auto-generated method stub
return null;
}
-
}
-
- // public class MockCheckerLoader extends CheckerTypeLoader{
- //
- // @Override
- // public void init(Props props) {
- // checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
- // }
- // }
- //
- // public class MockActionLoader extends ActionTypeLoader {
- // @Override
- // public void init(Props props) {
- // actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
- // }
- // }
-
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
index c6158b2..0b3b4fd 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -16,6 +16,31 @@
package azkaban.webapp.servlet;
+import azkaban.Constants;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.flow.Node;
+import azkaban.project.Project;
+import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.project.ProjectManager;
+import azkaban.scheduler.Schedule;
+import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.server.HttpRequestUtils;
+import azkaban.server.session.Session;
+import azkaban.sla.SlaOption;
+import azkaban.user.Permission;
+import azkaban.user.Permission.Type;
+import azkaban.user.User;
+import azkaban.user.UserManager;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.SplitterOutputStream;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.SchedulerStatistics;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -29,12 +54,10 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
@@ -44,31 +67,6 @@ import org.joda.time.Minutes;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.flow.Flow;
-import azkaban.flow.Node;
-import azkaban.project.Project;
-import azkaban.project.ProjectLogEvent.EventType;
-import azkaban.project.ProjectManager;
-import azkaban.scheduler.Schedule;
-import azkaban.scheduler.ScheduleManager;
-import azkaban.scheduler.ScheduleManagerException;
-import azkaban.server.session.Session;
-import azkaban.server.HttpRequestUtils;
-import azkaban.sla.SlaOption;
-import azkaban.user.Permission;
-import azkaban.user.Permission.Type;
-import azkaban.user.User;
-import azkaban.user.UserManager;
-import azkaban.utils.JSONUtils;
-import azkaban.utils.SplitterOutputStream;
-import azkaban.utils.Utils;
-import azkaban.webapp.AzkabanWebServer;
-import azkaban.webapp.SchedulerStatistics;
-
public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private static final long serialVersionUID = 1L;
private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
@@ -77,17 +75,17 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
private UserManager userManager;
@Override
- public void init(ServletConfig config) throws ServletException {
+ public void init(final ServletConfig config) throws ServletException {
super.init(config);
- AzkabanWebServer server = (AzkabanWebServer) getApplication();
- userManager = server.getUserManager();
- projectManager = server.getProjectManager();
- scheduleManager = server.getScheduleManager();
+ final AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ this.userManager = server.getUserManager();
+ this.projectManager = server.getProjectManager();
+ this.scheduleManager = server.getScheduleManager();
}
@Override
- protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
+ protected void handleGet(final HttpServletRequest req, final HttpServletResponse resp,
+ final Session session) throws ServletException, IOException {
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
} else if (hasParam(req, "calendar")) {
@@ -97,11 +95,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- private void handleAJAXAction(HttpServletRequest req,
- HttpServletResponse resp, Session session) throws ServletException,
+ private void handleAJAXAction(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
IOException {
- HashMap<String, Object> ret = new HashMap<String, Object>();
- String ajaxName = getParam(req, "ajax");
+ HashMap<String, Object> ret = new HashMap<>();
+ final String ajaxName = getParam(req, "ajax");
if (ajaxName.equals("slaInfo")) {
ajaxSlaInfo(req, ret, session.getUser());
@@ -125,11 +123,11 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret,
- User user) {
+ private void ajaxSetSla(final HttpServletRequest req, final HashMap<String, Object> ret,
+ final User user) {
try {
- int scheduleId = getIntParam(req, "scheduleId");
- Schedule sched = scheduleManager.getSchedule(scheduleId);
+ final int scheduleId = getIntParam(req, "scheduleId");
+ final Schedule sched = this.scheduleManager.getSchedule(scheduleId);
if (sched == null) {
ret.put("error",
"Error loading schedule. Schedule " + scheduleId
@@ -137,25 +135,25 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Project project = projectManager.getProject(sched.getProjectId());
+ final Project project = this.projectManager.getProject(sched.getProjectId());
if (!hasPermission(project, user, Permission.Type.SCHEDULE)) {
ret.put("error", "User " + user
+ " does not have permission to set SLA for this flow.");
return;
}
- String emailStr = getParam(req, "slaEmails");
- String[] emailSplit = emailStr.split("\\s*,\\s*|\\s*;\\s*|\\s+");
- List<String> slaEmails = Arrays.asList(emailSplit);
+ final String emailStr = getParam(req, "slaEmails");
+ final String[] emailSplit = emailStr.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+ final List<String> slaEmails = Arrays.asList(emailSplit);
- Map<String, String> settings = getParamGroup(req, "settings");
+ final Map<String, String> settings = getParamGroup(req, "settings");
- List<SlaOption> slaOptions = new ArrayList<SlaOption>();
- for (String set : settings.keySet()) {
- SlaOption sla;
+ final List<SlaOption> slaOptions = new ArrayList<>();
+ for (final String set : settings.keySet()) {
+ final SlaOption sla;
try {
sla = parseSlaSetting(settings.get(set));
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ServletException(e);
}
if (sla != null) {
@@ -166,35 +164,35 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
sched.setSlaOptions(slaOptions);
- scheduleManager.insertSchedule(sched);
+ this.scheduleManager.insertSchedule(sched);
if (slaOptions != null) {
- projectManager.postProjectEvent(project, EventType.SLA,
+ this.projectManager.postProjectEvent(project, EventType.SLA,
user.getUserId(), "SLA for flow " + sched.getFlowName()
+ " has been added/changed.");
}
- } catch (ServletException e) {
+ } catch (final ServletException e) {
ret.put("error", e.getMessage());
- } catch (ScheduleManagerException e) {
+ } catch (final ScheduleManagerException e) {
logger.error(e.getMessage(), e);
ret.put("error", e.getMessage());
}
}
- private SlaOption parseSlaSetting(String set) throws ScheduleManagerException {
+ private SlaOption parseSlaSetting(final String set) throws ScheduleManagerException {
logger.info("Tryint to set sla with the following set: " + set);
- String slaType;
- List<String> slaActions = new ArrayList<String>();
- Map<String, Object> slaInfo = new HashMap<String, Object>();
- String[] parts = set.split(",", -1);
- String id = parts[0];
- String rule = parts[1];
- String duration = parts[2];
- String emailAction = parts[3];
- String killAction = parts[4];
+ final String slaType;
+ final List<String> slaActions = new ArrayList<>();
+ final Map<String, Object> slaInfo = new HashMap<>();
+ final String[] parts = set.split(",", -1);
+ final String id = parts[0];
+ final String rule = parts[1];
+ final String duration = parts[2];
+ final String emailAction = parts[3];
+ final String killAction = parts[4];
if (emailAction.equals("true") || killAction.equals("true")) {
if (emailAction.equals("true")) {
slaActions.add(SlaOption.ACTION_ALERT);
@@ -218,16 +216,16 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- ReadablePeriod dur;
+ final ReadablePeriod dur;
try {
dur = parseDuration(duration);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ScheduleManagerException(
"Unable to parse duration for a SLA that needs to take actions!", e);
}
slaInfo.put(SlaOption.INFO_DURATION, Utils.createPeriodString(dur));
- SlaOption r = new SlaOption(slaType, slaActions, slaInfo);
+ final SlaOption r = new SlaOption(slaType, slaActions, slaInfo);
logger.info("Parsing sla as id:" + id + " type:" + slaType + " rule:"
+ rule + " Duration:" + duration + " actions:" + slaActions);
return r;
@@ -235,22 +233,22 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return null;
}
- private ReadablePeriod parseDuration(String duration) {
- int hour = Integer.parseInt(duration.split(":")[0]);
- int min = Integer.parseInt(duration.split(":")[1]);
+ private ReadablePeriod parseDuration(final String duration) {
+ final int hour = Integer.parseInt(duration.split(":")[0]);
+ final int min = Integer.parseInt(duration.split(":")[1]);
return Minutes.minutes(min + hour * 60).toPeriod();
}
- private void ajaxFetchSchedule(HttpServletRequest req,
- HashMap<String, Object> ret, User user) throws ServletException {
+ private void ajaxFetchSchedule(final HttpServletRequest req,
+ final HashMap<String, Object> ret, final User user) throws ServletException {
- int projectId = getIntParam(req, "projectId");
- String flowId = getParam(req, "flowId");
+ final int projectId = getIntParam(req, "projectId");
+ final String flowId = getParam(req, "flowId");
try {
- Schedule schedule = scheduleManager.getSchedule(projectId, flowId);
+ final Schedule schedule = this.scheduleManager.getSchedule(projectId, flowId);
if (schedule != null) {
- Map<String, Object> jsonObj = new HashMap<String, Object>();
+ final Map<String, Object> jsonObj = new HashMap<>();
jsonObj.put("scheduleId", Integer.toString(schedule.getScheduleId()));
jsonObj.put("submitUser", schedule.getSubmitUser());
jsonObj.put("firstSchedTime",
@@ -262,18 +260,18 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
jsonObj.put("executionOptions", schedule.getExecutionOptions());
ret.put("schedule", jsonObj);
}
- } catch (ScheduleManagerException e) {
+ } catch (final ScheduleManagerException e) {
logger.error(e.getMessage(), e);
ret.put("error", e);
}
}
- private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret,
- User user) {
- int scheduleId;
+ private void ajaxSlaInfo(final HttpServletRequest req, final HashMap<String, Object> ret,
+ final User user) {
+ final int scheduleId;
try {
scheduleId = getIntParam(req, "scheduleId");
- Schedule sched = scheduleManager.getSchedule(scheduleId);
+ final Schedule sched = this.scheduleManager.getSchedule(scheduleId);
if (sched == null) {
ret.put("error",
"Error loading schedule. Schedule " + scheduleId
@@ -281,7 +279,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Project project =
+ final Project project =
getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
if (project == null) {
ret.put("error",
@@ -290,58 +288,58 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Flow flow = project.getFlow(sched.getFlowName());
+ final Flow flow = project.getFlow(sched.getFlowName());
if (flow == null) {
ret.put("error", "Error loading flow. Flow " + sched.getFlowName()
+ " doesn't exist in " + sched.getProjectId());
return;
}
- List<SlaOption> slaOptions = sched.getSlaOptions();
- ExecutionOptions flowOptions = sched.getExecutionOptions();
+ final List<SlaOption> slaOptions = sched.getSlaOptions();
+ final ExecutionOptions flowOptions = sched.getExecutionOptions();
if (slaOptions != null && slaOptions.size() > 0) {
ret.put("slaEmails",
slaOptions.get(0).getInfo().get(SlaOption.INFO_EMAIL_LIST));
- List<Object> setObj = new ArrayList<Object>();
- for (SlaOption sla : slaOptions) {
+ final List<Object> setObj = new ArrayList<>();
+ for (final SlaOption sla : slaOptions) {
setObj.add(sla.toWebObject());
}
ret.put("settings", setObj);
} else if (flowOptions != null) {
if (flowOptions.getFailureEmails() != null) {
- List<String> emails = flowOptions.getFailureEmails();
+ final List<String> emails = flowOptions.getFailureEmails();
if (emails.size() > 0) {
ret.put("slaEmails", emails);
}
}
} else {
if (flow.getFailureEmails() != null) {
- List<String> emails = flow.getFailureEmails();
+ final List<String> emails = flow.getFailureEmails();
if (emails.size() > 0) {
ret.put("slaEmails", emails);
}
}
}
- List<String> allJobs = new ArrayList<String>();
- for (Node n : flow.getNodes()) {
+ final List<String> allJobs = new ArrayList<>();
+ for (final Node n : flow.getNodes()) {
allJobs.add(n.getId());
}
ret.put("allJobNames", allJobs);
- } catch (ServletException e) {
+ } catch (final ServletException e) {
ret.put("error", e);
- } catch (ScheduleManagerException e) {
+ } catch (final ScheduleManagerException e) {
logger.error(e.getMessage(), e);
ret.put("error", e);
}
}
- protected Project getProjectAjaxByPermission(Map<String, Object> ret,
- int projectId, User user, Permission.Type type) {
- Project project = projectManager.getProject(projectId);
+ protected Project getProjectAjaxByPermission(final Map<String, Object> ret,
+ final int projectId, final User user, final Permission.Type type) {
+ final Project project = this.projectManager.getProject(projectId);
if (project == null) {
ret.put("error", "Project '" + project + "' not found.");
@@ -356,36 +354,36 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return null;
}
- private void handleGetAllSchedules(HttpServletRequest req,
- HttpServletResponse resp, Session session) throws ServletException,
+ private void handleGetAllSchedules(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
IOException {
- Page page =
+ final Page page =
newPage(req, resp, session,
"azkaban/webapp/servlet/velocity/scheduledflowpage.vm");
- List<Schedule> schedules;
+ final List<Schedule> schedules;
try {
- schedules = scheduleManager.getSchedules();
- } catch (ScheduleManagerException e) {
+ schedules = this.scheduleManager.getSchedules();
+ } catch (final ScheduleManagerException e) {
throw new ServletException(e);
}
page.add("schedules", schedules);
page.render();
}
- private void handleGetScheduleCalendar(HttpServletRequest req,
- HttpServletResponse resp, Session session) throws ServletException,
+ private void handleGetScheduleCalendar(final HttpServletRequest req,
+ final HttpServletResponse resp, final Session session) throws ServletException,
IOException {
- Page page =
+ final Page page =
newPage(req, resp, session,
"azkaban/webapp/servlet/velocity/scheduledflowcalendarpage.vm");
- List<Schedule> schedules;
+ final List<Schedule> schedules;
try {
- schedules = scheduleManager.getSchedules();
- } catch (ScheduleManagerException e) {
+ schedules = this.scheduleManager.getSchedules();
+ } catch (final ScheduleManagerException e) {
throw new ServletException(e);
}
page.add("schedules", schedules);
@@ -393,14 +391,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
@Override
- protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
- Session session) throws ServletException, IOException {
+ protected void handlePost(final HttpServletRequest req, final HttpServletResponse resp,
+ final Session session) throws ServletException, IOException {
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
} else {
- HashMap<String, Object> ret = new HashMap<String, Object>();
+ final HashMap<String, Object> ret = new HashMap<>();
if (hasParam(req, "action")) {
- String action = getParam(req, "action");
+ final String action = getParam(req, "action");
if (action.equals("scheduleFlow")) {
ajaxScheduleFlow(req, ret, session.getUser());
} else if (action.equals("scheduleCronFlow")) {
@@ -419,44 +417,44 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- private void ajaxLoadFlows(HttpServletRequest req,
- HashMap<String, Object> ret, User user) throws ServletException {
- List<Schedule> schedules;
+ private void ajaxLoadFlows(final HttpServletRequest req,
+ final HashMap<String, Object> ret, final User user) throws ServletException {
+ final List<Schedule> schedules;
try {
- schedules = scheduleManager.getSchedules();
- } catch (ScheduleManagerException e) {
+ schedules = this.scheduleManager.getSchedules();
+ } catch (final ScheduleManagerException e) {
throw new ServletException(e);
}
// See if anything is scheduled
if (schedules.size() <= 0)
return;
- List<HashMap<String, Object>> output =
- new ArrayList<HashMap<String, Object>>();
+ final List<HashMap<String, Object>> output =
+ new ArrayList<>();
ret.put("items", output);
- for (Schedule schedule : schedules) {
+ for (final Schedule schedule : schedules) {
try {
writeScheduleData(output, schedule);
- } catch (ScheduleManagerException e) {
+ } catch (final ScheduleManagerException e) {
throw new ServletException(e);
}
}
}
- private void writeScheduleData(List<HashMap<String, Object>> output,
- Schedule schedule) throws ScheduleManagerException {
- Map<String, Object> stats =
+ private void writeScheduleData(final List<HashMap<String, Object>> output,
+ final Schedule schedule) throws ScheduleManagerException {
+ final Map<String, Object> stats =
SchedulerStatistics.getStatistics(schedule.getScheduleId(),
(AzkabanWebServer) getApplication());
- HashMap<String, Object> data = new HashMap<String, Object>();
+ final HashMap<String, Object> data = new HashMap<>();
data.put("scheduleid", schedule.getScheduleId());
data.put("flowname", schedule.getFlowName());
data.put("projectname", schedule.getProjectName());
data.put("time", schedule.getFirstSchedTime());
data.put("cron", schedule.getCronExpression());
- DateTime time = DateTime.now();
+ final DateTime time = DateTime.now();
long period = 0;
if (schedule.getPeriod() != null) {
period = time.plus(schedule.getPeriod()).getMillis() - time.getMillis();
@@ -475,26 +473,26 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
output.add(data);
}
- private void ajaxLoadHistory(HttpServletRequest req,
- HttpServletResponse resp, User user) throws ServletException, IOException {
+ private void ajaxLoadHistory(final HttpServletRequest req,
+ final HttpServletResponse resp, final User user) throws ServletException, IOException {
resp.setContentType(JSON_MIME_TYPE);
- long today = DateTime.now().withTime(0, 0, 0, 0).getMillis();
+ final long today = DateTime.now().withTime(0, 0, 0, 0).getMillis();
long startTime = getLongParam(req, "startTime");
- DateTime start = new DateTime(startTime);
+ final DateTime start = new DateTime(startTime);
// Ensure start time is 12:00 AM
startTime = start.withTime(0, 0, 0, 0).getMillis();
boolean useCache = false;
if (startTime < today) {
useCache = true;
}
- long endTime = startTime + 24 * 3600 * 1000;
- int loadAll = getIntParam(req, "loadAll");
+ final long endTime = startTime + 24 * 3600 * 1000;
+ final int loadAll = getIntParam(req, "loadAll");
// Cache file
- String cacheDir =
+ final String cacheDir =
getApplication().getServerProps().getString("cache.directory", "cache");
- File cacheDirFile = new File(cacheDir, "schedule-history");
- File cache = new File(cacheDirFile, startTime + ".cache");
+ final File cacheDirFile = new File(cacheDir, "schedule-history");
+ final File cache = new File(cacheDirFile, startTime + ".cache");
cache.getParentFile().mkdirs();
if (useCache) {
@@ -505,7 +503,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
if (cacheExists) {
// Send the cache instead
- InputStream cacheInput =
+ final InputStream cacheInput =
new BufferedInputStream(new FileInputStream(cache));
try {
IOUtils.copy(cacheInput, resp.getOutputStream());
@@ -519,20 +517,20 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
// Load data if not cached
List<ExecutableFlow> history = null;
try {
- AzkabanWebServer server = (AzkabanWebServer) getApplication();
- ExecutorManagerAdapter executorManager = server.getExecutorManager();
+ final AzkabanWebServer server = (AzkabanWebServer) getApplication();
+ final ExecutorManagerAdapter executorManager = server.getExecutorManager();
history =
executorManager.getExecutableFlows(null, null, null, 0, startTime,
endTime, -1, -1);
- } catch (ExecutorManagerException e) {
+ } catch (final ExecutorManagerException e) {
logger.error(e.getMessage(), e);
}
- HashMap<String, Object> ret = new HashMap<String, Object>();
- List<HashMap<String, Object>> output =
- new ArrayList<HashMap<String, Object>>();
+ final HashMap<String, Object> ret = new HashMap<>();
+ final List<HashMap<String, Object>> output =
+ new ArrayList<>();
ret.put("items", output);
- for (ExecutableFlow historyItem : history) {
+ for (final ExecutableFlow historyItem : history) {
// Check if it is an scheduled execution
if (historyItem.getScheduleId() >= 0 || loadAll != 0) {
writeHistoryData(output, historyItem);
@@ -548,12 +546,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
// Create cache file
- File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
+ final File cacheTemp = new File(cacheDirFile, startTime + ".tmp");
cacheTemp.createNewFile();
- OutputStream cacheOutput =
+ final OutputStream cacheOutput =
new BufferedOutputStream(new FileOutputStream(cacheTemp));
try {
- OutputStream outputStream =
+ final OutputStream outputStream =
new SplitterOutputStream(cacheOutput, resp.getOutputStream());
// Write to both the cache file and web output
JSONUtils.toJSON(ret, outputStream, false);
@@ -566,12 +564,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
}
- private void writeHistoryData(List<HashMap<String, Object>> output,
- ExecutableFlow history) {
- HashMap<String, Object> data = new HashMap<String, Object>();
+ private void writeHistoryData(final List<HashMap<String, Object>> output,
+ final ExecutableFlow history) {
+ final HashMap<String, Object> data = new HashMap<>();
data.put("scheduleid", history.getScheduleId());
- Project project = projectManager.getProject(history.getProjectId());
+ final Project project = this.projectManager.getProject(history.getProjectId());
data.put("flowname", history.getFlowId());
data.put("projectname", project.getName());
data.put("time", history.getStartTime());
@@ -587,13 +585,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
output.add(data);
}
- private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret,
- User user) throws ServletException {
- int scheduleId = getIntParam(req, "scheduleId");
- Schedule sched;
+ private void ajaxRemoveSched(final HttpServletRequest req, final Map<String, Object> ret,
+ final User user) throws ServletException {
+ final int scheduleId = getIntParam(req, "scheduleId");
+ final Schedule sched;
try {
- sched = scheduleManager.getSchedule(scheduleId);
- } catch (ScheduleManagerException e) {
+ sched = this.scheduleManager.getSchedule(scheduleId);
+ } catch (final ScheduleManagerException e) {
throw new ServletException(e);
}
if (sched == null) {
@@ -602,7 +600,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Project project = projectManager.getProject(sched.getProjectId());
+ final Project project = this.projectManager.getProject(sched.getProjectId());
if (project == null) {
ret.put("message", "Project " + sched.getProjectId() + " does not exist");
@@ -617,10 +615,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- scheduleManager.removeSchedule(sched);
+ this.scheduleManager.removeSchedule(sched);
logger.info("User '" + user.getUserId() + " has removed schedule "
+ sched.getScheduleName());
- projectManager
+ this.projectManager
.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(),
"Schedule " + sched.toString() + " has been removed.");
@@ -630,13 +628,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- private void ajaxScheduleFlow(HttpServletRequest req,
- HashMap<String, Object> ret, User user) throws ServletException {
- String projectName = getParam(req, "projectName");
- String flowName = getParam(req, "flow");
- int projectId = getIntParam(req, "projectId");
+ private void ajaxScheduleFlow(final HttpServletRequest req,
+ final HashMap<String, Object> ret, final User user) throws ServletException {
+ final String projectName = getParam(req, "projectName");
+ final String flowName = getParam(req, "flow");
+ final int projectId = getIntParam(req, "projectId");
- Project project = projectManager.getProject(projectId);
+ final Project project = this.projectManager.getProject(projectId);
if (project == null) {
ret.put("message", "Project " + projectName + " does not exist");
@@ -650,7 +648,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Flow flow = project.getFlow(flowName);
+ final Flow flow = project.getFlow(flowName);
if (flow == null) {
ret.put("status", "error");
ret.put("message", "Flow " + flowName + " cannot be found in project "
@@ -658,46 +656,54 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- String scheduleTime = getParam(req, "scheduleTime");
- String scheduleDate = getParam(req, "scheduleDate");
- DateTime firstSchedTime;
+ final String scheduleTime = getParam(req, "scheduleTime");
+ final String scheduleDate = getParam(req, "scheduleDate");
+ final DateTime firstSchedTime;
try {
firstSchedTime = parseDateTime(scheduleDate, scheduleTime);
- } catch (Exception e) {
+ } catch (final Exception e) {
ret.put("error", "Invalid date and/or time '" + scheduleDate + " "
+ scheduleTime);
return;
}
+ final long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+ try {
+ // Todo kunkun-tang: Need to verify if passed end time is valid.
+ } catch (final Exception e) {
+ ret.put("error", "Invalid date and time: " + endSchedTime);
+ return;
+ }
+
ReadablePeriod thePeriod = null;
try {
if (hasParam(req, "is_recurring")
&& getParam(req, "is_recurring").equals("on")) {
thePeriod = Schedule.parsePeriodString(getParam(req, "period"));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
ret.put("error", e.getMessage());
}
ExecutionOptions flowOptions = null;
try {
flowOptions = HttpRequestUtils.parseFlowOptions(req);
- HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
- } catch (Exception e) {
+ HttpRequestUtils.filterAdminOnlyFlowParams(this.userManager, flowOptions, user);
+ } catch (final Exception e) {
ret.put("error", e.getMessage());
}
- List<SlaOption> slaOptions = null;
+ final List<SlaOption> slaOptions = null;
- Schedule schedule =
- scheduleManager.scheduleFlow(-1, projectId, projectName, flowName,
- "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+ final Schedule schedule =
+ this.scheduleManager.scheduleFlow(-1, projectId, projectName, flowName,
+ "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(),
firstSchedTime.getMillis(), user.getUserId(), flowOptions,
slaOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "["
+ projectName + flowName + " (" + projectId + ")" + "].");
- projectManager.postProjectEvent(project, EventType.SCHEDULE,
+ this.projectManager.postProjectEvent(project, EventType.SCHEDULE,
user.getUserId(), "Schedule " + schedule.toString()
+ " has been added.");
@@ -711,19 +717,19 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
* This method is in charge of doing cron scheduling.
* @throws ServletException
*/
- private void ajaxScheduleCronFlow(HttpServletRequest req,
- HashMap<String, Object> ret, User user) throws ServletException {
- String projectName = getParam(req, "projectName");
- String flowName = getParam(req, "flow");
+ private void ajaxScheduleCronFlow(final HttpServletRequest req,
+ final HashMap<String, Object> ret, final User user) throws ServletException {
+ final String projectName = getParam(req, "projectName");
+ final String flowName = getParam(req, "flow");
- Project project = projectManager.getProject(projectName);
+ final Project project = this.projectManager.getProject(projectName);
if (project == null) {
ret.put("message", "Project " + projectName + " does not exist");
ret.put("status", "error");
return;
}
- int projectId = project.getId();
+ final int projectId = project.getId();
if (!hasPermission(project, user, Type.SCHEDULE)) {
ret.put("status", "error");
@@ -731,7 +737,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- Flow flow = project.getFlow(flowName);
+ final Flow flow = project.getFlow(flowName);
if (flow == null) {
ret.put("status", "error");
ret.put("message", "Flow " + flowName + " cannot be found in project "
@@ -739,8 +745,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
return;
}
- DateTimeZone timezone = DateTimeZone.getDefault();
- DateTime firstSchedTime = getPresentTimeByTimezone(timezone);
+ final DateTimeZone timezone = DateTimeZone.getDefault();
+ final DateTime firstSchedTime = getPresentTimeByTimezone(timezone);
String cronExpression = null;
try {
@@ -755,30 +761,39 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
}
if(cronExpression == null)
throw new Exception("Cron expression must exist.");
- } catch (Exception e) {
+ } catch (final Exception e) {
ret.put("error", e.getMessage());
}
+ final long endSchedTime = getLongParam(req, "endSchedTime", Constants.DEFAULT_SCHEDULE_END_EPOCH_TIME);
+ try {
+ // Todo kunkun-tang: Need to verify if passed end time is valid.
+ } catch (final Exception e) {
+ ret.put("error", "Invalid date and time: " + endSchedTime);
+ return;
+ }
+
ExecutionOptions flowOptions = null;
try {
flowOptions = HttpRequestUtils.parseFlowOptions(req);
- HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
- } catch (Exception e) {
+ HttpRequestUtils.filterAdminOnlyFlowParams(this.userManager, flowOptions, user);
+ } catch (final Exception e) {
ret.put("error", e.getMessage());
}
- List<SlaOption> slaOptions = null;
+ final List<SlaOption> slaOptions = null;
// Because either cronExpression or recurrence exists, we build schedule in the below way.
- Schedule schedule = scheduleManager.cronScheduleFlow(-1, projectId, projectName, flowName,
- "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
+ final Schedule schedule = this.scheduleManager
+ .cronScheduleFlow(-1, projectId, projectName, flowName,
+ "ready", firstSchedTime.getMillis(), endSchedTime, firstSchedTime.getZone(),
DateTime.now().getMillis(), firstSchedTime.getMillis(),
firstSchedTime.getMillis(), user.getUserId(), flowOptions,
slaOptions, cronExpression);
logger.info("User '" + user.getUserId() + "' has scheduled " + "["
+ projectName + flowName + " (" + projectId + ")" + "].");
- projectManager.postProjectEvent(project, EventType.SCHEDULE,
+ this.projectManager.postProjectEvent(project, EventType.SCHEDULE,
user.getUserId(), "Schedule " + schedule.toString()
+ " has been added.");
@@ -787,14 +802,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
ret.put("message", projectName + "." + flowName + " scheduled.");
}
- private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
+ private DateTime parseDateTime(final String scheduleDate, final String scheduleTime) {
// scheduleTime: 12,00,pm,PDT
- String[] parts = scheduleTime.split(",", -1);
+ final String[] parts = scheduleTime.split(",", -1);
int hour = Integer.parseInt(parts[0]);
- int minutes = Integer.parseInt(parts[1]);
- boolean isPm = parts[2].equalsIgnoreCase("pm");
+ final int minutes = Integer.parseInt(parts[1]);
+ final boolean isPm = parts[2].equalsIgnoreCase("pm");
- DateTimeZone timezone =
+ final DateTimeZone timezone =
parts[3].equals("UTC") ? DateTimeZone.UTC : DateTimeZone.getDefault();
// scheduleDate: 02/10/2013
@@ -811,7 +826,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
if (isPm)
hour += 12;
- DateTime firstSchedTime =
+ final DateTime firstSchedTime =
day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
return firstSchedTime;
@@ -821,14 +836,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
* @param cronTimezone represents the timezone from remote API call
* @return if the string is equal to UTC, we return UTC; otherwise, we always return default timezone.
*/
- private DateTimeZone parseTimeZone(String cronTimezone) {
+ private DateTimeZone parseTimeZone(final String cronTimezone) {
if(cronTimezone != null && cronTimezone.equals("UTC"))
return DateTimeZone.UTC;
return DateTimeZone.getDefault();
}
- private DateTime getPresentTimeByTimezone(DateTimeZone timezone) {
+ private DateTime getPresentTimeByTimezone(final DateTimeZone timezone) {
return new DateTime(timezone);
}
}