azkaban-uncached
Changes
lib/commons-jexl-2.1.1.jar 0(+0 -0)
src/java/azkaban/trigger/ActionTypeLoader.java 148(+148 -0)
src/java/azkaban/trigger/BasicTimeChecker.java 232(+232 -0)
src/java/azkaban/trigger/CheckerTypeLoader.java 152(+152 -0)
src/java/azkaban/trigger/Condition.java 117(+117 -0)
src/java/azkaban/trigger/ExecuteFlowAction.java 177(+177 -0)
src/java/azkaban/trigger/JdbcTriggerLoader.java 299(+299 -0)
src/java/azkaban/trigger/Trigger.java 171(+171 -0)
src/java/azkaban/trigger/TriggerAction.java 13(+13 -0)
src/java/azkaban/trigger/TriggerLoader.java 18(+18 -0)
src/java/azkaban/trigger/TriggerManager.java 167(+167 -0)
src/sql/create.triggers.sql 7(+7 -0)
unit/java/azkaban/test/trigger/ConditionTest.java 130(+130 -0)
Details
lib/commons-jexl-2.1.1.jar 0(+0 -0)
diff --git a/lib/commons-jexl-2.1.1.jar b/lib/commons-jexl-2.1.1.jar
new file mode 100644
index 0000000..ab288a8
Binary files /dev/null and b/lib/commons-jexl-2.1.1.jar differ
src/java/azkaban/trigger/ActionTypeLoader.java 148(+148 -0)
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
new file mode 100644
index 0000000..87773ed
--- /dev/null
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -0,0 +1,148 @@
+package azkaban.trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class ActionTypeLoader {
+
+ private static Logger logger = Logger.getLogger(ActionTypeLoader.class);
+
+ public static final String DEFAULT_TRIGGER_ACTION_PLUGIN_DIR = "plugins/triggeractions";
+ private static final String ACTIONTYPECONFFILE = "plugin.properties"; // need jars.to.include property, will be loaded with user property
+ private static final String COMMONCONFFILE = "common.properties"; // common properties for multiple plugins
+
+ protected static Map<String, Class<? extends TriggerAction>> actionToClass = new HashMap<String, Class<? extends TriggerAction>>();
+
+ public void init(Props props) throws TriggerException {
+ // load built-in actions
+
+
+ loadDefaultActions();
+
+ loadPluginActions(props);
+
+ }
+
+ private void loadPluginActions(Props props) throws TriggerException {
+ String checkerDir = props.getString("azkaban.trigger.action.plugin.dir", DEFAULT_TRIGGER_ACTION_PLUGIN_DIR);
+ File pluginDir = new File(checkerDir);
+ if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+ logger.info("No trigger action plugins to load.");
+ return;
+ }
+
+ logger.info("Loading plugin trigger actions from " + pluginDir);
+ ClassLoader parentCl = this.getClass().getClassLoader();
+
+ Props globalActionConf = null;
+ File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+ try {
+ if(confFile != null) {
+ globalActionConf = new Props(null, confFile);
+ } else {
+ globalActionConf = new Props();
+ }
+ } catch (IOException e) {
+ throw new TriggerException("Failed to get global properties." + e);
+ }
+
+ for(File dir : pluginDir.listFiles()) {
+ if(dir.isDirectory() && dir.canRead()) {
+ try {
+ loadPluginTypes(globalActionConf, pluginDir, parentCl);
+ } catch (Exception e) {
+ logger.info("Plugin actions failed to load. " + e.getCause());
+ throw new TriggerException("Failed to load all trigger actions!", e);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+ Props actionConf = null;
+ File confFile = Utils.findFilefromDir(dir, ACTIONTYPECONFFILE);
+ if(confFile == null) {
+ logger.info("No action type found in " + dir.getAbsolutePath());
+ return;
+ }
+ try {
+ actionConf = new Props(globalConf, confFile);
+ } catch (IOException e) {
+ throw new TriggerException("Failed to load config for the action type", e);
+ }
+
+ String actionName = dir.getName();
+ String actionClass = actionConf.getString("action.class");
+
+ List<URL> resources = new ArrayList<URL>();
+ for(File f : dir.listFiles()) {
+ try {
+ if(f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
+ }
+ } catch (MalformedURLException e) {
+ // TODO Auto-generated catch block
+ throw new TriggerException(e);
+ }
+ }
+
+ // each job type can have a different class loader
+ ClassLoader actionCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+
+ Class<? extends TriggerAction> clazz = null;
+ try {
+ clazz = (Class<? extends TriggerAction>)actionCl.loadClass(actionClass);
+ actionToClass.put(actionName, clazz);
+ }
+ catch (ClassNotFoundException e) {
+ throw new TriggerException(e);
+ }
+
+ if(actionConf.getBoolean("need.init")) {
+ try {
+ Utils.invokeStaticMethod(actionCl, actionClass, "init", actionConf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("Failed to init the action type " + actionName);
+ throw new TriggerException(e);
+ }
+ }
+
+ logger.info("Loaded action type " + actionName + " " + actionClass);
+ }
+
+ private void loadDefaultActions() {
+ actionToClass.put(ExecuteFlowAction.type, ExecuteFlowAction.class);
+ }
+
+ public TriggerAction createActionFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ TriggerAction action = null;
+ Class<? extends TriggerAction> actionClass = actionToClass.get(type);
+ action = (TriggerAction) Utils.invokeStaticMethod(actionClass.getClassLoader(), actionClass.getName(), "createFromJson", obj);
+
+ return action;
+ }
+
+ public TriggerAction createAction(String type, Object ... args) {
+ TriggerAction action = null;
+ Class<? extends TriggerAction> actionClass = actionToClass.get(type);
+ action = (TriggerAction) Utils.callConstructor(actionClass, args);
+
+ return action;
+ }
+}
src/java/azkaban/trigger/BasicTimeChecker.java 232(+232 -0)
diff --git a/src/java/azkaban/trigger/BasicTimeChecker.java b/src/java/azkaban/trigger/BasicTimeChecker.java
new file mode 100644
index 0000000..ab0b94f
--- /dev/null
+++ b/src/java/azkaban/trigger/BasicTimeChecker.java
@@ -0,0 +1,232 @@
+package azkaban.trigger;
+
+import java.util.TimeZone;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
+import org.joda.time.tz.DateTimeZoneBuilder;
+
+public class BasicTimeChecker implements ConditionChecker {
+
+ private DateTime firstCheckTime;
+ private DateTime nextCheckTime;
+ private boolean isRecurring = true;
+ private boolean skipPastChecks = true;
+ private ReadablePeriod period;
+ private String message;
+
+ public static final String type = "BasicTimeChecker";
+
+ public BasicTimeChecker(
+ DateTime firstCheckTime,
+ boolean isRecurring,
+ boolean skipPastChecks,
+ ReadablePeriod period) {
+ this.firstCheckTime = firstCheckTime;
+ this.isRecurring = isRecurring;
+ this.skipPastChecks = skipPastChecks;
+ this.period = period;
+ this.nextCheckTime = new DateTime(firstCheckTime);
+ this.nextCheckTime = getNextCheckTime();
+ }
+
+ public BasicTimeChecker(
+ DateTime firstCheckTime,
+ Boolean isRecurring,
+ Boolean skipPastChecks,
+ String period) {
+ this.firstCheckTime = firstCheckTime;
+ this.isRecurring = isRecurring;
+ this.skipPastChecks = skipPastChecks;
+ this.period = parsePeriodString(period);
+ this.nextCheckTime = new DateTime(firstCheckTime);
+ this.nextCheckTime = getNextCheckTime();
+ }
+
+ public BasicTimeChecker(
+ DateTime firstCheckTime,
+ DateTime nextCheckTime,
+ boolean isRecurring,
+ boolean skipPastChecks,
+ ReadablePeriod period) {
+ this.firstCheckTime = firstCheckTime;
+ this.nextCheckTime = nextCheckTime;
+ this.isRecurring = isRecurring;
+ this.skipPastChecks = skipPastChecks;
+ this.period = period;
+ }
+
+ @Override
+ public Boolean eval() {
+ return nextCheckTime.isBeforeNow();
+ }
+
+ @Override
+ public void reset() {
+ this.nextCheckTime = getNextCheckTime();
+
+ }
+
+ /*
+ * TimeChecker format:
+ * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
+ */
+ @Override
+ public String getId() {
+ return getType() + "$" +
+ firstCheckTime.getMillis() + "$" +
+ nextCheckTime.getMillis() + "$" +
+ firstCheckTime.getZone().getID().replace('/', '0') + "$" +
+ //"offset"+firstCheckTime.getZone().getOffset(firstCheckTime.getMillis()) + "_" +
+ (isRecurring == true ? "1" : "0") + "$" +
+ (skipPastChecks == true ? "1" : "0") + "$" +
+ createPeriodString(period);
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ public static ConditionChecker createFromJson(String obj) {
+ String str = (String) obj;
+ String[] parts = str.split("\\$");
+
+ if(!parts[0].equals(type)) {
+ throw new RuntimeException("Cannot create checker of " + type + " from " + parts[0]);
+ }
+
+ long firstMillis = Long.parseLong(parts[1]);
+ long nextMillis = Long.parseLong(parts[2]);
+ //DateTimeZone timezone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(parts[3]));
+ DateTimeZone timezone = DateTimeZone.forID(parts[3].replace('0', '/'));
+ boolean isRecurring = parts[4].equals("1") ? true : false;
+ boolean skipPastChecks = parts[5].equals("1") ? true : false;
+ ReadablePeriod period = parsePeriodString(parts[6]);
+
+ return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+ }
+
+ @Override
+ public ConditionChecker fromJson(Object obj) {
+ String str = (String) obj;
+ String[] parts = str.split("_");
+
+ if(!parts[0].equals(getType())) {
+ throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
+ }
+
+ long firstMillis = Long.parseLong(parts[1]);
+ long nextMillis = Long.parseLong(parts[2]);
+ DateTimeZone timezone = DateTimeZone.forID(parts[3]);
+ boolean isRecurring = Boolean.valueOf(parts[4]);
+ boolean skipPastChecks = Boolean.valueOf(parts[5]);
+ ReadablePeriod period = parsePeriodString(parts[6]);
+
+ return new BasicTimeChecker(new DateTime(firstMillis, timezone), new DateTime(nextMillis, timezone), isRecurring, skipPastChecks, period);
+ }
+
+ private DateTime getNextCheckTime(){
+ DateTime date = new DateTime(nextCheckTime);
+ int count = 0;
+ while(!DateTime.now().isBefore(date) && skipPastChecks) {
+ if(count > 100000) {
+ throw new IllegalStateException("100000 increments of period did not get to present time.");
+ }
+
+ if(period == null) {
+ break;
+ }else {
+ date = date.plus(period);
+ }
+ count += 1;
+ }
+ return date;
+ }
+
+ public static ReadablePeriod parsePeriodString(String periodStr) {
+ ReadablePeriod period;
+ char periodUnit = periodStr.charAt(periodStr.length() - 1);
+ if (periodUnit == 'n') {
+ return null;
+ }
+
+ int periodInt = Integer.parseInt(periodStr.substring(0,
+ periodStr.length() - 1));
+ switch (periodUnit) {
+ case 'M':
+ period = Months.months(periodInt);
+ break;
+ case 'w':
+ period = Weeks.weeks(periodInt);
+ break;
+ case 'd':
+ period = Days.days(periodInt);
+ break;
+ case 'h':
+ period = Hours.hours(periodInt);
+ break;
+ case 'm':
+ period = Minutes.minutes(periodInt);
+ break;
+ case 's':
+ period = Seconds.seconds(periodInt);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid schedule period unit '"
+ + periodUnit);
+ }
+
+ return period;
+ }
+
+ public static String createPeriodString(ReadablePeriod period) {
+ String periodStr = "n";
+
+ if (period == null) {
+ return "n";
+ }
+
+ if (period.get(DurationFieldType.months()) > 0) {
+ int months = period.get(DurationFieldType.months());
+ periodStr = months + "M";
+ } else if (period.get(DurationFieldType.weeks()) > 0) {
+ int weeks = period.get(DurationFieldType.weeks());
+ periodStr = weeks + "w";
+ } else if (period.get(DurationFieldType.days()) > 0) {
+ int days = period.get(DurationFieldType.days());
+ periodStr = days + "d";
+ } else if (period.get(DurationFieldType.hours()) > 0) {
+ int hours = period.get(DurationFieldType.hours());
+ periodStr = hours + "h";
+ } else if (period.get(DurationFieldType.minutes()) > 0) {
+ int minutes = period.get(DurationFieldType.minutes());
+ periodStr = minutes + "m";
+ } else if (period.get(DurationFieldType.seconds()) > 0) {
+ int seconds = period.get(DurationFieldType.seconds());
+ periodStr = seconds + "s";
+ }
+
+ return periodStr;
+ }
+
+ @Override
+ public Object getNum() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object toJson() {
+ return getId();
+ }
+
+}
src/java/azkaban/trigger/CheckerTypeLoader.java 152(+152 -0)
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
new file mode 100644
index 0000000..18e3e7b
--- /dev/null
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -0,0 +1,152 @@
+package azkaban.trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+
+public class CheckerTypeLoader {
+
+ private static Logger logger = Logger.getLogger(CheckerTypeLoader.class);
+
+ public static final String DEFAULT_CONDITION_CHECKER_PLUGIN_DIR = "plugins/conditioncheckers";
+ private static final String CHECKERTYPECONFFILE = "plugin.properties"; // need jars.to.include property, will be loaded with user property
+ private static final String COMMONCONFFILE = "common.properties"; // common properties for multiple plugins
+
+ protected static Map<String, Class<? extends ConditionChecker>> checkerToClass = new HashMap<String, Class<? extends ConditionChecker>>();
+
+ public void init(Props props) throws TriggerException {
+
+
+ // load built-in checkers
+
+ loadDefaultCheckers();
+
+ loadPluginCheckers(props);
+
+ }
+
+ private void loadPluginCheckers(Props props) throws TriggerException {
+
+ String checkerDir = props.getString("azkaban.condition.checker.plugin.dir", DEFAULT_CONDITION_CHECKER_PLUGIN_DIR);
+ File pluginDir = new File(checkerDir);
+ if(!pluginDir.exists() || !pluginDir.isDirectory() || !pluginDir.canRead()) {
+ logger.info("No conditon checker plugins to load.");
+ return;
+ }
+
+ logger.info("Loading plugin condition checkers from " + pluginDir);
+ ClassLoader parentCl = this.getClass().getClassLoader();
+
+ Props globalCheckerConf = null;
+ File confFile = Utils.findFilefromDir(pluginDir, COMMONCONFFILE);
+ try {
+ if(confFile != null) {
+ globalCheckerConf = new Props(null, confFile);
+ } else {
+ globalCheckerConf = new Props();
+ }
+ } catch (IOException e) {
+ throw new TriggerException("Failed to get global properties." + e);
+ }
+
+ for(File dir : pluginDir.listFiles()) {
+ if(dir.isDirectory() && dir.canRead()) {
+ try {
+ loadPluginTypes(globalCheckerConf, pluginDir, parentCl);
+ } catch (Exception e) {
+ logger.info("Plugin checkers failed to load. " + e.getCause());
+ throw new TriggerException("Failed to load all condition checkers!", e);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadPluginTypes(Props globalConf, File dir, ClassLoader parentCl) throws TriggerException {
+ Props checkerConf = null;
+ File confFile = Utils.findFilefromDir(dir, CHECKERTYPECONFFILE);
+ if(confFile == null) {
+ logger.info("No checker type found in " + dir.getAbsolutePath());
+ return;
+ }
+ try {
+ checkerConf = new Props(globalConf, confFile);
+ } catch (IOException e) {
+ throw new TriggerException("Failed to load config for the checker type", e);
+ }
+
+ String checkerName = dir.getName();
+ String checkerClass = checkerConf.getString("checker.class");
+
+ List<URL> resources = new ArrayList<URL>();
+ for(File f : dir.listFiles()) {
+ try {
+ if(f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.info("adding to classpath " + f.toURI().toURL());
+ }
+ } catch (MalformedURLException e) {
+ // TODO Auto-generated catch block
+ throw new TriggerException(e);
+ }
+ }
+
+ // each job type can have a different class loader
+ ClassLoader checkerCl = new URLClassLoader(resources.toArray(new URL[resources.size()]), parentCl);
+
+ Class<? extends ConditionChecker> clazz = null;
+ try {
+ clazz = (Class<? extends ConditionChecker>)checkerCl.loadClass(checkerClass);
+ checkerToClass.put(checkerName, clazz);
+ }
+ catch (ClassNotFoundException e) {
+ throw new TriggerException(e);
+ }
+
+ if(checkerConf.getBoolean("need.init")) {
+ try {
+ Utils.invokeStaticMethod(checkerCl, checkerClass, "init", checkerConf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("Failed to init the checker type " + checkerName);
+ throw new TriggerException(e);
+ }
+ }
+
+ logger.info("Loaded checker type " + checkerName + " " + checkerClass);
+ }
+
+ private void loadDefaultCheckers() {
+ checkerToClass.put("BasicTimeChecker", BasicTimeChecker.class);
+ }
+
+ public ConditionChecker createCheckerFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ ConditionChecker checker = null;
+ Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);
+ checker = (ConditionChecker) Utils.invokeStaticMethod(checkerClass.getClassLoader(), checkerClass.getName(), "createFromJson", obj);
+
+ return checker;
+ }
+
+ public ConditionChecker createChecker(String type, Object ... args) {
+ ConditionChecker checker = null;
+ Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);
+ checker = (ConditionChecker) Utils.callConstructor(checkerClass, args);
+
+ return checker;
+ }
+
+}
src/java/azkaban/trigger/Condition.java 117(+117 -0)
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
new file mode 100644
index 0000000..4c8e078
--- /dev/null
+++ b/src/java/azkaban/trigger/Condition.java
@@ -0,0 +1,117 @@
+package azkaban.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+import org.apache.log4j.Logger;
+
+import com.sun.swing.internal.plaf.synth.resources.synth;
+
+public class Condition {
+
+ private static Logger logger = Logger.getLogger(Condition.class);
+
+ private static JexlEngine jexl = new JexlEngine();
+ private static CheckerTypeLoader checkerLoader = null;
+ private Expression expression;
+ private Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ private MapContext context = new MapContext();
+
+ public Condition(Map<String, ConditionChecker> checkers, String expr) {
+ setCheckers(checkers);
+ this.expression = jexl.createExpression(expr);
+ }
+
+ public synchronized static void setJexlEngine(JexlEngine jexl) {
+ Condition.jexl = jexl;
+ }
+
+ public synchronized static void setCheckerLoader(CheckerTypeLoader loader) {
+ Condition.checkerLoader = loader;
+ }
+
+ public void registerChecker(ConditionChecker checker) {
+ checkers.put(checker.getId(), checker);
+ context.set(checker.getId(), checker);
+ }
+
+ public Map<String, ConditionChecker> getCheckers() {
+ return this.checkers;
+ }
+
+ public void setCheckers(Map<String, ConditionChecker> checkers){
+ this.checkers = checkers;
+ for(ConditionChecker checker : checkers.values()) {
+ this.context.set(checker.getId(), checker);
+ }
+ }
+
+ public void resetCheckers() {
+ for(ConditionChecker checker : checkers.values()) {
+ checker.reset();
+ }
+ }
+
+ public String getExpression() {
+ return this.expression.getExpression();
+ }
+
+ public void setExpression(String expr) {
+ this.expression = jexl.createExpression(expr);
+ }
+
+ public boolean isMet() {
+ return expression.evaluate(context).equals(Boolean.TRUE);
+ }
+
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("expression", expression.getExpression());
+
+ List<Object> checkersJson = new ArrayList<Object>();
+ for(ConditionChecker checker : checkers.values()) {
+ Map<String, Object> oneChecker = new HashMap<String, Object>();
+ oneChecker.put("type", checker.getType());
+ oneChecker.put("checkerJson", checker.toJson());
+ checkersJson.add(oneChecker);
+ }
+ jsonObj.put("checkers", checkersJson);
+
+ return jsonObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Condition fromJson(Object obj) {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ Condition cond = null;
+
+ try {
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ List<Object> checkersJson = (List<Object>) jsonObj.get("checkers");
+ for(Object oneCheckerJson : checkersJson) {
+ Map<String, Object> oneChecker = (HashMap<String, Object>) oneCheckerJson;
+ String type = (String) oneChecker.get("type");
+ ConditionChecker ck = checkerLoader.createCheckerFromJson(type, oneChecker.get("checkerJson"));
+ checkers.put(ck.getId(), ck);
+ }
+ String expr = (String) jsonObj.get("expression");
+
+ cond = new Condition(checkers, expr);
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.error("Failed to recreate condition from json.", e);
+ return null;
+ }
+
+ return cond;
+ }
+
+
+}
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
new file mode 100644
index 0000000..ad7b0c6
--- /dev/null
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -0,0 +1,20 @@
+package azkaban.trigger;
+
+
+public interface ConditionChecker {
+
+ Object eval();
+
+ Object getNum();
+
+ void reset();
+
+ String getId();
+
+ String getType();
+
+ ConditionChecker fromJson(Object obj);
+
+ Object toJson();
+
+}
src/java/azkaban/trigger/ExecuteFlowAction.java 177(+177 -0)
diff --git a/src/java/azkaban/trigger/ExecuteFlowAction.java b/src/java/azkaban/trigger/ExecuteFlowAction.java
new file mode 100644
index 0000000..35d12cf
--- /dev/null
+++ b/src/java/azkaban/trigger/ExecuteFlowAction.java
@@ -0,0 +1,177 @@
+package azkaban.trigger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+
+public class ExecuteFlowAction implements TriggerAction {
+
+ public static final String type = "ExecuteFlow";
+
+ private static ExecutorManager executorManager;
+ private int projectId;
+ private String flowName;
+ private String submitUser;
+ private static ProjectManager projectManager;
+ private ExecutionOptions executionOptions;
+
+
+ private static Logger logger;
+
+ public ExecuteFlowAction(int projectId, String flowName, String submitUser, ExecutionOptions executionOptions) {
+ this.projectId = projectId;
+ this.flowName = flowName;
+ this.submitUser = submitUser;
+ this.executionOptions = executionOptions;
+ }
+
+ public static void setLogger(Logger logger) {
+ ExecuteFlowAction.logger = logger;
+ }
+
+ public int getProjectId() {
+ return projectId;
+ }
+
+ public void setProjectId(int projectId) {
+ this.projectId = projectId;
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public void setFlowName(String flowName) {
+ this.flowName = flowName;
+ }
+
+ public String getSubmitUser() {
+ return submitUser;
+ }
+
+ public void setSubmitUser(String submitUser) {
+ this.submitUser = submitUser;
+ }
+
+ public ExecutionOptions getExecutionOptions() {
+ return executionOptions;
+ }
+
+ public void setExecutionOptions(ExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
+ }
+
+ public static void setExecutorManager(ExecutorManager executorManager) {
+ ExecuteFlowAction.executorManager = executorManager;
+ }
+
+ public static void setProjectManager(ProjectManager projectManager) {
+ ExecuteFlowAction.projectManager = projectManager;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TriggerAction fromJson(Object obj) {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ String type = (String) jsonObj.get("type");
+ if(! type.equals(ExecuteFlowAction.type)) {
+ throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+ }
+ int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
+ String flowName = (String) jsonObj.get("flowName");
+ String submitUser = (String) jsonObj.get("submitUser");
+ ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+ return new ExecuteFlowAction(projectId, flowName, submitUser, executionOptions);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static TriggerAction createFromJson(HashMap obj) {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ String type = (String) jsonObj.get("type");
+ if(! type.equals(ExecuteFlowAction.type)) {
+ throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+ }
+ int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
+ String flowName = (String) jsonObj.get("flowName");
+ String submitUser = (String) jsonObj.get("submitUser");
+ ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+ return new ExecuteFlowAction(projectId, flowName, submitUser, executionOptions);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static TriggerAction createFromJson(Object obj) {
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+ String type = (String) jsonObj.get("type");
+ if(! type.equals(ExecuteFlowAction.type)) {
+ throw new RuntimeException("Cannot create action of " + ExecuteFlowAction.type + " from " + type);
+ }
+ int projectId = Integer.valueOf((String)jsonObj.get("projectId"));
+ String flowName = (String) jsonObj.get("flowName");
+ String submitUser = (String) jsonObj.get("submitUser");
+ ExecutionOptions executionOptions = ExecutionOptions.createFromObject(jsonObj.get("executionOptions"));
+ return new ExecuteFlowAction(projectId, flowName, submitUser, executionOptions);
+ }
+
+ @Override
+ public Object toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("type", type);
+ jsonObj.put("projectId", String.valueOf(projectId));
+ jsonObj.put("flowName", flowName);
+ jsonObj.put("submitUser", submitUser);
+ jsonObj.put("executionOptions", executionOptions.toObject());
+
+ return jsonObj;
+ }
+
+ @Override
+ public void doAction() {
+ Project project = projectManager.getProject(projectId);
+ if(project == null) {
+ logger.error("Project to execute " + projectId + " does not exist!");
+ throw new RuntimeException("Error finding the project to execute " + projectId);
+ }
+
+ Flow flow = project.getFlow(flowName);
+ if(flow == null) {
+ logger.error("Flow " + flowName + " cannot be found in project " + project.getName());
+ throw new RuntimeException("Error finding the flow to execute " + flowName);
+ }
+
+ ExecutableFlow exflow = new ExecutableFlow(flow);
+ exflow.setSubmitUser(submitUser);
+ exflow.addAllProxyUsers(project.getProxyUsers());
+
+ if(!executionOptions.isFailureEmailsOverridden()) {
+ executionOptions.setFailureEmails(flow.getFailureEmails());
+ }
+ if(!executionOptions.isSuccessEmailsOverridden()) {
+ executionOptions.setSuccessEmails(flow.getSuccessEmails());
+ }
+ exflow.setExecutionOptions(executionOptions);
+
+ try{
+ executorManager.submitExecutableFlow(exflow);
+ logger.info("Invoked flow " + project.getName() + "." + flowName);
+ } catch (ExecutorManagerException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+
+}
src/java/azkaban/trigger/JdbcTriggerLoader.java 299(+299 -0)
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
new file mode 100644
index 0000000..91bf85e
--- /dev/null
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -0,0 +1,299 @@
+package azkaban.trigger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+
+import azkaban.database.AbstractJdbcLoader;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoader {
+ private static Logger logger = Logger.getLogger(JdbcTriggerLoader.class);
+
+ private EncodingType defaultEncodingType = EncodingType.GZIP;
+
+ private static final String triggerTblName = "triggers";
+
+ private static String GET_ALL_TRIGGERS =
+ "SELECT trigger_id, enc_type, data FROM " + triggerTblName;
+
+ private static String GET_TRIGGER =
+ "SELECT trigger_id, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
+
+ private static String ADD_TRIGGER =
+ "INSERT INTO " + triggerTblName + " ( modify_time, enc_type, data) values (?,?,?)";
+
+ private static String REMOVE_TRIGGER =
+ "DELETE FROM " + triggerTblName + " WHERE trigger_id=?";
+
+ private static String UPDATE_TRIGGER =
+ "UPDATE " + triggerTblName + " SET modify_time=?, enc_type=?, data=? WHERE trigger_id=?";
+
+ public EncodingType getDefaultEncodingType() {
+ return defaultEncodingType;
+ }
+
+ public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+ this.defaultEncodingType = defaultEncodingType;
+ }
+
+ public JdbcTriggerLoader(Props props) {
+ super(props);
+ }
+
+ @Override
+ public List<Trigger> loadTriggers() throws TriggerManagerException {
+ logger.info("Loading all triggers from db.");
+ Connection connection = getConnection();
+
+ QueryRunner runner = new QueryRunner();
+ ResultSetHandler<List<Trigger>> handler = new TriggerResultHandler();
+
+ List<Trigger> triggers;
+
+ try {
+ triggers = runner.query(connection, GET_ALL_TRIGGERS, handler);
+ } catch (SQLException e) {
+ logger.error(GET_ALL_TRIGGERS + " failed.");
+
+ throw new TriggerManagerException("Loading triggers from db failed. ", e);
+ } finally {
+ DbUtils.closeQuietly(connection);
+ }
+
+ logger.info("Loaded " + triggers.size() + " triggers.");
+
+ return triggers;
+ }
+
+ @Override
+ public void removeTrigger(Trigger t) throws TriggerManagerException {
+ logger.info("Removing trigger " + t.toString() + " from db.");
+
+ QueryRunner runner = createQueryRunner();
+ try {
+ int removes = runner.update(REMOVE_TRIGGER, t.getTriggerId());
+ if (removes == 0) {
+ throw new TriggerManagerException("No trigger has been removed.");
+ }
+ } catch (SQLException e) {
+ logger.error(REMOVE_TRIGGER + " failed.");
+ throw new TriggerManagerException("Remove trigger " + t.toString() + " from db failed. ", e);
+ }
+ }
+
+ @Override
+ public void addTrigger(Trigger t) throws TriggerManagerException {
+ logger.info("Inserting trigger " + t.toString() + " into db.");
+ Connection connection = getConnection();
+ try {
+ addTrigger(connection, t, defaultEncodingType);
+ }
+ catch (Exception e) {
+ throw new TriggerManagerException("Error uploading trigger", e);
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private synchronized void addTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+
+ String json = JSONUtils.toJSON(t.toJson());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new TriggerManagerException("Error encoding the trigger " + t.toString());
+ }
+
+ QueryRunner runner = new QueryRunner();
+ long submitTime = System.currentTimeMillis();
+
+ long id;
+
+ try {
+ runner.update(connection, ADD_TRIGGER, DateTime.now().getMillis(), encType.getNumVal(), data);
+ connection.commit();
+ id = runner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID());
+
+ if (id == -1l) {
+ throw new TriggerManagerException("trigger id is not properly created.");
+ }
+ logger.info("uploaded trigger " + t.toString());
+ t.setTriggerId((int)id);
+
+ } catch (SQLException e) {
+ throw new TriggerManagerException("Error creating trigger.", e);
+ }
+
+ }
+
+ @Override
+ public void updateTrigger(Trigger t) throws TriggerManagerException {
+ logger.info("Updating trigger " + t.toString() + " into db.");
+ Connection connection = getConnection();
+ try{
+ updateTrigger(connection, t, defaultEncodingType);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new TriggerManagerException("Failed to update trigger " + t.toString() + " into db!");
+ }
+ finally {
+ DbUtils.closeQuietly(connection);
+ }
+ }
+
+ private void updateTrigger(Connection connection, Trigger t, EncodingType encType) throws TriggerManagerException {
+
+ String json = JSONUtils.toJSON(t.toJson());
+ byte[] data = null;
+ try {
+ byte[] stringData = json.getBytes("UTF-8");
+ data = stringData;
+
+ if (encType == EncodingType.GZIP) {
+ data = GZIPUtils.gzipBytes(stringData);
+ }
+ logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+ }
+ catch (IOException e) {
+ throw new TriggerManagerException("Error encoding the trigger " + t.toString());
+ }
+
+ QueryRunner runner = new QueryRunner();
+
+ try {
+ int updates = runner.update( connection,
+ UPDATE_TRIGGER,
+ DateTime.now().getMillis(),
+ encType.getNumVal(),
+ data,
+ t.getTriggerId());
+ if (updates == 0) {
+ throw new TriggerManagerException("No trigger has been updated.");
+ }
+ } catch (SQLException e) {
+ logger.error(UPDATE_TRIGGER + " failed.");
+ throw new TriggerManagerException("Update trigger " + t.toString() + " into db failed. ", e);
+ }
+ }
+
+
+ private static class LastInsertID implements ResultSetHandler<Long> {
+ private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";
+
+ @Override
+ public Long handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return -1l;
+ }
+
+ long id = rs.getLong(1);
+ return id;
+ }
+
+ }
+
+ public class TriggerResultHandler implements ResultSetHandler<List<Trigger>> {
+
+ @Override
+ public List<Trigger> handle(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return Collections.<Trigger>emptyList();
+ }
+
+ ArrayList<Trigger> triggers = new ArrayList<Trigger>();
+ do {
+ int triggerId = rs.getInt(1);
+ long modifyTime = rs.getInt(2);
+ int encodingType = rs.getInt(3);
+ byte[] data = rs.getBytes(4);
+
+ Object jsonObj = null;
+ if (data != null) {
+ EncodingType encType = EncodingType.fromInteger(encodingType);
+
+ try {
+ // Convoluted way to inflate strings. Should find common package or helper function.
+ if (encType == EncodingType.GZIP) {
+ // Decompress the sucker.
+ String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+ jsonObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ else {
+ String jsonString = new String(data, "UTF-8");
+ jsonObj = JSONUtils.parseJSONFromString(jsonString);
+ }
+ } catch (IOException e) {
+ throw new SQLException("Error reconstructing trigger data " );
+ }
+ }
+
+ Trigger t = null;
+ try {
+ t = Trigger.fromJson(jsonObj);
+ triggers.add(t);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to load trigger " + triggerId);
+ }
+ } while (rs.next());
+
+ return triggers;
+ }
+
+ }
+
+ private Connection getConnection() throws TriggerManagerException {
+ Connection connection = null;
+ try {
+ connection = super.getDBConnection(false);
+ } catch (Exception e) {
+ DbUtils.closeQuietly(connection);
+ throw new TriggerManagerException("Error getting DB connection.", e);
+ }
+
+ return connection;
+ }
+
+
+}
src/java/azkaban/trigger/Trigger.java 171(+171 -0)
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
new file mode 100644
index 0000000..240411b
--- /dev/null
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -0,0 +1,171 @@
+package azkaban.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class Trigger {
+
+ private static Logger logger = Logger.getLogger(Trigger.class);
+
+ private int triggerId = -1;
+ private long lastModifyTime = -1;
+ private long submitTime = -1;
+ private String submitUser;
+
+ private Condition triggerCondition;
+ private Condition expireCondition;
+ private List<TriggerAction> actions;
+
+ private static ActionTypeLoader actionTypeLoader;
+
+ private boolean resetOnTrigger = false;
+ private boolean resetOnExpire = false;
+
+ @SuppressWarnings("unused")
+ private Trigger() throws TriggerManagerException {
+ throw new TriggerManagerException("Triggers should always be specified");
+ }
+
+ public Trigger(
+ long lastModifyTime,
+ long submitTime,
+ String submitUser,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions) {
+ this.lastModifyTime = lastModifyTime;
+ this.submitTime = submitTime;
+ this.submitUser = submitUser;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ }
+
+ public Trigger(
+ int triggerId,
+ long lastModifyTime,
+ long submitTime,
+ String submitUser,
+ Condition triggerCondition,
+ Condition expireCondition,
+ List<TriggerAction> actions) {
+ this.triggerId = triggerId;
+ this.lastModifyTime = lastModifyTime;
+ this.submitTime = submitTime;
+ this.submitUser = submitUser;
+ this.triggerCondition = triggerCondition;
+ this.expireCondition = expireCondition;
+ this.actions = actions;
+ }
+
+ public static synchronized void setActionTypeLoader(ActionTypeLoader loader) {
+ Trigger.actionTypeLoader = loader;
+ }
+
+ public boolean isResetOnTrigger() {
+ return resetOnTrigger;
+ }
+
+ public void setResetOnTrigger(boolean resetOnTrigger) {
+ this.resetOnTrigger = resetOnTrigger;
+ }
+
+ public boolean isResetOnExpire() {
+ return resetOnExpire;
+ }
+
+ public void setResetOnExpire(boolean resetOnExpire) {
+ this.resetOnExpire = resetOnExpire;
+ }
+
+ public long getLastModifyTime() {
+ return lastModifyTime;
+ }
+
+ public void setTriggerId(int id) {
+ this.triggerId = id;
+ }
+
+ public int getTriggerId() {
+ return triggerId;
+ }
+
+ public boolean triggerConditionMet(){
+ return triggerCondition.isMet();
+ }
+
+ public boolean expireConditionMet(){
+ return expireCondition.isMet();
+ }
+
+ public void resetTriggerConditions() {
+ triggerCondition.resetCheckers();
+ }
+
+ public List<TriggerAction> getTriggerActions () {
+ return actions;
+ }
+
+ public Map<String, Object> toJson() {
+ Map<String, Object> jsonObj = new HashMap<String, Object>();
+ jsonObj.put("triggerCondition", triggerCondition.toJson());
+ jsonObj.put("expireCondition", expireCondition.toJson());
+ List<Object> actionsJson = new ArrayList<Object>();
+ for(TriggerAction action : actions) {
+ Map<String, Object> oneActionJson = new HashMap<String, Object>();
+ oneActionJson.put("type", action.getType());
+ oneActionJson.put("actionJson", action.toJson());
+ actionsJson.add(oneActionJson);
+ }
+ jsonObj.put("actions", actionsJson);
+ jsonObj.put("resetOnTrigger", String.valueOf(resetOnTrigger));
+ jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
+ jsonObj.put("submitUser", submitUser);
+ jsonObj.put("submitTime", String.valueOf(submitTime));
+ jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
+ jsonObj.put("triggerId", String.valueOf(triggerId));
+
+ return jsonObj;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public static Trigger fromJson(Object obj) {
+
+ Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
+
+ Trigger trigger = null;
+ try{
+ Condition triggerCond = Condition.fromJson(jsonObj.get("triggerCondition"));
+ Condition expireCond = Condition.fromJson(jsonObj.get("expireCondition"));
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ List<Object> actionsJson = (List<Object>) jsonObj.get("actions");
+ for(Object actObj : actionsJson) {
+ Map<String, Object> oneActionJson = (HashMap<String, Object>) actObj;
+ String type = (String) oneActionJson.get("type");
+ TriggerAction act = actionTypeLoader.createActionFromJson(type, oneActionJson.get("actionJson"));
+ actions.add(act);
+ }
+ boolean resetOnTrigger = Boolean.valueOf((String) jsonObj.get("resetOnTrigger"));
+ boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
+ String submitUser = (String) jsonObj.get("submitUser");
+ long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
+ long lastModifyTime = Long.valueOf((String) jsonObj.get("lastModifyTime"));
+ int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
+ trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, triggerCond, expireCond, actions);
+ trigger.setResetOnExpire(resetOnExpire);
+ trigger.setResetOnTrigger(resetOnTrigger);
+ }catch(Exception e) {
+ e.printStackTrace();
+ logger.error("Failed to decode the trigger.", e);
+ return null;
+ }
+
+ return trigger;
+ }
+
+}
src/java/azkaban/trigger/TriggerAction.java 13(+13 -0)
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
new file mode 100644
index 0000000..581b3fa
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -0,0 +1,13 @@
+package azkaban.trigger;
+
+public interface TriggerAction {
+
+ String getType();
+
+ TriggerAction fromJson(Object obj);
+
+ Object toJson();
+
+ void doAction();
+
+}
diff --git a/src/java/azkaban/trigger/TriggerException.java b/src/java/azkaban/trigger/TriggerException.java
new file mode 100644
index 0000000..e49fb2a
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerException.java
@@ -0,0 +1,19 @@
+package azkaban.trigger;
+
+
+public class TriggerException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public TriggerException(String message) {
+ super(message);
+ }
+
+ public TriggerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TriggerException(Throwable e) {
+ super(e);
+ }
+}
+
src/java/azkaban/trigger/TriggerLoader.java 18(+18 -0)
diff --git a/src/java/azkaban/trigger/TriggerLoader.java b/src/java/azkaban/trigger/TriggerLoader.java
new file mode 100644
index 0000000..cf37634
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerLoader.java
@@ -0,0 +1,18 @@
+package azkaban.trigger;
+
+import java.util.List;
+import java.util.Map;
+
+
+
+public interface TriggerLoader {
+
+ public void addTrigger(Trigger t) throws TriggerManagerException;
+
+ public void removeTrigger(Trigger s) throws TriggerManagerException;
+
+ public void updateTrigger(Trigger t) throws TriggerManagerException;
+
+ public List<Trigger> loadTriggers() throws TriggerManagerException;
+
+}
src/java/azkaban/trigger/TriggerManager.java 167(+167 -0)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
new file mode 100644
index 0000000..6f613cc
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -0,0 +1,167 @@
+package azkaban.trigger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import azkaban.utils.Props;
+
+
+public class TriggerManager {
+ private static Logger logger = Logger.getLogger(TriggerManager.class);
+
+ private final long DEFAULT_TRIGGER_EXPIRE_TIME = 24*60*60*1000L;
+
+ private List<Trigger> triggers;
+
+ TriggerScannerThread scannerThread;
+
+ public TriggerManager(Props props, TriggerLoader triggerLoader, CheckerTypeLoader checkerLoader, ActionTypeLoader actionLoader) {
+
+ // load plugins
+ try{
+ checkerLoader.init(props);
+ actionLoader.init(props);
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ }
+
+ Condition.setCheckerLoader(checkerLoader);
+ Trigger.setActionTypeLoader(actionLoader);
+
+ try{
+ // expect loader to return valid triggers
+ triggers = triggerLoader.loadTriggers();
+ }catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ }
+
+ scannerThread = new TriggerScannerThread("TriggerScannerThread");
+
+ for(Trigger t : triggers) {
+ scannerThread.addTrigger(t);
+ }
+
+ scannerThread.start();
+ }
+
+ public synchronized void insertTrigger(Trigger t) {
+ triggers.add(t);
+ scannerThread.addTrigger(t);
+ }
+
+ public synchronized void removeTrigger(Trigger t) {
+ scannerThread.removeTrigger(t);
+ triggers.remove(t);
+ }
+
+ public List<Trigger> getTriggers() {
+ return new ArrayList<Trigger>(triggers);
+ }
+
+ //trigger scanner thread
+ public class TriggerScannerThread extends Thread {
+ private final BlockingQueue<Trigger> triggers;
+ private AtomicBoolean stillAlive = new AtomicBoolean(true);
+ private String scannerName;
+ private long lastCheckTime = -1;
+
+ // Five minute minimum intervals
+ private static final int TIMEOUT_MS = 300000;
+
+ public TriggerScannerThread(String scannerName){
+ triggers = new LinkedBlockingDeque<Trigger>();
+ this.setName(scannerName);
+ }
+
+ public void shutdown() {
+ logger.error("Shutting down trigger manager thread " + scannerName);
+ stillAlive.set(false);
+ this.interrupt();
+ }
+
+ public synchronized List<Trigger> getTriggers() {
+ return new ArrayList<Trigger>(triggers);
+ }
+
+ public synchronized void addTrigger(Trigger t) {
+ triggers.add(t);
+ }
+
+ public synchronized void removeTrigger(Trigger t) {
+ triggers.remove(t);
+ }
+
+ public void run() {
+ while(stillAlive.get()) {
+ synchronized (this) {
+ try{
+ lastCheckTime = System.currentTimeMillis();
+
+ try{
+ checkAllTriggers();
+ } catch(Exception e) {
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ } catch(Throwable t) {
+ t.printStackTrace();
+ logger.error(t.getMessage());
+ }
+
+ long timeRemaining = TIMEOUT_MS - (System.currentTimeMillis() - lastCheckTime);
+ if(timeRemaining < 0) {
+ logger.error("Trigger manager thread " + scannerName + " is too busy!");
+ } else {
+ wait(timeRemaining);
+ }
+ } catch(InterruptedException e) {
+ logger.info("Interrupted. Probably to shut down.");
+ }
+
+ }
+ }
+ }
+
+ private void checkAllTriggers() {
+ for(Trigger t : triggers) {
+ if(t.triggerConditionMet()) {
+ onTriggerTrigger(t);
+ } else if (t.expireConditionMet()) {
+ onTriggerExpire(t);
+ }
+ }
+
+ }
+
+ private void onTriggerTrigger(Trigger t) {
+ List<TriggerAction> actions = t.getTriggerActions();
+ for(TriggerAction action : actions) {
+ action.doAction();
+ }
+ if(t.isResetOnTrigger()) {
+ t.resetTriggerConditions();
+ } else {
+ triggers.remove(t);
+ }
+ }
+
+ private void onTriggerExpire(Trigger t) {
+ if(t.isResetOnExpire()) {
+ t.resetTriggerConditions();
+ } else {
+ triggers.remove(t);
+ }
+ }
+ }
+
+ public TriggerAction createTriggerAction() {
+ return null;
+ }
+
+}
diff --git a/src/java/azkaban/trigger/TriggerManagerException.java b/src/java/azkaban/trigger/TriggerManagerException.java
new file mode 100644
index 0000000..5d30b39
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerManagerException.java
@@ -0,0 +1,31 @@
+package azkaban.trigger;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+public class TriggerManagerException extends Exception{
+ private static final long serialVersionUID = 1L;
+
+ public TriggerManagerException(String message) {
+ super(message);
+ }
+
+ public TriggerManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
+
src/sql/create.triggers.sql 7(+7 -0)
diff --git a/src/sql/create.triggers.sql b/src/sql/create.triggers.sql
new file mode 100644
index 0000000..f75b727
--- /dev/null
+++ b/src/sql/create.triggers.sql
@@ -0,0 +1,7 @@
+CREATE TABLE triggers (
+ trigger_id INT NOT NULL AUTO_INCREMENT,
+ modify_time BIGINT NOT NULL,
+ enc_type TINYINT,
+ data LONGBLOB NOT NULL,
+ PRIMARY KEY (trigger_id)
+);
unit/java/azkaban/test/trigger/ConditionTest.java 130(+130 -0)
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
new file mode 100644
index 0000000..2cf8ca9
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -0,0 +1,130 @@
+package azkaban.test.trigger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.commons.jexl2.JexlEngine;
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+import org.junit.Test;
+
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.BasicTimeChecker;
+import azkaban.utils.Props;
+
+public class ConditionTest {
+
+ private JexlEngine jexl = new JexlEngine();
+
+ @Test
+ public void conditionTest(){
+
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ DateTime now = DateTime.now();
+ FakeTimeChecker fake1 = new FakeTimeChecker(now);
+ FakeTimeChecker fake2 = new FakeTimeChecker(now.plusMinutes(1));
+ checkers.put(fake1.getId(), fake1);
+ checkers.put(fake2.getId(), fake2);
+
+ String expr1 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + "!" + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
+ String expr2 = "( " + fake1.getId()+ ".eval()" + " && " + fake2.getId()+ ".eval()" + " )" + " || " + "( " + fake1.getId()+".eval()" + " && " + fake2.getId()+".eval()" + " )";
+
+ Condition cond = new Condition(checkers, expr1);
+
+ System.out.println("Setting expression " + expr1);
+ assertTrue(cond.isMet());
+ cond.setExpression(expr2);
+ System.out.println("Setting expression " + expr2);
+ assertFalse(cond.isMet());
+
+ }
+
+ @Test
+ public void timeCheckerTest(){
+
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+
+ // get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
+ DateTime now = DateTime.now();
+ ReadablePeriod period = BasicTimeChecker.parsePeriodString("10s");
+
+ BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
+ checkers.put(timeChecker.getId(), timeChecker);
+ String expr = timeChecker.getId() + ".eval()";
+
+ Condition cond = new Condition(checkers, expr);
+ System.out.println(expr);
+
+ assertFalse(cond.isMet());
+
+ //sleep for 1 min
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ assertTrue(cond.isMet());
+
+ cond.resetCheckers();
+
+ assertFalse(cond.isMet());
+
+ //sleep for 1 min
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ assertTrue(cond.isMet());
+
+ }
+
+ @Test
+ public void BasicTimeCheckerTest() {
+
+ CheckerTypeLoader checkerTypeLoader = new CheckerTypeLoader();
+ checkerTypeLoader.init(new Props());
+ Condition.setCheckerLoader(checkerTypeLoader);
+
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+
+ // get a new timechecker, start from now, repeat every minute. should evaluate to false now, and true a minute later.
+ DateTime now = DateTime.now();
+ String period = "6s";
+
+ //BasicTimeChecker timeChecker = new BasicTimeChecker(now, true, true, period);
+ ConditionChecker timeChecker = checkerTypeLoader.createChecker(BasicTimeChecker.type, now, true, true, period);
+ System.out.println("checker id is " + timeChecker.getId());
+
+ checkers.put(timeChecker.getId(), timeChecker);
+ String expr = timeChecker.getId() + ".eval()";
+
+ Condition cond = new Condition(checkers, expr);
+
+ Object json = cond.toJson();
+ Condition cond2 = Condition.fromJson(json);
+
+ Map<String, ConditionChecker> checkers1 = cond.getCheckers();
+ Map<String, ConditionChecker> checkers2 = cond2.getCheckers();
+
+ assertTrue(cond.getExpression().equals(cond2.getExpression()));
+ System.out.println("cond1: " + cond.getExpression());
+ System.out.println("cond2: " + cond2.getExpression());
+ assertTrue(checkers2.size() == 1);
+ ConditionChecker checker2 = checkers2.get(timeChecker.getId());
+ //assertTrue(checker2.getId().equals(timeChecker.getId()));
+ System.out.println("checker1: " + timeChecker.getId());
+ System.out.println("checker2: " + checker2.getId());
+ assertTrue(timeChecker.getId().equals(checker2.getId()));
+ }
+
+}
diff --git a/unit/java/azkaban/test/trigger/DummyTriggerAction.java b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
new file mode 100644
index 0000000..5fb0b4c
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
@@ -0,0 +1,38 @@
+package azkaban.test.trigger;
+
+import azkaban.trigger.TriggerAction;
+
+public class DummyTriggerAction implements TriggerAction{
+
+ public static final String type = "DummyAction";
+
+ private String message;
+
+ public DummyTriggerAction(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public TriggerAction fromJson(Object obj) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object toJson() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void doAction() {
+ System.out.println(getType() + " invoked.");
+ System.out.println(message);
+ }
+
+}
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
new file mode 100644
index 0000000..00ee02e
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -0,0 +1,41 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.ExecuteFlowAction;
+import azkaban.trigger.TriggerAction;
+import azkaban.utils.Props;
+
+
+public class ExecuteFlowActionTest {
+
+ @Test
+ public void ExecuteFlowActionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ ActionTypeLoader loader = new ActionTypeLoader();
+ loader.init(new Props());
+
+ ExecutionOptions options = new ExecutionOptions();
+ List<String> disabledJobs = new ArrayList<String>();
+ options.setDisabledJobs(disabledJobs);
+
+ ExecuteFlowAction executeFlowAction = new ExecuteFlowAction(1, "testflow", "azkaban", options);
+
+ Object obj = executeFlowAction.toJson();
+
+ ExecuteFlowAction action = (ExecuteFlowAction) loader.createActionFromJson(ExecuteFlowAction.type, obj);
+ assertTrue(executeFlowAction.getProjectId() == action.getProjectId());
+ assertTrue(executeFlowAction.getFlowName().equals(action.getFlowName()));
+ assertTrue(executeFlowAction.getSubmitUser().equals(action.getSubmitUser()));
+ }
+
+
+
+}
diff --git a/unit/java/azkaban/test/trigger/FakeTimeChecker.java b/unit/java/azkaban/test/trigger/FakeTimeChecker.java
new file mode 100644
index 0000000..b4396a8
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/FakeTimeChecker.java
@@ -0,0 +1,74 @@
+package azkaban.test.trigger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import azkaban.trigger.ConditionChecker;
+
+
+public class FakeTimeChecker implements ConditionChecker{
+
+ private DateTime timeToCheck;
+ private String message;
+
+ public static final String type = "FakeTimeChecker";
+
+ public FakeTimeChecker(DateTime timeToCheck){
+ this.timeToCheck = timeToCheck;
+ }
+
+ @Override
+ public Boolean eval() {
+ return timeToCheck.isAfterNow();
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /*
+ * TimeChecker format:
+ * type_first-time-in-millis_next-time-in-millis_timezone_is-recurring_skip-past-checks_period
+ */
+ @Override
+ public String getId() {
+ return getType() + "_" +
+ timeToCheck.getMillis() + "_" +
+ timeToCheck.getZone().getShortName(timeToCheck.getMillis() );
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public ConditionChecker fromJson(Object obj) {
+ String str = (String) obj;
+ String[] parts = str.split("_");
+
+ if(!parts[0].equals(getType())) {
+ throw new RuntimeException("Cannot create checker of " + getType() + " from " + parts[0]);
+ }
+
+ long timeToCheckMillis = Long.parseLong(parts[1]);
+ DateTimeZone timezone = DateTimeZone.forID(parts[2]);
+
+ return new FakeTimeChecker(new DateTime(timeToCheckMillis, timezone));
+ }
+
+ @Override
+ public Object getNum() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object toJson() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
new file mode 100644
index 0000000..56a9117
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -0,0 +1,120 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.CheckerTypeLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.utils.Props;
+
+public class TriggerManagerTest {
+
+ @Before
+ public void setup() {
+
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void TriggerManagerSimpleTest() {
+ Props props = new Props();
+ TriggerManager triggerManager = new TriggerManager(props, new MockTriggerLoader(), new MockCheckerLoader(), new MockActionLoader());
+ List<Trigger> triggers = triggerManager.getTriggers();
+ assertTrue(triggers.size() == 1);
+
+ Trigger t2 = createFakeTrigger("addnewtriggger");
+ triggerManager.insertTrigger(t2);
+
+ triggers = triggerManager.getTriggers();
+ assertTrue(triggers.size() == 2);
+
+ triggerManager.removeTrigger(t2);
+ triggers = triggerManager.getTriggers();
+ assertTrue(triggers.size() == 1);
+ }
+
+ public class MockTriggerLoader implements TriggerLoader {
+
+ private List<Trigger> triggers;
+
+ @Override
+ public void addTrigger(Trigger t) throws TriggerManagerException {
+ triggers.add(t);
+ }
+
+ @Override
+ public void removeTrigger(Trigger s) throws TriggerManagerException {
+ triggers.remove(s);
+
+ }
+
+ @Override
+ public void updateTrigger(Trigger t) throws TriggerManagerException {
+
+ }
+
+ @Override
+ public List<Trigger> loadTriggers()
+ throws TriggerManagerException {
+ Trigger t = createFakeTrigger("test");
+ triggers = new ArrayList<Trigger>();
+ triggers.add(t);
+ return triggers;
+ }
+
+ }
+
+ private Trigger createFakeTrigger(String message) {
+
+ Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ TriggerAction act = new DummyTriggerAction(message);
+ actions.add(act);
+
+ String expr = "true";
+
+ Condition triggerCond = new Condition(checkers, expr);
+ Condition expireCond = new Condition(checkers, expr);
+
+ Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", triggerCond, expireCond, actions);
+
+ return fakeTrigger;
+ }
+
+ public class MockCheckerLoader extends CheckerTypeLoader{
+
+ @Override
+ public void init(Props props) {
+ checkerToClass.put(FakeTimeChecker.type, FakeTimeChecker.class);
+ }
+ }
+
+ public class MockActionLoader extends ActionTypeLoader {
+ @Override
+ public void init(Props props) {
+ actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
+ }
+ }
+
+}