azkaban-uncached

check in generic triggers

6/10/2013 3:01:14 PM

Details

diff --git a/lib/commons-jexl-2.1.1.jar b/lib/commons-jexl-2.1.1.jar
new file mode 100644
index 0000000..ab288a8
Binary files /dev/null and b/lib/commons-jexl-2.1.1.jar differ
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
new file mode 100644
index 0000000..87773ed
--- /dev/null
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -0,0 +1,148 @@
+package azkaban.trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class ActionTypeLoader {
+	
+	private static Logger logger = Logger.getLogger(ActionTypeLoader.class);
+	
+	public static final String DEFAULT_TRIGGER_ACTION_PLUGIN_DIR = "plugins/triggeractions";
+	private static final String ACTIONTYPECONFFILE = "plugin.properties"; // need jars.to.include property, will be loaded with user property
+	private static final String COMMONCONFFILE = "common.properties";	// common properties for multiple plugins
+
+	protected static Map<String, Class<? extends TriggerAction>> actionToClass = new HashMap<String, Class<? extends TriggerAction>>();
+	
+	public void init(Props props) throws TriggerException {
+		// load built-in actions
+		
+
+		loadDefaultActions();
+		
+		loadPluginActions(props);
+
+	}
+	
+	private void loadPluginActions(Props props) throws TriggerException {
+		String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
+		File pluginDir = new File(checkerDir);
+		if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+			logger.info("No trigger action plugins to load.");
+			return;
+		}
+		
+		logger.info("Loading plugin trigger actions from " + pluginDir);
+		ClassLoader parentCl = this.getClass().getClassLoader();
+		
+		Props globalActionConf = null;
+		File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+		try {
+			if(confFile != null) {
+				globalActionConf = new Props(null, confFile);
+			} else {
+				globalActionConf = new Props();
+			}
+		} catch (IOException e) {
+			throw new TriggerException("Failed to get global properties." + e);
+		}
+		
+		for(File dir : pluginDir.listFiles()) {
+			if(dir.isDirectory() && dir.canRead()) {
+				try {
+					loadPluginTypes(globalActionConf, pluginDir, parentCl);
+				} catch (Exception e) {
+					logger.info("Plugin actions failed to load. " + e.getCause());
+					throw new TriggerException("Failed to load all trigger actions!", e);
+				}
+			}
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+		Props actionConf = null;
+		File confFile = Utils.findFilefromDir(dir, ACTIONTYPECONFFILE);
+		if(confFile == null) {
+			logger.info("No action type found in " + dir.getAbsolutePath());
+			return;
+		}
+		try {
+			actionConf = new Props(globalConf, confFile);
+		} catch (IOException e) {
+			throw new TriggerException("Failed to load config for the action type", e);
+		}
+		
+		String actionName = dir.getName();
+		String actionClass = actionConf.getString("action.class");
+		
+		List<URL> resources = new ArrayList<URL>();		
+		for(File f : dir.listFiles()) {
+			try {
+				if(f.getName().endsWith(".jar")) {
+					resources.add(f.toURI().toURL());
+					logger.info("adding to classpath " + f.toURI().toURL());
+				}
+			} catch (MalformedURLException e) {
+				// TODO Auto-generated catch block
+				throw new TriggerException(e);
+			}
+		}
+		
+		// each job type can have a different class loader
+		ClassLoader actionCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+		
+		Class<? extends TriggerAction> clazz = null;
+		try {
+			clazz = (Class<? extends TriggerAction>)actionCl.loadClass(actionClass);
+			actionToClass.put(actionName, clazz);
+		}
+		catch (ClassNotFoundException e) {
+			throw new TriggerException(e);
+		}
+		
+		if(actionConf.getBoolean("need.init")) {
+			try {
+				Utils.invokeStaticMethod(actionCl, actionClass, "init", actionConf);
+			} catch (Exception e) {
+				e.printStackTrace();
+				logger.error("Failed to init the action type " + actionName);
+				throw new TriggerException(e);
+			}
+		}
+		
+		logger.info("Loaded action type " + actionName + " " + actionClass);
+	}
+	
+	private void loadDefaultActions() {
+		actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);		
+	}
+	
+	public TriggerAction createActionFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+		TriggerAction action = null;
+		Class<? extends TriggerAction> actionClass = actionToClass.get(type);		
+		action = (TriggerAction) Utils.invokeStaticMethod(actionClass.getClassLoader(), actionClass.getName(), "createFromJson", obj);
+		
+		return action;
+	}
+	
+	public TriggerAction createAction(String type, Object ... args) {
+		TriggerAction action = null;
+		Class<? extends TriggerAction> actionClass = actionToClass.get(type);		
+		action = (TriggerAction) Utils.callConstructor(actionClass, args);
+		
+		return action;
+	}
+}
diff --git a/src/java/azkaban/trigger/BasicTimeChecker.java b/src/java/azkaban/trigger/BasicTimeChecker.java
new file mode 100644
index 0000000..ab0b94f
--- /dev/null
+++ b/src/java/azkaban/trigger/BasicTimeChecker.java
@@ -0,0 +1,232 @@
+package azkaban.trigger;
+
+import java.util.TimeZone;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
+import org.joda.time.tz.DateTimeZoneBuilder;
+
+public class BasicTimeChecker implements ConditionChecker {
+
+	private DateTime firstCheckTime;
+	private DateTime nextCheckTime;
+	private boolean isRecurring = true;
+	private boolean skipPastChecks = true;
+	private ReadablePeriod period;
+	private String message;
+	
+	public static final String type = "BasicTimeChecker";
+	
+	public BasicTimeChecker(
+			DateTime firstCheckTime,
+			boolean isRecurring, 
+			boolean skipPastChecks,
+			ReadablePeriod period) {
+		this.firstCheckTime = firstCheckTime;
+		this.isRecurring = isRecurring;
+		this.skipPastChecks = skipPastChecks;
+		this.period = period;
+		this.nextCheckTime = new DateTime(firstCheckTime);
+		this.nextCheckTime = getNextCheckTime();
+	}
+	
+	public BasicTimeChecker(
+			DateTime firstCheckTime,
+			Boolean isRecurring, 
+			Boolean skipPastChecks,
+			String period) {
+		this.firstCheckTime = firstCheckTime;
+		this.isRecurring = isRecurring;
+		this.skipPastChecks = skipPastChecks;
+		this.period = parsePeriodString(period);
+		this.nextCheckTime = new DateTime(firstCheckTime);
+		this.nextCheckTime = getNextCheckTime();
+	}
+	
+	public BasicTimeChecker(
+			DateTime firstCheckTime,
+			DateTime nextCheckTime,
+			boolean isRecurring, 
+			boolean skipPastChecks,
+			ReadablePeriod period) {
+		this.firstCheckTime = firstCheckTime;
+		this.nextCheckTime = nextCheckTime;
+		this.isRecurring = isRecurring;
+		this.skipPastChecks = skipPastChecks;
+		this.period = period;
+	}
+	
+	@Override
+	public Boolean eval() {
+		return nextCheckTime.isBeforeNow();
+	}
+
+	@Override
+	public void reset() {
+		this.nextCheckTime = getNextCheckTime();
+		
+	}
+	
+	/*
+	 * TimeChecker format:
+	 * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
+	 */
+	@Override
+	public String getId() {
+		return getType() + "$" +
+				firstCheckTime.getMillis() + "$" +
+				nextCheckTime.getMillis() + "$" +
+				firstCheckTime.getZone().getID().replace('/', '0') + "$" +
+				//"offset"+firstCheckTime.getZone().getOffset(firstCheckTime.getMillis()) + "_" +
+				(isRecurring == true ? "1" : "0") + "$" +
+				(skipPastChecks == true ? "1" : "0") + "$" +
+				createPeriodString(period);
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	public static ConditionChecker createFromJson(String obj) {
+		String str = (String) obj;
+		String[] parts = str.split("\\$");
+		
+		if(!parts[0].equals(type)) {
+			throw new RuntimeException("Cannot create checker of " + type + " from " + parts[0]);
+		}
+		
+		long firstMillis = Long.parseLong(parts[1]);
+		long nextMillis = Long.parseLong(parts[2]);
+		//DateTimeZone timezone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(parts[3]));
+		DateTimeZone timezone = DateTimeZone.forID(parts[3].replace('0', '/'));
+		boolean isRecurring = parts[4].equals("1") ? true : false;
+		boolean skipPastChecks = parts[5].equals("1") ? true : false;
+		ReadablePeriod period = parsePeriodString(parts[6]);
+		
+		return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+	}
+	
+	@Override
+	public ConditionChecker fromJson(Object obj) {
+		String str = (String) obj;
+		String[] parts = str.split("_");
+		
+		if(!parts[0].equals(getType())) {
+			throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
+		}
+		
+		long firstMillis = Long.parseLong(parts[1]);
+		long nextMillis = Long.parseLong(parts[2]);
+		DateTimeZone timezone = DateTimeZone.forID(parts[3]);
+		boolean isRecurring = Boolean.valueOf(parts[4]);
+		boolean skipPastChecks = Boolean.valueOf(parts[5]);
+		ReadablePeriod period = parsePeriodString(parts[6]);
+		
+		return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+	}
+	
+	private DateTime getNextCheckTime(){
+		DateTime date = new DateTime(nextCheckTime);
+		int count = 0;
+		while(!DateTime.now().isBefore(date) && skipPastChecks) {
+			if(count > 100000) {
+				throw new IllegalStateException("100000 increments of period did not get to present time.");
+			}
+			
+			if(period == null) {
+				break;
+			}else {
+				date = date.plus(period);
+			}
+			count += 1;
+		}
+		return date;
+	}
+	
+	public static ReadablePeriod parsePeriodString(String periodStr) {
+		ReadablePeriod period;
+		char periodUnit = periodStr.charAt(periodStr.length() - 1);
+		if (periodUnit == 'n') {
+			return null;
+		}
+
+		int periodInt = Integer.parseInt(periodStr.substring(0,
+				periodStr.length() - 1));
+		switch (periodUnit) {
+		case 'M':
+			period = Months.months(periodInt);
+			break;
+		case 'w':
+			period = Weeks.weeks(periodInt);
+			break;
+		case 'd':
+			period = Days.days(periodInt);
+			break;
+		case 'h':
+			period = Hours.hours(periodInt);
+			break;
+		case 'm':
+			period = Minutes.minutes(periodInt);
+			break;
+		case 's':
+			period = Seconds.seconds(periodInt);
+			break;
+		default:
+			throw new IllegalArgumentException("Invalid schedule period unit '"
+					+ periodUnit);
+		}
+
+		return period;
+	}
+
+	public static String createPeriodString(ReadablePeriod period) {
+		String periodStr = "n";
+
+		if (period == null) {
+			return "n";
+		}
+
+		if (period.get(DurationFieldType.months()) > 0) {
+			int months = period.get(DurationFieldType.months());
+			periodStr = months + "M";
+		} else if (period.get(DurationFieldType.weeks()) > 0) {
+			int weeks = period.get(DurationFieldType.weeks());
+			periodStr = weeks + "w";
+		} else if (period.get(DurationFieldType.days()) > 0) {
+			int days = period.get(DurationFieldType.days());
+			periodStr = days + "d";
+		} else if (period.get(DurationFieldType.hours()) > 0) {
+			int hours = period.get(DurationFieldType.hours());
+			periodStr = hours + "h";
+		} else if (period.get(DurationFieldType.minutes()) > 0) {
+			int minutes = period.get(DurationFieldType.minutes());
+			periodStr = minutes + "m";
+		} else if (period.get(DurationFieldType.seconds()) > 0) {
+			int seconds = period.get(DurationFieldType.seconds());
+			periodStr = seconds + "s";
+		}
+
+		return periodStr;
+	}
+
+	@Override
+	public Object getNum() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public Object toJson() {
+		return getId();
+	}
+
+}
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
new file mode 100644
index 0000000..18e3e7b
--- /dev/null
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -0,0 +1,152 @@
+package azkaban.trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+
+public class CheckerTypeLoader {
+	
+	private static Logger logger = Logger.getLogger(CheckerTypeLoader.class);
+	
+	public static final String DEFAULT_CONDITION_CHECKER_PLUGIN_DIR = "plugins/conditioncheckers";
+	private static final String CHECKERTYPECONFFILE = "plugin.properties"; // need jars.to.include property, will be loaded with user property
+	private static final String COMMONCONFFILE = "common.properties";	// common properties for multiple plugins
+	
+	protected static Map<String, Class<? extends ConditionChecker>> checkerToClass = new HashMap<String, Class<? extends ConditionChecker>>();
+	
+	public void init(Props props) throws TriggerException {
+		
+		
+		// load built-in checkers
+		
+		loadDefaultCheckers();
+		
+		loadPluginCheckers(props);
+
+	}
+	
+	private void loadPluginCheckers(Props props) throws TriggerException {
+		
+		String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
+		File pluginDir = new File(checkerDir);
+		if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+			logger.info("No conditon checker plugins to load.");
+			return;
+		}
+		
+		logger.info("Loading plugin condition checkers from " + pluginDir);
+		ClassLoader parentCl = this.getClass().getClassLoader();
+		
+		Props globalCheckerConf = null;
+		File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+		try {
+			if(confFile != null) {
+				globalCheckerConf = new Props(null, confFile);
+			} else {
+				globalCheckerConf = new Props();
+			}
+		} catch (IOException e) {
+			throw new TriggerException("Failed to get global properties." + e);
+		}
+		
+		for(File dir : pluginDir.listFiles()) {
+			if(dir.isDirectory() && dir.canRead()) {
+				try {
+					loadPluginTypes(globalCheckerConf, pluginDir, parentCl);
+				} catch (Exception e) {
+					logger.info("Plugin checkers failed to load. " + e.getCause());
+					throw new TriggerException("Failed to load all condition checkers!", e);
+				}
+			}
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+		Props checkerConf = null;
+		File confFile = Utils.findFilefromDir(dir, CHECKERTYPECONFFILE);
+		if(confFile == null) {
+			logger.info("No checker type found in " + dir.getAbsolutePath());
+			return;
+		}
+		try {
+			checkerConf = new Props(globalConf, confFile);
+		} catch (IOException e) {
+			throw new TriggerException("Failed to load config for the checker type", e);
+		}
+		
+		String checkerName = dir.getName();
+		String checkerClass = checkerConf.getString("checker.class");
+		
+		List<URL> resources = new ArrayList<URL>();		
+		for(File f : dir.listFiles()) {
+			try {
+				if(f.getName().endsWith(".jar")) {
+					resources.add(f.toURI().toURL());
+					logger.info("adding to classpath " + f.toURI().toURL());
+				}
+			} catch (MalformedURLException e) {
+				// TODO Auto-generated catch block
+				throw new TriggerException(e);
+			}
+		}
+		
+		// each job type can have a different class loader
+		ClassLoader checkerCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+		
+		Class<? extends ConditionChecker> clazz = null;
+		try {
+			clazz = (Class<? extends ConditionChecker>)checkerCl.loadClass(checkerClass);
+			checkerToClass.put(checkerName, clazz);
+		}
+		catch (ClassNotFoundException e) {
+			throw new TriggerException(e);
+		}
+		
+		if(checkerConf.getBoolean("need.init")) {
+			try {
+				Utils.invokeStaticMethod(checkerCl, checkerClass, "init", checkerConf);
+			} catch (Exception e) {
+				e.printStackTrace();
+				logger.error("Failed to init the checker type " + checkerName);
+				throw new TriggerException(e);
+			}
+		}
+		
+		logger.info("Loaded checker type " + checkerName + " " + checkerClass);
+	}
+	
+	private void loadDefaultCheckers() {
+		checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);		
+	}
+	
+	public ConditionChecker createCheckerFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+		ConditionChecker checker = null;
+		Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);		
+		checker = (ConditionChecker) Utils.invokeStaticMethod(checkerClass.getClassLoader(), checkerClass.getName(), "createFromJson", obj);
+		
+		return checker;
+	}
+	
+	public ConditionChecker createChecker(String type, Object ... args) {
+		ConditionChecker checker = null;
+		Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);		
+		checker = (ConditionChecker) Utils.callConstructor(checkerClass, args);
+		
+		return checker;
+	}
+	
+}
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
new file mode 100644
index 0000000..4c8e078
--- /dev/null
+++ b/src/java/azkaban/trigger/Condition.java
@@ -0,0 +1,117 @@
+package azkaban.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+import org.apache.log4j.Logger;
+
+import com.sun.swing.internal.plaf.synth.resources.synth;
+
+public class Condition {
+	
+	private static Logger logger = Logger.getLogger(Condition.class);
+	
+	private static JexlEngine jexl = new JexlEngine();
+	private static CheckerTypeLoader checkerLoader = null;
+	private Expression expression;
+	private Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+	private MapContext context = new MapContext();
+	
+	public Condition(Map<String, ConditionChecker> checkers, String expr) {
+		setCheckers(checkers);
+		this.expression = jexl.createExpression(expr);
+	}
+	
+	public synchronized static void setJexlEngine(JexlEngine jexl) {
+		Condition.jexl = jexl;
+	}
+	
+	public synchronized static void setCheckerLoader(CheckerTypeLoader loader) {
+		Condition.checkerLoader = loader;
+	}
+	
+	public void registerChecker(ConditionChecker checker) {
+		checkers.put(checker.getId(), checker);
+		context.set(checker.getId(), checker);
+	}
+	
+	public Map<String, ConditionChecker> getCheckers() {
+		return this.checkers;
+	}
+	
+	public void setCheckers(Map<String, ConditionChecker> checkers){
+		this.checkers = checkers;
+		for(ConditionChecker checker : checkers.values()) {
+			this.context.set(checker.getId(), checker);
+		}
+	}
+	
+	public void resetCheckers() {
+		for(ConditionChecker checker : checkers.values()) {
+			checker.reset();
+		}
+	}
+	
+	public String getExpression() {
+		return this.expression.getExpression();
+	}
+	
+	public void setExpression(String expr) {
+		this.expression = jexl.createExpression(expr);
+	}
+	
+	public boolean isMet() {
+		return expression.evaluate(context).equals(Boolean.TRUE);
+	}
+	
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("expression", expression.getExpression());
+		
+		List<Object> checkersJson = new ArrayList<Object>();
+		for(ConditionChecker checker : checkers.values()) {
+			Map<String, Object> oneChecker = new HashMap<String, Object>();
+			oneChecker.put("type", checker.getType());
+			oneChecker.put("checkerJson", checker.toJson());
+			checkersJson.add(oneChecker);
+		}
+		jsonObj.put("checkers", checkersJson);
+		
+		return jsonObj;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static Condition fromJson(Object obj) {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		Condition cond = null;
+		
+		try {
+			Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+			List<Object> checkersJson = (List<Object>) jsonObj.get("checkers");			
+			for(Object oneCheckerJson : checkersJson) {
+				Map<String, Object> oneChecker = (HashMap<String, Object>) oneCheckerJson;
+				String type = (String) oneChecker.get("type");
+				ConditionChecker ck = checkerLoader.createCheckerFromJson(type, oneChecker.get("checkerJson"));
+				checkers.put(ck.getId(), ck);
+			}
+				String expr = (String) jsonObj.get("expression");
+				
+				cond = new Condition(checkers, expr);
+			
+		} catch(Exception e) {
+			e.printStackTrace();
+			logger.error("Failed to recreate condition from json.", e);
+			return null;
+		}
+		
+		return cond;
+	}
+	
+
+}
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
new file mode 100644
index 0000000..ad7b0c6
--- /dev/null
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -0,0 +1,20 @@
+package azkaban.trigger;
+
+
+public interface ConditionChecker {
+	
+	Object eval();
+	
+	Object getNum();
+	
+	void reset();
+	
+	String getId();
+	
+	String getType();
+	
+	ConditionChecker fromJson(Object obj);
+	
+	Object toJson();
+
+}
diff --git a/src/java/azkaban/trigger/ExecuteFlowAction.java b/src/java/azkaban/trigger/ExecuteFlowAction.java
new file mode 100644
index 0000000..35d12cf
--- /dev/null
+++ b/src/java/azkaban/trigger/ExecuteFlowAction.java
@@ -0,0 +1,177 @@
+package azkaban.trigger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+
+public class ExecuteFlowAction implements TriggerAction {
+
+	public static final String type = "ExecuteFlow";
+	
+	private static ExecutorManager executorManager;
+	private int projectId;
+	private String flowName;
+	private String submitUser;
+	private static ProjectManager projectManager;
+	private ExecutionOptions executionOptions;
+	
+
+	private static Logger logger;
+	
+	public ExecuteFlowAction(int projectId, String flowName, String submitUser, ExecutionOptions executionOptions) {
+		this.projectId = projectId;
+		this.flowName = flowName;
+		this.submitUser = submitUser;
+		this.executionOptions = executionOptions;
+	}
+	
+	public static void setLogger(Logger logger) {
+		ExecuteFlowAction.logger = logger;
+	}
+
+	public int getProjectId() {
+		return projectId;
+	}
+
+	public void setProjectId(int projectId) {
+		this.projectId = projectId;
+	}
+
+	public String getFlowName() {
+		return flowName;
+	}
+
+	public void setFlowName(String flowName) {
+		this.flowName = flowName;
+	}
+
+	public String getSubmitUser() {
+		return submitUser;
+	}
+
+	public void setSubmitUser(String submitUser) {
+		this.submitUser = submitUser;
+	}
+
+	public ExecutionOptions getExecutionOptions() {
+		return executionOptions;
+	}
+
+	public void setExecutionOptions(ExecutionOptions executionOptions) {
+		this.executionOptions = executionOptions;
+	}
+
+	public static void setExecutorManager(ExecutorManager executorManager) {
+		ExecuteFlowAction.executorManager = executorManager;
+	}
+
+	public static void setProjectManager(ProjectManager projectManager) {
+		ExecuteFlowAction.projectManager = projectManager;
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TriggerAction fromJson(Object obj) {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		String type = (String) jsonObj.get("type");
+		if(! type.equals(ExecuteFlowAction.type)) {
+			throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+		}
+		int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
+		String flowName = (String) jsonObj.get("flowName");
+		String submitUser = (String) jsonObj.get("submitUser");
+		ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+		return new ExecuteFlowAction(projectId, flowName, submitUser, executionOptions);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static TriggerAction createFromJson(HashMap obj) {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		String type = (String) jsonObj.get("type");
+		if(! type.equals(ExecuteFlowAction.type)) {
+			throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+		}
+		int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
+		String flowName = (String) jsonObj.get("flowName");
+		String submitUser = (String) jsonObj.get("submitUser");
+		ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+		return new ExecuteFlowAction(projectId, flowName, submitUser, executionOptions);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public static TriggerAction createFromJson(Object obj) {
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		String type = (String) jsonObj.get("type");
+		if(! type.equals(ExecuteFlowAction.type)) {
+			throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+		}
+		int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
+		String flowName = (String) jsonObj.get("flowName");
+		String submitUser = (String) jsonObj.get("submitUser");
+		ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+		return new ExecuteFlowAction(projectId, flowName, submitUser, executionOptions);
+	}
+	
+	@Override
+	public Object toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("type", type);
+		jsonObj.put("projectId", String.valueOf(projectId));
+		jsonObj.put("flowName", flowName);
+		jsonObj.put("submitUser", submitUser);
+		jsonObj.put("executionOptions", executionOptions.toObject());
+
+		return jsonObj;
+	}
+
+	@Override
+	public void doAction() {
+		Project project = projectManager.getProject(projectId);
+		if(project == null) {
+			logger.error("Project to execute " + projectId + " does not exist!");
+			throw new RuntimeException("Error finding the project to execute " + projectId);
+		}
+		
+		Flow flow = project.getFlow(flowName);
+		if(flow == null) {
+			logger.error("Flow " + flowName + " cannot be found in project " + project.getName());
+			throw new RuntimeException("Error finding the flow to execute " + flowName);
+		}
+		
+		ExecutableFlow exflow = new ExecutableFlow(flow);
+		exflow.setSubmitUser(submitUser);
+		exflow.addAllProxyUsers(project.getProxyUsers());
+		
+		if(!executionOptions.isFailureEmailsOverridden()) {
+			executionOptions.setFailureEmails(flow.getFailureEmails());
+		}
+		if(!executionOptions.isSuccessEmailsOverridden()) {
+			executionOptions.setSuccessEmails(flow.getSuccessEmails());
+		}
+		exflow.setExecutionOptions(executionOptions);
+		
+		try{
+			executorManager.submitExecutableFlow(exflow);
+			logger.info("Invoked flow " + project.getName() + "." + flowName);
+		} catch (ExecutorManagerException e) {
+			throw new RuntimeException(e);
+		}
+		
+	}
+
+
+}
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
new file mode 100644
index 0000000..91bf85e
--- /dev/null
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -0,0 +1,299 @@
+package azkaban.trigger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoader {
+	private static Logger logger = Logger.getLogger(JdbcTriggerLoader.class);
+
+	private EncodingType defaultEncodingType = EncodingType.GZIP;
+	
+	private static final String triggerTblName = "triggers";
+
+	private static String GET_ALL_TRIGGERS =
+			"SELECT trigger_id, enc_type, data FROM " + triggerTblName;
+	
+	private static String GET_TRIGGER = 
+			"SELECT trigger_id, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
+	
+	private static String ADD_TRIGGER = 
+			"INSERT INTO " + triggerTblName + " ( modify_time, enc_type, data) values (?,?,?)";
+	
+	private static String REMOVE_TRIGGER = 
+			"DELETE FROM " + triggerTblName + " WHERE trigger_id=?";
+	
+	private static String UPDATE_TRIGGER = 
+			"UPDATE " + triggerTblName + " SET modify_time=?, enc_type=?, data=? WHERE trigger_id=?";
+	
+	public EncodingType getDefaultEncodingType() {
+		return defaultEncodingType;
+	}
+
+	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+		this.defaultEncodingType = defaultEncodingType;
+	}
+	
+	public JdbcTriggerLoader(Props props) {
+		super(props);
+	}
+
+	@Override
+	public List<Trigger> loadTriggers() throws TriggerManagerException {
+		logger.info("Loading all triggers from db.");
+		Connection connection = getConnection();
+
+		QueryRunner runner = new QueryRunner();
+		ResultSetHandler<List<Trigger>> handler = new TriggerResultHandler();
+	
+		List<Trigger> triggers;
+		
+		try {
+			triggers = runner.query(connection, GET_ALL_TRIGGERS, handler);
+		} catch (SQLException e) {
+			logger.error(GET_ALL_TRIGGERS + " failed.");
+
+			throw new TriggerManagerException("Loading triggers from db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
+		}
+		
+		logger.info("Loaded " + triggers.size() + " triggers.");
+		
+		return triggers;
+	}
+
+	@Override
+	public void removeTrigger(Trigger t) throws TriggerManagerException {		
+		logger.info("Removing trigger " + t.toString() + " from db.");
+
+		QueryRunner runner = createQueryRunner();
+		try {
+			int removes =  runner.update(REMOVE_TRIGGER, t.getTriggerId());
+			if (removes == 0) {
+				throw new TriggerManagerException("No trigger has been removed.");
+			}
+		} catch (SQLException e) {
+			logger.error(REMOVE_TRIGGER + " failed.");
+			throw new TriggerManagerException("Remove trigger " + t.toString() + " from db failed. ", e);
+		}
+	}
+	
+	@Override
+	public void addTrigger(Trigger t) throws TriggerManagerException {
+		logger.info("Inserting trigger " + t.toString() + " into db.");
+		Connection connection = getConnection();
+		try {
+			addTrigger(connection, t, defaultEncodingType);
+		}
+		catch (Exception e) {
+			throw new TriggerManagerException("Error uploading trigger", e);
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+
+	private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+		
+		String json = JSONUtils.toJSON(t.toJson());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new TriggerManagerException("Error encoding the trigger " + t.toString());
+		}
+		
+		QueryRunner runner = new QueryRunner();
+		long submitTime = System.currentTimeMillis();
+		
+		long id;
+		
+		try {
+			runner.update(connection, ADD_TRIGGER, DateTime.now().getMillis(), encType.getNumVal(), data);
+			connection.commit();
+			id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
+
+			if (id == -1l) {
+				throw new TriggerManagerException("trigger id is not properly created.");
+			}
+			logger.info("uploaded trigger " + t.toString());
+			t.setTriggerId((int)id);
+			
+		} catch (SQLException e) {
+			throw new TriggerManagerException("Error creating trigger.", e);
+		}
+		
+	}
+	
+	@Override
+	public void updateTrigger(Trigger t) throws TriggerManagerException {
+		logger.info("Updating trigger " + t.toString() + " into db.");
+		Connection connection = getConnection();
+		try{
+			updateTrigger(connection, t, defaultEncodingType);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			throw new TriggerManagerException("Failed to update trigger " + t.toString() + " into db!");
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+		
+	private void updateTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+
+		String json = JSONUtils.toJSON(t.toJson());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new TriggerManagerException("Error encoding the trigger " + t.toString());
+		}
+		
+		QueryRunner runner = new QueryRunner();
+	
+		try {
+			int updates =  runner.update( connection, 
+					UPDATE_TRIGGER, 
+					DateTime.now().getMillis(),
+					encType.getNumVal(),
+					data,
+					t.getTriggerId());
+			if (updates == 0) {
+				throw new TriggerManagerException("No trigger has been updated.");
+			}
+		} catch (SQLException e) {
+			logger.error(UPDATE_TRIGGER + " failed.");
+			throw new TriggerManagerException("Update trigger " + t.toString() + " into db failed. ", e);
+		}
+	}
+
+	
+	private static class LastInsertID implements ResultSetHandler<Long> {
+		private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
+		
+		@Override
+		public Long handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return -1l;
+			}
+
+			long id = rs.getLong(1);
+			return id;
+		}
+		
+	}
+	
+	public class TriggerResultHandler implements ResultSetHandler<List<Trigger>> {
+		
+		@Override
+		public List<Trigger> handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return Collections.<Trigger>emptyList();
+			}
+			
+			ArrayList<Trigger> triggers = new ArrayList<Trigger>();
+			do {
+				int triggerId = rs.getInt(1);
+				long modifyTime = rs.getInt(2);
+				int encodingType = rs.getInt(3);
+				byte[] data = rs.getBytes(4);
+				
+				Object jsonObj = null;
+				if (data != null) {
+					EncodingType encType = EncodingType.fromInteger(encodingType);
+
+					try {
+						// Convoluted way to inflate strings. Should find common package or helper function.
+						if (encType == EncodingType.GZIP) {
+							// Decompress the sucker.
+							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+							jsonObj = JSONUtils.parseJSONFromString(jsonString);
+						}
+						else {
+							String jsonString = new String(data, "UTF-8");
+							jsonObj = JSONUtils.parseJSONFromString(jsonString);
+						}	
+					} catch (IOException e) {
+						throw new SQLException("Error reconstructing trigger data " );
+					}
+				}
+				
+				Trigger t = null;
+				try {
+					t = Trigger.fromJson(jsonObj);
+					triggers.add(t);
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+					logger.error("Failed to load trigger " + triggerId);
+				}
+			} while (rs.next());
+			
+			return triggers;
+		}
+		
+	}
+	
+	private Connection getConnection() throws TriggerManagerException {
+		Connection connection = null;
+		try {
+			connection = super.getDBConnection(false);
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new TriggerManagerException("Error getting DB connection.", e);
+		}
+		
+		return connection;
+	}
+
+
+}
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
new file mode 100644
index 0000000..240411b
--- /dev/null
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -0,0 +1,171 @@
+package azkaban.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class Trigger {
+	
+	private static Logger logger = Logger.getLogger(Trigger.class);
+	
+	private int triggerId = -1;
+	private long lastModifyTime = -1;
+	private long submitTime = -1;
+	private String submitUser;
+	
+	private Condition triggerCondition;
+	private Condition expireCondition;
+	private List<TriggerAction> actions;
+	
+	private static ActionTypeLoader actionTypeLoader;
+	
+	private boolean resetOnTrigger = false;
+	private boolean resetOnExpire = false;
+	
+	@SuppressWarnings("unused")
+	private Trigger() throws TriggerManagerException {	
+		throw new TriggerManagerException("Triggers should always be specified");
+	}
+	
+	public Trigger(
+			long lastModifyTime, 
+			long submitTime, 
+			String submitUser, 
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions) {
+		this.lastModifyTime = lastModifyTime;
+		this.submitTime = submitTime;
+		this.submitUser = submitUser;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+	}
+	
+	public Trigger(
+			int triggerId,
+			long lastModifyTime, 
+			long submitTime, 
+			String submitUser, 
+			Condition triggerCondition,
+			Condition expireCondition,
+			List<TriggerAction> actions) {
+		this.triggerId = triggerId;
+		this.lastModifyTime = lastModifyTime;
+		this.submitTime = submitTime;
+		this.submitUser = submitUser;
+		this.triggerCondition = triggerCondition;
+		this.expireCondition = expireCondition;
+		this.actions = actions;
+	}
+	
+	public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
+		Trigger.actionTypeLoader = loader;
+	}
+	
+	public boolean isResetOnTrigger() {
+		return resetOnTrigger;
+	}
+	
+	public void setResetOnTrigger(boolean resetOnTrigger) {
+		this.resetOnTrigger = resetOnTrigger;
+	}
+	
+	public boolean isResetOnExpire() {
+		return resetOnExpire;
+	}
+	
+	public void setResetOnExpire(boolean resetOnExpire) {
+		this.resetOnExpire = resetOnExpire;
+	}
+
+	public long getLastModifyTime() {
+		return lastModifyTime;
+	}
+
+	public void setTriggerId(int id) {
+		this.triggerId = id;
+	}
+	
+	public int getTriggerId() {
+		return triggerId;
+	}
+
+	public boolean triggerConditionMet(){
+		return triggerCondition.isMet();
+	}
+	
+	public boolean expireConditionMet(){
+		return expireCondition.isMet();
+	}
+	
+	public void resetTriggerConditions() {
+		triggerCondition.resetCheckers();
+	}
+	
+	public List<TriggerAction> getTriggerActions () {
+		return actions;
+	}
+	
+	public Map<String, Object> toJson() {
+		Map<String, Object> jsonObj = new HashMap<String, Object>();
+		jsonObj.put("triggerCondition", triggerCondition.toJson());
+		jsonObj.put("expireCondition", expireCondition.toJson());
+		List<Object> actionsJson = new ArrayList<Object>();
+		for(TriggerAction action : actions) {
+			Map<String, Object> oneActionJson = new HashMap<String, Object>();
+			oneActionJson.put("type", action.getType());
+			oneActionJson.put("actionJson", action.toJson());
+			actionsJson.add(oneActionJson);
+		}
+		jsonObj.put("actions", actionsJson);
+		jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
+		jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
+		jsonObj.put("submitUser", submitUser);
+		jsonObj.put("submitTime", String.valueOf(submitTime));
+		jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
+		jsonObj.put("triggerId", String.valueOf(triggerId));
+		
+		return jsonObj;
+	}
+	
+	
+	@SuppressWarnings("unchecked")
+	public static Trigger fromJson(Object obj) {
+		
+		Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+		
+		Trigger trigger = null;
+		try{
+			Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
+			Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
+			List<TriggerAction> actions = new ArrayList<TriggerAction>();
+			List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
+			for(Object actObj : actionsJson) {
+				Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
+				String type = (String) oneActionJson.get("type");
+				TriggerAction act = actionTypeLoader.createActionFromJson(type, oneActionJson.get("actionJson"));
+				actions.add(act);
+			}
+			boolean resetOnTrigger = Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
+			boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
+			String submitUser = (String) jsonObj.get("submitUser");
+			long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
+			long lastModifyTime = Long.valueOf((String) jsonObj.get("lastModifyTime"));
+			int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
+			trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, triggerCond, expireCond, actions);
+			trigger.setResetOnExpire(resetOnExpire);
+			trigger.setResetOnTrigger(resetOnTrigger);
+		}catch(Exception e) {
+			e.printStackTrace();
+			logger.error("Failed to decode the trigger.", e);
+			return null;
+		}
+		
+		return trigger;
+	}
+	
+}
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
new file mode 100644
index 0000000..581b3fa
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -0,0 +1,13 @@
+package azkaban.trigger;
+
+public interface TriggerAction {
+	
+	String getType();
+	
+	TriggerAction fromJson(Object obj);
+	
+	Object toJson();
+	
+	void doAction();
+	
+}
diff --git a/src/java/azkaban/trigger/TriggerException.java b/src/java/azkaban/trigger/TriggerException.java
new file mode 100644
index 0000000..e49fb2a
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerException.java
@@ -0,0 +1,19 @@
+package azkaban.trigger;
+
+
+public class TriggerException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public TriggerException(String message) {
+		super(message);
+	}
+	
+	public TriggerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public TriggerException(Throwable e) {
+		super(e);
+	}
+}
+
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
new file mode 100644
index 0000000..cf37634
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -0,0 +1,18 @@
+package azkaban.trigger;
+
+import java.util.List;
+import java.util.Map;
+
+
+
+public interface TriggerLoader {
+
+	public void addTrigger(Trigger t) throws TriggerManagerException;	
+
+	public void removeTrigger(Trigger s) throws TriggerManagerException;
+	
+	public void updateTrigger(Trigger t) throws TriggerManagerException;
+	
+	public List<Trigger> loadTriggers() throws TriggerManagerException;	
+	
+}
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
new file mode 100644
index 0000000..6f613cc
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -0,0 +1,167 @@
+package azkaban.trigger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+
+public class TriggerManager {
+	private static Logger logger = Logger.getLogger(TriggerManager.class);
+	
+	private final long DEFAULT_TRIGGER_EXPIRE_TIME = 24*60*60*1000L;
+	
+	private List<Trigger> triggers;
+	
+	TriggerScannerThread scannerThread;
+	
+	public TriggerManager(Props props, TriggerLoader triggerLoader, CheckerTypeLoader checkerLoader, ActionTypeLoader actionLoader) {
+		
+		// load plugins
+		try{
+			checkerLoader.init(props);
+			actionLoader.init(props);
+		} catch(Exception e) {
+			e.printStackTrace();
+			logger.error(e.getMessage());
+		}
+		
+		Condition.setCheckerLoader(checkerLoader);
+		Trigger.setActionTypeLoader(actionLoader);
+		
+		try{
+			// expect loader to return valid triggers
+			triggers = triggerLoader.loadTriggers();
+		}catch(Exception e) {
+			e.printStackTrace();
+			logger.error(e.getMessage());
+		}
+		
+		scannerThread = new TriggerScannerThread("TriggerScannerThread");
+		
+		for(Trigger t : triggers) {
+			scannerThread.addTrigger(t);
+		}
+		
+		scannerThread.start();
+	}
+	
+	public synchronized void insertTrigger(Trigger t) {
+		triggers.add(t);
+		scannerThread.addTrigger(t);
+	}
+	
+	public synchronized void removeTrigger(Trigger t) {
+		scannerThread.removeTrigger(t);
+		triggers.remove(t);		
+	}
+	
+	public List<Trigger> getTriggers() {
+		return new ArrayList<Trigger>(triggers);
+	}
+
+	//trigger scanner thread
+	public class TriggerScannerThread extends Thread {
+		private final BlockingQueue<Trigger> triggers;
+		private AtomicBoolean stillAlive = new AtomicBoolean(true);
+		private String scannerName;
+		private long lastCheckTime = -1;
+		
+		// Five minute minimum intervals
+		private static final int TIMEOUT_MS = 300000;
+		
+		public TriggerScannerThread(String scannerName){
+			triggers = new LinkedBlockingDeque<Trigger>();
+			this.setName(scannerName);
+		}
+		
+		public void shutdown() {
+			logger.error("Shutting down trigger manager thread " + scannerName);
+			stillAlive.set(false);
+			this.interrupt();
+		}
+		
+		public synchronized List<Trigger> getTriggers() {
+			return new ArrayList<Trigger>(triggers);
+		}
+		
+		public synchronized void addTrigger(Trigger t) {
+			triggers.add(t);
+		}
+		
+		public synchronized void removeTrigger(Trigger t) {
+			triggers.remove(t);
+		}
+		
+		public void run() {
+			while(stillAlive.get()) {
+				synchronized (this) {
+					try{
+						lastCheckTime = System.currentTimeMillis();
+						
+						try{
+							checkAllTriggers();
+						} catch(Exception e) {
+							e.printStackTrace();
+							logger.error(e.getMessage());
+						} catch(Throwable t) {
+							t.printStackTrace();
+							logger.error(t.getMessage());
+						}
+						
+						long timeRemaining = TIMEOUT_MS - (System.currentTimeMillis() - lastCheckTime);
+						if(timeRemaining < 0) {
+							logger.error("Trigger manager thread " + scannerName + " is too busy!");
+						} else {
+							wait(timeRemaining);
+						}
+					} catch(InterruptedException e) {
+						logger.info("Interrupted. Probably to shut down.");
+					}
+					
+				}
+			}
+		}
+		
+		private void checkAllTriggers() {
+			for(Trigger t : triggers) {
+				if(t.triggerConditionMet()) {
+					onTriggerTrigger(t);
+				} else if (t.expireConditionMet()) {
+					onTriggerExpire(t);
+				}
+			}
+			
+		}
+		
+		private void onTriggerTrigger(Trigger t) {
+			List<TriggerAction> actions = t.getTriggerActions();
+			for(TriggerAction action : actions) {
+				action.doAction();
+			}
+			if(t.isResetOnTrigger()) {
+				t.resetTriggerConditions();
+			} else {
+				triggers.remove(t);
+			}
+		}
+		
+		private void onTriggerExpire(Trigger t) {
+			if(t.isResetOnExpire()) {
+				t.resetTriggerConditions();
+			} else {
+				triggers.remove(t);
+			}
+		}
+	}
+	
+	public TriggerAction createTriggerAction() {
+		return null;
+	}
+
+}
diff --git a/src/java/azkaban/trigger/TriggerManagerException.java b/src/java/azkaban/trigger/TriggerManagerException.java
new file mode 100644
index 0000000..5d30b39
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerException.java
@@ -0,0 +1,31 @@
+package azkaban.trigger;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+public class TriggerManagerException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public TriggerManagerException(String message) {
+		super(message);
+	}
+	
+	public TriggerManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
+
diff --git a/src/sql/create.triggers.sql b/src/sql/create.triggers.sql
new file mode 100644
index 0000000..f75b727
--- /dev/null
+++ b/src/sql/create.triggers.sql
@@ -0,0 +1,7 @@
+CREATE TABLE triggers (
+	trigger_id INT NOT NULL AUTO_INCREMENT,
+	modify_time BIGINT NOT NULL,
+	enc_type TINYINT,
+	data LONGBLOB NOT NULL,
+	PRIMARY KEY (trigger_id)
+);
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
new file mode 100644
index 0000000..2cf8ca9
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -0,0 +1,130 @@
+package azkaban.test.trigger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.commons.jexl2.JexlEngine;
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+import org.junit.Test;
+
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.BasicTimeChecker;
+import azkaban.utils.Props;
+
+public class ConditionTest {
+
+	private JexlEngine jexl = new JexlEngine();
+	
+	@Test
+	public void conditionTest(){
+		
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		DateTime now = DateTime.now();
+		FakeTimeChecker fake1 = new FakeTimeChecker(now);
+		FakeTimeChecker fake2 = new FakeTimeChecker(now.plusMinutes(1));
+		checkers.put(fake1.getId(), fake1);
+		checkers.put(fake2.getId(), fake2);
+
+		String expr1 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + "!" + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
+		String expr2 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
+
+		Condition cond = new Condition(checkers, expr1);
+
+		System.out.println("Setting expression " + expr1);
+		assertTrue(cond.isMet());
+		cond.setExpression(expr2);
+		System.out.println("Setting expression " + expr2);
+		assertFalse(cond.isMet());
+		
+	}
+	
+	@Test
+	public void timeCheckerTest(){
+		
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		
+		// get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
+		DateTime now = DateTime.now();
+		ReadablePeriod period = BasicTimeChecker.parsePeriodString("10s");
+		
+		BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
+		checkers.put(timeChecker.getId(), timeChecker);
+		String expr = timeChecker.getId() + ".eval()";
+		
+		Condition cond = new Condition(checkers, expr);
+		System.out.println(expr);
+		
+		assertFalse(cond.isMet());
+		
+		//sleep for 1 min
+		try {
+			Thread.sleep(10000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		assertTrue(cond.isMet());
+		
+		cond.resetCheckers();
+		
+		assertFalse(cond.isMet());
+		
+		//sleep for 1 min
+		try {
+			Thread.sleep(10000);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		assertTrue(cond.isMet());
+		
+	}
+	
+	@Test
+	public void BasicTimeCheckerTest() {
+		
+		CheckerTypeLoader checkerTypeLoader = new CheckerTypeLoader();
+		checkerTypeLoader.init(new Props());
+		Condition.setCheckerLoader(checkerTypeLoader);
+
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		
+		// get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
+		DateTime now = DateTime.now();
+		String period = "6s";
+		
+		//BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
+		ConditionChecker timeChecker = checkerTypeLoader.createChecker(BasicTimeChecker.type, now, true, true, period);
+		System.out.println("checker id is " + timeChecker.getId());
+		
+		checkers.put(timeChecker.getId(), timeChecker);
+		String expr = timeChecker.getId() + ".eval()";
+		
+		Condition cond = new Condition(checkers, expr);
+		
+		Object json = cond.toJson();
+		Condition cond2 = Condition.fromJson(json);
+		
+		Map<String, ConditionChecker> checkers1 = cond.getCheckers();
+		Map<String, ConditionChecker> checkers2 = cond2.getCheckers();
+		
+		assertTrue(cond.getExpression().equals(cond2.getExpression()));
+		System.out.println("cond1: " + cond.getExpression());
+		System.out.println("cond2: " + cond2.getExpression());
+		assertTrue(checkers2.size() == 1);
+		ConditionChecker checker2 = checkers2.get(timeChecker.getId());
+		//assertTrue(checker2.getId().equals(timeChecker.getId()));
+		System.out.println("checker1: " + timeChecker.getId());
+		System.out.println("checker2: " + checker2.getId());
+		assertTrue(timeChecker.getId().equals(checker2.getId()));
+	}
+	
+}
diff --git a/unit/java/azkaban/test/trigger/DummyTriggerAction.java b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
new file mode 100644
index 0000000..5fb0b4c
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
@@ -0,0 +1,38 @@
+package azkaban.test.trigger;
+
+import azkaban.trigger.TriggerAction;
+
+public class DummyTriggerAction implements TriggerAction{
+
+	public static final String type = "DummyAction";
+	
+	private String message;
+	
+	public DummyTriggerAction(String message) {
+		this.message = message;
+	}
+	
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@Override
+	public TriggerAction fromJson(Object obj) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public Object toJson() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void doAction() {
+		System.out.println(getType() + " invoked.");
+		System.out.println(message);
+	}
+
+}
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
new file mode 100644
index 0000000..00ee02e
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -0,0 +1,41 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.ExecuteFlowAction;
+import azkaban.trigger.TriggerAction;
+import azkaban.utils.Props;
+
+
+public class ExecuteFlowActionTest {
+	
+	@Test
+	public void ExecuteFlowActionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+		ActionTypeLoader loader = new ActionTypeLoader();
+		loader.init(new Props());
+		
+		ExecutionOptions options = new ExecutionOptions();
+		List<String> disabledJobs = new ArrayList<String>();
+		options.setDisabledJobs(disabledJobs);
+		
+		ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testflow", "azkaban", options);
+		
+		Object obj = executeFlowAction.toJson();
+		
+		ExecuteFlowAction action = (ExecuteFlowAction) loader.createActionFromJson(ExecuteFlowAction.type, obj);
+		assertTrue(executeFlowAction.getProjectId() == action.getProjectId());
+		assertTrue(executeFlowAction.getFlowName().equals(action.getFlowName()));
+		assertTrue(executeFlowAction.getSubmitUser().equals(action.getSubmitUser()));
+	}
+
+	
+	
+}
diff --git a/unit/java/azkaban/test/trigger/FakeTimeChecker.java b/unit/java/azkaban/test/trigger/FakeTimeChecker.java
new file mode 100644
index 0000000..b4396a8
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/FakeTimeChecker.java
@@ -0,0 +1,74 @@
+package azkaban.test.trigger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import azkaban.trigger.ConditionChecker;
+
+
+public class FakeTimeChecker implements ConditionChecker{
+	
+	private DateTime timeToCheck;
+	private String message;
+	
+	public static final String type = "FakeTimeChecker";
+	
+	public FakeTimeChecker(DateTime timeToCheck){
+		this.timeToCheck = timeToCheck;
+	}
+	
+	@Override
+	public Boolean eval() {
+		return timeToCheck.isAfterNow();
+	}
+
+	@Override
+	public void reset() {
+		// TODO Auto-generated method stub
+		
+	}
+	
+	/*
+	 * TimeChecker format:
+	 * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
+	 */
+	@Override
+	public String getId() {
+		return getType() + "_" +
+				timeToCheck.getMillis() + "_" +
+				timeToCheck.getZone().getShortName(timeToCheck.getMillis() );
+	}
+
+	@Override
+	public String getType() {
+		return type;
+	}
+
+	@Override
+	public ConditionChecker fromJson(Object obj) {
+		String str = (String) obj;
+		String[] parts = str.split("_");
+		
+		if(!parts[0].equals(getType())) {
+			throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
+		}
+		
+		long timeToCheckMillis = Long.parseLong(parts[1]);
+		DateTimeZone timezone = DateTimeZone.forID(parts[2]);
+		
+		return new FakeTimeChecker(new DateTime(timeToCheckMillis, timezone));
+	}
+
+	@Override
+	public Object getNum() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public Object toJson() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
new file mode 100644
index 0000000..56a9117
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -0,0 +1,120 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.Props;
+
+public class TriggerManagerTest {
+	
+	@Before
+	public void setup() {
+
+	}
+	
+	@After
+	public void tearDown() {
+		
+	}
+	
+	@Test
+	public void TriggerManagerSimpleTest() {
+		Props props = new Props();
+		TriggerManager triggerManager = new TriggerManager(props, new MockTriggerLoader(), new MockCheckerLoader(), new MockActionLoader());
+		List<Trigger> triggers = triggerManager.getTriggers();
+		assertTrue(triggers.size() == 1);
+		
+		Trigger t2 = createFakeTrigger("addnewtriggger");
+		triggerManager.insertTrigger(t2);
+		
+		triggers = triggerManager.getTriggers();
+		assertTrue(triggers.size() == 2);
+		
+		triggerManager.removeTrigger(t2);
+		triggers = triggerManager.getTriggers();
+		assertTrue(triggers.size() == 1);
+	}
+	
+	public class MockTriggerLoader implements TriggerLoader {
+
+		private List<Trigger> triggers;
+		
+		@Override
+		public void addTrigger(Trigger t) throws TriggerManagerException {
+			triggers.add(t);			
+		}
+
+		@Override
+		public void removeTrigger(Trigger s) throws TriggerManagerException {
+			triggers.remove(s);
+			
+		}
+
+		@Override
+		public void updateTrigger(Trigger t) throws TriggerManagerException {
+
+		}
+
+		@Override
+		public List<Trigger> loadTriggers()
+				throws TriggerManagerException {
+			Trigger t = createFakeTrigger("test");
+			triggers = new ArrayList<Trigger>();
+			triggers.add(t);
+			return triggers;
+		}
+		
+	}
+	
+	private Trigger createFakeTrigger(String message) {
+		
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		TriggerAction act  = new DummyTriggerAction(message);
+		actions.add(act);
+		
+		String expr = "true";
+		
+		Condition triggerCond = new Condition(checkers, expr);
+		Condition expireCond = new Condition(checkers, expr);
+		
+		Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", triggerCond, expireCond, actions);
+		
+		return fakeTrigger;
+	}
+
+	public class MockCheckerLoader extends CheckerTypeLoader{
+		
+		@Override
+		public void init(Props props) {
+			checkerToClass.put(FakeTimeChecker.type, FakeTimeChecker.class);
+		}
+	}
+	
+	public class MockActionLoader extends ActionTypeLoader {
+		@Override
+		public void init(Props props) {
+			actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
+		}
+	}
+
+}