azkaban-uncached

scheduler should be working now

6/17/2013 6:14:59 PM

Details

diff --git a/src/java/azkaban/actions/ExecuteFlowAction.java b/src/java/azkaban/actions/ExecuteFlowAction.java
index 20d0369..e7c8dfb 100644
--- a/src/java/azkaban/actions/ExecuteFlowAction.java
+++ b/src/java/azkaban/actions/ExecuteFlowAction.java
@@ -26,7 +26,7 @@ public class ExecuteFlowAction implements TriggerAction {
 	private static ProjectManager projectManager;
 	private ExecutionOptions executionOptions;
 
-	private static Logger logger;
+	private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
 	
 	public ExecuteFlowAction(int projectId, String projectName, String flowName, String submitUser, ExecutionOptions executionOptions) {
 		this.projectId = projectId;
@@ -75,10 +75,18 @@ public class ExecuteFlowAction implements TriggerAction {
 		this.executionOptions = executionOptions;
 	}
 
+	public static ExecutorManager getExecutorManager() {
+		return executorManager;
+	}
+ 	
 	public static void setExecutorManager(ExecutorManager executorManager) {
 		ExecuteFlowAction.executorManager = executorManager;
 	}
 
+	public static ProjectManager getProjectManager() {
+		return projectManager;
+	}
+	
 	public static void setProjectManager(ProjectManager projectManager) {
 		ExecuteFlowAction.projectManager = projectManager;
 	}
@@ -148,7 +156,11 @@ public class ExecuteFlowAction implements TriggerAction {
 	}
 
 	@Override
-	public void doAction() {
+	public void doAction() throws Exception {
+		if(projectManager == null || executorManager == null) {
+			throw new Exception("ExecuteFlowAction not properly initialized!");
+		}
+		
 		Project project = projectManager.getProject(projectId);
 		if(project == null) {
 			logger.error("Project to execute " + projectId + " does not exist!");
diff --git a/src/java/azkaban/scheduler/BasicTimeChecker.java b/src/java/azkaban/scheduler/BasicTimeChecker.java
index 5a59c72..9d35746 100644
--- a/src/java/azkaban/scheduler/BasicTimeChecker.java
+++ b/src/java/azkaban/scheduler/BasicTimeChecker.java
@@ -21,7 +21,7 @@ public class BasicTimeChecker implements ConditionChecker {
 	private boolean skipPastChecks = true;
 	private ReadablePeriod period;
 	
-	private String id = type; 
+	private final String id; 
 	
 	public BasicTimeChecker(
 			String id,
@@ -242,9 +242,5 @@ public class BasicTimeChecker implements ConditionChecker {
 		return jsonObj;
 	}
 
-	@Override
-	public void setId(String id) {
-		this.id = id;
-	}
 
 }
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..0d5fc2f 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -62,14 +62,16 @@ public class ScheduleManager {
 
 	private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
 	private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
-	private final ScheduleRunner runner;
+	
 	private final ExecutorManager executorManager;
 	private final ProjectManager projectManager;
 	private final SLAManager slaManager;
 	
+	private final boolean useExternalRunner;
+	private final ScheduleRunner runner;
+	
 	// Used for mbeans to query Scheduler status
-	private long lastCheckTime = -1;
-	private long nextWakupTime = -1;
+	
 
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -80,13 +82,16 @@ public class ScheduleManager {
 	public ScheduleManager(ExecutorManager executorManager,
 							ProjectManager projectManager, 
 							SLAManager slaManager,
-							ScheduleLoader loader) 
+							ScheduleLoader loader,
+							boolean useExternalRunner) 
 	{
 		this.executorManager = executorManager;
 		this.projectManager = projectManager;
 		this.slaManager = slaManager;
 		this.loader = loader;
-		this.runner = new ScheduleRunner();
+		this.useExternalRunner = useExternalRunner;
+		
+		
 
 		List<Schedule> scheduleList = null;
 		try {
@@ -101,7 +106,12 @@ public class ScheduleManager {
 			internalSchedule(sched);
 		}
 
-		this.runner.start();
+		if(!useExternalRunner) {
+			this.runner = new ScheduleRunner();
+			this.runner.start();
+		} else {
+			this.runner = null;
+		}
 	}
 
 	/**
@@ -109,7 +119,9 @@ public class ScheduleManager {
 	 * it again.
 	 */
 	public void shutdown() {
-		this.runner.shutdown();
+		if(!useExternalRunner) {
+			this.runner.shutdown();
+		}
 	}
 
 	/**
@@ -118,7 +130,8 @@ public class ScheduleManager {
 	 * @return
 	 */
 	public synchronized List<Schedule> getSchedules() {
-		return runner.getRunnerSchedules();
+		//return runner.getRunnerSchedules();
+		return new ArrayList<Schedule>(scheduleIDMap.values());
 	}
 
 	/**
@@ -149,8 +162,10 @@ public class ScheduleManager {
 	 */
 	public synchronized void removeSchedules(int projectId, String flowId) {
 		Set<Schedule> schedules = getSchedules(projectId, flowId);
-		for(Schedule sched : schedules) {
-			removeSchedule(sched);
+		if(schedules != null) {
+			for(Schedule sched : schedules) {
+				removeSchedule(sched);
+			}
 		}
 	}
 	/**
@@ -159,7 +174,7 @@ public class ScheduleManager {
 	 * @param id
 	 */
 	public synchronized void removeSchedule(Schedule sched) {
-
+		
 		Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
 		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
 		if(schedules != null) {
@@ -170,13 +185,17 @@ public class ScheduleManager {
 		}
 		scheduleIDMap.remove(sched.getScheduleId());
 		
-		runner.removeRunnerSchedule(sched);
 		try {
 			loader.removeSchedule(sched);
 		} catch (ScheduleManagerException e) {
 			// TODO Auto-generated catch block
 			e.printStackTrace();
 		}
+		
+		if(!useExternalRunner) {
+			runner.removeRunnerSchedule(sched);
+		}
+		
 	}
 
 	// public synchronized void pauseScheduledFlow(String scheduleId){
@@ -250,11 +269,13 @@ public class ScheduleManager {
 	 */
 	private synchronized void internalSchedule(Schedule s) {
 		Schedule existing = scheduleIDMap.get(s.getScheduleId());
-		if (existing != null) {
-			this.runner.removeRunnerSchedule(existing);
+		if(!useExternalRunner) {
+			if (existing != null) {
+				this.runner.removeRunnerSchedule(existing);
+			}
+			s.updateTime();
+			this.runner.addRunnerSchedule(s);
 		}
-		s.updateTime();
-		this.runner.addRunnerSchedule(s);
 		scheduleIDMap.put(s.getScheduleId(), s);
 		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
 		if(schedules == null) {
@@ -306,6 +327,9 @@ public class ScheduleManager {
 	 * 
 	 */
 	public class ScheduleRunner extends Thread {
+		
+		private long lastCheckTime = -1;
+		private long nextWakupTime = -1;
 		private final PriorityBlockingQueue<Schedule> schedules;
 		private AtomicBoolean stillAlive = new AtomicBoolean(true);
 
@@ -322,6 +346,14 @@ public class ScheduleManager {
 			this.interrupt();
 		}
 
+		public long getLastCheckTime() {
+			return lastCheckTime;
+		}
+		
+		public long getNextWakeupTime() {
+			return nextWakupTime;
+		}
+		
 		/**
 		 * Return a list of scheduled flow
 		 * 
@@ -528,18 +560,34 @@ public class ScheduleManager {
 	}
 	
 	public long getLastCheckTime() {
-		return lastCheckTime;
+		if(useExternalRunner) {
+			return -1;
+		} else {
+			return runner.getLastCheckTime();
+		}
 	}
 	
 	public long getNextUpdateTime() {
-		return nextWakupTime;
+		if(useExternalRunner) {
+			return -1;
+		} else {
+			return runner.getNextWakeupTime();
+		}
 	}
 	
 	public State getThreadState() {
-		return runner.getState();
+		if(useExternalRunner) {
+			return null;
+		} else {
+			return runner.getState();
+		}
 	}
 	
 	public boolean isThreadActive() {
-		return runner.isAlive();
+		if(useExternalRunner) {
+			return false;
+		} else {
+			return runner.isAlive();
+		}
 	}
 }
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 05f7614..a36f3f1 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -16,6 +16,7 @@ import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
 
 public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
@@ -23,6 +24,8 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
 	private TriggerManager triggerManager;
 	
+	private static final String triggerSource = "TriggerBasedScheduler";
+	
 	public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager) {
 		this.triggerManager = triggerManager;
 		// need to init the action types and condition checker types 
@@ -35,8 +38,10 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		Condition triggerCondition = createTimeTriggerCondition(s);
 		Condition expireCondition = createTimeExpireCondition(s);
 		List<TriggerAction> actions = createActions(s);
-		Trigger t = new Trigger(s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), "TriggerBasedScheduler", triggerCondition, expireCondition, actions);
-		
+		Trigger t = new Trigger(s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
+		if(s.isRecurring()) {
+			t.setResetOnTrigger(true);
+		}
 		return t;
 	}
 	
@@ -69,21 +74,33 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	@Override
 	public void insertSchedule(Schedule s) throws ScheduleManagerException {
 		Trigger t = createScheduleTrigger(s);
-		triggerManager.insertTrigger(t);
+		try {
+			triggerManager.insertTrigger(t);
+		} catch (TriggerManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ScheduleManagerException("Failed to insert new schedule!", e);
+		}
 	}
 
 	@Override
 	public void updateSchedule(Schedule s) throws ScheduleManagerException {
-		
-		
+		Trigger t = createScheduleTrigger(s);
+		try {
+			triggerManager.updateTrigger(t);
+		} catch (TriggerManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ScheduleManagerException("Failed to update schedule!", e);
+		}
 	}
 
+	//TODO
+	// may need to add logic to filter out skip runs
 	@Override
 	public List<Schedule> loadSchedules() throws ScheduleManagerException {
 		List<Trigger> triggers = triggerManager.getTriggers();
 		List<Schedule> schedules = new ArrayList<Schedule>();
 		for(Trigger t : triggers) {
-			if(t.getSource().equals("TriggerBasedScheduler")) {
+			if(t.getSource().equals(triggerSource)) {
 				Schedule s = triggerToSchedule(t);
 				schedules.add(s);
 			}
@@ -133,7 +150,12 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 
 	@Override
 	public void removeSchedule(Schedule s) throws ScheduleManagerException {
-		triggerManager.removeTrigger(s.getScheduleId());
+		try {
+			triggerManager.removeTrigger(s.getScheduleId());
+		} catch (TriggerManagerException e) {
+			// TODO Auto-generated catch block
+			throw new ScheduleManagerException(e.getMessage());
+		}
 		
 	}
 
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index 1d0f2a6..87a3cf6 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -54,12 +54,7 @@ public class TriggerBasedScheduler {
 	private ScheduleLoader loader;
 
 	private Map<Pair<Integer, String>, Schedule> scheduleIDMap = new HashMap<Pair<Integer, String>, Schedule>();
-	private Map<Integer, Pair<Integer, String>> idFlowMap = new HashMap<Integer, Pair<Integer,String>>();
-
-	private final ExecutorManager executorManager;
-	private final ProjectManager projectManager;
-	private final SLAManager slaManager;
-	private final TriggerManager triggerManager;
+	private Map<Integer, Schedule> idFlowMap = new HashMap<Integer, Schedule>();
 	
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -69,15 +64,10 @@ public class TriggerBasedScheduler {
 	 */
 	public TriggerBasedScheduler(ExecutorManager executorManager,
 							ProjectManager projectManager, 
-							SLAManager slaManager,
 							TriggerManager triggerManager,
 							ScheduleLoader loader) 
 	{
-		this.executorManager = executorManager;
-		this.projectManager = projectManager;
-		this.slaManager = slaManager;
-		this.triggerManager = triggerManager;
-		this.loader = loader;
+		this.loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
 
 		List<Schedule> scheduleList = null;
 		try {
@@ -87,7 +77,10 @@ public class TriggerBasedScheduler {
 			logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
 			e.printStackTrace();
 		}
-
+		for(Schedule s : scheduleList) {
+			scheduleIDMap.put(s.getScheduleIdentityPair(), s);
+			idFlowMap.put(s.getScheduleId(), s);
+		}
 	}
 
 	/**
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 38ee266..62e50f8 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -139,9 +139,12 @@ public class ActionTypeLoader {
 		logger.info("Loaded ExecuteFlowAction type.");
 	}
 	
-	public TriggerAction createActionFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+	public TriggerAction createActionFromJson(String type, Object obj) throws Exception {
 		TriggerAction action = null;
 		Class<? extends TriggerAction> actionClass = actionToClass.get(type);		
+		if(actionClass == null) {
+			throw new Exception("Action Type " + type + " not supported!");
+		}
 		action = (TriggerAction) Utils.invokeStaticMethod(actionClass.getClassLoader(), actionClass.getName(), "createFromJson", obj);
 		
 		return action;
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index f67655d..67a207d 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -142,9 +142,12 @@ public class CheckerTypeLoader {
 		logger.info("Loaded BasicTimeChecker type.");
 	}
 	
-	public ConditionChecker createCheckerFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+	public ConditionChecker createCheckerFromJson(String type, Object obj) throws Exception {
 		ConditionChecker checker = null;
-		Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);		
+		Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);	
+		if(checkerClass == null) {
+			throw new Exception("Checker type " + type + " not supported!");
+		}
 		checker = (ConditionChecker) Utils.invokeStaticMethod(checkerClass.getClassLoader(), checkerClass.getName(), "createFromJson", obj);
 		
 		return checker;
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index a7b311a..5d809d5 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -34,6 +34,10 @@ public class Condition {
 		Condition.checkerLoader = loader;
 	}
 	
+	public static CheckerTypeLoader getCheckerLoader() {
+		return checkerLoader;
+	}
+	
 	public void registerChecker(ConditionChecker checker) {
 		checkers.put(checker.getId(), checker);
 		context.set(checker.getId(), checker);
@@ -85,7 +89,11 @@ public class Condition {
 	}
 
 	@SuppressWarnings("unchecked")
-	public static Condition fromJson(Object obj) {
+	public static Condition fromJson(Object obj) throws Exception {
+		if(checkerLoader == null) {
+			throw new Exception("Condition Checker loader not initialized!");
+		}
+		
 		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
 		Condition cond = null;
 		
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 20b3075..14ca333 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -8,9 +8,7 @@ public interface ConditionChecker {
 	Object getNum();
 	
 	void reset();
-	
-	void setId(String id);
-	
+
 	String getId();
 	
 	String getType();
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 800606d..d819800 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -92,6 +92,10 @@ public class Trigger {
 		Trigger.actionTypeLoader = loader;
 	}
 	
+	public static ActionTypeLoader getActionTypeLoader() {
+		return actionTypeLoader;
+	}
+	
 	public boolean isResetOnTrigger() {
 		return resetOnTrigger;
 	}
@@ -165,8 +169,12 @@ public class Trigger {
 	}
 
 	@SuppressWarnings("unchecked")
-	public static Trigger fromJson(Object obj) {
+	public static Trigger fromJson(Object obj) throws Exception {
 		
+		if(actionTypeLoader == null) {
+			throw new Exception("Trigger Action Type loader not initialized.");
+		}
+ 		
 		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
 		
 		Trigger trigger = null;
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
index 581b3fa..c57d585 100644
--- a/src/java/azkaban/trigger/TriggerAction.java
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -8,6 +8,6 @@ public interface TriggerAction {
 	
 	Object toJson();
 	
-	void doAction();
+	void doAction() throws Exception;
 	
 }
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 393c974..f5ad251 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -16,20 +16,20 @@ import azkaban.utils.Props;
 public class TriggerManager {
 	private static Logger logger = Logger.getLogger(TriggerManager.class);
 	
-	private final long DEFAULT_TRIGGER_EXPIRE_TIME = 24*60*60*1000L;
-	
 	private Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
 	
 	private CheckerTypeLoader checkerLoader;
 	private ActionTypeLoader actionLoader;
 	
+	private TriggerLoader triggerLoader;
+	
 	TriggerScannerThread scannerThread;
 	
-	public TriggerManager(Props props, TriggerLoader triggerLoader, CheckerTypeLoader checkerLoader, ActionTypeLoader actionLoader) {
+	public TriggerManager(Props props, TriggerLoader triggerLoader) {
+		
+		checkerLoader = new CheckerTypeLoader();
+		actionLoader = new ActionTypeLoader();
 		
-		this.checkerLoader = checkerLoader;
-		this.actionLoader = actionLoader;
-		scannerThread = new TriggerScannerThread("TriggerScannerThread");
 		
 		// load plugins
 		try{
@@ -43,6 +43,11 @@ public class TriggerManager {
 		Condition.setCheckerLoader(checkerLoader);
 		Trigger.setActionTypeLoader(actionLoader);
 		
+		long scannerInterval = props.getLong("trigger.scan.interval", TriggerScannerThread.DEFAULT_SCAN_INTERVAL_MS);
+		scannerThread = new TriggerScannerThread(scannerInterval);
+		scannerThread.setName("TriggerScannerThread");
+		
+		this.triggerLoader = triggerLoader;
 		try{
 			// expect loader to return valid triggers
 			List<Trigger> triggers = triggerLoader.loadTriggers();
@@ -55,8 +60,6 @@ public class TriggerManager {
 			logger.error(e.getMessage());
 		}
 		
-		
-		
 		scannerThread.start();
 	}
 	
@@ -68,17 +71,33 @@ public class TriggerManager {
 		return actionLoader;
 	}
 
-	public synchronized void insertTrigger(Trigger t) {
+	public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
+		
+		triggerLoader.addTrigger(t);
 		triggerIdMap.put(t.getTriggerId(), t);
 		scannerThread.addTrigger(t);
 	}
 	
-	public synchronized void removeTrigger(int id) {
+	public synchronized void removeTrigger(int id) throws TriggerManagerException {
 		removeTrigger(triggerIdMap.get(id));
 	}
 	
-	public synchronized void removeTrigger(Trigger t) {
-		scannerThread.removeTrigger(t);
+	public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
+		if(!triggerIdMap.containsKey(t.getTriggerId())) {
+			throw new TriggerManagerException("The trigger to update doesn't exist!");
+		}
+		
+		scannerThread.deleteTrigger(t);
+		scannerThread.addTrigger(t);
+		triggerIdMap.put(t.getTriggerId(), t);
+		
+		
+		triggerLoader.updateTrigger(t);
+	}
+	
+	public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
+		triggerLoader.removeTrigger(t);
+		scannerThread.deleteTrigger(t);
 		triggerIdMap.remove(t.getTriggerId());		
 	}
 	
@@ -88,21 +107,29 @@ public class TriggerManager {
 
 	//trigger scanner thread
 	public class TriggerScannerThread extends Thread {
+		
+		//public static final long DEFAULT_SCAN_INTERVAL_MS = 300000;
+		public static final long DEFAULT_SCAN_INTERVAL_MS = 60000;
+		
 		private final BlockingQueue<Trigger> triggers;
 		private AtomicBoolean stillAlive = new AtomicBoolean(true);
-		private String scannerName;
 		private long lastCheckTime = -1;
+		private final long scanInterval;
 		
 		// Five minute minimum intervals
-		private static final int TIMEOUT_MS = 300000;
 		
-		public TriggerScannerThread(String scannerName){
+		public TriggerScannerThread(){
 			triggers = new LinkedBlockingDeque<Trigger>();
-			this.setName(scannerName);
+			this.scanInterval = DEFAULT_SCAN_INTERVAL_MS;
+		}
+
+		public TriggerScannerThread(long interval){
+			triggers = new LinkedBlockingDeque<Trigger>();
+			this.scanInterval = interval;
 		}
 		
 		public void shutdown() {
-			logger.error("Shutting down trigger manager thread " + scannerName);
+			logger.error("Shutting down trigger manager thread " + this.getName());
 			stillAlive.set(false);
 			this.interrupt();
 		}
@@ -115,7 +142,7 @@ public class TriggerManager {
 			triggers.add(t);
 		}
 		
-		public synchronized void removeTrigger(Trigger t) {
+		public synchronized void deleteTrigger(Trigger t) {
 			triggers.remove(t);
 		}
 		
@@ -135,9 +162,9 @@ public class TriggerManager {
 							logger.error(t.getMessage());
 						}
 						
-						long timeRemaining = TIMEOUT_MS - (System.currentTimeMillis() - lastCheckTime);
+						long timeRemaining = scanInterval - (System.currentTimeMillis() - lastCheckTime);
 						if(timeRemaining < 0) {
-							logger.error("Trigger manager thread " + scannerName + " is too busy!");
+							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
 						} else {
 							wait(timeRemaining);
 						}
@@ -149,7 +176,7 @@ public class TriggerManager {
 			}
 		}
 		
-		private void checkAllTriggers() {
+		private void checkAllTriggers() throws TriggerManagerException {
 			for(Trigger t : triggers) {
 				if(t.triggerConditionMet()) {
 					onTriggerTrigger(t);
@@ -157,32 +184,32 @@ public class TriggerManager {
 					onTriggerExpire(t);
 				}
 			}
-			
 		}
 		
-		private void onTriggerTrigger(Trigger t) {
+		private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
 			List<TriggerAction> actions = t.getTriggerActions();
 			for(TriggerAction action : actions) {
-				action.doAction();
+				try {
+					action.doAction();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					throw new TriggerManagerException("action failed to execute", e);
+				}
 			}
 			if(t.isResetOnTrigger()) {
 				t.resetTriggerConditions();
 			} else {
-				triggers.remove(t);
+				removeTrigger(t);
 			}
 		}
 		
-		private void onTriggerExpire(Trigger t) {
+		private void onTriggerExpire(Trigger t) throws TriggerManagerException {
 			if(t.isResetOnExpire()) {
 				t.resetTriggerConditions();
 			} else {
-				triggers.remove(t);
+				removeTrigger(t);
 			}
 		}
 	}
-	
-	public TriggerAction createTriggerAction() {
-		return null;
-	}
 
 }
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index edbf237..2cafed0 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -60,10 +60,16 @@ import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
 
 import azkaban.scheduler.JdbcScheduleLoader;
+import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.TriggerBasedScheduleLoader;
+import azkaban.scheduler.TriggerBasedScheduler;
 import azkaban.sla.JdbcSLALoader;
 import azkaban.sla.SLAManager;
 import azkaban.sla.SLAManagerException;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManager;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.FileIOUtils;
@@ -129,6 +135,9 @@ public class AzkabanWebServer extends AzkabanServer {
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
 	private ScheduleManager scheduleManager;
+//	private TriggerBasedScheduler scheduler;
+	private TriggerManager triggerManager;
+	
 	private SLAManager slaManager;
 
 	private final ClassLoader baseClassLoader;
@@ -161,7 +170,12 @@ public class AzkabanWebServer extends AzkabanServer {
 		projectManager = loadProjectManager(props);
 		executorManager = loadExecutorManager(props);
 		slaManager = loadSLAManager(props);
-		scheduleManager = loadScheduleManager(executorManager, slaManager, props);
+		
+		triggerManager = loadTriggerManager(props);
+//		scheduler = loadScheduler(executorManager, projectManager, triggerManager);
+		
+		scheduleManager = loadScheduleManager(projectManager, executorManager, slaManager, triggerManager, props);
+		
 		baseClassLoader = getBaseClassloader();
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -220,17 +234,35 @@ public class AzkabanWebServer extends AzkabanServer {
 		return execManager;
 	}
 
-	private ScheduleManager loadScheduleManager(ExecutorManager execManager, SLAManager slaManager, Props props ) throws Exception {
-		ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, slaManager, new JdbcScheduleLoader(props));
+	private ScheduleManager loadScheduleManager(ProjectManager projectManager, ExecutorManager executorManager, SLAManager slaManager, TriggerManager triggerManager, Props props ) throws Exception {
+		ScheduleManager schedManager = null;
+		String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
+		if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
+			ScheduleLoader loader = new JdbcScheduleLoader(props);
+			schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, false);
+		} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
+			ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
+			schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, true);
+		}
 
 		return schedManager;
 	}
+	
+//	private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
+//		TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
+//		return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
+//	}
 
 	private SLAManager loadSLAManager(Props props) throws SLAManagerException {
 		SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
 		return slaManager;
 	}
 	
+	private TriggerManager loadTriggerManager(Props props) {
+		TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
+		return new TriggerManager(props, triggerLoader);
+	}
+	
 	/**
 	 * Returns the web session cache.
 	 * 
@@ -280,6 +312,14 @@ public class AzkabanWebServer extends AzkabanServer {
 		return scheduleManager;
 	}
 	
+	public TriggerManager getTriggerManager() {
+		return triggerManager;
+	}
+	
+//	public TriggerBasedScheduler getScheduler() {
+//		return scheduler;
+//	}
+//	
 	/**
 	 * Creates and configures the velocity engine.
 	 * 
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index d959b2a..34ce3bf 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -47,7 +47,7 @@ public class ConditionTest {
 	}
 	
 	@Test
-	public void jsonConversionTest() throws TriggerException, IOException {
+	public void jsonConversionTest() throws Exception {
 		
 		CheckerTypeLoader checkerTypeLoader = new CheckerTypeLoader();
 		checkerTypeLoader.init(new Props());
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index 2928539..a996885 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -19,7 +19,7 @@ import azkaban.utils.Props;
 public class ExecuteFlowActionTest {
 	
 	@Test
-	public void jsonConversionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, TriggerException {
+	public void jsonConversionTest() throws Exception {
 		ActionTypeLoader loader = new ActionTypeLoader();
 		loader.init(new Props());
 		
diff --git a/unit/java/azkaban/test/trigger/ThresholdChecker.java b/unit/java/azkaban/test/trigger/ThresholdChecker.java
index 9c568ba..c26f854 100644
--- a/unit/java/azkaban/test/trigger/ThresholdChecker.java
+++ b/unit/java/azkaban/test/trigger/ThresholdChecker.java
@@ -16,6 +16,9 @@ public class ThresholdChecker implements ConditionChecker{
 	
 	private String id;
 	
+	private boolean checkerMet = false;
+	private boolean checkerReset  = false;
+	
 	public ThresholdChecker(String id, int threshold){
 		this.id = id;
 		this.threshold = threshold;
@@ -27,13 +30,24 @@ public class ThresholdChecker implements ConditionChecker{
 	
 	@Override
 	public Boolean eval() {
-		return curVal > threshold;
+		if(curVal > threshold) {
+			checkerMet = true;
+		}
+		return checkerMet;
+	}
+	
+	public boolean isCheckerMet() {
+		return checkerMet;
 	}
 
 	@Override
 	public void reset() {
-		// TODO Auto-generated method stub
-		
+		checkerMet = false;
+		checkerReset = true;
+	}
+	
+	public boolean isCheckerReset() {
+		return checkerReset;
 	}
 	
 	/*
@@ -67,10 +81,5 @@ public class ThresholdChecker implements ConditionChecker{
 		return null;
 	}
 
-	@Override
-	public void setId(String id) {
-		this.id = id;
-		
-	}
 
 }
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index ee376b9..bbe3392 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -18,6 +18,7 @@ import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.TriggerException;
 import azkaban.trigger.TriggerLoader;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerManagerException;
@@ -25,9 +26,13 @@ import azkaban.utils.Props;
 
 public class TriggerManagerTest {
 	
+	private TriggerLoader triggerLoader;
+	
 	@Before
-	public void setup() {
-
+	public void setup() throws TriggerException, TriggerManagerException {
+		triggerLoader = new MockTriggerLoader();
+		
+		
 	}
 	
 	@After
@@ -36,85 +41,137 @@ public class TriggerManagerTest {
 	}
 	
 	@Test
-	public void TriggerManagerSimpleTest() {
+	public void TriggerManagerSimpleTest() throws TriggerManagerException {
+
+		
 		Props props = new Props();
-		TriggerManager triggerManager = new TriggerManager(props, new MockTriggerLoader(), new MockCheckerLoader(), new MockActionLoader());
+		props.put("trigger.scan.interval", 4000);
+		TriggerManager triggerManager = new TriggerManager(props, triggerLoader);
+		
+		triggerManager.getCheckerLoader().registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
+		triggerManager.getActionLoader().registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);
+		
+		ThresholdChecker.setVal(1);
+		
+		triggerManager.insertTrigger(createDummyTrigger("test1", "triggerLoader", 10));
 		List<Trigger> triggers = triggerManager.getTriggers();
 		assertTrue(triggers.size() == 1);
+		Trigger t1 = triggers.get(0);
+		t1.setResetOnTrigger(false);
+		triggerManager.updateTrigger(t1);
+		ThresholdChecker checker1 = (ThresholdChecker) t1.getTriggerCondition().getCheckers().values().toArray()[0];
+		assertTrue(t1.getSource().equals("triggerLoader"));
 		
-		Trigger t2 = createFakeTrigger("addnewtriggger");
+		Trigger t2 = createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
 		triggerManager.insertTrigger(t2);
+		ThresholdChecker checker2 = (ThresholdChecker) t2.getTriggerCondition().getCheckers().values().toArray()[0];
 		
-		triggers = triggerManager.getTriggers();
-		assertTrue(triggers.size() == 2);
+		ThresholdChecker.setVal(15);
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		assertTrue(checker1.isCheckerMet() == false);
+		assertTrue(checker2.isCheckerMet() == false);
+		assertTrue(checker1.isCheckerReset() == false);
+		assertTrue(checker2.isCheckerReset() == false);
+		
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		assertTrue(checker1.isCheckerMet() == true);
+		assertTrue(checker2.isCheckerMet() == false);
+		assertTrue(checker1.isCheckerReset() == false);
+		assertTrue(checker2.isCheckerReset() == false);
+		
+		ThresholdChecker.setVal(25);
+		try {
+			Thread.sleep(4000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		assertTrue(checker1.isCheckerMet() == true);
+		assertTrue(checker1.isCheckerReset() == false);
+		assertTrue(checker2.isCheckerReset() == true);
 		
-		triggerManager.removeTrigger(t2);
 		triggers = triggerManager.getTriggers();
 		assertTrue(triggers.size() == 1);
+		
 	}
 	
 	public class MockTriggerLoader implements TriggerLoader {
 
-		private List<Trigger> triggers;
+		private Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+		private int idIndex = 0;
 		
 		@Override
 		public void addTrigger(Trigger t) throws TriggerManagerException {
-			triggers.add(t);			
+			t.setTriggerId(idIndex++);
+			triggers.put(t.getTriggerId(), t);
 		}
 
 		@Override
 		public void removeTrigger(Trigger s) throws TriggerManagerException {
-			triggers.remove(s);
+			triggers.remove(s.getTriggerId());
 			
 		}
 
 		@Override
 		public void updateTrigger(Trigger t) throws TriggerManagerException {
-
+			triggers.put(t.getTriggerId(), t);
 		}
 
 		@Override
-		public List<Trigger> loadTriggers()
-				throws TriggerManagerException {
-			Trigger t = createFakeTrigger("test");
-			triggers = new ArrayList<Trigger>();
-			triggers.add(t);
-			return triggers;
+		public List<Trigger> loadTriggers() {
+			return new ArrayList<Trigger>(triggers.values());
 		}
 		
 	}
 	
-	private Trigger createFakeTrigger(String message) {
+	private Trigger createDummyTrigger(String message, String source, int threshold) {
 		
 		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		ConditionChecker checker = new ThresholdChecker(ThresholdChecker.type, threshold);
+		checkers.put(checker.getId(), checker);
 		
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
 		TriggerAction act  = new DummyTriggerAction(message);
 		actions.add(act);
 		
-		String expr = "true";
+		String expr = checker.getId() + ".eval()";
 		
 		Condition triggerCond = new Condition(checkers, expr);
 		Condition expireCond = new Condition(checkers, expr);
 		
-		Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", "tester", triggerCond, expireCond, actions);
+		Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", source, triggerCond, expireCond, actions);
+		fakeTrigger.setResetOnTrigger(true);
+		fakeTrigger.setResetOnExpire(true);
 		
 		return fakeTrigger;
 	}
 
-	public class MockCheckerLoader extends CheckerTypeLoader{
-		
-		@Override
-		public void init(Props props) {
-			checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
-		}
-	}
-	
-	public class MockActionLoader extends ActionTypeLoader {
-		@Override
-		public void init(Props props) {
-			actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
-		}
-	}
+//	public class MockCheckerLoader extends CheckerTypeLoader{
+//		
+//		@Override
+//		public void init(Props props) {
+//			checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
+//		}
+//	}
+//	
+//	public class MockActionLoader extends ActionTypeLoader {
+//		@Override
+//		public void init(Props props) {
+//			actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
+//		}
+//	}
 
 }
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index 1d7d81d..6235c12 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -43,7 +43,7 @@ public class TriggerTest {
 	}
 	
 	@Test
-	public void jsonConversionTest() throws TriggerException, IOException {
+	public void jsonConversionTest() throws Exception {
 		DateTime now = DateTime.now();
 		ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
 		Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();