azkaban-uncached

Details

diff --git a/src/java/azkaban/scheduler/BasicTimeChecker.java b/src/java/azkaban/scheduler/BasicTimeChecker.java
new file mode 100644
index 0000000..5a59c72
--- /dev/null
+++ b/src/java/azkaban/scheduler/BasicTimeChecker.java
@@ -0,0 +1,250 @@
+package azkaban.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.trigger.ConditionChecker;
+import azkaban.utils.Utils;
+
+public class BasicTimeChecker implements ConditionChecker {
+
+	public static final String type = "BasicTimeChecker";
+	
+	private DateTime firstCheckTime;
+	private DateTime nextCheckTime;
+	private DateTimeZone timezone;
+	private boolean isRecurring = true;
+	private boolean skipPastChecks = true;
+	private ReadablePeriod period;
+	
+	private String id = type; 
+	
+	public BasicTimeChecker(
+			String id,
+			DateTime firstCheckTime,
+			DateTimeZone timezone,
+			boolean isRecurring, 
+			boolean skipPastChecks,
+			ReadablePeriod period) {
+		this.id = id;
+		this.firstCheckTime = firstCheckTime;
+		this.timezone = timezone;
+		this.isRecurring = isRecurring;
+		this.skipPastChecks = skipPastChecks;
+		this.period = period;
+		this.nextCheckTime = new DateTime(firstCheckTime);
+		this.nextCheckTime = calculateNextCheckTime();
+	}
+	
+	public DateTime getFirstCheckTime() {
+		return firstCheckTime;
+	}
+
+	public boolean isRecurring() {
+		return isRecurring;
+	}
+
+	public boolean isSkipPastChecks() {
+		return skipPastChecks;
+	}
+
+	public ReadablePeriod getPeriod() {
+		return period;
+	}
+
+	public DateTime getNextCheckTime() {
+		return nextCheckTime;
+	}
+	
+//	public BasicTimeChecker(
+//			DateTime firstCheckTime,
+//			Boolean isRecurring, 
+//			Boolean skipPastChecks,
+//			String period) {
+//		this.firstCheckTime = firstCheckTime;
+//		this.isRecurring = isRecurring;
+//		this.skipPastChecks = skipPastChecks;
+//		this.period = parsePeriodString(period);
+//		this.nextCheckTime = new DateTime(firstCheckTime);
+//		this.nextCheckTime = calculateNextCheckTime();
+//	}
+	
+	public BasicTimeChecker(
+			String id,
+			DateTime firstCheckTime,
+			DateTimeZone timezone,
+			DateTime nextCheckTime,
+			boolean isRecurring, 
+			boolean skipPastChecks,
+			ReadablePeriod period) {
+		this.id = id;
+		this.firstCheckTime = firstCheckTime;
+		this.timezone = timezone;
+		this.nextCheckTime = nextCheckTime;
+		this.isRecurring = isRecurring;
+		this.skipPastChecks = skipPastChecks;
+		this.period = period;
+	}
+	
+	@Override
+	public Boolean eval() {
+		return nextCheckTime.isBeforeNow();
+	}
+
+	@Override
+	public void reset() {
+		this.nextCheckTime = calculateNextCheckTime();
+		
+	}
+	
+	/*
+	 * TimeChecker format:
+	 * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
+	 */
+	@Override
+	public String getId() {
+//		return getType() + "$" +
+//				firstCheckTime.getMillis() + "$" +
+//				nextCheckTime.getMillis() + "$" +
+//				firstCheckTime.getZone().getID().replace('/', '0') + "$" +
+//				//"offset"+firstCheckTime.getZone().getOffset(firstCheckTime.getMillis()) + "_" +
+//				(isRecurring == true ? "1" : "0") + "$" +
+//				(skipPastChecks == true ? "1" : "0") + "$" +
+//				createPeriodString(period);
+		return id;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static ConditionChecker createFromJson(Object obj) throws Exception {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+		}
+		long firstTimeMillis = Long.valueOf((String)jsonObj.get("firstCheckTime"));
+		String timezoneId = (String) jsonObj.get("timezone");
+		long nextTimeMillis = Long.valueOf((String)jsonObj.get("nextCheckTime"));
+		DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+		boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
+		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
+		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
+		String id = (String) jsonObj.get("id");
+		return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+	}
+	
+	public static ConditionChecker createFromJson(HashMap<String, Object> obj) throws Exception {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+		}
+		Long firstTimeMillis = Long.valueOf((String) jsonObj.get("firstCheckTime"));
+		String timezoneId = (String) jsonObj.get("timezone");
+		long nextTimeMillis = Long.valueOf((String) jsonObj.get("nextCheckTime"));
+		DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
+		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
+		boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
+		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
+		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
+		String id = (String) jsonObj.get("id");
+		return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+	}
+	
+	@Override
+	public ConditionChecker fromJson(Object obj) throws Exception{
+		return createFromJson(obj);
+	}
+	
+//	public static ConditionChecker createFromJson(String obj) {
+//		String str = (String) obj;
+//		String[] parts = str.split("\\$");
+//		
+//		if(!parts[0].equals(type)) {
+//			throw new RuntimeException("Cannot create checker of " + type + " from " + parts[0]);
+//		}
+//		
+//		long firstMillis = Long.parseLong(parts[1]);
+//		long nextMillis = Long.parseLong(parts[2]);
+//		//DateTimeZone timezone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(parts[3]));
+//		DateTimeZone timezone = DateTimeZone.forID(parts[3].replace('0', '/'));
+//		boolean isRecurring = parts[4].equals("1") ? true : false;
+//		boolean skipPastChecks = parts[5].equals("1") ? true : false;
+//		ReadablePeriod period = parsePeriodString(parts[6]);
+//		
+//		return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+//	}
+//	
+//	@Override
+//	public ConditionChecker fromJson(Object obj) {
+//		String str = (String) obj;
+//		String[] parts = str.split("_");
+//		
+//		if(!parts[0].equals(getType())) {
+//			throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
+//		}
+//		
+//		long firstMillis = Long.parseLong(parts[1]);
+//		long nextMillis = Long.parseLong(parts[2]);
+//		DateTimeZone timezone = DateTimeZone.forID(parts[3]);
+//		boolean isRecurring = Boolean.valueOf(parts[4]);
+//		boolean skipPastChecks = Boolean.valueOf(parts[5]);
+//		ReadablePeriod period = parsePeriodString(parts[6]);
+//		
+//		return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+//	}
+	
+	private DateTime calculateNextCheckTime(){
+		DateTime date = new DateTime(nextCheckTime);
+		int count = 0;
+		while(!DateTime.now().isBefore(date) && skipPastChecks) {
+			if(count > 100000) {
+				throw new IllegalStateException("100000 increments of period did not get to present time.");
+			}
+			
+			if(period == null) {
+				break;
+			}else {
+				date = date.plus(period);
+			}
+			count += 1;
+		}
+		return date;
+	}
+	
+	@Override
+	public Object getNum() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("type", type);
+		jsonObj.put("firstCheckTime", String.valueOf(firstCheckTime.getMillis()));
+		jsonObj.put("timezone", timezone.getID());
+		jsonObj.put("nextCheckTime", String.valueOf(nextCheckTime.getMillis()));
+		jsonObj.put("isRecurrint", String.valueOf(isRecurring));
+		jsonObj.put("skipPastChecks", String.valueOf(skipPastChecks));
+		jsonObj.put("period", Utils.createPeriodString(period));
+		jsonObj.put("id", id);
+		
+		return jsonObj;
+	}
+
+	@Override
+	public void setId(String id) {
+		this.id = id;
+	}
+
+}
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index f11c0fe..4a85881 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -54,6 +54,8 @@ public class Schedule{
 	private String status;
 	private long submitTime;
 	
+	private boolean skipPastOccurrences = true;
+	
 	private ExecutionOptions executionOptions;
 	private SlaOptions slaOptions;
 	
@@ -376,4 +378,13 @@ public class Schedule{
 			this.slaOptions = slaOptions;
 		}
 	}
+
+	public boolean isRecurring() {
+		return period == null ? false : true;
+	}
+
+	public boolean skipPastOccurrences() {
+		return skipPastOccurrences;
+	}
+	
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
new file mode 100644
index 0000000..05f7614
--- /dev/null
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -0,0 +1,147 @@
+package azkaban.scheduler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.executor.ExecutorManager;
+import azkaban.project.ProjectManager;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerManager;
+
+public class TriggerBasedScheduleLoader implements ScheduleLoader {
+	
+	private static Logger logger = Logger.getLogger(TriggerBasedScheduleLoader.class);
+	
+	private TriggerManager triggerManager;
+	
+	public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager) {
+		this.triggerManager = triggerManager;
+		// need to init the action types and condition checker types 
+		ExecuteFlowAction.setExecutorManager(executorManager);
+		ExecuteFlowAction.setProjectManager(projectManager);
+	}
+	
+	private Trigger createScheduleTrigger(Schedule s) {
+		
+		Condition triggerCondition = createTimeTriggerCondition(s);
+		Condition expireCondition = createTimeExpireCondition(s);
+		List<TriggerAction> actions = createActions(s);
+		Trigger t = new Trigger(s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), "TriggerBasedScheduler", triggerCondition, expireCondition, actions);
+		
+		return t;
+	}
+	
+	private List<TriggerAction> createActions (Schedule s) {
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		TriggerAction act = new ExecuteFlowAction(s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions());
+		actions.add(act);
+		return actions;
+	}
+	
+	private Condition createTimeTriggerCondition (Schedule s) {
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		checkers.put(checker.getId(), checker);
+		String expr = checker.getId() + ".eval()";
+		Condition cond = new Condition(checkers, expr);
+		return cond;
+	}
+	
+	// if failed to trigger, auto expire?
+	private Condition createTimeExpireCondition (Schedule s) {
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", new DateTime(s.getFirstSchedTime()), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		checkers.put(checker.getId(), checker);
+		String expr = checker.getId() + ".eval()";
+		Condition cond = new Condition(checkers, expr);
+		return cond;
+	}
+
+	@Override
+	public void insertSchedule(Schedule s) throws ScheduleManagerException {
+		Trigger t = createScheduleTrigger(s);
+		triggerManager.insertTrigger(t);
+	}
+
+	@Override
+	public void updateSchedule(Schedule s) throws ScheduleManagerException {
+		
+		
+	}
+
+	@Override
+	public List<Schedule> loadSchedules() throws ScheduleManagerException {
+		List<Trigger> triggers = triggerManager.getTriggers();
+		List<Schedule> schedules = new ArrayList<Schedule>();
+		for(Trigger t : triggers) {
+			if(t.getSource().equals("TriggerBasedScheduler")) {
+				Schedule s = triggerToSchedule(t);
+				schedules.add(s);
+			}
+		}
+		return schedules;
+		
+	}
+	
+	private Schedule triggerToSchedule(Trigger t) throws ScheduleManagerException {
+		Condition triggerCond = t.getTriggerCondition();
+		Map<String, ConditionChecker> checkers = triggerCond.getCheckers();
+		BasicTimeChecker ck = null;
+		for(ConditionChecker checker : checkers.values()) {
+			if(checker.getType().equals(BasicTimeChecker.type)) {
+				ck = (BasicTimeChecker) checker;
+				break;
+			}
+		}
+		List<TriggerAction> actions = t.getActions();
+		ExecuteFlowAction act = null;
+		for(TriggerAction action : actions) {
+			if(action.getType().equals(ExecuteFlowAction.type)) {
+				act = (ExecuteFlowAction) action;
+				break;
+			}
+		}
+		if(ck != null && act != null) {
+			Schedule s = new Schedule(
+					t.getTriggerId(), 
+					act.getProjectId(), 
+					act.getProjectName(), 
+					act.getFlowName(), 
+					"ready", 
+					ck.getFirstCheckTime().getMillis(), 
+					ck.getFirstCheckTime().getZone(), 
+					ck.getPeriod(),
+					t.getLastModifyTime(),
+					ck.getNextCheckTime().getMillis(),
+					t.getSubmitTime(),
+					t.getSubmitUser());
+			return s;
+		} else {
+			logger.error("Failed to parse schedule from trigger!");
+			throw new ScheduleManagerException("Failed to parse schedule from trigger!");
+		}
+	}
+
+	@Override
+	public void removeSchedule(Schedule s) throws ScheduleManagerException {
+		triggerManager.removeTrigger(s.getScheduleId());
+		
+	}
+
+	@Override
+	public void updateNextExecTime(Schedule s)
+			throws ScheduleManagerException {
+		logger.error("no longer doing it here.");
+		throw new ScheduleManagerException("No longer updating execution time in scheduler");
+	}
+
+}
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
new file mode 100644
index 0000000..1d0f2a6
--- /dev/null
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -0,0 +1,216 @@
+package azkaban.scheduler;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.project.ProjectManager;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SlaOptions;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerManager;
+import azkaban.utils.Pair;
+
+/**
+ * The ScheduleManager stores and executes the schedule. It uses a single thread
+ * instead and waits until correct loading time for the flow. It will not remove
+ * the flow from the schedule when it is run, which can potentially allow the
+ * flow to and overlap each other.
+ */
+public class TriggerBasedScheduler {
+	private static Logger logger = Logger.getLogger(TriggerBasedScheduler.class);
+
+	private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
+	private ScheduleLoader loader;
+
+	private Map<Pair<Integer, String>, Schedule> scheduleIDMap = new HashMap<Pair<Integer, String>, Schedule>();
+	private Map<Integer, Pair<Integer, String>> idFlowMap = new HashMap<Integer, Pair<Integer,String>>();
+
+	private final ExecutorManager executorManager;
+	private final ProjectManager projectManager;
+	private final SLAManager slaManager;
+	private final TriggerManager triggerManager;
+	
+	/**
+	 * Give the schedule manager a loader class that will properly load the
+	 * schedule.
+	 * 
+	 * @param loader
+	 */
+	public TriggerBasedScheduler(ExecutorManager executorManager,
+							ProjectManager projectManager, 
+							SLAManager slaManager,
+							TriggerManager triggerManager,
+							ScheduleLoader loader) 
+	{
+		this.executorManager = executorManager;
+		this.projectManager = projectManager;
+		this.slaManager = slaManager;
+		this.triggerManager = triggerManager;
+		this.loader = loader;
+
+		List<Schedule> scheduleList = null;
+		try {
+			scheduleList = loader.loadSchedules();
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
+			e.printStackTrace();
+		}
+
+	}
+
+	/**
+	 * Retrieves a copy of the list of schedules.
+	 * 
+	 * @return
+	 */
+	public synchronized List<Schedule> getSchedules() {
+		return new ArrayList<Schedule>(scheduleIDMap.values());
+	}
+
+	/**
+	 * Returns the scheduled flow for the flow name
+	 * 
+	 * @param id
+	 * @return
+	*/
+	public Schedule getSchedule(int projectId, String flowId) {
+		return scheduleIDMap.get(new Pair<Integer,String>(projectId, flowId));
+	}
+
+	/**
+	 * Removes the flow from the schedule if it exists.
+	 * 
+	 * @param id
+	 */
+	public synchronized void removeSchedule(int projectId, String flowId) {
+		Schedule s = getSchedule(projectId, flowId);
+		if(s != null) {
+			removeSchedule(s);
+		}
+	}
+	
+	/**
+	 * Removes the flow from the schedule if it exists.
+	 * 
+	 * @param id
+	 */
+	public synchronized void removeSchedule(Schedule sched) {
+
+		Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
+		Schedule s = scheduleIDMap.get(identityPairMap);
+		if(s != null) {
+			scheduleIDMap.remove(sched.getScheduleId());
+		}
+		
+		try {
+			loader.removeSchedule(sched);
+		} catch (ScheduleManagerException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	public Schedule scheduleFlow(
+			final int scheduleId,
+			final int projectId,
+			final String projectName,
+			final String flowName,
+			final String status,
+			final long firstSchedTime,
+			final DateTimeZone timezone,
+			final ReadablePeriod period,
+			final long lastModifyTime,
+			final long nextExecTime,
+			final long submitTime,
+			final String submitUser
+			) {
+		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+	}
+	
+	public Schedule scheduleFlow(
+			final int scheduleId,
+			final int projectId,
+			final String projectName,
+			final String flowName,
+			final String status,
+			final long firstSchedTime,
+			final DateTimeZone timezone,
+			final ReadablePeriod period,
+			final long lastModifyTime,
+			final long nextExecTime,
+			final long submitTime,
+			final String submitUser,
+			ExecutionOptions execOptions,
+			SlaOptions slaOptions
+			) {
+		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
+				+ _dateFormat.print(firstSchedTime) + " with a period of "
+				+ period == null ? "(non-recurring)" : period);
+
+		insertSchedule(sched);
+		return sched;
+	}
+
+
+	/**
+	 * Adds a flow to the schedule.
+	 * 
+	 * @param flow
+	 */
+	public synchronized void insertSchedule(Schedule s) {
+		boolean exist = s.getScheduleId() != -1;
+		if(s.updateTime()) {
+			try {
+				if(!exist) {
+					loader.insertSchedule(s);
+					scheduleIDMap.put(s.getScheduleIdentityPair(), s);
+				}
+				else{
+					loader.updateSchedule(s);
+					scheduleIDMap.put(s.getScheduleIdentityPair(), s);
+				}
+			} catch (ScheduleManagerException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+		else {
+			logger.error("The provided schedule is non-recurring and the scheduled time already passed. " + s.getScheduleName());
+		}
+	}
+	
+	
+}
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 87773ed..38ee266 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -13,6 +13,7 @@ import java.util.Map;
 
 import org.apache.log4j.Logger;
 
+import azkaban.actions.ExecuteFlowAction;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
@@ -36,6 +37,13 @@ public class ActionTypeLoader {
 
 	}
 	
+	public synchronized void registerActionType(String type, Class<? extends TriggerAction> actionClass) {
+		logger.info("Registering action " + type);
+		if(!actionToClass.containsKey(type)) {
+			actionToClass.put(type, actionClass);
+		}
+	}
+	
 	private void loadPluginActions(Props props) throws TriggerException {
 		String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
 		File pluginDir = new File(checkerDir);
@@ -128,6 +136,7 @@ public class ActionTypeLoader {
 	
 	private void loadDefaultActions() {
 		actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);		
+		logger.info("Loaded ExecuteFlowAction type.");
 	}
 	
 	public TriggerAction createActionFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index 18e3e7b..f67655d 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -13,6 +13,7 @@ import java.util.Map;
 
 import org.apache.log4j.Logger;
 
+import azkaban.scheduler.BasicTimeChecker;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
@@ -38,6 +39,13 @@ public class CheckerTypeLoader {
 
 	}
 	
+	public synchronized void registerCheckerType(String type, Class<? extends ConditionChecker> checkerClass) {
+		logger.info("Registering checker " + type);
+		if(!checkerToClass.containsKey(type)) {
+			checkerToClass.put(type, checkerClass);
+		}
+	}
+	
 	private void loadPluginCheckers(Props props) throws TriggerException {
 		
 		String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
@@ -130,7 +138,8 @@ public class CheckerTypeLoader {
 	}
 	
 	private void loadDefaultCheckers() {
-		checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);		
+		checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
+		logger.info("Loaded BasicTimeChecker type.");
 	}
 	
 	public ConditionChecker createCheckerFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 4c8e078..a7b311a 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -4,14 +4,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.jexl2.Expression;
 import org.apache.commons.jexl2.JexlEngine;
 import org.apache.commons.jexl2.MapContext;
 import org.apache.log4j.Logger;
 
-import com.sun.swing.internal.plaf.synth.resources.synth;
 
 public class Condition {
 	
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index ad7b0c6..20b3075 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -9,11 +9,13 @@ public interface ConditionChecker {
 	
 	void reset();
 	
+	void setId(String id);
+	
 	String getId();
 	
 	String getType();
 	
-	ConditionChecker fromJson(Object obj);
+	ConditionChecker fromJson(Object obj) throws Exception;
 	
 	Object toJson();
 
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index 91bf85e..e76cc6a 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -44,13 +44,13 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	private static final String triggerTblName = "triggers";
 
 	private static String GET_ALL_TRIGGERS =
-			"SELECT trigger_id, enc_type, data FROM " + triggerTblName;
+			"SELECT trigger_id, modify_time, enc_type, data FROM " + triggerTblName;
 	
 	private static String GET_TRIGGER = 
-			"SELECT trigger_id, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
+			"SELECT trigger_id, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
 	
 	private static String ADD_TRIGGER = 
-			"INSERT INTO " + triggerTblName + " ( modify_time, enc_type, data) values (?,?,?)";
+			"INSERT INTO " + triggerTblName + " ( modify_time) values (?)";
 	
 	private static String REMOVE_TRIGGER = 
 			"DELETE FROM " + triggerTblName + " WHERE trigger_id=?";
@@ -128,37 +128,22 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 
 	private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
 		
-		String json = JSONUtils.toJSON(t.toJson());
-		byte[] data = null;
-		try {
-			byte[] stringData = json.getBytes("UTF-8");
-			data = stringData;
-	
-			if (encType == EncodingType.GZIP) {
-				data = GZIPUtils.gzipBytes(stringData);
-			}
-			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
-		}
-		catch (IOException e) {
-			throw new TriggerManagerException("Error encoding the trigger " + t.toString());
-		}
-		
 		QueryRunner runner = new QueryRunner();
-		long submitTime = System.currentTimeMillis();
 		
 		long id;
 		
 		try {
-			runner.update(connection, ADD_TRIGGER, DateTime.now().getMillis(), encType.getNumVal(), data);
+			runner.update(connection, ADD_TRIGGER, DateTime.now().getMillis());
 			connection.commit();
 			id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
 
 			if (id == -1l) {
 				throw new TriggerManagerException("trigger id is not properly created.");
 			}
-			logger.info("uploaded trigger " + t.toString());
-			t.setTriggerId((int)id);
 			
+			t.setTriggerId((int)id);
+			updateTrigger(t);
+			logger.info("uploaded trigger " + t.toString());
 		} catch (SQLException e) {
 			throw new TriggerManagerException("Error creating trigger.", e);
 		}
@@ -243,7 +228,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 			ArrayList<Trigger> triggers = new ArrayList<Trigger>();
 			do {
 				int triggerId = rs.getInt(1);
-				long modifyTime = rs.getInt(2);
+				long modifyTime = rs.getLong(2);
 				int encodingType = rs.getInt(3);
 				byte[] data = rs.getBytes(4);
 				
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 240411b..800606d 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -16,6 +16,8 @@ public class Trigger {
 	private long submitTime = -1;
 	private String submitUser;
 	
+	private String source;
+	
 	private Condition triggerCondition;
 	private Condition expireCondition;
 	private List<TriggerAction> actions;
@@ -30,16 +32,38 @@ public class Trigger {
 		throw new TriggerManagerException("Triggers should always be specified");
 	}
 	
+	public long getSubmitTime() {
+		return submitTime;
+	}
+
+	public String getSubmitUser() {
+		return submitUser;
+	}
+
+	public Condition getTriggerCondition() {
+		return triggerCondition;
+	}
+
+	public Condition getExpireCondition() {
+		return expireCondition;
+	}
+
+	public List<TriggerAction> getActions() {
+		return actions;
+	}
+
 	public Trigger(
 			long lastModifyTime, 
 			long submitTime, 
 			String submitUser, 
+			String source,
 			Condition triggerCondition,
 			Condition expireCondition,
 			List<TriggerAction> actions) {
 		this.lastModifyTime = lastModifyTime;
 		this.submitTime = submitTime;
 		this.submitUser = submitUser;
+		this.source = source;
 		this.triggerCondition = triggerCondition;
 		this.expireCondition = expireCondition;
 		this.actions = actions;
@@ -50,6 +74,7 @@ public class Trigger {
 			long lastModifyTime, 
 			long submitTime, 
 			String submitUser, 
+			String source,
 			Condition triggerCondition,
 			Condition expireCondition,
 			List<TriggerAction> actions) {
@@ -57,6 +82,7 @@ public class Trigger {
 		this.lastModifyTime = lastModifyTime;
 		this.submitTime = submitTime;
 		this.submitUser = submitUser;
+		this.source = source;
 		this.triggerCondition = triggerCondition;
 		this.expireCondition = expireCondition;
 		this.actions = actions;
@@ -125,6 +151,7 @@ public class Trigger {
 		jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
 		jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
 		jsonObj.put("submitUser", submitUser);
+		jsonObj.put("source", source);
 		jsonObj.put("submitTime", String.valueOf(submitTime));
 		jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
 		jsonObj.put("triggerId", String.valueOf(triggerId));
@@ -133,6 +160,10 @@ public class Trigger {
 	}
 	
 	
+	public String getSource() {
+		return source;
+	}
+
 	@SuppressWarnings("unchecked")
 	public static Trigger fromJson(Object obj) {
 		
@@ -153,10 +184,11 @@ public class Trigger {
 			boolean resetOnTrigger = Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
 			boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
 			String submitUser = (String) jsonObj.get("submitUser");
+			String source = (String) jsonObj.get("source");
 			long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
 			long lastModifyTime = Long.valueOf((String) jsonObj.get("lastModifyTime"));
 			int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
-			trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, triggerCond, expireCond, actions);
+			trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions);
 			trigger.setResetOnExpire(resetOnExpire);
 			trigger.setResetOnTrigger(resetOnTrigger);
 		}catch(Exception e) {
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 6f613cc..393c974 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -1,7 +1,9 @@
 package azkaban.trigger;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -16,12 +18,19 @@ public class TriggerManager {
 	
 	private final long DEFAULT_TRIGGER_EXPIRE_TIME = 24*60*60*1000L;
 	
-	private List<Trigger> triggers;
+	private Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+	
+	private CheckerTypeLoader checkerLoader;
+	private ActionTypeLoader actionLoader;
 	
 	TriggerScannerThread scannerThread;
 	
 	public TriggerManager(Props props, TriggerLoader triggerLoader, CheckerTypeLoader checkerLoader, ActionTypeLoader actionLoader) {
 		
+		this.checkerLoader = checkerLoader;
+		this.actionLoader = actionLoader;
+		scannerThread = new TriggerScannerThread("TriggerScannerThread");
+		
 		// load plugins
 		try{
 			checkerLoader.init(props);
@@ -36,33 +45,45 @@ public class TriggerManager {
 		
 		try{
 			// expect loader to return valid triggers
-			triggers = triggerLoader.loadTriggers();
+			List<Trigger> triggers = triggerLoader.loadTriggers();
+			for(Trigger t : triggers) {
+				scannerThread.addTrigger(t);
+				triggerIdMap.put(t.getTriggerId(), t);
+			}
 		}catch(Exception e) {
 			e.printStackTrace();
 			logger.error(e.getMessage());
 		}
 		
-		scannerThread = new TriggerScannerThread("TriggerScannerThread");
 		
-		for(Trigger t : triggers) {
-			scannerThread.addTrigger(t);
-		}
 		
 		scannerThread.start();
 	}
 	
+	public CheckerTypeLoader getCheckerLoader() {
+		return checkerLoader;
+	}
+
+	public ActionTypeLoader getActionLoader() {
+		return actionLoader;
+	}
+
 	public synchronized void insertTrigger(Trigger t) {
-		triggers.add(t);
+		triggerIdMap.put(t.getTriggerId(), t);
 		scannerThread.addTrigger(t);
 	}
 	
+	public synchronized void removeTrigger(int id) {
+		removeTrigger(triggerIdMap.get(id));
+	}
+	
 	public synchronized void removeTrigger(Trigger t) {
 		scannerThread.removeTrigger(t);
-		triggers.remove(t);		
+		triggerIdMap.remove(t.getTriggerId());		
 	}
 	
 	public List<Trigger> getTriggers() {
-		return new ArrayList<Trigger>(triggers);
+		return new ArrayList<Trigger>(triggerIdMap.values());
 	}
 
 	//trigger scanner thread
diff --git a/src/java/azkaban/utils/Utils.java b/src/java/azkaban/utils/Utils.java
index 03b1bab..5dfe71e 100644
--- a/src/java/azkaban/utils/Utils.java
+++ b/src/java/azkaban/utils/Utils.java
@@ -35,6 +35,14 @@ import java.util.zip.ZipFile;
 import java.util.zip.ZipOutputStream;
 
 import org.apache.commons.io.IOUtils;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
 
 /**
  * A util helper class full of static methods that are commonly used.
@@ -82,6 +90,17 @@ public class Utils {
 			return t;
 		}
 	}
+	
+	public static File findFilefromDir(File dir, String fn){
+		if(dir.isDirectory()) {
+			for(File f : dir.listFiles()) {
+				if(f.getName().equals(fn)) {
+					return f;
+				}
+			}
+		}
+		return null;
+	}
 
 	/**
 	 * Print the message and then exit with the given exit code
@@ -271,6 +290,7 @@ public class Utils {
 		
 		Class<?>[] argTypes = new Class[args.length];
 		for (int i=0; i < args.length; ++i) {
+			//argTypes[i] = args[i].getClass();
 			argTypes[i] = args[i].getClass();
 		}
 		
@@ -285,4 +305,70 @@ public class Utils {
 			output.write(buffer, 0, bytesRead);
 		}
 	}
+	
+	public static ReadablePeriod parsePeriodString(String periodStr) {
+		ReadablePeriod period;
+		char periodUnit = periodStr.charAt(periodStr.length() - 1);
+		if (periodUnit == 'n') {
+			return null;
+		}
+
+		int periodInt = Integer.parseInt(periodStr.substring(0,
+				periodStr.length() - 1));
+		switch (periodUnit) {
+		case 'M':
+			period = Months.months(periodInt);
+			break;
+		case 'w':
+			period = Weeks.weeks(periodInt);
+			break;
+		case 'd':
+			period = Days.days(periodInt);
+			break;
+		case 'h':
+			period = Hours.hours(periodInt);
+			break;
+		case 'm':
+			period = Minutes.minutes(periodInt);
+			break;
+		case 's':
+			period = Seconds.seconds(periodInt);
+			break;
+		default:
+			throw new IllegalArgumentException("Invalid schedule period unit '"
+					+ periodUnit);
+		}
+
+		return period;
+	}
+
+	public static String createPeriodString(ReadablePeriod period) {
+		String periodStr = "n";
+
+		if (period == null) {
+			return "n";
+		}
+
+		if (period.get(DurationFieldType.months()) > 0) {
+			int months = period.get(DurationFieldType.months());
+			periodStr = months + "M";
+		} else if (period.get(DurationFieldType.weeks()) > 0) {
+			int weeks = period.get(DurationFieldType.weeks());
+			periodStr = weeks + "w";
+		} else if (period.get(DurationFieldType.days()) > 0) {
+			int days = period.get(DurationFieldType.days());
+			periodStr = days + "d";
+		} else if (period.get(DurationFieldType.hours()) > 0) {
+			int hours = period.get(DurationFieldType.hours());
+			periodStr = hours + "h";
+		} else if (period.get(DurationFieldType.minutes()) > 0) {
+			int minutes = period.get(DurationFieldType.minutes());
+			periodStr = minutes + "m";
+		} else if (period.get(DurationFieldType.seconds()) > 0) {
+			int seconds = period.get(DurationFieldType.seconds());
+			periodStr = seconds + "s";
+		}
+
+		return periodStr;
+	}
 }
\ No newline at end of file
diff --git a/src/sql/create.triggers.sql b/src/sql/create.triggers.sql
index f75b727..4c05f55 100644
--- a/src/sql/create.triggers.sql
+++ b/src/sql/create.triggers.sql
@@ -2,6 +2,6 @@ CREATE TABLE triggers (
 	trigger_id INT NOT NULL AUTO_INCREMENT,
 	modify_time BIGINT NOT NULL,
 	enc_type TINYINT,
-	data LONGBLOB NOT NULL,
+	data LONGBLOB,
 	PRIMARY KEY (trigger_id)
 );
diff --git a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
new file mode 100644
index 0000000..a103f5d
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
@@ -0,0 +1,63 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+import org.junit.Test;
+
+import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.utils.Utils;
+
+public class BasicTimeCheckerTest {
+
+	@Test
+	public void basicTimerTest(){
+		
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		
+		// get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
+		DateTime now = DateTime.now();
+		ReadablePeriod period = Utils.parsePeriodString("10s");
+		
+		BasicTimeChecker timeChecker = new BasicTimeChecker("BasicTimeChecket_1", now, now.getZone(), true, true, period);
+		checkers.put(timeChecker.getId(), timeChecker);
+		String expr = timeChecker.getId() + ".eval()";
+		
+		Condition cond = new Condition(checkers, expr);
+		System.out.println(expr);
+		
+		assertFalse(cond.isMet());
+		
+		//sleep for 1 min
+		try {
+			Thread.sleep(10000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		assertTrue(cond.isMet());
+		
+		cond.resetCheckers();
+		
+		assertFalse(cond.isMet());
+		
+		//sleep for 1 min
+		try {
+			Thread.sleep(10000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		assertTrue(cond.isMet());
+		
+	}
+}
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index 2cf8ca9..d959b2a 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -1,37 +1,39 @@
 package azkaban.test.trigger;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
-import org.apache.commons.jexl2.JexlEngine;
 import org.joda.time.DateTime;
-import org.joda.time.ReadablePeriod;
 import org.junit.Test;
 
+import azkaban.scheduler.BasicTimeChecker;
 import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
-import azkaban.trigger.BasicTimeChecker;
+import azkaban.trigger.TriggerException;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
+import azkaban.utils.Utils;
 
 public class ConditionTest {
-
-	private JexlEngine jexl = new JexlEngine();
 	
 	@Test
 	public void conditionTest(){
 		
 		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
-		DateTime now = DateTime.now();
-		FakeTimeChecker fake1 = new FakeTimeChecker(now);
-		FakeTimeChecker fake2 = new FakeTimeChecker(now.plusMinutes(1));
+
+		ThresholdChecker fake1 = new ThresholdChecker("thresholdchecker1", 10);
+		ThresholdChecker fake2 = new ThresholdChecker("thresholdchecker2", 20);
+		ThresholdChecker.setVal(15);
 		checkers.put(fake1.getId(), fake1);
 		checkers.put(fake2.getId(), fake2);
 
-		String expr1 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + "!" + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
+		String expr1 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + fake1.getId()+".eval()" + " && " + "!" + fake2.getId()+".eval()" + " )";
 		String expr2 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
 
 		Condition cond = new Condition(checkers, expr1);
@@ -45,51 +47,7 @@ public class ConditionTest {
 	}
 	
 	@Test
-	public void timeCheckerTest(){
-		
-		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
-		
-		// get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
-		DateTime now = DateTime.now();
-		ReadablePeriod period = BasicTimeChecker.parsePeriodString("10s");
-		
-		BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
-		checkers.put(timeChecker.getId(), timeChecker);
-		String expr = timeChecker.getId() + ".eval()";
-		
-		Condition cond = new Condition(checkers, expr);
-		System.out.println(expr);
-		
-		assertFalse(cond.isMet());
-		
-		//sleep for 1 min
-		try {
-			Thread.sleep(10000);
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		
-		assertTrue(cond.isMet());
-		
-		cond.resetCheckers();
-		
-		assertFalse(cond.isMet());
-		
-		//sleep for 1 min
-		try {
-			Thread.sleep(10000);
-		} catch (InterruptedException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		
-		assertTrue(cond.isMet());
-		
-	}
-	
-	@Test
-	public void BasicTimeCheckerTest() {
+	public void jsonConversionTest() throws TriggerException, IOException {
 		
 		CheckerTypeLoader checkerTypeLoader = new CheckerTypeLoader();
 		checkerTypeLoader.init(new Props());
@@ -102,7 +60,7 @@ public class ConditionTest {
 		String period = "6s";
 		
 		//BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
-		ConditionChecker timeChecker = checkerTypeLoader.createChecker(BasicTimeChecker.type, now, true, true, period);
+		ConditionChecker timeChecker = new BasicTimeChecker("BasicTimeChecker_1", now, now.getZone(), true, true, Utils.parsePeriodString(period));
 		System.out.println("checker id is " + timeChecker.getId());
 		
 		checkers.put(timeChecker.getId(), timeChecker);
@@ -110,8 +68,12 @@ public class ConditionTest {
 		
 		Condition cond = new Condition(checkers, expr);
 		
-		Object json = cond.toJson();
-		Condition cond2 = Condition.fromJson(json);
+		File temp = File.createTempFile("temptest", "temptest");
+		temp.deleteOnExit();
+		Object obj = cond.toJson();
+		JSONUtils.toJSON(obj, temp);
+		
+		Condition cond2 = Condition.fromJson(JSONUtils.parseJSONFromFile(temp));
 		
 		Map<String, ConditionChecker> checkers1 = cond.getCheckers();
 		Map<String, ConditionChecker> checkers2 = cond2.getCheckers();
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index 00ee02e..2928539 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -8,17 +8,18 @@ import java.util.List;
 
 import org.junit.Test;
 
+import azkaban.actions.ExecuteFlowAction;
 import azkaban.executor.ExecutionOptions;
 import azkaban.trigger.ActionTypeLoader;
-import azkaban.trigger.ExecuteFlowAction;
 import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerException;
 import azkaban.utils.Props;
 
 
 public class ExecuteFlowActionTest {
 	
 	@Test
-	public void ExecuteFlowActionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+	public void jsonConversionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, TriggerException {
 		ActionTypeLoader loader = new ActionTypeLoader();
 		loader.init(new Props());
 		
@@ -26,7 +27,7 @@ public class ExecuteFlowActionTest {
 		List<String> disabledJobs = new ArrayList<String>();
 		options.setDisabledJobs(disabledJobs);
 		
-		ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testflow", "azkaban", options);
+		ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testproject", "testflow", "azkaban", options);
 		
 		Object obj = executeFlowAction.toJson();
 		
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
new file mode 100644
index 0000000..fc68dfa
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -0,0 +1,225 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.database.DataSourceUtils;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerException;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class JdbcTriggerLoaderTest {
+
+	private static boolean testDBExists = false;
+	//@TODO remove this and turn into local host.
+	private static final String host = "cyu-ld.linkedin.biz";
+	private static final int port = 3306;
+	private static final String database = "azkaban2";
+	private static final String user = "azkaban";
+	private static final String password = "azkaban";
+	private static final int numConnections = 10;
+	
+	private TriggerLoader loader;
+	private CheckerTypeLoader checkerLoader;
+	private ActionTypeLoader actionLoader;
+
+	@Before
+	public void setup() throws TriggerException {
+		Props props = new Props();
+		props.put("database.type", "mysql");
+		
+		props.put("mysql.host", host);		
+		props.put("mysql.port", port);
+		props.put("mysql.user", user);
+		props.put("mysql.database", database);
+		props.put("mysql.password", password);
+		props.put("mysql.numconnections", numConnections);
+		
+		loader = new JdbcTriggerLoader(props);
+		checkerLoader = new CheckerTypeLoader();
+		checkerLoader.init(new Props());
+		Condition.setCheckerLoader(checkerLoader);
+		actionLoader = new ActionTypeLoader();
+		actionLoader.init(new Props());
+		Trigger.setActionTypeLoader(actionLoader);
+		setupDB();
+	}
+	
+	public void setupDB() {
+		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		testDBExists = true;
+		
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		CountHandler countHandler = new CountHandler();
+		QueryRunner runner = new QueryRunner();
+		try {
+			runner.query(connection, "SELECT COUNT(1) FROM triggers", countHandler);
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+		
+		DbUtils.closeQuietly(connection);
+		
+		clearDB();
+	}
+	
+	@After
+	public void clearDB() {
+		if (!testDBExists) {
+			return;
+		}
+
+		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		QueryRunner runner = new QueryRunner();
+		try {
+			runner.update(connection, "DELETE FROM triggers");
+			
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+		
+		DbUtils.closeQuietly(connection);
+	}
+	
+	@Test
+	public void addTriggerTest() throws TriggerManagerException {
+		Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
+		Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
+		loader.addTrigger(t1);
+		List<Trigger> ts = loader.loadTriggers();
+		assertTrue(ts.size() == 1);
+		
+		Trigger t3 = ts.get(0);
+		assertTrue(t3.getSource().equals("source1"));
+		
+		loader.addTrigger(t2);
+		ts = loader.loadTriggers();
+		assertTrue(ts.size() == 2);
+
+		for(Trigger t : ts) {
+			if(t.getTriggerId() == t2.getTriggerId()) {
+				t.getSource().equals(t2.getSource());
+			}
+		}
+	}
+	
+	@Test
+	public void removeTriggerTest() throws TriggerManagerException {
+		Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
+		Trigger t2 = createTrigger("testProj2", "testFlow2", "source2");
+		loader.addTrigger(t1);
+		loader.addTrigger(t2);
+		List<Trigger> ts = loader.loadTriggers();
+		assertTrue(ts.size() == 2);
+		loader.removeTrigger(t2);
+		ts = loader.loadTriggers();
+		assertTrue(ts.size() == 1);
+		assertTrue(ts.get(0).getTriggerId() == t1.getTriggerId());
+	}
+	
+	@Test
+	public void updateTriggerTest() throws TriggerManagerException {
+		Trigger t1 = createTrigger("testProj1", "testFlow1", "source1");
+		t1.setResetOnExpire(true);
+		loader.addTrigger(t1);
+		List<Trigger> ts = loader.loadTriggers();
+		assertTrue(ts.get(0).isResetOnExpire() == true);
+		t1.setResetOnExpire(false);
+		loader.updateTrigger(t1);
+		ts = loader.loadTriggers();
+		assertTrue(ts.get(0).isResetOnExpire() == false);
+	}
+	
+	private Trigger createTrigger(String projName, String flowName, String source) {
+		DateTime now = DateTime.now();
+		ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+		Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
+		checkers1.put(checker1.getId(), checker1);
+		String expr1 = checker1.getId() + ".eval()";
+		Condition triggerCond = new Condition(checkers1, expr1);
+		Condition expireCond = new Condition(checkers1, expr1);
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		TriggerAction action = new ExecuteFlowAction(1, projName, flowName, "azkaban", new ExecutionOptions());
+		actions.add(action);
+		Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", source, triggerCond, expireCond, actions);
+		return t;
+	}
+	
+	private boolean isTestSetup() {
+		if (!testDBExists) {
+			System.err.println("Skipping DB test because Db not setup.");
+			return false;
+		}
+		
+		System.out.println("Running DB test because Db setup.");
+		return true;
+	}
+	
+	public static class CountHandler implements ResultSetHandler<Integer> {
+		@Override
+		public Integer handle(ResultSet rs) throws SQLException {
+			int val = 0;
+			while (rs.next()) {
+				val++;
+			}
+			
+			return val;
+		}
+	}
+	
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index 56a9117..ee376b9 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -97,7 +97,7 @@ public class TriggerManagerTest {
 		Condition triggerCond = new Condition(checkers, expr);
 		Condition expireCond = new Condition(checkers, expr);
 		
-		Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", triggerCond, expireCond, actions);
+		Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", "tester", triggerCond, expireCond, actions);
 		
 		return fakeTrigger;
 	}
@@ -106,7 +106,7 @@ public class TriggerManagerTest {
 		
 		@Override
 		public void init(Props props) {
-			checkerToClass.put(FakeTimeChecker.type, FakeTimeChecker.class);
+			checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
 		}
 	}
 	
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
new file mode 100644
index 0000000..1d7d81d
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -0,0 +1,71 @@
+package azkaban.test.trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import azkaban.actions.ExecuteFlowAction;
+import azkaban.executor.ExecutionOptions;
+import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class TriggerTest {
+	
+	private CheckerTypeLoader checkerLoader;
+	private ActionTypeLoader actionLoader;
+	
+	@Before
+	public void setup() throws TriggerException {
+		checkerLoader = new CheckerTypeLoader();
+		checkerLoader.init(new Props());
+		Condition.setCheckerLoader(checkerLoader);
+		actionLoader = new ActionTypeLoader();
+		actionLoader.init(new Props());
+		Trigger.setActionTypeLoader(actionLoader);
+	}
+	
+	@Test
+	public void jsonConversionTest() throws TriggerException, IOException {
+		DateTime now = DateTime.now();
+		ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
+		Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();
+		checkers1.put(checker1.getId(), checker1);
+		String expr1 = checker1.getId() + ".eval()";
+		Condition triggerCond = new Condition(checkers1, expr1);
+		Condition expireCond = new Condition(checkers1, expr1);
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		TriggerAction action = new ExecuteFlowAction(1, "testProj", "testFlow", "azkaban", new ExecutionOptions());
+		actions.add(action);
+		Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", "test", triggerCond, expireCond, actions);
+		
+		File temp = File.createTempFile("temptest", "temptest");
+		temp.deleteOnExit();
+		Object obj = t.toJson();
+		JSONUtils.toJSON(obj, temp);
+		
+		Trigger t2 = Trigger.fromJson(JSONUtils.parseJSONFromFile(temp));
+		
+		assertTrue(t.getSource().equals(t2.getSource()));
+		assertTrue(t.getTriggerId() == t2.getTriggerId());
+		
+	}
+
+}