azkaban-uncached

moved to separate trigger server

7/24/2013 4:56:37 PM

Changes

src/java/azkaban/jmx/JmxSLAManager.java 45(+0 -45)

src/java/azkaban/jmx/JmxSLAManagerMBean.java 20(+0 -20)

src/java/azkaban/sla/JdbcSLALoader.java 283(+0 -283)

src/java/azkaban/sla/SLA.java 251(+0 -251)

src/java/azkaban/sla/SLALoader.java 14(+0 -14)

src/java/azkaban/sla/SlaMailer.java 63(+0 -63)

src/java/azkaban/sla/SLAManager.java 486(+0 -486)

src/java/azkaban/sla/SLAManagerException.java 13(+0 -13)

src/java/azkaban/sla/SlaOptions.java 51(+0 -51)

unit/java/azkaban/test/sla/JdbcSLALoaderTest.java 180(+0 -180)

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 527956d..5d51a2b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -70,19 +70,27 @@ public class ExecutorManager {
 	
 	private long lastThreadCheckTime = -1;
 	
-	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
+	private final boolean isPrimary;
+	
+	public ExecutorManager(Props props, ExecutorLoader loader, boolean isPrimary) throws ExecutorManagerException {
 		this.executorLoader = loader;
 		this.loadRunningFlows();
 		
 		executorHost = props.getString("executor.host", "localhost");
 		executorPort = props.getInt("executor.port");
+		
 		mailer = new ExecutorMailer(props);
-		executingManager = new ExecutingManagerUpdaterThread();
-		executingManager.start();
+		
+		this.isPrimary = isPrimary;		
+		
+		if(isPrimary) {
+			executingManager = new ExecutingManagerUpdaterThread();
+			executingManager.start();
 
-		long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
-		cleanerThread = new CleanerThread(executionLogsRetentionMs);
-		cleanerThread.start();
+			long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
+			cleanerThread = new CleanerThread(executionLogsRetentionMs);
+			cleanerThread.start();
+		}
 	}
 	
 	public String getExecutorHost() {
@@ -128,6 +136,11 @@ public class ExecutorManager {
 		return ports;
 	}
 	
+	public ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
+		ExecutableFlow exflow = executorLoader.fetchExecutableFlow(execId);
+		return exflow;
+	}
+	
 	private void loadRunningFlows() throws ExecutorManagerException {
 		runningFlows.putAll(executorLoader.fetchActiveFlows());
 	}
@@ -593,6 +606,8 @@ public class ExecutorManager {
 				try {
 					lastThreadCheckTime = System.currentTimeMillis();
 					
+					loadRunningFlows();
+					
 					Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
 					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
 					ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManager.java b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
new file mode 100644
index 0000000..d0af406
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
@@ -0,0 +1,22 @@
+package azkaban.jmx;
+
+import azkaban.triggerapp.TriggerRunnerManager;
+
+public class JmxTriggerRunnerManager implements JmxTriggerRunnerManagerMBean {
+	private TriggerRunnerManager manager;
+	
+	public JmxTriggerRunnerManager(TriggerRunnerManager manager) {
+		this.manager = manager;
+	}
+
+	@Override
+	public long getLastRunnerThreadCheckTime() {
+		return manager.getLastRunnerThreadCheckTime();
+	}
+
+	@Override
+	public boolean isRunnerThreadActive() {
+		return manager.isRunnerThreadActive();
+	}
+
+}
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
new file mode 100644
index 0000000..77b72e7
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
@@ -0,0 +1,11 @@
+package azkaban.jmx;
+
+public interface JmxTriggerRunnerManagerMBean {
+
+	@DisplayName("OPERATION: getLastRunnerThreadCheckTime")
+	public long getLastRunnerThreadCheckTime();
+
+	@DisplayName("OPERATION: isRunnerThreadActive")
+	public boolean isRunnerThreadActive();
+
+}
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 818fb2b..eea9c46 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -40,15 +40,13 @@ public class ProjectManager {
 	private TriggerManager triggerManager;
 	private boolean loadTriggerFromFile = false;
 	
-	public ProjectManager(ProjectLoader loader, Props props, TriggerManager triggerManager) {
+	public ProjectManager(ProjectLoader loader, Props props) {
 		this.projectLoader = loader;
 		this.props = props;
 		this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
 		this.projectVersionRetention = (props.getInt("project.version.retention", 3));
 		logger.info("Project version retention is set to " + projectVersionRetention);
 		
-		this.triggerManager = triggerManager;
-		
 		this.creatorDefaultPermissions = props.getBoolean("creator.default.proxy", true);
 		
 		this.loadTriggerFromFile = props.getBoolean("enable.load.trigger.from.file", false);
@@ -59,6 +57,10 @@ public class ProjectManager {
 		
 		loadAllProjects();
 	}
+
+	public void setTriggerManager(TriggerManager triggerManager) {
+		this.triggerManager = triggerManager;
+	}
 	
 	public void setLoadTriggerFromFile(boolean enable) {
 		this.loadTriggerFromFile = enable;
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 931b4e7..ccbad60 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -31,7 +31,6 @@ import org.joda.time.Seconds;
 import org.joda.time.Weeks;
 
 import azkaban.executor.ExecutionOptions;
-import azkaban.sla.SlaOptions;
 import azkaban.utils.Pair;
 
 public class Schedule{
@@ -57,7 +56,6 @@ public class Schedule{
 	private boolean skipPastOccurrences = true;
 	
 	private ExecutionOptions executionOptions;
-	private SlaOptions slaOptions;
 	
 	public Schedule(
 						int scheduleId,
@@ -86,7 +84,6 @@ public class Schedule{
 				nextExecTime,
 				submitTime,
 				submitUser,
-				null,
 				null
 				);
 	}
@@ -104,8 +101,7 @@ public class Schedule{
 						long nextExecTime,						
 						long submitTime,
 						String submitUser,
-						ExecutionOptions executionOptions,
-						SlaOptions slaOptions
+						ExecutionOptions executionOptions
 			) {
 		this(scheduleId, projectId, 
 				projectName, 
@@ -118,8 +114,7 @@ public class Schedule{
 				nextExecTime,
 				submitTime,
 				submitUser,
-				executionOptions,
-				slaOptions
+				executionOptions
 				);
 	}
 
@@ -136,8 +131,7 @@ public class Schedule{
 						long nextExecTime,
 						long submitTime,
 						String submitUser,
-						ExecutionOptions executionOptions,
-						SlaOptions slaOptions
+						ExecutionOptions executionOptions
 						) {
 		this.scheduleId = scheduleId;
 		this.projectId = projectId;
@@ -152,7 +146,6 @@ public class Schedule{
 		this.status = status;
 		this.submitTime = submitTime;
 		this.executionOptions = executionOptions;
-		this.slaOptions = slaOptions;
 	}
 
 	public ExecutionOptions getExecutionOptions() {
@@ -163,14 +156,6 @@ public class Schedule{
 		this.executionOptions = executionOptions;
 	}
 
-	public SlaOptions getSlaOptions() {
-		return slaOptions;
-	}
-
-	public void setSlaOptions(SlaOptions slaOptions) {
-		this.slaOptions = slaOptions;
-	}
-
 	public String getScheduleName() {
 		return projectName + "." + flowName + " (" + projectId + ")";
 	}
@@ -345,15 +330,12 @@ public class Schedule{
 	
 	
 	public Map<String,Object> optionsToObject() {
-		if(executionOptions != null || slaOptions != null) {
+		if(executionOptions != null ) {
 			HashMap<String, Object> schedObj = new HashMap<String, Object>();
 			
 			if(executionOptions != null) {
 				schedObj.put("executionOptions", executionOptions.toObject());
 			}
-			if(slaOptions != null) {
-				schedObj.put("slaOptions", slaOptions.toObject());
-			}
 	
 			return schedObj;
 		}
@@ -377,10 +359,6 @@ public class Schedule{
 			this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
 		}
 
-		if (schedObj.containsKey("slaOptions")) {
-			SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
-			this.slaOptions = slaOptions;
-		}
 	}
 
 	public boolean isRecurring() {
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 49e48c3..4ce0e25 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -16,7 +16,6 @@
 
 package azkaban.scheduler;
 
-import java.io.File;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -42,12 +41,6 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
-import azkaban.sla.SLA.SlaAction;
-import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLA.SlaSetting;
-import azkaban.sla.SLAManager;
-import azkaban.sla.SlaOptions;
-import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAgent;
 import azkaban.trigger.TriggerStatus;
 import azkaban.utils.Pair;
@@ -70,8 +63,8 @@ public class ScheduleManager implements TriggerAgent {
 	private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
 	
 	private final ExecutorManager executorManager;
-	private final ProjectManager projectManager;
-	private final SLAManager slaManager;
+	
+	private ProjectManager projectManager = null;
 	
 	private final boolean useExternalRunner;
 	private final ScheduleRunner runner;
@@ -86,28 +79,27 @@ public class ScheduleManager implements TriggerAgent {
 	 * @param loader
 	 */
 	public ScheduleManager (ExecutorManager executorManager,
-							ProjectManager projectManager, 
-							SLAManager slaManager,
 							ScheduleLoader loader,
 							boolean useExternalRunner) 
 	{
 		this.executorManager = executorManager;
-		this.projectManager = projectManager;
-		this.slaManager = slaManager;
 		this.loader = loader;
 		this.useExternalRunner = useExternalRunner;
 		
 		if(!useExternalRunner) {
 			this.runner = new ScheduleRunner();
-			load();
 		} else {
 			this.runner = null;
 		}
 		
 	}
 	
+	public void setProjectManager(ProjectManager projectManager) {
+		this.projectManager = projectManager;
+	}
+	
 	@Override
-	public void load() {
+	public void start() throws ScheduleManagerException {
 		List<Schedule> scheduleList = null;
 		try {
 			scheduleList = loader.loadSchedules();
@@ -126,6 +118,9 @@ public class ScheduleManager implements TriggerAgent {
 		}
 
 		if(!useExternalRunner) {
+			if(projectManager == null) {
+				throw new ScheduleManagerException("Project Manager must be initialized when using internal schedule runner!");
+			}
 			this.runner.start();
 		}
 	}
@@ -289,7 +284,7 @@ public class ScheduleManager implements TriggerAgent {
 			final long submitTime,
 			final String submitUser
 			) {
-		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
 	}
 	
 	public Schedule scheduleFlow(
@@ -305,10 +300,9 @@ public class ScheduleManager implements TriggerAgent {
 			final long nextExecTime,
 			final long submitTime,
 			final String submitUser,
-			ExecutionOptions execOptions,
-			SlaOptions slaOptions
+			ExecutionOptions execOptions
 			) {
-		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -531,25 +525,6 @@ public class ScheduleManager implements TriggerAgent {
 										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
 									}
 									
-									SlaOptions slaOptions = runningSched.getSlaOptions();
-									if(slaOptions != null) {
-										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
-										// submit flow slas
-										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-										for(SlaSetting set : slaOptions.getSettings()) {
-											if(set.getId().equals("")) {
-												DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-												slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-											}
-											else {
-												jobsettings.add(set);
-											}
-										}
-										if(jobsettings.size() > 0) {
-											slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
-										}
-									}
-									
 								} 
 								catch (ExecutorManagerException e) {
 									if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
diff --git a/src/java/azkaban/scheduler/ScheduleManagerException.java b/src/java/azkaban/scheduler/ScheduleManagerException.java
index a977e2a..a15cabf 100644
--- a/src/java/azkaban/scheduler/ScheduleManagerException.java
+++ b/src/java/azkaban/scheduler/ScheduleManagerException.java
@@ -26,4 +26,8 @@ public class ScheduleManagerException extends Exception{
 	public ScheduleManagerException(String message, Throwable cause) {
 		super(message, cause);
 	}
+
+	public ScheduleManagerException(Exception e) {
+		super(e);
+	}
 }
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 311d673..b79fc3f 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -8,7 +8,6 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
-import azkaban.actions.ExecuteFlowAction;
 import azkaban.executor.ExecutorManager;
 import azkaban.project.ProjectManager;
 import azkaban.trigger.Condition;
@@ -18,6 +17,8 @@ import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerManagerException;
 import azkaban.trigger.TriggerStatus;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
 
 public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
@@ -80,7 +81,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	public void insertSchedule(Schedule s) throws ScheduleManagerException {
 		Trigger t = scheduleToTrigger(s);
 		try {
-			triggerManager.insertTrigger(t);
+			triggerManager.insertTrigger(t, t.getSubmitUser());
 			s.setScheduleId(t.getTriggerId());
 //			triggersLocalCopy.put(t.getTriggerId(), t);
 		} catch (TriggerManagerException e) {
@@ -93,7 +94,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	public void updateSchedule(Schedule s) throws ScheduleManagerException {
 		Trigger t = scheduleToTrigger(s);
 		try {
-			triggerManager.updateTrigger(t);
+			triggerManager.updateTrigger(t, t.getSubmitUser());
 //			triggersLocalCopy.put(t.getTriggerId(), t);
 		} catch (TriggerManagerException e) {
 			// TODO Auto-generated catch block
@@ -161,7 +162,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	@Override
 	public void removeSchedule(Schedule s) throws ScheduleManagerException {
 		try {
-			triggerManager.removeTrigger(s.getScheduleId());
+			triggerManager.removeTrigger(s.getScheduleId(), s.getSubmitUser());
 //			triggersLocalCopy.remove(s.getScheduleId());
 		} catch (TriggerManagerException e) {
 			// TODO Auto-generated catch block
@@ -180,7 +181,14 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 
 	@Override
 	public synchronized List<Schedule> loadUpdatedSchedules() throws ScheduleManagerException {
-		List<Trigger> triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+		List<Trigger> triggers;
+		try {
+			triggers = triggerManager.getUpdatedTriggers(triggerSource, lastUpdateTime);
+		} catch (TriggerManagerException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			throw new ScheduleManagerException(e);
+		}
 		List<Schedule> schedules = new ArrayList<Schedule>();
 		for(Trigger t : triggers) {
 			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index f3032d7..799c487 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -28,17 +28,15 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
-import azkaban.actions.ExecuteFlowAction;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
 import azkaban.project.ProjectManager;
-import azkaban.sla.SLAManager;
-import azkaban.sla.SlaOptions;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerManager;
+import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.Pair;
 
 /**
@@ -149,7 +147,7 @@ public class TriggerBasedScheduler {
 			final long submitTime,
 			final String submitUser
 			) {
-		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
 	}
 	
 	public Schedule scheduleFlow(
@@ -165,10 +163,9 @@ public class TriggerBasedScheduler {
 			final long nextExecTime,
 			final long submitTime,
 			final String submitUser,
-			ExecutionOptions execOptions,
-			SlaOptions slaOptions
+			ExecutionOptions execOptions
 			) {
-		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
+		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 1f9a58e..1e4cc3c 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -14,7 +14,7 @@ import java.util.Set;
 
 import org.apache.log4j.Logger;
 
-import azkaban.actions.ExecuteFlowAction;
+import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
diff --git a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
new file mode 100644
index 0000000..e231023
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
@@ -0,0 +1,64 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.triggerapp.TriggerRunnerManager;
+
+public class CreateTriggerAction implements TriggerAction {
+	
+	public static final String type = "CreateTriggerAction";
+	private static TriggerRunnerManager triggerRunnerManager;
+	private Trigger trigger;
+
+	public CreateTriggerAction(Trigger trigger) {
+		this.trigger = trigger;
+	}
+	
+	@Override
+	public String getType() {
+		return type;
+	}
+	
+	public static void setTriggerRunnerManager(TriggerRunnerManager trm) {
+		triggerRunnerManager = trm;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static CreateTriggerAction createFromJson(Object obj) throws Exception {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
+		}
+		Trigger trigger = Trigger.fromJson(jsonObj.get("trigger"));
+		return new CreateTriggerAction(trigger);
+	}
+	
+	@Override
+	public CreateTriggerAction fromJson(Object obj) throws Exception {
+		// TODO Auto-generated method stub
+		return createFromJson(obj);
+	}
+
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("type", type);
+		jsonObj.put("trigger", trigger.toJson());
+
+		return jsonObj;
+	}
+
+	@Override
+	public void doAction() throws Exception {
+		triggerRunnerManager.insertTrigger(trigger);
+	}
+
+	@Override
+	public String getDescription() {
+		return "create another: " + trigger.getDescription();
+	}
+
+}
diff --git a/src/java/azkaban/trigger/builtin/ExecutableFlowStatusChecker.java b/src/java/azkaban/trigger/builtin/ExecutableFlowStatusChecker.java
new file mode 100644
index 0000000..81553f9
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/ExecutableFlowStatusChecker.java
@@ -0,0 +1,104 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.trigger.ConditionChecker;
+
+public class ExecutableFlowStatusChecker implements ConditionChecker{
+	public static final String type = "ExecutableFlowStatusChecker";
+	private static Logger logger = Logger.getLogger(ExecutableFlowStatusChecker.class);
+	private int execId;
+	private Status status;
+	private String id;
+	private static ExecutorManager executorManager;
+	
+	public ExecutableFlowStatusChecker(int execId, Status status, String id) {
+		this.execId = execId;
+		this.status = status;
+		this.id = id;
+	}
+	
+	public static void setExecutorManager(ExecutorManager em) {
+		executorManager = em;
+	}
+	
+	@Override
+	public Object eval() {
+		ExecutableFlow exflow;
+		try {
+			exflow = executorManager.fetchExecutableFlow(execId);
+		} catch (ExecutorManagerException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			logger.error("Failed to get executable flow status.");
+			return Boolean.FALSE;
+		}
+		Status flowStatus = exflow.getStatus();
+		return flowStatus.equals(status);
+	}
+
+	@Override
+	public Object getNum() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void reset() {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public String getId() {
+		return id;
+	}
+
+	@Override
+	public String getType() {
+		// TODO Auto-generated method stub
+		return type;
+	}
+
+	@Override
+	public ExecutableFlowStatusChecker fromJson(Object obj) throws Exception {
+		return createFromJson(obj);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static ExecutableFlowStatusChecker createFromJson(Object obj) throws Exception {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+		}
+		String id = (String) jsonObj.get("id");
+		int execId = Integer.valueOf((String) jsonObj.get("execId"));
+		Status status = Status.valueOf((String) jsonObj.get("status"));
+		return new ExecutableFlowStatusChecker(execId, status, id);
+	}
+	
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("type", type);
+		jsonObj.put("execId", String.valueOf(execId));
+		jsonObj.put("status", status.toString());
+		jsonObj.put("id", id);
+		return jsonObj;
+	}
+
+	@Override
+	public void stopChecker() {
+		// TODO Auto-generated method stub
+		
+	}
+
+	
+}
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index c64a66a..74f9ddf 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -14,7 +14,7 @@ import java.util.Set;
 
 import org.apache.log4j.Logger;
 
-import azkaban.scheduler.BasicTimeChecker;
+import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 5d809d5..fcc1fa0 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -113,7 +113,7 @@ public class Condition {
 		} catch(Exception e) {
 			e.printStackTrace();
 			logger.error("Failed to recreate condition from json.", e);
-			return null;
+			throw new Exception("Failed to recreate condition from json.", e);
 		}
 		
 		return cond;
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 8ce3c46..920eddf 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -18,5 +18,5 @@ public interface ConditionChecker {
 	Object toJson();
 
 	void stopChecker();
-
+	
 }
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index b5bde55..37f8131 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -43,11 +43,14 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	
 	private static final String triggerTblName = "triggers";
 
+	private static final String GET_UPDATED_TRIGGERS = 
+			"SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM " + triggerTblName + " WHERE modify_time>=?";
+
 	private static String GET_ALL_TRIGGERS =
 			"SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM " + triggerTblName;
 	
 	private static String GET_TRIGGER = 
-			"SELECT trigger_source, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
+			"SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
 	
 	private static String ADD_TRIGGER = 
 			"INSERT INTO " + triggerTblName + " ( modify_time) values (?)";
@@ -71,6 +74,31 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	}
 
 	@Override
+	public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException {
+		logger.info("Loading triggers changed since " + new DateTime(lastUpdateTime).toString());
+		Connection connection = getConnection();
+
+		QueryRunner runner = new QueryRunner();
+		ResultSetHandler<List<Trigger>> handler = new TriggerResultHandler();
+	
+		List<Trigger> triggers;
+		
+		try {
+			triggers = runner.query(connection, GET_UPDATED_TRIGGERS, handler, lastUpdateTime);
+		} catch (SQLException e) {
+			logger.error(GET_ALL_TRIGGERS + " failed.");
+
+			throw new TriggerManagerException("Loading triggers from db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
+		}
+		
+		logger.info("Loaded " + triggers.size() + " triggers.");
+		
+		return triggers;
+	}
+	
+	@Override
 	public List<Trigger> loadTriggers() throws TriggerManagerException {
 		logger.info("Loading all triggers from db.");
 		Connection connection = getConnection();
@@ -155,6 +183,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 		logger.info("Updating trigger " + t.toString() + " into db.");
 		Connection connection = getConnection();
 		try{
+			t.setLastModifyTime(DateTime.now());
 			updateTrigger(connection, t, defaultEncodingType);
 		}
 		catch(Exception e) {
@@ -189,7 +218,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 			int updates =  runner.update( connection, 
 					UPDATE_TRIGGER, 
 					t.getSource(),
-					DateTime.now().getMillis(),
+					t.getLastModifyTime().getMillis(),
 					encType.getNumVal(),
 					data,
 					t.getTriggerId());
@@ -282,5 +311,33 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 		return connection;
 	}
 
+	@Override
+	public Trigger loadTrigger(int triggerId) throws TriggerManagerException {
+		logger.info("Loading trigger " + triggerId + " from db.");
+		Connection connection = getConnection();
+
+		QueryRunner runner = new QueryRunner();
+		ResultSetHandler<List<Trigger>> handler = new TriggerResultHandler();
+	
+		List<Trigger> triggers;
+		
+		try {
+			triggers = runner.query(connection, GET_TRIGGER, handler, triggerId);
+		} catch (SQLException e) {
+			logger.error(GET_TRIGGER + " failed.");
+			throw new TriggerManagerException("Loading trigger from db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
+		}
+		
+		if(triggers.size() == 0) {
+			logger.error("Failed to load trigger " + triggerId);
+			throw new TriggerManagerException("Failed to load trigger " + triggerId);
+		}
+		
+		return triggers.get(0);
+	}
+
+	
 
 }
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 673facc..962cf7a 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -22,6 +22,9 @@ public class Trigger {
 	private Condition triggerCondition;
 	private Condition expireCondition;
 	private List<TriggerAction> actions;
+	private List<TriggerAction> expireActions;
+	
+	private Map<String, Object> info = new HashMap<String, Object>();
 	
 	private static ActionTypeLoader actionTypeLoader;
 	
@@ -61,6 +64,58 @@ public class Trigger {
 		return actions;
 	}
 
+	public List<TriggerAction> getExpireActions() {
+		return expireActions;
+	}
+	
+	public Map<String, Object> getInfo() {
+		return info;
+	}
+
+	public void setInfo(Map<String, Object> info) {
+		this.info = info;
+	}
+
+	public Trigger(
+			DateTime lastModifyTime, 
+			DateTime submitTime, 
+			String submitUser, 
+			String source,
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions, 
+			List<TriggerAction> expireActions,
+			Map<String, Object> info) {
+		this.lastModifyTime = lastModifyTime;
+		this.submitTime = submitTime;
+		this.submitUser = submitUser;
+		this.source = source;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+		this.expireActions = expireActions;
+		this.info = info;
+	}
+	
+	public Trigger(
+			DateTime lastModifyTime, 
+			DateTime submitTime, 
+			String submitUser, 
+			String source,
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions, 
+			List<TriggerAction> expireActions) {
+		this.lastModifyTime = lastModifyTime;
+		this.submitTime = submitTime;
+		this.submitUser = submitUser;
+		this.source = source;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+		this.expireActions = expireActions;
+	}
+	
 	public Trigger(
 			DateTime lastModifyTime, 
 			DateTime submitTime, 
@@ -76,6 +131,7 @@ public class Trigger {
 		this.triggerCondition = triggerCondition;
 		this.expireCondition = expireCondition;
 		this.actions = actions;
+		this.expireActions = new ArrayList<TriggerAction>();
 	}
 	
 	public Trigger(
@@ -86,7 +142,31 @@ public class Trigger {
 			String source,
 			Condition triggerCondition,
 			Condition expireCondition,
-			List<TriggerAction> actions) {
+			List<TriggerAction> actions,
+			List<TriggerAction> expireActions,
+			Map<String, Object> info) {
+		this.triggerId = triggerId;
+		this.lastModifyTime = lastModifyTime;
+		this.submitTime = submitTime;
+		this.submitUser = submitUser;
+		this.source = source;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+		this.expireActions = expireActions;
+		this.info = info;
+	}
+	
+	public Trigger(
+			int triggerId,
+			DateTime lastModifyTime, 
+			DateTime submitTime, 
+			String submitUser, 
+			String source,
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions,
+			List<TriggerAction> expireActions) {
 		this.triggerId = triggerId;
 		this.lastModifyTime = lastModifyTime;
 		this.submitTime = submitTime;
@@ -95,6 +175,7 @@ public class Trigger {
 		this.triggerCondition = triggerCondition;
 		this.expireCondition = expireCondition;
 		this.actions = actions;
+		this.expireActions = expireActions;
 	}
 	
 	public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
@@ -124,6 +205,10 @@ public class Trigger {
 	public DateTime getLastModifyTime() {
 		return lastModifyTime;
 	}
+	
+	public void setLastModifyTime(DateTime lastModifyTime) {
+		this.lastModifyTime = lastModifyTime;
+	}
 
 	public void setTriggerId(int id) {
 		this.triggerId = id;
@@ -165,6 +250,15 @@ public class Trigger {
 			actionsJson.add(oneActionJson);
 		}
 		jsonObj.put("actions", actionsJson);
+		List<Object> expireActionsJson = new ArrayList<Object>();
+		for(TriggerAction expireAction : expireActions) {
+			Map<String, Object> oneExpireActionJson = new HashMap<String, Object>();
+			oneExpireActionJson.put("type", expireAction.getType());
+			oneExpireActionJson.put("actionJson", expireAction.toJson());
+			expireActionsJson.add(oneExpireActionJson);
+		}
+		jsonObj.put("expireActions", expireActionsJson);
+		
 		jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
 		jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
 		jsonObj.put("submitUser", submitUser);
@@ -173,7 +267,7 @@ public class Trigger {
 		jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime.getMillis()));
 		jsonObj.put("triggerId", String.valueOf(triggerId));
 		jsonObj.put("status", status.toString());
-		
+		jsonObj.put("info", info);
 		return jsonObj;
 	}
 	
@@ -203,6 +297,14 @@ public class Trigger {
 				TriggerAction act = actionTypeLoader.createActionFromJson(type, oneActionJson.get("actionJson"));
 				actions.add(act);
 			}
+			List<TriggerAction> expireActions = new ArrayList<TriggerAction>();
+			List<Object> expireActionsJson = (List<Object>) jsonObj.get("expireActions");
+			for(Object expireActObj : expireActionsJson) {
+				Map<String, Object> oneExpireActionJson = (HashMap<String, Object>) expireActObj;
+				String type = (String) oneExpireActionJson.get("type");
+				TriggerAction expireAct = actionTypeLoader.createActionFromJson(type, oneExpireActionJson.get("actionJson"));
+				expireActions.add(expireAct);
+			}
 			boolean resetOnTrigger = Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
 			boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
 			String submitUser = (String) jsonObj.get("submitUser");
@@ -213,14 +315,15 @@ public class Trigger {
 			DateTime lastModifyTime = new DateTime(lastModifyTimeMillis);
 			int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
 			TriggerStatus status = TriggerStatus.valueOf((String)jsonObj.get("status"));
-			trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions);
+			Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
+			trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions, expireActions, info);
 			trigger.setResetOnExpire(resetOnExpire);
 			trigger.setResetOnTrigger(resetOnTrigger);
 			trigger.setStatus(status);
 		}catch(Exception e) {
 			e.printStackTrace();
 			logger.error("Failed to decode the trigger.", e);
-			return null;
+			throw new Exception("Failed to decode the trigger.", e);
 		}
 		
 		return trigger;
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
index f6adf5d..85b6ad8 100644
--- a/src/java/azkaban/trigger/TriggerAction.java
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -4,7 +4,7 @@ public interface TriggerAction {
 	
 	String getType();
 	
-	TriggerAction fromJson(Object obj);
+	TriggerAction fromJson(Object obj) throws Exception;
 	
 	Object toJson();
 	
diff --git a/src/java/azkaban/trigger/TriggerAgent.java b/src/java/azkaban/trigger/TriggerAgent.java
index f86d289..453f49d 100644
--- a/src/java/azkaban/trigger/TriggerAgent.java
+++ b/src/java/azkaban/trigger/TriggerAgent.java
@@ -1,17 +1,12 @@
 package azkaban.trigger;
 
-import java.io.File;
-
 import azkaban.utils.Props;
 
 public interface TriggerAgent {
-	public void loadTriggerFromProps(Props props) throws Exception;
+	void loadTriggerFromProps(Props props) throws Exception;
 
-	public String getTriggerSource();
+	String getTriggerSource();
 	
-	void load();
-
-//	// update local copy
-//	public void updateLocal();
+	void start() throws Exception;
 
 }
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
index cf37634..c3e604b 100644
--- a/src/java/azkaban/trigger/TriggerLoader.java
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -13,6 +13,10 @@ public interface TriggerLoader {
 	
 	public void updateTrigger(Trigger t) throws TriggerManagerException;
 	
-	public List<Trigger> loadTriggers() throws TriggerManagerException;	
+	public List<Trigger> loadTriggers() throws TriggerManagerException;
+
+	public Trigger loadTrigger(int triggerId) throws TriggerManagerException;
+
+	public List<Trigger> getUpdatedTriggers(long lastUpdateTime) throws TriggerManagerException;
 	
 }
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 11311dd..460a75f 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -1,66 +1,112 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package azkaban.trigger;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
+import azkaban.triggerapp.TriggerConnectorParams;
 import azkaban.utils.JSONUtils;
+import azkaban.utils.Pair;
 import azkaban.utils.Props;
 
+/**
+ * Executor manager used to manage the client side job.
+ *
+ */
 public class TriggerManager {
 	private static Logger logger = Logger.getLogger(TriggerManager.class);
-	
+
 	private static final String TRIGGER_SUFFIX = ".trigger";
 	
-	private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+	private TriggerLoader triggerLoader;
+	private CheckerTypeLoader checkerTypeLoader;
+	private ActionTypeLoader actionTypeLoader;
 	
-	private CheckerTypeLoader checkerLoader;
-	private ActionTypeLoader actionLoader;
+	private String triggerServerHost;
+	private int triggerServerPort;
 	
-	private static TriggerLoader triggerLoader;
+	private Set<Pair<String, Integer>> triggerServers = new HashSet<Pair<String,Integer>>();
 	
-	private static TriggerScannerThread scannerThread;
+	private ConcurrentHashMap<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
 	
 	private Map<String, TriggerAgent> triggerAgents = new HashMap<String, TriggerAgent>();
+
+	private TriggerManagerUpdaterThread triggerManagingThread;
 	
-	public TriggerManager(Props props, TriggerLoader triggerLoader) {
-		
-		TriggerManager.triggerLoader = triggerLoader;
-		checkerLoader = new CheckerTypeLoader();
-		actionLoader = new ActionTypeLoader();
+	private long lastThreadCheckTime = -1;
+	
+	private long lastUpdateTime = -1;
+	
+	public TriggerManager(Props props, TriggerLoader loader) throws TriggerManagerException {
+		this.triggerLoader = loader;
+		this.checkerTypeLoader = new CheckerTypeLoader();
+		this.actionTypeLoader = new ActionTypeLoader();
+
+		triggerServerHost = props.getString("trigger.server.host", "localhost");
+		triggerServerPort = props.getInt("trigger.server.port");
+
+		triggerManagingThread = new TriggerManagerUpdaterThread();
 		
-		// load plugins
 		try{
-			checkerLoader.init(props);
-			actionLoader.init(props);
+			checkerTypeLoader.init(props);
+			actionTypeLoader.init(props);
 		} catch(Exception e) {
 			e.printStackTrace();
 			logger.error(e.getMessage());
 		}
 		
-		Condition.setCheckerLoader(checkerLoader);
-		Trigger.setActionTypeLoader(actionLoader);
-		
-		checkerLoader = new CheckerTypeLoader();
-		actionLoader = new ActionTypeLoader();
-		
-		long scannerInterval = props.getLong("trigger.scan.interval", TriggerScannerThread.DEFAULT_SCAN_INTERVAL_MS);
-		scannerThread = new TriggerScannerThread(scannerInterval);
-		scannerThread.setName("TriggerScannerThread");
+		Condition.setCheckerLoader(checkerTypeLoader);
+		Trigger.setActionTypeLoader(actionTypeLoader);
 		
+		triggerServers.add(new Pair<String, Integer>(triggerServerHost, triggerServerPort));
+
+	}
+	
+	public void start() throws Exception {
+		loadTriggers();
+		for(TriggerAgent agent : triggerAgents.values()) {
+			agent.start();
+		}
+		triggerManagingThread.start();
 	}
 	
 	private static class SuffixFilter implements FileFilter {
 		private String suffix;
-		
 		public SuffixFilter(String suffix) {
 			this.suffix = suffix;
 		}
@@ -68,257 +114,385 @@ public class TriggerManager {
 		@Override
 		public boolean accept(File pathname) {
 			String name = pathname.getName();
-			
 			return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
 		}
 	}
 	
-	@SuppressWarnings("unchecked")
-	public void loadTriggerFromDir(File baseDir, Props props) throws Exception {
-		File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
-		
-		for(File triggerFile : triggerFiles) {
-			Props triggerProps = new Props(props, triggerFile);
-			String triggerType = triggerProps.getString("trigger.type");
-			TriggerAgent agent = triggerAgents.get(triggerType);
-			if(agent != null) {
-				agent.loadTriggerFromProps(triggerProps);
-			} else {
-				throw new Exception("Trigger " + triggerType + " is not supported.");
-			}
-		}
+	public String getTriggerServerHost() {
+		return triggerServerHost;
 	}
 	
-	public void addTriggerAgent(String triggerSource, TriggerAgent agent) throws TriggerManagerException {
-		if(triggerAgents.containsKey(triggerSource)) {
-			throw new TriggerManagerException("Trigger agent " + triggerSource + " already exists!" );
-		}
-		this.triggerAgents.put(triggerSource, agent);
+	public int getTriggerServerPort() {
+		return triggerServerPort;
 	}
 	
-	public void start() {
-		
-		try{
-			// expect loader to return valid triggers
-			List<Trigger> triggers = triggerLoader.loadTriggers();
-			for(Trigger t : triggers) {
-				scannerThread.addTrigger(t);
-				triggerIdMap.put(t.getTriggerId(), t);
-			}
-		}catch(Exception e) {
-			e.printStackTrace();
-			logger.error(e.getMessage());
-		}
-		
-		for(TriggerAgent agent : triggerAgents.values()) {
-			agent.load();
-		}
-		
-		scannerThread.start();
+	public State getUpdaterThreadState() {
+		return triggerManagingThread.getState();
 	}
 	
-	public CheckerTypeLoader getCheckerLoader() {
-		return checkerLoader;
+	public boolean isThreadActive() {
+		return triggerManagingThread.isAlive();
 	}
-
-	public ActionTypeLoader getActionLoader() {
-		return actionLoader;
+	
+	public long getLastThreadCheckTime() {
+		return lastThreadCheckTime;
 	}
-
-	public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
-		
-		triggerLoader.addTrigger(t);
-		triggerIdMap.put(t.getTriggerId(), t);
-		scannerThread.addTrigger(t);
+	
+	public Set<String> getPrimaryServerHosts() {
+		// Only one for now. More probably later.
+		HashSet<String> ports = new HashSet<String>();
+		ports.add(triggerServerHost + ":" + triggerServerPort);
+		return ports;
 	}
 	
-	public synchronized void removeTrigger(int id) throws TriggerManagerException {
-		Trigger t = triggerIdMap.get(id);
-		if(t != null) {
-			removeTrigger(triggerIdMap.get(id));
+	private void loadTriggers() throws TriggerManagerException {
+		List<Trigger> triggerList = triggerLoader.loadTriggers();
+		for(Trigger t : triggerList) {
+			triggerIdMap.put(t.getTriggerId(), t);
 		}
 	}
 	
-	//TODO: update corresponding agents
-	public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
-		if(!triggerIdMap.containsKey(t.getTriggerId())) {
-			throw new TriggerManagerException("The trigger to update doesn't exist!");
+	public Trigger getTrigger(int triggerId) {
+		return triggerIdMap.get(triggerId);
+	}
+	
+	public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
+		synchronized(t) {
+			callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
 		}
-		
-		scannerThread.deleteTrigger(t);
-		scannerThread.addTrigger(t);
-		triggerIdMap.put(t.getTriggerId(), t);
-		
-		triggerLoader.updateTrigger(t);
 	}
+	
 
-	//TODO: update corresponding agents
-	public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
-		t.stopCheckers();
-		triggerLoader.removeTrigger(t);
-		scannerThread.deleteTrigger(t);
-		triggerIdMap.remove(t.getTriggerId());		
+	public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
+		synchronized(t) {
+			try {
+				callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
+			} catch(TriggerManagerException e) {
+				throw new TriggerManagerException(e);
+			}
+		}
 	}
 	
-	public List<Trigger> getTriggers() {
-		return new ArrayList<Trigger>(triggerIdMap.values());
+//	public void getUpdatedTriggers() throws TriggerManagerException {
+//		try {
+//			callTriggerServer(triggerServerHost, triggerServerPort, TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", (Pair<String,String>[])null);
+//		} catch(IOException e) {
+//			throw new TriggerManagerException(e);
+//		}
+//	}
+	
+	public String insertTrigger(Trigger t, String userId) throws TriggerManagerException {
+		synchronized(t) {
+			String message = null;
+			logger.info("Inserting trigger into system. " );
+			// The trigger id is set by the loader. So it's unavailable until after this call.
+			triggerLoader.addTrigger(t);
+			try {
+				callTriggerServer(t,  TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
+				triggerIdMap.put(t.getTriggerId(), t);
+				
+				message += "Trigger inserted successfully with trigger id " + t.getTriggerId();
+			}
+			catch (TriggerManagerException e) {
+				throw e;
+			}
+			return message;
+		}
 	}
 	
-	public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
-		return checkerLoader.getSupportedCheckers();
+	private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
+		try {
+			Map<String, Object> info = t.getInfo();
+			return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), null, (Pair<String,String>[])null);
+		} catch (IOException e) {
+			throw new TriggerManagerException(e);
+		}
 	}
-
-//	private void updateAgent(Trigger t) {
-//		TriggerAgent agent = triggerAgents.get(t.getSource());
-//		if(agent != null) {
-//			agent.updateLocal(t);
-//		}
-//		
-//	}
 	
-	//trigger scanner thread
-	public class TriggerScannerThread extends Thread {
-		
-		//public static final long DEFAULT_SCAN_INTERVAL_MS = 300000;
-		public static final long DEFAULT_SCAN_INTERVAL_MS = 60000;
+	private Map<String, Object> callTriggerServer(String host, int port, String action, Integer triggerId, String user, Pair<String,String> ... params) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		builder.setScheme("http")
+			.setHost(host)
+			.setPort(port)
+			.setPath("/trigger");
+
+		builder.setParameter(TriggerConnectorParams.ACTION_PARAM, action);
 		
-		private final BlockingQueue<Trigger> triggers;
-		private AtomicBoolean stillAlive = new AtomicBoolean(true);
-		private long lastCheckTime = -1;
-		private final long scanInterval;
+		if (triggerId != null) {
+			builder.setParameter(TriggerConnectorParams.TRIGGER_ID_PARAM,String.valueOf(triggerId));
+		}
 		
-		// Five minute minimum intervals
+		if (user != null) {
+			builder.setParameter(TriggerConnectorParams.USER_PARAM, user);
+		}
 		
-		public TriggerScannerThread(){
-			triggers = new LinkedBlockingDeque<Trigger>();
-			this.scanInterval = DEFAULT_SCAN_INTERVAL_MS;
+		if (params != null) {
+			for (Pair<String, String> pair: params) {
+				builder.setParameter(pair.getFirst(), pair.getSecond());
+			}
 		}
 
-		public TriggerScannerThread(long interval){
-			triggers = new LinkedBlockingDeque<Trigger>();
-			this.scanInterval = interval;
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
 		}
 		
-		public void shutdown() {
-			logger.error("Shutting down trigger manager thread " + this.getName());
-			stillAlive.set(false);
-			this.interrupt();
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
 		}
 		
-		public synchronized List<Trigger> getTriggers() {
-			return new ArrayList<Trigger>(triggers);
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
 		}
 		
-		public synchronized void addTrigger(Trigger t) {
-			triggers.add(t);
+		return jsonResponse;
+	}
+	
+	public Map<String, Object> callTriggerServerJMX(String hostPort, String action, String mBean) throws IOException {
+		URIBuilder builder = new URIBuilder();
+		
+		String[] hostPortSplit = hostPort.split(":");
+		builder.setScheme("http")
+			.setHost(hostPortSplit[0])
+			.setPort(Integer.parseInt(hostPortSplit[1]))
+			.setPath("/jmx");
+
+		builder.setParameter(action, "");
+		if (mBean != null) {
+			builder.setParameter(TriggerConnectorParams.JMX_MBEAN, mBean);
+		}
+
+		URI uri = null;
+		try {
+			uri = builder.build();
+		} catch (URISyntaxException e) {
+			throw new IOException(e);
+		}
+		
+		ResponseHandler<String> responseHandler = new BasicResponseHandler();
+		
+		HttpClient httpclient = new DefaultHttpClient();
+		HttpGet httpget = new HttpGet(uri);
+		String response = null;
+		try {
+			response = httpclient.execute(httpget, responseHandler);
+		} catch (IOException e) {
+			throw e;
+		}
+		finally {
+			httpclient.getConnectionManager().shutdown();
+		}
+		
+		@SuppressWarnings("unchecked")
+		Map<String, Object> jsonResponse = (Map<String, Object>)JSONUtils.parseJSONFromString(response);
+		String error = (String)jsonResponse.get(TriggerConnectorParams.RESPONSE_ERROR);
+		if (error != null) {
+			throw new IOException(error);
+		}
+		return jsonResponse;
+	}
+	
+	public void shutdown() {
+		triggerManagingThread.shutdown();
+	}
+	
+	private class TriggerManagerUpdaterThread extends Thread {
+		private boolean shutdown = false;
+
+		public TriggerManagerUpdaterThread() {
+			this.setName("TriggerManagingThread");
 		}
+
+		private int waitTimeIdleMs = 2000;
+		private int waitTimeMs = 500;
 		
-		public synchronized void deleteTrigger(Trigger t) {
-			triggers.remove(t);
+		private void shutdown() {
+			shutdown = true;
 		}
 		
+		@SuppressWarnings("unchecked")
 		public void run() {
-			while(stillAlive.get()) {
-				synchronized (this) {
+			while(!shutdown) {
+				try {
+					lastThreadCheckTime = System.currentTimeMillis();
+					
+					Pair<String, Integer> triggerServer = (Pair<String, Integer>) triggerServers.toArray()[0];
+					
+					Pair<String, String> updateTimeParam = new Pair<String, String>("lastUpdateTime", String.valueOf(lastUpdateTime));
+					Map<String, Object> results = null;
 					try{
-						lastCheckTime = System.currentTimeMillis();
-						
-						try{
-							checkAllTriggers();
-						} catch(Exception e) {
-							e.printStackTrace();
-							logger.error(e.getMessage());
-						} catch(Throwable t) {
-							t.printStackTrace();
-							logger.error(t.getMessage());
+						results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
+//						lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
+						List<Integer> updates = (List<Integer>) results.get("updates");
+						for(Integer update : updates) {
+							Trigger t = triggerLoader.loadTrigger(update);
+							lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
+							triggerIdMap.put(update, t);
 						}
+					} catch (Exception e) {
+						logger.error(e);
 						
-						long timeRemaining = scanInterval - (System.currentTimeMillis() - lastCheckTime);
-						if(timeRemaining < 0) {
-							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
-						} else {
-							wait(timeRemaining);
-						}
-					} catch(InterruptedException e) {
-						logger.info("Interrupted. Probably to shut down.");
 					}
 					
+					synchronized(this) {
+						try {
+							if (triggerIdMap.size() > 0) {
+								this.wait(waitTimeMs);
+							}
+							else {
+								this.wait(waitTimeIdleMs);
+							}
+						} catch (InterruptedException e) {
+						}
+					}
+				}
+				catch (Exception e) {
+					logger.error(e);
 				}
 			}
 		}
+	}
+	
+	private static class ConnectionInfo {
+		private String host;
+		private int port;
+
+		public ConnectionInfo(String host, int port) {
+			this.host = host;
+			this.port = port;
+		}
+
+		@SuppressWarnings("unused")
+		private ConnectionInfo getOuterType() {
+			return ConnectionInfo.this;
+		}
 		
-		private void checkAllTriggers() throws TriggerManagerException {
-			for(Trigger t : triggers) {
-				if(t.getStatus().equals(TriggerStatus.READY)) {
-					if(t.triggerConditionMet()) {
-						onTriggerTrigger(t);
-					} else if (t.expireConditionMet()) {
-						onTriggerExpire(t);
-					}
-				}
-			}
+		public boolean isEqual(String host, int port) {
+			return this.port == port && this.host.equals(host);
 		}
 		
-		private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
-			List<TriggerAction> actions = t.getTriggerActions();
-			for(TriggerAction action : actions) {
-				try {
-					action.doAction();
-				} catch (Exception e) {
-					// TODO Auto-generated catch block
-					throw new TriggerManagerException("action failed to execute", e);
-				}
-			}
-			if(t.isResetOnTrigger()) {
-				t.resetTriggerConditions();
-				t.resetExpireCondition();
-				updateTrigger(t);
-			} else {
-				t.setStatus(TriggerStatus.EXPIRED);
-			}
-//			updateAgent(t);
+		public String getHost() {
+			return host;
+		}
+		
+		public int getPort() {
+			return port;
 		}
 		
-		private void onTriggerExpire(Trigger t) throws TriggerManagerException {
-			if(t.isResetOnExpire()) {
-				t.resetTriggerConditions();
-				t.resetExpireCondition();
-				updateTrigger(t);
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + ((host == null) ? 0 : host.hashCode());
+			result = prime * result + port;
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			ConnectionInfo other = (ConnectionInfo) obj;
+			if (host == null) {
+				if (other.host != null)
+					return false;
+			} else if (!host.equals(other.host))
+				return false;
+			if (port != other.port)
+				return false;
+			return true;
+		}
+	}
+
+	public void loadTriggerFromDir(File baseDir, Props props) throws Exception {
+		File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
+
+		for(File triggerFile : triggerFiles) {
+			Props triggerProps = new Props(props, triggerFile);
+			String triggerType = triggerProps.getString("trigger.type");
+			TriggerAgent agent = triggerAgents.get(triggerType);
+			if(agent != null) {
+				agent.loadTriggerFromProps(triggerProps);
 			} else {
-				t.setStatus(TriggerStatus.EXPIRED);
+				throw new Exception("Trigger " + triggerType + " is not supported.");
 			}
-//			updateAgent(t);
 		}
 	}
 
-	public synchronized Trigger getTrigger(int triggerId) {
-		return triggerIdMap.get(triggerId);
+	public List<Trigger> getTriggers() {
+		return new ArrayList<Trigger>(triggerIdMap.values());
 	}
 
 	public void expireTrigger(int triggerId) {
-		Trigger t = getTrigger(triggerId);
-		t.setStatus(TriggerStatus.EXPIRED);
-//		updateAgent(t);
+		// TODO Auto-generated method stub
+		
+	}
+
+	public CheckerTypeLoader getCheckerLoader() {
+		return checkerTypeLoader;
+	}
+
+	public ActionTypeLoader getActionLoader() {
+		return actionTypeLoader;
+	}
+
+	public void addTriggerAgent(String triggerSource,
+			TriggerAgent agent) {
+		triggerAgents.put(triggerSource, agent);
 	}
 
 	public List<Trigger> getTriggers(String triggerSource) {
-		List<Trigger> triggers = new ArrayList<Trigger>();
+		List<Trigger> results = new ArrayList<Trigger>();
 		for(Trigger t : triggerIdMap.values()) {
 			if(t.getSource().equals(triggerSource)) {
-				triggers.add(t);
+				results.add(t);
 			}
 		}
-		return triggers;
+		return results;
 	}
 
-	public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) {
+	public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) throws TriggerManagerException {
+		getUpdatedTriggers();
 		List<Trigger> triggers = new ArrayList<Trigger>();
 		for(Trigger t : triggerIdMap.values()) {
-			if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() > lastUpdateTime) {
+			if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() >= lastUpdateTime) {
 				triggers.add(t);
 			}
 		}
 		return triggers;
 	}
 
+	private void getUpdatedTriggers() throws TriggerManagerException {
+		List<Trigger> triggers = triggerLoader.getUpdatedTriggers(this.lastUpdateTime);
+		for(Trigger t : triggers) {
+			this.lastUpdateTime = Math.max(this.lastUpdateTime, t.getLastModifyTime().getMillis());
+			triggerIdMap.put(t.getTriggerId(), t);
+		}
+	}
+
+	public void removeTrigger(int scheduleId, String submitUser) throws TriggerManagerException {
+		removeTrigger(triggerIdMap.get(scheduleId), submitUser);
+	}
+
+	
 }
+
diff --git a/src/java/azkaban/trigger/TriggerManagerException.java b/src/java/azkaban/trigger/TriggerManagerException.java
index 5d30b39..c12a0e1 100644
--- a/src/java/azkaban/trigger/TriggerManagerException.java
+++ b/src/java/azkaban/trigger/TriggerManagerException.java
@@ -27,5 +27,9 @@ public class TriggerManagerException extends Exception{
 	public TriggerManagerException(String message, Throwable cause) {
 		super(message, cause);
 	}
+	
+	public TriggerManagerException(Throwable e) {
+		super(e);
+	}
 }
 
diff --git a/src/java/azkaban/trigger/TriggerStatus.java b/src/java/azkaban/trigger/TriggerStatus.java
index 3fcadf7..8d397bc 100644
--- a/src/java/azkaban/trigger/TriggerStatus.java
+++ b/src/java/azkaban/trigger/TriggerStatus.java
@@ -1,7 +1,7 @@
 package azkaban.trigger;
 
 public enum TriggerStatus {
-	READY(10), PAUSED(20), EXPIRED(30);
+	READY(10), PAUSED(20), EXPIRED(30), PREPARING(40);
 	
 	private int numVal;
 
@@ -21,6 +21,8 @@ public enum TriggerStatus {
 			return PAUSED;
 		case 30:
 			return EXPIRED;
+		case 40:
+			return PREPARING;
 		default:
 			return READY;
 		}
diff --git a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
new file mode 100644
index 0000000..c84198f
--- /dev/null
+++ b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
@@ -0,0 +1,481 @@
+package azkaban.triggerapp;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.JdbcExecutorLoader;
+import azkaban.jmx.JmxJettyServer;
+import azkaban.jmx.JmxTriggerRunnerManager;
+import azkaban.project.JdbcProjectLoader;
+import azkaban.project.ProjectManager;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.trigger.builtin.ExecutableFlowStatusChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanServer;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+
+public class AzkabanTriggerServer {
+	private static final Logger logger = Logger.getLogger(AzkabanTriggerServer.class);
+	private static final int MAX_FORM_CONTENT_SIZE = 10*1024*1024;
+
+	public static final String AZKABAN_HOME = "AZKABAN_HOME";
+	public static final String DEFAULT_CONF_PATH = "conf";
+	public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
+	public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
+	public static final String TRIGGER_PLUGIN_DIR = "trigger.plugin.dir";
+	public static final int DEFAULT_PORT_NUMBER = 22321;
+	public static final int DEFAULT_THREAD_NUMBER = 50;
+	
+	private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
+
+	private static AzkabanTriggerServer app;
+	
+	private TriggerLoader triggerLoader;
+	private TriggerRunnerManager triggerRunnerManager;
+	private ExecutorManager executorManager;
+	private ProjectManager projectManager;
+	private Props props;
+	private Server server;
+	
+	private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+	private MBeanServer mbeanServer;
+
+	/**
+	 * Constructor
+	 * 
+	 * @throws Exception
+	 */
+	public AzkabanTriggerServer(Props props) throws Exception {
+		this.props = props;
+
+		int portNumber = props.getInt("trigger.server.port", DEFAULT_PORT_NUMBER);
+		int maxThreads = props.getInt("trigger.server.maxThreads", DEFAULT_THREAD_NUMBER);
+
+		String hostname = props.getString("jetty.hostname", "localhost");
+		props.put("server.hostname", hostname);
+		props.put("server.port", portNumber);
+		props.put("server.useSSL", String.valueOf(props.getBoolean("jetty.use.ssl", true)));
+		
+		server = new Server(portNumber);
+		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
+		server.setThreadPool(httpThreadPool);
+
+		Context root = new Context(server, "/", Context.SESSIONS);
+		root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
+		
+		root.addServlet(new ServletHolder(new TriggerServerServlet()), "/trigger");
+		root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
+		root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, this);
+		
+		triggerLoader = createTriggerLoader(props);
+		projectManager = loadProjectManager(props);
+		executorManager = loadExecutorManager(props);
+		triggerRunnerManager = loadTriggerRunnerManager(props, triggerLoader);
+		
+		String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
+		loadBuiltinCheckersAndActions(this);
+		loadPluginCheckersAndActions(triggerPluginDir, this);
+		
+		configureMBeanServer();
+		
+		try {
+			triggerRunnerManager.start();
+			server.start();
+		} 
+		catch (Exception e) {
+			logger.warn(e);
+			Utils.croak(e.getMessage(), 1);
+		}
+		
+		logger.info("Azkaban Trigger Server started on port " + portNumber);
+	}
+
+	
+	
+	
+	private TriggerRunnerManager loadTriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
+		logger.info("Loading trigger runner manager");
+		TriggerRunnerManager trm = new TriggerRunnerManager(props, triggerLoader);
+		trm.init();
+		return trm;
+	}
+	
+	private ExecutorManager loadExecutorManager(Props props) throws Exception {
+		logger.info("Loading executor manager");
+		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
+		ExecutorManager execManager = new ExecutorManager(props, loader, false);
+		return execManager;
+	}
+	
+	private ProjectManager loadProjectManager(Props props) {
+		logger.info("Loading project manager");
+		JdbcProjectLoader loader = new JdbcProjectLoader(props);
+		ProjectManager manager = new ProjectManager(loader, props);
+		
+		return manager;
+	}
+	
+	private void loadBuiltinCheckersAndActions(AzkabanTriggerServer app) {
+		logger.info("Loading built-in checker and action types");
+		ExecutorManager executorManager = app.getExecutorManager();
+		TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
+		CheckerTypeLoader checkerLoader = triggerRunnerManager.getCheckerLoader();
+		ActionTypeLoader actionLoader = triggerRunnerManager.getActionLoader();
+		// time:
+		checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+		ExecutableFlowStatusChecker.setExecutorManager(executorManager);
+		checkerLoader.registerCheckerType(ExecutableFlowStatusChecker.type, ExecutableFlowStatusChecker.class);
+		
+		ExecuteFlowAction.setExecutorManager(executorManager);
+		ExecuteFlowAction.setProjectManager(projectManager);
+		actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+		CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
+		actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+	}
+	
+	private void loadPluginCheckersAndActions(String pluginPath, AzkabanTriggerServer app) {
+		logger.info("Loading plug-in checker and action types");
+		File triggerPluginPath = new File(pluginPath);
+		if (!triggerPluginPath.exists()) {
+			logger.error("plugin path " + pluginPath + " doesn't exist!");
+			return;
+		}
+			
+		ClassLoader parentLoader = AzkabanTriggerServer.class.getClassLoader();
+		File[] pluginDirs = triggerPluginPath.listFiles();
+		ArrayList<String> jarPaths = new ArrayList<String>();
+		for (File pluginDir: pluginDirs) {
+			if (!pluginDir.exists()) {
+				logger.error("Error! Trigger plugin path " + pluginDir.getPath() + " doesn't exist.");
+				continue;
+			}
+			
+			if (!pluginDir.isDirectory()) {
+				logger.error("The plugin path " + pluginDir + " is not a directory.");
+				continue;
+			}
+			
+			// Load the conf directory
+			File propertiesDir = new File(pluginDir, "conf");
+			Props pluginProps = null;
+			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+				File propertiesFile = new File(propertiesDir, "plugin.properties");
+				File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+				
+				if (propertiesFile.exists()) {
+					if (propertiesOverrideFile.exists()) {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+					}
+					else {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile);
+					}
+				}
+				else {
+					logger.error("Plugin conf file " + propertiesFile + " not found.");
+					continue;
+				}
+			}
+			else {
+				logger.error("Plugin conf path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			List<String> extLibClasspath = pluginProps.getStringList("trigger.external.classpaths", (List<String>)null);
+			
+			String pluginClass = pluginProps.getString("trigger.class");
+			if (pluginClass == null) {
+				logger.error("Trigger class is not set.");
+			}
+			else {
+				logger.error("Plugin class " + pluginClass);
+			}
+			
+			URLClassLoader urlClassLoader = null;
+			File libDir = new File(pluginDir, "lib");
+			if (libDir.exists() && libDir.isDirectory()) {
+				File[] files = libDir.listFiles();
+				
+				ArrayList<URL> urls = new ArrayList<URL>();
+				for (int i=0; i < files.length; ++i) {
+					try {
+						URL url = files[i].toURI().toURL();
+						urls.add(url);
+					} catch (MalformedURLException e) {
+						logger.error(e);
+					}
+				}
+				if (extLibClasspath != null) {
+					for (String extLib : extLibClasspath) {
+						try {
+							File file = new File(pluginDir, extLib);
+							URL url = file.toURI().toURL();
+							urls.add(url);
+						} catch (MalformedURLException e) {
+							logger.error(e);
+						}
+					}
+				}
+				
+				urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+			}
+			else {
+				logger.error("Library path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			Class<?> triggerClass = null;
+			try {
+				triggerClass = urlClassLoader.loadClass(pluginClass);
+			}
+			catch (ClassNotFoundException e) {
+				logger.error("Class " + pluginClass + " not found.");
+				continue;
+			}
+
+			String source = FileIOUtils.getSourcePathFromClass(triggerClass);
+			logger.info("Source jar " + source);
+			jarPaths.add("jar:file:" + source);
+			
+//			Constructor<?> constructor = null;
+//			try {
+//				constructor = triggerClass.getConstructor(String.class, Props.class, Context.class, AzkabanTriggerServer.class);
+//			} catch (NoSuchMethodException e) {
+//				logger.error("Constructor not found in " + pluginClass);
+//				continue;
+//			}
+			try {
+				Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateCheckerTypes", pluginProps, app);
+			} catch (Exception e) {
+				logger.error("Unable to initiate checker types for " + pluginClass);
+				continue;
+			}
+			
+			try {
+				Utils.invokeStaticMethod(urlClassLoader, pluginClass, "initiateActionTypes", pluginProps, app);
+			} catch (Exception e) {
+				logger.error("Unable to initiate action types for " + pluginClass);
+				continue;
+			}
+			
+		}
+	}
+	
+	private TriggerLoader createTriggerLoader(Props props) {
+		return new JdbcTriggerLoader(props);
+	}
+
+	public void stopServer() throws Exception {
+		server.stop();
+		server.destroy();
+	}
+	
+	/**
+	 * Returns the global azkaban properties
+	 * 
+	 * @return
+	 */
+	public Props getAzkabanProps() {
+		return props;
+	}
+	
+	/**
+	 * Azkaban using Jetty
+	 * 
+	 * @param args
+	 * @throws IOException
+	 */
+	public static void main(String[] args) throws Exception {
+		logger.error("Starting Jetty Azkaban Trigger Server...");
+		Props azkabanSettings = AzkabanServer.loadProps(args);
+
+		if (azkabanSettings == null) {
+			logger.error("Azkaban Properties not loaded.");
+			logger.error("Exiting Azkaban Trigger Server...");
+			return;
+		}
+
+		// Setup time zone
+		if (azkabanSettings.containsKey(DEFAULT_TIMEZONE_ID)) {
+			String timezone = azkabanSettings.getString(DEFAULT_TIMEZONE_ID);
+			TimeZone.setDefault(TimeZone.getTimeZone(timezone));
+			DateTimeZone.setDefault(DateTimeZone.forID(timezone));
+
+			logger.info("Setting timezone to " + timezone);
+		}
+
+		app = new AzkabanTriggerServer(azkabanSettings);
+		
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+
+			public void run() {
+				logger.info("Shutting down http server...");
+				try {
+					app.stopServer();
+				} catch (Exception e) {
+					logger.error("Error while shutting down http server.", e);
+				}
+				logger.info("kk thx bye.");
+			}
+		});
+	}
+
+	/**
+	 * Loads the Azkaban property file from the AZKABAN_HOME conf directory
+	 * 
+	 * @return
+	 */
+	/*package*/ static Props loadConfigurationFromAzkabanHome() {
+		String azkabanHome = System.getenv("AZKABAN_HOME");
+
+		if (azkabanHome == null) {
+			logger.error("AZKABAN_HOME not set. Will try default.");
+			return null;
+		}
+
+		if (!new File(azkabanHome).isDirectory()
+				|| !new File(azkabanHome).canRead()) {
+			logger.error(azkabanHome + " is not a readable directory.");
+			return null;
+		}
+
+		File confPath = new File(azkabanHome, DEFAULT_CONF_PATH);
+		if (!confPath.exists() || !confPath.isDirectory()
+				|| !confPath.canRead()) {
+			logger.error(azkabanHome
+					+ " does not contain a readable conf directory.");
+			return null;
+		}
+
+		return loadAzkabanConfigurationFromDirectory(confPath);
+	}
+
+	/**
+	 * Loads the Azkaban conf file int a Props object
+	 * 
+	 * @param path
+	 * @return
+	 */
+	private static Props loadAzkabanConfigurationFromDirectory(File dir) {
+		File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);
+		File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);
+		
+		Props props = null;
+		try {
+			// This is purely optional
+			if (azkabanPrivatePropsFile.exists() && azkabanPrivatePropsFile.isFile()) {
+				logger.info("Loading azkaban private properties file" );
+				props = new Props(null, azkabanPrivatePropsFile);
+			}
+
+			if (azkabanPropsFile.exists() && azkabanPropsFile.isFile()) {
+				logger.info("Loading azkaban properties file" );
+				props = new Props(props, azkabanPropsFile);
+			}
+		} catch (FileNotFoundException e) {
+			logger.error("File not found. Could not load azkaban config file", e);
+		} catch (IOException e) {
+			logger.error("File found, but error reading. Could not load azkaban config file", e);
+		}
+		
+		return props;
+	}
+
+	private void configureMBeanServer() {
+		logger.info("Registering MBeans...");
+		mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+		registerMbean("triggerServerJetty", new JmxJettyServer(server));
+		registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(triggerRunnerManager));
+	}
+	
+	public void close() {
+		try {
+			for (ObjectName name : registeredMBeans) {
+				mbeanServer.unregisterMBean(name);
+				logger.info("Jmx MBean " + name.getCanonicalName() + " unregistered.");
+			}
+		} catch (Exception e) {
+			logger.error("Failed to cleanup MBeanServer", e);
+		}
+	}
+	
+	private void registerMbean(String name, Object mbean) {
+		Class<?> mbeanClass = mbean.getClass();
+		ObjectName mbeanName;
+		try {
+			mbeanName = new ObjectName(mbeanClass.getName() + ":name=" + name);
+			mbeanServer.registerMBean(mbean, mbeanName);
+			logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
+			registeredMBeans.add(mbeanName);
+		} catch (Exception e) {
+			logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+		}
+
+	}
+	
+	public List<ObjectName> getMbeanNames() {
+		return registeredMBeans;
+	}
+	
+	public MBeanInfo getMBeanInfo(ObjectName name) {
+		try {
+			return mbeanServer.getMBeanInfo(name);
+		} catch (Exception e) {
+			logger.error(e);
+			return null;
+		}
+	}
+	
+	public Object getMBeanAttribute(ObjectName name, String attribute) {
+		 try {
+			return mbeanServer.getAttribute(name, attribute);
+		} catch (Exception e) {
+			logger.error(e);
+			return null;
+		}
+	}
+
+	public TriggerRunnerManager getTriggerRunnerManager() {
+		return triggerRunnerManager;
+	}
+	
+	public ExecutorManager getExecutorManager() {
+		return executorManager;
+	}
+	
+	public ProjectManager getProjectManager() {
+		return projectManager;
+	}
+
+}
diff --git a/src/java/azkaban/triggerapp/JMXHttpServlet.java b/src/java/azkaban/triggerapp/JMXHttpServlet.java
new file mode 100644
index 0000000..ac7ae41
--- /dev/null
+++ b/src/java/azkaban/triggerapp/JMXHttpServlet.java
@@ -0,0 +1,72 @@
+package azkaban.triggerapp;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.JSONUtils;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+import azkaban.webapp.servlet.HttpRequestUtils;
+
+public class JMXHttpServlet extends HttpServlet implements TriggerConnectorParams {
+	private static final long serialVersionUID = -3085603824826446270L;
+	private static final Logger logger = Logger.getLogger(JMXHttpServlet.class);
+	private AzkabanTriggerServer server;
+	
+	public void init(ServletConfig config) throws ServletException {
+		server = (AzkabanTriggerServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+	}
+	
+	public boolean hasParam(HttpServletRequest request, String param) {
+		return HttpRequestUtils.hasParam(request, param);
+	}
+	
+	public String getParam(HttpServletRequest request, String name) throws ServletException {
+		return HttpRequestUtils.getParam(request, name);
+	}
+	
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		HashMap<String,Object> ret = new HashMap<String,Object>();
+
+		if (hasParam(req, JMX_GET_MBEANS)) {
+			ret.put("mbeans", server.getMbeanNames());
+		}
+		else if (hasParam(req, JMX_GET_ALL_MBEAN_ATTRIBUTES)) {
+			if (!hasParam(req, JMX_MBEAN)) {
+				ret.put("error", "Parameters 'mbean' must be set");
+			}
+			else {
+				String mbeanName = getParam(req, JMX_MBEAN);
+				try {
+					ObjectName name = new ObjectName(mbeanName);
+					MBeanInfo info = server.getMBeanInfo(name);
+					
+					MBeanAttributeInfo[] mbeanAttrs = info.getAttributes();
+					HashMap<String, Object> attributes = new HashMap<String,Object>();
+
+					for (MBeanAttributeInfo attrInfo: mbeanAttrs) {
+						Object obj = server.getMBeanAttribute(name, attrInfo.getName());
+						attributes.put(attrInfo.getName(), obj);
+					}
+					
+					ret.put("attributes", attributes);
+				} catch (Exception e) {
+					logger.error(e);
+					ret.put("error", "'" + mbeanName + "' is not a valid mBean name");
+				}
+			}
+		}
+
+		JSONUtils.toJSON(ret, resp.getOutputStream(), true);
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/triggerapp/TriggerConnectorParams.java b/src/java/azkaban/triggerapp/TriggerConnectorParams.java
new file mode 100644
index 0000000..f7901bd
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerConnectorParams.java
@@ -0,0 +1,33 @@
+package azkaban.triggerapp;
+
+public interface TriggerConnectorParams {
+	public static final String ACTION_PARAM = "action";
+	public static final String TRIGGER_ID_PARAM = "triggerid";
+	public static final String USER_PARAM = "user";
+	
+	public static final String PING_ACTION = "ping";
+	
+	public static final String INSERT_TRIGGER_ACTION = "insert";
+	public static final String REMOVE_TRIGGER_ACTION = "remove";
+	public static final String UPDATE_TRIGGER_ACTION = "update";
+	public static final String GET_UPDATE_ACTION = "getupdate";
+	
+	public static final String RESPONSE_NOTFOUND = "notfound";
+	public static final String RESPONSE_ERROR = "error";
+	public static final String RESPONSE_SUCCESS = "success";
+	public static final String RESPONSE_ALIVE = "alive";
+	public static final String RESPONSE_UPDATETIME = "lasttime";
+	public static final String RESPONSE_UPDATED_TRIGGERS = "updated";
+	
+	public static final String UPDATE_TIME_LIST_PARAM = "updatetime";
+	
+	public static final String JMX_GET_MBEANS = "getMBeans";
+	public static final String JMX_GET_MBEAN_INFO = "getMBeanInfo";
+	public static final String JMX_GET_MBEAN_ATTRIBUTE = "getAttribute";
+	public static final String JMX_GET_ALL_MBEAN_ATTRIBUTES = "getAllMBeanAttributes";
+	public static final String JMX_ATTRIBUTE = "attribute";
+	public static final String JMX_MBEAN = "mBean";
+	
+	public static final String JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES = "getAllTriggerServerAttributes";
+	public static final String JMX_HOSTPORT = "hostPort";
+}
diff --git a/src/java/azkaban/triggerapp/TriggerRunnerManager.java b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
new file mode 100644
index 0000000..b4e5fe7
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
@@ -0,0 +1,341 @@
+package azkaban.triggerapp;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.TriggerStatus;
+import azkaban.utils.Props;
+
+public class TriggerRunnerManager {
+	private static Logger logger = Logger.getLogger(TriggerRunnerManager.class);
+	private static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
+
+	private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+	
+	private CheckerTypeLoader checkerTypeLoader;
+	private ActionTypeLoader actionTypeLoader;
+	private TriggerLoader triggerLoader;
+	
+	private Props globalProps;
+	
+	private final Props azkabanProps;
+	
+	private final TriggerScannerThread runnerThread;
+	private long lastRunnerThreadCheckTime = -1;
+	
+			
+	public TriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
+		
+		azkabanProps = props;
+
+		this.triggerLoader = triggerLoader;
+		
+		long scannerInterval = props.getLong("trigger.scan.interval", DEFAULT_SCANNER_INTERVAL_MS);
+		runnerThread = new TriggerScannerThread(scannerInterval);
+
+		checkerTypeLoader = new CheckerTypeLoader();
+		actionTypeLoader = new ActionTypeLoader();
+		
+	}
+
+	public void init() {
+		try{
+			checkerTypeLoader.init(azkabanProps);
+			actionTypeLoader.init(azkabanProps);
+		} catch(Exception e) {
+			e.printStackTrace();
+			logger.error(e.getMessage());
+		}
+		
+		Condition.setCheckerLoader(checkerTypeLoader);
+		Trigger.setActionTypeLoader(actionTypeLoader);
+
+	}
+	
+	public void start() {
+		
+		try{
+			// expect loader to return valid triggers
+			List<Trigger> triggers = triggerLoader.loadTriggers();
+			for(Trigger t : triggers) {
+				runnerThread.addTrigger(t);
+				triggerIdMap.put(t.getTriggerId(), t);
+			}
+		}catch(Exception e) {
+			e.printStackTrace();
+			logger.error(e.getMessage());
+		}
+		
+		runnerThread.start();
+	}
+
+	public Props getGlobalProps() {
+		return globalProps;
+	}
+	
+	public void setGlobalProps(Props globalProps) {
+		this.globalProps = globalProps;
+	}
+	
+	public CheckerTypeLoader getCheckerLoader() {
+		return checkerTypeLoader;
+	}
+
+	public ActionTypeLoader getActionLoader() {
+		return actionTypeLoader;
+	}
+
+	public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
+		
+		triggerLoader.addTrigger(t);
+		triggerIdMap.put(t.getTriggerId(), t);
+		runnerThread.addTrigger(t);
+	}
+	
+	public synchronized void removeTrigger(int id) throws TriggerManagerException {
+		Trigger t = triggerIdMap.get(id);
+		if(t != null) {
+			removeTrigger(triggerIdMap.get(id));
+		}
+	}
+	
+	public synchronized void updateTrigger(int triggerId) throws TriggerManagerException {
+		Trigger t = triggerIdMap.get(triggerId);
+		if(t == null) {
+			throw new TriggerManagerException("The trigger to update doesn't exist!");
+		}
+		
+		runnerThread.deleteTrigger(t);
+		runnerThread.addTrigger(t);
+		triggerIdMap.put(t.getTriggerId(), t);
+		
+		triggerLoader.updateTrigger(t);
+	}
+	
+	public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
+		if(!triggerIdMap.containsKey(t.getTriggerId())) {
+			throw new TriggerManagerException("The trigger to update doesn't exist!");
+		}
+		
+		runnerThread.deleteTrigger(t);
+		runnerThread.addTrigger(t);
+		triggerIdMap.put(t.getTriggerId(), t);
+		
+		triggerLoader.updateTrigger(t);
+	}
+
+	public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
+		t.stopCheckers();
+		triggerLoader.removeTrigger(t);
+		runnerThread.deleteTrigger(t);
+		triggerIdMap.remove(t.getTriggerId());		
+	}
+	
+	public List<Trigger> getTriggers() {
+		return new ArrayList<Trigger>(triggerIdMap.values());
+	}
+	
+	public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
+		return checkerTypeLoader.getSupportedCheckers();
+	}
+	
+	private class TriggerScannerThread extends Thread {
+		private BlockingQueue<Trigger> triggers;
+		private boolean shutdown = false;
+		//private AtomicBoolean stillAlive = new AtomicBoolean(true);
+		private final long scannerInterval;
+		
+		public TriggerScannerThread(long scannerInterval) {
+			triggers = new LinkedBlockingDeque<Trigger>();
+			this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
+			this.scannerInterval = scannerInterval;;
+		}
+
+		@SuppressWarnings("unused")
+		public void shutdown() {
+			logger.error("Shutting down trigger manager thread " + this.getName());
+			shutdown = true;
+			//stillAlive.set(false);
+			this.interrupt();
+		}
+		
+		public synchronized List<Trigger> getTriggers() {
+			return new ArrayList<Trigger>(triggers);
+		}
+		
+		public synchronized void addTrigger(Trigger t) {
+			triggers.add(t);
+		}
+		
+		public synchronized void deleteTrigger(Trigger t) {
+			triggers.remove(t);
+		}
+
+		public void run() {
+			//while(stillAlive.get()) {
+			while(!shutdown) {
+				synchronized (this) {
+					try{
+						lastRunnerThreadCheckTime = System.currentTimeMillis();
+						
+						try{
+							checkAllTriggers();
+						} catch(Exception e) {
+							e.printStackTrace();
+							logger.error(e.getMessage());
+						} catch(Throwable t) {
+							t.printStackTrace();
+							logger.error(t.getMessage());
+						}
+						
+						long timeRemaining = scannerInterval - (System.currentTimeMillis() - getLastRunnerThreadCheckTime());
+						if(timeRemaining < 0) {
+							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
+						} else {
+							wait(timeRemaining);
+						}
+					} catch(InterruptedException e) {
+						logger.info("Interrupted. Probably to shut down.");
+					}
+					
+				}
+			}
+		}
+		
+		private void checkAllTriggers() throws TriggerManagerException {
+			for(Trigger t : triggers) {
+				if(t.getStatus().equals(TriggerStatus.READY)) {
+					if(t.triggerConditionMet()) {
+						onTriggerTrigger(t);
+					} else if (t.expireConditionMet()) {
+						onTriggerExpire(t);
+					}
+				}
+			}
+		}
+		
+		private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
+			List<TriggerAction> actions = t.getTriggerActions();
+			for(TriggerAction action : actions) {
+				try {
+					action.doAction();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					throw new TriggerManagerException("action failed to execute", e);
+				}
+			}
+			if(t.isResetOnTrigger()) {
+				t.resetTriggerConditions();
+				t.resetExpireCondition();
+//				updateTrigger(t);
+			} else {
+				t.setStatus(TriggerStatus.EXPIRED);
+			}
+			
+			triggerLoader.updateTrigger(t);
+			
+//			updateAgent(t);
+		}
+		
+		private void onTriggerExpire(Trigger t) throws TriggerManagerException {
+			List<TriggerAction> expireActions = t.getExpireActions();
+			for(TriggerAction action : expireActions) {
+				try {
+					action.doAction();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					throw new TriggerManagerException("expire action failed to execute", e);
+				}
+			}
+			if(t.isResetOnExpire()) {
+				t.resetTriggerConditions();
+				t.resetExpireCondition();
+//				updateTrigger(t);
+			} else {
+				t.setStatus(TriggerStatus.EXPIRED);
+			}
+//			updateAgent(t);
+			triggerLoader.updateTrigger(t);
+		}
+	}
+	
+	public synchronized Trigger getTrigger(int triggerId) {
+		return triggerIdMap.get(triggerId);
+	}
+
+	public void expireTrigger(int triggerId) {
+		Trigger t = getTrigger(triggerId);
+		t.setStatus(TriggerStatus.EXPIRED);
+//		updateAgent(t);
+	}
+
+	public List<Trigger> getTriggers(String triggerSource) {
+		List<Trigger> triggers = new ArrayList<Trigger>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getSource().equals(triggerSource)) {
+				triggers.add(t);
+			}
+		}
+		return triggers;
+	}
+
+	public List<Trigger> getUpdatedTriggers(String triggerSource, long lastUpdateTime) {
+		List<Trigger> triggers = new ArrayList<Trigger>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getSource().equals(triggerSource) && t.getLastModifyTime().getMillis() > lastUpdateTime) {
+				triggers.add(t);
+			}
+		}
+		return triggers;
+	}
+	
+	public List<Integer> getUpdatedTriggers(long lastUpdateTime) {
+		List<Integer> triggers = new ArrayList<Integer>();
+		for(Trigger t : triggerIdMap.values()) {
+			if(t.getLastModifyTime().getMillis() > lastUpdateTime) {
+				triggers.add(t.getTriggerId());
+			}
+		}
+		return triggers;
+	}
+
+	public long getLastRunnerThreadCheckTime() {
+		return lastRunnerThreadCheckTime;
+	}
+
+	public boolean isRunnerThreadActive() {
+		return runnerThread.isAlive();
+	}
+
+
+	public State getRunnerThreadState() {
+		return this.runnerThread.getState();
+	}
+
+	public void loadTrigger(int triggerId) throws TriggerManagerException {
+		Trigger t = triggerLoader.loadTrigger(triggerId);
+		if(t.getStatus().equals(TriggerStatus.PREPARING)) {
+			triggerIdMap.put(t.getTriggerId(), t);
+			runnerThread.addTrigger(t);
+			t.setStatus(TriggerStatus.READY);
+		}
+	}
+
+}
diff --git a/src/java/azkaban/triggerapp/TriggerServerServlet.java b/src/java/azkaban/triggerapp/TriggerServerServlet.java
new file mode 100644
index 0000000..68c36b9
--- /dev/null
+++ b/src/java/azkaban/triggerapp/TriggerServerServlet.java
@@ -0,0 +1,208 @@
+package azkaban.triggerapp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
+
+import azkaban.execapp.AzkabanExecutorServer;
+import azkaban.execapp.FlowRunnerManager;
+import azkaban.executor.ConnectorParams;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.FileIOUtils.JobMetaData;
+import azkaban.utils.FileIOUtils.LogData;
+import azkaban.webapp.servlet.AzkabanServletContextListener;
+
+public class TriggerServerServlet extends HttpServlet implements TriggerConnectorParams {
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = Logger.getLogger(TriggerServerServlet.class.getName());
+	public static final String JSON_MIME_TYPE = "application/json";
+
+	private AzkabanTriggerServer application;
+	private TriggerRunnerManager triggerRunnerManager;
+	
+	public TriggerServerServlet() {
+		super();
+	}
+	
+	@Override
+	public void init(ServletConfig config) throws ServletException {
+		application = (AzkabanTriggerServer) config.getServletContext().getAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);
+
+		if (application == null) {
+			throw new IllegalStateException(
+					"No batch application is defined in the servlet context!");
+		}
+
+		triggerRunnerManager = application.getTriggerRunnerManager();
+	}
+
+	
+	protected void writeJSON(HttpServletResponse resp, Object obj) throws IOException {
+		resp.setContentType(JSON_MIME_TYPE);
+		ObjectMapper mapper = new ObjectMapper();
+		OutputStream stream = resp.getOutputStream();
+		mapper.writeValue(stream, obj);
+	}
+
+	@Override
+	public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		HashMap<String,Object> respMap= new HashMap<String,Object>();
+		//logger.info("ExecutorServer called by " + req.getRemoteAddr());
+		try {
+			if (!hasParam(req, ACTION_PARAM)) {
+				logger.error("Parameter action not set");
+				respMap.put("error", "Parameter action not set");
+			}
+			else {
+				String action = getParam(req, ACTION_PARAM);
+				if (action.equals(GET_UPDATE_ACTION)) {
+					//logger.info("Updated called");
+					handleAjaxGetUpdateRequest(req, respMap);
+				}
+				else if (action.equals(PING_ACTION)) {
+					respMap.put("status", "alive");
+				}
+				else {
+					int triggerId = Integer.parseInt(getParam(req, TRIGGER_ID_PARAM));
+					String user = getParam(req, USER_PARAM, null);
+					
+					logger.info("User " + user + " has called action " + action + " on " + triggerId);
+					if (action.equals(INSERT_TRIGGER_ACTION)) {
+						logger.info("Insert Trigger Action");
+						handleInsertTrigger(triggerId, req, resp, respMap);
+					} else if (action.equals(REMOVE_TRIGGER_ACTION)) {
+						logger.info("Remove Trigger Action");
+						handleRemoveTrigger(triggerId, req, resp, respMap);
+					} 
+					else if (action.equals(UPDATE_TRIGGER_ACTION)) {
+						logger.info("Update Trigger Action");
+						handleUpdateTrigger(triggerId, user, req, respMap);
+					}
+					else {
+						logger.error("action: '" + action + "' not supported.");
+						respMap.put("error", "action: '" + action + "' not supported.");
+					}
+				}
+			}
+		} catch (Exception e) {
+			logger.error(e);
+			respMap.put(RESPONSE_ERROR, e.getMessage());
+		}
+		writeJSON(resp, respMap);
+		resp.flushBuffer();
+	}
+	
+	
+
+	private void handleAjaxGetUpdateRequest(HttpServletRequest req, HashMap<String, Object> respMap) {
+		List<Integer> updates = null;
+		try{
+			long lastUpdateTime = getLongParam(req, "lastUpdateTime");
+//			respMap.put(TriggerConnectorParams.RESPONSE_UPDATETIME, DateTime.now().getMillis());
+			updates = triggerRunnerManager.getUpdatedTriggers(lastUpdateTime);
+			if(updates.size() > 0) {
+				System.out.println("got " + updates.size() + " updates" );
+			}
+			respMap.put("updates", updates);
+		} catch (Exception e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+
+	private void handleInsertTrigger(int triggerId, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+		try {
+			triggerRunnerManager.loadTrigger(triggerId);
+		} catch (TriggerManagerException e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+	
+	private void handleUpdateTrigger(int triggerId, String user, HttpServletRequest req, HashMap<String, Object> respMap) {
+		try {
+			triggerRunnerManager.updateTrigger(triggerId);
+		} catch (TriggerManagerException e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+
+	private void handleRemoveTrigger(int triggerId, HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> respMap) {
+		try {
+			triggerRunnerManager.removeTrigger(triggerId);
+		} catch (TriggerManagerException e) {
+			logger.error(e);
+			respMap.put("error", e.getMessage());
+		}
+	}
+
+	@Override
+	public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		
+	}
+	
+	/**
+	 * Duplicated code with AbstractAzkabanServlet, but ne
+	 */
+	public boolean hasParam(HttpServletRequest request, String param) {
+		return request.getParameter(param) != null;
+	}
+
+	public String getParam(HttpServletRequest request, String name)
+			throws ServletException {
+		String p = request.getParameter(name);
+		if (p == null)
+			throw new ServletException("Missing required parameter '" + name + "'.");
+		else
+			return p;
+	}
+	
+	public String getParam(HttpServletRequest request, String name, String defaultVal ) {
+		String p = request.getParameter(name);
+		if (p == null) {
+			return defaultVal;
+		}
+
+		return p;
+	}
+
+	public long getLongParam(HttpServletRequest request, String name) throws ServletException {
+		String p = getParam(request, name);
+		return Long.parseLong(p);
+	}
+	
+	public int getIntParam(HttpServletRequest request, String name) throws ServletException {
+		String p = getParam(request, name);
+		return Integer.parseInt(p);
+	}
+	
+	public int getIntParam(HttpServletRequest request, String name, int defaultVal) {
+		if (hasParam(request, name)) {
+			try {
+				return getIntParam(request, name);
+			} catch (Exception e) {
+				return defaultVal;
+			}
+		}
+		
+		return defaultVal;
+	}
+}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 2ae3ee3..fdfbed3 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -57,7 +57,6 @@ import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxSLAManager;
 import azkaban.jmx.JmxScheduler;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
@@ -66,13 +65,11 @@ import azkaban.scheduler.JdbcScheduleLoader;
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
-import azkaban.sla.JdbcSLALoader;
-import azkaban.sla.SLAManager;
-import azkaban.sla.SLAManagerException;
 import azkaban.trigger.JdbcTriggerLoader;
 import azkaban.trigger.TriggerLoader;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerAgent;
+import azkaban.trigger.TriggerManagerException;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.FileIOUtils;
@@ -143,8 +140,6 @@ public class AzkabanWebServer extends AzkabanServer {
 //	private TriggerBasedScheduler scheduler;
 	private TriggerManager triggerManager;
 	
-	private SLAManager slaManager;
-
 	private final ClassLoader baseClassLoader;
 	
 	private Props props;
@@ -174,15 +169,14 @@ public class AzkabanWebServer extends AzkabanServer {
 		sessionCache = new SessionCache(props);
 		userManager = loadUserManager(props);		
 		executorManager = loadExecutorManager(props);
-		slaManager = loadSLAManager(props);
 		
 		triggerManager = loadTriggerManager(props);
 		
-		projectManager = loadProjectManager(props, triggerManager);
+		projectManager = loadProjectManager(props);
 		
 //		scheduler = loadScheduler(executorManager, projectManager, triggerManager);
 		
-		scheduleManager = loadScheduleManager(projectManager, executorManager, slaManager, triggerManager, props);
+		scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
 		
 		baseClassLoader = getBaseClassloader();
 		
@@ -231,31 +225,34 @@ public class AzkabanWebServer extends AzkabanServer {
 		return manager;
 	}
 	
-	private ProjectManager loadProjectManager(Props props, TriggerManager triggerManager) {
+	private ProjectManager loadProjectManager(Props props) {
 		logger.info("Loading JDBC for project management");
 
 		JdbcProjectLoader loader = new JdbcProjectLoader(props);
-		ProjectManager manager = new ProjectManager(loader, props, triggerManager);
+		ProjectManager manager = new ProjectManager(loader, props);
+		manager.setTriggerManager(triggerManager);
 		
 		return manager;
 	}
 
 	private ExecutorManager loadExecutorManager(Props props) throws Exception {
 		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
-		ExecutorManager execManager = new ExecutorManager(props, loader);
+		ExecutorManager execManager = new ExecutorManager(props, loader, true);
 		return execManager;
 	}
 
-	private ScheduleManager loadScheduleManager(ProjectManager projectManager, ExecutorManager executorManager, SLAManager slaManager, TriggerManager triggerManager, Props props ) throws Exception {
+	private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager triggerManager, Props props ) throws Exception {
 		ScheduleManager schedManager = null;
 		String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
 		if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
 			ScheduleLoader loader = new JdbcScheduleLoader(props);
-			schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, false);
+			schedManager = new ScheduleManager(executorManager, loader, false);
+			schedManager.setProjectManager(projectManager);
+			schedManager.start();
 		} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
 			logger.info("Loading trigger based scheduler");
-			ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager, ScheduleManager.triggerSource);
-			schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, true);
+			ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, null, ScheduleManager.triggerSource);
+			schedManager = new ScheduleManager(executorManager, loader, true);
 		}
 
 		return schedManager;
@@ -266,12 +263,7 @@ public class AzkabanWebServer extends AzkabanServer {
 //		return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
 //	}
 
-	private SLAManager loadSLAManager(Props props) throws SLAManagerException {
-		SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
-		return slaManager;
-	}
-	
-	private TriggerManager loadTriggerManager(Props props) {
+	private TriggerManager loadTriggerManager(Props props) throws TriggerManagerException {
 		TriggerLoader loader = new JdbcTriggerLoader(props);
 		return new TriggerManager(props, loader);
 	}
@@ -317,10 +309,6 @@ public class AzkabanWebServer extends AzkabanServer {
 		return executorManager;
 	}
 	
-	public SLAManager getSLAManager() {
-		return slaManager;
-	}
-	
 	public ScheduleManager getScheduleManager() {
 		return scheduleManager;
 	}
@@ -663,7 +651,7 @@ public class AzkabanWebServer extends AzkabanServer {
 			}
 			
 			TriggerPlugin plugin = (TriggerPlugin) obj;
-//			AbstractTriggerServlet avServlet = (AbstractTriggerServlet)obj;
+//			AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet) plugin.getServlet();
 //			root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
 			installedTriggerPlugins.put(pluginName, plugin);
 		}
@@ -908,7 +896,6 @@ public class AzkabanWebServer extends AzkabanServer {
 
 		registerMbean("jetty", new JmxJettyServer(server));
 		registerMbean("scheduler", new JmxScheduler(scheduleManager));
-		registerMbean("slaManager", new JmxSLAManager(slaManager));
 		registerMbean("executorManager", new JmxExecutorManager(executorManager));
 	}
 	
@@ -922,7 +909,6 @@ public class AzkabanWebServer extends AzkabanServer {
 			logger.error("Failed to cleanup MBeanServer", e);
 		}
 		scheduleManager.shutdown();
-		slaManager.shutdown();
 		executorManager.shutdown();
 	}
 	
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 8e71ee4..73fefdb 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -54,11 +54,6 @@ import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.ScheduleManagerException;
 import azkaban.scheduler.ScheduleStatisticManager;
-import azkaban.sla.SLA;
-import azkaban.sla.SLA.SlaAction;
-import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLA.SlaSetting;
-import azkaban.sla.SlaOptions;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
@@ -100,13 +95,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		HashMap<String, Object> ret = new HashMap<String, Object>();
 		String ajaxName = getParam(req, "ajax");
 		
-		if (ajaxName.equals("slaInfo")) {
-			ajaxSlaInfo(req, ret, session.getUser());
-		}
-		else if(ajaxName.equals("setSla")) {
-			ajaxSetSla(req, ret, session.getUser());
-		}
-		else if(ajaxName.equals("loadFlow")) {
+//		if (ajaxName.equals("slaInfo")) {
+//			ajaxSlaInfo(req, ret, session.getUser());
+//		}
+//		else if(ajaxName.equals("setSla")) {
+//			ajaxSetSla(req, ret, session.getUser());
+//		}
+		if(ajaxName.equals("loadFlow")) {
 			ajaxLoadFlows(req, ret, session.getUser());
 		}
 		else if(ajaxName.equals("loadHistory")) {
@@ -122,98 +117,98 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-		try {
-			
-			int scheduleId = getIntParam(req, "scheduleId");
-			
-			Schedule sched = scheduleManager.getSchedule(scheduleId);
-			
-			Project project = projectManager.getProject(sched.getProjectId());
-			if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
-				ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
-				return;
-			}
-			
-			
-			SlaOptions slaOptions= new SlaOptions();
-			
-			String slaEmails = getParam(req, "slaEmails");
-			String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-			
-			Map<String, String> settings = getParamGroup(req, "settings");
-			List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
-			for(String set : settings.keySet()) {
-				SlaSetting s;
-				try {
-				s = parseSlaSetting(settings.get(set));
-				}
-				catch (Exception e) {
-					throw new ServletException(e);
-				}
-				if(s != null) {
-					slaSettings.add(s);
-				}
-			}
-			
-			if(slaSettings.size() > 0) {
-				if(slaEmails.equals("")) {
-					ret.put("error", "Please put correct email settings for your SLA actions");
-					return;
-				}
-				slaOptions.setSlaEmails(Arrays.asList(emailSplit));
-				slaOptions.setSettings(slaSettings);
-			}
-			else {
-				slaOptions = null;
-			}
-			sched.setSlaOptions(slaOptions);
-			scheduleManager.insertSchedule(sched);
-
-			if(slaOptions != null) {
-				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
-			}
-			
-		} catch (ServletException e) {
-			ret.put("error", e.getMessage());
-		} catch (ScheduleManagerException e) {
-			ret.put("error", e.getMessage());
-		}
-		
-	}
+//	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+//		try {
+//			
+//			int scheduleId = getIntParam(req, "scheduleId");
+//			
+//			Schedule sched = scheduleManager.getSchedule(scheduleId);
+//			
+//			Project project = projectManager.getProject(sched.getProjectId());
+//			if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
+//				ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
+//				return;
+//			}
+//			
+//			
+//			SlaOptions slaOptions= new SlaOptions();
+//			
+//			String slaEmails = getParam(req, "slaEmails");
+//			String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+//			
+//			Map<String, String> settings = getParamGroup(req, "settings");
+//			List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
+//			for(String set : settings.keySet()) {
+//				SlaSetting s;
+//				try {
+//				s = parseSlaSetting(settings.get(set));
+//				}
+//				catch (Exception e) {
+//					throw new ServletException(e);
+//				}
+//				if(s != null) {
+//					slaSettings.add(s);
+//				}
+//			}
+//			
+//			if(slaSettings.size() > 0) {
+//				if(slaEmails.equals("")) {
+//					ret.put("error", "Please put correct email settings for your SLA actions");
+//					return;
+//				}
+//				slaOptions.setSlaEmails(Arrays.asList(emailSplit));
+//				slaOptions.setSettings(slaSettings);
+//			}
+//			else {
+//				slaOptions = null;
+//			}
+//			sched.setSlaOptions(slaOptions);
+//			scheduleManager.insertSchedule(sched);
+//
+//			if(slaOptions != null) {
+//				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
+//			}
+//			
+//		} catch (ServletException e) {
+//			ret.put("error", e.getMessage());
+//		} catch (ScheduleManagerException e) {
+//			ret.put("error", e.getMessage());
+//		}
+//		
+//	}
 	
-	private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
-		// "" + Duration + EmailAction + KillAction
-		String[] parts = set.split(",", -1);
-		String id = parts[0];
-		String rule = parts[1];
-		String duration = parts[2];
-		String emailAction = parts[3];
-		String killAction = parts[4];
-		if(emailAction.equals("true") || killAction.equals("true")) {
-			SlaSetting r = new SlaSetting();			
-			r.setId(id);
-			r.setRule(SlaRule.valueOf(rule));
-			ReadablePeriod dur;
-			try {
-				dur = parseDuration(duration);
-			}
-			catch (Exception e) {
-				throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
-			}
-			r.setDuration(dur);
-			List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
-			if(emailAction.equals("true")) {
-				actions.add(SlaAction.EMAIL);
-			}
-			if(killAction.equals("true")) {
-				actions.add(SlaAction.KILL);
-			}
-			r.setActions(actions);
-			return r;
-		}
-		return null;
-	}
+//	private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
+//		// "" + Duration + EmailAction + KillAction
+//		String[] parts = set.split(",", -1);
+//		String id = parts[0];
+//		String rule = parts[1];
+//		String duration = parts[2];
+//		String emailAction = parts[3];
+//		String killAction = parts[4];
+//		if(emailAction.equals("true") || killAction.equals("true")) {
+//			SlaSetting r = new SlaSetting();			
+//			r.setId(id);
+//			r.setRule(SlaRule.valueOf(rule));
+//			ReadablePeriod dur;
+//			try {
+//				dur = parseDuration(duration);
+//			}
+//			catch (Exception e) {
+//				throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
+//			}
+//			r.setDuration(dur);
+//			List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+//			if(emailAction.equals("true")) {
+//				actions.add(SlaAction.EMAIL);
+//			}
+//			if(killAction.equals("true")) {
+//				actions.add(SlaAction.KILL);
+//			}
+//			r.setActions(actions);
+//			return r;
+//		}
+//		return null;
+//	}
 
 	private ReadablePeriod parseDuration(String duration) {
 		int hour = Integer.parseInt(duration.split(":")[0]);
@@ -221,77 +216,77 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		return Minutes.minutes(min+hour*60).toPeriod();
 	}
 
-	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-		int scheduleId;
-		try {
-			scheduleId = getIntParam(req, "scheduleId");
-			
-			Schedule sched = scheduleManager.getSchedule(scheduleId);
-			
-			Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
-			if (project == null) {
-				ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
-				return;
-			}
-			
-			Flow flow = project.getFlow(sched.getFlowName());
-			if (flow == null) {
-				ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
-				return;
-			}
-			
-			SlaOptions slaOptions = sched.getSlaOptions();
-			ExecutionOptions flowOptions = sched.getExecutionOptions();
-			
-			if(slaOptions != null) {
-				ret.put("slaEmails", slaOptions.getSlaEmails());
-				List<SlaSetting> settings = slaOptions.getSettings();
-				List<Object> setObj = new ArrayList<Object>();
-				for(SlaSetting set: settings) {
-					setObj.add(set.toObject());
-				}
-				ret.put("settings", setObj);
-			}
-			else if (flowOptions != null) {
-				if(flowOptions.getFailureEmails() != null) {
-					List<String> emails = flowOptions.getFailureEmails();
-					if(emails.size() > 0) {
-						ret.put("slaEmails", emails);
-					}
-				}
-			}
-			else {
-				if(flow.getFailureEmails() != null) {
-					List<String> emails = flow.getFailureEmails();
-					if(emails.size() > 0) {
-						ret.put("slaEmails", emails);
-					}
-				}
-			}
-			
-			List<String> disabledJobs;
-			if(flowOptions != null) {
-				disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
-			}
-			else {
-				disabledJobs = new ArrayList<String>();
-			}
-				
-			List<String> allJobs = new ArrayList<String>();
-			for(Node n : flow.getNodes()) {
-				if(!disabledJobs.contains(n.getId())) {
-					allJobs.add(n.getId());
-				}
-			}
-			ret.put("allJobNames", allJobs);
-		} catch (ServletException e) {
-			ret.put("error", e);
-		} catch (ScheduleManagerException e) {
-			// TODO Auto-generated catch block
-			ret.put("error", e);
-		}
-		
-	}
+//	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+//		int scheduleId;
+//		try {
+//			scheduleId = getIntParam(req, "scheduleId");
+//			
+//			Schedule sched = scheduleManager.getSchedule(scheduleId);
+//			
+//			Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
+//			if (project == null) {
+//				ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
+//				return;
+//			}
+//			
+//			Flow flow = project.getFlow(sched.getFlowName());
+//			if (flow == null) {
+//				ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
+//				return;
+//			}
+//			
+//			SlaOptions slaOptions = sched.getSlaOptions();
+//			ExecutionOptions flowOptions = sched.getExecutionOptions();
+//			
+//			if(slaOptions != null) {
+//				ret.put("slaEmails", slaOptions.getSlaEmails());
+//				List<SlaSetting> settings = slaOptions.getSettings();
+//				List<Object> setObj = new ArrayList<Object>();
+//				for(SlaSetting set: settings) {
+//					setObj.add(set.toObject());
+//				}
+//				ret.put("settings", setObj);
+//			}
+//			else if (flowOptions != null) {
+//				if(flowOptions.getFailureEmails() != null) {
+//					List<String> emails = flowOptions.getFailureEmails();
+//					if(emails.size() > 0) {
+//						ret.put("slaEmails", emails);
+//					}
+//				}
+//			}
+//			else {
+//				if(flow.getFailureEmails() != null) {
+//					List<String> emails = flow.getFailureEmails();
+//					if(emails.size() > 0) {
+//						ret.put("slaEmails", emails);
+//					}
+//				}
+//			}
+//			
+//			List<String> disabledJobs;
+//			if(flowOptions != null) {
+//				disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+//			}
+//			else {
+//				disabledJobs = new ArrayList<String>();
+//			}
+//				
+//			List<String> allJobs = new ArrayList<String>();
+//			for(Node n : flow.getNodes()) {
+//				if(!disabledJobs.contains(n.getId())) {
+//					allJobs.add(n.getId());
+//				}
+//			}
+//			ret.put("allJobNames", allJobs);
+//		} catch (ServletException e) {
+//			ret.put("error", e);
+//		} catch (ScheduleManagerException e) {
+//			// TODO Auto-generated catch block
+//			ret.put("error", e);
+//		}
+//		
+//	}
 
 	protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
 		Project project = projectManager.getProject(projectId);
@@ -618,9 +613,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		catch (Exception e) {
 			ret.put("error", e.getMessage());
 		}
-		SlaOptions slaOptions = null;
 		
-		Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+		Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
 
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 5fc7235..6365fd0 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -18,10 +18,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import azkaban.executor.ExecutionOptions;
-import azkaban.sla.SLA.SlaAction;
-import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLA.SlaSetting;
-import azkaban.sla.SlaOptions;
 import azkaban.database.DataSourceUtils;
 import azkaban.utils.Props;
 
@@ -111,28 +107,18 @@ public class JdbcScheduleLoaderTest {
 		List<String> disabledJobs = new ArrayList<String>();
 		disabledJobs.add("job1");
 		disabledJobs.add("job2");
-		List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
-		SlaSetting set1 = new SlaSetting();
-		List<SlaAction> actions = new ArrayList<SlaAction>();
-		actions.add(SlaAction.EMAIL);
-		set1.setActions(actions);
-		set1.setId("");
-		set1.setDuration(Schedule.parsePeriodString("1h"));
-		set1.setRule(SlaRule.FINISH);
-		slaSets.add(set1);
+
 		ExecutionOptions flowOptions = new ExecutionOptions();
 		flowOptions.setFailureEmails(emails);
 		flowOptions.setDisabledJobs(disabledJobs);
-		SlaOptions slaOptions = new SlaOptions();
-		slaOptions.setSlaEmails(emails);
-		slaOptions.setSettings(slaSets);
+
 		
-		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
-		Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
-		Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+		Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+		Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+		Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+		Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
 		
 		loader.insertSchedule(s1);
 		loader.insertSchedule(s2);
@@ -149,11 +135,6 @@ public class JdbcScheduleLoaderTest {
 		Assert.assertEquals(44444, sched.getSubmitTime());
 		Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
 		ExecutionOptions fOpt = sched.getExecutionOptions();
-		SlaOptions sOpt = sched.getSlaOptions();
-		Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
-		Assert.assertEquals("", sOpt.getSettings().get(0).getId());
-		Assert.assertEquals(Schedule.parsePeriodString("1h"), sOpt.getSettings().get(0).getDuration());
-		Assert.assertEquals(SlaRule.FINISH, sOpt.getSettings().get(0).getRule());
 		Assert.assertEquals(2, fOpt.getFailureEmails().size());
 		Assert.assertEquals(null, fOpt.getSuccessEmails());
 		Assert.assertEquals(2, fOpt.getDisabledJobs().size());
@@ -176,32 +157,19 @@ public class JdbcScheduleLoaderTest {
 		List<String> disabledJobs = new ArrayList<String>();
 		disabledJobs.add("job1");
 		disabledJobs.add("job2");
-		List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
-		SlaSetting set1 = new SlaSetting();
-		List<SlaAction> actions = new ArrayList<SlaAction>();
-		actions.add(SlaAction.EMAIL);
-		set1.setActions(actions);
-		set1.setId("");
-		set1.setDuration(Schedule.parsePeriodString("1h"));
-		set1.setRule(SlaRule.FINISH);
-		slaSets.add(set1);
+		
 		ExecutionOptions flowOptions = new ExecutionOptions();
 		flowOptions.setFailureEmails(emails);
 		flowOptions.setDisabledJobs(disabledJobs);
-		SlaOptions slaOptions = new SlaOptions();
-		slaOptions.setSlaEmails(emails);
-		slaOptions.setSettings(slaSets);
 		
 		System.out.println("the flow options are " + flowOptions);
-		System.out.println("the sla options are " + slaOptions);
-		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
 
 		loader.insertSchedule(s1);
 		
 		emails.add("email3");
-		slaOptions.setSlaEmails(emails);
 		
-		Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "azkaban", flowOptions);
 
 		loader.updateSchedule(s2);
 		
@@ -212,7 +180,6 @@ public class JdbcScheduleLoaderTest {
 		Assert.assertEquals(44445, schedules.get(0).getSubmitTime());
 		Assert.assertEquals("2M", Schedule.createPeriodString(schedules.get(0).getPeriod()));
 //		System.out.println("the options are " + schedules.get(0).getSchedOptions());
-		Assert.assertEquals(3, schedules.get(0).getSlaOptions().getSlaEmails().size());
 	}
 	
 	@Test
@@ -237,23 +204,11 @@ public class JdbcScheduleLoaderTest {
 			List<String> disabledJobs = new ArrayList<String>();
 			disabledJobs.add("job1");
 			disabledJobs.add("job2");
-			List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
-			SlaSetting set1 = new SlaSetting();
-			List<SlaAction> actions = new ArrayList<SlaAction>();
-			actions.add(SlaAction.EMAIL);
-			set1.setActions(actions);
-			set1.setId("");
-			set1.setDuration(Schedule.parsePeriodString("1h"));
-			set1.setRule(SlaRule.FINISH);
-			slaSets.add(set1);
 			ExecutionOptions flowOptions = new ExecutionOptions();
 			flowOptions.setFailureEmails(emails);
 			flowOptions.setDisabledJobs(disabledJobs);
-			SlaOptions slaOptions = new SlaOptions();
-			slaOptions.setSlaEmails(emails);
-			slaOptions.setSettings(slaSets);
 			
-			Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+			Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
 			schedules.add(s);
 			try {
 				loader.insertSchedule(s);
diff --git a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
index a103f5d..d65ce7b 100644
--- a/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
+++ b/unit/java/azkaban/test/trigger/BasicTimeCheckerTest.java
@@ -10,9 +10,9 @@ import org.joda.time.DateTime;
 import org.joda.time.ReadablePeriod;
 import org.junit.Test;
 
-import azkaban.scheduler.BasicTimeChecker;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.utils.Utils;
 
 public class BasicTimeCheckerTest {
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index 34ce3bf..09510cf 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -11,11 +11,11 @@ import static org.junit.Assert.assertFalse;
 import org.joda.time.DateTime;
 import org.junit.Test;
 
-import azkaban.scheduler.BasicTimeChecker;
 import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.TriggerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index a996885..962884c 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -8,11 +8,11 @@ import java.util.List;
 
 import org.junit.Test;
 
-import azkaban.actions.ExecuteFlowAction;
 import azkaban.executor.ExecutionOptions;
 import azkaban.trigger.ActionTypeLoader;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerException;
+import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.Props;
 
 
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
index 31a0788..98bd5ae 100644
--- a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -20,11 +20,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import azkaban.actions.ExecuteFlowAction;
 import azkaban.database.DataSourceUtils;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
-import azkaban.scheduler.BasicTimeChecker;
 import azkaban.trigger.ActionTypeLoader;
 import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.Condition;
@@ -35,6 +33,8 @@ import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerException;
 import azkaban.trigger.TriggerLoader;
 import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
 
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index 98ec8fe..508436b 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -53,17 +53,17 @@ public class TriggerManagerTest {
 		
 		ThresholdChecker.setVal(1);
 		
-		triggerManager.insertTrigger(createDummyTrigger("test1", "triggerLoader", 10));
+		triggerManager.insertTrigger(createDummyTrigger("test1", "triggerLoader", 10), "testUser");
 		List<Trigger> triggers = triggerManager.getTriggers();
 		assertTrue(triggers.size() == 1);
 		Trigger t1 = triggers.get(0);
 		t1.setResetOnTrigger(false);
-		triggerManager.updateTrigger(t1);
+		triggerManager.updateTrigger(t1, "testUser");
 		ThresholdChecker checker1 = (ThresholdChecker) t1.getTriggerCondition().getCheckers().values().toArray()[0];
 		assertTrue(t1.getSource().equals("triggerLoader"));
 		
 		Trigger t2 = createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
-		triggerManager.insertTrigger(t2);
+		triggerManager.insertTrigger(t2, "testUser");
 		ThresholdChecker checker2 = (ThresholdChecker) t2.getTriggerCondition().getCheckers().values().toArray()[0];
 		
 		ThresholdChecker.setVal(15);
@@ -134,6 +134,20 @@ public class TriggerManagerTest {
 		public List<Trigger> loadTriggers() {
 			return new ArrayList<Trigger>(triggers.values());
 		}
+
+		@Override
+		public Trigger loadTrigger(int triggerId)
+				throws TriggerManagerException {
+			// TODO Auto-generated method stub
+			return null;
+		}
+
+		@Override
+		public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
+				throws TriggerManagerException {
+			// TODO Auto-generated method stub
+			return null;
+		}
 		
 	}
 	
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index db8e324..8629edd 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -13,9 +13,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-import azkaban.actions.ExecuteFlowAction;
 import azkaban.executor.ExecutionOptions;
-import azkaban.scheduler.BasicTimeChecker;
 import azkaban.trigger.ActionTypeLoader;
 import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.Condition;
@@ -23,6 +21,8 @@ import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;