azkaban-uncached

lots of bug fixes

8/18/2013 5:00:17 PM

Changes

Details

diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 39363ba..8a3864e 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -9,6 +9,7 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorManager.Alerter;
+import azkaban.sla.SlaOption;
 import azkaban.utils.AbstractMailer;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
@@ -25,6 +26,29 @@ public class ExecutorMailer extends AbstractMailer implements Alerter {
 		testMode = props.getBoolean("test.mode", false);
 	}
 	
+	@SuppressWarnings("unchecked")
+	private void sendSlaAlertEmail(SlaOption slaOption, String slaMessage) {
+		String subject = "Sla Violation Alert";
+		String body = slaMessage;
+		List<String> emailList = (List<String>) slaOption.getInfo().get(SlaOption.INFO_EMAIL_LIST);
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = super.createEmailMessage(
+					subject, 
+					"text/html", 
+					emailList);
+			
+			message.setBody(body);
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+	
 	public void sendFirstErrorMessage(ExecutableFlow flow) {
 		ExecutionOptions option = flow.getExecutionOptions();
 		List<String> emailList = option.getDisabledJobs();
@@ -180,4 +204,10 @@ public class ExecutorMailer extends AbstractMailer implements Alerter {
 	public void alertOnFirstError(ExecutableFlow exflow) throws Exception {
 		sendFirstErrorMessage(exflow);
 	}
+
+	@Override
+	public void alertOnSla(SlaOption slaOption, String slaMessage)
+			throws Exception {
+		sendSlaAlertEmail(slaOption, slaMessage);		
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 1b24a07..92a941c 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -46,6 +46,7 @@ import org.joda.time.DateTime;
 
 import azkaban.project.Project;
 import azkaban.scheduler.ScheduleStatisticManager;
+import azkaban.sla.SlaOption;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.FileIOUtils;
@@ -84,6 +85,7 @@ public class ExecutorManager {
 		void alertOnSuccess(ExecutableFlow exflow) throws Exception;
 		void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
 		void alertOnFirstError(ExecutableFlow exflow) throws Exception;
+		void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
 	}
 	
 	public ExecutorManager(Props props, ExecutorLoader loader, boolean isActive) throws ExecutorManagerException {
@@ -97,9 +99,10 @@ public class ExecutorManager {
 		
 		this.isActive = isActive;		
 		
+		executingManager = new ExecutingManagerUpdaterThread();
+		executingManager.start();
+		
 		if(isActive) {
-			executingManager = new ExecutingManagerUpdaterThread();
-			executingManager.start();
 
 			long executionLogsRetentionMs = props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS);
 			cleanerThread = new CleanerThread(executionLogsRetentionMs);
@@ -862,6 +865,7 @@ public class ExecutorManager {
 	}
 	
 	private void finalizeFlows(ExecutableFlow flow) {
+
 		int execId = flow.getExecutionId();
 
 		// First we check if the execution in the datastore is complete
@@ -873,22 +877,30 @@ public class ExecutorManager {
 			else {
 				dsFlow = executorLoader.fetchExecutableFlow(execId);
 			
+				
+			}
+
+			if(isActive) {
 				// If it's marked finished, we're good. If not, we fail everything and then mark it finished.
 				if (!isFinished(dsFlow)) {
 					failEverything(dsFlow);
 					executorLoader.updateExecutableFlow(dsFlow);
 				}
-			}
-
-			// Delete the executing reference.
-			if (flow.getEndTime() == -1) {
-				flow.setEndTime(System.currentTimeMillis());
-				executorLoader.updateExecutableFlow(dsFlow);
-			}
-			executorLoader.removeActiveExecutableReference(execId);
 			
-			runningFlows.remove(execId);
-			recentlyFinished.put(execId, dsFlow);
+				// Delete the executing reference.
+				if (flow.getEndTime() == -1) {
+					flow.setEndTime(System.currentTimeMillis());
+					executorLoader.updateExecutableFlow(dsFlow);
+				}
+				executorLoader.removeActiveExecutableReference(execId);
+				
+				runningFlows.remove(execId);
+				recentlyFinished.put(execId, dsFlow);
+			} else {
+				runningFlows.remove(execId);
+				recentlyFinished.put(execId, dsFlow);
+				return;
+			}
 		} catch (ExecutorManagerException e) {
 			logger.error(e);
 		}
@@ -1005,6 +1017,7 @@ public class ExecutorManager {
 	}
 	
 	private ExecutableFlow updateExecution(Map<String,Object> updateData) throws ExecutorManagerException {
+		
 		Integer execId = (Integer)updateData.get(ConnectorParams.UPDATE_MAP_EXEC_ID);
 		if (execId == null) {
 			throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
@@ -1030,7 +1043,7 @@ public class ExecutorManager {
 		Status newStatus = flow.getStatus();
 		
 		ExecutionOptions options = flow.getExecutionOptions();
-		if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
+		if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING) && isActive) {
 			// We want to see if we should give an email status on first failure.
 			if (options.getNotifyOnFirstFailure()) {
 				Alerter mailAlerter = alerters.get("email");
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
new file mode 100644
index 0000000..0bacf7b
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -0,0 +1,52 @@
+package azkaban.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+
+import azkaban.trigger.TriggerManager;
+
+public class JmxTriggerManager implements JmxTriggerManagerMBean {
+	private TriggerManager manager;
+
+	public JmxTriggerManager(TriggerManager manager) {
+		this.manager = manager;
+	}
+
+	@Override
+	public String getLastThreadCheckTime() {
+		return new DateTime(manager.getLastThreadCheckTime()).toString();
+	}
+
+	@Override
+	public boolean isThreadActive() {
+		return manager.isThreadActive();
+	}
+
+	@Override
+	public List<String> getPrimaryTriggerHostPorts() {
+		return new ArrayList<String>(manager.getPrimaryServerHosts());
+	}
+
+	@Override
+	public List<String> getAllTriggerHostPorts() {
+		return new ArrayList<String>(manager.getAllActiveTriggerServerHosts());
+	}
+
+	@Override
+	public int getNumTriggers() {
+		return manager.getNumTriggers();
+	}
+
+	@Override
+	public String getTriggerSources() {
+		return manager.getTriggerSources();
+	}
+
+	@Override
+	public String getTriggerIds() {
+		return manager.getTriggerIds();
+	}
+	
+}
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
new file mode 100644
index 0000000..af3cc9a
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -0,0 +1,27 @@
+package azkaban.jmx;
+
+import java.util.List;
+
+public interface JmxTriggerManagerMBean {	
+	
+	@DisplayName("OPERATION: getLastThreadCheckTime")
+	public String getLastThreadCheckTime();
+
+	@DisplayName("OPERATION: isThreadActive")
+	public boolean isThreadActive();
+
+	@DisplayName("OPERATION: getPrimaryTriggerHostPorts")
+	public List<String> getPrimaryTriggerHostPorts();
+	
+	@DisplayName("OPERATION: getAllTriggerHostPorts")
+	public List<String> getAllTriggerHostPorts();
+	
+	@DisplayName("OPERATION: getNumTriggers")
+	public int getNumTriggers();
+	
+	@DisplayName("OPERATION: getTriggerSources")
+	public String getTriggerSources();
+	
+	@DisplayName("OPERATION: getTriggerIds")
+	public String getTriggerIds();
+}
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManager.java b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
index d0af406..9a57638 100644
--- a/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManager.java
@@ -19,4 +19,24 @@ public class JmxTriggerRunnerManager implements JmxTriggerRunnerManagerMBean {
 		return manager.isRunnerThreadActive();
 	}
 
+	@Override
+	public int getNumTriggers() {
+		return manager.getNumTriggers();
+	}
+
+	@Override
+	public String getTriggerSources() {
+		return manager.getTriggerSources();
+	}
+
+	@Override
+	public String getTriggerIds() {
+		return manager.getTriggerIds();
+	}
+
+	@Override
+	public long getScannerIdleTime() {
+		return manager.getScannerIdleTime();
+	}
+
 }
diff --git a/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
index 77b72e7..ca0f45b 100644
--- a/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerRunnerManagerMBean.java
@@ -5,7 +5,19 @@ public interface JmxTriggerRunnerManagerMBean {
 	@DisplayName("OPERATION: getLastRunnerThreadCheckTime")
 	public long getLastRunnerThreadCheckTime();
 
+	@DisplayName("OPERATION: getNumTriggers")
+	public int getNumTriggers();
+	
 	@DisplayName("OPERATION: isRunnerThreadActive")
 	public boolean isRunnerThreadActive();
-
+	
+	@DisplayName("OPERATION: getTriggerSources")
+	public String getTriggerSources();
+	
+	@DisplayName("OPERATION: getTriggerIds")
+	public String getTriggerIds();
+	
+	@DisplayName("OPERATION: getScannerIdleTime")
+	public long getScannerIdleTime();
+	
 }
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index ccbad60..2f529a6 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,7 +16,9 @@
 
 package azkaban.scheduler;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.joda.time.DateTime;
@@ -31,6 +33,7 @@ import org.joda.time.Seconds;
 import org.joda.time.Weeks;
 
 import azkaban.executor.ExecutionOptions;
+import azkaban.sla.SlaOption;
 import azkaban.utils.Pair;
 
 public class Schedule{
@@ -56,6 +59,7 @@ public class Schedule{
 	private boolean skipPastOccurrences = true;
 	
 	private ExecutionOptions executionOptions;
+	private List<SlaOption> slaOptions;
 	
 	public Schedule(
 						int scheduleId,
@@ -84,6 +88,7 @@ public class Schedule{
 				nextExecTime,
 				submitTime,
 				submitUser,
+				null,
 				null
 				);
 	}
@@ -101,7 +106,8 @@ public class Schedule{
 						long nextExecTime,						
 						long submitTime,
 						String submitUser,
-						ExecutionOptions executionOptions
+						ExecutionOptions executionOptions,
+						List<SlaOption> slaOptions
 			) {
 		this(scheduleId, projectId, 
 				projectName, 
@@ -114,7 +120,8 @@ public class Schedule{
 				nextExecTime,
 				submitTime,
 				submitUser,
-				executionOptions
+				executionOptions,
+				slaOptions
 				);
 	}
 
@@ -131,7 +138,8 @@ public class Schedule{
 						long nextExecTime,
 						long submitTime,
 						String submitUser,
-						ExecutionOptions executionOptions
+						ExecutionOptions executionOptions,
+						List<SlaOption> slaOptions
 						) {
 		this.scheduleId = scheduleId;
 		this.projectId = projectId;
@@ -146,15 +154,24 @@ public class Schedule{
 		this.status = status;
 		this.submitTime = submitTime;
 		this.executionOptions = executionOptions;
+		this.slaOptions = slaOptions;
 	}
 
 	public ExecutionOptions getExecutionOptions() {
 		return executionOptions;
 	}
+	
+	public List<SlaOption> getSlaOptions() {
+		return slaOptions;
+	}
 
 	public void setFlowOptions(ExecutionOptions executionOptions) {
 		this.executionOptions = executionOptions;
 	}
+	
+	public void setSlaOptions(List<SlaOption> slaOptions) {
+		this.slaOptions = slaOptions;
+	}
 
 	public String getScheduleName() {
 		return projectName + "." + flowName + " (" + projectId + ")";
@@ -328,7 +345,6 @@ public class Schedule{
 		return periodStr;
 	}
 	
-	
 	public Map<String,Object> optionsToObject() {
 		if(executionOptions != null ) {
 			HashMap<String, Object> schedObj = new HashMap<String, Object>();
@@ -336,6 +352,15 @@ public class Schedule{
 			if(executionOptions != null) {
 				schedObj.put("executionOptions", executionOptions.toObject());
 			}
+			
+			if(slaOptions != null) {
+				List<Object> slaOptionsObject = new ArrayList<Object>();
+//				schedObj.put("slaOptions", slaOptions.toObject());
+				for(SlaOption sla : slaOptions) {
+					slaOptionsObject.add(sla.toObject());
+				}
+				schedObj.put("slaOptions", slaOptionsObject);
+			}
 	
 			return schedObj;
 		}
@@ -358,7 +383,17 @@ public class Schedule{
 			this.executionOptions = new ExecutionOptions();
 			this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
 		}
-
+		
+		if(schedObj.containsKey("slaOptions")) {
+			List<Object> slaOptionsObject = (List<Object>) schedObj.get("slaOptions");
+			List<SlaOption> slaOptions = new ArrayList<SlaOption>();
+			for(Object slaObj : slaOptionsObject) {
+				slaOptions.add(SlaOption.fromObject(slaObj));
+			}
+			this.slaOptions = slaOptions;
+		}
+		
+		
 	}
 
 	public boolean isRecurring() {
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 4ce0e25..ad4104a 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -41,6 +41,7 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
+import azkaban.sla.SlaOption;
 import azkaban.trigger.TriggerAgent;
 import azkaban.trigger.TriggerStatus;
 import azkaban.utils.Pair;
@@ -59,8 +60,9 @@ public class ScheduleManager implements TriggerAgent {
 	private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
 	private ScheduleLoader loader;
 
-	private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
+//	private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
 	private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
+	private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
 	
 	private final ExecutorManager executorManager;
 	
@@ -186,10 +188,14 @@ public class ScheduleManager implements TriggerAgent {
 	 * @return
 	 * @throws ScheduleManagerException 
 	*/
-	public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
-		updateLocal();
-		return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
-	}
+//	public Set<Schedule> getSchedules(int projectId, String flowId) throws ScheduleManagerException {
+//		updateLocal();
+//		return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+//	}
+	public Schedule getSchedule(int projectId, String flowId) throws ScheduleManagerException {
+	updateLocal();
+	return scheduleIdentityPairMap.get(new Pair<Integer,String>(projectId, flowId));
+}
 
 	/**
 	 * Returns the scheduled flow for the scheduleId
@@ -210,12 +216,18 @@ public class ScheduleManager implements TriggerAgent {
 	 * @param id
 	 * @throws ScheduleManagerException 
 	 */
-	public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
-		Set<Schedule> schedules = getSchedules(projectId, flowId);
-		if(schedules != null) {
-			for(Schedule sched : schedules) {
-				removeSchedule(sched);
-			}
+//	public synchronized void removeSchedules(int projectId, String flowId) throws ScheduleManagerException {
+//		Set<Schedule> schedules = getSchedules(projectId, flowId);
+//		if(schedules != null) {
+//			for(Schedule sched : schedules) {
+//				removeSchedule(sched);
+//			}
+//		}
+//	}
+	public synchronized void removeSchedule(int projectId, String flowId) throws ScheduleManagerException {
+		Schedule sched = getSchedule(projectId, flowId);
+		if(sched != null) {
+			removeSchedule(sched);
 		}
 	}
 	/**
@@ -226,13 +238,18 @@ public class ScheduleManager implements TriggerAgent {
 	public synchronized void removeSchedule(Schedule sched) {
 		
 		Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
-		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
-		if(schedules != null) {
-			schedules.remove(sched);
-			if(schedules.size() == 0) {
-				scheduleIdentityPairMap.remove(identityPairMap);
-			}
+//		Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
+//		if(schedules != null) {
+//			schedules.remove(sched);
+//			if(schedules.size() == 0) {
+//				scheduleIdentityPairMap.remove(identityPairMap);
+//			}
+//		}
+		Schedule schedule = scheduleIdentityPairMap.get(identityPairMap);
+		if(schedule != null) {
+			scheduleIdentityPairMap.remove(identityPairMap);
 		}
+
 		scheduleIDMap.remove(sched.getScheduleId());
 		
 		try {
@@ -284,7 +301,7 @@ public class ScheduleManager implements TriggerAgent {
 			final long submitTime,
 			final String submitUser
 			) {
-		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
+		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
 	}
 	
 	public Schedule scheduleFlow(
@@ -300,9 +317,10 @@ public class ScheduleManager implements TriggerAgent {
 			final long nextExecTime,
 			final long submitTime,
 			final String submitUser,
-			ExecutionOptions execOptions
+			ExecutionOptions execOptions,
+			List<SlaOption> slaOptions
 			) {
-		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
+		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -317,7 +335,11 @@ public class ScheduleManager implements TriggerAgent {
 	 * @param flow
 	 */
 	private synchronized void internalSchedule(Schedule s) {
-		Schedule existing = scheduleIDMap.get(s.getScheduleId());
+		//Schedule existing = scheduleIDMap.get(s.getScheduleId());
+		Schedule existing = null;
+		if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
+			existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+		}
 		if(!useExternalRunner) {
 			if (existing != null) {
 				this.runner.removeRunnerSchedule(existing);
@@ -326,12 +348,13 @@ public class ScheduleManager implements TriggerAgent {
 			this.runner.addRunnerSchedule(s);
 		}
 		scheduleIDMap.put(s.getScheduleId(), s);
-		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
-		if(schedules == null) {
-			schedules = new HashSet<Schedule>();
-			scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
-		}
-		schedules.add(s);
+//		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+//		if(schedules == null) {
+//			schedules = new HashSet<Schedule>();
+//			scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), schedules);
+//		}
+//		schedules.add(s);
+		scheduleIdentityPairMap.put(s.getScheduleIdentityPair(), s);
 	}
 
 	/**
@@ -340,14 +363,16 @@ public class ScheduleManager implements TriggerAgent {
 	 * @param flow
 	 */
 	public synchronized void insertSchedule(Schedule s) {
-		boolean exist = s.getScheduleId() != -1;
+		//boolean exist = s.getScheduleId() != -1;
+		Schedule exist = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
 		if(s.updateTime()) {
 			try {
-				if(!exist) {
+				if(exist == null) {
 					loader.insertSchedule(s);
 					internalSchedule(s);
 				}
 				else{
+					s.setScheduleId(exist.getScheduleId());
 					loader.updateSchedule(s);
 					internalSchedule(s);
 				}
@@ -524,7 +549,24 @@ public class ScheduleManager implements TriggerAgent {
 										e.printStackTrace();
 										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
 									}
-									
+//									SlaOptions slaOptions = runningSched.getSlaOptions();
+//									if(slaOptions != null) {
+//										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
+//										// submit flow slas
+//										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+//										for(SlaSetting set : slaOptions.getSettings()) {
+//											if(set.getId().equals("")) {
+//												DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+//												slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
+//											}
+//											else {
+//												jobsettings.add(set);
+//											}
+//										}
+//										if(jobsettings.size() > 0) {
+//											slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+//										}
+//									}
 								} 
 								catch (ExecutorManagerException e) {
 									if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index b79fc3f..ba6bdd0 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -10,15 +10,17 @@ import org.joda.time.DateTime;
 
 import azkaban.executor.ExecutorManager;
 import azkaban.project.ProjectManager;
+import azkaban.sla.SlaOption;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerManagerException;
-import azkaban.trigger.TriggerStatus;
 import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.CreateTriggerAction;
 import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.SlaChecker;
 
 public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
@@ -44,7 +46,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		Condition triggerCondition = createTimeTriggerCondition(s);
 		Condition expireCondition = createTimeExpireCondition(s);
 		List<TriggerAction> actions = createActions(s);
-		Trigger t = new Trigger(new DateTime(s.getLastModifyTime()), new DateTime(s.getSubmitTime()), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
+		Trigger t = new Trigger(s.getScheduleId(), new DateTime(s.getLastModifyTime()), new DateTime(s.getSubmitTime()), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
 		if(s.isRecurring()) {
 			t.setResetOnTrigger(true);
 		}
@@ -53,8 +55,18 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
 	private List<TriggerAction> createActions (Schedule s) {
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
-		TriggerAction act = new ExecuteFlowAction(s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions());
-		actions.add(act);
+		ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
+		actions.add(executeAct);
+//		List<SlaOption> slaOptions = s.getSlaOptions();
+//		if(slaOptions != null && slaOptions.size() > 0) {
+//			// insert a trigger to keep watching that execution
+//			for(SlaOption sla : slaOptions) {
+//				// need to create triggers for each sla
+//				SlaChecker slaChecker = new SlaChecker("slaChecker", sla, executeAct.getId());
+//				
+//			}
+//		}
+		
 		return actions;
 	}
 	
@@ -110,7 +122,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		List<Schedule> schedules = new ArrayList<Schedule>();
 //		triggersLocalCopy = new HashMap<Integer, Trigger>();
 		for(Trigger t : triggers) {
-//				triggersLocalCopy.put(t.getTriggerId(), t);
 			lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
 			Schedule s = triggerToSchedule(t);
 			schedules.add(s);
@@ -151,7 +162,9 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 					t.getLastModifyTime().getMillis(),
 					ck.getNextCheckTime().getMillis(),
 					t.getSubmitTime().getMillis(),
-					t.getSubmitUser());
+					t.getSubmitUser(),
+					act.getExecutionOptions(),
+					act.getSlaOptions());
 			return s;
 		} else {
 			logger.error("Failed to parse schedule from trigger!");
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index 799c487..28b0328 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
@@ -31,12 +30,8 @@ import org.joda.time.format.DateTimeFormatter;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
 import azkaban.project.ProjectManager;
-import azkaban.trigger.Condition;
-import azkaban.trigger.ConditionChecker;
-import azkaban.trigger.Trigger;
-import azkaban.trigger.TriggerAction;
+import azkaban.sla.SlaOption;
 import azkaban.trigger.TriggerManager;
-import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.utils.Pair;
 
 /**
@@ -147,7 +142,7 @@ public class TriggerBasedScheduler {
 			final long submitTime,
 			final String submitUser
 			) {
-		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null);
+		return scheduleFlow(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
 	}
 	
 	public Schedule scheduleFlow(
@@ -163,9 +158,10 @@ public class TriggerBasedScheduler {
 			final long nextExecTime,
 			final long submitTime,
 			final String submitUser,
-			ExecutionOptions execOptions
+			ExecutionOptions execOptions,
+			List<SlaOption> slaOptions
 			) {
-		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions);
+		Schedule sched = new Schedule(scheduleId, projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
diff --git a/src/java/azkaban/sla/SlaOption.java b/src/java/azkaban/sla/SlaOption.java
new file mode 100644
index 0000000..ad5b536
--- /dev/null
+++ b/src/java/azkaban/sla/SlaOption.java
@@ -0,0 +1,129 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.ReadablePeriod;
+
+import azkaban.utils.Utils;
+
+public class SlaOption {
+	
+	public static final String TYPE_FLOW_FINISH = "FlowFinish";
+	public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
+	public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
+	
+	public static final String TYPE_JOB_FINISH = "JobFinish";
+	public static final String TYPE_JOB_SUCCEED = "JobSucceed";
+	public static final String TYPE_JOB_PROGRESS = "JobProgress";
+	
+	public static final String INFO_DURATION = "Duration";
+	public static final String INFO_FLOW_NAME = "FlowName";
+	public static final String INFO_JOB_NAME = "JobName";
+	public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
+	public static final String INFO_EMAIL_LIST = "EmailList";
+	
+	// always alert
+	public static final String ALERT_TYPE = "SlaAlertType";
+	public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
+	public static final String ACTION_ALERT = "SlaAlert";
+	
+	private String type;
+	private Map<String, Object> info;
+	private List<String> actions;
+	
+	public SlaOption(
+			String type,
+			List<String> actions,
+			Map<String, Object> info
+	) {
+		this.type = type;
+		this.info = info;
+		this.actions = actions;
+	}
+
+	public String getType() {
+		return type;
+	}
+
+	public void setType(String type) {
+		this.type = type;
+	}
+
+	public Map<String, Object> getInfo() {
+		return info;
+	}
+
+	public void setInfo(Map<String, Object> info) {
+		this.info = info;
+	}
+
+	public List<String> getActions() {
+		return actions;
+	}
+
+	public void setActions(List<String> actions) {
+		this.actions = actions;
+	}
+
+	public Map<String,Object> toObject() {
+		HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+		slaObj.put("type", type);
+		slaObj.put("info", info);
+		slaObj.put("actions", actions);
+
+		return slaObj;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SlaOption fromObject(Object object) {
+
+		HashMap<String, Object> slaObj = (HashMap<String,Object>)object;
+
+		String type = (String) slaObj.get("type");
+		List<String> actions = (List<String>) slaObj.get("actions");
+		Map<String, Object> info = (Map<String, Object>) slaObj.get("info");
+
+		return new SlaOption(type, actions, info);
+	}
+
+	public Object toWebObject() {
+		HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+//		slaObj.put("type", type);
+//		slaObj.put("info", info);
+//		slaObj.put("actions", actions);
+		if(type.equals(TYPE_FLOW_FINISH) || type.equals(TYPE_FLOW_SUCCEED)) {
+			slaObj.put("id", "");
+		} else {
+			slaObj.put("id", info.get(INFO_JOB_NAME));
+		}
+		slaObj.put("duration", info.get(INFO_DURATION));
+		if(type.equals(TYPE_FLOW_FINISH) || type.equals(TYPE_JOB_FINISH)) {
+			slaObj.put("rule", "FINISH");
+		} else {
+			slaObj.put("rule", "SUCCESS");
+		} 
+		List<String> actionsObj = new ArrayList<String>();
+		for(String act : actions) {
+			if(act.equals(ACTION_ALERT)) {
+				actionsObj.add("EMAIL");
+			}
+			else {
+				actionsObj.add("KILL");
+			}
+		}
+		slaObj.put("actions", actionsObj);
+		
+		return slaObj;
+	}
+	
+	@Override
+	public String toString() {
+		return "Sla of " + getType() +  getInfo() + getActions();
+	}
+
+}
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 1e4cc3c..4ee2ef3 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -2,7 +2,6 @@ package azkaban.trigger;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -32,7 +31,7 @@ public class ActionTypeLoader {
 		// load built-in actions
 		
 
-		loadDefaultActions();
+		loadBuiltinActions();
 		
 		loadPluginActions(props);
 
@@ -135,11 +134,18 @@ public class ActionTypeLoader {
 		logger.info("Loaded action type " + actionName + " " + actionClass);
 	}
 	
-	private void loadDefaultActions() {
+	private void loadBuiltinActions() {
 		actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);		
 		logger.info("Loaded ExecuteFlowAction type.");
 	}
 	
+	public static void registerBuiltinActions(Map<String, Class<? extends TriggerAction>> builtinActions) {
+		actionToClass.putAll(builtinActions);
+		for(String type : builtinActions.keySet()) {
+			logger.info("Loaded " + type + " action.");
+		}
+	}
+	
 	public TriggerAction createActionFromJson(String type, Object obj) throws Exception {
 		TriggerAction action = null;
 		Class<? extends TriggerAction> actionClass = actionToClass.get(type);		
diff --git a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 05e0722..6071019 100644
--- a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -157,7 +157,12 @@ public class BasicTimeChecker implements ConditionChecker {
 		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
 		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
 		String id = (String) jsonObj.get("id");
-		return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+
+		BasicTimeChecker checker = new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
+		if(skipPastChecks) {
+			checker.updateNextCheckTime();
+		}
+		return checker;
 	}
 	
 	@Override
@@ -203,10 +208,14 @@ public class BasicTimeChecker implements ConditionChecker {
 //		return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
 //	}
 	
+	private void updateNextCheckTime(){
+		nextCheckTime = calculateNextCheckTime();
+	}
+	
 	private DateTime calculateNextCheckTime(){
 		DateTime date = new DateTime(nextCheckTime);
 		int count = 0;
-		while(!DateTime.now().isBefore(date) && skipPastChecks) {
+		while(!DateTime.now().isBefore(date)) {
 			if(count > 100000) {
 				throw new IllegalStateException("100000 increments of period did not get to present time.");
 			}
@@ -217,6 +226,9 @@ public class BasicTimeChecker implements ConditionChecker {
 				date = date.plus(period);
 			}
 			count += 1;
+			if(!skipPastChecks) {
+				continue;
+			}
 		}
 		return date;
 	}
@@ -247,4 +259,10 @@ public class BasicTimeChecker implements ConditionChecker {
 		return;
 	}
 
+	@Override
+	public void setContext(Map<String, Object> context) {
+		// TODO Auto-generated method stub
+		
+	}
+
 }
diff --git a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
index e231023..4f8baac 100644
--- a/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
+++ b/src/java/azkaban/trigger/builtin/CreateTriggerAction.java
@@ -12,8 +12,11 @@ public class CreateTriggerAction implements TriggerAction {
 	public static final String type = "CreateTriggerAction";
 	private static TriggerRunnerManager triggerRunnerManager;
 	private Trigger trigger;
-
-	public CreateTriggerAction(Trigger trigger) {
+	private Map<String, Object> context;
+	private String actionId;
+	
+	public CreateTriggerAction(String actionId, Trigger trigger) {
+		this.actionId = actionId;
 		this.trigger = trigger;
 	}
 	
@@ -32,8 +35,9 @@ public class CreateTriggerAction implements TriggerAction {
 		if(!jsonObj.get("type").equals(type)) {
 			throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
 		}
+		String actionId = (String) jsonObj.get("actionId");
 		Trigger trigger = Trigger.fromJson(jsonObj.get("trigger"));
-		return new CreateTriggerAction(trigger);
+		return new CreateTriggerAction(actionId, trigger);
 	}
 	
 	@Override
@@ -45,6 +49,7 @@ public class CreateTriggerAction implements TriggerAction {
 	@Override
 	public Object toJson() {
 		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("actionId", actionId);
 		jsonObj.put("type", type);
 		jsonObj.put("trigger", trigger.toJson());
 
@@ -61,4 +66,14 @@ public class CreateTriggerAction implements TriggerAction {
 		return "create another: " + trigger.getDescription();
 	}
 
+	@Override
+	public String getId() {
+		return actionId;
+	}
+
+	@Override
+	public void setContext(Map<String, Object> context) {
+		this.context = context;
+	}
+
 }
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index a96dfcd..1ff52cb 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -1,6 +1,8 @@
 package azkaban.trigger.builtin;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
@@ -12,28 +14,41 @@ import azkaban.executor.ExecutorManagerException;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
+import azkaban.triggerapp.TriggerRunnerManager;
 
 public class ExecuteFlowAction implements TriggerAction {
 
 	public static final String type = "ExecuteFlowAction";
+
+	public static final String EXEC_ID = "ExecuteFlowAction.execid";
 	
 	private static ExecutorManager executorManager;
+	private static TriggerRunnerManager triggerRunnerManager;
+	private String actionId;
 	private int projectId;
 	private String projectName;
 	private String flowName;
 	private String submitUser;
 	private static ProjectManager projectManager;
-	private ExecutionOptions executionOptions;
-
+	private ExecutionOptions executionOptions = new ExecutionOptions();
+	private List<SlaOption> slaOptions;
+	private Map<String, Object> context;
+	
 	private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
 	
-	public ExecuteFlowAction(int projectId, String projectName, String flowName, String submitUser, ExecutionOptions executionOptions) {
+	public ExecuteFlowAction(String actionId, int projectId, String projectName, String flowName, String submitUser, ExecutionOptions executionOptions, List<SlaOption> slaOptions) {
+		this.actionId = actionId;
 		this.projectId = projectId;
 		this.projectName = projectName;
 		this.flowName = flowName;
 		this.submitUser = submitUser;
 		this.executionOptions = executionOptions;
+		this.slaOptions = slaOptions;
 	}
 	
 	public static void setLogger(Logger logger) {
@@ -75,6 +90,14 @@ public class ExecuteFlowAction implements TriggerAction {
 	public void setExecutionOptions(ExecutionOptions executionOptions) {
 		this.executionOptions = executionOptions;
 	}
+	
+	public List<SlaOption> getSlaOptions() {
+		return slaOptions;
+	}
+
+	public void setSlaOptions(List<SlaOption> slaOptions) {
+		this.slaOptions = slaOptions;
+	}
 
 	public static ExecutorManager getExecutorManager() {
 		return executorManager;
@@ -83,6 +106,14 @@ public class ExecuteFlowAction implements TriggerAction {
 	public static void setExecutorManager(ExecutorManager executorManager) {
 		ExecuteFlowAction.executorManager = executorManager;
 	}
+	
+	public static TriggerRunnerManager getTriggerRunnerManager() {
+		return triggerRunnerManager;
+	}
+ 	
+	public static void setTriggerRunnerManager(TriggerRunnerManager triggerRunnerManager) {
+		ExecuteFlowAction.triggerRunnerManager = triggerRunnerManager;
+	}
 
 	public static ProjectManager getProjectManager() {
 		return projectManager;
@@ -100,59 +131,55 @@ public class ExecuteFlowAction implements TriggerAction {
 	@SuppressWarnings("unchecked")
 	@Override
 	public TriggerAction fromJson(Object obj) {
-		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-		String type = (String) jsonObj.get("type");
-		if(! type.equals(ExecuteFlowAction.type)) {
-			throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
-		}
-		int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
-		String projectName = (String) jsonObj.get("projectName");
-		String flowName = (String) jsonObj.get("flowName");
-		String submitUser = (String) jsonObj.get("submitUser");
-		ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
-		return new ExecuteFlowAction(projectId, projectName, flowName, submitUser, executionOptions);
+		return createFromJson((HashMap<String, Object>) obj);
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public static TriggerAction createFromJson(HashMap obj) {
-		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-		String type = (String) jsonObj.get("type");
-		if(! type.equals(ExecuteFlowAction.type)) {
-			throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
-		}
-		int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
-		String projectName = (String) jsonObj.get("projectName");
-		String flowName = (String) jsonObj.get("flowName");
-		String submitUser = (String) jsonObj.get("submitUser");
-		ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
-		return new ExecuteFlowAction(projectId, projectName, flowName, submitUser, executionOptions);
-	}
-	
 	@SuppressWarnings("unchecked")
-	public static TriggerAction createFromJson(Object obj) {
+	public static TriggerAction createFromJson(HashMap<String, Object> obj) {
 		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-		String type = (String) jsonObj.get("type");
-		if(! type.equals(ExecuteFlowAction.type)) {
-			throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+		String objType = (String) jsonObj.get("type");
+		if(! objType.equals(type)) {
+			throw new RuntimeException("Cannot create action of " + type + " from " + objType);
 		}
+		String actionId = (String) jsonObj.get("actionId");
 		int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
 		String projectName = (String) jsonObj.get("projectName");
 		String flowName = (String) jsonObj.get("flowName");
 		String submitUser = (String) jsonObj.get("submitUser");
-		ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
-		return new ExecuteFlowAction(projectId, projectName, flowName, submitUser, executionOptions);
+		ExecutionOptions executionOptions = null;
+		if(jsonObj.containsKey("executionOptions")) {
+			executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+		}
+		List<SlaOption> slaOptions = null;
+		if(jsonObj.containsKey("slaOptions")) {
+			slaOptions = new ArrayList<SlaOption>();
+			List<Object> slaOptionsObj = (List<Object>) jsonObj.get("slaOptions");
+			for(Object slaObj : slaOptionsObj) {
+				slaOptions.add(SlaOption.fromObject(slaObj));
+			}
+		}
+		return new ExecuteFlowAction(actionId, projectId, projectName, flowName, submitUser, executionOptions, slaOptions);
 	}
 	
 	@Override
 	public Object toJson() {
 		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("actionId", actionId);
 		jsonObj.put("type", type);
 		jsonObj.put("projectId", String.valueOf(projectId));
 		jsonObj.put("projectName", projectName);
 		jsonObj.put("flowName", flowName);
 		jsonObj.put("submitUser", submitUser);
-		jsonObj.put("executionOptions", executionOptions.toObject());
-
+		if(executionOptions != null) {
+			jsonObj.put("executionOptions", executionOptions.toObject());
+		}
+		if(slaOptions != null) {
+			List<Object> slaOptionsObj = new ArrayList<Object>();
+			for(SlaOption sla : slaOptions) {
+				slaOptionsObj.add(sla.toObject());
+			}
+			jsonObj.put("slaOptions", slaOptionsObj);
+		}
 		return jsonObj;
 	}
 
@@ -188,11 +215,45 @@ public class ExecuteFlowAction implements TriggerAction {
 		
 		try{
 			executorManager.submitExecutableFlow(exflow);
+			Map<String, Object> outputProps = new HashMap<String, Object>();
+			outputProps.put(EXEC_ID, exflow.getExecutionId());
+			context.put(actionId, outputProps);
 			logger.info("Invoked flow " + project.getName() + "." + flowName);
 		} catch (ExecutorManagerException e) {
 			throw new RuntimeException(e);
 		}
 		
+		// deal with sla
+		if(slaOptions != null && slaOptions.size() > 0) {
+			int execId = exflow.getExecutionId();
+			for(SlaOption sla : slaOptions) {
+				logger.info("Adding sla trigger " + sla.toString());
+				SlaChecker slaFailChecker = new SlaChecker("slaFailChecker", sla, execId, false);
+				Map<String, ConditionChecker> failCheckers = new HashMap<String, ConditionChecker>();
+				failCheckers.put(slaFailChecker.getId(), slaFailChecker);
+				Condition triggerCond = new Condition(failCheckers, slaFailChecker.getId() + ".eval()");
+				SlaChecker slaPassChecker = new SlaChecker("slaPassChecker", sla, execId, true);
+				Map<String, ConditionChecker> passCheckers = new HashMap<String, ConditionChecker>();
+				passCheckers.put(slaPassChecker.getId(), slaPassChecker);
+				Condition expireCond = new Condition(passCheckers, slaPassChecker.getId() + ".eval()");
+				List<TriggerAction> actions = new ArrayList<TriggerAction>();
+				List<String> slaActions = sla.getActions();
+				for(String act : slaActions) {
+					if(act.equals(SlaOption.ACTION_ALERT)) {
+						SlaAlertAction slaAlert = new SlaAlertAction("slaAlert", sla, execId);
+						actions.add(slaAlert);
+					} else if(act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
+						KillExecutionAction killAct = new KillExecutionAction("killExecution", execId);
+						actions.add(killAct);
+					}
+				}
+				Trigger slaTrigger = new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond, actions);
+				slaTrigger.setResetOnTrigger(false);
+				slaTrigger.setResetOnExpire(false);
+				triggerRunnerManager.insertTrigger(slaTrigger);
+			}
+		}
+		
 	}
 
 	@Override
@@ -201,5 +262,15 @@ public class ExecuteFlowAction implements TriggerAction {
 				" from project " + getProjectName();
 	}
 
+	@Override
+	public void setContext(Map<String, Object> context) {
+		this.context = context;
+	}
+
+	@Override
+	public String getId() {
+		return actionId;
+	}
+
 
 }
diff --git a/src/java/azkaban/trigger/builtin/KillExecutionAction.java b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
new file mode 100644
index 0000000..1da405d
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -0,0 +1,93 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManager;
+import azkaban.trigger.TriggerAction;
+
+public class KillExecutionAction implements TriggerAction{
+
+	public static final String type = "KillExecutionAction";
+	
+	private static final Logger logger = Logger.getLogger(KillExecutionAction.class);
+	
+	private String actionId;
+	private int execId;
+	private static ExecutorManager executorManager;
+	
+	public KillExecutionAction(String actionId, int execId) {
+		this.execId = execId;
+		this.actionId = actionId;
+	}
+	
+	public static void setExecutorManager(ExecutorManager em) {
+		executorManager = em;
+	}
+	
+	@Override
+	public String getId() {
+		return actionId;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static KillExecutionAction createFromJson(Object obj) {
+		return createFromJson((HashMap<String, Object>)obj);
+	}
+	
+	public static KillExecutionAction createFromJson(HashMap<String, Object> obj) {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		String objType = (String) jsonObj.get("type");
+		if(! objType.equals(type)) {
+			throw new RuntimeException("Cannot create action of " + type + " from " + objType);
+		}
+		String actionId = (String) jsonObj.get("actionId");
+		int execId = Integer.valueOf((String) jsonObj.get("execId"));
+		return new KillExecutionAction(actionId, execId);
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	public KillExecutionAction fromJson(Object obj) throws Exception {
+		return createFromJson((HashMap<String, Object>)obj);
+	}
+
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("actionId", actionId);
+		jsonObj.put("type", type);
+		jsonObj.put("execId", String.valueOf(execId));
+		return jsonObj;
+	}
+
+	@Override
+	public void doAction() throws Exception {
+		ExecutableFlow exFlow = executorManager.getExecutableFlow(execId);
+		logger.info("ready to kill execution " + execId);
+		if(!executorManager.isFinished(exFlow)) {
+			logger.info("Killing execution " + execId);
+			executorManager.cancelFlow(exFlow, "azkaban_sla");
+		}
+	}
+
+	@Override
+	public void setContext(Map<String, Object> context) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public String getDescription() {
+		return type + " for " + execId;
+	}
+
+}
diff --git a/src/java/azkaban/trigger/builtin/SendEmailAction.java b/src/java/azkaban/trigger/builtin/SendEmailAction.java
new file mode 100644
index 0000000..ec946c2
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/SendEmailAction.java
@@ -0,0 +1,99 @@
+package azkaban.trigger.builtin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.utils.AbstractMailer;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Props;
+
+public class SendEmailAction implements TriggerAction {
+
+	private static final Logger logger = Logger.getLogger(SendEmailAction.class);
+	
+	private String actionId;
+	private Map<String, Object> context;
+	private static AbstractMailer mailer;
+	private String message;
+	public static final String type = "SendEmailAction";
+	private String mimetype = "text/html";
+	private List<String> emailList;
+	private String subject;
+	
+	public static void init(Props props) {
+		mailer = new AbstractMailer(props);
+	}
+	
+	public SendEmailAction(String actionId, String subject, String message, List<String> emailList) {
+		this.actionId = actionId;
+		this.message = message;
+		this.subject = subject;
+		this.emailList = emailList;
+	}
+	
+	@Override
+	public String getId() {
+		return actionId;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SendEmailAction createFromJson(Object obj) throws Exception {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
+		}
+		String actionId = (String) jsonObj.get("actionId");
+		String subject = (String) jsonObj.get("subject");
+		String message = (String) jsonObj.get("message");
+		List<String> emailList = (List<String>) jsonObj.get("emailList");
+		return new SendEmailAction(actionId, subject, message, emailList);
+	}
+	
+	@Override
+	public TriggerAction fromJson(Object obj) throws Exception {
+		return createFromJson(obj);
+	}
+
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("actionId", actionId);
+		jsonObj.put("type", type);
+		jsonObj.put("subject", subject);
+		jsonObj.put("message", message);
+		jsonObj.put("emailList", emailList);
+
+		return jsonObj;
+	}
+
+	@Override
+	public void doAction() throws Exception {
+		EmailMessage email = mailer.prepareEmailMessage(subject, mimetype, emailList);
+		email.setBody(message);
+		email.sendEmail();
+	}
+
+	@Override
+	public void setContext(Map<String, Object> context) {
+		
+	}
+
+	@Override
+	public String getDescription() {
+		return type;
+	}
+
+	
+}
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
new file mode 100644
index 0000000..ead2124
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -0,0 +1,170 @@
+package azkaban.trigger.builtin;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorMailer;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManager.Alerter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.TriggerAction;
+import azkaban.utils.FileIOUtils;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+
+public class SlaAlertAction implements TriggerAction{
+
+	public static final String type = "AlertAction";
+	
+	private static final Logger logger = Logger.getLogger(SlaAlertAction.class);
+	
+	private String actionId;
+	private SlaOption slaOption;
+	private int execId;
+//	private List<Map<String, Object>> alerts;
+	private static Map<String, Alerter> alerters;
+	private Map<String, Object> context;
+	private static ExecutorManager executorManager;
+
+	public SlaAlertAction(String id, SlaOption slaOption, int execId) {
+		this.actionId = id;
+		this.slaOption = slaOption;
+		this.execId = execId;
+//		this.alerts = alerts;
+	}
+	
+	public static void setAlerters(Map<String, Alerter> alts) {
+		alerters = alts;
+	}
+	
+	public static void setExecutorManager(ExecutorManager em) {
+		executorManager = em;
+	}
+	
+	@Override
+	public String getId() {
+		return actionId;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SlaAlertAction createFromJson(Object obj) throws Exception {
+		return createFromJson((HashMap<String, Object>) obj);
+	}
+	
+	public static SlaAlertAction createFromJson(HashMap<String, Object> obj) throws Exception {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create action of " + type + " from " + jsonObj.get("type"));
+		}
+		String actionId = (String) jsonObj.get("actionId");
+		SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
+		int execId = Integer.valueOf((String) jsonObj.get("execId"));
+//		List<Map<String, Object>> alerts = (List<Map<String, Object>>) jsonObj.get("alerts");
+		return new SlaAlertAction(actionId, slaOption, execId);
+	}
+	
+	@Override
+	public TriggerAction fromJson(Object obj) throws Exception {
+		return createFromJson(obj);
+	}
+
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("actionId", actionId);
+		jsonObj.put("type", type);
+		jsonObj.put("slaOption", slaOption.toObject());
+		jsonObj.put("execId", String.valueOf(execId));
+//		jsonObj.put("alerts", alerts);
+
+		return jsonObj;
+	}
+
+	@Override
+	public void doAction() throws Exception {
+//		for(Map<String, Object> alert : alerts) {
+		logger.info("Alerting on sla failure.");
+		Map<String, Object> alert = slaOption.getInfo();
+			if(alert.containsKey(SlaOption.ALERT_TYPE)) {
+				String alertType = (String) alert.get(SlaOption.ALERT_TYPE);
+				Alerter alerter = alerters.get(alertType);
+				if(alerter != null) {
+					try {
+						alerter.alertOnSla(slaOption, createSlaMessage());
+					} catch (Exception e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+						logger.error("Failed to alert by " + alertType);
+					}
+				}
+				else {
+					logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
+				}
+			}
+//		}
+	}
+
+	private String createSlaMessage() {
+		ExecutableFlow flow = null;
+		try {
+			flow = executorManager.getExecutableFlow(execId);
+		} catch (ExecutorManagerException e) {
+			e.printStackTrace();
+			logger.error("Failed to get executable flow.");
+		}
+		String type = slaOption.getType();
+		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
+			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>"; 
+			String actual = "Actual flow status is " + flow.getStatus();
+			return basicinfo + expected + actual;
+		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
+			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>"; 
+			String actual = "Actual flow status is " + flow.getStatus();
+			return basicinfo + expected + actual;
+		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
+		} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
+		} else {
+			return "Unrecognized SLA type " + type;
+		}
+	}
+
+	@Override
+	public void setContext(Map<String, Object> context) {
+		this.context = context;
+	}
+
+	@Override
+	public String getDescription() {
+		return type + " with " + slaOption.toString();
+	}
+
+}
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
new file mode 100644
index 0000000..e9826eb
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -0,0 +1,229 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.ConditionChecker;
+import azkaban.utils.Utils;
+
+public class SlaChecker implements ConditionChecker{
+
+	private static final Logger logger = Logger.getLogger(SlaChecker.class);
+	public static final String type = "SlaChecker";
+	
+	private String id;
+	private SlaOption slaOption;
+	private int execId;
+	private Map<String, Object> context;
+	private boolean passChecker = true;
+	
+	private static ExecutorManager executorManager;
+	
+	public SlaChecker(String id, SlaOption slaOption, int execId, boolean passChecker) {
+		this.id = id;
+		this.slaOption = slaOption;
+		this.execId = execId;
+		this.passChecker = passChecker;
+	}
+	
+	public SlaChecker(String id, SlaOption sla, String executionActionId, boolean passChecker) {
+		Map<String, Object> executeActionProps = (Map<String, Object>) context.get(executionActionId);
+		int execId = Integer.valueOf((String) executeActionProps.get(ExecuteFlowAction.EXEC_ID));
+		this.id = id;
+		this.slaOption = sla;
+		this.execId = execId;
+		this.passChecker = passChecker;
+	}
+
+	public static void setExecutorManager(ExecutorManager em) {
+		executorManager = em;
+	}
+	
+	private Boolean metSla(ExecutableFlow flow) {
+		String type = slaOption.getType();
+		logger.info("Checking for " + flow.getExecutionId() + " with sla " + type);
+		logger.info("flow is " + flow.getStatus());
+		if(flow.getStartTime() < 0) {
+			return null;
+		}
+		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+			ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+			DateTime startTime = new DateTime(flow.getStartTime());
+			DateTime checkTime = startTime.plus(duration);
+			if(checkTime.isBeforeNow()) {
+				Status status = flow.getStatus();
+				if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+					return Boolean.TRUE;
+				} else {
+					return Boolean.FALSE;
+				}
+			}
+		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+			ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+			DateTime startTime = new DateTime(flow.getStartTime());
+			DateTime checkTime = startTime.plus(duration);
+			if(checkTime.isBeforeNow()) {
+				Status status = flow.getStatus();
+				if(status.equals(Status.SUCCEEDED)) {
+					return Boolean.TRUE;
+				} else {
+					return Boolean.FALSE;
+				}
+			}
+		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
+			ExecutableNode node = flow.getExecutableNode(jobName);
+			if(node.getStartTime() > 0) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(node.getStartTime());
+				DateTime checkTime = startTime.plus(duration);
+				if(checkTime.isBeforeNow()) {
+					Status status = node.getStatus();
+					if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+						return Boolean.TRUE;
+					} else {
+						return Boolean.FALSE;
+					}
+				}
+			}
+		} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
+			ExecutableNode node = flow.getExecutableNode(jobName);
+			if(node.getStartTime() > 0) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(node.getStartTime());
+				DateTime checkTime = startTime.plus(duration);
+				if(checkTime.isBeforeNow()) {
+					Status status = node.getStatus();
+					if(status.equals(Status.SUCCEEDED)) {
+						return Boolean.TRUE;
+					} else {
+						return Boolean.FALSE;
+					}
+				}
+			}
+		} 
+//		else if(type.equals(SlaOption.TYPE_JOB_PROGRESS)) {
+//			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
+//			float targetProgress = Float.valueOf((String) slaOption.getInfo().get(SlaOption.INFO_PROGRESS_PERCENT));
+//			ExecutableNode node = flow.getExecutableNode(jobName);
+//			if(node.getStartTime() > 0) {
+//				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+//				DateTime startTime = new DateTime(node.getStartTime());
+//				DateTime checkTime = startTime.plus(duration);
+//				if(checkTime.isBeforeNow()) {
+//					if(node.getProgress() > targetProgress) {
+//						return Boolean.FALSE;
+//					} else {
+//						return Boolean.TRUE;
+//					}
+//				}
+//			} else {
+//				return Boolean.FALSE;
+//			}
+//		}
+		return null;
+	}
+	
+	// return true for should do sla actions
+	@Override
+	public Object eval() {
+		ExecutableFlow flow;
+		try {
+			flow = executorManager.getExecutableFlow(execId);
+		} catch (ExecutorManagerException e) {
+			logger.error("Can't get executable flow.", e);
+			e.printStackTrace();
+			return Boolean.TRUE;
+		}
+		Boolean metSla = metSla(flow);
+		if(metSla == null) {
+			return Boolean.FALSE;
+		} else {
+			if(passChecker) {
+				return metSla;
+			} else {
+				return !metSla;
+			}
+		}
+	}
+
+	@Override
+	public Object getNum() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void reset() {
+		// TODO Auto-generated method stub
+		
+	}
+
+	@Override
+	public String getId() {
+		return id;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@Override
+	public ConditionChecker fromJson(Object obj) throws Exception {
+		return createFromJson(obj);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SlaChecker createFromJson(Object obj) throws Exception {
+		return createFromJson((HashMap<String, Object>)obj);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public static SlaChecker createFromJson(HashMap<String, Object> obj) throws Exception {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+		}
+		String id = (String) jsonObj.get("id");
+		SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
+		int execId = Integer.valueOf((String) jsonObj.get("execId"));
+		boolean passChecker = Boolean.valueOf((Boolean) jsonObj.get("passChecker"));
+		return new SlaChecker(id, slaOption, execId, passChecker);
+	}
+	
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("type", type);
+		jsonObj.put("id", id);
+		jsonObj.put("slaOption", slaOption.toObject());
+		jsonObj.put("execId", String.valueOf(execId));
+		jsonObj.put("passChecker", passChecker);
+	
+		return jsonObj;
+	}
+
+	@Override
+	public void stopChecker() {
+		
+	}
+
+	@Override
+	public void setContext(Map<String, Object> context) {
+		this.context = context;
+	}
+
+}
diff --git a/src/java/azkaban/trigger/builtin/WatchSlaAction.java b/src/java/azkaban/trigger/builtin/WatchSlaAction.java
new file mode 100644
index 0000000..d9a549b
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/WatchSlaAction.java
@@ -0,0 +1,206 @@
+package azkaban.trigger.builtin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.Status;
+import azkaban.flow.CommonJobProperties;
+import azkaban.sla.SlaOption;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.triggerapp.TriggerRunnerManager;
+import azkaban.utils.Utils;
+
+public class WatchSlaAction implements TriggerAction{
+
+	private static final Logger logger = Logger.getLogger(WatchSlaAction.class);
+	public static final String type = "SetSlaWatcherAction";
+	
+	private Map<String, Object> context;
+	private List<SlaOption> slaOptions;
+	private String actionId;
+	private String executionActionId;
+	private static ExecutorManager executorManager;
+	private static TriggerRunnerManager trigggerRunnerManager;
+	
+	public WatchSlaAction(String actionId, List<SlaOption> slaOptions, String executionActionId) {
+		this.actionId = actionId;
+		this.slaOptions = slaOptions;
+		this.executionActionId = executionActionId;
+	}
+	
+	public static void setExecutorManager(ExecutorManager em) {
+		executorManager = em;
+	}
+	
+	public static void setTriggerRunnerManager(TriggerRunnerManager trm) {
+		trigggerRunnerManager = trm;
+	}
+	
+	@Override
+	public String getId() {
+		return actionId;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static WatchSlaAction createFromJson(Object obj) {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		String objType = (String) jsonObj.get("type");
+		if(! objType.equals(type)) {
+			throw new RuntimeException("Cannot create action of " + type + " from " + objType);
+		}
+		String actionId = (String) jsonObj.get("actionId");
+		String executionActionId = (String) jsonObj.get("executionActionId");
+		List<SlaOption> slaOptions = new ArrayList<SlaOption>();
+		List<Object> slaOptionsObj = (List<Object>) jsonObj.get("slaOptions");
+		for(Object slaObj : slaOptionsObj) {
+			slaOptions.add(SlaOption.fromObject(slaObj));
+		}
+		return new WatchSlaAction(actionId, slaOptions, executionActionId);
+
+	}
+	
+	@Override
+	public TriggerAction fromJson(Object obj) throws Exception {
+		return createFromJson(obj);
+	}
+
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("actionId", actionId);
+		jsonObj.put("type", type);
+		jsonObj.put("executionActionId", executionActionId);
+		List<Object> slaOptionsObj = new ArrayList<Object>();
+		for(SlaOption sla : slaOptions) {
+			slaOptionsObj.add(sla.toObject());
+		}
+		jsonObj.put("slaOptions", slaOptionsObj);
+
+		return jsonObj;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void doAction() throws Exception {
+		
+		if(executorManager == null) {
+			throw new Exception("ExecutorManager not initialized. Failed to set sla.");
+		}
+		if(trigggerRunnerManager == null) {
+			throw new Exception("TriggerRunnerManager not initialized. Failed to set sla.");
+		}
+		
+		if(!context.containsKey(executionActionId)) {
+			throw new Exception("No trace of the execution. Cannot set sla if the flow/job is not run.");
+		}
+		Map<String, Object> executionInfo = (Map<String, Object>) context.get(executionActionId);
+		if(!executionInfo.containsKey(ExecuteFlowAction.EXEC_ID)) {
+			throw new Exception("Execution Id not set. Failed to set sla.");
+		}
+		int execId = Integer.valueOf((String) executionInfo.get(ExecuteFlowAction.EXEC_ID));
+		ExecutableFlow exflow = executorManager.getExecutableFlow(execId);
+		
+		for(SlaOption sla : slaOptions) {
+			if(sla.getType().equals(SlaOption.TYPE_FLOW_FINISH)) {
+			// just do a time checker and see if the flow finish
+				ConditionChecker timer = createFlowSlaTimer(exflow, sla);
+				ExecutionChecker statusChecker = new ExecutionChecker("slaStatusChecker1", execId, ExecutionChecker.TARGET_FINISHED, null);
+				String failExpr = timer.getId() + ".eval() && !" + statusChecker.getId() + ".eval()";
+				Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+				checkers.put(timer.getId(), timer);
+				checkers.put(statusChecker.getId(), statusChecker);
+				Condition triggerCondition = new Condition(checkers, failExpr);
+				List<String> slaActions = sla.getActions();
+				List<TriggerAction> actions = new ArrayList<TriggerAction>();
+				// always send email
+				SendEmailAction emailAct = new SendEmailAction("sendSlaAlertEmail", getSlaEmailSubject(exflow), getSlaEmailMessage(exflow, sla), (List<String>) sla.getInfo().get("emailList"));
+				actions.add(emailAct);
+				for(String act : slaActions) {
+					if(act.equals(SlaOption.ACTION_CANCEL_FLOW)) {
+						KillExecutionAction killAct = new KillExecutionAction("slaKiller1", exflow.getExecutionId());
+						actions.add(killAct);
+					} 
+//					else if(act.equals(SlaOption.ACTION_ALERT_BY_EMAIL)) {
+//						SendEmailAction emailAct = new SendEmailAction("sendSlaAlertEmail", getSlaEmailSubject(exflow), getSlaEmailMessage(exflow, sla), (List<String>) sla.getInfo().get("emailList"));
+//						actions.add(emailAct);
+//					}
+				}
+				
+				Trigger t = new Trigger("azkaban", "triggerserver", triggerCondition, triggerCondition, actions);
+				t.setResetOnTrigger(false);
+				t.setResetOnExpire(false);
+				trigggerRunnerManager.insertTrigger(t);
+			} else if(sla.getType().equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+//				String jobName = (String) sla.getInfo().get("jobName");
+//				ExecutableNode exNode = exflow.getExecutableNode(jobName);
+//				ConditionChecker timer = createJobSlaTimer(exNode, sla);
+//				if(exNode.getStatus().equals(Status.RUNNING)) {
+//					
+//				}
+			} else {
+				logger.error("Unknown sla type.");
+			}
+		}
+	}
+	
+	private BasicTimeChecker createFlowSlaTimer(ExecutableFlow exflow, SlaOption sla) {
+		Map<String, Object> info = sla.getInfo();
+		ReadablePeriod duration = Utils.parsePeriodString((String) info.get(SlaOption.INFO_DURATION));
+		DateTime startTime = new DateTime(exflow.getSubmitTime());
+		DateTime checkTime = startTime.plus(duration);
+		BasicTimeChecker timeChecker = new BasicTimeChecker("slaTimer1", checkTime, checkTime.getZone(), false, false, null);
+		return timeChecker;
+	}
+	
+//	private BasicTimeChecker createJobSlaTimer(ExecutableNode exNode, SlaOption sla) {
+//		Map<String, Object> info = sla.getInfo();
+//		ReadablePeriod runtimelimit = Utils.parsePeriodString((String) info.get(SlaOption.INFO_RUNTIMELIMIT));
+//		DateTime startTime = new DateTime(exNode.getStartTime());
+//		DateTime checkTime = startTime.plus(runtimelimit);
+//		BasicTimeChecker timeChecker = new BasicTimeChecker("slaTimer1", checkTime, checkTime.getZone(), false, false, null);
+//		return timeChecker;
+//	} 
+
+	private String getSlaEmailSubject(ExecutableFlow flow) {
+		return "A preset SLA on flow " + flow.getFlowId() + " is not met in execution " + flow.getExecutionId();
+	}
+	
+	private String getSlaEmailMessage(ExecutableFlow flow, SlaOption slaOption) {
+		if(slaOption.getType().equals(SlaOption.TYPE_FLOW_FINISH)) {
+			return "<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has exceeded its expected finish time'" + "</h2>";
+		}
+		if(slaOption.getType().equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+			return "<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has exceeded its expected time to succeed'" + "</h2>";
+		}
+		return "Unrecognized SLA type.";
+	}
+	
+	@Override
+	public void setContext(Map<String, Object> context) {
+		this.context = context;
+	}
+
+	@Override
+	public String getDescription() {
+		return type;
+	}
+	
+}
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index 74f9ddf..45fc8ff 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -2,7 +2,6 @@ package azkaban.trigger;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -10,7 +9,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.log4j.Logger;
 
@@ -34,7 +32,7 @@ public class CheckerTypeLoader {
 		
 		// load built-in checkers
 		
-		loadDefaultCheckers();
+		loadBuiltinCheckers();
 		
 		loadPluginCheckers(props);
 
@@ -138,7 +136,14 @@ public class CheckerTypeLoader {
 		logger.info("Loaded checker type " + checkerName + " " + checkerClass);
 	}
 	
-	private void loadDefaultCheckers() {
+	public static void registerBuiltinCheckers(Map<String, Class<? extends ConditionChecker>> builtinCheckers) {
+		checkerToClass.putAll(checkerToClass);
+		for(String type : builtinCheckers.keySet()) {
+			logger.info("Loaded " + type + " checker.");
+		}
+	}
+	
+	private void loadBuiltinCheckers() {
 		checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
 		logger.info("Loaded BasicTimeChecker type.");
 	}
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index fcc1fa0..451285b 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -106,9 +106,9 @@ public class Condition {
 				ConditionChecker ck = checkerLoader.createCheckerFromJson(type, oneChecker.get("checkerJson"));
 				checkers.put(ck.getId(), ck);
 			}
-				String expr = (String) jsonObj.get("expression");
+			String expr = (String) jsonObj.get("expression");
 				
-				cond = new Condition(checkers, expr);
+			cond = new Condition(checkers, expr);
 			
 		} catch(Exception e) {
 			e.printStackTrace();
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 920eddf..85b4003 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -1,5 +1,7 @@
 package azkaban.trigger;
 
+import java.util.Map;
+
 
 public interface ConditionChecker {
 	
@@ -19,4 +21,6 @@ public interface ConditionChecker {
 
 	void stopChecker();
 	
+	void setContext(Map<String, Object> context);
+	
 }
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index 37f8131..658594b 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -7,7 +7,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
@@ -142,6 +141,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	@Override
 	public void addTrigger(Trigger t) throws TriggerManagerException {
 		logger.info("Inserting trigger " + t.toString() + " into db.");
+		t.setLastModifyTime(DateTime.now());
 		Connection connection = getConnection();
 		try {
 			addTrigger(connection, t, defaultEncodingType);
@@ -181,6 +181,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 	@Override
 	public void updateTrigger(Trigger t) throws TriggerManagerException {
 		logger.info("Updating trigger " + t.toString() + " into db.");
+		t.setLastModifyTime(DateTime.now());
 		Connection connection = getConnection();
 		try{
 			t.setLastModifyTime(DateTime.now());
@@ -224,6 +225,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 					t.getTriggerId());
 			if (updates == 0) {
 				throw new TriggerManagerException("No trigger has been updated.");
+				//logger.error("No trigger is updated!");
 			}
 		} catch (SQLException e) {
 			logger.error(UPDATE_TRIGGER + " failed.");
@@ -258,8 +260,8 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 			ArrayList<Trigger> triggers = new ArrayList<Trigger>();
 			do {
 				int triggerId = rs.getInt(1);
-				String triggerSource = rs.getString(2);
-				long modifyTime = rs.getLong(3);
+//				String triggerSource = rs.getString(2);
+//				long modifyTime = rs.getLong(3);
 				int encodingType = rs.getInt(4);
 				byte[] data = rs.getBytes(5);
 				
@@ -331,8 +333,8 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
 		}
 		
 		if(triggers.size() == 0) {
-			logger.error("Failed to load trigger " + triggerId);
-			throw new TriggerManagerException("Failed to load trigger " + triggerId);
+			logger.error("Loaded 0 triggers. Failed to load trigger " + triggerId);
+			throw new TriggerManagerException("Loaded 0 triggers. Failed to load trigger " + triggerId);
 		}
 		
 		return triggers.get(0);
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 962cf7a..d668b5e 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -25,6 +25,7 @@ public class Trigger {
 	private List<TriggerAction> expireActions;
 	
 	private Map<String, Object> info = new HashMap<String, Object>();
+	private Map<String, Object> context = new HashMap<String, Object>();
 	
 	private static ActionTypeLoader actionTypeLoader;
 	
@@ -75,7 +76,15 @@ public class Trigger {
 	public void setInfo(Map<String, Object> info) {
 		this.info = info;
 	}
+	
+	public Map<String, Object> getContext() {
+		return context;
+	}
 
+	public void setContext(Map<String, Object> context) {
+		this.context = context;
+	}
+	
 	public Trigger(
 			DateTime lastModifyTime, 
 			DateTime submitTime, 
@@ -85,7 +94,8 @@ public class Trigger {
 			Condition expireCondition,
 			List<TriggerAction> actions, 
 			List<TriggerAction> expireActions,
-			Map<String, Object> info) {
+			Map<String, Object> info,
+			Map<String, Object> context) {
 		this.lastModifyTime = lastModifyTime;
 		this.submitTime = submitTime;
 		this.submitUser = submitUser;
@@ -95,6 +105,7 @@ public class Trigger {
 		this.actions = actions;
 		this.expireActions = expireActions;
 		this.info = info;
+		this.context = context;
 	}
 	
 	public Trigger(
@@ -117,6 +128,39 @@ public class Trigger {
 	}
 	
 	public Trigger(
+			String submitUser, 
+			String source,
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions, 
+			List<TriggerAction> expireActions) {
+		this.lastModifyTime = DateTime.now();
+		this.submitTime = DateTime.now();
+		this.submitUser = submitUser;
+		this.source = source;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+		this.expireActions = expireActions;
+	}
+	
+	public Trigger(
+			String submitUser, 
+			String source,
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions) {
+		this.lastModifyTime = DateTime.now();
+		this.submitTime = DateTime.now();
+		this.submitUser = submitUser;
+		this.source = source;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+		this.expireActions = new ArrayList<TriggerAction>();
+	}
+	
+	public Trigger(
 			DateTime lastModifyTime, 
 			DateTime submitTime, 
 			String submitUser, 
@@ -144,7 +188,8 @@ public class Trigger {
 			Condition expireCondition,
 			List<TriggerAction> actions,
 			List<TriggerAction> expireActions,
-			Map<String, Object> info) {
+			Map<String, Object> info,
+			Map<String, Object> context) {
 		this.triggerId = triggerId;
 		this.lastModifyTime = lastModifyTime;
 		this.submitTime = submitTime;
@@ -155,6 +200,7 @@ public class Trigger {
 		this.actions = actions;
 		this.expireActions = expireActions;
 		this.info = info;
+		this.context = context;
 	}
 	
 	public Trigger(
@@ -178,6 +224,26 @@ public class Trigger {
 		this.expireActions = expireActions;
 	}
 	
+	public Trigger(
+			int triggerId,
+			DateTime lastModifyTime, 
+			DateTime submitTime, 
+			String submitUser, 
+			String source,
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions) {
+		this.triggerId = triggerId;
+		this.lastModifyTime = lastModifyTime;
+		this.submitTime = submitTime;
+		this.submitUser = submitUser;
+		this.source = source;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+		this.expireActions = new ArrayList<TriggerAction>();
+	}
+	
 	public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
 		Trigger.actionTypeLoader = loader;
 	}
@@ -268,6 +334,7 @@ public class Trigger {
 		jsonObj.put("triggerId", String.valueOf(triggerId));
 		jsonObj.put("status", status.toString());
 		jsonObj.put("info", info);
+		jsonObj.put("context", context);
 		return jsonObj;
 	}
 	
@@ -316,7 +383,21 @@ public class Trigger {
 			int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
 			TriggerStatus status = TriggerStatus.valueOf((String)jsonObj.get("status"));
 			Map<String, Object> info = (Map<String, Object>) jsonObj.get("info");
-			trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions, expireActions, info);
+			Map<String, Object> context = (Map<String, Object>) jsonObj.get("context");
+			for(ConditionChecker checker : triggerCond.getCheckers().values()) {
+				checker.setContext(context);
+			}
+			for(ConditionChecker checker : expireCond.getCheckers().values()) {
+				checker.setContext(context);
+			}
+			for(TriggerAction action : actions) {
+				action.setContext(context);
+			}
+			for(TriggerAction action : expireActions) {
+				action.setContext(context);
+			}
+			
+			trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions, expireActions, info, context);
 			trigger.setResetOnExpire(resetOnExpire);
 			trigger.setResetOnTrigger(resetOnTrigger);
 			trigger.setStatus(status);
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
index 85b6ad8..b186b7b 100644
--- a/src/java/azkaban/trigger/TriggerAction.java
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -1,7 +1,11 @@
 package azkaban.trigger;
 
+import java.util.Map;
+
 public interface TriggerAction {
 	
+	String getId();
+	
 	String getType();
 	
 	TriggerAction fromJson(Object obj) throws Exception;
@@ -9,6 +13,8 @@ public interface TriggerAction {
 	Object toJson();
 	
 	void doAction() throws Exception;
+	
+	void setContext(Map<String, Object> context);
 
 	String getDescription();
 	
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
index c3e604b..7adf742 100644
--- a/src/java/azkaban/trigger/TriggerLoader.java
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -1,9 +1,6 @@
 package azkaban.trigger;
 
 import java.util.List;
-import java.util.Map;
-
-
 
 public interface TriggerLoader {
 
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 460a75f..0b21166 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -37,7 +37,6 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
 
 import azkaban.triggerapp.TriggerConnectorParams;
 import azkaban.utils.JSONUtils;
@@ -51,7 +50,7 @@ import azkaban.utils.Props;
 public class TriggerManager {
 	private static Logger logger = Logger.getLogger(TriggerManager.class);
 
-	private static final String TRIGGER_SUFFIX = ".trigger";
+	public static final String TRIGGER_SUFFIX = ".trigger";
 	
 	private TriggerLoader triggerLoader;
 	private CheckerTypeLoader checkerTypeLoader;
@@ -148,7 +147,11 @@ public class TriggerManager {
 	private void loadTriggers() throws TriggerManagerException {
 		List<Trigger> triggerList = triggerLoader.loadTriggers();
 		for(Trigger t : triggerList) {
-			triggerIdMap.put(t.getTriggerId(), t);
+			if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+				removeTrigger(t, "azkaban");
+			} else {
+				triggerIdMap.put(t.getTriggerId(), t);
+			}
 		}
 	}
 	
@@ -158,14 +161,16 @@ public class TriggerManager {
 	
 	public void removeTrigger(Trigger t, String userId) throws TriggerManagerException {
 		synchronized(t) {
+			logger.info("Removing trigger " + t.getTriggerId() + " by " + userId);
 			callTriggerServer(t, TriggerConnectorParams.REMOVE_TRIGGER_ACTION, userId);
+			triggerIdMap.remove(t.getTriggerId());
 		}
 	}
-	
 
 	public void updateTrigger(Trigger t, String userId) throws TriggerManagerException {
 		synchronized(t) {
 			try {
+				triggerLoader.updateTrigger(t);
 				callTriggerServer(t, TriggerConnectorParams.UPDATE_TRIGGER_ACTION, userId);
 			} catch(TriggerManagerException e) {
 				throw new TriggerManagerException(e);
@@ -186,6 +191,7 @@ public class TriggerManager {
 			String message = null;
 			logger.info("Inserting trigger into system. " );
 			// The trigger id is set by the loader. So it's unavailable until after this call.
+			t.setStatus(TriggerStatus.PREPARING);
 			triggerLoader.addTrigger(t);
 			try {
 				callTriggerServer(t,  TriggerConnectorParams.INSERT_TRIGGER_ACTION, userId);
@@ -202,8 +208,7 @@ public class TriggerManager {
 	
 	private Map<String, Object> callTriggerServer(Trigger t, String action, String user) throws TriggerManagerException {
 		try {
-			Map<String, Object> info = t.getInfo();
-			return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), null, (Pair<String,String>[])null);
+			return callTriggerServer(triggerServerHost, triggerServerPort, action, t.getTriggerId(), user, (Pair<String,String>[])null);
 		} catch (IOException e) {
 			throw new TriggerManagerException(e);
 		}
@@ -338,15 +343,22 @@ public class TriggerManager {
 					try{
 						results = callTriggerServer(triggerServer.getFirst(), triggerServer.getSecond(), TriggerConnectorParams.GET_UPDATE_ACTION, null, "azkaban", updateTimeParam);
 //						lastUpdateTime = (Long) results.get(TriggerConnectorParams.RESPONSE_UPDATETIME);
+
 						List<Integer> updates = (List<Integer>) results.get("updates");
 						for(Integer update : updates) {
 							Trigger t = triggerLoader.loadTrigger(update);
 							lastUpdateTime = Math.max(lastUpdateTime, t.getLastModifyTime().getMillis());
-							triggerIdMap.put(update, t);
+							
+							if(t.getStatus().equals(TriggerStatus.EXPIRED) && t.getSource().equals("azkaban")) {
+								removeTrigger(t, "azkaban");
+								//triggerIdMap.remove(update);
+							} else {
+								triggerIdMap.put(update, t);
+							}
 						}
 					} catch (Exception e) {
-						logger.error(e);
-						
+						e.printStackTrace();
+						logger.error(e);	
 					}
 					
 					synchronized(this) {
@@ -493,6 +505,29 @@ public class TriggerManager {
 		removeTrigger(triggerIdMap.get(scheduleId), submitUser);
 	}
 
+	public Set<String> getAllActiveTriggerServerHosts() {
+		Set<String> hostport = new HashSet<String>();
+		hostport.add(triggerServerHost+":"+triggerServerPort);
+		return hostport;
+	}
+
+	public int getNumTriggers() {
+		return triggerIdMap.size();
+	}
+
+	public String getTriggerSources() {
+		Set<String> sources = new HashSet<String>();
+		for(Trigger t : triggerIdMap.values()) {
+			sources.add(t.getSource());
+		}
+		return sources.toString();
+	}
+
+	public String getTriggerIds() {
+		return triggerIdMap.keySet().toString();
+	}
+
+	
 	
 }
 
diff --git a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
index c84198f..5ed1c41 100644
--- a/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
+++ b/src/java/azkaban/triggerapp/AzkabanTriggerServer.java
@@ -4,11 +4,15 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Constructor;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 
 import javax.management.MBeanInfo;
@@ -18,14 +22,15 @@ import javax.management.ObjectName;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTimeZone;
 import org.mortbay.jetty.Server;
-import org.mortbay.jetty.bio.SocketConnector;
-import org.mortbay.jetty.security.SslSocketConnector;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
+import azkaban.executor.ExecutorManager.Alerter;
+import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerRunnerManager;
 import azkaban.project.JdbcProjectLoader;
@@ -36,8 +41,11 @@ import azkaban.trigger.JdbcTriggerLoader;
 import azkaban.trigger.TriggerLoader;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.CreateTriggerAction;
-import azkaban.trigger.builtin.ExecutableFlowStatusChecker;
+import azkaban.trigger.builtin.ExecutionChecker;
 import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.KillExecutionAction;
+import azkaban.trigger.builtin.SlaAlertAction;
+import azkaban.trigger.builtin.SlaChecker;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -142,26 +150,42 @@ public class AzkabanTriggerServer {
 		logger.info("Loading project manager");
 		JdbcProjectLoader loader = new JdbcProjectLoader(props);
 		ProjectManager manager = new ProjectManager(loader, props);
-		
 		return manager;
 	}
 	
 	private void loadBuiltinCheckersAndActions(AzkabanTriggerServer app) {
 		logger.info("Loading built-in checker and action types");
-		ExecutorManager executorManager = app.getExecutorManager();
-		TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
+//		ExecutorManager executorManager = app.getExecutorManager();
+//		TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
 		CheckerTypeLoader checkerLoader = triggerRunnerManager.getCheckerLoader();
 		ActionTypeLoader actionLoader = triggerRunnerManager.getActionLoader();
 		// time:
 		checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
-		ExecutableFlowStatusChecker.setExecutorManager(executorManager);
-		checkerLoader.registerCheckerType(ExecutableFlowStatusChecker.type, ExecutableFlowStatusChecker.class);
+//		// execution checker
+//		ExecutionChecker.setExecutorManager(executorManager);
+//		checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
+		// Sla checker
+		SlaChecker.setExecutorManager(executorManager);
+		checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
 		
+		// execut flow action
 		ExecuteFlowAction.setExecutorManager(executorManager);
 		ExecuteFlowAction.setProjectManager(projectManager);
+		ExecuteFlowAction.setTriggerRunnerManager(triggerRunnerManager);
 		actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+		// kill flow action
+		KillExecutionAction.setExecutorManager(executorManager);
+		actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+		// sla alert
+		SlaAlertAction.setExecutorManager(executorManager);
+		Map<String, Alerter> alerters = loadAlerters(props);
+		SlaAlertAction.setAlerters(alerters);
+		SlaAlertAction.setExecutorManager(executorManager);
+		actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+		// create trigger action
 		CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
 		actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+
 	}
 	
 	private void loadPluginCheckersAndActions(String pluginPath, AzkabanTriggerServer app) {
@@ -291,6 +315,143 @@ public class AzkabanTriggerServer {
 		}
 	}
 	
+	private Map<String, Alerter> loadAlerters(Props props) {
+		Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+		// load built-in alerters
+		ExecutorMailer mailAlerter = new ExecutorMailer(props);
+		allAlerters.put("email", mailAlerter);
+		// load all plugin alerters
+		String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+		allAlerters.putAll(loadPluginAlerters(pluginDir));
+		return allAlerters;
+	}
+
+	private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+		File alerterPluginPath = new File(pluginPath);
+		if (!alerterPluginPath.exists()) {
+			return Collections.<String, Alerter>emptyMap();
+		}
+			
+		Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+		ClassLoader parentLoader = SlaAlertAction.class.getClass().getClassLoader();
+		File[] pluginDirs = alerterPluginPath.listFiles();
+		ArrayList<String> jarPaths = new ArrayList<String>();
+		for (File pluginDir: pluginDirs) {
+			if (!pluginDir.isDirectory()) {
+				logger.error("The plugin path " + pluginDir + " is not a directory.");
+				continue;
+			}
+			
+			// Load the conf directory
+			File propertiesDir = new File(pluginDir, "conf");
+			Props pluginProps = null;
+			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+				File propertiesFile = new File(propertiesDir, "plugin.properties");
+				File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+				
+				if (propertiesFile.exists()) {
+					if (propertiesOverrideFile.exists()) {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+					}
+					else {
+						pluginProps = PropsUtils.loadProps(null, propertiesFile);
+					}
+				}
+				else {
+					logger.error("Plugin conf file " + propertiesFile + " not found.");
+					continue;
+				}
+			}
+			else {
+				logger.error("Plugin conf path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			String pluginName = pluginProps.getString("alerter.name");
+			List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+			
+			String pluginClass = pluginProps.getString("alerter.class");
+			if (pluginClass == null) {
+				logger.error("Alerter class is not set.");
+			}
+			else {
+				logger.info("Plugin class " + pluginClass);
+			}
+			
+			URLClassLoader urlClassLoader = null;
+			File libDir = new File(pluginDir, "lib");
+			if (libDir.exists() && libDir.isDirectory()) {
+				File[] files = libDir.listFiles();
+				
+				ArrayList<URL> urls = new ArrayList<URL>();
+				for (int i=0; i < files.length; ++i) {
+					try {
+						URL url = files[i].toURI().toURL();
+						urls.add(url);
+					} catch (MalformedURLException e) {
+						logger.error(e);
+					}
+				}
+				if (extLibClasspath != null) {
+					for (String extLib : extLibClasspath) {
+						try {
+							File file = new File(pluginDir, extLib);
+							URL url = file.toURI().toURL();
+							urls.add(url);
+						} catch (MalformedURLException e) {
+							logger.error(e);
+						}
+					}
+				}
+				
+				urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+			}
+			else {
+				logger.error("Library path " + propertiesDir + " not found.");
+				continue;
+			}
+			
+			Class<?> alerterClass = null;
+			try {
+				alerterClass = urlClassLoader.loadClass(pluginClass);
+			}
+			catch (ClassNotFoundException e) {
+				logger.error("Class " + pluginClass + " not found.");
+				continue;
+			}
+
+			String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+			logger.info("Source jar " + source);
+			jarPaths.add("jar:file:" + source);
+			
+			Constructor<?> constructor = null;
+			try {
+				constructor = alerterClass.getConstructor(Props.class);
+			} catch (NoSuchMethodException e) {
+				logger.error("Constructor not found in " + pluginClass);
+				continue;
+			}
+			
+			Object obj = null;
+			try {
+				obj = constructor.newInstance(pluginProps);
+			} catch (Exception e) {
+				logger.error(e);
+			} 
+			
+			if (!(obj instanceof Alerter)) {
+				logger.error("The object is not an Alerter");
+				continue;
+			}
+			
+			Alerter plugin = (Alerter) obj;
+			installedAlerterPlugins.put(pluginName, plugin);
+		}
+		
+		return installedAlerterPlugins;
+		
+	}
+	
 	private TriggerLoader createTriggerLoader(Props props) {
 		return new JdbcTriggerLoader(props);
 	}
@@ -417,6 +578,7 @@ public class AzkabanTriggerServer {
 
 		registerMbean("triggerServerJetty", new JmxJettyServer(server));
 		registerMbean("triggerRunnerManager", new JmxTriggerRunnerManager(triggerRunnerManager));
+		registerMbean("executorManager", new JmxExecutorManager(executorManager));
 	}
 	
 	public void close() {
diff --git a/src/java/azkaban/triggerapp/TriggerRunnerManager.java b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
index b4e5fe7..9a7d62d 100644
--- a/src/java/azkaban/triggerapp/TriggerRunnerManager.java
+++ b/src/java/azkaban/triggerapp/TriggerRunnerManager.java
@@ -4,11 +4,12 @@ import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
 
@@ -39,6 +40,7 @@ public class TriggerRunnerManager {
 	
 	private final TriggerScannerThread runnerThread;
 	private long lastRunnerThreadCheckTime = -1;
+	private long runnerThreadIdleTime = -1;
 	
 			
 	public TriggerRunnerManager(Props props, TriggerLoader triggerLoader) throws IOException {
@@ -117,28 +119,23 @@ public class TriggerRunnerManager {
 	}
 	
 	public synchronized void updateTrigger(int triggerId) throws TriggerManagerException {
-		Trigger t = triggerIdMap.get(triggerId);
-		if(t == null) {
-			throw new TriggerManagerException("The trigger to update doesn't exist!");
-		}
 		
-		runnerThread.deleteTrigger(t);
-		runnerThread.addTrigger(t);
-		triggerIdMap.put(t.getTriggerId(), t);
 		
-		triggerLoader.updateTrigger(t);
+		Trigger t = triggerIdMap.get(triggerId);
+		
+		updateTrigger(t);
 	}
 	
 	public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
 		if(!triggerIdMap.containsKey(t.getTriggerId())) {
 			throw new TriggerManagerException("The trigger to update doesn't exist!");
 		}
-		
 		runnerThread.deleteTrigger(t);
-		runnerThread.addTrigger(t);
-		triggerIdMap.put(t.getTriggerId(), t);
-		
-		triggerLoader.updateTrigger(t);
+
+		Trigger t2 = triggerLoader.loadTrigger(t.getTriggerId());
+		runnerThread.addTrigger(t2);
+		triggerIdMap.put(t2.getTriggerId(), t2);
+
 	}
 
 	public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
@@ -205,11 +202,11 @@ public class TriggerRunnerManager {
 							logger.error(t.getMessage());
 						}
 						
-						long timeRemaining = scannerInterval - (System.currentTimeMillis() - getLastRunnerThreadCheckTime());
-						if(timeRemaining < 0) {
+						runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - getLastRunnerThreadCheckTime());
+						if(runnerThreadIdleTime < 0) {
 							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
 						} else {
-							wait(timeRemaining);
+							wait(runnerThreadIdleTime);
 						}
 					} catch(InterruptedException e) {
 						logger.info("Interrupted. Probably to shut down.");
@@ -221,6 +218,7 @@ public class TriggerRunnerManager {
 		
 		private void checkAllTriggers() throws TriggerManagerException {
 			for(Trigger t : triggers) {
+				logger.info("Checking trigger " + t.getDescription());
 				if(t.getStatus().equals(TriggerStatus.READY)) {
 					if(t.triggerConditionMet()) {
 						onTriggerTrigger(t);
@@ -235,22 +233,23 @@ public class TriggerRunnerManager {
 			List<TriggerAction> actions = t.getTriggerActions();
 			for(TriggerAction action : actions) {
 				try {
+					logger.info("Doing trigger actions");
 					action.doAction();
 				} catch (Exception e) {
 					// TODO Auto-generated catch block
-					throw new TriggerManagerException("action failed to execute", e);
+					//throw new TriggerManagerException("action failed to execute", e);
+					logger.error("Failed to do action " + action.getDescription(), e);
+				} catch (Throwable th) {
+					logger.error("Failed to do action " + action.getDescription(), th);
 				}
 			}
 			if(t.isResetOnTrigger()) {
 				t.resetTriggerConditions();
 				t.resetExpireCondition();
-//				updateTrigger(t);
 			} else {
 				t.setStatus(TriggerStatus.EXPIRED);
 			}
-			
 			triggerLoader.updateTrigger(t);
-			
 //			updateAgent(t);
 		}
 		
@@ -258,10 +257,14 @@ public class TriggerRunnerManager {
 			List<TriggerAction> expireActions = t.getExpireActions();
 			for(TriggerAction action : expireActions) {
 				try {
+					logger.info("Doing expire actions");
 					action.doAction();
 				} catch (Exception e) {
 					// TODO Auto-generated catch block
-					throw new TriggerManagerException("expire action failed to execute", e);
+					//throw new TriggerManagerException("action failed to execute", e);
+					logger.error("Failed to do expire action " + action.getDescription(), e);
+				} catch (Throwable th) {
+					logger.error("Failed to do expire action " + action.getDescription(), th);
 				}
 			}
 			if(t.isResetOnExpire()) {
@@ -271,7 +274,6 @@ public class TriggerRunnerManager {
 			} else {
 				t.setStatus(TriggerStatus.EXPIRED);
 			}
-//			updateAgent(t);
 			triggerLoader.updateTrigger(t);
 		}
 	}
@@ -338,4 +340,24 @@ public class TriggerRunnerManager {
 		}
 	}
 
+	public int getNumTriggers() {
+		return triggerIdMap.size();
+	}
+
+	public String getTriggerSources() {
+		Set<String> sources = new HashSet<String>();
+		for(Trigger t : triggerIdMap.values()) {
+			sources.add(t.getSource());
+		}
+		return sources.toString();
+	}
+
+	public String getTriggerIds() {
+		return triggerIdMap.keySet().toString();
+	}
+
+	public long getScannerIdleTime() {
+		return runnerThreadIdleTime;
+	}
+
 }
diff --git a/src/java/azkaban/utils/AbstractMailer.java b/src/java/azkaban/utils/AbstractMailer.java
index 85f589c..721675a 100644
--- a/src/java/azkaban/utils/AbstractMailer.java
+++ b/src/java/azkaban/utils/AbstractMailer.java
@@ -48,6 +48,10 @@ public class AbstractMailer {
 		return message;
 	}
 	
+	public EmailMessage prepareEmailMessage(String subject, String mimetype, Collection<String> emailList) {
+		return createEmailMessage(subject, mimetype, emailList);
+	}
+	
 	public String getAzkabanName() {
 		return azkabanName;
 	}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index fdfbed3..39c105c 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -55,9 +55,11 @@ import org.mortbay.thread.QueuedThreadPool;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
+import azkaban.executor.ExecutorManager.Alerter;
 import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxScheduler;
+import azkaban.jmx.JmxTriggerManager;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
 
@@ -65,11 +67,20 @@ import azkaban.scheduler.JdbcScheduleLoader;
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.JdbcTriggerLoader;
 import azkaban.trigger.TriggerLoader;
 import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerAgent;
 import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.KillExecutionAction;
+import azkaban.trigger.builtin.SlaAlertAction;
+import azkaban.trigger.builtin.SlaChecker;
+import azkaban.triggerapp.AzkabanTriggerServer;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.FileIOUtils;
@@ -167,17 +178,15 @@ public class AzkabanWebServer extends AzkabanServer {
 		this.server = server;
 		velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
 		sessionCache = new SessionCache(props);
-		userManager = loadUserManager(props);		
-		executorManager = loadExecutorManager(props);
+		userManager = loadUserManager(props);
 		
 		triggerManager = loadTriggerManager(props);
-		
-		projectManager = loadProjectManager(props);
-		
-//		scheduler = loadScheduler(executorManager, projectManager, triggerManager);
-		
+		executorManager = loadExecutorManager(props);
+		projectManager = loadProjectManager(props, triggerManager);
 		scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
 		
+		loadBuiltinCheckersAndActions();
+		
 		baseClassLoader = getBaseClassloader();
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -225,12 +234,12 @@ public class AzkabanWebServer extends AzkabanServer {
 		return manager;
 	}
 	
-	private ProjectManager loadProjectManager(Props props) {
+	private ProjectManager loadProjectManager(Props props, TriggerManager tm) {
 		logger.info("Loading JDBC for project management");
 
 		JdbcProjectLoader loader = new JdbcProjectLoader(props);
 		ProjectManager manager = new ProjectManager(loader, props);
-		manager.setTriggerManager(triggerManager);
+		manager.setTriggerManager(tm);
 		
 		return manager;
 	}
@@ -241,7 +250,7 @@ public class AzkabanWebServer extends AzkabanServer {
 		return execManager;
 	}
 
-	private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager triggerManager, Props props ) throws Exception {
+	private ScheduleManager loadScheduleManager(ExecutorManager executorManager, TriggerManager tm, Props props ) throws Exception {
 		ScheduleManager schedManager = null;
 		String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
 		if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
@@ -251,7 +260,7 @@ public class AzkabanWebServer extends AzkabanServer {
 			schedManager.start();
 		} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
 			logger.info("Loading trigger based scheduler");
-			ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, null, ScheduleManager.triggerSource);
+			ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
 			schedManager = new ScheduleManager(executorManager, loader, true);
 		}
 
@@ -268,6 +277,39 @@ public class AzkabanWebServer extends AzkabanServer {
 		return new TriggerManager(props, loader);
 	}
 	
+	private void loadBuiltinCheckersAndActions() {
+		logger.info("Loading built-in checker and action types");
+//		ExecutorManager executorManager = app.getExecutorManager();
+//		TriggerRunnerManager triggerRunnerManager = app.getTriggerRunnerManager();
+		CheckerTypeLoader checkerLoader = triggerManager.getCheckerLoader();
+		ActionTypeLoader actionLoader = triggerManager.getActionLoader();
+		// time:
+		checkerLoader.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
+//		// execution checker
+//		ExecutionChecker.setExecutorManager(executorManager);
+//		checkerLoader.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
+		// Sla checker
+//		SlaChecker.setExecutorManager(executorManager);
+		checkerLoader.registerCheckerType(SlaChecker.type, SlaChecker.class);
+		
+		// execut flow action
+//		ExecuteFlowAction.setExecutorManager(executorManager);
+//		ExecuteFlowAction.setProjectManager(projectManager);
+		actionLoader.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
+		// kill flow action
+//		KillExecutionAction.setExecutorManager(executorManager);
+		actionLoader.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
+		// sla alert
+//		SlaAlertAction.setExecutorManager(executorManager);
+//		Map<String, Alerter> alerters = loadAlerters(props);
+//		SlaAlertAction.setAlerters(alerters);
+		actionLoader.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
+		// create trigger action
+//		CreateTriggerAction.setTriggerRunnerManager(triggerRunnerManager);
+		actionLoader.registerActionType(CreateTriggerAction.type, CreateTriggerAction.class);
+
+	}
+	
 	/**
 	 * Returns the web session cache.
 	 * 
@@ -527,7 +569,8 @@ public class AzkabanWebServer extends AzkabanServer {
 	private static Map<String, TriggerPlugin> loadTriggerPlugins(Context root, String pluginPath, AzkabanWebServer azkabanWebApp) {
 		File triggerPluginPath = new File(pluginPath);
 		if (!triggerPluginPath.exists()) {
-			return Collections.<String, TriggerPlugin>emptyMap();
+			//return Collections.<String, TriggerPlugin>emptyMap();
+			return new HashMap<String, TriggerPlugin>();
 		}
 			
 		Map<String, TriggerPlugin> installedTriggerPlugins = new HashMap<String, TriggerPlugin>();
@@ -895,7 +938,7 @@ public class AzkabanWebServer extends AzkabanServer {
 		mbeanServer = ManagementFactory.getPlatformMBeanServer();
 
 		registerMbean("jetty", new JmxJettyServer(server));
-		registerMbean("scheduler", new JmxScheduler(scheduleManager));
+		registerMbean("triggerManager", new JmxTriggerManager(triggerManager));
 		registerMbean("executorManager", new JmxExecutorManager(executorManager));
 	}
 	
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 75982d1..2d4dff5 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -17,6 +17,8 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutorManager;
+import azkaban.triggerapp.TriggerConnectorParams;
+import azkaban.trigger.TriggerManager;
 import azkaban.user.Permission;
 import azkaban.user.Role;
 import azkaban.user.User;
@@ -38,6 +40,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 	private UserManager userManager;
 	private AzkabanWebServer server;
 	private ExecutorManager executorManager;
+	private TriggerManager triggerManager;
 	
 	@Override
 	public void init(ServletConfig config) throws ServletException {
@@ -46,6 +49,7 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 		server = (AzkabanWebServer)getApplication();
 		userManager = server.getUserManager();
 		executorManager = server.getExecutorManager();
+		triggerManager = server.getTriggerManager();
 	}
 	
 	@Override
@@ -71,6 +75,17 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 				Map<String, Object> result = executorManager.callExecutorJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
 				ret = result;
 			}
+			else if (TriggerConnectorParams.JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES.equals(ajax)) {
+				if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
+					ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
+					this.writeJSON(resp, ret, true);
+                    return;
+				}
+				String hostPort = getParam(req, JMX_HOSTPORT);
+                String mbean = getParam(req, JMX_MBEAN);
+                Map<String, Object> result = triggerManager.callTriggerServerJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
+                ret = result;
+			}
 			else if (JMX_GET_MBEANS.equals(ajax)) {
 				ret.put("mbeans", server.getMbeanNames());
 			}
@@ -177,7 +192,28 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 			}
 		}
 		
-		page.add("remoteMBeans", executorMBeans);
+		page.add("executorRemoteMBeans", executorMBeans);
+		
+		Map<String, Object> triggerserverMBeans = new HashMap<String,Object>();
+		Set<String> primaryTriggerServerHosts = triggerManager.getPrimaryServerHosts();
+		for (String hostPort: triggerManager.getAllActiveTriggerServerHosts()) {
+			try {
+				Map<String, Object> mbeans = triggerManager.callTriggerServerJMX(hostPort, TriggerConnectorParams.JMX_GET_MBEANS, null);
+				
+				if (primaryTriggerServerHosts.contains(hostPort)) {
+					triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+				}
+				else {
+					triggerserverMBeans.put(hostPort, mbeans.get("mbeans"));
+				}
+			}
+			catch (IOException e) {
+				logger.error("Cannot contact executor " + hostPort, e);
+			}
+		}
+		
+		page.add("triggerserverRemoteMBeans", triggerserverMBeans);
+		
 		page.render();
 	}
 	
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 73fefdb..1ef2cef 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -54,6 +54,7 @@ import azkaban.scheduler.Schedule;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.ScheduleManagerException;
 import azkaban.scheduler.ScheduleStatisticManager;
+import azkaban.sla.SlaOption;
 import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
@@ -95,12 +96,12 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		HashMap<String, Object> ret = new HashMap<String, Object>();
 		String ajaxName = getParam(req, "ajax");
 		
-//		if (ajaxName.equals("slaInfo")) {
-//			ajaxSlaInfo(req, ret, session.getUser());
-//		}
-//		else if(ajaxName.equals("setSla")) {
-//			ajaxSetSla(req, ret, session.getUser());
-//		}
+		if (ajaxName.equals("slaInfo")) {
+			ajaxSlaInfo(req, ret, session.getUser());
+		}
+		else if(ajaxName.equals("setSla")) {
+			ajaxSetSla(req, ret, session.getUser());
+		} else
 		if(ajaxName.equals("loadFlow")) {
 			ajaxLoadFlows(req, ret, session.getUser());
 		}
@@ -117,98 +118,110 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-//	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-//		try {
-//			
-//			int scheduleId = getIntParam(req, "scheduleId");
-//			
-//			Schedule sched = scheduleManager.getSchedule(scheduleId);
-//			
-//			Project project = projectManager.getProject(sched.getProjectId());
-//			if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
-//				ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
-//				return;
-//			}
-//			
-//			
-//			SlaOptions slaOptions= new SlaOptions();
-//			
-//			String slaEmails = getParam(req, "slaEmails");
-//			String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-//			
-//			Map<String, String> settings = getParamGroup(req, "settings");
-//			List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
-//			for(String set : settings.keySet()) {
-//				SlaSetting s;
-//				try {
-//				s = parseSlaSetting(settings.get(set));
-//				}
-//				catch (Exception e) {
-//					throw new ServletException(e);
-//				}
-//				if(s != null) {
-//					slaSettings.add(s);
-//				}
-//			}
-//			
-//			if(slaSettings.size() > 0) {
-//				if(slaEmails.equals("")) {
-//					ret.put("error", "Please put correct email settings for your SLA actions");
-//					return;
-//				}
-//				slaOptions.setSlaEmails(Arrays.asList(emailSplit));
-//				slaOptions.setSettings(slaSettings);
-//			}
-//			else {
-//				slaOptions = null;
-//			}
-//			sched.setSlaOptions(slaOptions);
-//			scheduleManager.insertSchedule(sched);
-//
-//			if(slaOptions != null) {
-//				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
-//			}
-//			
-//		} catch (ServletException e) {
-//			ret.put("error", e.getMessage());
-//		} catch (ScheduleManagerException e) {
-//			ret.put("error", e.getMessage());
-//		}
-//		
-//	}
+	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+		try {
+			
+			int scheduleId = getIntParam(req, "scheduleId");
+			
+			Schedule sched = scheduleManager.getSchedule(scheduleId);
+			
+			Project project = projectManager.getProject(sched.getProjectId());
+			if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
+				ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
+				return;
+			}
+				
+			String emailStr = getParam(req, "slaEmails");
+			String[] emailSplit = emailStr.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			List<String> slaEmails = Arrays.asList(emailSplit);
+			
+			Map<String, String> settings = getParamGroup(req, "settings");
+			
+			List<SlaOption> slaOptions = new ArrayList<SlaOption>();
+			for(String set : settings.keySet()) {
+				SlaOption sla;
+				try {
+				sla = parseSlaSetting(settings.get(set));
+				sla.getInfo().put(SlaOption.INFO_FLOW_NAME, sched.getFlowName());
+				sla.getInfo().put(SlaOption.INFO_EMAIL_LIST, slaEmails);
+				}
+				catch (Exception e) {
+					throw new ServletException(e);
+				}
+				if(sla != null) {
+					sla.getInfo().put("SlaEmails", slaEmails);
+					slaOptions.add(sla);
+				}
+			}
+			
+			sched.setSlaOptions(slaOptions);
+			scheduleManager.insertSchedule(sched);
+
+			if(slaOptions != null) {
+				projectManager.postProjectEvent(project, EventType.SLA, user.getUserId(), "SLA for flow " + sched.getFlowName() + " has been added/changed.");
+			}
+			
+		} catch (ServletException e) {
+			ret.put("error", e.getMessage());
+		} catch (ScheduleManagerException e) {
+			ret.put("error", e.getMessage());
+		}
+		
+	}
 	
-//	private SlaSetting parseSlaSetting(String set) throws ScheduleManagerException {
-//		// "" + Duration + EmailAction + KillAction
-//		String[] parts = set.split(",", -1);
-//		String id = parts[0];
-//		String rule = parts[1];
-//		String duration = parts[2];
-//		String emailAction = parts[3];
-//		String killAction = parts[4];
-//		if(emailAction.equals("true") || killAction.equals("true")) {
-//			SlaSetting r = new SlaSetting();			
-//			r.setId(id);
-//			r.setRule(SlaRule.valueOf(rule));
-//			ReadablePeriod dur;
-//			try {
-//				dur = parseDuration(duration);
-//			}
-//			catch (Exception e) {
-//				throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
-//			}
-//			r.setDuration(dur);
-//			List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
-//			if(emailAction.equals("true")) {
-//				actions.add(SlaAction.EMAIL);
-//			}
-//			if(killAction.equals("true")) {
-//				actions.add(SlaAction.KILL);
-//			}
-//			r.setActions(actions);
-//			return r;
-//		}
-//		return null;
-//	}
+	private SlaOption parseSlaSetting(String set) throws ScheduleManagerException {
+		// "" + Duration + EmailAction + KillAction
+		logger.info("Tryint to set sla with the following set: " + set);
+		
+		String slaType;
+		List<String> slaActions = new ArrayList<String>();
+		Map<String, Object> slaInfo = new HashMap<String, Object>();
+		String[] parts = set.split(",", -1);
+		String id = parts[0];
+		String rule = parts[1];	
+		String duration = parts[2];
+		String emailAction = parts[3];
+		String killAction = parts[4];
+		if(emailAction.equals("true") || killAction.equals("true")) {
+			//String type = id.equals("") ? SlaOption.RULE_FLOW_RUNTIME_SLA : SlaOption.RULE_JOB_RUNTIME_SLA ;
+			if(emailAction.equals("true")) {
+				slaActions.add(SlaOption.ACTION_ALERT);
+				slaInfo.put(SlaOption.ALERT_TYPE, "email");
+			}
+			if(killAction.equals("true")) {
+				slaActions.add(SlaOption.ACTION_CANCEL_FLOW);
+			}
+			if(id.equals("")) {
+				if(rule.equals("SUCCESS")) {
+					slaType = SlaOption.TYPE_FLOW_SUCCEED;
+				}
+				else {
+					slaType = SlaOption.TYPE_FLOW_FINISH;
+				}
+			} else {
+				slaInfo.put(SlaOption.INFO_JOB_NAME, id);
+				if(rule.equals("SUCCESS")) {
+					slaType = SlaOption.TYPE_JOB_SUCCEED;
+				} else {
+					slaType = SlaOption.TYPE_JOB_FINISH;
+				}
+			}
+			
+			ReadablePeriod dur;
+			try {
+				dur = parseDuration(duration);
+			}
+			catch (Exception e) {
+				throw new ScheduleManagerException("Unable to parse duration for a SLA that needs to take actions!", e);
+			}
+
+			slaInfo.put(SlaOption.INFO_DURATION, Utils.createPeriodString(dur));
+			SlaOption r = new SlaOption(slaType, slaActions, slaInfo);
+			logger.info("Parsing sla as id:" + id + " type:" + slaType + " rule:" + rule + " Duration:" + duration + " actions:" + slaActions);
+			return r;
+		}
+		return null;
+	}
 
 	private ReadablePeriod parseDuration(String duration) {
 		int hour = Integer.parseInt(duration.split(":")[0]);
@@ -216,77 +229,76 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		return Minutes.minutes(min+hour*60).toPeriod();
 	}
 
-//	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
-//		int scheduleId;
-//		try {
-//			scheduleId = getIntParam(req, "scheduleId");
-//			
-//			Schedule sched = scheduleManager.getSchedule(scheduleId);
-//			
-//			Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
-//			if (project == null) {
-//				ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
-//				return;
-//			}
-//			
-//			Flow flow = project.getFlow(sched.getFlowName());
-//			if (flow == null) {
-//				ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
-//				return;
-//			}
-//			
-//			SlaOptions slaOptions = sched.getSlaOptions();
-//			ExecutionOptions flowOptions = sched.getExecutionOptions();
-//			
-//			if(slaOptions != null) {
-//				ret.put("slaEmails", slaOptions.getSlaEmails());
-//				List<SlaSetting> settings = slaOptions.getSettings();
-//				List<Object> setObj = new ArrayList<Object>();
-//				for(SlaSetting set: settings) {
-//					setObj.add(set.toObject());
-//				}
-//				ret.put("settings", setObj);
-//			}
-//			else if (flowOptions != null) {
-//				if(flowOptions.getFailureEmails() != null) {
-//					List<String> emails = flowOptions.getFailureEmails();
-//					if(emails.size() > 0) {
-//						ret.put("slaEmails", emails);
-//					}
-//				}
-//			}
-//			else {
-//				if(flow.getFailureEmails() != null) {
-//					List<String> emails = flow.getFailureEmails();
-//					if(emails.size() > 0) {
-//						ret.put("slaEmails", emails);
-//					}
-//				}
-//			}
-//			
-//			List<String> disabledJobs;
-//			if(flowOptions != null) {
-//				disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
-//			}
-//			else {
-//				disabledJobs = new ArrayList<String>();
-//			}
-//				
-//			List<String> allJobs = new ArrayList<String>();
-//			for(Node n : flow.getNodes()) {
-//				if(!disabledJobs.contains(n.getId())) {
-//					allJobs.add(n.getId());
-//				}
-//			}
-//			ret.put("allJobNames", allJobs);
-//		} catch (ServletException e) {
-//			ret.put("error", e);
-//		} catch (ScheduleManagerException e) {
-//			// TODO Auto-generated catch block
-//			ret.put("error", e);
-//		}
-//		
-//	}
+	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+		int scheduleId;
+		try {
+			scheduleId = getIntParam(req, "scheduleId");
+			
+			Schedule sched = scheduleManager.getSchedule(scheduleId);
+			
+			Project project = getProjectAjaxByPermission(ret, sched.getProjectId(), user, Type.READ);
+			if (project == null) {
+				ret.put("error", "Error loading project. Project " + sched.getProjectId() + " doesn't exist");
+				return;
+			}
+			
+			Flow flow = project.getFlow(sched.getFlowName());
+			if (flow == null) {
+				ret.put("error", "Error loading flow. Flow " + sched.getFlowName() + " doesn't exist in " + sched.getProjectId());
+				return;
+			}
+			
+			List<SlaOption> slaOptions = sched.getSlaOptions();
+			ExecutionOptions flowOptions = sched.getExecutionOptions();
+			
+			if(slaOptions != null && slaOptions.size() > 0) {
+				ret.put("slaEmails", slaOptions.get(0).getInfo().get("SlaEmails"));
+				
+				List<Object> setObj = new ArrayList<Object>();
+				for(SlaOption sla: slaOptions) {
+					setObj.add(sla.toWebObject());
+				}
+				ret.put("settings", setObj);
+			}
+			else if (flowOptions != null) {
+				if(flowOptions.getFailureEmails() != null) {
+					List<String> emails = flowOptions.getFailureEmails();
+					if(emails.size() > 0) {
+						ret.put("slaEmails", emails);
+					}
+				}
+			}
+			else {
+				if(flow.getFailureEmails() != null) {
+					List<String> emails = flow.getFailureEmails();
+					if(emails.size() > 0) {
+						ret.put("slaEmails", emails);
+					}
+				}
+			}
+			
+			List<String> disabledJobs;
+			if(flowOptions != null) {
+				disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+			}
+			else {
+				disabledJobs = new ArrayList<String>();
+			}
+				
+			List<String> allJobs = new ArrayList<String>();
+			for(Node n : flow.getNodes()) {
+				if(!disabledJobs.contains(n.getId())) {
+					allJobs.add(n.getId());
+				}
+			}
+			ret.put("allJobNames", allJobs);
+		} catch (ServletException e) {
+			ret.put("error", e);
+		} catch (ScheduleManagerException e) {
+			ret.put("error", e);
+		}
+		
+	}
 
 	protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
 		Project project = projectManager.getProject(projectId);
@@ -614,7 +626,9 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 		}
 		
-		Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions);
+		List<SlaOption> slaOptions = null;
+		
+		Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
 
diff --git a/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm b/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm
index 2a0e945..efa5c60 100644
--- a/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/jmxpage.vm
@@ -89,7 +89,7 @@
 				</tbody>
 			</table>
 
-#foreach($executor in $remoteMBeans.entrySet())
+#foreach($executor in $executorRemoteMBeans.entrySet())
 			<h3 class="subhead">Remote Executor JMX $executor.key</h3>
 			<table class="all-jobs job-table remoteJMX">
 				<thead>
@@ -131,6 +131,50 @@
 				</tbody>
 			</table>
 #end
+
+#foreach($triggerserver in $triggerserverRemoteMBeans.entrySet())
+			<h3 class="subhead">Remote Trigger Server JMX $triggerserver.key</h3>
+			<table class="all-jobs job-table remoteJMX">
+				<thead>
+					<tr>
+						<th>Name</th>
+						<th>Domain</th>
+						<th>Canonical Name</th>
+						<th></th>
+					</tr>
+				</thead>
+				<tbody>
+					#foreach($bean in $triggerserver.value)
+						<tr>
+							<td>${bean.get("keyPropertyList").get("name")}</td>
+							<td>${bean.get("domain")}</td>
+							<td>${bean.get("canonicalName")}</td>
+							<td><div class="btn4 querybtn" id="expandBtn-$counter" domain="${bean.get("domain")}" name="${bean.get("keyPropertyList").get("name")}" hostport="$triggerserver.key">Query</div></td>
+						</tr>
+					<tr class="childrow" id="expandBtn-${counter}-child"  style="display: none;">
+						<td class="expandedFlow" colspan="3">
+							<table class="innerTable">
+								<thead>
+									<tr>
+										<th>Attribute Name</th>
+										<th>Value</th>
+									</tr>
+								</thead>
+								<tbody id="expandBtn-${counter}-tbody">
+								</tbody>
+							</table>
+						</td>
+
+						<td>
+							<div class="btn4 collapse">Collapse</div>
+						</td>
+				</tr>
+				#set($counter=$counter + 1)
+					#end 
+				</tbody>
+			</table>
+#end
+
 		</div>
 	</body>
 </html>
diff --git a/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm b/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
index bf62ca4..6ccb348 100644
--- a/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
@@ -72,7 +72,8 @@
 							<th>Source</th>
 							<th>Submitted By</th>
 							<th>Description</th>
-							<th colspan="2" class="action ignoresort">Action</th>
+							<th>Status</th>
+							<!--th colspan="2" class="action ignoresort">Action</th-->
 						</tr>
 					</thead>
 					<tbody>
@@ -84,7 +85,8 @@
 							<td>${trigger.source}</td>
 							<td>${trigger.submitUser}</td>
 							<td>${trigger.getDescription()}</td>
-							<td><button id="expireTriggerBtn" onclick="expireTrigger(${trigger.triggerId})" >Expire Trigger</button></td>
+							<td>${trigger.getStatus()}</td>
+							<!--td><button id="expireTriggerBtn" onclick="expireTrigger(${trigger.triggerId})" >Expire Trigger</button></td-->
 						</tr>
 #end
 #else
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index f507dc1..6d8c039 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -107,10 +107,6 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 							indexToText[i] = "job " + allJobNames[i-1];
 						}
 						
-						
-						
-						
-						
 						// populate with existing settings
 						if(data.settings) {
 							
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 6365fd0..0d746d8 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -113,12 +113,12 @@ public class JdbcScheduleLoaderTest {
 		flowOptions.setDisabledJobs(disabledJobs);
 
 		
-		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
-		Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
-		Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
-		Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
-		Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
-		Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+		Schedule s3 = new Schedule(-1, 2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+		Schedule s4 = new Schedule(-1, 3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+		Schedule s5 = new Schedule(-1, 3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
+		Schedule s6 = new Schedule(-1, 3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
 		
 		loader.insertSchedule(s1);
 		loader.insertSchedule(s2);
@@ -163,13 +163,13 @@ public class JdbcScheduleLoaderTest {
 		flowOptions.setDisabledJobs(disabledJobs);
 		
 		System.out.println("the flow options are " + flowOptions);
-		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+		Schedule s1 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
 
 		loader.insertSchedule(s1);
 		
 		emails.add("email3");
 		
-		Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "azkaban", flowOptions);
+		Schedule s2 = new Schedule(-1, 1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "azkaban", flowOptions, null);
 
 		loader.updateSchedule(s2);
 		
@@ -208,7 +208,7 @@ public class JdbcScheduleLoaderTest {
 			flowOptions.setFailureEmails(emails);
 			flowOptions.setDisabledJobs(disabledJobs);
 			
-			Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions);
+			Schedule s = new Schedule(-1, i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "azkaban", flowOptions, null);
 			schedules.add(s);
 			try {
 				loader.insertSchedule(s);
diff --git a/unit/java/azkaban/test/trigger/DummyTriggerAction.java b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
index a99b3f7..cffbed6 100644
--- a/unit/java/azkaban/test/trigger/DummyTriggerAction.java
+++ b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
@@ -1,5 +1,7 @@
 package azkaban.test.trigger;
 
+import java.util.Map;
+
 import azkaban.trigger.TriggerAction;
 
 public class DummyTriggerAction implements TriggerAction{
@@ -40,4 +42,15 @@ public class DummyTriggerAction implements TriggerAction{
 		return "this is real dummy action";
 	}
 
+	@Override
+	public String getId() {
+		return null;
+	}
+
+	@Override
+	public void setContext(Map<String, Object> context) {
+		// TODO Auto-generated method stub
+		
+	}
+
 }
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index 962884c..4f1bb09 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -27,7 +27,7 @@ public class ExecuteFlowActionTest {
 		List<String> disabledJobs = new ArrayList<String>();
 		options.setDisabledJobs(disabledJobs);
 		
-		ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testproject", "testflow", "azkaban", options);
+		ExecuteFlowAction executeFlowAction = new ExecuteFlowAction("ExecuteFlowAction", 1, "testproject", "testflow", "azkaban", options, null);
 		
 		Object obj = executeFlowAction.toJson();
 		
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
index 98bd5ae..9a06f56 100644
--- a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -194,7 +194,7 @@ public class JdbcTriggerLoaderTest {
 		Condition triggerCond = new Condition(checkers1, expr1);
 		Condition expireCond = new Condition(checkers1, expr1);
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
-		TriggerAction action = new ExecuteFlowAction(1, projName, flowName, "azkaban", new ExecutionOptions());
+		TriggerAction action = new ExecuteFlowAction("executeAction", 1, projName, flowName, "azkaban", new ExecutionOptions(), null);
 		actions.add(action);
 		Trigger t = new Trigger(now, now, "azkaban", source, triggerCond, expireCond, actions);
 		return t;
diff --git a/unit/java/azkaban/test/trigger/ThresholdChecker.java b/unit/java/azkaban/test/trigger/ThresholdChecker.java
index 801a134..c25f566 100644
--- a/unit/java/azkaban/test/trigger/ThresholdChecker.java
+++ b/unit/java/azkaban/test/trigger/ThresholdChecker.java
@@ -1,5 +1,7 @@
 package azkaban.test.trigger;
 
+import java.util.Map;
+
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -87,5 +89,11 @@ public class ThresholdChecker implements ConditionChecker{
 		
 	}
 
+	@Override
+	public void setContext(Map<String, Object> context) {
+		// TODO Auto-generated method stub
+		
+	}
+
 
 }
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index 8629edd..790fcb4 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -52,7 +52,7 @@ public class TriggerTest {
 		Condition triggerCond = new Condition(checkers1, expr1);
 		Condition expireCond = new Condition(checkers1, expr1);
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
-		TriggerAction action = new ExecuteFlowAction(1, "testProj", "testFlow", "azkaban", new ExecutionOptions());
+		TriggerAction action = new ExecuteFlowAction("executeAction", 1, "testProj", "testFlow", "azkaban", new ExecutionOptions(), null);
 		actions.add(action);
 		Trigger t = new Trigger(now, now, "azkaban", "test", triggerCond, expireCond, actions);