azkaban-uncached
Changes
src/java/azkaban/scheduler/BasicTimeChecker.java 250(+250 -0)
src/java/azkaban/scheduler/Schedule.java 11(+11 -0)
src/java/azkaban/trigger/Trigger.java 34(+33 -1)
src/java/azkaban/trigger/TriggerManager.java 39(+30 -9)
src/java/azkaban/utils/Utils.java 86(+86 -0)
src/sql/create.triggers.sql 2(+1 -1)
Details
src/java/azkaban/scheduler/BasicTimeChecker.java 250(+250 -0)
diff --git a/src/java/azkaban/scheduler/BasicTimeChecker.java b/src/java/azkaban/scheduler/BasicTimeChecker.java
new file mode 100644
index 0000000..5a59c72
--- /dev/null
+++ b/src/java/azkaban/scheduler/BasicTimeChecker.java
@@ -0,0 +1,250 @@
+package azkaban.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.trigger.ConditionChecker;
+import azkaban.utils.Utils;
+
+public class BasicTimeChecker implements ConditionChecker {
+
+ public static final String type = "BasicTimeChecker";
+
+ private DateTime firstCheckTime;
+ private DateTime nextCheckTime;
+ private DateTimeZone timezone;
+ private boolean isRecurring = true;
+ private boolean skipPastChecks = true;
+ private ReadablePeriod period;
+
+ private String id = type;
+
+ public BasicTimeChecker(
+ String id,
+ DateTime firstCheckTime,
+ DateTimeZone timezone,
+ boolean isRecurring,
+ boolean skipPastChecks,
+ ReadablePeriod period) {
+ this.id = id;
+ this.firstCheckTime = firstCheckTime;
+ this.timezone = timezone;
+ this.isRecurring = isRecurring;
+ this.skipPastChecks = skipPastChecks;
+ this.period = period;
+ this.nextCheckTime = new DateTime(firstCheckTime);
+ this.nextCheckTime = calculateNextCheckTime();
+ }
+
+ public DateTime getFirstCheckTime() {
+ return firstCheckTime;
+ }
+
+ public boolean isRecurring() {
+ return isRecurring;
+ }
+
+ public boolean isSkipPastChecks() {
+ return skipPastChecks;
+ }
+
+ public ReadablePeriod getPeriod() {
+ return period;
+ }
+
+ public DateTime getNextCheckTime() {
+ return nextCheckTime;
+ }
+
+// public BasicTimeChecker(
+// DateTime firstCheckTime,
+// Boolean isRecurring,
+// Boolean skipPastChecks,
+// String period) {
+// this.firstCheckTime = firstCheckTime;
+// this.isRecurring = isRecurring;
+// this.skipPastChecks = skipPastChecks;
+// this.period = parsePeriodString(period);
+// this.nextCheckTime = new DateTime(firstCheckTime);
+// this.nextCheckTime = calculateNextCheckTime();
+// }
+
+ public BasicTimeChecker(
+ String id,
+ DateTime firstCheckTime,
+ DateTimeZone timezone,
+ DateTime nextCheckTime,
+ boolean isRecurring,
+ boolean skipPastChecks,
+ ReadablePeriod period) {
+ this.id = id;
+ this.firstCheckTime = firstCheckTime;
+ this.timezone = timezone;
+ this.nextCheckTime = nextCheckTime;
+ this.isRecurring = isRecurring;
+ this.skipPastChecks = skipPastChecks;
+ this.period = period;
+ }
+
+ @Override
+ public Boolean eval() {
+ return nextCheckTime.isBeforeNow();
+ }
+
+ @Override
+ public void reset() {
+ this.nextCheckTime = calculateNextCheckTime();
+
+ }
+
+ /*
+ * TimeChecker format:
+ * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
+ */
+ @Override
+ public String getId() {
+// return getType() + "$" +
+// firstCheckTime.getMillis() + "$" +
+// nextCheckTime.getMillis() + "$" +
+// firstCheckTime.getZone().getID().replace('/', '0') + "$" +
+// //"offset"+firstCheckTime.getZone().getOffset(firstCheckTime.getMillis()) + "_" +
+// (isRecurring == true ? "1" : "0") + "$" +
+// (skipPastChecks == true ? "1" : "0") + "$" +
+// createPeriodString(period);
+ return id;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ConditionChecker createFromJson(Object obj) throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+ }
+ long firstTimeMillis = Long.valueOf((String)jsonObj.get("firstCheckTime"));
+ String timezoneId = (String) jsonObj.get("timezone");
+ long nextTimeMillis = Long.valueOf((String)jsonObj.get("nextCheckTime"));
+ DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+ DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+ DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+ boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
+ boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
+ ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
+ String id = (String) jsonObj.get("id");
+ return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+ }
+
+ public static ConditionChecker createFromJson(HashMap<String, Object> obj) throws Exception {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ if(!jsonObj.get("type").equals(type)) {
+ throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+ }
+ Long firstTimeMillis = Long.valueOf((String) jsonObj.get("firstCheckTime"));
+ String timezoneId = (String) jsonObj.get("timezone");
+ long nextTimeMillis = Long.valueOf((String) jsonObj.get("nextCheckTime"));
+ DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+ DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+ DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+ boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
+ boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
+ ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
+ String id = (String) jsonObj.get("id");
+ return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+ }
+
+ @Override
+ public ConditionChecker fromJson(Object obj) throws Exception{
+ return createFromJson(obj);
+ }
+
+// public static ConditionChecker createFromJson(String obj) {
+// String str = (String) obj;
+// String[] parts = str.split("\\$");
+//
+// if(!parts[0].equals(type)) {
+// throw new RuntimeException("Cannot create checker of " + type + " from " + parts[0]);
+// }
+//
+// long firstMillis = Long.parseLong(parts[1]);
+// long nextMillis = Long.parseLong(parts[2]);
+// //DateTimeZone timezone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(parts[3]));
+// DateTimeZone timezone = DateTimeZone.forID(parts[3].replace('0', '/'));
+// boolean isRecurring = parts[4].equals("1") ? true : false;
+// boolean skipPastChecks = parts[5].equals("1") ? true : false;
+// ReadablePeriod period = parsePeriodString(parts[6]);
+//
+// return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+// }
+//
+// @Override
+// public ConditionChecker fromJson(Object obj) {
+// String str = (String) obj;
+// String[] parts = str.split("_");
+//
+// if(!parts[0].equals(getType())) {
+// throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
+// }
+//
+// long firstMillis = Long.parseLong(parts[1]);
+// long nextMillis = Long.parseLong(parts[2]);
+// DateTimeZone timezone = DateTimeZone.forID(parts[3]);
+// boolean isRecurring = Boolean.valueOf(parts[4]);
+// boolean skipPastChecks = Boolean.valueOf(parts[5]);
+// ReadablePeriod period = parsePeriodString(parts[6]);
+//
+// return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+// }
+
+ private DateTime calculateNextCheckTime(){
+ DateTime date = new DateTime(nextCheckTime);
+ int count = 0;
+ while(!DateTime.now().isBefore(date) && skipPastChecks) {
+ if(count > 100000) {
+ throw new IllegalStateException("100000 increments of period did not get to present time.");
+ }
+
+ if(period == null) {
+ break;
+ }else {
+ date = date.plus(period);
+ }
+ count += 1;
+ }
+ return date;
+ }
+
+ @Override
+ public Object getNum() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("type", type);
+ jsonObj.put("firstCheckTime", String.valueOf(firstCheckTime.getMillis()));
+ jsonObj.put("timezone", timezone.getID());
+ jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime.getMillis()));
+ jsonObj.put("isRecurrint", String.valueOf(isRecurring));
+ jsonObj.put("skipPastChecks", String.valueOf(skipPastChecks));
+ jsonObj.put("period", Utils.createPeriodString(period));
+ jsonObj.put("id", id);
+
+ return jsonObj;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+}
src/java/azkaban/scheduler/Schedule.java 11(+11 -0)
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index f11c0fe..4a85881 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -54,6 +54,8 @@ public class Schedule{
private String status;
private long submitTime;
+ private boolean skipPastOccurrences = true;
+
private ExecutionOptions executionOptions;
private SlaOptions slaOptions;
@@ -376,4 +378,13 @@ public class Schedule{
this.slaOptions = slaOptions;
}
}
+
+ public boolean isRecurring() {
+ return period == null ? false : true;
+ }
+
+ public boolean skipPastOccurrences() {
+ return skipPastOccurrences;
+ }
+
}
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
new file mode 100644
index 0000000..05f7614
--- /dev/null
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -0,0 +1,147 @@
+package azkaban.scheduler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.executor.ExecutorManager;
+import azkaban.project.ProjectManager;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerManager;
+
+public class TriggerBasedScheduleLoader implements ScheduleLoader {
+
+ private static Logger logger = Logger.getLogger(TriggerBasedScheduleLoader.class);
+
+ private TriggerManager triggerManager;
+
+ public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager) {
+ this.triggerManager = triggerManager;
+ // need to init the action types and condition checker types
+ ExecuteFlowAction.setExecutorManager(executorManager);
+ ExecuteFlowAction.setProjectManager(projectManager);
+ }
+
+ private Trigger createScheduleTrigger(Schedule s) {
+
+ Condition triggerCondition = createTimeTriggerCondition(s);
+ Condition expireCondition = createTimeExpireCondition(s);
+ List<TriggerAction> actions = createActions(s);
+ Trigger t = new Trigger(s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), "TriggerBasedScheduler", triggerCondition, expireCondition, actions);
+
+ return t;
+ }
+
+ private List<TriggerAction> createActions (Schedule s) {
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ TriggerAction act = new ExecuteFlowAction(s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions());
+ actions.add(act);
+ return actions;
+ }
+
+ private Condition createTimeTriggerCondition (Schedule s) {
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ checkers.put(checker.getId(), checker);
+ String expr = checker.getId() + ".eval()";
+ Condition cond = new Condition(checkers, expr);
+ return cond;
+ }
+
+ // if failed to trigger, auto expire?
+ private Condition createTimeExpireCondition (Schedule s) {
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+ checkers.put(checker.getId(), checker);
+ String expr = checker.getId() + ".eval()";
+ Condition cond = new Condition(checkers, expr);
+ return cond;
+ }
+
+ @Override
+ public void insertSchedule(Schedule s) throws ScheduleManagerException {
+ Trigger t = createScheduleTrigger(s);
+ triggerManager.insertTrigger(t);
+ }
+
+ @Override
+ public void updateSchedule(Schedule s) throws ScheduleManagerException {
+
+
+ }
+
+ @Override
+ public List<Schedule> loadSchedules() throws ScheduleManagerException {
+ List<Trigger> triggers = triggerManager.getTriggers();
+ List<Schedule> schedules = new ArrayList<Schedule>();
+ for(Trigger t : triggers) {
+ if(t.getSource().equals("TriggerBasedScheduler")) {
+ Schedule s = triggerToSchedule(t);
+ schedules.add(s);
+ }
+ }
+ return schedules;
+
+ }
+
+ private Schedule triggerToSchedule(Trigger t) throws ScheduleManagerException {
+ Condition triggerCond = t.getTriggerCondition();
+ Map<String, ConditionChecker> checkers = triggerCond.getCheckers();
+ BasicTimeChecker ck = null;
+ for(ConditionChecker checker : checkers.values()) {
+ if(checker.getType().equals(BasicTimeChecker.type)) {
+ ck = (BasicTimeChecker) checker;
+ break;
+ }
+ }
+ List<TriggerAction> actions = t.getActions();
+ ExecuteFlowAction act = null;
+ for(TriggerAction action : actions) {
+ if(action.getType().equals(ExecuteFlowAction.type)) {
+ act = (ExecuteFlowAction) action;
+ break;
+ }
+ }
+ if(ck != null && act != null) {
+ Schedule s = new Schedule(
+ t.getTriggerId(),
+ act.getProjectId(),
+ act.getProjectName(),
+ act.getFlowName(),
+ "ready",
+ ck.getFirstCheckTime().getMillis(),
+ ck.getFirstCheckTime().getZone(),
+ ck.getPeriod(),
+ t.getLastModifyTime(),
+ ck.getNextCheckTime().getMillis(),
+ t.getSubmitTime(),
+ t.getSubmitUser());
+ return s;
+ } else {
+ logger.error("Failed to parse schedule from trigger!");
+ throw new ScheduleManagerException("Failed to parse schedule from trigger!");
+ }
+ }
+
+ @Override
+ public void removeSchedule(Schedule s) throws ScheduleManagerException {
+ triggerManager.removeTrigger(s.getScheduleId());
+
+ }
+
+ @Override
+ public void updateNextExecTime(Schedule s)
+ throws ScheduleManagerException {
+ logger.error("no longer doing it here.");
+ throw new ScheduleManagerException("No longer updating execution time in scheduler");
+ }
+
+}
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
new file mode 100644
index 0000000..1d0f2a6
--- /dev/null
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -0,0 +1,216 @@
+package azkaban.scheduler;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.project.ProjectManager;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SlaOptions;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerManager;
+import azkaban.utils.Pair;
+
+/**
+ * The ScheduleManager stores and executes the schedule. It uses a single thread
+ * instead and waits until correct loading time for the flow. It will not remove
+ * the flow from the schedule when it is run, which can potentially allow the
+ * flow to and overlap each other.
+ */
+public class TriggerBasedScheduler {
+ private static Logger logger = Logger.getLogger(TriggerBasedScheduler.class);
+
+ private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+ private ScheduleLoader loader;
+
+ private Map<Pair<Integer, String>, Schedule> scheduleIDMap = new HashMap<Pair<Integer, String>, Schedule>();
+ private Map<Integer, Pair<Integer, String>> idFlowMap = new HashMap<Integer, Pair<Integer,String>>();
+
+ private final ExecutorManager executorManager;
+ private final ProjectManager projectManager;
+ private final SLAManager slaManager;
+ private final TriggerManager triggerManager;
+
+ /**
+ * Give the schedule manager a loader class that will properly load the
+ * schedule.
+ *
+ * @param loader
+ */
+ public TriggerBasedScheduler(ExecutorManager executorManager,
+ ProjectManager projectManager,
+ SLAManager slaManager,
+ TriggerManager triggerManager,
+ ScheduleLoader loader)
+ {
+ this.executorManager = executorManager;
+ this.projectManager = projectManager;
+ this.slaManager = slaManager;
+ this.triggerManager = triggerManager;
+ this.loader = loader;
+
+ List<Schedule> scheduleList = null;
+ try {
+ scheduleList = loader.loadSchedules();
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ /**
+ * Retrieves a copy of the list of schedules.
+ *
+ * @return
+ */
+ public synchronized List<Schedule> getSchedules() {
+ return new ArrayList<Schedule>(scheduleIDMap.values());
+ }
+
+ /**
+ * Returns the scheduled flow for the flow name
+ *
+ * @param id
+ * @return
+ */
+ public Schedule getSchedule(int projectId, String flowId) {
+ return scheduleIDMap.get(new Pair<Integer,String>(projectId, flowId));
+ }
+
+ /**
+ * Removes the flow from the schedule if it exists.
+ *
+ * @param id
+ */
+ public synchronized void removeSchedule(int projectId, String flowId) {
+ Schedule s = getSchedule(projectId, flowId);
+ if(s != null) {
+ removeSchedule(s);
+ }
+ }
+
+ /**
+ * Removes the flow from the schedule if it exists.
+ *
+ * @param id
+ */
+ public synchronized void removeSchedule(Schedule sched) {
+
+ Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
+ Schedule s = scheduleIDMap.get(identityPairMap);
+ if(s != null) {
+ scheduleIDMap.remove(sched.getScheduleId());
+ }
+
+ try {
+ loader.removeSchedule(sched);
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public Schedule scheduleFlow(
+ final int scheduleId,
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final DateTimeZone timezone,
+ final ReadablePeriod period,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser
+ ) {
+ return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+ }
+
+ public Schedule scheduleFlow(
+ final int scheduleId,
+ final int projectId,
+ final String projectName,
+ final String flowName,
+ final String status,
+ final long firstSchedTime,
+ final DateTimeZone timezone,
+ final ReadablePeriod period,
+ final long lastModifyTime,
+ final long nextExecTime,
+ final long submitTime,
+ final String submitUser,
+ ExecutionOptions execOptions,
+ SlaOptions slaOptions
+ ) {
+ Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+ logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+ + _dateFormat.print(firstSchedTime) + " with a period of "
+ + period == null ? "(non-recurring)" : period);
+
+ insertSchedule(sched);
+ return sched;
+ }
+
+
+ /**
+ * Adds a flow to the schedule.
+ *
+ * @param flow
+ */
+ public synchronized void insertSchedule(Schedule s) {
+ boolean exist = s.getScheduleId() != -1;
+ if(s.updateTime()) {
+ try {
+ if(!exist) {
+ loader.insertSchedule(s);
+ scheduleIDMap.put(s.getScheduleIdentityPair(), s);
+ }
+ else{
+ loader.updateSchedule(s);
+ scheduleIDMap.put(s.getScheduleIdentityPair(), s);
+ }
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ else {
+ logger.error("The provided schedule is non-recurring and the scheduled time already passed. " + s.getScheduleName());
+ }
+ }
+
+
+}
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 87773ed..38ee266 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -13,6 +13,7 @@ import java.util.Map;
import org.apache.log4j.Logger;
+import azkaban.actions.ExecuteFlowAction;
import azkaban.utils.Props;
import azkaban.utils.Utils;
@@ -36,6 +37,13 @@ public class ActionTypeLoader {
}
+ public synchronized void registerActionType(String type, Class<? extends TriggerAction> actionClass) {
+ logger.info("Registering action " + type);
+ if(!actionToClass.containsKey(type)) {
+ actionToClass.put(type, actionClass);
+ }
+ }
+
private void loadPluginActions(Props props) throws TriggerException {
String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
File pluginDir = new File(checkerDir);
@@ -128,6 +136,7 @@ public class ActionTypeLoader {
private void loadDefaultActions() {
actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ logger.info("Loaded ExecuteFlowAction type.");
}
public TriggerAction createActionFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index 18e3e7b..f67655d 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -13,6 +13,7 @@ import java.util.Map;
import org.apache.log4j.Logger;
+import azkaban.scheduler.BasicTimeChecker;
import azkaban.utils.Props;
import azkaban.utils.Utils;
@@ -38,6 +39,13 @@ public class CheckerTypeLoader {
}
+ public synchronized void registerCheckerType(String type, Class<? extends ConditionChecker> checkerClass) {
+ logger.info("Registering checker " + type);
+ if(!checkerToClass.containsKey(type)) {
+ checkerToClass.put(type, checkerClass);
+ }
+ }
+
private void loadPluginCheckers(Props props) throws TriggerException {
String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
@@ -130,7 +138,8 @@ public class CheckerTypeLoader {
}
private void loadDefaultCheckers() {
- checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
+ checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
+ logger.info("Loaded BasicTimeChecker type.");
}
public ConditionChecker createCheckerFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 4c8e078..a7b311a 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -4,14 +4,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.jexl2.MapContext;
import org.apache.log4j.Logger;
-import com.sun.swing.internal.plaf.synth.resources.synth;
public class Condition {
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index ad7b0c6..20b3075 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -9,11 +9,13 @@ public interface ConditionChecker {
void reset();
+ void setId(String id);
+
String getId();
String getType();
- ConditionChecker fromJson(Object obj);
+ ConditionChecker fromJson(Object obj) throws Exception;
Object toJson();
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index 91bf85e..e76cc6a 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -44,13 +44,13 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
private static final String triggerTblName = "triggers";
private static String GET_ALL_TRIGGERS =
- "SELECT trigger_id, enc_type, data FROM " + triggerTblName;
+ "SELECT trigger_id, modify_time, enc_type, data FROM " + triggerTblName;
private static String GET_TRIGGER =
- "SELECT trigger_id, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
+ "SELECT trigger_id, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
private static String ADD_TRIGGER =
- "INSERT INTO " + triggerTblName + " ( modify_time, enc_type, data) values (?,?,?)";
+ "INSERT INTO " + triggerTblName + " ( modify_time) values (?)";
private static String REMOVE_TRIGGER =
"DELETE FROM " + triggerTblName + " WHERE trigger_id=?";
@@ -128,37 +128,22 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
- String json = JSONUtils.toJSON(t.toJson());
- byte[] data = null;
- try {
- byte[] stringData = json.getBytes("UTF-8");
- data = stringData;
-
- if (encType == EncodingType.GZIP) {
- data = GZIPUtils.gzipBytes(stringData);
- }
- logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
- }
- catch (IOException e) {
- throw new TriggerManagerException("Error encoding the trigger " + t.toString());
- }
-
QueryRunner runner = new QueryRunner();
- long submitTime = System.currentTimeMillis();
long id;
try {
- runner.update(connection, ADD_TRIGGER, DateTime.now().getMillis(), encType.getNumVal(), data);
+ runner.update(connection, ADD_TRIGGER, DateTime.now().getMillis());
connection.commit();
id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
if (id == -1l) {
throw new TriggerManagerException("trigger id is not properly created.");
}
- logger.info("uploaded trigger " + t.toString());
- t.setTriggerId((int)id);
+ t.setTriggerId((int)id);
+ updateTrigger(t);
+ logger.info("uploaded trigger " + t.toString());
} catch (SQLException e) {
throw new TriggerManagerException("Error creating trigger.", e);
}
@@ -243,7 +228,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
ArrayList<Trigger> triggers = new ArrayList<Trigger>();
do {
int triggerId = rs.getInt(1);
- long modifyTime = rs.getInt(2);
+ long modifyTime = rs.getLong(2);
int encodingType = rs.getInt(3);
byte[] data = rs.getBytes(4);
src/java/azkaban/trigger/Trigger.java 34(+33 -1)
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 240411b..800606d 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -16,6 +16,8 @@ public class Trigger {
private long submitTime = -1;
private String submitUser;
+ private String source;
+
private Condition triggerCondition;
private Condition expireCondition;
private List<TriggerAction> actions;
@@ -30,16 +32,38 @@ public class Trigger {
throw new TriggerManagerException("Triggers should always be specified");
}
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public String getSubmitUser() {
+ return submitUser;
+ }
+
+ public Condition getTriggerCondition() {
+ return triggerCondition;
+ }
+
+ public Condition getExpireCondition() {
+ return expireCondition;
+ }
+
+ public List<TriggerAction> getActions() {
+ return actions;
+ }
+
public Trigger(
long lastModifyTime,
long submitTime,
String submitUser,
+ String source,
Condition triggerCondition,
Condition expireCondition,
List<TriggerAction> actions) {
this.lastModifyTime = lastModifyTime;
this.submitTime = submitTime;
this.submitUser = submitUser;
+ this.source = source;
this.triggerCondition = triggerCondition;
this.expireCondition = expireCondition;
this.actions = actions;
@@ -50,6 +74,7 @@ public class Trigger {
long lastModifyTime,
long submitTime,
String submitUser,
+ String source,
Condition triggerCondition,
Condition expireCondition,
List<TriggerAction> actions) {
@@ -57,6 +82,7 @@ public class Trigger {
this.lastModifyTime = lastModifyTime;
this.submitTime = submitTime;
this.submitUser = submitUser;
+ this.source = source;
this.triggerCondition = triggerCondition;
this.expireCondition = expireCondition;
this.actions = actions;
@@ -125,6 +151,7 @@ public class Trigger {
jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
jsonObj.put("submitUser", submitUser);
+ jsonObj.put("source", source);
jsonObj.put("submitTime", String.valueOf(submitTime));
jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
jsonObj.put("triggerId", String.valueOf(triggerId));
@@ -133,6 +160,10 @@ public class Trigger {
}
+ public String getSource() {
+ return source;
+ }
+
@SuppressWarnings("unchecked")
public static Trigger fromJson(Object obj) {
@@ -153,10 +184,11 @@ public class Trigger {
boolean resetOnTrigger = Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
String submitUser = (String) jsonObj.get("submitUser");
+ String source = (String) jsonObj.get("source");
long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
long lastModifyTime = Long.valueOf((String) jsonObj.get("lastModifyTime"));
int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
- trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, triggerCond, expireCond, actions);
+ trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions);
trigger.setResetOnExpire(resetOnExpire);
trigger.setResetOnTrigger(resetOnTrigger);
}catch(Exception e) {
src/java/azkaban/trigger/TriggerManager.java 39(+30 -9)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 6f613cc..393c974 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -1,7 +1,9 @@
package azkaban.trigger;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -16,12 +18,19 @@ public class TriggerManager {
private final long DEFAULT_TRIGGER_EXPIRE_TIME = 24*60*60*1000L;
- private List<Trigger> triggers;
+ private Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+
+ private CheckerTypeLoader checkerLoader;
+ private ActionTypeLoader actionLoader;
TriggerScannerThread scannerThread;
public TriggerManager(Props props, TriggerLoader triggerLoader, CheckerTypeLoader checkerLoader, ActionTypeLoader actionLoader) {
+ this.checkerLoader = checkerLoader;
+ this.actionLoader = actionLoader;
+ scannerThread = new TriggerScannerThread("TriggerScannerThread");
+
// load plugins
try{
checkerLoader.init(props);
@@ -36,33 +45,45 @@ public class TriggerManager {
try{
// expect loader to return valid triggers
- triggers = triggerLoader.loadTriggers();
+ List<Trigger> triggers = triggerLoader.loadTriggers();
+ for(Trigger t : triggers) {
+ scannerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+ }
}catch(Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
}
- scannerThread = new TriggerScannerThread("TriggerScannerThread");
- for(Trigger t : triggers) {
- scannerThread.addTrigger(t);
- }
scannerThread.start();
}
+ public CheckerTypeLoader getCheckerLoader() {
+ return checkerLoader;
+ }
+
+ public ActionTypeLoader getActionLoader() {
+ return actionLoader;
+ }
+
public synchronized void insertTrigger(Trigger t) {
- triggers.add(t);
+ triggerIdMap.put(t.getTriggerId(), t);
scannerThread.addTrigger(t);
}
+ public synchronized void removeTrigger(int id) {
+ removeTrigger(triggerIdMap.get(id));
+ }
+
public synchronized void removeTrigger(Trigger t) {
scannerThread.removeTrigger(t);
- triggers.remove(t);
+ triggerIdMap.remove(t.getTriggerId());
}
public List<Trigger> getTriggers() {
- return new ArrayList<Trigger>(triggers);
+ return new ArrayList<Trigger>(triggerIdMap.values());
}
//trigger scanner thread
src/java/azkaban/utils/Utils.java 86(+86 -0)
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index 03b1bab..5dfe71e 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -35,6 +35,14 @@ import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.IOUtils;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
/**
* A util helper class full of static methods that are commonly used.
@@ -82,6 +90,17 @@ public class Utils {
return t;
}
}
+
+ public static File findFilefromDir(File dir, String fn){
+ if(dir.isDirectory()) {
+ for(File f : dir.listFiles()) {
+ if(f.getName().equals(fn)) {
+ return f;
+ }
+ }
+ }
+ return null;
+ }
/**
* Print the message and then exit with the given exit code
@@ -271,6 +290,7 @@ public class Utils {
Class<?>[] argTypes = new Class[args.length];
for (int i=0; i < args.length; ++i) {
+ //argTypes[i] = args[i].getClass();
argTypes[i] = args[i].getClass();
}
@@ -285,4 +305,70 @@ public class Utils {
output.write(buffer, 0, bytesRead);
}
}
+
+ public static ReadablePeriod parsePeriodString(String periodStr) {
+ ReadablePeriod period;
+ char periodUnit = periodStr.charAt(periodStr.length() - 1);
+ if (periodUnit == 'n') {
+ return null;
+ }
+
+ int periodInt = Integer.parseInt(periodStr.substring(0,
+ periodStr.length() - 1));
+ switch (periodUnit) {
+ case 'M':
+ period = Months.months(periodInt);
+ break;
+ case 'w':
+ period = Weeks.weeks(periodInt);
+ break;
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '"
+ + periodUnit);
+ }
+
+ return period;
+ }
+
+ public static String createPeriodString(ReadablePeriod period) {
+ String periodStr = "n";
+
+ if (period == null) {
+ return "n";
+ }
+
+ if (period.get(DurationFieldType.months()) > 0) {
+ int months = period.get(DurationFieldType.months());
+ periodStr = months + "M";
+ } else if (period.get(DurationFieldType.weeks()) > 0) {
+ int weeks = period.get(DurationFieldType.weeks());
+ periodStr = weeks + "w";
+ } else if (period.get(DurationFieldType.days()) > 0) {
+ int days = period.get(DurationFieldType.days());
+ periodStr = days + "d";
+ } else if (period.get(DurationFieldType.hours()) > 0) {
+ int hours = period.get(DurationFieldType.hours());
+ periodStr = hours + "h";
+ } else if (period.get(DurationFieldType.minutes()) > 0) {
+ int minutes = period.get(DurationFieldType.minutes());
+ periodStr = minutes + "m";
+ } else if (period.get(DurationFieldType.seconds()) > 0) {
+ int seconds = period.get(DurationFieldType.seconds());
+ periodStr = seconds + "s";
+ }
+
+ return periodStr;
+ }
}
\ No newline at end of file
src/sql/create.triggers.sql 2(+1 -1)
diff --git a/src/sql/create.triggers.sql b/src/sql/create.triggers.sql
index f75b727..4c05f55 100644
--- a/src/sql/create.triggers.sql
+++ b/src/sql/create.triggers.sql
@@ -2,6 +2,6 @@ CREATE TABLE triggers (
trigger_id INT NOT NULL AUTO_INCREMENT,
modify_time BIGINT NOT NULL,
enc_type TINYINT,
- data LONGBLOB NOT NULL,
+ data LONGBLOB,
PRIMARY KEY (trigger_id)
);
diff --git a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
new file mode 100644
index 0000000..a103f5d
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
@@ -0,0 +1,63 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+import org.junit.Test;
+
+import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.utils.Utils;
+
+public class BasicTimeCheckerTest {
+
+ @Test
+ public void basicTimerTest(){
+
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+
+ // get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
+ DateTime now = DateTime.now();
+ ReadablePeriod period = Utils.parsePeriodString("10s");
+
+ BasicTimeChecker timeChecker = new BasicTimeChecker("BasicTimeChecket_1", now, now.getZone(), true, true, period);
+ checkers.put(timeChecker.getId(), timeChecker);
+ String expr = timeChecker.getId() + ".eval()";
+
+ Condition cond = new Condition(checkers, expr);
+ System.out.println(expr);
+
+ assertFalse(cond.isMet());
+
+ //sleep for 1 min
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ assertTrue(cond.isMet());
+
+ cond.resetCheckers();
+
+ assertFalse(cond.isMet());
+
+ //sleep for 1 min
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ assertTrue(cond.isMet());
+
+ }
+}
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index 2cf8ca9..d959b2a 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -1,37 +1,39 @@
package azkaban.test.trigger;
+import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
-import org.apache.commons.jexl2.JexlEngine;
import org.joda.time.DateTime;
-import org.joda.time.ReadablePeriod;
import org.junit.Test;
+import azkaban.scheduler.BasicTimeChecker;
import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
-import azkaban.trigger.BasicTimeChecker;
+import azkaban.trigger.TriggerException;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
+import azkaban.utils.Utils;
public class ConditionTest {
-
- private JexlEngine jexl = new JexlEngine();
@Test
public void conditionTest(){
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
- DateTime now = DateTime.now();
- FakeTimeChecker fake1 = new FakeTimeChecker(now);
- FakeTimeChecker fake2 = new FakeTimeChecker(now.plusMinutes(1));
+
+ ThresholdChecker fake1 = new ThresholdChecker("thresholdchecker1", 10);
+ ThresholdChecker fake2 = new ThresholdChecker("thresholdchecker2", 20);
+ ThresholdChecker.setVal(15);
checkers.put(fake1.getId(), fake1);
checkers.put(fake2.getId(), fake2);
- String expr1 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + "!" + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
+ String expr1 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + fake1.getId()+".eval()" + " && " + "!" + fake2.getId()+".eval()" + " )";
String expr2 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
Condition cond = new Condition(checkers, expr1);
@@ -45,51 +47,7 @@ public class ConditionTest {
}
@Test
- public void timeCheckerTest(){
-
- Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
-
- // get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
- DateTime now = DateTime.now();
- ReadablePeriod period = BasicTimeChecker.parsePeriodString("10s");
-
- BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
- checkers.put(timeChecker.getId(), timeChecker);
- String expr = timeChecker.getId() + ".eval()";
-
- Condition cond = new Condition(checkers, expr);
- System.out.println(expr);
-
- assertFalse(cond.isMet());
-
- //sleep for 1 min
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- assertTrue(cond.isMet());
-
- cond.resetCheckers();
-
- assertFalse(cond.isMet());
-
- //sleep for 1 min
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- assertTrue(cond.isMet());
-
- }
-
- @Test
- public void BasicTimeCheckerTest() {
+ public void jsonConversionTest() throws TriggerException, IOException {
CheckerTypeLoader checkerTypeLoader = new CheckerTypeLoader();
checkerTypeLoader.init(new Props());
@@ -102,7 +60,7 @@ public class ConditionTest {
String period = "6s";
//BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
- ConditionChecker timeChecker = checkerTypeLoader.createChecker(BasicTimeChecker.type, now, true, true, period);
+ ConditionChecker timeChecker = new BasicTimeChecker("BasicTimeChecker_1", now, now.getZone(), true, true, Utils.parsePeriodString(period));
System.out.println("checker id is " + timeChecker.getId());
checkers.put(timeChecker.getId(), timeChecker);
@@ -110,8 +68,12 @@ public class ConditionTest {
Condition cond = new Condition(checkers, expr);
- Object json = cond.toJson();
- Condition cond2 = Condition.fromJson(json);
+ File temp = File.createTempFile("temptest", "temptest");
+ temp.deleteOnExit();
+ Object obj = cond.toJson();
+ JSONUtils.toJSON(obj, temp);
+
+ Condition cond2 = Condition.fromJson(JSONUtils.parseJSONFromFile(temp));
Map<String, ConditionChecker> checkers1 = cond.getCheckers();
Map<String, ConditionChecker> checkers2 = cond2.getCheckers();
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index 00ee02e..2928539 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -8,17 +8,18 @@ import java.util.List;
import org.junit.Test;
+import azkaban.actions.ExecuteFlowAction;
import azkaban.executor.ExecutionOptions;
import azkaban.trigger.ActionTypeLoader;
-import azkaban.trigger.ExecuteFlowAction;
import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerException;
import azkaban.utils.Props;
public class ExecuteFlowActionTest {
@Test
- public void ExecuteFlowActionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ public void jsonConversionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, TriggerException {
ActionTypeLoader loader = new ActionTypeLoader();
loader.init(new Props());
@@ -26,7 +27,7 @@ public class ExecuteFlowActionTest {
List<String> disabledJobs = new ArrayList<String>();
options.setDisabledJobs(disabledJobs);
- ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testflow", "azkaban", options);
+ ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testproject", "testflow", "azkaban", options);
Object obj = executeFlowAction.toJson();
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
new file mode 100644
index 0000000..fc68dfa
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -0,0 +1,225 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerException;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class JdbcTriggerLoaderTest {
+
+ private static boolean testDBExists = false;
+ //@TODO remove this and turn into local host.
+ private static final String host = "cyu-ld.linkedin.biz";
+ private static final int port = 3306;
+ private static final String database = "azkaban2";
+ private static final String user = "azkaban";
+ private static final String password = "azkaban";
+ private static final int numConnections = 10;
+
+ private TriggerLoader loader;
+ private CheckerTypeLoader checkerLoader;
+ private ActionTypeLoader actionLoader;
+
+ @Before
+ public void setup() throws TriggerException {
+ Props props = new Props();
+ props.put("database.type", "mysql");
+
+ props.put("mysql.host", host);
+ props.put("mysql.port", port);
+ props.put("mysql.user", user);
+ props.put("mysql.database", database);
+ props.put("mysql.password", password);
+ props.put("mysql.numconnections", numConnections);
+
+ loader = new JdbcTriggerLoader(props);
+ checkerLoader = new CheckerTypeLoader();
+ checkerLoader.init(new Props());
+ Condition.setCheckerLoader(checkerLoader);
+ actionLoader = new ActionTypeLoader();
+ actionLoader.init(new Props());
+ Trigger.setActionTypeLoader(actionLoader);
+ setupDB();
+ }
+
+ public void setupDB() {
+ DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ testDBExists = true;
+
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ CountHandler countHandler = new CountHandler();
+ QueryRunner runner = new QueryRunner();
+ try {
+ runner.query(connection, "SELECT COUNT(1) FROM triggers", countHandler);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
+
+ clearDB();
+ }
+
+ @After
+ public void clearDB() {
+ if (!testDBExists) {
+ return;
+ }
+
+ DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ QueryRunner runner = new QueryRunner();
+ try {
+ runner.update(connection, "DELETE FROM triggers");
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ testDBExists = false;
+ DbUtils.closeQuietly(connection);
+ return;
+ }
+
+ DbUtils.closeQuietly(connection);
+ }
+
+ @Test
+ public void addTriggerTest() throws TriggerManagerException {
+ Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
+ Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
+ loader.addTrigger(t1);
+ List<Trigger> ts = loader.loadTriggers();
+ assertTrue(ts.size() == 1);
+
+ Trigger t3 = ts.get(0);
+ assertTrue(t3.getSource().equals("source1"));
+
+ loader.addTrigger(t2);
+ ts = loader.loadTriggers();
+ assertTrue(ts.size() == 2);
+
+ for(Trigger t : ts) {
+ if(t.getTriggerId() == t2.getTriggerId()) {
+ t.getSource().equals(t2.getSource());
+ }
+ }
+ }
+
+ @Test
+ public void removeTriggerTest() throws TriggerManagerException {
+ Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
+ Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
+ loader.addTrigger(t1);
+ loader.addTrigger(t2);
+ List<Trigger> ts = loader.loadTriggers();
+ assertTrue(ts.size() == 2);
+ loader.removeTrigger(t2);
+ ts = loader.loadTriggers();
+ assertTrue(ts.size() == 1);
+ assertTrue(ts.get(0).getTriggerId() == t1.getTriggerId());
+ }
+
+ @Test
+ public void updateTriggerTest() throws TriggerManagerException {
+ Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
+ t1.setResetOnExpire(true);
+ loader.addTrigger(t1);
+ List<Trigger> ts = loader.loadTriggers();
+ assertTrue(ts.get(0).isResetOnExpire() == true);
+ t1.setResetOnExpire(false);
+ loader.updateTrigger(t1);
+ ts = loader.loadTriggers();
+ assertTrue(ts.get(0).isResetOnExpire() == false);
+ }
+
+ private Trigger createTrigger(String projName, String flowName, String source) {
+ DateTime now = DateTime.now();
+ ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+ Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
+ checkers1.put(checker1.getId(), checker1);
+ String expr1 = checker1.getId() + ".eval()";
+ Condition triggerCond = new Condition(checkers1, expr1);
+ Condition expireCond = new Condition(checkers1, expr1);
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ TriggerAction action = new ExecuteFlowAction(1, projName, flowName, "azkaban", new ExecutionOptions());
+ actions.add(action);
+ Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", source, triggerCond, expireCond, actions);
+ return t;
+ }
+
+ private boolean isTestSetup() {
+ if (!testDBExists) {
+ System.err.println("Skipping DB test because Db not setup.");
+ return false;
+ }
+
+ System.out.println("Running DB test because Db setup.");
+ return true;
+ }
+
+ public static class CountHandler implements ResultSetHandler<Integer> {
+ @Override
+ public Integer handle(ResultSet rs) throws SQLException {
+ int val = 0;
+ while (rs.next()) {
+ val++;
+ }
+
+ return val;
+ }
+ }
+
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index 56a9117..ee376b9 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -97,7 +97,7 @@ public class TriggerManagerTest {
Condition triggerCond = new Condition(checkers, expr);
Condition expireCond = new Condition(checkers, expr);
- Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", triggerCond, expireCond, actions);
+ Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", "tester", triggerCond, expireCond, actions);
return fakeTrigger;
}
@@ -106,7 +106,7 @@ public class TriggerManagerTest {
@Override
public void init(Props props) {
- checkerToClass.put(FakeTimeChecker.type, FakeTimeChecker.class);
+ checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
}
}
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
new file mode 100644
index 0000000..1d7d81d
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -0,0 +1,71 @@
+package azkaban.test.trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.executor.ExecutionOptions;
+import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class TriggerTest {
+
+ private CheckerTypeLoader checkerLoader;
+ private ActionTypeLoader actionLoader;
+
+ @Before
+ public void setup() throws TriggerException {
+ checkerLoader = new CheckerTypeLoader();
+ checkerLoader.init(new Props());
+ Condition.setCheckerLoader(checkerLoader);
+ actionLoader = new ActionTypeLoader();
+ actionLoader.init(new Props());
+ Trigger.setActionTypeLoader(actionLoader);
+ }
+
+ @Test
+ public void jsonConversionTest() throws TriggerException, IOException {
+ DateTime now = DateTime.now();
+ ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+ Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
+ checkers1.put(checker1.getId(), checker1);
+ String expr1 = checker1.getId() + ".eval()";
+ Condition triggerCond = new Condition(checkers1, expr1);
+ Condition expireCond = new Condition(checkers1, expr1);
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ TriggerAction action = new ExecuteFlowAction(1, "testProj", "testFlow", "azkaban", new ExecutionOptions());
+ actions.add(action);
+ Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", "test", triggerCond, expireCond, actions);
+
+ File temp = File.createTempFile("temptest", "temptest");
+ temp.deleteOnExit();
+ Object obj = t.toJson();
+ JSONUtils.toJSON(obj, temp);
+
+ Trigger t2 = Trigger.fromJson(JSONUtils.parseJSONFromFile(temp));
+
+ assertTrue(t.getSource().equals(t2.getSource()));
+ assertTrue(t.getTriggerId() == t2.getTriggerId());
+
+ }
+
+}