Details
diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 66bc178..7648a05 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -30,9 +30,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
public TriggerBasedScheduleLoader(TriggerManager triggerManager, String triggerSource) {
this.triggerManager = triggerManager;
this.triggerSource = triggerSource;
-// // need to init the action types and condition checker types
-// ExecuteFlowAction.setExecutorManager(executorManager);
-// ExecuteFlowAction.setProjectManager(projectManager);
}
private Trigger scheduleToTrigger(Schedule s) {
@@ -52,15 +49,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
actions.add(executeAct);
-// List<SlaOption> slaOptions = s.getSlaOptions();
-// if(slaOptions != null && slaOptions.size() > 0) {
-// // insert a trigger to keep watching that execution
-// for(SlaOption sla : slaOptions) {
-// // need to create triggers for each sla
-// SlaChecker slaChecker = new SlaChecker("slaChecker", sla, executeAct.getId());
-//
-// }
-// }
return actions;
}
@@ -178,9 +166,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
@Override
public void updateNextExecTime(Schedule s)
throws ScheduleManagerException {
-// Trigger t = triggersLocalCopy.get(s.getScheduleId());
-// BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
-// s.setNextExecTime(ck.getNextCheckTime().getMillis());
+
}
@Override
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index 1f6eb00..f340497 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -166,7 +166,7 @@ public class SlaAlertAction implements TriggerAction{
@Override
public String getDescription() {
- return type + " with " + slaOption.toString();
+ return type + " for " + execId + " with " + slaOption.toString();
}
}
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index c52cf58..6f88fd6 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -53,7 +53,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
- private Object syncObj = new Object();
+ private final Object syncObj = new Object();
private String scannerStage = "";
@@ -85,7 +85,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
@Override
public void start() throws TriggerManagerException{
- try{
+ try {
// expect loader to return valid triggers
List<Trigger> triggers = triggerLoader.loadTriggers();
for(Trigger t : triggers) {
@@ -185,7 +185,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
- this.scannerInterval = scannerInterval;;
+ this.scannerInterval = scannerInterval;
}
public void shutdown() {
@@ -216,12 +216,12 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
//while(stillAlive.get()) {
while(!shutdown) {
synchronized (syncObj) {
- try{
+ try {
lastRunnerThreadCheckTime = System.currentTimeMillis();
scannerStage = "Ready to start a new scan cycle at " + lastRunnerThreadCheckTime;
- try{
+ try {
checkAllTriggers();
justFinishedFlows.clear();
} catch(Exception e) {
@@ -231,11 +231,11 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
t.printStackTrace();
logger.error(t.getMessage());
}
-
+
scannerStage = "Done flipping all triggers.";
runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
-
+
if(runnerThreadIdleTime < 0) {
logger.error("Trigger manager thread " + this.getName() + " is too busy!");
} else {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 6cc913e..0f8750b 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -145,14 +145,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
SlaOption sla;
try {
sla = parseSlaSetting(settings.get(set));
- sla.getInfo().put(SlaOption.INFO_FLOW_NAME, sched.getFlowName());
- sla.getInfo().put(SlaOption.INFO_EMAIL_LIST, slaEmails);
}
catch (Exception e) {
throw new ServletException(e);
}
if(sla != null) {
- sla.getInfo().put("SlaEmails", slaEmails);
+ sla.getInfo().put(SlaOption.INFO_FLOW_NAME, sched.getFlowName());
+ sla.getInfo().put(SlaOption.INFO_EMAIL_LIST, slaEmails);
slaOptions.add(sla);
}
}
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6c1246d..92c1dee 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -23,6 +23,7 @@ public class MockExecutorLoader implements ExecutorLoader {
HashMap<Integer, ExecutionReference> refs = new HashMap<Integer, ExecutionReference>();
int flowUpdateCount = 0;
HashMap<String, Integer> jobUpdateCount = new HashMap<String,Integer>();
+ Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<Integer, Pair<ExecutionReference,ExecutableFlow>>();
@Override
public void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -38,7 +39,7 @@ public class MockExecutorLoader implements ExecutorLoader {
@Override
public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
- return null;
+ return activeFlows;
}
@Override
diff --git a/unit/java/azkaban/test/trigger/MockTriggerLoader.java b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
new file mode 100644
index 0000000..67ef5c7
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
@@ -0,0 +1,53 @@
+package azkaban.test.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+
+public class MockTriggerLoader implements TriggerLoader {
+
+ Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+ int triggerCount = 0;
+
+ @Override
+ public synchronized void addTrigger(Trigger t) throws TriggerLoaderException {
+ t.setTriggerId(triggerCount);
+ t.setLastModifyTime(System.currentTimeMillis());
+ triggers.put(t.getTriggerId(), t);
+ triggerCount++;
+ }
+
+ @Override
+ public synchronized void removeTrigger(Trigger s) throws TriggerLoaderException {
+ triggers.remove(s);
+ }
+
+ @Override
+ public synchronized void updateTrigger(Trigger t) throws TriggerLoaderException {
+ t.setLastModifyTime(System.currentTimeMillis());
+ triggers.put(t.getTriggerId(), t);
+ }
+
+ @Override
+ public synchronized List<Trigger> loadTriggers() throws TriggerLoaderException {
+ return new ArrayList<Trigger>(triggers.values());
+ }
+
+ @Override
+ public synchronized Trigger loadTrigger(int triggerId) throws TriggerLoaderException {
+ return triggers.get(triggerId);
+ }
+
+ @Override
+ public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
+ throws TriggerLoaderException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
new file mode 100644
index 0000000..02c7cc1
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
@@ -0,0 +1,186 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.Props;
+
+public class TriggerManagerDeadlockTest {
+
+ TriggerLoader loader;
+ TriggerManager triggerManager;
+ ExecutorLoader execLoader;
+
+ @Before
+ public void setup() throws ExecutorManagerException, TriggerManagerException {
+ loader = new MockTriggerLoader();
+ Props props = new Props();
+ props.put("trigger.scan.interval", 1000);
+ props.put("executor.port", 12321);
+ execLoader = new MockExecutorLoader();
+ Map<String, Alerter> alerters = new HashMap<String, Alerter>();
+ ExecutorManager executorManager = new ExecutorManager(props, execLoader, alerters);
+ triggerManager = new TriggerManager(props, loader, executorManager);
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+ @Test
+ public void deadlockTest() throws TriggerLoaderException, TriggerManagerException {
+ // this should well saturate it
+ for(int i = 0; i < 1000; i++) {
+ Trigger t = createSelfRegenTrigger();
+ loader.addTrigger(t);
+ }
+ // keep going and add more
+ for(int i = 0; i < 10000; i++) {
+ Trigger d = createDummyTrigger();
+ triggerManager.insertTrigger(d);
+ triggerManager.removeTrigger(d);
+ }
+
+ System.out.println("No dead lock.");
+ }
+
+ public class AlwaysOnChecker implements ConditionChecker {
+
+ public static final String type = "AlwaysOnChecker";
+
+ private final String id;
+ private final Boolean alwaysOn;
+
+ public AlwaysOnChecker(String id, Boolean alwaysOn) {
+ this.id = id;
+ this.alwaysOn = alwaysOn;
+ }
+
+ @Override
+ public Object eval() {
+ // TODO Auto-generated method stub
+ return alwaysOn;
+ }
+
+ @Override
+ public Object getNum() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getType() {
+ // TODO Auto-generated method stub
+ return type;
+ }
+
+ @Override
+ public ConditionChecker fromJson(Object obj) throws Exception {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object toJson() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void stopChecker() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setContext(Map<String, Object> context) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getNextCheckTime() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ }
+
+ private Trigger createSelfRegenTrigger() {
+ ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+ String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+ triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+ Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+
+ TriggerAction triggerAct = new CreateTriggerAction("dummyTrigger", createDummyTrigger());
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ actions.add(triggerAct);
+
+ ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+ String expireExpr = alwaysOffChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+ expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+ Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+ Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+ return t;
+ }
+
+ private Trigger createDummyTrigger() {
+ ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+ String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+ triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+ Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+
+ TriggerAction triggerAct = new DummyTriggerAction("howdy!");
+ List<TriggerAction> actions = new ArrayList<TriggerAction>();
+ actions.add(triggerAct);
+
+ ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+ String expireExpr = alwaysOffChecker.getId() + ".eval()";
+ Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+ expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+ Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+ Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+ return t;
+ }
+
+}