azkaban-uncached
Changes
src/java/azkaban/scheduler/ScheduleManager.java 88(+68 -20)
src/java/azkaban/trigger/Condition.java 10(+9 -1)
src/java/azkaban/trigger/Trigger.java 10(+9 -1)
src/java/azkaban/trigger/TriggerManager.java 89(+58 -31)
Details
diff --git a/src/java/azkaban/actions/ExecuteFlowAction.java b/src/java/azkaban/actions/ExecuteFlowAction.java
index 20d0369..e7c8dfb 100644
--- a/src/java/azkaban/actions/ExecuteFlowAction.java
+++ b/src/java/azkaban/actions/ExecuteFlowAction.java
@@ -26,7 +26,7 @@ public class ExecuteFlowAction implements TriggerAction {
private static ProjectManager projectManager;
private ExecutionOptions executionOptions;
- private static Logger logger;
+ private static Logger logger = Logger.getLogger(ExecuteFlowAction.class);
public ExecuteFlowAction(int projectId, String projectName, String flowName, String submitUser, ExecutionOptions executionOptions) {
this.projectId = projectId;
@@ -75,10 +75,18 @@ public class ExecuteFlowAction implements TriggerAction {
this.executionOptions = executionOptions;
}
+ public static ExecutorManager getExecutorManager() {
+ return executorManager;
+ }
+
public static void setExecutorManager(ExecutorManager executorManager) {
ExecuteFlowAction.executorManager = executorManager;
}
+ public static ProjectManager getProjectManager() {
+ return projectManager;
+ }
+
public static void setProjectManager(ProjectManager projectManager) {
ExecuteFlowAction.projectManager = projectManager;
}
@@ -148,7 +156,11 @@ public class ExecuteFlowAction implements TriggerAction {
}
@Override
- public void doAction() {
+ public void doAction() throws Exception {
+ if(projectManager == null || executorManager == null) {
+ throw new Exception("ExecuteFlowAction not properly initialized!");
+ }
+
Project project = projectManager.getProject(projectId);
if(project == null) {
logger.error("Project to execute " + projectId + " does not exist!");
diff --git a/src/java/azkaban/scheduler/BasicTimeChecker.java b/src/java/azkaban/scheduler/BasicTimeChecker.java
index 5a59c72..9d35746 100644
--- a/src/java/azkaban/scheduler/BasicTimeChecker.java
+++ b/src/java/azkaban/scheduler/BasicTimeChecker.java
@@ -21,7 +21,7 @@ public class BasicTimeChecker implements ConditionChecker {
private boolean skipPastChecks = true;
private ReadablePeriod period;
- private String id = type;
+ private final String id;
public BasicTimeChecker(
String id,
@@ -242,9 +242,5 @@ public class BasicTimeChecker implements ConditionChecker {
return jsonObj;
}
- @Override
- public void setId(String id) {
- this.id = id;
- }
}
src/java/azkaban/scheduler/ScheduleManager.java 88(+68 -20)
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index c60f143..0d5fc2f 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -62,14 +62,16 @@ public class ScheduleManager {
private Map<Pair<Integer, String>, Set<Schedule>> scheduleIdentityPairMap = new LinkedHashMap<Pair<Integer, String>, Set<Schedule>>();
private Map<Integer, Schedule> scheduleIDMap = new LinkedHashMap<Integer, Schedule>();
- private final ScheduleRunner runner;
+
private final ExecutorManager executorManager;
private final ProjectManager projectManager;
private final SLAManager slaManager;
+ private final boolean useExternalRunner;
+ private final ScheduleRunner runner;
+
// Used for mbeans to query Scheduler status
- private long lastCheckTime = -1;
- private long nextWakupTime = -1;
+
/**
* Give the schedule manager a loader class that will properly load the
@@ -80,13 +82,16 @@ public class ScheduleManager {
public ScheduleManager(ExecutorManager executorManager,
ProjectManager projectManager,
SLAManager slaManager,
- ScheduleLoader loader)
+ ScheduleLoader loader,
+ boolean useExternalRunner)
{
this.executorManager = executorManager;
this.projectManager = projectManager;
this.slaManager = slaManager;
this.loader = loader;
- this.runner = new ScheduleRunner();
+ this.useExternalRunner = useExternalRunner;
+
+
List<Schedule> scheduleList = null;
try {
@@ -101,7 +106,12 @@ public class ScheduleManager {
internalSchedule(sched);
}
- this.runner.start();
+ if(!useExternalRunner) {
+ this.runner = new ScheduleRunner();
+ this.runner.start();
+ } else {
+ this.runner = null;
+ }
}
/**
@@ -109,7 +119,9 @@ public class ScheduleManager {
* it again.
*/
public void shutdown() {
- this.runner.shutdown();
+ if(!useExternalRunner) {
+ this.runner.shutdown();
+ }
}
/**
@@ -118,7 +130,8 @@ public class ScheduleManager {
* @return
*/
public synchronized List<Schedule> getSchedules() {
- return runner.getRunnerSchedules();
+ //return runner.getRunnerSchedules();
+ return new ArrayList<Schedule>(scheduleIDMap.values());
}
/**
@@ -149,8 +162,10 @@ public class ScheduleManager {
*/
public synchronized void removeSchedules(int projectId, String flowId) {
Set<Schedule> schedules = getSchedules(projectId, flowId);
- for(Schedule sched : schedules) {
- removeSchedule(sched);
+ if(schedules != null) {
+ for(Schedule sched : schedules) {
+ removeSchedule(sched);
+ }
}
}
/**
@@ -159,7 +174,7 @@ public class ScheduleManager {
* @param id
*/
public synchronized void removeSchedule(Schedule sched) {
-
+
Pair<Integer,String> identityPairMap = sched.getScheduleIdentityPair();
Set<Schedule> schedules = scheduleIdentityPairMap.get(identityPairMap);
if(schedules != null) {
@@ -170,13 +185,17 @@ public class ScheduleManager {
}
scheduleIDMap.remove(sched.getScheduleId());
- runner.removeRunnerSchedule(sched);
try {
loader.removeSchedule(sched);
} catch (ScheduleManagerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
+
+ if(!useExternalRunner) {
+ runner.removeRunnerSchedule(sched);
+ }
+
}
// public synchronized void pauseScheduledFlow(String scheduleId){
@@ -250,11 +269,13 @@ public class ScheduleManager {
*/
private synchronized void internalSchedule(Schedule s) {
Schedule existing = scheduleIDMap.get(s.getScheduleId());
- if (existing != null) {
- this.runner.removeRunnerSchedule(existing);
+ if(!useExternalRunner) {
+ if (existing != null) {
+ this.runner.removeRunnerSchedule(existing);
+ }
+ s.updateTime();
+ this.runner.addRunnerSchedule(s);
}
- s.updateTime();
- this.runner.addRunnerSchedule(s);
scheduleIDMap.put(s.getScheduleId(), s);
Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
if(schedules == null) {
@@ -306,6 +327,9 @@ public class ScheduleManager {
*
*/
public class ScheduleRunner extends Thread {
+
+ private long lastCheckTime = -1;
+ private long nextWakupTime = -1;
private final PriorityBlockingQueue<Schedule> schedules;
private AtomicBoolean stillAlive = new AtomicBoolean(true);
@@ -322,6 +346,14 @@ public class ScheduleManager {
this.interrupt();
}
+ public long getLastCheckTime() {
+ return lastCheckTime;
+ }
+
+ public long getNextWakeupTime() {
+ return nextWakupTime;
+ }
+
/**
* Return a list of scheduled flow
*
@@ -528,18 +560,34 @@ public class ScheduleManager {
}
public long getLastCheckTime() {
- return lastCheckTime;
+ if(useExternalRunner) {
+ return -1;
+ } else {
+ return runner.getLastCheckTime();
+ }
}
public long getNextUpdateTime() {
- return nextWakupTime;
+ if(useExternalRunner) {
+ return -1;
+ } else {
+ return runner.getNextWakeupTime();
+ }
}
public State getThreadState() {
- return runner.getState();
+ if(useExternalRunner) {
+ return null;
+ } else {
+ return runner.getState();
+ }
}
public boolean isThreadActive() {
- return runner.isAlive();
+ if(useExternalRunner) {
+ return false;
+ } else {
+ return runner.isAlive();
+ }
}
}
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 05f7614..a36f3f1 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -16,6 +16,7 @@ import azkaban.trigger.ConditionChecker;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
public class TriggerBasedScheduleLoader implements ScheduleLoader {
@@ -23,6 +24,8 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
private TriggerManager triggerManager;
+ private static final String triggerSource = "TriggerBasedScheduler";
+
public TriggerBasedScheduleLoader(TriggerManager triggerManager, ExecutorManager executorManager, ProjectManager projectManager) {
this.triggerManager = triggerManager;
// need to init the action types and condition checker types
@@ -35,8 +38,10 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
Condition triggerCondition = createTimeTriggerCondition(s);
Condition expireCondition = createTimeExpireCondition(s);
List<TriggerAction> actions = createActions(s);
- Trigger t = new Trigger(s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), "TriggerBasedScheduler", triggerCondition, expireCondition, actions);
-
+ Trigger t = new Trigger(s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), triggerSource, triggerCondition, expireCondition, actions);
+ if(s.isRecurring()) {
+ t.setResetOnTrigger(true);
+ }
return t;
}
@@ -69,21 +74,33 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void insertSchedule(Schedule s) throws ScheduleManagerException {
Trigger t = createScheduleTrigger(s);
- triggerManager.insertTrigger(t);
+ try {
+ triggerManager.insertTrigger(t);
+ } catch (TriggerManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ScheduleManagerException("Failed to insert new schedule!", e);
+ }
}
@Override
public void updateSchedule(Schedule s) throws ScheduleManagerException {
-
-
+ Trigger t = createScheduleTrigger(s);
+ try {
+ triggerManager.updateTrigger(t);
+ } catch (TriggerManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ScheduleManagerException("Failed to update schedule!", e);
+ }
}
+ //TODO
+ // may need to add logic to filter out skip runs
@Override
public List<Schedule> loadSchedules() throws ScheduleManagerException {
List<Trigger> triggers = triggerManager.getTriggers();
List<Schedule> schedules = new ArrayList<Schedule>();
for(Trigger t : triggers) {
- if(t.getSource().equals("TriggerBasedScheduler")) {
+ if(t.getSource().equals(triggerSource)) {
Schedule s = triggerToSchedule(t);
schedules.add(s);
}
@@ -133,7 +150,12 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void removeSchedule(Schedule s) throws ScheduleManagerException {
- triggerManager.removeTrigger(s.getScheduleId());
+ try {
+ triggerManager.removeTrigger(s.getScheduleId());
+ } catch (TriggerManagerException e) {
+ // TODO Auto-generated catch block
+ throw new ScheduleManagerException(e.getMessage());
+ }
}
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduler.java b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
index 1d0f2a6..87a3cf6 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduler.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduler.java
@@ -54,12 +54,7 @@ public class TriggerBasedScheduler {
private ScheduleLoader loader;
private Map<Pair<Integer, String>, Schedule> scheduleIDMap = new HashMap<Pair<Integer, String>, Schedule>();
- private Map<Integer, Pair<Integer, String>> idFlowMap = new HashMap<Integer, Pair<Integer,String>>();
-
- private final ExecutorManager executorManager;
- private final ProjectManager projectManager;
- private final SLAManager slaManager;
- private final TriggerManager triggerManager;
+ private Map<Integer, Schedule> idFlowMap = new HashMap<Integer, Schedule>();
/**
* Give the schedule manager a loader class that will properly load the
@@ -69,15 +64,10 @@ public class TriggerBasedScheduler {
*/
public TriggerBasedScheduler(ExecutorManager executorManager,
ProjectManager projectManager,
- SLAManager slaManager,
TriggerManager triggerManager,
ScheduleLoader loader)
{
- this.executorManager = executorManager;
- this.projectManager = projectManager;
- this.slaManager = slaManager;
- this.triggerManager = triggerManager;
- this.loader = loader;
+ this.loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
List<Schedule> scheduleList = null;
try {
@@ -87,7 +77,10 @@ public class TriggerBasedScheduler {
logger.error("Failed to load schedules" + e.getCause() + e.getMessage());
e.printStackTrace();
}
-
+ for(Schedule s : scheduleList) {
+ scheduleIDMap.put(s.getScheduleIdentityPair(), s);
+ idFlowMap.put(s.getScheduleId(), s);
+ }
}
/**
diff --git a/src/java/azkaban/trigger/ActionTypeLoader.java b/src/java/azkaban/trigger/ActionTypeLoader.java
index 38ee266..62e50f8 100644
--- a/src/java/azkaban/trigger/ActionTypeLoader.java
+++ b/src/java/azkaban/trigger/ActionTypeLoader.java
@@ -139,9 +139,12 @@ public class ActionTypeLoader {
logger.info("Loaded ExecuteFlowAction type.");
}
- public TriggerAction createActionFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ public TriggerAction createActionFromJson(String type, Object obj) throws Exception {
TriggerAction action = null;
Class<? extends TriggerAction> actionClass = actionToClass.get(type);
+ if(actionClass == null) {
+ throw new Exception("Action Type " + type + " not supported!");
+ }
action = (TriggerAction) Utils.invokeStaticMethod(actionClass.getClassLoader(), actionClass.getName(), "createFromJson", obj);
return action;
diff --git a/src/java/azkaban/trigger/CheckerTypeLoader.java b/src/java/azkaban/trigger/CheckerTypeLoader.java
index f67655d..67a207d 100644
--- a/src/java/azkaban/trigger/CheckerTypeLoader.java
+++ b/src/java/azkaban/trigger/CheckerTypeLoader.java
@@ -142,9 +142,12 @@ public class CheckerTypeLoader {
logger.info("Loaded BasicTimeChecker type.");
}
- public ConditionChecker createCheckerFromJson(String type, Object obj) throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ public ConditionChecker createCheckerFromJson(String type, Object obj) throws Exception {
ConditionChecker checker = null;
- Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);
+ Class<? extends ConditionChecker> checkerClass = checkerToClass.get(type);
+ if(checkerClass == null) {
+ throw new Exception("Checker type " + type + " not supported!");
+ }
checker = (ConditionChecker) Utils.invokeStaticMethod(checkerClass.getClassLoader(), checkerClass.getName(), "createFromJson", obj);
return checker;
src/java/azkaban/trigger/Condition.java 10(+9 -1)
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index a7b311a..5d809d5 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -34,6 +34,10 @@ public class Condition {
Condition.checkerLoader = loader;
}
+ public static CheckerTypeLoader getCheckerLoader() {
+ return checkerLoader;
+ }
+
public void registerChecker(ConditionChecker checker) {
checkers.put(checker.getId(), checker);
context.set(checker.getId(), checker);
@@ -85,7 +89,11 @@ public class Condition {
}
@SuppressWarnings("unchecked")
- public static Condition fromJson(Object obj) {
+ public static Condition fromJson(Object obj) throws Exception {
+ if(checkerLoader == null) {
+ throw new Exception("Condition Checker loader not initialized!");
+ }
+
Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
Condition cond = null;
diff --git a/src/java/azkaban/trigger/ConditionChecker.java b/src/java/azkaban/trigger/ConditionChecker.java
index 20b3075..14ca333 100644
--- a/src/java/azkaban/trigger/ConditionChecker.java
+++ b/src/java/azkaban/trigger/ConditionChecker.java
@@ -8,9 +8,7 @@ public interface ConditionChecker {
Object getNum();
void reset();
-
- void setId(String id);
-
+
String getId();
String getType();
src/java/azkaban/trigger/Trigger.java 10(+9 -1)
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 800606d..d819800 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -92,6 +92,10 @@ public class Trigger {
Trigger.actionTypeLoader = loader;
}
+ public static ActionTypeLoader getActionTypeLoader() {
+ return actionTypeLoader;
+ }
+
public boolean isResetOnTrigger() {
return resetOnTrigger;
}
@@ -165,8 +169,12 @@ public class Trigger {
}
@SuppressWarnings("unchecked")
- public static Trigger fromJson(Object obj) {
+ public static Trigger fromJson(Object obj) throws Exception {
+ if(actionTypeLoader == null) {
+ throw new Exception("Trigger Action Type loader not initialized.");
+ }
+
Map<String, Object> jsonObj = (HashMap<String, Object>) obj;
Trigger trigger = null;
diff --git a/src/java/azkaban/trigger/TriggerAction.java b/src/java/azkaban/trigger/TriggerAction.java
index 581b3fa..c57d585 100644
--- a/src/java/azkaban/trigger/TriggerAction.java
+++ b/src/java/azkaban/trigger/TriggerAction.java
@@ -8,6 +8,6 @@ public interface TriggerAction {
Object toJson();
- void doAction();
+ void doAction() throws Exception;
}
src/java/azkaban/trigger/TriggerManager.java 89(+58 -31)
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index 393c974..f5ad251 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -16,20 +16,20 @@ import azkaban.utils.Props;
public class TriggerManager {
private static Logger logger = Logger.getLogger(TriggerManager.class);
- private final long DEFAULT_TRIGGER_EXPIRE_TIME = 24*60*60*1000L;
-
private Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
private CheckerTypeLoader checkerLoader;
private ActionTypeLoader actionLoader;
+ private TriggerLoader triggerLoader;
+
TriggerScannerThread scannerThread;
- public TriggerManager(Props props, TriggerLoader triggerLoader, CheckerTypeLoader checkerLoader, ActionTypeLoader actionLoader) {
+ public TriggerManager(Props props, TriggerLoader triggerLoader) {
+
+ checkerLoader = new CheckerTypeLoader();
+ actionLoader = new ActionTypeLoader();
- this.checkerLoader = checkerLoader;
- this.actionLoader = actionLoader;
- scannerThread = new TriggerScannerThread("TriggerScannerThread");
// load plugins
try{
@@ -43,6 +43,11 @@ public class TriggerManager {
Condition.setCheckerLoader(checkerLoader);
Trigger.setActionTypeLoader(actionLoader);
+ long scannerInterval = props.getLong("trigger.scan.interval", TriggerScannerThread.DEFAULT_SCAN_INTERVAL_MS);
+ scannerThread = new TriggerScannerThread(scannerInterval);
+ scannerThread.setName("TriggerScannerThread");
+
+ this.triggerLoader = triggerLoader;
try{
// expect loader to return valid triggers
List<Trigger> triggers = triggerLoader.loadTriggers();
@@ -55,8 +60,6 @@ public class TriggerManager {
logger.error(e.getMessage());
}
-
-
scannerThread.start();
}
@@ -68,17 +71,33 @@ public class TriggerManager {
return actionLoader;
}
- public synchronized void insertTrigger(Trigger t) {
+ public synchronized void insertTrigger(Trigger t) throws TriggerManagerException {
+
+ triggerLoader.addTrigger(t);
triggerIdMap.put(t.getTriggerId(), t);
scannerThread.addTrigger(t);
}
- public synchronized void removeTrigger(int id) {
+ public synchronized void removeTrigger(int id) throws TriggerManagerException {
removeTrigger(triggerIdMap.get(id));
}
- public synchronized void removeTrigger(Trigger t) {
- scannerThread.removeTrigger(t);
+ public synchronized void updateTrigger(Trigger t) throws TriggerManagerException {
+ if(!triggerIdMap.containsKey(t.getTriggerId())) {
+ throw new TriggerManagerException("The trigger to update doesn't exist!");
+ }
+
+ scannerThread.deleteTrigger(t);
+ scannerThread.addTrigger(t);
+ triggerIdMap.put(t.getTriggerId(), t);
+
+
+ triggerLoader.updateTrigger(t);
+ }
+
+ public synchronized void removeTrigger(Trigger t) throws TriggerManagerException {
+ triggerLoader.removeTrigger(t);
+ scannerThread.deleteTrigger(t);
triggerIdMap.remove(t.getTriggerId());
}
@@ -88,21 +107,29 @@ public class TriggerManager {
//trigger scanner thread
public class TriggerScannerThread extends Thread {
+
+ //public static final long DEFAULT_SCAN_INTERVAL_MS = 300000;
+ public static final long DEFAULT_SCAN_INTERVAL_MS = 60000;
+
private final BlockingQueue<Trigger> triggers;
private AtomicBoolean stillAlive = new AtomicBoolean(true);
- private String scannerName;
private long lastCheckTime = -1;
+ private final long scanInterval;
// Five minute minimum intervals
- private static final int TIMEOUT_MS = 300000;
- public TriggerScannerThread(String scannerName){
+ public TriggerScannerThread(){
triggers = new LinkedBlockingDeque<Trigger>();
- this.setName(scannerName);
+ this.scanInterval = DEFAULT_SCAN_INTERVAL_MS;
+ }
+
+ public TriggerScannerThread(long interval){
+ triggers = new LinkedBlockingDeque<Trigger>();
+ this.scanInterval = interval;
}
public void shutdown() {
- logger.error("Shutting down trigger manager thread " + scannerName);
+ logger.error("Shutting down trigger manager thread " + this.getName());
stillAlive.set(false);
this.interrupt();
}
@@ -115,7 +142,7 @@ public class TriggerManager {
triggers.add(t);
}
- public synchronized void removeTrigger(Trigger t) {
+ public synchronized void deleteTrigger(Trigger t) {
triggers.remove(t);
}
@@ -135,9 +162,9 @@ public class TriggerManager {
logger.error(t.getMessage());
}
- long timeRemaining = TIMEOUT_MS - (System.currentTimeMillis() - lastCheckTime);
+ long timeRemaining = scanInterval - (System.currentTimeMillis() - lastCheckTime);
if(timeRemaining < 0) {
- logger.error("Trigger manager thread " + scannerName + " is too busy!");
+ logger.error("Trigger manager thread " + this.getName() + " is too busy!");
} else {
wait(timeRemaining);
}
@@ -149,7 +176,7 @@ public class TriggerManager {
}
}
- private void checkAllTriggers() {
+ private void checkAllTriggers() throws TriggerManagerException {
for(Trigger t : triggers) {
if(t.triggerConditionMet()) {
onTriggerTrigger(t);
@@ -157,32 +184,32 @@ public class TriggerManager {
onTriggerExpire(t);
}
}
-
}
- private void onTriggerTrigger(Trigger t) {
+ private void onTriggerTrigger(Trigger t) throws TriggerManagerException {
List<TriggerAction> actions = t.getTriggerActions();
for(TriggerAction action : actions) {
- action.doAction();
+ try {
+ action.doAction();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ throw new TriggerManagerException("action failed to execute", e);
+ }
}
if(t.isResetOnTrigger()) {
t.resetTriggerConditions();
} else {
- triggers.remove(t);
+ removeTrigger(t);
}
}
- private void onTriggerExpire(Trigger t) {
+ private void onTriggerExpire(Trigger t) throws TriggerManagerException {
if(t.isResetOnExpire()) {
t.resetTriggerConditions();
} else {
- triggers.remove(t);
+ removeTrigger(t);
}
}
}
-
- public TriggerAction createTriggerAction() {
- return null;
- }
}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index edbf237..2cafed0 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -60,10 +60,16 @@ import azkaban.project.JdbcProjectLoader;
import azkaban.project.ProjectManager;
import azkaban.scheduler.JdbcScheduleLoader;
+import azkaban.scheduler.ScheduleLoader;
import azkaban.scheduler.ScheduleManager;
+import azkaban.scheduler.TriggerBasedScheduleLoader;
+import azkaban.scheduler.TriggerBasedScheduler;
import azkaban.sla.JdbcSLALoader;
import azkaban.sla.SLAManager;
import azkaban.sla.SLAManagerException;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerManager;
import azkaban.user.UserManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
@@ -129,6 +135,9 @@ public class AzkabanWebServer extends AzkabanServer {
private ProjectManager projectManager;
private ExecutorManager executorManager;
private ScheduleManager scheduleManager;
+// private TriggerBasedScheduler scheduler;
+ private TriggerManager triggerManager;
+
private SLAManager slaManager;
private final ClassLoader baseClassLoader;
@@ -161,7 +170,12 @@ public class AzkabanWebServer extends AzkabanServer {
projectManager = loadProjectManager(props);
executorManager = loadExecutorManager(props);
slaManager = loadSLAManager(props);
- scheduleManager = loadScheduleManager(executorManager, slaManager, props);
+
+ triggerManager = loadTriggerManager(props);
+// scheduler = loadScheduler(executorManager, projectManager, triggerManager);
+
+ scheduleManager = loadScheduleManager(projectManager, executorManager, slaManager, triggerManager, props);
+
baseClassLoader = getBaseClassloader();
tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -220,17 +234,35 @@ public class AzkabanWebServer extends AzkabanServer {
return execManager;
}
- private ScheduleManager loadScheduleManager(ExecutorManager execManager, SLAManager slaManager, Props props ) throws Exception {
- ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, slaManager, new JdbcScheduleLoader(props));
+ private ScheduleManager loadScheduleManager(ProjectManager projectManager, ExecutorManager executorManager, SLAManager slaManager, TriggerManager triggerManager, Props props ) throws Exception {
+ ScheduleManager schedManager = null;
+ String scheduleLoaderType = props.getString("azkaban.scheduler.loader", "TriggerBasedScheduleLoader");
+ if(scheduleLoaderType.equals("JdbcScheduleLoader")) {
+ ScheduleLoader loader = new JdbcScheduleLoader(props);
+ schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, false);
+ } else if(scheduleLoaderType.equals("TriggerBasedScheduleLoader")) {
+ ScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
+ schedManager = new ScheduleManager(executorManager, projectManager, slaManager, loader, true);
+ }
return schedManager;
}
+
+// private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
+// TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
+// return new TriggerBasedScheduler(executorManager, projectManager, triggerManager, loader);
+// }
private SLAManager loadSLAManager(Props props) throws SLAManagerException {
SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
return slaManager;
}
+ private TriggerManager loadTriggerManager(Props props) {
+ TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
+ return new TriggerManager(props, triggerLoader);
+ }
+
/**
* Returns the web session cache.
*
@@ -280,6 +312,14 @@ public class AzkabanWebServer extends AzkabanServer {
return scheduleManager;
}
+ public TriggerManager getTriggerManager() {
+ return triggerManager;
+ }
+
+// public TriggerBasedScheduler getScheduler() {
+// return scheduler;
+// }
+//
/**
* Creates and configures the velocity engine.
*
diff --git a/unit/java/azkaban/test/trigger/ConditionTest.java b/unit/java/azkaban/test/trigger/ConditionTest.java
index d959b2a..34ce3bf 100644
--- a/unit/java/azkaban/test/trigger/ConditionTest.java
+++ b/unit/java/azkaban/test/trigger/ConditionTest.java
@@ -47,7 +47,7 @@ public class ConditionTest {
}
@Test
- public void jsonConversionTest() throws TriggerException, IOException {
+ public void jsonConversionTest() throws Exception {
CheckerTypeLoader checkerTypeLoader = new CheckerTypeLoader();
checkerTypeLoader.init(new Props());
diff --git a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
index 2928539..a996885 100644
--- a/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
+++ b/unit/java/azkaban/test/trigger/ExecuteFlowActionTest.java
@@ -19,7 +19,7 @@ import azkaban.utils.Props;
public class ExecuteFlowActionTest {
@Test
- public void jsonConversionTest() throws SecurityException, IllegalArgumentException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, TriggerException {
+ public void jsonConversionTest() throws Exception {
ActionTypeLoader loader = new ActionTypeLoader();
loader.init(new Props());
diff --git a/unit/java/azkaban/test/trigger/ThresholdChecker.java b/unit/java/azkaban/test/trigger/ThresholdChecker.java
index 9c568ba..c26f854 100644
--- a/unit/java/azkaban/test/trigger/ThresholdChecker.java
+++ b/unit/java/azkaban/test/trigger/ThresholdChecker.java
@@ -16,6 +16,9 @@ public class ThresholdChecker implements ConditionChecker{
private String id;
+ private boolean checkerMet = false;
+ private boolean checkerReset = false;
+
public ThresholdChecker(String id, int threshold){
this.id = id;
this.threshold = threshold;
@@ -27,13 +30,24 @@ public class ThresholdChecker implements ConditionChecker{
@Override
public Boolean eval() {
- return curVal > threshold;
+ if(curVal > threshold) {
+ checkerMet = true;
+ }
+ return checkerMet;
+ }
+
+ public boolean isCheckerMet() {
+ return checkerMet;
}
@Override
public void reset() {
- // TODO Auto-generated method stub
-
+ checkerMet = false;
+ checkerReset = true;
+ }
+
+ public boolean isCheckerReset() {
+ return checkerReset;
}
/*
@@ -67,10 +81,5 @@ public class ThresholdChecker implements ConditionChecker{
return null;
}
- @Override
- public void setId(String id) {
- this.id = id;
-
- }
}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index ee376b9..bbe3392 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -18,6 +18,7 @@ import azkaban.trigger.CheckerTypeLoader;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.ActionTypeLoader;
+import azkaban.trigger.TriggerException;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.TriggerManager;
import azkaban.trigger.TriggerManagerException;
@@ -25,9 +26,13 @@ import azkaban.utils.Props;
public class TriggerManagerTest {
+ private TriggerLoader triggerLoader;
+
@Before
- public void setup() {
-
+ public void setup() throws TriggerException, TriggerManagerException {
+ triggerLoader = new MockTriggerLoader();
+
+
}
@After
@@ -36,85 +41,137 @@ public class TriggerManagerTest {
}
@Test
- public void TriggerManagerSimpleTest() {
+ public void TriggerManagerSimpleTest() throws TriggerManagerException {
+
+
Props props = new Props();
- TriggerManager triggerManager = new TriggerManager(props, new MockTriggerLoader(), new MockCheckerLoader(), new MockActionLoader());
+ props.put("trigger.scan.interval", 4000);
+ TriggerManager triggerManager = new TriggerManager(props, triggerLoader);
+
+ triggerManager.getCheckerLoader().registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
+ triggerManager.getActionLoader().registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);
+
+ ThresholdChecker.setVal(1);
+
+ triggerManager.insertTrigger(createDummyTrigger("test1", "triggerLoader", 10));
List<Trigger> triggers = triggerManager.getTriggers();
assertTrue(triggers.size() == 1);
+ Trigger t1 = triggers.get(0);
+ t1.setResetOnTrigger(false);
+ triggerManager.updateTrigger(t1);
+ ThresholdChecker checker1 = (ThresholdChecker) t1.getTriggerCondition().getCheckers().values().toArray()[0];
+ assertTrue(t1.getSource().equals("triggerLoader"));
- Trigger t2 = createFakeTrigger("addnewtriggger");
+ Trigger t2 = createDummyTrigger("test2: add new trigger", "addNewTriggerTest", 20);
triggerManager.insertTrigger(t2);
+ ThresholdChecker checker2 = (ThresholdChecker) t2.getTriggerCondition().getCheckers().values().toArray()[0];
- triggers = triggerManager.getTriggers();
- assertTrue(triggers.size() == 2);
+ ThresholdChecker.setVal(15);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ assertTrue(checker1.isCheckerMet() == false);
+ assertTrue(checker2.isCheckerMet() == false);
+ assertTrue(checker1.isCheckerReset() == false);
+ assertTrue(checker2.isCheckerReset() == false);
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ assertTrue(checker1.isCheckerMet() == true);
+ assertTrue(checker2.isCheckerMet() == false);
+ assertTrue(checker1.isCheckerReset() == false);
+ assertTrue(checker2.isCheckerReset() == false);
+
+ ThresholdChecker.setVal(25);
+ try {
+ Thread.sleep(4000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ assertTrue(checker1.isCheckerMet() == true);
+ assertTrue(checker1.isCheckerReset() == false);
+ assertTrue(checker2.isCheckerReset() == true);
- triggerManager.removeTrigger(t2);
triggers = triggerManager.getTriggers();
assertTrue(triggers.size() == 1);
+
}
public class MockTriggerLoader implements TriggerLoader {
- private List<Trigger> triggers;
+ private Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+ private int idIndex = 0;
@Override
public void addTrigger(Trigger t) throws TriggerManagerException {
- triggers.add(t);
+ t.setTriggerId(idIndex++);
+ triggers.put(t.getTriggerId(), t);
}
@Override
public void removeTrigger(Trigger s) throws TriggerManagerException {
- triggers.remove(s);
+ triggers.remove(s.getTriggerId());
}
@Override
public void updateTrigger(Trigger t) throws TriggerManagerException {
-
+ triggers.put(t.getTriggerId(), t);
}
@Override
- public List<Trigger> loadTriggers()
- throws TriggerManagerException {
- Trigger t = createFakeTrigger("test");
- triggers = new ArrayList<Trigger>();
- triggers.add(t);
- return triggers;
+ public List<Trigger> loadTriggers() {
+ return new ArrayList<Trigger>(triggers.values());
}
}
- private Trigger createFakeTrigger(String message) {
+ private Trigger createDummyTrigger(String message, String source, int threshold) {
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+ ConditionChecker checker = new ThresholdChecker(ThresholdChecker.type, threshold);
+ checkers.put(checker.getId(), checker);
List<TriggerAction> actions = new ArrayList<TriggerAction>();
TriggerAction act = new DummyTriggerAction(message);
actions.add(act);
- String expr = "true";
+ String expr = checker.getId() + ".eval()";
Condition triggerCond = new Condition(checkers, expr);
Condition expireCond = new Condition(checkers, expr);
- Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", "tester", triggerCond, expireCond, actions);
+ Trigger fakeTrigger = new Trigger(DateTime.now().getMillis(), DateTime.now().getMillis(), "azkaban", source, triggerCond, expireCond, actions);
+ fakeTrigger.setResetOnTrigger(true);
+ fakeTrigger.setResetOnExpire(true);
return fakeTrigger;
}
- public class MockCheckerLoader extends CheckerTypeLoader{
-
- @Override
- public void init(Props props) {
- checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
- }
- }
-
- public class MockActionLoader extends ActionTypeLoader {
- @Override
- public void init(Props props) {
- actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
- }
- }
+// public class MockCheckerLoader extends CheckerTypeLoader{
+//
+// @Override
+// public void init(Props props) {
+// checkerToClass.put(ThresholdChecker.type, ThresholdChecker.class);
+// }
+// }
+//
+// public class MockActionLoader extends ActionTypeLoader {
+// @Override
+// public void init(Props props) {
+// actionToClass.put(DummyTriggerAction.type, DummyTriggerAction.class);
+// }
+// }
}
diff --git a/unit/java/azkaban/test/trigger/TriggerTest.java b/unit/java/azkaban/test/trigger/TriggerTest.java
index 1d7d81d..6235c12 100644
--- a/unit/java/azkaban/test/trigger/TriggerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerTest.java
@@ -43,7 +43,7 @@ public class TriggerTest {
}
@Test
- public void jsonConversionTest() throws TriggerException, IOException {
+ public void jsonConversionTest() throws Exception {
DateTime now = DateTime.now();
ConditionChecker checker1 = new BasicTimeChecker("timeChecker1", now, now.getZone(), true, true, Utils.parsePeriodString("1h"));
Map<String, ConditionChecker> checkers1 = new HashMap<String, ConditionChecker>();