azkaban-uncached

almost ready generic triggers

10/7/2013 7:45:37 PM

Changes

build.xml 42(+41 -1)

src/java/azkaban/executor/ExecutorManagerRemoteAdapter.java 761(+0 -761)

src/java/azkaban/scheduler/JdbcScheduleLoader.java 354(+0 -354)

src/java/azkaban/scheduler/TriggerBasedScheduler.java 203(+0 -203)

src/java/azkaban/trigger/TriggerManager.java.old 573(+0 -573)

src/java/azkaban/trigger/TriggerManagerRemoteAdapter.java 181(+0 -181)

src/java/azkaban/trigger/TriggerManagerServlet.java 158(+0 -158)

src/java/azkaban/triggerapp/AzkabanTriggerServer.java 686(+0 -686)

src/java/azkaban/triggerapp/JMXHttpServlet.java 72(+0 -72)

src/java/azkaban/triggerapp/TriggerConnectorParams.java 33(+0 -33)

src/java/azkaban/triggerapp/TriggerRunnerManagerException.java 33(+0 -33)

unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java 280(+0 -280)

Details

build.xml 42(+41 -1)

diff --git a/build.xml b/build.xml
index 86db40b..1031487 100644
--- a/build.xml
+++ b/build.xml
@@ -8,12 +8,14 @@
 	<property name="dist.packages.dir" value="${basedir}/dist/packages" />
 	<property name="dist.web.package.dir" value="${dist.packages.dir}/azkaban-web-server" />
 	<property name="dist.exec.package.dir" value="${dist.packages.dir}/azkaban-exec-server" />
+	<property name="dist.trigger.package.dir" value="${dist.packages.dir}/azkaban-trigger-server" />
 	<property name="dist.solo.package.dir" value="${dist.packages.dir}/azkaban-solo-server" />
 	<property name="dist.sql.package.dir" value="${dist.packages.dir}/sql" />	
 
 	<property name="conf.dir" value="${basedir}/conf" />
 	<property name="web.package.dir" value="${basedir}/src/package/webserver" />
 	<property name="exec.package.dir" value="${basedir}/src/package/execserver" />
+	<property name="trigger.package.dir" value="${basedir}/src/package/triggerserver" />
 	<property name="solo.package.dir" value="${basedir}/src/package/soloserver" />
 	
 	<property name="lib.dir" value="${basedir}/lib" />
@@ -208,6 +210,44 @@
 		</tar>
 	</target>
 	
+	<target name="package-trigger-server" depends="jars" description="Creates a package for the trigger server">
+		<delete dir="${dist.trigger.package.dir}" />
+		<mkdir dir="${dist.trigger.package.dir}" />
+		<mkdir dir="${dist.trigger.package.dir}/conf" />
+		<mkdir dir="${dist.trigger.package.dir}/bin" />
+		<mkdir dir="${dist.trigger.package.dir}/lib" />
+		<mkdir dir="${dist.trigger.package.dir}/plugins" />
+		<mkdir dir="${dist.trigger.package.dir}/extlib" />
+
+		<!-- Copy Azkaban jars and libs-->
+		<copy file="${azkaban.jar}" todir="${dist.trigger.package.dir}/lib" />
+		<copy todir="${dist.trigger.package.dir}/lib" >
+			<fileset dir="${lib.dir}" >
+				<exclude name="hadoop-core*.jar"/>
+			</fileset>
+		</copy>
+
+		<!-- Copy bin files for trigger server only-->
+		<copy todir="${dist.trigger.package.dir}/bin" >
+			<fileset dir="${trigger.package.dir}/bin"/>
+		</copy>
+
+		<!-- Copy conf files -->
+		<copy todir="${dist.trigger.package.dir}/conf" >
+			<fileset dir="${trigger.package.dir}/conf" />
+		</copy>
+
+		<!-- Tarball it -->
+		<tar destfile="${dist.trigger.package.dir}/${name}-trigger-server-${version}.tar.gz" compression="gzip" longfile="gnu">
+			<tarfileset dir="${dist.trigger.package.dir}" prefix="azkaban-${version}" filemode="755" includes="bin/*" />
+
+			<tarfileset dir="${dist.trigger.package.dir}" prefix="azkaban-${version}" includes="**">
+				<exclude name="bin/*"/>
+			</tarfileset>
+		</tar>
+	</target>
+
+
 	<target name="package-solo-server" depends="jars" description="Creates a package for the solo server">
 		<delete dir="${dist.solo.package.dir}" />
 		<mkdir dir="${dist.solo.package.dir}" />
@@ -257,6 +297,6 @@
 		</tar>
 	</target>
 	
-	<target name="package-all" depends="package-exec-server, package-web-server, package-solo-server, package-sql-scripts" description="Create all packages">
+	<target name="package-all" depends="package-trigger-server, package-exec-server, package-web-server, package-solo-server, package-sql-scripts" description="Create all packages">
 	</target>
 </project>
diff --git a/src/java/azkaban/alert/Alerter.java b/src/java/azkaban/alert/Alerter.java
new file mode 100644
index 0000000..1ba02a8
--- /dev/null
+++ b/src/java/azkaban/alert/Alerter.java
@@ -0,0 +1,11 @@
+package azkaban.alert;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.sla.SlaOption;
+
+public interface Alerter {
+	void alertOnSuccess(ExecutableFlow exflow) throws Exception;
+	void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
+	void alertOnFirstError(ExecutableFlow exflow) throws Exception;
+	void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
+}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 7db8a04..1fa6989 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -16,15 +16,10 @@
 
 package azkaban.executor;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
-import java.lang.reflect.Constructor;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,16 +39,14 @@ import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
+import azkaban.alert.Alerter;
 import azkaban.project.Project;
 import azkaban.scheduler.ScheduleStatisticManager;
-import azkaban.sla.SlaOption;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
-import azkaban.utils.FileIOUtils;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
 
 /**
  * Executor manager used to manage the client side job.
@@ -80,19 +73,14 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 	
 	private Map<String, Alerter> alerters;
 	
-	public interface Alerter {
-		void alertOnSuccess(ExecutableFlow exflow) throws Exception;
-		void alertOnError(ExecutableFlow exflow, String ... extraReasons) throws Exception;
-		void alertOnFirstError(ExecutableFlow exflow) throws Exception;
-		void alertOnSla(SlaOption slaOption, String slaMessage) throws Exception;
-	}
-	
-	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
+	public ExecutorManager(Props props, ExecutorLoader loader, Map<String, Alerter> alters) throws ExecutorManagerException {
 		this.executorLoader = loader;
 		this.loadRunningFlows();
 		
 		executorHost = props.getString("executor.host", "localhost");
 		executorPort = props.getInt("executor.port");
+
+		alerters = alters;
 		
 		executingManager = new ExecutingManagerUpdaterThread();
 		executingManager.start();
@@ -101,145 +89,144 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 		cleanerThread = new CleanerThread(executionLogsRetentionMs);
 		cleanerThread.start();
 		
-		alerters = loadAlerters(props);
 	}
 	
-	private Map<String, Alerter> loadAlerters(Props props) {
-		Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
-		// load built-in alerters
-		ExecutorMailer mailAlerter = new ExecutorMailer(props);
-		allAlerters.put("email", mailAlerter);
-		// load all plugin alerters
-		String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
-		allAlerters.putAll(loadPluginAlerters(pluginDir));
-		return allAlerters;
-	}
-
-	private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
-		File alerterPluginPath = new File(pluginPath);
-		if (!alerterPluginPath.exists()) {
-			return Collections.<String, Alerter>emptyMap();
-		}
-			
-		Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
-		ClassLoader parentLoader = this.getClass().getClassLoader();
-		File[] pluginDirs = alerterPluginPath.listFiles();
-		ArrayList<String> jarPaths = new ArrayList<String>();
-		for (File pluginDir: pluginDirs) {
-			if (!pluginDir.isDirectory()) {
-				logger.error("The plugin path " + pluginDir + " is not a directory.");
-				continue;
-			}
-			
-			// Load the conf directory
-			File propertiesDir = new File(pluginDir, "conf");
-			Props pluginProps = null;
-			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
-				File propertiesFile = new File(propertiesDir, "plugin.properties");
-				File propertiesOverrideFile = new File(propertiesDir, "override.properties");
-				
-				if (propertiesFile.exists()) {
-					if (propertiesOverrideFile.exists()) {
-						pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
-					}
-					else {
-						pluginProps = PropsUtils.loadProps(null, propertiesFile);
-					}
-				}
-				else {
-					logger.error("Plugin conf file " + propertiesFile + " not found.");
-					continue;
-				}
-			}
-			else {
-				logger.error("Plugin conf path " + propertiesDir + " not found.");
-				continue;
-			}
-			
-			String pluginName = pluginProps.getString("alerter.name");
-			List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
-			
-			String pluginClass = pluginProps.getString("alerter.class");
-			if (pluginClass == null) {
-				logger.error("Alerter class is not set.");
-			}
-			else {
-				logger.info("Plugin class " + pluginClass);
-			}
-			
-			URLClassLoader urlClassLoader = null;
-			File libDir = new File(pluginDir, "lib");
-			if (libDir.exists() && libDir.isDirectory()) {
-				File[] files = libDir.listFiles();
-				
-				ArrayList<URL> urls = new ArrayList<URL>();
-				for (int i=0; i < files.length; ++i) {
-					try {
-						URL url = files[i].toURI().toURL();
-						urls.add(url);
-					} catch (MalformedURLException e) {
-						logger.error(e);
-					}
-				}
-				if (extLibClasspath != null) {
-					for (String extLib : extLibClasspath) {
-						try {
-							File file = new File(pluginDir, extLib);
-							URL url = file.toURI().toURL();
-							urls.add(url);
-						} catch (MalformedURLException e) {
-							logger.error(e);
-						}
-					}
-				}
-				
-				urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
-			}
-			else {
-				logger.error("Library path " + propertiesDir + " not found.");
-				continue;
-			}
-			
-			Class<?> alerterClass = null;
-			try {
-				alerterClass = urlClassLoader.loadClass(pluginClass);
-			}
-			catch (ClassNotFoundException e) {
-				logger.error("Class " + pluginClass + " not found.");
-				continue;
-			}
+//	private Map<String, Alerter> loadAlerters(Props props) {
+//		Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
+//		// load built-in alerters
+//		Emailer mailAlerter = new Emailer(props);
+//		allAlerters.put("email", mailAlerter);
+//		// load all plugin alerters
+//		String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
+//		allAlerters.putAll(loadPluginAlerters(pluginDir));
+//		return allAlerters;
+//	}
 
-			String source = FileIOUtils.getSourcePathFromClass(alerterClass);
-			logger.info("Source jar " + source);
-			jarPaths.add("jar:file:" + source);
-			
-			Constructor<?> constructor = null;
-			try {
-				constructor = alerterClass.getConstructor(Props.class);
-			} catch (NoSuchMethodException e) {
-				logger.error("Constructor not found in " + pluginClass);
-				continue;
-			}
-			
-			Object obj = null;
-			try {
-				obj = constructor.newInstance(pluginProps);
-			} catch (Exception e) {
-				logger.error(e);
-			} 
-			
-			if (!(obj instanceof Alerter)) {
-				logger.error("The object is not an Alerter");
-				continue;
-			}
-			
-			Alerter plugin = (Alerter) obj;
-			installedAlerterPlugins.put(pluginName, plugin);
-		}
-		
-		return installedAlerterPlugins;
-		
-	}
+//	private Map<String, Alerter> loadPluginAlerters(String pluginPath) {
+//		File alerterPluginPath = new File(pluginPath);
+//		if (!alerterPluginPath.exists()) {
+//			return Collections.<String, Alerter>emptyMap();
+//		}
+//			
+//		Map<String, Alerter> installedAlerterPlugins = new HashMap<String, Alerter>();
+//		ClassLoader parentLoader = this.getClass().getClassLoader();
+//		File[] pluginDirs = alerterPluginPath.listFiles();
+//		ArrayList<String> jarPaths = new ArrayList<String>();
+//		for (File pluginDir: pluginDirs) {
+//			if (!pluginDir.isDirectory()) {
+//				logger.error("The plugin path " + pluginDir + " is not a directory.");
+//				continue;
+//			}
+//			
+//			// Load the conf directory
+//			File propertiesDir = new File(pluginDir, "conf");
+//			Props pluginProps = null;
+//			if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+//				File propertiesFile = new File(propertiesDir, "plugin.properties");
+//				File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+//				
+//				if (propertiesFile.exists()) {
+//					if (propertiesOverrideFile.exists()) {
+//						pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+//					}
+//					else {
+//						pluginProps = PropsUtils.loadProps(null, propertiesFile);
+//					}
+//				}
+//				else {
+//					logger.error("Plugin conf file " + propertiesFile + " not found.");
+//					continue;
+//				}
+//			}
+//			else {
+//				logger.error("Plugin conf path " + propertiesDir + " not found.");
+//				continue;
+//			}
+//			
+//			String pluginName = pluginProps.getString("alerter.name");
+//			List<String> extLibClasspath = pluginProps.getStringList("alerter.external.classpaths", (List<String>)null);
+//			
+//			String pluginClass = pluginProps.getString("alerter.class");
+//			if (pluginClass == null) {
+//				logger.error("Alerter class is not set.");
+//			}
+//			else {
+//				logger.info("Plugin class " + pluginClass);
+//			}
+//			
+//			URLClassLoader urlClassLoader = null;
+//			File libDir = new File(pluginDir, "lib");
+//			if (libDir.exists() && libDir.isDirectory()) {
+//				File[] files = libDir.listFiles();
+//				
+//				ArrayList<URL> urls = new ArrayList<URL>();
+//				for (int i=0; i < files.length; ++i) {
+//					try {
+//						URL url = files[i].toURI().toURL();
+//						urls.add(url);
+//					} catch (MalformedURLException e) {
+//						logger.error(e);
+//					}
+//				}
+//				if (extLibClasspath != null) {
+//					for (String extLib : extLibClasspath) {
+//						try {
+//							File file = new File(pluginDir, extLib);
+//							URL url = file.toURI().toURL();
+//							urls.add(url);
+//						} catch (MalformedURLException e) {
+//							logger.error(e);
+//						}
+//					}
+//				}
+//				
+//				urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+//			}
+//			else {
+//				logger.error("Library path " + propertiesDir + " not found.");
+//				continue;
+//			}
+//			
+//			Class<?> alerterClass = null;
+//			try {
+//				alerterClass = urlClassLoader.loadClass(pluginClass);
+//			}
+//			catch (ClassNotFoundException e) {
+//				logger.error("Class " + pluginClass + " not found.");
+//				continue;
+//			}
+//
+//			String source = FileIOUtils.getSourcePathFromClass(alerterClass);
+//			logger.info("Source jar " + source);
+//			jarPaths.add("jar:file:" + source);
+//			
+//			Constructor<?> constructor = null;
+//			try {
+//				constructor = alerterClass.getConstructor(Props.class);
+//			} catch (NoSuchMethodException e) {
+//				logger.error("Constructor not found in " + pluginClass);
+//				continue;
+//			}
+//			
+//			Object obj = null;
+//			try {
+//				obj = constructor.newInstance(pluginProps);
+//			} catch (Exception e) {
+//				logger.error(e);
+//			} 
+//			
+//			if (!(obj instanceof Alerter)) {
+//				logger.error("The object is not an Alerter");
+//				continue;
+//			}
+//			
+//			Alerter plugin = (Alerter) obj;
+//			installedAlerterPlugins.put(pluginName, plugin);
+//		}
+//		
+//		return installedAlerterPlugins;
+//		
+//	}
 
 //	private String getExecutorHost() {
 //		return executorHost;
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index 3ddab0c..85b55df 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -2,7 +2,6 @@ package azkaban.executor;
 
 import java.io.IOException;
 import java.lang.Thread.State;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
index 67edbc9..1b63f26 100644
--- a/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
+++ b/src/java/azkaban/jmx/JmxExecutorManagerAdapter.java
@@ -4,7 +4,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
 
 public class JmxExecutorManagerAdapter implements JmxExecutorManagerAdapterMBean {
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
index e183558..7537b0b 100644
--- a/src/java/azkaban/jmx/JmxTriggerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -1,11 +1,7 @@
 package azkaban.jmx;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.joda.time.DateTime;
 
-import azkaban.trigger.TriggerManager;
 import azkaban.trigger.TriggerManagerAdapter;
 import azkaban.trigger.TriggerManagerAdapter.TriggerJMX;
 
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
index ecf9c21..c87fbd0 100644
--- a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -1,7 +1,5 @@
 package azkaban.jmx;
 
-import java.util.List;
-
 public interface JmxTriggerManagerMBean {	
 	
 	@DisplayName("OPERATION: getLastThreadCheckTime")
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 1a0b25d..08fcb5e 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -130,19 +130,33 @@ public class JobTypeManager
 			throw new JobTypeManagerException("Failed to get global jobtype properties" + e.getCause());
 		}
 		
-		for(File dir : jobPluginsDir.listFiles()) {
-			if(dir.isDirectory() && dir.canRead()) {
-				// get its conf file
-				try {
-					loadJob(dir, globalConf, globalSysConf);
-				}
-				catch (Exception e) {
-					logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
-					throw new JobTypeManagerException(e);
+		
+		synchronized (this) {
+			ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
+			try{
+				for(File dir : jobPluginsDir.listFiles()) {
+					if(dir.isDirectory() && dir.canRead()) {
+						// get its conf file
+						try {
+							loadJob(dir, globalConf, globalSysConf);
+							Thread.currentThread().setContextClassLoader(prevCl);
+						}
+						catch (Exception e) {
+							logger.error("Failed to load jobtype " + dir.getName() + e.getMessage());
+							throw new JobTypeManagerException(e);
+						}
+					}
 				}
+			} catch(Exception e) {
+				e.printStackTrace();
+				throw new JobTypeManagerException(e);
+			} catch(Throwable t) {
+				t.printStackTrace();
+				throw new JobTypeManagerException(t);
+			} finally {
+				Thread.currentThread().setContextClassLoader(prevCl);
 			}
-		}
-
+ 		}
 	}
 	
 	public static File findFilefromDir(File dir, String fn){
@@ -213,7 +227,6 @@ public class JobTypeManager
 		try {
 			if(confFile != null) {
 				conf = new Props(commonConf, confFile);
-//				conf = PropsUtils.resolveProps(conf);
 			}
 			else {
 				conf = new Props(commonConf);
@@ -236,18 +249,56 @@ public class JobTypeManager
 		logger.info("Loading jobtype " + jobtypeName );
 
 		// sysconf says what jars/confs to load
-		//List<String> jobtypeClasspath = sysConf.getStringList("jobtype.classpath", null, ",");
 		List<URL> resources = new ArrayList<URL>();		
-		for(File f : dir.listFiles()) {
-			try {
+		
+		try {
+			//first global classpath
+			logger.info("Adding global resources.");
+			List<String> typeGlobalClassPath = sysConf.getStringList("jobtype.global.classpath", null, ",");
+			if(typeGlobalClassPath != null) {
+				for(String jar : typeGlobalClassPath) {
+					URL cpItem = new File(jar).toURI().toURL();
+					if(!resources.contains(cpItem)) {
+						logger.info("adding to classpath " + cpItem);
+						resources.add(cpItem);
+					}
+				}
+			}
+			
+			//type specific classpath
+			logger.info("Adding type resources.");
+			List<String> typeClassPath = sysConf.getStringList("jobtype.classpath", null, ",");
+			if(typeClassPath != null) {
+				for(String jar : typeClassPath) {
+					URL cpItem = new File(jar).toURI().toURL();
+					if(!resources.contains(cpItem)) {
+						logger.info("adding to classpath " + cpItem);
+						resources.add(cpItem);
+					}
+				}
+			}			
+			List<String> jobtypeLibDirs = sysConf.getStringList("jobtype.lib.dir", null, ",");
+			if(jobtypeLibDirs != null) {
+				for(String libDir : jobtypeLibDirs) {
+					for(File f : new File(libDir).listFiles()) {
+						if(f.getName().endsWith(".jar")) {
+								resources.add(f.toURI().toURL());
+								logger.info("adding to classpath " + f.toURI().toURL());
+						}
+					}
+				}
+			}
+			
+			logger.info("Adding type override resources.");
+			for(File f : dir.listFiles()) {
 				if(f.getName().endsWith(".jar")) {
-					resources.add(f.toURI().toURL());
-					logger.info("adding to classpath " + f.toURI().toURL());
+						resources.add(f.toURI().toURL());
+						logger.info("adding to classpath " + f.toURI().toURL());
 				}
-			} catch (MalformedURLException e) {
-				// TODO Auto-generated catch block
-				throw new JobTypeManagerException(e);
 			}
+			
+		} catch (MalformedURLException e) {
+			throw new JobTypeManagerException(e);
 		}
 		
 		// each job type can have a different class loader
@@ -265,7 +316,7 @@ public class JobTypeManager
 		logger.info("Doing simple testing...");
 		try {
 			Props fakeSysProps = new Props(sysConf);
-			fakeSysProps.put("type", jobtypeName);
+//			fakeSysProps.put("type", jobtypeName);
 			Props fakeJobProps = new Props(conf);
 			@SuppressWarnings("unused")
 			Job job = (Job)Utils.callConstructor(clazz, "dummy", fakeSysProps, fakeJobProps, logger);
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index e4a4ab1..e69ed5e 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -16,15 +16,12 @@ import org.apache.log4j.Logger;
 
 import azkaban.flow.Flow;
 import azkaban.project.ProjectLogEvent.EventType;
-import azkaban.trigger.TriggerManager;
 import azkaban.user.Permission;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
 import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
-import azkaban.webapp.AzkabanWebServer;
-import azkaban.webapp.servlet.TriggerPlugin;
 
 public class ProjectManager {
 	private static final Logger logger = Logger.getLogger(ProjectManager.class);
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 2f529a6..434edfd 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -367,8 +367,8 @@ public class Schedule{
 		return null;
 	}
 	
+	@SuppressWarnings("unchecked")
 	public void createAndSetScheduleOptions(Object obj) {
-		@SuppressWarnings("unchecked")
 		HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
 		if (schedObj.containsKey("executionOptions")) {
 			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 878eb28..ed41ce4 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -16,32 +16,18 @@
 
 package azkaban.scheduler;
 
-import java.lang.Thread.State;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
-import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.flow.Flow;
-import azkaban.project.Project;
-import azkaban.project.ProjectManager;
 import azkaban.sla.SlaOption;
 import azkaban.trigger.TriggerAgent;
 import azkaban.trigger.TriggerStatus;
@@ -65,10 +51,10 @@ public class ScheduleManager implements TriggerAgent {
 	private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
 	private Map<Pair<Integer, String>, Schedule> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Schedule>();
 	
-	private final ExecutorManagerAdapter executorManager;
-	
-	private ProjectManager projectManager = null;
-	
+//	private final ExecutorManagerAdapter executorManager;
+//	
+//	private ProjectManager projectManager = null;
+//	
 	// Used for mbeans to query Scheduler status
 //<<<<<<< HEAD
 //	
@@ -84,17 +70,16 @@ public class ScheduleManager implements TriggerAgent {
 	 * 
 	 * @param loader
 	 */
-	public ScheduleManager (ExecutorManagerAdapter executorManager,
-							ScheduleLoader loader) 
+	public ScheduleManager (ScheduleLoader loader) 
 	{
-		this.executorManager = executorManager;
+//		this.executorManager = executorManager;
 		this.loader = loader;
 		
 	}
 	
-	public void setProjectManager(ProjectManager projectManager) {
-		this.projectManager = projectManager;
-	}
+//	public void setProjectManager(ProjectManager projectManager) {
+//		this.projectManager = projectManager;
+//	}
 	
 	@Override
 	public void start() throws ScheduleManagerException {
@@ -318,10 +303,10 @@ public class ScheduleManager implements TriggerAgent {
 	 */
 	private synchronized void internalSchedule(Schedule s) {
 		//Schedule existing = scheduleIDMap.get(s.getScheduleId());
-		Schedule existing = null;
-		if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
-			existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
-		}
+//		Schedule existing = null;
+//		if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
+//			existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
+//		}
 
 		scheduleIDMap.put(s.getScheduleId(), s);
 //		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
diff --git a/src/java/azkaban/scheduler/ScheduleStatisticManager.java b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
index 3aaf9ff..482334d 100644
--- a/src/java/azkaban/scheduler/ScheduleStatisticManager.java
+++ b/src/java/azkaban/scheduler/ScheduleStatisticManager.java
@@ -8,7 +8,6 @@ import java.util.List;
 import java.util.Map;
 
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 1f638d7..66bc178 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -6,15 +6,13 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
 
-import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.project.ProjectManager;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerAdapter;
 import azkaban.trigger.TriggerManagerException;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.ExecuteFlowAction;
@@ -23,29 +21,29 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	
 	private static Logger logger = Logger.getLogger(TriggerBasedScheduleLoader.class);
 	
-	private TriggerManager triggerManager;
+	private TriggerManagerAdapter triggerManager;
 	
 	private String triggerSource;
 	
-//	private Map<Integer, Trigger> triggersLocalCopy;
 	private long lastUpdateTime = -1;
 	
-	public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManagerAdapter executorManager, ProjectManager projectManager, String triggerSource) {
+	public TriggerBasedScheduleLoader(TriggerManager triggerManager, String triggerSource) {
 		this.triggerManager = triggerManager;
 		this.triggerSource = triggerSource;
-		// need to init the action types and condition checker types 
-		ExecuteFlowAction.setExecutorManager(executorManager);
-		ExecuteFlowAction.setProjectManager(projectManager);
+//		// need to init the action types and condition checker types 
+//		ExecuteFlowAction.setExecutorManager(executorManager);
+//		ExecuteFlowAction.setProjectManager(projectManager);
 	}
 	
 	private Trigger scheduleToTrigger(Schedule s) {
-		
-		Condition triggerCondition = createTimeTriggerCondition(s);
-		Condition expireCondition = createTimeExpireCondition(s);
+		Condition triggerCondition = createTriggerCondition(s);
+		Condition expireCondition = createExpireCondition(s);
 		List<TriggerAction> actions = createActions(s);
 		Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
 		if(s.isRecurring()) {
 			t.setResetOnTrigger(true);
+		} else {
+			t.setResetOnTrigger(false);
 		}
 		return t;
 	}
@@ -67,7 +65,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		return actions;
 	}
 	
-	private Condition createTimeTriggerCondition (Schedule s) {
+	private Condition createTriggerCondition (Schedule s) {
 		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
 		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
 		checkers.put(checker.getId(), checker);
@@ -77,7 +75,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	}
 	
 	// if failed to trigger, auto expire?
-	private Condition createTimeExpireCondition (Schedule s) {
+	private Condition createExpireCondition (Schedule s) {
 		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
 		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
 		checkers.put(checker.getId(), checker);
@@ -92,9 +90,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		try {
 			triggerManager.insertTrigger(t, t.getSubmitUser());
 			s.setScheduleId(t.getTriggerId());
-//			triggersLocalCopy.put(t.getTriggerId(), t);
 		} catch (TriggerManagerException e) {
-			// TODO Auto-generated catch block
 			throw new ScheduleManagerException("Failed to insert new schedule!", e);
 		}
 	}
@@ -104,9 +100,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		Trigger t = scheduleToTrigger(s);
 		try {
 			triggerManager.updateTrigger(t, t.getSubmitUser());
-//			triggersLocalCopy.put(t.getTriggerId(), t);
 		} catch (TriggerManagerException e) {
-			// TODO Auto-generated catch block
 			throw new ScheduleManagerException("Failed to update schedule!", e);
 		}
 	}
diff --git a/src/java/azkaban/sla/SlaOption.java b/src/java/azkaban/sla/SlaOption.java
index ad5b536..264a28c 100644
--- a/src/java/azkaban/sla/SlaOption.java
+++ b/src/java/azkaban/sla/SlaOption.java
@@ -5,9 +5,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.joda.time.ReadablePeriod;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
-import azkaban.utils.Utils;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutorManagerException;
 
 public class SlaOption {
 	
@@ -34,6 +37,8 @@ public class SlaOption {
 	private Map<String, Object> info;
 	private List<String> actions;
 	
+	private static DateTimeFormatter fmt = DateTimeFormat.forPattern("MM/dd, YYYY HH:mm");
+	
 	public SlaOption(
 			String type,
 			List<String> actions,
@@ -125,5 +130,36 @@ public class SlaOption {
 	public String toString() {
 		return "Sla of " + getType() +  getInfo() + getActions();
 	}
+	
+	public static String createSlaMessage(SlaOption slaOption, ExecutableFlow flow) {
+		String type = slaOption.getType();
+		int execId = flow.getExecutionId();
+		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
+			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + fmt.print(new DateTime(flow.getStartTime())) + "</br>"; 
+			String actual = "Actual flow status is " + flow.getStatus();
+			return basicinfo + expected + actual;
+		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
+			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + fmt.print(new DateTime(flow.getStartTime())) + "</br>"; 
+			String actual = "Actual flow status is " + flow.getStatus();
+			return basicinfo + expected + actual;
+		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
+		} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+			return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
+		} else {
+			return "Unrecognized SLA type " + type;
+		}
+	}
+
 
 }
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 4ee2ef3..1a8b665 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -30,10 +30,10 @@ public class ActionTypeLoader {
 	public void init(Props props) throws TriggerException {
 		// load built-in actions
 		
-
-		loadBuiltinActions();
-		
-		loadPluginActions(props);
+//
+//		loadBuiltinActions();
+//		
+//		loadPluginActions(props);
 
 	}
 	
@@ -44,100 +44,100 @@ public class ActionTypeLoader {
 		}
 	}
 	
-	private void loadPluginActions(Props props) throws TriggerException {
-		String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
-		File pluginDir = new File(checkerDir);
-		if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
-			logger.info("No trigger action plugins to load.");
-			return;
-		}
-		
-		logger.info("Loading plugin trigger actions from " + pluginDir);
-		ClassLoader parentCl = this.getClass().getClassLoader();
-		
-		Props globalActionConf = null;
-		File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
-		try {
-			if(confFile != null) {
-				globalActionConf = new Props(null, confFile);
-			} else {
-				globalActionConf = new Props();
-			}
-		} catch (IOException e) {
-			throw new TriggerException("Failed to get global properties." + e);
-		}
-		
-		for(File dir : pluginDir.listFiles()) {
-			if(dir.isDirectory() && dir.canRead()) {
-				try {
-					loadPluginTypes(globalActionConf, pluginDir, parentCl);
-				} catch (Exception e) {
-					logger.info("Plugin actions failed to load. " + e.getCause());
-					throw new TriggerException("Failed to load all trigger actions!", e);
-				}
-			}
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
-		Props actionConf = null;
-		File confFile = Utils.findFilefromDir(dir, ACTIONTYPECONFFILE);
-		if(confFile == null) {
-			logger.info("No action type found in " + dir.getAbsolutePath());
-			return;
-		}
-		try {
-			actionConf = new Props(globalConf, confFile);
-		} catch (IOException e) {
-			throw new TriggerException("Failed to load config for the action type", e);
-		}
-		
-		String actionName = dir.getName();
-		String actionClass = actionConf.getString("action.class");
-		
-		List<URL> resources = new ArrayList<URL>();		
-		for(File f : dir.listFiles()) {
-			try {
-				if(f.getName().endsWith(".jar")) {
-					resources.add(f.toURI().toURL());
-					logger.info("adding to classpath " + f.toURI().toURL());
-				}
-			} catch (MalformedURLException e) {
-				// TODO Auto-generated catch block
-				throw new TriggerException(e);
-			}
-		}
-		
-		// each job type can have a different class loader
-		ClassLoader actionCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
-		
-		Class<? extends TriggerAction> clazz = null;
-		try {
-			clazz = (Class<? extends TriggerAction>)actionCl.loadClass(actionClass);
-			actionToClass.put(actionName, clazz);
-		}
-		catch (ClassNotFoundException e) {
-			throw new TriggerException(e);
-		}
-		
-		if(actionConf.getBoolean("need.init")) {
-			try {
-				Utils.invokeStaticMethod(actionCl, actionClass, "init", actionConf);
-			} catch (Exception e) {
-				e.printStackTrace();
-				logger.error("Failed to init the action type " + actionName);
-				throw new TriggerException(e);
-			}
-		}
-		
-		logger.info("Loaded action type " + actionName + " " + actionClass);
-	}
-	
-	private void loadBuiltinActions() {
-		actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);		
-		logger.info("Loaded ExecuteFlowAction type.");
-	}
+//	private void loadPluginActions(Props props) throws TriggerException {
+//		String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
+//		File pluginDir = new File(checkerDir);
+//		if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+//			logger.info("No trigger action plugins to load.");
+//			return;
+//		}
+//		
+//		logger.info("Loading plugin trigger actions from " + pluginDir);
+//		ClassLoader parentCl = this.getClass().getClassLoader();
+//		
+//		Props globalActionConf = null;
+//		File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+//		try {
+//			if(confFile != null) {
+//				globalActionConf = new Props(null, confFile);
+//			} else {
+//				globalActionConf = new Props();
+//			}
+//		} catch (IOException e) {
+//			throw new TriggerException("Failed to get global properties." + e);
+//		}
+//		
+//		for(File dir : pluginDir.listFiles()) {
+//			if(dir.isDirectory() && dir.canRead()) {
+//				try {
+//					loadPluginTypes(globalActionConf, pluginDir, parentCl);
+//				} catch (Exception e) {
+//					logger.info("Plugin actions failed to load. " + e.getCause());
+//					throw new TriggerException("Failed to load all trigger actions!", e);
+//				}
+//			}
+//		}
+//	}
+//	
+//	@SuppressWarnings("unchecked")
+//	private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+//		Props actionConf = null;
+//		File confFile = Utils.findFilefromDir(dir, ACTIONTYPECONFFILE);
+//		if(confFile == null) {
+//			logger.info("No action type found in " + dir.getAbsolutePath());
+//			return;
+//		}
+//		try {
+//			actionConf = new Props(globalConf, confFile);
+//		} catch (IOException e) {
+//			throw new TriggerException("Failed to load config for the action type", e);
+//		}
+//		
+//		String actionName = dir.getName();
+//		String actionClass = actionConf.getString("action.class");
+//		
+//		List<URL> resources = new ArrayList<URL>();		
+//		for(File f : dir.listFiles()) {
+//			try {
+//				if(f.getName().endsWith(".jar")) {
+//					resources.add(f.toURI().toURL());
+//					logger.info("adding to classpath " + f.toURI().toURL());
+//				}
+//			} catch (MalformedURLException e) {
+//				// TODO Auto-generated catch block
+//				throw new TriggerException(e);
+//			}
+//		}
+//		
+//		// each job type can have a different class loader
+//		ClassLoader actionCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+//		
+//		Class<? extends TriggerAction> clazz = null;
+//		try {
+//			clazz = (Class<? extends TriggerAction>)actionCl.loadClass(actionClass);
+//			actionToClass.put(actionName, clazz);
+//		}
+//		catch (ClassNotFoundException e) {
+//			throw new TriggerException(e);
+//		}
+//		
+//		if(actionConf.getBoolean("need.init")) {
+//			try {
+//				Utils.invokeStaticMethod(actionCl, actionClass, "init", actionConf);
+//			} catch (Exception e) {
+//				e.printStackTrace();
+//				logger.error("Failed to init the action type " + actionName);
+//				throw new TriggerException(e);
+//			}
+//		}
+//		
+//		logger.info("Loaded action type " + actionName + " " + actionClass);
+//	}
+//	
+//	private void loadBuiltinActions() {
+//		actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);		
+//		logger.info("Loaded ExecuteFlowAction type.");
+//	}
 	
 	public static void registerBuiltinActions(Map<String, Class<? extends TriggerAction>> builtinActions) {
 		actionToClass.putAll(builtinActions);
diff --git a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
index 13eaf27..8dbc1cc 100644
--- a/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
+++ b/src/java/azkaban/trigger/builtin/BasicTimeChecker.java
@@ -64,19 +64,6 @@ public class BasicTimeChecker implements ConditionChecker {
 		return nextCheckTime;
 	}
 	
-//	public BasicTimeChecker(
-//			DateTime firstCheckTime,
-//			Boolean isRecurring, 
-//			Boolean skipPastChecks,
-//			String period) {
-//		this.firstCheckTime = firstCheckTime;
-//		this.isRecurring = isRecurring;
-//		this.skipPastChecks = skipPastChecks;
-//		this.period = parsePeriodString(period);
-//		this.nextCheckTime = new DateTime(firstCheckTime);
-//		this.nextCheckTime = calculateNextCheckTime();
-//	}
-	
 	public BasicTimeChecker(
 			String id,
 			long firstCheckTime,
@@ -102,23 +89,10 @@ public class BasicTimeChecker implements ConditionChecker {
 	@Override
 	public void reset() {
 		this.nextCheckTime = calculateNextCheckTime();
-		
 	}
 	
-	/*
-	 * TimeChecker format:
-	 * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
-	 */
 	@Override
 	public String getId() {
-//		return getType() + "$" +
-//				firstCheckTime.getMillis() + "$" +
-//				nextCheckTime.getMillis() + "$" +
-//				firstCheckTime.getZone().getID().replace('/', '0') + "$" +
-//				//"offset"+firstCheckTime.getZone().getOffset(firstCheckTime.getMillis()) + "_" +
-//				(isRecurring == true ? "1" : "0") + "$" +
-//				(skipPastChecks == true ? "1" : "0") + "$" +
-//				createPeriodString(period);
 		return id;
 	}
 
@@ -129,21 +103,6 @@ public class BasicTimeChecker implements ConditionChecker {
 
 	@SuppressWarnings("unchecked")
 	public static BasicTimeChecker createFromJson(Object obj) throws Exception {
-//		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
-//		if(!jsonObj.get("type").equals(type)) {
-//			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
-//		}
-//		long firstCheckTime = Long.valueOf((String)jsonObj.get("firstCheckTime"));
-//		String timezoneId = (String) jsonObj.get("timezone");
-//		long nextCheckTime = Long.valueOf((String)jsonObj.get("nextCheckTime"));
-//		DateTimeZone timezone = DateTimeZone.forID(timezoneId);
-////		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
-////		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
-//		boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
-//		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
-//		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
-//		String id = (String) jsonObj.get("id");
-//		return new BasicTimeChecker(id, firstCheckTime, timezone, nextCheckTime, isRecurring, skipPastChecks, period);
 		return createFromJson((HashMap<String, Object>)obj);
 	}
 	
@@ -156,8 +115,6 @@ public class BasicTimeChecker implements ConditionChecker {
 		String timezoneId = (String) jsonObj.get("timezone");
 		long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
 		DateTimeZone timezone = DateTimeZone.forID(timezoneId);
-//		DateTime firstCheckTime = new DateTime(firstTimeMillis).withZone(timezone);
-//		DateTime nextCheckTime = new DateTime(nextTimeMillis).withZone(timezone);
 		boolean isRecurring = Boolean.valueOf((String)jsonObj.get("isRecurring"));
 		boolean skipPastChecks = Boolean.valueOf((String)jsonObj.get("skipPastChecks"));
 		ReadablePeriod period = Utils.parsePeriodString((String)jsonObj.get("period"));
@@ -175,44 +132,6 @@ public class BasicTimeChecker implements ConditionChecker {
 		return createFromJson(obj);
 	}
 	
-//	public static ConditionChecker createFromJson(String obj) {
-//		String str = (String) obj;
-//		String[] parts = str.split("\\$");
-//		
-//		if(!parts[0].equals(type)) {
-//			throw new RuntimeException("Cannot create checker of " + type + " from " + parts[0]);
-//		}
-//		
-//		long firstMillis = Long.parseLong(parts[1]);
-//		long nextMillis = Long.parseLong(parts[2]);
-//		//DateTimeZone timezone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(parts[3]));
-//		DateTimeZone timezone = DateTimeZone.forID(parts[3].replace('0', '/'));
-//		boolean isRecurring = parts[4].equals("1") ? true : false;
-//		boolean skipPastChecks = parts[5].equals("1") ? true : false;
-//		ReadablePeriod period = parsePeriodString(parts[6]);
-//		
-//		return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
-//	}
-//	
-//	@Override
-//	public ConditionChecker fromJson(Object obj) {
-//		String str = (String) obj;
-//		String[] parts = str.split("_");
-//		
-//		if(!parts[0].equals(getType())) {
-//			throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
-//		}
-//		
-//		long firstMillis = Long.parseLong(parts[1]);
-//		long nextMillis = Long.parseLong(parts[2]);
-//		DateTimeZone timezone = DateTimeZone.forID(parts[3]);
-//		boolean isRecurring = Boolean.valueOf(parts[4]);
-//		boolean skipPastChecks = Boolean.valueOf(parts[5]);
-//		ReadablePeriod period = parsePeriodString(parts[6]);
-//		
-//		return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
-//	}
-	
 	private void updateNextCheckTime(){
 		nextCheckTime = calculateNextCheckTime();
 	}
@@ -224,7 +143,6 @@ public class BasicTimeChecker implements ConditionChecker {
 			if(count > 100000) {
 				throw new IllegalStateException("100000 increments of period did not get to present time.");
 			}
-			
 			if(period == null) {
 				break;
 			}else {
@@ -240,7 +158,6 @@ public class BasicTimeChecker implements ConditionChecker {
 	
 	@Override
 	public Object getNum() {
-		// TODO Auto-generated method stub
 		return null;
 	}
 
@@ -266,8 +183,6 @@ public class BasicTimeChecker implements ConditionChecker {
 
 	@Override
 	public void setContext(Map<String, Object> context) {
-		// TODO Auto-generated method stub
-		
 	}
 
 }
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 6124b22..f19adfe 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -9,9 +9,9 @@ import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
@@ -38,7 +38,6 @@ public class ExecuteFlowAction implements TriggerAction {
 	private static ProjectManager projectManager;
 	private ExecutionOptions executionOptions = new ExecutionOptions();
 	private List<SlaOption> slaOptions;
-	private Map<String, Object> context;
 	
 	private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
 	
@@ -64,7 +63,7 @@ public class ExecuteFlowAction implements TriggerAction {
 		return projectId;
 	}
 
-	public void setProjectId(int projectId) {
+	protected void setProjectId(int projectId) {
 		this.projectId = projectId;
 	}
 
@@ -72,7 +71,7 @@ public class ExecuteFlowAction implements TriggerAction {
 		return flowName;
 	}
 
-	public void setFlowName(String flowName) {
+	protected void setFlowName(String flowName) {
 		this.flowName = flowName;
 	}
 
@@ -80,7 +79,7 @@ public class ExecuteFlowAction implements TriggerAction {
 		return submitUser;
 	}
 
-	public void setSubmitUser(String submitUser) {
+	protected void setSubmitUser(String submitUser) {
 		this.submitUser = submitUser;
 	}
 
@@ -88,7 +87,7 @@ public class ExecuteFlowAction implements TriggerAction {
 		return executionOptions;
 	}
 
-	public void setExecutionOptions(ExecutionOptions executionOptions) {
+	protected void setExecutionOptions(ExecutionOptions executionOptions) {
 		this.executionOptions = executionOptions;
 	}
 	
@@ -96,7 +95,7 @@ public class ExecuteFlowAction implements TriggerAction {
 		return slaOptions;
 	}
 
-	public void setSlaOptions(List<SlaOption> slaOptions) {
+	protected void setSlaOptions(List<SlaOption> slaOptions) {
 		this.slaOptions = slaOptions;
 	}
 
@@ -216,9 +215,9 @@ public class ExecuteFlowAction implements TriggerAction {
 		
 		try{
 			executorManager.submitExecutableFlow(exflow, submitUser);
-			Map<String, Object> outputProps = new HashMap<String, Object>();
-			outputProps.put(EXEC_ID, exflow.getExecutionId());
-			context.put(actionId, outputProps);
+//			Map<String, Object> outputProps = new HashMap<String, Object>();
+//			outputProps.put(EXEC_ID, exflow.getExecutionId());
+//			context.put(actionId, outputProps);
 			logger.info("Invoked flow " + project.getName() + "." + flowName);
 		} catch (ExecutorManagerException e) {
 			throw new RuntimeException(e);
@@ -229,14 +228,15 @@ public class ExecuteFlowAction implements TriggerAction {
 			int execId = exflow.getExecutionId();
 			for(SlaOption sla : slaOptions) {
 				logger.info("Adding sla trigger " + sla.toString());
-				SlaChecker slaFailChecker = new SlaChecker("slaFailChecker", sla, execId, false);
-				Map<String, ConditionChecker> failCheckers = new HashMap<String, ConditionChecker>();
-				failCheckers.put(slaFailChecker.getId(), slaFailChecker);
-				Condition triggerCond = new Condition(failCheckers, slaFailChecker.getId() + ".eval()");
-				SlaChecker slaPassChecker = new SlaChecker("slaPassChecker", sla, execId, true);
-				Map<String, ConditionChecker> passCheckers = new HashMap<String, ConditionChecker>();
-				passCheckers.put(slaPassChecker.getId(), slaPassChecker);
-				Condition expireCond = new Condition(passCheckers, slaPassChecker.getId() + ".eval()");
+				SlaChecker slaChecker = new SlaChecker("slaChecker", sla, execId);
+				Map<String, ConditionChecker> slaCheckers = new HashMap<String, ConditionChecker>();
+				slaCheckers.put(slaChecker.getId(), slaChecker);
+				Condition triggerCond = new Condition(slaCheckers, slaChecker.getId() + ".eval()");
+				// if whole flow finish before violate sla, just abort
+				ExecutionChecker execChecker = new ExecutionChecker("execChecker", execId, null, Status.SUCCEEDED);
+				Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+				expireCheckers.put(execChecker.getId(), execChecker);
+				Condition expireCond = new Condition(expireCheckers, execChecker.getId() + ".eval()");
 				List<TriggerAction> actions = new ArrayList<TriggerAction>();
 				List<String> slaActions = sla.getActions();
 				for(String act : slaActions) {
@@ -265,7 +265,6 @@ public class ExecuteFlowAction implements TriggerAction {
 
 	@Override
 	public void setContext(Map<String, Object> context) {
-		this.context = context;
 	}
 
 	@Override
@@ -273,5 +272,4 @@ public class ExecuteFlowAction implements TriggerAction {
 		return actionId;
 	}
 
-
 }
diff --git a/src/java/azkaban/trigger/builtin/ExecutionChecker.java b/src/java/azkaban/trigger/builtin/ExecutionChecker.java
new file mode 100644
index 0000000..2906b5b
--- /dev/null
+++ b/src/java/azkaban/trigger/builtin/ExecutionChecker.java
@@ -0,0 +1,123 @@
+package azkaban.trigger.builtin;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManagerAdapter;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.Status;
+import azkaban.trigger.ConditionChecker;
+
+public class ExecutionChecker implements ConditionChecker{
+
+	public static final String type = "ExecutionChecker";
+	public static ExecutorManagerAdapter executorManager;
+	
+	private String checkerId;
+	private int execId;
+	private String jobName;
+	private Status wantedStatus;
+	
+	public ExecutionChecker(String checkerId, int execId, String jobName, Status wantedStatus) {
+		this.checkerId = checkerId;
+		this.execId = execId;
+		this.jobName = jobName;
+		this.wantedStatus = wantedStatus;
+	}
+	
+	public static void setExecutorManager(ExecutorManagerAdapter em) {
+		executorManager = em;
+	}
+	
+	@Override
+	public Object eval() {
+		ExecutableFlow exflow;
+		try {
+			exflow = executorManager.getExecutableFlow(execId);
+		} catch (ExecutorManagerException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			return Boolean.FALSE;
+		}
+		if(jobName != null) {
+			ExecutableNode job = exflow.getExecutableNode(jobName);
+			if(job != null) {
+				return job.getStatus().equals(wantedStatus);
+			} else {
+				return Boolean.FALSE;
+			}
+		} else {
+			return exflow.getStatus().equals(wantedStatus);
+		}
+		
+	}
+
+	@Override
+	public Object getNum() {
+		return null;
+	}
+
+	@Override
+	public void reset() {
+	}
+
+	@Override
+	public String getId() {
+		return checkerId;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	public static ExecutionChecker createFromJson(HashMap<String, Object> jsonObj) throws Exception {
+		if(!jsonObj.get("type").equals(type)) {
+			throw new Exception("Cannot create checker of " + type + " from " + jsonObj.get("type"));
+		}
+		int execId = Integer.valueOf((String) jsonObj.get("execId"));
+		String jobName = null;
+		if(jsonObj.containsKey("jobName")) {
+			jobName = (String) jsonObj.get("jobName");
+		}
+		String checkerId = (String) jsonObj.get("checkerId");
+		Status wantedStatus = Status.valueOf((String)jsonObj.get("wantedStatus"));
+
+		return new ExecutionChecker(checkerId, execId, jobName, wantedStatus);
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	public ConditionChecker fromJson(Object obj) throws Exception {
+		return createFromJson((HashMap<String, Object>) obj);
+	}
+
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("type", type);
+		jsonObj.put("execId", String.valueOf(execId));
+		if(jobName != null) {
+			jsonObj.put("jobName", jobName);
+		}
+		jsonObj.put("wantedStatus", wantedStatus.toString());
+		jsonObj.put("checkerId", checkerId);
+		return jsonObj;
+	}
+
+	@Override
+	public void stopChecker() {
+	}
+
+	@Override
+	public void setContext(Map<String, Object> context) {
+	}
+
+	@Override
+	public long getNextCheckTime() {
+		return -1;
+	}
+
+}
diff --git a/src/java/azkaban/trigger/builtin/KillExecutionAction.java b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
index 0dabe73..74633e9 100644
--- a/src/java/azkaban/trigger/builtin/KillExecutionAction.java
+++ b/src/java/azkaban/trigger/builtin/KillExecutionAction.java
@@ -6,7 +6,6 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.trigger.TriggerAction;
 
diff --git a/src/java/azkaban/trigger/builtin/SendEmailAction.java b/src/java/azkaban/trigger/builtin/SendEmailAction.java
index ec946c2..c1b6efc 100644
--- a/src/java/azkaban/trigger/builtin/SendEmailAction.java
+++ b/src/java/azkaban/trigger/builtin/SendEmailAction.java
@@ -1,25 +1,17 @@
 package azkaban.trigger.builtin;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.log4j.Logger;
-
-import azkaban.sla.SlaOption;
-import azkaban.trigger.Trigger;
 import azkaban.trigger.TriggerAction;
 import azkaban.utils.AbstractMailer;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
 
 public class SendEmailAction implements TriggerAction {
-
-	private static final Logger logger = Logger.getLogger(SendEmailAction.class);
 	
 	private String actionId;
-	private Map<String, Object> context;
 	private static AbstractMailer mailer;
 	private String message;
 	public static final String type = "SendEmailAction";
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index 11c6bd5..82d69c5 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -1,29 +1,16 @@
 package azkaban.trigger.builtin;
 
-import java.io.File;
-import java.lang.reflect.Constructor;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
 
+import azkaban.alert.Alerter;
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutorMailer;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManager.Alerter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.sla.SlaOption;
 import azkaban.trigger.TriggerAction;
-import azkaban.utils.FileIOUtils;
-import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
 
 public class SlaAlertAction implements TriggerAction{
 
@@ -35,8 +22,7 @@ public class SlaAlertAction implements TriggerAction{
 	private SlaOption slaOption;
 	private int execId;
 //	private List<Map<String, Object>> alerts;
-	private static Map<String, Alerter> alerters;
-	private Map<String, Object> context;
+	private static Map<String, azkaban.alert.Alerter> alerters;
 	private static ExecutorManagerAdapter executorManager;
 
 	public SlaAlertAction(String id, SlaOption slaOption, int execId) {
@@ -108,7 +94,8 @@ public class SlaAlertAction implements TriggerAction{
 				Alerter alerter = alerters.get(alertType);
 				if(alerter != null) {
 					try {
-						alerter.alertOnSla(slaOption, createSlaMessage());
+						ExecutableFlow flow = executorManager.getExecutableFlow(execId);
+						alerter.alertOnSla(slaOption, SlaOption.createSlaMessage(slaOption, flow));
 					} catch (Exception e) {
 						// TODO Auto-generated catch block
 						e.printStackTrace();
@@ -122,45 +109,44 @@ public class SlaAlertAction implements TriggerAction{
 //		}
 	}
 
-	private String createSlaMessage() {
-		ExecutableFlow flow = null;
-		try {
-			flow = executorManager.getExecutableFlow(execId);
-		} catch (ExecutorManagerException e) {
-			e.printStackTrace();
-			logger.error("Failed to get executable flow.");
-		}
-		String type = slaOption.getType();
-		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
-			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
-			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
-			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
-			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>"; 
-			String actual = "Actual flow status is " + flow.getStatus();
-			return basicinfo + expected + actual;
-		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
-			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
-			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
-			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
-			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>"; 
-			String actual = "Actual flow status is " + flow.getStatus();
-			return basicinfo + expected + actual;
-		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
-			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
-			return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
-		} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
-			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
-			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
-			return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
-		} else {
-			return "Unrecognized SLA type " + type;
-		}
-	}
+//	private String createSlaMessage() {
+//		ExecutableFlow flow = null;
+//		try {
+//			flow = executorManager.getExecutableFlow(execId);
+//		} catch (ExecutorManagerException e) {
+//			e.printStackTrace();
+//			logger.error("Failed to get executable flow.");
+//		}
+//		String type = slaOption.getType();
+//		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+//			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+//			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+//			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to FINISH within " + duration + "</br>";
+//			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " is expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>"; 
+//			String actual = "Actual flow status is " + flow.getStatus();
+//			return basicinfo + expected + actual;
+//		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+//			String flowName = (String) slaOption.getInfo().get(SlaOption.INFO_FLOW_NAME);
+//			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+//			String basicinfo =  "SLA Alert: Your flow " + flowName + " failed to SUCCEED within " + duration + "</br>";
+//			String expected = "Here is details : </br>" + "Flow " + flowName + " in execution " + execId + " expected to FINISH within " + duration + " from " + flow.getStartTime() + "</br>"; 
+//			String actual = "Actual flow status is " + flow.getStatus();
+//			return basicinfo + expected + actual;
+//		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+//			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+//			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+//			return "SLA Alert: Your job " + jobName + " failed to FINISH within " + duration + " in execution " + execId;
+//		} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+//			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME);
+//			String duration = (String) slaOption.getInfo().get(SlaOption.INFO_DURATION);
+//			return "SLA Alert: Your job " + jobName + " failed to SUCCEED within " + duration + " in execution " + execId;
+//		} else {
+//			return "Unrecognized SLA type " + type;
+//		}
+//	}
 
 	@Override
 	public void setContext(Map<String, Object> context) {
-		this.context = context;
 	}
 
 	@Override
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index 25ca9b3..0219eab 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -24,38 +24,26 @@ public class SlaChecker implements ConditionChecker{
 	private String id;
 	private SlaOption slaOption;
 	private int execId;
-	private Map<String, Object> context;
-	private boolean passChecker = true;
 	private long checkTime = -1;
 	
 	private static ExecutorManagerAdapter executorManager;
 	
-	public SlaChecker(String id, SlaOption slaOption, int execId, boolean passChecker) {
+	public SlaChecker(String id, SlaOption slaOption, int execId) {
 		this.id = id;
 		this.slaOption = slaOption;
 		this.execId = execId;
-		this.passChecker = passChecker;
-	}
-	
-	public SlaChecker(String id, SlaOption sla, String executionActionId, boolean passChecker) {
-		Map<String, Object> executeActionProps = (Map<String, Object>) context.get(executionActionId);
-		int execId = Integer.valueOf((String) executeActionProps.get(ExecuteFlowAction.EXEC_ID));
-		this.id = id;
-		this.slaOption = sla;
-		this.execId = execId;
-		this.passChecker = passChecker;
 	}
 
 	public static void setExecutorManager(ExecutorManagerAdapter em) {
 		executorManager = em;
 	}
 	
-	private Boolean metSla(ExecutableFlow flow) {
+	private Boolean violateSla(ExecutableFlow flow) {
 		String type = slaOption.getType();
 		logger.info("Checking for " + flow.getExecutionId() + " with sla " + type);
 		logger.info("flow is " + flow.getStatus());
 		if(flow.getStartTime() < 0) {
-			return null;
+			return Boolean.FALSE;
 		}
 		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
 			ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
@@ -65,9 +53,9 @@ public class SlaChecker implements ConditionChecker{
 			if(checkTime.isBeforeNow()) {
 				Status status = flow.getStatus();
 				if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
-					return Boolean.TRUE;
-				} else {
 					return Boolean.FALSE;
+				} else {
+					return Boolean.TRUE;
 				}
 			}
 		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
@@ -78,9 +66,9 @@ public class SlaChecker implements ConditionChecker{
 			if(checkTime.isBeforeNow()) {
 				Status status = flow.getStatus();
 				if(status.equals(Status.SUCCEEDED)) {
-					return Boolean.TRUE;
-				} else {
 					return Boolean.FALSE;
+				} else {
+					return Boolean.TRUE;
 				}
 			}
 		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
@@ -94,9 +82,9 @@ public class SlaChecker implements ConditionChecker{
 				if(checkTime.isBeforeNow()) {
 					Status status = node.getStatus();
 					if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
-						return Boolean.TRUE;
-					} else {
 						return Boolean.FALSE;
+					} else {
+						return Boolean.TRUE;
 					}
 				}
 			}
@@ -111,9 +99,9 @@ public class SlaChecker implements ConditionChecker{
 				if(checkTime.isBeforeNow()) {
 					Status status = node.getStatus();
 					if(status.equals(Status.SUCCEEDED)) {
-						return Boolean.TRUE;
-					} else {
 						return Boolean.FALSE;
+					} else {
+						return Boolean.TRUE;
 					}
 				}
 			}
@@ -137,10 +125,10 @@ public class SlaChecker implements ConditionChecker{
 //				return Boolean.FALSE;
 //			}
 //		}
-		return null;
+		return Boolean.FALSE;
 	}
 	
-	// return true for should do sla actions
+	// return true to trigger sla action
 	@Override
 	public Object eval() {
 		ExecutableFlow flow;
@@ -149,30 +137,19 @@ public class SlaChecker implements ConditionChecker{
 		} catch (ExecutorManagerException e) {
 			logger.error("Can't get executable flow.", e);
 			e.printStackTrace();
+			// something wrong, send out alerts
 			return Boolean.TRUE;
 		}
-		Boolean metSla = metSla(flow);
-		if(metSla == null) {
-			return Boolean.FALSE;
-		} else {
-			if(passChecker) {
-				return metSla;
-			} else {
-				return !metSla;
-			}
-		}
+		return violateSla(flow);
 	}
 
 	@Override
 	public Object getNum() {
-		// TODO Auto-generated method stub
 		return null;
 	}
 
 	@Override
 	public void reset() {
-		// TODO Auto-generated method stub
-		
 	}
 
 	@Override
@@ -204,8 +181,7 @@ public class SlaChecker implements ConditionChecker{
 		String id = (String) jsonObj.get("id");
 		SlaOption slaOption = SlaOption.fromObject(jsonObj.get("slaOption"));
 		int execId = Integer.valueOf((String) jsonObj.get("execId"));
-		boolean passChecker = Boolean.valueOf((Boolean) jsonObj.get("passChecker"));
-		return new SlaChecker(id, slaOption, execId, passChecker);
+		return new SlaChecker(id, slaOption, execId);
 	}
 	
 	@Override
@@ -215,7 +191,6 @@ public class SlaChecker implements ConditionChecker{
 		jsonObj.put("id", id);
 		jsonObj.put("slaOption", slaOption.toObject());
 		jsonObj.put("execId", String.valueOf(execId));
-		jsonObj.put("passChecker", passChecker);
 	
 		return jsonObj;
 	}
@@ -227,7 +202,6 @@ public class SlaChecker implements ConditionChecker{
 
 	@Override
 	public void setContext(Map<String, Object> context) {
-		this.context = context;
 	}
 
 	@Override
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index 45fc8ff..bc5c06a 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -31,10 +31,10 @@ public class CheckerTypeLoader {
 		
 		
 		// load built-in checkers
-		
-		loadBuiltinCheckers();
-		
-		loadPluginCheckers(props);
+//		
+//		loadBuiltinCheckers();
+//		
+//		loadPluginCheckers(props);
 
 	}
 	
@@ -45,96 +45,96 @@ public class CheckerTypeLoader {
 		}
 	}
 	
-	private void loadPluginCheckers(Props props) throws TriggerException {
-		
-		String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
-		File pluginDir = new File(checkerDir);
-		if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
-			logger.info("No conditon checker plugins to load.");
-			return;
-		}
-		
-		logger.info("Loading plugin condition checkers from " + pluginDir);
-		ClassLoader parentCl = this.getClass().getClassLoader();
-		
-		Props globalCheckerConf = null;
-		File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
-		try {
-			if(confFile != null) {
-				globalCheckerConf = new Props(null, confFile);
-			} else {
-				globalCheckerConf = new Props();
-			}
-		} catch (IOException e) {
-			throw new TriggerException("Failed to get global properties." + e);
-		}
-		
-		for(File dir : pluginDir.listFiles()) {
-			if(dir.isDirectory() && dir.canRead()) {
-				try {
-					loadPluginTypes(globalCheckerConf, pluginDir, parentCl);
-				} catch (Exception e) {
-					logger.info("Plugin checkers failed to load. " + e.getCause());
-					throw new TriggerException("Failed to load all condition checkers!", e);
-				}
-			}
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
-		Props checkerConf = null;
-		File confFile = Utils.findFilefromDir(dir, CHECKERTYPECONFFILE);
-		if(confFile == null) {
-			logger.info("No checker type found in " + dir.getAbsolutePath());
-			return;
-		}
-		try {
-			checkerConf = new Props(globalConf, confFile);
-		} catch (IOException e) {
-			throw new TriggerException("Failed to load config for the checker type", e);
-		}
-		
-		String checkerName = dir.getName();
-		String checkerClass = checkerConf.getString("checker.class");
-		
-		List<URL> resources = new ArrayList<URL>();		
-		for(File f : dir.listFiles()) {
-			try {
-				if(f.getName().endsWith(".jar")) {
-					resources.add(f.toURI().toURL());
-					logger.info("adding to classpath " + f.toURI().toURL());
-				}
-			} catch (MalformedURLException e) {
-				// TODO Auto-generated catch block
-				throw new TriggerException(e);
-			}
-		}
-		
-		// each job type can have a different class loader
-		ClassLoader checkerCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
-		
-		Class<? extends ConditionChecker> clazz = null;
-		try {
-			clazz = (Class<? extends ConditionChecker>)checkerCl.loadClass(checkerClass);
-			checkerToClass.put(checkerName, clazz);
-		}
-		catch (ClassNotFoundException e) {
-			throw new TriggerException(e);
-		}
-		
-		if(checkerConf.getBoolean("need.init")) {
-			try {
-				Utils.invokeStaticMethod(checkerCl, checkerClass, "init", checkerConf);
-			} catch (Exception e) {
-				e.printStackTrace();
-				logger.error("Failed to init the checker type " + checkerName);
-				throw new TriggerException(e);
-			}
-		}
-		
-		logger.info("Loaded checker type " + checkerName + " " + checkerClass);
-	}
+//	private void loadPluginCheckers(Props props) throws TriggerException {
+//		
+//		String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
+//		File pluginDir = new File(checkerDir);
+//		if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+//			logger.info("No conditon checker plugins to load.");
+//			return;
+//		}
+//		
+//		logger.info("Loading plugin condition checkers from " + pluginDir);
+//		ClassLoader parentCl = this.getClass().getClassLoader();
+//		
+//		Props globalCheckerConf = null;
+//		File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+//		try {
+//			if(confFile != null) {
+//				globalCheckerConf = new Props(null, confFile);
+//			} else {
+//				globalCheckerConf = new Props();
+//			}
+//		} catch (IOException e) {
+//			throw new TriggerException("Failed to get global properties." + e);
+//		}
+//		
+//		for(File dir : pluginDir.listFiles()) {
+//			if(dir.isDirectory() && dir.canRead()) {
+//				try {
+//					loadPluginTypes(globalCheckerConf, pluginDir, parentCl);
+//				} catch (Exception e) {
+//					logger.info("Plugin checkers failed to load. " + e.getCause());
+//					throw new TriggerException("Failed to load all condition checkers!", e);
+//				}
+//			}
+//		}
+//	}
+//	
+//	@SuppressWarnings("unchecked")
+//	private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+//		Props checkerConf = null;
+//		File confFile = Utils.findFilefromDir(dir, CHECKERTYPECONFFILE);
+//		if(confFile == null) {
+//			logger.info("No checker type found in " + dir.getAbsolutePath());
+//			return;
+//		}
+//		try {
+//			checkerConf = new Props(globalConf, confFile);
+//		} catch (IOException e) {
+//			throw new TriggerException("Failed to load config for the checker type", e);
+//		}
+//		
+//		String checkerName = dir.getName();
+//		String checkerClass = checkerConf.getString("checker.class");
+//		
+//		List<URL> resources = new ArrayList<URL>();		
+//		for(File f : dir.listFiles()) {
+//			try {
+//				if(f.getName().endsWith(".jar")) {
+//					resources.add(f.toURI().toURL());
+//					logger.info("adding to classpath " + f.toURI().toURL());
+//				}
+//			} catch (MalformedURLException e) {
+//				// TODO Auto-generated catch block
+//				throw new TriggerException(e);
+//			}
+//		}
+//		
+//		// each job type can have a different class loader
+//		ClassLoader checkerCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+//		
+//		Class<? extends ConditionChecker> clazz = null;
+//		try {
+//			clazz = (Class<? extends ConditionChecker>)checkerCl.loadClass(checkerClass);
+//			checkerToClass.put(checkerName, clazz);
+//		}
+//		catch (ClassNotFoundException e) {
+//			throw new TriggerException(e);
+//		}
+//		
+//		if(checkerConf.getBoolean("need.init")) {
+//			try {
+//				Utils.invokeStaticMethod(checkerCl, checkerClass, "init", checkerConf);
+//			} catch (Exception e) {
+//				e.printStackTrace();
+//				logger.error("Failed to init the checker type " + checkerName);
+//				throw new TriggerException(e);
+//			}
+//		}
+//		
+//		logger.info("Loaded checker type " + checkerName + " " + checkerClass);
+//	}
 	
 	public static void registerBuiltinCheckers(Map<String, Class<? extends ConditionChecker>> builtinCheckers) {
 		checkerToClass.putAll(checkerToClass);
@@ -143,10 +143,10 @@ public class CheckerTypeLoader {
 		}
 	}
 	
-	private void loadBuiltinCheckers() {
-		checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
-		logger.info("Loaded BasicTimeChecker type.");
-	}
+//	private void loadBuiltinCheckers() {
+//		checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
+//		logger.info("Loaded BasicTimeChecker type.");
+//	}
 	
 	public ConditionChecker createCheckerFromJson(String type, Object obj) throws Exception {
 		ConditionChecker checker = null;
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index d032527..2f355d9 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -21,7 +21,7 @@ public class Condition {
 	private Expression expression;
 	private Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
 	private MapContext context = new MapContext();
-	private long nextCheckTime = -1;	
+	private Long nextCheckTime = -1L;	
 	
 	public Condition(Map<String, ConditionChecker> checkers, String expr) {
 		setCheckers(checkers);
@@ -32,6 +32,9 @@ public class Condition {
 	public Condition(Map<String, ConditionChecker> checkers, String expr, long nextCheckTime) {
 		this.nextCheckTime = nextCheckTime;
 		setCheckers(checkers);
+//		for(ConditionChecker ck : checkers.values()) {
+//			ck.setCondition(this);
+//		}
 		this.expression = jexl.createExpression(expr);
 	}
 	
@@ -43,18 +46,17 @@ public class Condition {
 		Condition.checkerLoader = loader;
 	}
 	
-	public static CheckerTypeLoader getCheckerLoader() {
+	protected static CheckerTypeLoader getCheckerLoader() {
 		return checkerLoader;
 	}
 	
-	public void registerChecker(ConditionChecker checker) {
+	protected void registerChecker(ConditionChecker checker) {
 		checkers.put(checker.getId(), checker);
 		context.set(checker.getId(), checker);
 		updateNextCheckTime();
 	}
 	
 	public long getNextCheckTime() {
-		updateNextCheckTime();
 		return nextCheckTime;
 	}
 	
@@ -66,10 +68,17 @@ public class Condition {
 		this.checkers = checkers;
 		for(ConditionChecker checker : checkers.values()) {
 			this.context.set(checker.getId(), checker);
+//			checker.setCondition(this);
 		}
 		updateNextCheckTime();
 	}
 	
+	public void updateCheckTime(Long ct) {
+		if(nextCheckTime < ct) {
+			nextCheckTime = ct;
+		}
+	}
+	
 	private void updateNextCheckTime() {
 		long time = Long.MAX_VALUE;
 		for(ConditionChecker checker : checkers.values()) {
@@ -83,7 +92,7 @@ public class Condition {
 			checker.reset();
 		}
 		updateNextCheckTime();
-		logger.error("Done resetting checkers. The next check time will be " + new DateTime(nextCheckTime));
+		logger.info("Done resetting checkers. The next check time will be " + new DateTime(nextCheckTime));
 	}
 	
 	public String getExpression() {
@@ -95,6 +104,7 @@ public class Condition {
 	}
 	
 	public boolean isMet() {
+		logger.info("Testing ondition " + expression);
 		return expression.evaluate(context).equals(Boolean.TRUE);
 	}
 	
@@ -134,7 +144,7 @@ public class Condition {
 				checkers.put(ck.getId(), ck);
 			}
 			String expr = (String) jsonObj.get("expression");
-			long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
+			Long nextCheckTime = Long.valueOf((String) jsonObj.get("nextCheckTime"));
 				
 			cond = new Condition(checkers, expr, nextCheckTime);
 			
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 342312b..c78267e 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -24,4 +24,8 @@ public interface ConditionChecker {
 	void setContext(Map<String, Object> context);
 	
 	long getNextCheckTime();
+	
+//	void setCondition(Condition c);
+//	
+//	String getDescription();
 }
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index e964ecf..d4f1698 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -8,9 +8,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
 
 import azkaban.utils.Props;
 
@@ -18,7 +18,7 @@ public class TriggerManager implements TriggerManagerAdapter{
 	private static Logger logger = Logger.getLogger(TriggerManager.class);
 	public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
 
-	private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+	private static Map<Integer, Trigger> triggerIdMap = new ConcurrentHashMap<Integer, Trigger>();
 	
 	private CheckerTypeLoader checkerTypeLoader;
 	private ActionTypeLoader actionTypeLoader;
@@ -47,8 +47,11 @@ public class TriggerManager implements TriggerManagerAdapter{
 		} catch (Exception e) {
 			throw new TriggerManagerException(e);
 		}
+		
 		Condition.setCheckerLoader(checkerTypeLoader);
 		Trigger.setActionTypeLoader(actionTypeLoader);
+		
+		logger.info("TriggerManager loaded.");
 	}
 
 	@Override
@@ -192,7 +195,6 @@ public class TriggerManager implements TriggerManagerAdapter{
 					} catch(InterruptedException e) {
 						logger.info("Interrupted. Probably to shut down.");
 					}
-					
 				}
 			}
 		}
@@ -247,7 +249,6 @@ public class TriggerManager implements TriggerManagerAdapter{
 			catch (TriggerLoaderException e) {
 				throw new TriggerManagerException(e);
 			}
-//			updateAgent(t);
 		}
 		
 		private void onTriggerExpire(Trigger t) throws TriggerManagerException {
@@ -304,6 +305,7 @@ public class TriggerManager implements TriggerManagerAdapter{
 //		updateAgent(t);
 	}
 
+	@Override
 	public List<Trigger> getTriggers(String triggerSource) {
 		List<Trigger> triggers = new ArrayList<Trigger>();
 		for(Trigger t : triggerIdMap.values()) {
@@ -326,29 +328,29 @@ public class TriggerManager implements TriggerManagerAdapter{
 	}
 	
 	@Override
-	public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
-		List<Integer> triggers = new ArrayList<Integer>();
+	public List<Trigger> getAllTriggerUpdates(long lastUpdateTime) throws TriggerManagerException {
+		List<Trigger> triggers = new ArrayList<Trigger>();
 		for(Trigger t : triggerIdMap.values()) {
 			if(t.getLastModifyTime() > lastUpdateTime) {
-				triggers.add(t.getTriggerId());
+				triggers.add(t);
 			}
 		}
 		return triggers;
 	}
 
-	public void loadTrigger(int triggerId) throws TriggerManagerException {
-		Trigger t;
-		try {
-			t = triggerLoader.loadTrigger(triggerId);
-		} catch (TriggerLoaderException e) {
-			throw new TriggerManagerException(e);
-		}
-		if(t.getStatus().equals(TriggerStatus.PREPARING)) {
-			triggerIdMap.put(t.getTriggerId(), t);
-			runnerThread.addTrigger(t);
-			t.setStatus(TriggerStatus.READY);
-		}
-	}
+//	public void loadTrigger(int triggerId) throws TriggerManagerException {
+//		Trigger t;
+//		try {
+//			t = triggerLoader.loadTrigger(triggerId);
+//		} catch (TriggerLoaderException e) {
+//			throw new TriggerManagerException(e);
+//		}
+//		if(t.getStatus().equals(TriggerStatus.PREPARING)) {
+//			triggerIdMap.put(t.getTriggerId(), t);
+//			runnerThread.addTrigger(t);
+//			t.setStatus(TriggerStatus.READY);
+//		}
+//	}
 
 	@Override
 	public void insertTrigger(Trigger t, String user) throws TriggerManagerException {
@@ -361,27 +363,22 @@ public class TriggerManager implements TriggerManagerAdapter{
 	}
 
 	@Override
-	public void updateTrigger(int triggerId, String user) throws TriggerManagerException {
-		updateTrigger(triggerId);
-	}
-
-	@Override
 	public void updateTrigger(Trigger t, String user) throws TriggerManagerException {
 		updateTrigger(t);
 	}
 	
-	@Override
-	public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
-		Trigger t;
-		try {
-			t = triggerLoader.loadTrigger(triggerId);
-		} catch (TriggerLoaderException e) {
-			throw new TriggerManagerException(e);
-		}
-		if(t != null) {
-			insertTrigger(t);
-		}
-	}
+//	@Override
+//	public void insertTrigger(int triggerId, String user) throws TriggerManagerException {
+//		Trigger t;
+//		try {
+//			t = triggerLoader.loadTrigger(triggerId);
+//		} catch (TriggerLoaderException e) {
+//			throw new TriggerManagerException(e);
+//		}
+//		if(t != null) {
+//			insertTrigger(t);
+//		}
+//	}
 	
 	@Override
 	public void shutdown() {
diff --git a/src/java/azkaban/trigger/TriggerManagerAdapter.java b/src/java/azkaban/trigger/TriggerManagerAdapter.java
index 0934985..cdf2921 100644
--- a/src/java/azkaban/trigger/TriggerManagerAdapter.java
+++ b/src/java/azkaban/trigger/TriggerManagerAdapter.java
@@ -1,27 +1,22 @@
 package azkaban.trigger;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.joda.time.DateTimeZone;
-
-import azkaban.triggerapp.TriggerRunnerManagerException;
 
 public interface TriggerManagerAdapter {
+	
 	public void insertTrigger(Trigger t, String user) throws TriggerManagerException;
 	
 	public void removeTrigger(int id, String user) throws TriggerManagerException;
 	
-	public void updateTrigger(int triggerId, String user) throws TriggerManagerException;
-	
-	void updateTrigger(Trigger t, String user) throws TriggerManagerException;
+	public void updateTrigger(Trigger t, String user) throws TriggerManagerException;
 
-	public void insertTrigger(int triggerId, String user) throws TriggerManagerException;
-
-	public List<Integer> getTriggerUpdates(long lastUpdateTime) throws TriggerManagerException;
+	public List<Trigger> getAllTriggerUpdates(long lastUpdateTime) throws TriggerManagerException;
 	
 	public List<Trigger> getTriggerUpdates(String triggerSource, long lastUpdateTime) throws TriggerManagerException;
+	
+	public List<Trigger> getTriggers(String trigegerSource);
 
 	public void start() throws TriggerManagerException;
 	
diff --git a/src/java/azkaban/trigger/TriggerStatus.java b/src/java/azkaban/trigger/TriggerStatus.java
index 8d397bc..3fcadf7 100644
--- a/src/java/azkaban/trigger/TriggerStatus.java
+++ b/src/java/azkaban/trigger/TriggerStatus.java
@@ -1,7 +1,7 @@
 package azkaban.trigger;
 
 public enum TriggerStatus {
-	READY(10), PAUSED(20), EXPIRED(30), PREPARING(40);
+	READY(10), PAUSED(20), EXPIRED(30);
 	
 	private int numVal;
 
@@ -21,8 +21,6 @@ public enum TriggerStatus {
 			return PAUSED;
 		case 30:
 			return EXPIRED;
-		case 40:
-			return PREPARING;
 		default:
 			return READY;
 		}
diff --git a/src/java/azkaban/utils/PropsUtils.java b/src/java/azkaban/utils/PropsUtils.java
index 16afb52..507d433 100644
--- a/src/java/azkaban/utils/PropsUtils.java
+++ b/src/java/azkaban/utils/PropsUtils.java
@@ -24,7 +24,6 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.StringTokenizer;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
diff --git a/src/java/azkaban/utils/StringUtils.java b/src/java/azkaban/utils/StringUtils.java
index 6684b00..2693365 100644
--- a/src/java/azkaban/utils/StringUtils.java
+++ b/src/java/azkaban/utils/StringUtils.java
@@ -38,6 +38,17 @@ public class StringUtils {
 		return buf.toString();
 	}
 	
+	@Deprecated
+	public static String join(List<String> list, String delimiter) {
+		StringBuffer buffer = new StringBuffer();
+		for (String str: list) {
+			buffer.append(str);
+			buffer.append(delimiter);
+		}
+		
+		return buffer.toString();
+	}
+	
 	/**
 	 * Use this when you don't want to include Apache Common's string for
 	 * plugins.
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 2af906b..83ffba4 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -51,13 +51,11 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.thread.QueuedThreadPool;
 
+import azkaban.alert.Alerter;
 import azkaban.database.AzkabanDatabaseSetup;
-import azkaban.executor.ExecutorMailer;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.executor.ExecutorManagerRemoteAdapter;
 import azkaban.executor.JdbcExecutorLoader;
-import azkaban.executor.ExecutorManager.Alerter;
 import azkaban.jmx.JmxExecutorManagerAdapter;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
@@ -74,11 +72,13 @@ import azkaban.trigger.TriggerManagerException;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.CreateTriggerAction;
 import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.trigger.builtin.ExecutionChecker;
 import azkaban.trigger.builtin.KillExecutionAction;
 import azkaban.trigger.builtin.SlaAlertAction;
 import azkaban.trigger.builtin.SlaChecker;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
+import azkaban.utils.Emailer;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -144,8 +144,8 @@ public class AzkabanWebServer extends AzkabanServer {
 	private ProjectManager projectManager;
 	private ExecutorManagerAdapter executorManager;
 	private ScheduleManager scheduleManager;
-//	private TriggerBasedScheduler scheduler;
 	private TriggerManager triggerManager;
+	private Map<String, Alerter> alerters;
 	
 	private final ClassLoader baseClassLoader;
 	
@@ -176,18 +176,23 @@ public class AzkabanWebServer extends AzkabanServer {
 		sessionCache = new SessionCache(props);
 		userManager = loadUserManager(props);
 		
-		triggerManager = loadTriggerManager(props);
+		alerters = loadAlerters(props);
+		
 		executorManager = loadExecutorManager(props);
-		projectManager = loadProjectManager(props, triggerManager);
+		projectManager = loadProjectManager(props);
+		
+		triggerManager = loadTriggerManager(props);
+		loadBuiltinCheckersAndActions();		
 		
 		// load all triggger agents here
-		scheduleManager = loadScheduleManager(executorManager, triggerManager, props);
+		scheduleManager = loadScheduleManager(triggerManager, props);
 		
-		loadBuiltinCheckersAndActions();
 		String triggerPluginDir = props.getString("trigger.plugin.dir", "plugins/triggers");
+		
 		loadPluginCheckersAndActions(triggerPluginDir);
 		
-		baseClassLoader = getBaseClassloader();
+		//baseClassLoader = getBaseClassloader();
+		baseClassLoader = this.getClassLoader();
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
 
@@ -196,7 +201,6 @@ public class AzkabanWebServer extends AzkabanServer {
 			String timezone = props.getString(DEFAULT_TIMEZONE_ID);
 			TimeZone.setDefault(TimeZone.getTimeZone(timezone));
 			DateTimeZone.setDefault(DateTimeZone.forID(timezone));
-
 			logger.info("Setting timezone to " + timezone);
 		}
 		
@@ -215,9 +219,7 @@ public class AzkabanWebServer extends AzkabanServer {
 		Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
 		logger.info("Loading user manager class " + userManagerClass.getName());
 		UserManager manager = null;
-
 		if (userManagerClass != null && userManagerClass.getConstructors().length > 0) {
-
 			try {
 				Constructor<?> userManagerConstructor = userManagerClass.getConstructor(Props.class);
 				manager = (UserManager) userManagerConstructor.newInstance(props);
@@ -230,13 +232,11 @@ public class AzkabanWebServer extends AzkabanServer {
 		else {
 			manager = new XmlUserManager(props);
 		}
-
 		return manager;
 	}
 	
-	private ProjectManager loadProjectManager(Props props, TriggerManager tm) {
+	private ProjectManager loadProjectManager(Props props) {
 		logger.info("Loading JDBC for project management");
-
 		JdbcProjectLoader loader = new JdbcProjectLoader(props);
 		ProjectManager manager = new ProjectManager(loader, props);
 		return manager;
@@ -244,53 +244,26 @@ public class AzkabanWebServer extends AzkabanServer {
 
 	private ExecutorManager loadExecutorManager(Props props) throws Exception {
 		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
-		ExecutorManager execManager = new ExecutorManager(props, loader);
+		ExecutorManager execManager = new ExecutorManager(props, loader, alerters);
 		return execManager;
 	}
 	
-	private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
-//		JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
-//		ExecutorManager execManager = new ExecutorManager(props, loader, true);
-//		return execManager;
-		String executorMode = props.getString("executor.manager.mode", "local");
-		ExecutorManagerAdapter adapter;
-		if(executorMode.equals("local")) {
-			adapter = loadExecutorManager(props);
-		} else if(executorMode.equals("remote")) {
-			JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
-			adapter = new ExecutorManagerRemoteAdapter(props, loader);
-		} else {
-			throw new Exception("Unknown ExecutorManager mode " + executorMode);
-		}
-		return adapter;
-	}
-
-//	private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
-//		ScheduleManager schedManager = null;
-//		String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
-//		if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
-//			ScheduleLoader loader = new JdbcScheduleLoader(props);
-//			schedManager = new ScheduleManager(executorManager, loader, false);
-//			schedManager.setProjectManager(projectManager);
-//			schedManager.start();
-//		} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
-//			logger.info("Loading trigger based scheduler");
-//			ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
-//			schedManager = new ScheduleManager(executorManager, loader, true);
+//	private ExecutorManagerAdapter loadExecutorManagerAdapter(Props props) throws Exception {
+//		String executorMode = props.getString("executor.manager.mode", "local");
+//		ExecutorManagerAdapter adapter;
+//		if(executorMode.equals("local")) {
+//			adapter = loadExecutorManager(props);
+//		} else {
+//			throw new Exception("Unknown ExecutorManager mode " + executorMode);
 //		}
-//
-//		return schedManager;
+//		return adapter;
 //	}
-	
-	private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
+
+	private ScheduleManager loadScheduleManager(TriggerManager tm, Props props ) throws Exception {
 		logger.info("Loading trigger based scheduler");
-		ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
-		return new ScheduleManager(executorManager, loader);
+		ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, ScheduleManager.triggerSource);
+		return new ScheduleManager(loader);
 	}
-//	private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
-//		TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
-//		return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
-//	}
 
 	private TriggerManager loadTriggerManager(Props props) throws TriggerManagerException {
 		TriggerLoader loader = new JdbcTriggerLoader(props);
@@ -299,7 +272,6 @@ public class AzkabanWebServer extends AzkabanServer {
 	
 	private void loadBuiltinCheckersAndActions() {
 		logger.info("Loading built-in checker and action types");
-		
 		if(triggerManager instanceof TriggerManager) {
 			SlaChecker.setExecutorManager(executorManager);
 			ExecuteFlowAction.setExecutorManager(executorManager);
@@ -307,14 +279,15 @@ public class AzkabanWebServer extends AzkabanServer {
 			ExecuteFlowAction.setTriggerManager(triggerManager);
 			KillExecutionAction.setExecutorManager(executorManager);
 			SlaAlertAction.setExecutorManager(executorManager);
-			Map<String, Alerter> alerters = loadAlerters(props);
+			//Map<String, azkaban.alert.Alerter> alerters = loadAlerters(props);
 			SlaAlertAction.setAlerters(alerters);
 			SlaAlertAction.setExecutorManager(executorManager);
 			CreateTriggerAction.setTriggerManager(triggerManager);
+			ExecutionChecker.setExecutorManager(executorManager);
 		}
-
 		triggerManager.registerCheckerType(BasicTimeChecker.type, BasicTimeChecker.class);
 		triggerManager.registerCheckerType(SlaChecker.type, SlaChecker.class);
+		triggerManager.registerCheckerType(ExecutionChecker.type, ExecutionChecker.class);
 		triggerManager.registerActionType(ExecuteFlowAction.type, ExecuteFlowAction.class);
 		triggerManager.registerActionType(KillExecutionAction.type, KillExecutionAction.class);
 		triggerManager.registerActionType(SlaAlertAction.type, SlaAlertAction.class);
@@ -324,7 +297,7 @@ public class AzkabanWebServer extends AzkabanServer {
 	private Map<String, Alerter> loadAlerters(Props props) {
 		Map<String, Alerter> allAlerters = new HashMap<String, Alerter>();
 		// load built-in alerters
-		ExecutorMailer mailAlerter = new ExecutorMailer(props);
+		Emailer mailAlerter = new Emailer(props);
 		allAlerters.put("email", mailAlerter);
 		// load all plugin alerters
 		String pluginDir = props.getString("alerter.plugin.dir", "plugins/alerter");
@@ -662,28 +635,28 @@ public class AzkabanWebServer extends AzkabanServer {
 		return engine;
 	}
 
-	private ClassLoader getBaseClassloader() throws MalformedURLException {
-		final ClassLoader retVal;
-
-		String hadoopHome = System.getenv("HADOOP_HOME");
-		String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
-
-		if (hadoopConfDir != null) {
-			logger.info("Using hadoop config found in " + hadoopConfDir);
-			retVal = new URLClassLoader(new URL[] { new File(hadoopConfDir)
-					.toURI().toURL() }, getClass().getClassLoader());
-		} else if (hadoopHome != null) {
-			logger.info("Using hadoop config found in " + hadoopHome);
-			retVal = new URLClassLoader(
-					new URL[] { new File(hadoopHome, "conf").toURI().toURL() },
-					getClass().getClassLoader());
-		} else {
-			logger.info("HADOOP_HOME not set, using default hadoop config.");
-			retVal = getClass().getClassLoader();
-		}
-
-		return retVal;
-	}
+//	private ClassLoader getBaseClassloader() throws MalformedURLException {
+//		final ClassLoader retVal;
+//
+//		String hadoopHome = System.getenv("HADOOP_HOME");
+//		String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+//
+//		if (hadoopConfDir != null) {
+//			logger.info("Using hadoop config found in " + hadoopConfDir);
+//			retVal = new URLClassLoader(new URL[] { new File(hadoopConfDir)
+//					.toURI().toURL() }, getClass().getClassLoader());
+//		} else if (hadoopHome != null) {
+//			logger.info("Using hadoop config found in " + hadoopHome);
+//			retVal = new URLClassLoader(
+//					new URL[] { new File(hadoopHome, "conf").toURI().toURL() },
+//					getClass().getClassLoader());
+//		} else {
+//			logger.info("HADOOP_HOME not set, using default hadoop config.");
+//			retVal = getClass().getClassLoader();
+//		}
+//
+//		return retVal;
+//	}
 
 	public ClassLoader getClassLoader() {
 		return baseClassLoader;
@@ -883,9 +856,9 @@ public class AzkabanWebServer extends AzkabanServer {
 			}
 			
 			String pluginName = pluginProps.getString("trigger.name");
-			String pluginWebPath = pluginProps.getString("trigger.web.path");
-			int pluginOrder = pluginProps.getInt("trigger.order", 0);
-			boolean pluginHidden = pluginProps.getBoolean("trigger.hidden", false);
+//			String pluginWebPath = pluginProps.getString("trigger.web.path");
+//			int pluginOrder = pluginProps.getInt("trigger.order", 0);
+//			boolean pluginHidden = pluginProps.getBoolean("trigger.hidden", false);
 			List<String> extLibClasspath = pluginProps.getStringList("trigger.external.classpaths", (List<String>)null);
 			
 			String pluginClass = pluginProps.getString("trigger.class");
@@ -963,8 +936,6 @@ public class AzkabanWebServer extends AzkabanServer {
 			}
 			
 			TriggerPlugin plugin = (TriggerPlugin) obj;
-//			AbstractAzkabanServlet avServlet = (AbstractAzkabanServlet) plugin.getServlet();
-//			root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
 			installedTriggerPlugins.put(pluginName, plugin);
 		}
 		
@@ -974,14 +945,6 @@ public class AzkabanWebServer extends AzkabanServer {
 		VelocityEngine ve = azkabanWebApp.getVelocityEngine();
 		ve.addProperty("jar.resource.loader.path", jarResourcePath);
 		
-//		// Sort plugins based on order
-//		Collections.sort(installedTriggerPlugins, new Comparator<TriggerPlugin>() {
-//			@Override
-//			public int compare(TriggerPlugin o1, TriggerPlugin o2) {
-//				return o1.getOrder() - o2.getOrder();
-//			}
-//		});
-		
 		return installedTriggerPlugins;
 	}
 	
diff --git a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
index 601abda..c9e2603 100644
--- a/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
+++ b/src/java/azkaban/webapp/servlet/JMXHttpServlet.java
@@ -18,7 +18,6 @@ import org.apache.log4j.Logger;
 import azkaban.executor.ConnectorParams;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
-import azkaban.triggerapp.TriggerConnectorParams;
 import azkaban.trigger.TriggerManager;
 import azkaban.user.Permission;
 import azkaban.user.Role;
@@ -77,16 +76,17 @@ public class JMXHttpServlet extends LoginAbstractAzkabanServlet implements Conne
 				Map<String, Object> result = executorManager.callExecutorJMX(hostPort, JMX_GET_ALL_MBEAN_ATTRIBUTES, mbean);
 				ret = result;
 			}
-			else if (TriggerConnectorParams.JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES.equals(ajax)) {
-				if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
-					ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
-					this.writeJSON(resp, ret, true);
-					return;
-				}
-//				String hostPort = getParam(req, JMX_HOSTPORT);
-//				String mbean = getParam(req, JMX_MBEAN);
-				ret = triggerManager.getJMX().getAllJMXMbeans();
-			}
+//			else 
+//				if (TriggerConnectorParams.JMX_GET_ALL_TRIGGER_SERVER_ATTRIBUTES.equals(ajax)) {
+//				if(!hasParam(req, JMX_MBEAN) || !hasParam(req, JMX_HOSTPORT)) {
+//					ret.put("error", "Parameters '" + JMX_MBEAN + "' and '"+ JMX_HOSTPORT +"' must be set");
+//					this.writeJSON(resp, ret, true);
+//					return;
+//				}
+////				String hostPort = getParam(req, JMX_HOSTPORT);
+////				String mbean = getParam(req, JMX_MBEAN);
+//				ret = triggerManager.getJMX().getAllJMXMbeans();
+//			}
 			else if (JMX_GET_MBEANS.equals(ajax)) {
 				ret.put("mbeans", server.getMbeanNames());
 			}
diff --git a/src/java/azkaban/webapp/session/SessionCache.java b/src/java/azkaban/webapp/session/SessionCache.java
index 514606a..3628be1 100644
--- a/src/java/azkaban/webapp/session/SessionCache.java
+++ b/src/java/azkaban/webapp/session/SessionCache.java
@@ -20,7 +20,6 @@ import azkaban.utils.Props;
 import azkaban.utils.cache.Cache;
 import azkaban.utils.cache.CacheManager;
 import azkaban.utils.cache.Cache.EjectionPolicy;
-import azkaban.utils.cache.Element;
 
 
 /**
diff --git a/src/package/execserver/bin/azkaban-executor-start.sh b/src/package/execserver/bin/azkaban-executor-start.sh
index 912eaa0..80261d7 100755
--- a/src/package/execserver/bin/azkaban-executor-start.sh
+++ b/src/package/execserver/bin/azkaban-executor-start.sh
@@ -19,6 +19,19 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+if [ "HADOOP_HOME" != "" ]; then
+	for file in $HADOOP_HOME/hadoop-core*.jar	do
+		CLASSPATH=$CLASSPATH:$file
+	done
+	CLASSPATH=$CLASSPATH:$HADOOP_HOME/conf
+else
+	echo "Error: HADOOP_HOME is not set. Hadoop job types will not run properly."
+fi
+
+if [ "HIVE_HOME" != "" ]; then
+    CLASSPATH=$CLASSPATH:$HIVE_HOME/conf
+fi
+
 echo $azkaban_dir;
 echo $CLASSPATH;