azkaban-uncached
Changes
src/java/azkaban/project/ProjectManager.java 31(+30 -1)
src/java/azkaban/scheduler/ScheduleManager.java 54(+41 -13)
src/java/azkaban/trigger/Trigger.java 47(+33 -14)
src/java/azkaban/trigger/TriggerManager.java 82(+76 -6)
src/java/azkaban/webapp/AzkabanWebServer.java 196(+188 -8)
src/sql/create.triggers.sql 1(+1 -0)
src/web/js/azkaban.triggers.view.js 342(+342 -0)
Details
diff --git a/src/java/azkaban/actions/ExecuteFlowAction.java b/src/java/azkaban/actions/ExecuteFlowAction.java
index e7c8dfb..bf8959d 100644
--- a/src/java/azkaban/actions/ExecuteFlowAction.java
+++ b/src/java/azkaban/actions/ExecuteFlowAction.java
@@ -30,6 +30,7 @@ public class ExecuteFlowAction implements TriggerAction {
public ExecuteFlowAction(int projectId, String projectName, String flowName, String submitUser, ExecutionOptions executionOptions) {
this.projectId = projectId;
+ this.projectName = projectName;
this.flowName = flowName;
this.submitUser = submitUser;
this.executionOptions = executionOptions;
@@ -194,5 +195,11 @@ public class ExecuteFlowAction implements TriggerAction {
}
+ @Override
+ public String getDescription() {
+ return "Execute flow " + getFlowName() +
+ " from project " + getProjectName();
+ }
+
}
src/java/azkaban/project/ProjectManager.java 31(+30 -1)
diff --git a/src/java/azkaban/project/ProjectManager.java b/src/java/azkaban/project/ProjectManager.java
index 0972276..818fb2b 100644
--- a/src/java/azkaban/project/ProjectManager.java
+++ b/src/java/azkaban/project/ProjectManager.java
@@ -16,12 +16,15 @@ import org.apache.log4j.Logger;
import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.trigger.TriggerManager;
import azkaban.user.Permission;
import azkaban.user.User;
import azkaban.user.Permission.Type;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
import azkaban.utils.Utils;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.servlet.TriggerPlugin;
public class ProjectManager {
private static final Logger logger = Logger.getLogger(ProjectManager.class);
@@ -34,15 +37,22 @@ public class ProjectManager {
private final int projectVersionRetention;
private final boolean creatorDefaultPermissions;
- public ProjectManager(ProjectLoader loader, Props props) {
+ private TriggerManager triggerManager;
+ private boolean loadTriggerFromFile = false;
+
+ public ProjectManager(ProjectLoader loader, Props props, TriggerManager triggerManager) {
this.projectLoader = loader;
this.props = props;
this.tempDir = new File(this.props.getString("project.temp.dir", "temp"));
this.projectVersionRetention = (props.getInt("project.version.retention", 3));
logger.info("Project version retention is set to " + projectVersionRetention);
+ this.triggerManager = triggerManager;
+
this.creatorDefaultPermissions = props.getBoolean("creator.default.proxy", true);
+ this.loadTriggerFromFile = props.getBoolean("enable.load.trigger.from.file", false);
+
if (!tempDir.exists()) {
tempDir.mkdirs();
}
@@ -50,6 +60,10 @@ public class ProjectManager {
loadAllProjects();
}
+ public void setLoadTriggerFromFile(boolean enable) {
+ this.loadTriggerFromFile = enable;
+ }
+
private void loadAllProjects() {
List<Project> projects;
try {
@@ -339,6 +353,21 @@ public class ProjectManager {
projectLoader.uploadProjectProperties(project, propProps);
}
+ if(loadTriggerFromFile) {
+ logger.info("Loading triggers.");
+ Props triggerProps = new Props();
+ triggerProps.put("projectId", project.getId());
+ triggerProps.put("projectName", project.getName());
+ triggerProps.put("submitUser", uploader.getUserId());
+ try {
+ triggerManager.loadTriggerFromDir(file, triggerProps);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to load triggers.", e);
+ }
+ }
+
logger.info("Uploaded project files. Cleaning up temp files.");
projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(), "Uploaded project files zip " + archive.getName());
try {
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 4a85881..931b4e7 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -251,6 +251,10 @@ public class Schedule{
return false;
}
+ public void setNextExecTime(long nextExecTime) {
+ this.nextExecTime = nextExecTime;
+ }
+
private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
DateTime now = new DateTime();
DateTime date = new DateTime(scheduleTime).withZone(timezone);
src/java/azkaban/scheduler/ScheduleManager.java 54(+41 -13)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 0d5fc2f..06cdb2c 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -16,6 +16,7 @@
package azkaban.scheduler;
+import java.io.File;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Comparator;
@@ -46,7 +47,9 @@ import azkaban.sla.SLA.SlaRule;
import azkaban.sla.SLA.SlaSetting;
import azkaban.sla.SLAManager;
import azkaban.sla.SlaOptions;
+import azkaban.trigger.TriggerServicer;
import azkaban.utils.Pair;
+import azkaban.utils.Props;
/**
* The ScheduleManager stores and executes the schedule. It uses a single thread
@@ -54,9 +57,10 @@ import azkaban.utils.Pair;
* the flow from the schedule when it is run, which can potentially allow the
* flow to and overlap each other.
*/
-public class ScheduleManager {
+public class ScheduleManager implements TriggerServicer {
private static Logger logger = Logger.getLogger(ScheduleManager.class);
+ public static final String triggerSource = "SimpleTimeTrigger";
private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
private ScheduleLoader loader;
@@ -79,7 +83,7 @@ public class ScheduleManager {
*
* @param loader
*/
- public ScheduleManager(ExecutorManager executorManager,
+ public ScheduleManager (ExecutorManager executorManager,
ProjectManager projectManager,
SLAManager slaManager,
ScheduleLoader loader,
@@ -91,8 +95,17 @@ public class ScheduleManager {
this.loader = loader;
this.useExternalRunner = useExternalRunner;
+ if(!useExternalRunner) {
+ this.runner = new ScheduleRunner();
+ load();
+ } else {
+ this.runner = null;
+ }
-
+ }
+
+ @Override
+ public void load() {
List<Schedule> scheduleList = null;
try {
scheduleList = loader.loadSchedules();
@@ -107,10 +120,7 @@ public class ScheduleManager {
}
if(!useExternalRunner) {
- this.runner = new ScheduleRunner();
this.runner.start();
- } else {
- this.runner = null;
}
}
@@ -128,8 +138,21 @@ public class ScheduleManager {
* Retrieves a copy of the list of schedules.
*
* @return
+ * @throws ScheduleManagerException
*/
public synchronized List<Schedule> getSchedules() {
+ if(useExternalRunner) {
+ for(Schedule s : scheduleIDMap.values()) {
+ try {
+ loader.updateNextExecTime(s);
+ } catch (ScheduleManagerException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ logger.error("Failed to update schedule from external runner for schedule " + s.getScheduleId());
+ }
+ }
+ }
+
//return runner.getRunnerSchedules();
return new ArrayList<Schedule>(scheduleIDMap.values());
}
@@ -311,14 +334,18 @@ public class ScheduleManager {
logger.error("The provided schedule is non-recurring and the scheduled time already passed. " + s.getScheduleName());
}
}
-
-// /**
-// * Save the schedule
-// */
-// private void saveSchedule() {
-// loader.saveSchedule(getSchedule());
-// }
+ @Override
+ public void createTriggerFromProps(Props props) throws ScheduleManagerException {
+ throw new ScheduleManagerException("create " + getTriggerSource() + " from json not supported yet" );
+
+ }
+
+ @Override
+ public String getTriggerSource() {
+ return triggerSource;
+ }
+
/**
* Thread that simply invokes the running of flows when the schedule is
* ready.
@@ -590,4 +617,5 @@ public class ScheduleManager {
return runner.isAlive();
}
}
+
}
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index a36f3f1..7a7200a 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -24,10 +24,13 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private TriggerManager triggerManager;
- private static final String triggerSource = "TriggerBasedScheduler";
+ private String triggerSource;
- public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager) {
+ private Map<Integer, Trigger> triggersLocalCopy;
+
+ public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager, String triggerSource) {
this.triggerManager = triggerManager;
+ this.triggerSource = triggerSource;
// need to init the action types and condition checker types
ExecuteFlowAction.setExecutorManager(executorManager);
ExecuteFlowAction.setProjectManager(projectManager);
@@ -38,7 +41,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
Condition triggerCondition = createTimeTriggerCondition(s);
Condition expireCondition = createTimeExpireCondition(s);
List<TriggerAction> actions = createActions(s);
- Trigger t = new Trigger(s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
+ Trigger t = new Trigger(new DateTime(s.getLastModifyTime()), new DateTime(s.getSubmitTime()), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
if(s.isRecurring()) {
t.setResetOnTrigger(true);
}
@@ -76,6 +79,8 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
Trigger t = createScheduleTrigger(s);
try {
triggerManager.insertTrigger(t);
+ s.setScheduleId(t.getTriggerId());
+ triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
throw new ScheduleManagerException("Failed to insert new schedule!", e);
@@ -87,6 +92,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
Trigger t = createScheduleTrigger(s);
try {
triggerManager.updateTrigger(t);
+ triggersLocalCopy.put(t.getTriggerId(), t);
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
throw new ScheduleManagerException("Failed to update schedule!", e);
@@ -99,10 +105,13 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public List<Schedule> loadSchedules() throws ScheduleManagerException {
List<Trigger> triggers = triggerManager.getTriggers();
List<Schedule> schedules = new ArrayList<Schedule>();
+ triggersLocalCopy = new HashMap<Integer, Trigger>();
for(Trigger t : triggers) {
if(t.getSource().equals(triggerSource)) {
+ triggersLocalCopy.put(t.getTriggerId(), t);
Schedule s = triggerToSchedule(t);
schedules.add(s);
+ System.out.println("loaded schedule for " + s.getProjectId() + s.getProjectName());
}
}
return schedules;
@@ -137,9 +146,9 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
ck.getFirstCheckTime().getMillis(),
ck.getFirstCheckTime().getZone(),
ck.getPeriod(),
- t.getLastModifyTime(),
+ t.getLastModifyTime().getMillis(),
ck.getNextCheckTime().getMillis(),
- t.getSubmitTime(),
+ t.getSubmitTime().getMillis(),
t.getSubmitUser());
return s;
} else {
@@ -152,6 +161,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public void removeSchedule(Schedule s) throws ScheduleManagerException {
try {
triggerManager.removeTrigger(s.getScheduleId());
+ triggersLocalCopy.remove(s.getScheduleId());
} catch (TriggerManagerException e) {
// TODO Auto-generated catch block
throw new ScheduleManagerException(e.getMessage());
@@ -162,8 +172,9 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void updateNextExecTime(Schedule s)
throws ScheduleManagerException {
- logger.error("no longer doing it here.");
- throw new ScheduleManagerException("No longer updating execution time in scheduler");
+ Trigger t = triggersLocalCopy.get(s.getScheduleId());
+ BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
+ s.setNextExecTime(ck.getNextCheckTime().getMillis());
}
}
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index 87a3cf6..f3032d7 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -67,7 +67,7 @@ public class TriggerBasedScheduler {
TriggerManager triggerManager,
ScheduleLoader loader)
{
- this.loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
+ this.loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager, ScheduleManager.triggerSource);
List<Schedule> scheduleList = null;
try {
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 62e50f8..1f9a58e 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -10,6 +10,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.log4j.Logger;
@@ -157,4 +158,8 @@ public class ActionTypeLoader {
return action;
}
+
+ public Set<String> getSupportedActions() {
+ return actionToClass.keySet();
+ }
}
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index 67a207d..c64a66a 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -10,6 +10,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.log4j.Logger;
@@ -161,4 +162,8 @@ public class CheckerTypeLoader {
return checker;
}
+ public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
+ return checkerToClass;
+ }
+
}
diff --git a/src/java/azkaban/trigger/JdbcTriggerLoader.java b/src/java/azkaban/trigger/JdbcTriggerLoader.java
index e76cc6a..b5bde55 100644
--- a/src/java/azkaban/trigger/JdbcTriggerLoader.java
+++ b/src/java/azkaban/trigger/JdbcTriggerLoader.java
@@ -44,10 +44,10 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
private static final String triggerTblName = "triggers";
private static String GET_ALL_TRIGGERS =
- "SELECT trigger_id, modify_time, enc_type, data FROM " + triggerTblName;
+ "SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM " + triggerTblName;
private static String GET_TRIGGER =
- "SELECT trigger_id, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
+ "SELECT trigger_source, modify_time, enc_type, data FROM " + triggerTblName + " WHERE trigger_id=?";
private static String ADD_TRIGGER =
"INSERT INTO " + triggerTblName + " ( modify_time) values (?)";
@@ -56,7 +56,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
"DELETE FROM " + triggerTblName + " WHERE trigger_id=?";
private static String UPDATE_TRIGGER =
- "UPDATE " + triggerTblName + " SET modify_time=?, enc_type=?, data=? WHERE trigger_id=?";
+ "UPDATE " + triggerTblName + " SET trigger_source=?, modify_time=?, enc_type=?, data=? WHERE trigger_id=?";
public EncodingType getDefaultEncodingType() {
return defaultEncodingType;
@@ -188,6 +188,7 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
try {
int updates = runner.update( connection,
UPDATE_TRIGGER,
+ t.getSource(),
DateTime.now().getMillis(),
encType.getNumVal(),
data,
@@ -228,9 +229,10 @@ public class JdbcTriggerLoader extends AbstractJdbcLoader implements TriggerLoad
ArrayList<Trigger> triggers = new ArrayList<Trigger>();
do {
int triggerId = rs.getInt(1);
- long modifyTime = rs.getLong(2);
- int encodingType = rs.getInt(3);
- byte[] data = rs.getBytes(4);
+ String triggerSource = rs.getString(2);
+ long modifyTime = rs.getLong(3);
+ int encodingType = rs.getInt(4);
+ byte[] data = rs.getBytes(5);
Object jsonObj = null;
if (data != null) {
src/java/azkaban/trigger/Trigger.java 47(+33 -14)
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index d819800..56b7d80 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -6,14 +6,15 @@ import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
public class Trigger {
private static Logger logger = Logger.getLogger(Trigger.class);
private int triggerId = -1;
- private long lastModifyTime = -1;
- private long submitTime = -1;
+ private DateTime lastModifyTime;
+ private DateTime submitTime;
private String submitUser;
private String source;
@@ -24,15 +25,15 @@ public class Trigger {
private static ActionTypeLoader actionTypeLoader;
- private boolean resetOnTrigger = false;
- private boolean resetOnExpire = false;
+ private boolean resetOnTrigger = true;
+ private boolean resetOnExpire = true;
@SuppressWarnings("unused")
private Trigger() throws TriggerManagerException {
throw new TriggerManagerException("Triggers should always be specified");
}
- public long getSubmitTime() {
+ public DateTime getSubmitTime() {
return submitTime;
}
@@ -53,8 +54,8 @@ public class Trigger {
}
public Trigger(
- long lastModifyTime,
- long submitTime,
+ DateTime lastModifyTime,
+ DateTime submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -71,8 +72,8 @@ public class Trigger {
public Trigger(
int triggerId,
- long lastModifyTime,
- long submitTime,
+ DateTime lastModifyTime,
+ DateTime submitTime,
String submitUser,
String source,
Condition triggerCondition,
@@ -112,7 +113,7 @@ public class Trigger {
this.resetOnExpire = resetOnExpire;
}
- public long getLastModifyTime() {
+ public DateTime getLastModifyTime() {
return lastModifyTime;
}
@@ -136,6 +137,10 @@ public class Trigger {
triggerCondition.resetCheckers();
}
+ public void resetExpireCondition() {
+ expireCondition.resetCheckers();
+ }
+
public List<TriggerAction> getTriggerActions () {
return actions;
}
@@ -156,8 +161,8 @@ public class Trigger {
jsonObj.put("resetOnExpire", String.valueOf(resetOnExpire));
jsonObj.put("submitUser", submitUser);
jsonObj.put("source", source);
- jsonObj.put("submitTime", String.valueOf(submitTime));
- jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime));
+ jsonObj.put("submitTime", String.valueOf(submitTime.getMillis()));
+ jsonObj.put("lastModifyTime", String.valueOf(lastModifyTime.getMillis()));
jsonObj.put("triggerId", String.valueOf(triggerId));
return jsonObj;
@@ -193,8 +198,10 @@ public class Trigger {
boolean resetOnExpire = Boolean.valueOf((String) jsonObj.get("resetOnExpire"));
String submitUser = (String) jsonObj.get("submitUser");
String source = (String) jsonObj.get("source");
- long submitTime = Long.valueOf((String) jsonObj.get("submitTime"));
- long lastModifyTime = Long.valueOf((String) jsonObj.get("lastModifyTime"));
+ long submitTimeMillis = Long.valueOf((String) jsonObj.get("submitTime"));
+ long lastModifyTimeMillis = Long.valueOf((String) jsonObj.get("lastModifyTime"));
+ DateTime submitTime = new DateTime(submitTimeMillis);
+ DateTime lastModifyTime = new DateTime(lastModifyTimeMillis);
int triggerId = Integer.valueOf((String) jsonObj.get("triggerId"));
trigger = new Trigger(triggerId, lastModifyTime, submitTime, submitUser, source, triggerCond, expireCond, actions);
trigger.setResetOnExpire(resetOnExpire);
@@ -207,5 +214,17 @@ public class Trigger {
return trigger;
}
+
+ public String getDescription() {
+ StringBuffer actionsString = new StringBuffer();
+ for(TriggerAction act : actions) {
+ actionsString.append(", ");
+ actionsString.append(act.getDescription());
+ }
+ return "Trigger from " + getSource() +
+ " with trigger condition of " + triggerCondition.getExpression() +
+ " and expire condition of " + expireCondition.getExpression() +
+ actionsString;
+ }
}
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
index c57d585..f6adf5d 100644
--- a/src/java/azkaban/trigger/TriggerAction.java
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -9,5 +9,7 @@ public interface TriggerAction {
Object toJson();
void doAction() throws Exception;
+
+ String getDescription();
}
src/java/azkaban/trigger/TriggerManager.java 82(+76 -6)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index f5ad251..09bbae1 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -1,5 +1,7 @@
package azkaban.trigger;
+import java.io.File;
+import java.io.FileFilter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -10,27 +12,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
-
public class TriggerManager {
private static Logger logger = Logger.getLogger(TriggerManager.class);
- private Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
+ private static final String TRIGGER_SUFFIX = ".trigger";
+
+ private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
private CheckerTypeLoader checkerLoader;
private ActionTypeLoader actionLoader;
- private TriggerLoader triggerLoader;
+ private static TriggerLoader triggerLoader;
+
+ private static TriggerScannerThread scannerThread;
- TriggerScannerThread scannerThread;
+ private Map<String, TriggerServicer> triggerServicers = new HashMap<String, TriggerServicer>();
public TriggerManager(Props props, TriggerLoader triggerLoader) {
+ TriggerManager.triggerLoader = triggerLoader;
checkerLoader = new CheckerTypeLoader();
actionLoader = new ActionTypeLoader();
-
// load plugins
try{
checkerLoader.init(props);
@@ -43,11 +49,55 @@ public class TriggerManager {
Condition.setCheckerLoader(checkerLoader);
Trigger.setActionTypeLoader(actionLoader);
+ checkerLoader = new CheckerTypeLoader();
+ actionLoader = new ActionTypeLoader();
+
long scannerInterval = props.getLong("trigger.scan.interval", TriggerScannerThread.DEFAULT_SCAN_INTERVAL_MS);
scannerThread = new TriggerScannerThread(scannerInterval);
scannerThread.setName("TriggerScannerThread");
- this.triggerLoader = triggerLoader;
+ }
+
+ private static class SuffixFilter implements FileFilter {
+ private String suffix;
+
+ public SuffixFilter(String suffix) {
+ this.suffix = suffix;
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ String name = pathname.getName();
+
+ return pathname.isFile() && !pathname.isHidden() && name.length() > suffix.length() && name.endsWith(suffix);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void loadTriggerFromDir(File baseDir, Props props) throws Exception {
+ File[] triggerFiles = baseDir.listFiles(new SuffixFilter(TRIGGER_SUFFIX));
+
+ for(File triggerFile : triggerFiles) {
+ Props triggerProps = new Props(props, triggerFile);
+ String triggerType = triggerProps.getString("trigger.type");
+ TriggerServicer servicer = triggerServicers.get(triggerType);
+ if(servicer != null) {
+ servicer.createTriggerFromProps(triggerProps);
+ } else {
+ throw new Exception("Trigger " + triggerType + " is not supported.");
+ }
+ }
+ }
+
+ public void addTriggerServicer(String triggerSource, TriggerServicer triggerServicer) throws TriggerManagerException {
+ if(triggerServicers.containsKey(triggerSource)) {
+ throw new TriggerManagerException("Trigger Servicer " + triggerSource + " already exists!" );
+ }
+ this.triggerServicers.put(triggerSource, triggerServicer);
+ }
+
+ public void start() {
+
try{
// expect loader to return valid triggers
List<Trigger> triggers = triggerLoader.loadTriggers();
@@ -60,6 +110,10 @@ public class TriggerManager {
logger.error(e.getMessage());
}
+ for(TriggerServicer servicer : triggerServicers.values()) {
+ servicer.load();
+ }
+
scannerThread.start();
}
@@ -82,6 +136,7 @@ public class TriggerManager {
removeTrigger(triggerIdMap.get(id));
}
+ //TODO: update corresponding servicers
public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
if(!triggerIdMap.containsKey(t.getTriggerId())) {
throw new TriggerManagerException("The trigger to update doesn't exist!");
@@ -95,6 +150,7 @@ public class TriggerManager {
triggerLoader.updateTrigger(t);
}
+ //TODO: update corresponding servicers
public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
triggerLoader.removeTrigger(t);
scannerThread.deleteTrigger(t);
@@ -104,7 +160,13 @@ public class TriggerManager {
public List<Trigger> getTriggers() {
return new ArrayList<Trigger>(triggerIdMap.values());
}
+
+ public Map<String, Class<? extends ConditionChecker>> getSupportedCheckers() {
+ return checkerLoader.getSupportedCheckers();
+ }
+
+
//trigger scanner thread
public class TriggerScannerThread extends Thread {
@@ -198,6 +260,8 @@ public class TriggerManager {
}
if(t.isResetOnTrigger()) {
t.resetTriggerConditions();
+ t.resetExpireCondition();
+ updateTrigger(t);
} else {
removeTrigger(t);
}
@@ -206,10 +270,16 @@ public class TriggerManager {
private void onTriggerExpire(Trigger t) throws TriggerManagerException {
if(t.isResetOnExpire()) {
t.resetTriggerConditions();
+ t.resetExpireCondition();
+ updateTrigger(t);
} else {
removeTrigger(t);
}
}
}
+ public synchronized Trigger getTrigger(int triggerId) {
+ return triggerIdMap.get(triggerId);
+ }
+
}
diff --git a/src/java/azkaban/trigger/TriggerServicer.java b/src/java/azkaban/trigger/TriggerServicer.java
new file mode 100644
index 0000000..c2ee7b1
--- /dev/null
+++ b/src/java/azkaban/trigger/TriggerServicer.java
@@ -0,0 +1,14 @@
+package azkaban.trigger;
+
+import java.io.File;
+
+import azkaban.utils.Props;
+
+public interface TriggerServicer {
+ public void createTriggerFromProps(Props props) throws Exception;
+
+ public String getTriggerSource();
+
+ void load();
+
+}
src/java/azkaban/webapp/AzkabanWebServer.java 196(+188 -8)
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 2cafed0..61b7dcd 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -27,7 +27,9 @@ import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.TimeZone;
import javax.management.MBeanInfo;
@@ -63,13 +65,13 @@ import azkaban.scheduler.JdbcScheduleLoader;
import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.scheduler.TriggerBasedScheduleLoader;
-import azkaban.scheduler.TriggerBasedScheduler;
import azkaban.sla.JdbcSLALoader;
import azkaban.sla.SLAManager;
import azkaban.sla.SLAManagerException;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerServicer;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
@@ -85,6 +87,8 @@ import azkaban.webapp.servlet.ScheduleServlet;
import azkaban.webapp.servlet.HistoryServlet;
import azkaban.webapp.servlet.IndexServlet;
import azkaban.webapp.servlet.ProjectManagerServlet;
+import azkaban.webapp.servlet.TriggerManagerServlet;
+import azkaban.webapp.servlet.TriggerPlugin;
import azkaban.webapp.servlet.ViewerPlugin;
import azkaban.webapp.session.SessionCache;
@@ -146,6 +150,7 @@ public class AzkabanWebServer extends AzkabanServer {
private SessionCache sessionCache;
private File tempDir;
private List<ViewerPlugin> viewerPlugins;
+ private Map<String, TriggerPlugin> triggerPlugins;
private MBeanServer mbeanServer;
private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
@@ -166,12 +171,14 @@ public class AzkabanWebServer extends AzkabanServer {
this.server = server;
velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
sessionCache = new SessionCache(props);
- userManager = loadUserManager(props);
- projectManager = loadProjectManager(props);
+ userManager = loadUserManager(props);
executorManager = loadExecutorManager(props);
slaManager = loadSLAManager(props);
triggerManager = loadTriggerManager(props);
+
+ projectManager = loadProjectManager(props, triggerManager);
+
// scheduler = loadScheduler(executorManager, projectManager, triggerManager);
scheduleManager = loadScheduleManager(projectManager, executorManager, slaManager, triggerManager, props);
@@ -196,6 +203,10 @@ public class AzkabanWebServer extends AzkabanServer {
this.viewerPlugins = viewerPlugins;
}
+ private void setTriggerPlugins(Map<String, TriggerPlugin> triggerPlugins) {
+ this.triggerPlugins = triggerPlugins;
+ }
+
private UserManager loadUserManager(Props props) {
Class<?> userManagerClass = props.getClass(USER_MANAGER_CLASS_PARAM, null);
logger.info("Loading user manager class " + userManagerClass.getName());
@@ -219,11 +230,11 @@ public class AzkabanWebServer extends AzkabanServer {
return manager;
}
- private ProjectManager loadProjectManager(Props props) {
+ private ProjectManager loadProjectManager(Props props, TriggerManager triggerManager) {
logger.info("Loading JDBC for project management");
JdbcProjectLoader loader = new JdbcProjectLoader(props);
- ProjectManager manager = new ProjectManager(loader, props);
+ ProjectManager manager = new ProjectManager(loader, props, triggerManager);
return manager;
}
@@ -241,7 +252,8 @@ public class AzkabanWebServer extends AzkabanServer {
ScheduleLoader loader = new JdbcScheduleLoader(props);
schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, false);
} else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
- ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
+ logger.info("Loading trigger based scheduler");
+ ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager, ScheduleManager.triggerSource);
schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, true);
}
@@ -259,8 +271,8 @@ public class AzkabanWebServer extends AzkabanServer {
}
private TriggerManager loadTriggerManager(Props props) {
- TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
- return new TriggerManager(props, triggerLoader);
+ TriggerLoader loader = new JdbcTriggerLoader(props);
+ return new TriggerManager(props, loader);
}
/**
@@ -477,9 +489,24 @@ public class AzkabanWebServer extends AzkabanServer {
root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
root.addServlet(new ServletHolder(new ScheduleServlet()),"/schedule");
root.addServlet(new ServletHolder(new JMXHttpServlet()),"/jmx");
+ root.addServlet(new ServletHolder(new TriggerManagerServlet()),"/triggers");
String viewerPluginDir = azkabanSettings.getString("viewer.plugin.dir", "plugins/viewer");
app.setViewerPlugins(loadViewerPlugins(root, viewerPluginDir, app.getVelocityEngine()));
+
+ // triggerplugin
+ String triggerPluginDir = azkabanSettings.getString("trigger.plugin.dir", "plugins/triggers");
+ Map<String, TriggerPlugin> triggerPlugins = loadTriggerPlugins(root, triggerPluginDir, app);
+ app.setTriggerPlugins(triggerPlugins);
+ // always have basic time trigger
+ app.getTriggerManager().addTriggerServicer(app.getScheduleManager().getTriggerSource(), app.getScheduleManager());
+ // add additional triggers
+ for(TriggerPlugin plugin : triggerPlugins.values()) {
+ TriggerServicer servicer = plugin.getServicer();
+ app.getTriggerManager().addTriggerServicer(servicer.getTriggerSource(), servicer);
+ }
+ // fire up
+ app.getTriggerManager().start();
root.setAttribute(AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY, app);
try {
@@ -508,6 +535,159 @@ public class AzkabanWebServer extends AzkabanServer {
logger.info("Server running on " + (ssl ? "ssl" : "") + " port " + port + ".");
}
+ private static Map<String, TriggerPlugin> loadTriggerPlugins(Context root, String pluginPath, AzkabanWebServer azkabanWebApp) {
+ File triggerPluginPath = new File(pluginPath);
+ if (!triggerPluginPath.exists()) {
+ return Collections.<String, TriggerPlugin>emptyMap();
+ }
+
+ Map<String, TriggerPlugin> installedTriggerPlugins = new HashMap<String, TriggerPlugin>();
+ ClassLoader parentLoader = AzkabanWebServer.class.getClassLoader();
+ File[] pluginDirs = triggerPluginPath.listFiles();
+ ArrayList<String> jarPaths = new ArrayList<String>();
+ for (File pluginDir: pluginDirs) {
+ if (!pluginDir.exists()) {
+ logger.error("Error! Trigger plugin path " + pluginDir.getPath() + " doesn't exist.");
+ continue;
+ }
+
+ if (!pluginDir.isDirectory()) {
+ logger.error("The plugin path " + pluginDir + " is not a directory.");
+ continue;
+ }
+
+ // Load the conf directory
+ File propertiesDir = new File(pluginDir, "conf");
+ Props pluginProps = null;
+ if (propertiesDir.exists() && propertiesDir.isDirectory()) {
+ File propertiesFile = new File(propertiesDir, "plugin.properties");
+ File propertiesOverrideFile = new File(propertiesDir, "override.properties");
+
+ if (propertiesFile.exists()) {
+ if (propertiesOverrideFile.exists()) {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile, propertiesOverrideFile);
+ }
+ else {
+ pluginProps = PropsUtils.loadProps(null, propertiesFile);
+ }
+ }
+ else {
+ logger.error("Plugin conf file " + propertiesFile + " not found.");
+ continue;
+ }
+ }
+ else {
+ logger.error("Plugin conf path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ String pluginName = pluginProps.getString("trigger.name");
+ String pluginWebPath = pluginProps.getString("trigger.path");
+ int pluginOrder = pluginProps.getInt("trigger.order", 0);
+ boolean pluginHidden = pluginProps.getBoolean("trigger.hidden", false);
+ List<String> extLibClasspath = pluginProps.getStringList("trigger.external.classpaths", (List<String>)null);
+
+ String pluginClass = pluginProps.getString("trigger.class");
+ if (pluginClass == null) {
+ logger.error("Trigger class is not set.");
+ }
+ else {
+ logger.error("Plugin class " + pluginClass);
+ }
+
+ URLClassLoader urlClassLoader = null;
+ File libDir = new File(pluginDir, "lib");
+ if (libDir.exists() && libDir.isDirectory()) {
+ File[] files = libDir.listFiles();
+
+ ArrayList<URL> urls = new ArrayList<URL>();
+ for (int i=0; i < files.length; ++i) {
+ try {
+ URL url = files[i].toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ if (extLibClasspath != null) {
+ for (String extLib : extLibClasspath) {
+ try {
+ File file = new File(pluginDir, extLib);
+ URL url = file.toURI().toURL();
+ urls.add(url);
+ } catch (MalformedURLException e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ urlClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentLoader);
+ }
+ else {
+ logger.error("Library path " + propertiesDir + " not found.");
+ continue;
+ }
+
+ Class<?> triggerClass = null;
+ try {
+ triggerClass = urlClassLoader.loadClass(pluginClass);
+ }
+ catch (ClassNotFoundException e) {
+ logger.error("Class " + pluginClass + " not found.");
+ continue;
+ }
+
+ String source = FileIOUtils.getSourcePathFromClass(triggerClass);
+ logger.info("Source jar " + source);
+ jarPaths.add("jar:file:" + source);
+
+ Constructor<?> constructor = null;
+ try {
+ constructor = triggerClass.getConstructor(String.class, Props.class, Context.class, AzkabanWebServer.class);
+ } catch (NoSuchMethodException e) {
+ logger.error("Constructor not found in " + pluginClass);
+ continue;
+ }
+
+ Object obj = null;
+ try {
+ obj = constructor.newInstance(pluginName, pluginProps, root, azkabanWebApp);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+
+ if (!(obj instanceof TriggerPlugin)) {
+ logger.error("The object is not an TriggerPlugin");
+ continue;
+ }
+
+ TriggerPlugin plugin = (TriggerPlugin) obj;
+// AbstractTriggerServlet avServlet = (AbstractTriggerServlet)obj;
+// root.addServlet(new ServletHolder(avServlet), "/" + pluginWebPath + "/*");
+ installedTriggerPlugins.put(pluginName, plugin);
+ }
+
+ // Velocity needs the jar resource paths to be set.
+ String jarResourcePath = StringUtils.join(jarPaths, ", ");
+ logger.info("Setting jar resource path " + jarResourcePath);
+ VelocityEngine ve = azkabanWebApp.getVelocityEngine();
+ ve.addProperty("jar.resource.loader.path", jarResourcePath);
+
+// // Sort plugins based on order
+// Collections.sort(installedTriggerPlugins, new Comparator<TriggerPlugin>() {
+// @Override
+// public int compare(TriggerPlugin o1, TriggerPlugin o2) {
+// return o1.getOrder() - o2.getOrder();
+// }
+// });
+
+ return installedTriggerPlugins;
+ }
+
+ public Map<String, TriggerPlugin> getTriggerPlugins() {
+ return triggerPlugins;
+ }
+
private static List<ViewerPlugin> loadViewerPlugins(Context root, String pluginPath, VelocityEngine ve) {
File viewerPluginPath = new File(pluginPath);
if (!viewerPluginPath.exists()) {
diff --git a/src/java/azkaban/webapp/servlet/TriggerManagerServlet.java b/src/java/azkaban/webapp/servlet/TriggerManagerServlet.java
new file mode 100644
index 0000000..73d11d7
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/TriggerManagerServlet.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.webapp.servlet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.user.User;
+import azkaban.webapp.AzkabanWebServer;
+import azkaban.webapp.session.Session;
+
+public class TriggerManagerServlet extends LoginAbstractAzkabanServlet {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = Logger.getLogger(TriggerManagerServlet.class);
+ private TriggerManager triggerManager;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+ AzkabanWebServer server = (AzkabanWebServer)getApplication();
+ triggerManager = server.getTriggerManager();
+ }
+
+ @Override
+ protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException, IOException {
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ } else {
+ handleGetAllSchedules(req, resp, session);
+ }
+ }
+
+ private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+ String ajaxName = getParam(req, "ajax");
+
+ try {
+ if (ajaxName.equals("removeTrigger")) {
+ ajaxRemoveTrigger(req, ret, session.getUser());
+ }
+ } catch (Exception e) {
+ ret.put("error", e.getMessage());
+ }
+
+ if (ret != null) {
+ this.writeJSON(resp, ret);
+ }
+ }
+
+ private void handleGetAllSchedules(HttpServletRequest req, HttpServletResponse resp,
+ Session session) throws ServletException, IOException{
+
+ Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/triggerspage.vm");
+
+ List<Trigger> triggers = triggerManager.getTriggers();
+ page.add("triggers", triggers);
+//
+// List<SLA> slas = slaManager.getSLAs();
+// page.add("slas", slas);
+
+ page.render();
+ }
+
+ @Override
+ protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+ if (hasParam(req, "ajax")) {
+ handleAJAXAction(req, resp, session);
+ }
+ }
+
+ private void ajaxRemoveTrigger(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException, TriggerManagerException{
+ int triggerId = getIntParam(req, "triggerId");
+ Trigger t = triggerManager.getTrigger(triggerId);
+ if(t == null) {
+ ret.put("message", "Trigger with ID " + triggerId + " does not exist");
+ ret.put("status", "error");
+ return;
+ }
+
+// if(!hasPermission(project, user, Type.SCHEDULE)) {
+// ret.put("status", "error");
+// ret.put("message", "Permission denied. Cannot remove trigger with id " + triggerId);
+// return;
+// }
+
+ triggerManager.removeTrigger(triggerId);
+ logger.info("User '" + user.getUserId() + " has removed trigger " + t.getDescription());
+// projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + sched.toString() + " has been removed.");
+
+ ret.put("status", "success");
+ ret.put("message", "trigger " + triggerId + " removed from Schedules.");
+ return;
+ }
+
+}
+
diff --git a/src/java/azkaban/webapp/servlet/TriggerPlugin.java b/src/java/azkaban/webapp/servlet/TriggerPlugin.java
new file mode 100644
index 0000000..ee5e620
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/TriggerPlugin.java
@@ -0,0 +1,18 @@
+package azkaban.webapp.servlet;
+
+import azkaban.trigger.TriggerServicer;
+
+public interface TriggerPlugin {
+
+// public TriggerPlugin(String pluginName, Props props, AzkabanWebServer azkabanWebApp) {
+// this.pluginName = pluginName;
+// this.pluginPath = props.getString("trigger.path");
+// this.order = props.getInt("trigger.order", 0);
+// this.hidden = props.getBoolean("trigger.hidden", false);
+//
+// }
+
+ public AbstractAzkabanServlet getServlet();
+ public TriggerServicer getServicer();
+ public void load();
+}
diff --git a/src/java/azkaban/webapp/servlet/velocity/nav.vm b/src/java/azkaban/webapp/servlet/velocity/nav.vm
index 8019467..8ce7d72 100644
--- a/src/java/azkaban/webapp/servlet/velocity/nav.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/nav.vm
@@ -25,6 +25,7 @@
<ul id="nav" class="nav">
<li id="all-jobs-tab" #if($current_page == 'all')class="selected"#end onClick="navMenuClick('$!context/')"><a href="$!context/">Projects</a></li>
<li id="scheduled-jobs-tab" #if($current_page == 'schedule')class="selected"#end onClick="navMenuClick('$!context/schedule')"><a href="$!context/schedule">Scheduled</a></li>
+ <li id="triggers-tab" #if($current_page == 'triggers')class="selected"#end onClick="navMenuClick('$!context/triggers')"><a href="$!context/triggers">Triggers</a></li>
<li id="executing-jobs-tab" #if($current_page == 'executing')class="selected"#end onClick="navMenuClick('$!context/executor')"><a href="$!context/executor">Executing</a></li>
<li id="history-jobs-tab" #if($current_page == 'history')class="selected"#end onClick="navMenuClick('$!context/history')"><a href="$!context/history">History</a></li>
diff --git a/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm b/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
new file mode 100644
index 0000000..77a12c2
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/triggerspage.vm
@@ -0,0 +1,97 @@
+#*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+*#
+
+<!DOCTYPE html>
+<html>
+ <head>
+#parse( "azkaban/webapp/servlet/velocity/style.vm" )
+ <link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui-1.10.1.custom.css" />
+ <link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui.css" />
+
+ <script type="text/javascript" src="${context}/js/jquery/jquery-1.9.1.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.10.1.custom.js"></script>
+ <script type="text/javascript" src="${context}/js/underscore-1.4.4-min.js"></script>
+ <script type="text/javascript" src="${context}/js/namespace.js"></script>
+ <script type="text/javascript" src="${context}/js/backbone-0.9.10-min.js"></script>
+ <script type="text/javascript" src="${context}/js/jquery.simplemodal-1.4.4.js"></script>
+
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-timepicker-addon.js"></script>
+ <script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-sliderAccess.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.table.sort.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.nav.js"></script>
+ <script type="text/javascript" src="${context}/js/azkaban.triggers.view.js"></script>
+ <script type="text/javascript">
+ var contextURL = "${context}";
+ var currentTime = ${currentTime};
+ var timezone = "${timezone}";
+ var errorMessage = null;
+ var successMessage = null;
+ </script>
+ </head>
+ <body>
+#set($current_page="triggers")
+#parse( "azkaban/webapp/servlet/velocity/nav.vm" )
+ <div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>
+
+ <div class="content">
+
+#if($errorMsg)
+ <div class="box-error-message">$errorMsg</div>
+#else
+#if($error_message != "null")
+ <div class="box-error-message">$error_message</div>
+#elseif($success_message != "null")
+ <div class="box-success-message">$success_message</div>
+#end
+#end
+
+ <div id="all-triggers-content">
+ <div class="section-hd">
+ <h2>All Triggers</h2>
+ </div>
+ </div>
+
+ <div class="triggers">
+ <table id="triggersTbl">
+ <thead>
+ <tr>
+ <th>ID</th>
+ <th>Source</th>
+ <th>Submitted By</th>
+ <th>Description</th>
+ <th colspan="2" class="action ignoresort">Action</th>
+ </tr>
+ </thead>
+ <tbody>
+ #if($triggers)
+#foreach($trigger in $triggers)
+ <tr class="row" >
+
+ <td>${trigger.triggerId}</td>
+ <td>${trigger.source}</td>
+ <td>${trigger.submitUser}</td>
+ <td>${trigger.getDescription()}</td>
+ <td><button id="removeTriggerBtn" hidden="true" onclick="removeTrigger(${trigger.triggerId})" >Remove Trigger</button></td>
+ </tr>
+#end
+#else
+ <tr><td class="last">No Trigger Found</td></tr>
+#end
+ </tbody>
+ </table>
+ </div>
+ </body>
+</html>
diff --git a/src/java/azkaban/webapp/session/SessionCache.java b/src/java/azkaban/webapp/session/SessionCache.java
index be76c55..0058bf1 100644
--- a/src/java/azkaban/webapp/session/SessionCache.java
+++ b/src/java/azkaban/webapp/session/SessionCache.java
@@ -33,7 +33,7 @@ import azkaban.utils.cache.Element;
*/
public class SessionCache {
private static final int MAX_NUM_SESSIONS = 10000;
- private static final int SESSION_TIME_TO_LIVE = 10000;
+ private static final long SESSION_TIME_TO_LIVE = 24*60*60*1000L;
// private CacheManager manager = CacheManager.create();
private Cache cache;
@@ -48,7 +48,7 @@ public class SessionCache {
cache = manager.createCache();
cache.setEjectionPolicy(EjectionPolicy.LRU);
cache.setMaxCacheSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS));
- cache.setExpiryTimeToLiveMs(props.getInt("session.time.to.live", SESSION_TIME_TO_LIVE));
+ cache.setExpiryTimeToLiveMs(props.getLong("session.time.to.live", SESSION_TIME_TO_LIVE));
}
/**
src/sql/create.triggers.sql 1(+1 -0)
diff --git a/src/sql/create.triggers.sql b/src/sql/create.triggers.sql
index 4c05f55..523ad5a 100644
--- a/src/sql/create.triggers.sql
+++ b/src/sql/create.triggers.sql
@@ -1,5 +1,6 @@
CREATE TABLE triggers (
trigger_id INT NOT NULL AUTO_INCREMENT,
+ trigger_source VARCHAR(128),
modify_time BIGINT NOT NULL,
enc_type TINYINT,
data LONGBLOB,
src/web/js/azkaban.triggers.view.js 342(+342 -0)
diff --git a/src/web/js/azkaban.triggers.view.js b/src/web/js/azkaban.triggers.view.js
new file mode 100644
index 0000000..f507dc1
--- /dev/null
+++ b/src/web/js/azkaban.triggers.view.js
@@ -0,0 +1,342 @@
+$.namespace('azkaban');
+
+
+function removeSched(scheduleId) {
+ var scheduleURL = contextURL + "/schedule"
+ var redirectURL = contextURL + "/schedule"
+ $.post(
+ scheduleURL,
+ {"action":"removeSched", "scheduleId":scheduleId},
+ function(data) {
+ if (data.error) {
+// alert(data.error)
+ $('#errorMsg').text(data.error);
+ }
+ else {
+// alert("Schedule "+schedId+" removed!")
+ window.location = redirectURL;
+ }
+ },
+ "json"
+ )
+}
+
+function removeSla(scheduleId) {
+ var scheduleURL = contextURL + "/schedule"
+ var redirectURL = contextURL + "/schedule"
+ $.post(
+ scheduleURL,
+ {"action":"removeSla", "scheduleId":scheduleId},
+ function(data) {
+ if (data.error) {
+// alert(data.error)
+ $('#errorMsg').text(data.error)
+ }
+ else {
+// alert("Schedule "+schedId+" removed!")
+ window.location = redirectURL
+ }
+ },
+ "json"
+ )
+}
+
+azkaban.ChangeSlaView = Backbone.View.extend({
+ events : {
+ "click" : "closeEditingTarget",
+ "click #set-sla-btn": "handleSetSla",
+ "click #remove-sla-btn": "handleRemoveSla",
+ "click #sla-cancel-btn": "handleSlaCancel",
+ "click .modal-close": "handleSlaCancel",
+ "click #addRow": "handleAddRow"
+ },
+ initialize: function(setting) {
+
+ },
+ handleSlaCancel: function(evt) {
+ console.log("Clicked cancel button");
+ var scheduleURL = contextURL + "/schedule";
+
+ $('#slaModalBackground').hide();
+ $('#sla-options').hide();
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ var rows = tFlowRules.rows;
+ var rowLength = rows.length
+ for(var i = 0; i < rowLength-1; i++) {
+ tFlowRules.deleteRow(0);
+ }
+
+ },
+ initFromSched: function(scheduleId, flowName) {
+ this.scheduleId = scheduleId;
+
+ var scheduleURL = contextURL + "/schedule"
+ this.scheduleURL = scheduleURL;
+ var indexToName = {};
+ var nameToIndex = {};
+ var indexToText = {};
+ this.indexToName = indexToName;
+ this.nameToIndex = nameToIndex;
+ this.indexToText = indexToText;
+ var ruleBoxOptions = ["SUCCESS", "FINISH"];
+ this.ruleBoxOptions = ruleBoxOptions;
+
+ var fetchScheduleData = {"scheduleId": this.scheduleId, "ajax":"slaInfo"};
+
+ $.get(
+ this.scheduleURL,
+ fetchScheduleData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ if (data.slaEmails) {
+ $('#slaEmails').val(data.slaEmails.join());
+ }
+
+ var allJobNames = data.allJobNames;
+
+ indexToName[0] = "";
+ nameToIndex[flowName] = 0;
+ indexToText[0] = "flow " + flowName;
+ for(var i = 1; i <= allJobNames.length; i++) {
+ indexToName[i] = allJobNames[i-1];
+ nameToIndex[allJobNames[i-1]] = i;
+ indexToText[i] = "job " + allJobNames[i-1];
+ }
+
+
+
+
+
+ // populate with existing settings
+ if(data.settings) {
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+
+ for(var setting in data.settings) {
+ var rFlowRule = tFlowRules.insertRow(0);
+
+ var cId = rFlowRule.insertCell(-1);
+ var idSelect = document.createElement("select");
+ for(var i in indexToName) {
+ idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+ if(data.settings[setting].id == indexToName[i]) {
+ idSelect.options[i].selected = true;
+ }
+ }
+ cId.appendChild(idSelect);
+
+ var cRule = rFlowRule.insertCell(-1);
+ var ruleSelect = document.createElement("select");
+ for(var i in ruleBoxOptions) {
+ ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+ if(data.settings[setting].rule == ruleBoxOptions[i]) {
+ ruleSelect.options[i].selected = true;
+ }
+ }
+ cRule.appendChild(ruleSelect);
+
+ var cDuration = rFlowRule.insertCell(-1);
+ var duration = document.createElement("input");
+ duration.type = "text";
+ duration.setAttribute("class", "durationpick");
+ var rawMinutes = data.settings[setting].duration;
+ var intMinutes = rawMinutes.substring(0, rawMinutes.length-1);
+ var minutes = parseInt(intMinutes);
+ var hours = Math.floor(minutes / 60);
+ minutes = minutes % 60;
+ duration.value = hours + ":" + minutes;
+ cDuration.appendChild(duration);
+
+ var cEmail = rFlowRule.insertCell(-1);
+ var emailCheck = document.createElement("input");
+ emailCheck.type = "checkbox";
+ for(var act in data.settings[setting].actions) {
+ if(data.settings[setting].actions[act] == "EMAIL") {
+ emailCheck.checked = true;
+ }
+ }
+ cEmail.appendChild(emailCheck);
+
+ var cKill = rFlowRule.insertCell(-1);
+ var killCheck = document.createElement("input");
+ killCheck.type = "checkbox";
+ for(var act in data.settings[setting].actions) {
+ if(data.settings[setting].actions[act] == "KILL") {
+ killCheck.checked = true;
+ }
+ }
+ cKill.appendChild(killCheck);
+
+ $('.durationpick').timepicker({hourMax: 99});
+ }
+ }
+ $('.durationpick').timepicker({hourMax: 99});
+ }
+ },
+ "json"
+ );
+
+ $('#slaModalBackground').show();
+ $('#sla-options').show();
+
+// this.schedFlowOptions = sched.flowOptions
+ console.log("Loaded schedule info. Ready to set SLA.");
+
+ },
+ handleRemoveSla: function(evt) {
+ console.log("Clicked remove sla button");
+ var scheduleURL = this.scheduleURL;
+ var redirectURL = this.scheduleURL;
+ $.post(
+ scheduleURL,
+ {"action":"removeSla", "scheduleId":this.scheduleId},
+ function(data) {
+ if (data.error) {
+ $('#errorMsg').text(data.error)
+ }
+ else {
+ window.location = redirectURL
+ }
+ "json"
+ }
+ );
+
+ },
+ handleSetSla: function(evt) {
+
+ var slaEmails = $('#slaEmails').val();
+ var settings = {};
+
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ for(var row = 0; row < tFlowRules.rows.length-1; row++) {
+ var rFlowRule = tFlowRules.rows[row];
+ var id = rFlowRule.cells[0].firstChild.value;
+ var rule = rFlowRule.cells[1].firstChild.value;
+ var duration = rFlowRule.cells[2].firstChild.value;
+ var email = rFlowRule.cells[3].firstChild.checked;
+ var kill = rFlowRule.cells[4].firstChild.checked;
+ settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill;
+ }
+
+ var slaData = {
+ scheduleId: this.scheduleId,
+ ajax: "setSla",
+ slaEmails: slaEmails,
+ settings: settings
+ };
+
+ var scheduleURL = this.scheduleURL;
+
+ $.post(
+ scheduleURL,
+ slaData,
+ function(data) {
+ if (data.error) {
+ alert(data.error);
+ }
+ else {
+ tFlowRules.length = 0;
+ window.location = scheduleURL;
+ }
+ },
+ "json"
+ );
+ },
+ handleAddRow: function(evt) {
+
+ var indexToName = this.indexToName;
+ var nameToIndex = this.nameToIndex;
+ var indexToText = this.indexToText;
+ var ruleBoxOptions = this.ruleBoxOptions;
+
+ var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+ var rFlowRule = tFlowRules.insertRow(tFlowRules.rows.length-1);
+
+ var cId = rFlowRule.insertCell(-1);
+ var idSelect = document.createElement("select");
+ for(var i in indexToName) {
+ idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+ }
+
+ cId.appendChild(idSelect);
+
+ var cRule = rFlowRule.insertCell(-1);
+ var ruleSelect = document.createElement("select");
+ for(var i in ruleBoxOptions) {
+ ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+ }
+ cRule.appendChild(ruleSelect);
+
+ var cDuration = rFlowRule.insertCell(-1);
+ var duration = document.createElement("input");
+ duration.type = "text";
+ duration.setAttribute("class", "durationpick");
+ cDuration.appendChild(duration);
+
+ var cEmail = rFlowRule.insertCell(-1);
+ var emailCheck = document.createElement("input");
+ emailCheck.type = "checkbox";
+ cEmail.appendChild(emailCheck);
+
+ var cKill = rFlowRule.insertCell(-1);
+ var killCheck = document.createElement("input");
+ killCheck.type = "checkbox";
+ cKill.appendChild(killCheck);
+
+ $('.durationpick').timepicker({hourMax: 99});
+
+ return rFlowRule;
+ },
+ handleEditColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+
+ if (this.editingTarget != curTarget) {
+ this.closeEditingTarget();
+
+ var text = $(curTarget).children(".spanValue").text();
+ $(curTarget).empty();
+
+ var input = document.createElement("input");
+ $(input).attr("type", "text");
+ $(input).css("width", "100%");
+ $(input).val(text);
+ $(curTarget).addClass("editing");
+ $(curTarget).append(input);
+ $(input).focus();
+ this.editingTarget = curTarget;
+ }
+ },
+ handleRemoveColumn : function(evt) {
+ var curTarget = evt.currentTarget;
+ // Should be the table
+ var row = curTarget.parentElement.parentElement;
+ $(row).remove();
+ },
+ closeEditingTarget: function(evt) {
+
+ }
+});
+
+var slaView;
+var tableSorterView;
+$(function() {
+ var selected;
+
+
+ slaView = new azkaban.ChangeSlaView({el:$('#sla-options')});
+ tableSorterView = new azkaban.TableSorter({el:$('#scheduledFlowsTbl')});
+// var requestURL = contextURL + "/manager";
+
+ // Set up the Flow options view. Create a new one every time :p
+// $('#addSlaBtn').click( function() {
+// slaView.show();
+// });
+
+
+
+});
\ No newline at end of file
diff --git a/unit/java/azkaban/test/trigger/DummyTriggerAction.java b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
index 5fb0b4c..a99b3f7 100644
--- a/unit/java/azkaban/test/trigger/DummyTriggerAction.java
+++ b/unit/java/azkaban/test/trigger/DummyTriggerAction.java
@@ -35,4 +35,9 @@ public class DummyTriggerAction implements TriggerAction{
System.out.println(message);
}
+ @Override
+ public String getDescription() {
+ return "this is real dummy action";
+ }
+
}
diff --git a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
index fc68dfa..31a0788 100644
--- a/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
+++ b/unit/java/azkaban/test/trigger/JdbcTriggerLoaderTest.java
@@ -42,7 +42,7 @@ public class JdbcTriggerLoaderTest {
private static boolean testDBExists = false;
//@TODO remove this and turn into local host.
- private static final String host = "cyu-ld.linkedin.biz";
+ private static final String host = "localhost";
private static final int port = 3306;
private static final String database = "azkaban2";
private static final String user = "azkaban";
@@ -196,7 +196,7 @@ public class JdbcTriggerLoaderTest {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
TriggerAction action = new ExecuteFlowAction(1, projName, flowName, "azkaban", new ExecutionOptions());
actions.add(action);
- Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", source, triggerCond, expireCond, actions);
+ Trigger t = new Trigger(now, now, "azkaban", source, triggerCond, expireCond, actions);
return t;
}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index bbe3392..98ec8fe 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -152,7 +152,7 @@ public class TriggerManagerTest {
Condition triggerCond = new Condition(checkers, expr);
Condition expireCond = new Condition(checkers, expr);
- Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", source, triggerCond, expireCond, actions);
+ Trigger fakeTrigger = new Trigger(DateTime.now(), DateTime.now(), "azkaban", source, triggerCond, expireCond, actions);
fakeTrigger.setResetOnTrigger(true);
fakeTrigger.setResetOnExpire(true);
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index 6235c12..db8e324 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -54,7 +54,7 @@ public class TriggerTest {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
TriggerAction action = new ExecuteFlowAction(1, "testProj", "testFlow", "azkaban", new ExecutionOptions());
actions.add(action);
- Trigger t = new Trigger(now.getMillis(), now.getMillis(), "azkaban", "test", triggerCond, expireCond, actions);
+ Trigger t = new Trigger(now, now, "azkaban", "test", triggerCond, expireCond, actions);
File temp = File.createTempFile("temptest", "temptest");
temp.deleteOnExit();