azkaban-developers

Details

diff --git a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
index 66bc178..7648a05 100644
--- a/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
+++ b/src/java/azkaban/scheduler/TriggerBasedScheduleLoader.java
@@ -30,9 +30,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	public TriggerBasedScheduleLoader(TriggerManager triggerManager, String triggerSource) {
 		this.triggerManager = triggerManager;
 		this.triggerSource = triggerSource;
-//		// need to init the action types and condition checker types 
-//		ExecuteFlowAction.setExecutorManager(executorManager);
-//		ExecuteFlowAction.setProjectManager(projectManager);
 	}
 	
 	private Trigger scheduleToTrigger(Schedule s) {
@@ -52,15 +49,6 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 		List<TriggerAction> actions = new ArrayList<TriggerAction>();
 		ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
 		actions.add(executeAct);
-//		List<SlaOption> slaOptions = s.getSlaOptions();
-//		if(slaOptions != null && slaOptions.size() > 0) {
-//			// insert a trigger to keep watching that execution
-//			for(SlaOption sla : slaOptions) {
-//				// need to create triggers for each sla
-//				SlaChecker slaChecker = new SlaChecker("slaChecker", sla, executeAct.getId());
-//				
-//			}
-//		}
 		
 		return actions;
 	}
@@ -178,9 +166,7 @@ public class TriggerBasedScheduleLoader implements ScheduleLoader {
 	@Override
 	public void updateNextExecTime(Schedule s)
 			throws ScheduleManagerException {
-//		Trigger t = triggersLocalCopy.get(s.getScheduleId());
-//		BasicTimeChecker ck = (BasicTimeChecker) t.getTriggerCondition().getCheckers().values().toArray()[0];
-//		s.setNextExecTime(ck.getNextCheckTime().getMillis());
+
 	}
 
 	@Override
diff --git a/src/java/azkaban/trigger/builtin/SlaAlertAction.java b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
index 1f6eb00..f340497 100644
--- a/src/java/azkaban/trigger/builtin/SlaAlertAction.java
+++ b/src/java/azkaban/trigger/builtin/SlaAlertAction.java
@@ -166,7 +166,7 @@ public class SlaAlertAction implements TriggerAction{
 
 	@Override
 	public String getDescription() {
-		return type + " with " + slaOption.toString();
+		return type + " for " + execId + " with " + slaOption.toString();
 	}
 
 }
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index c52cf58..6f88fd6 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -53,7 +53,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 	
 	private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
 	
-	private Object syncObj = new Object();
+	private final Object syncObj = new Object();
 	
 	private String scannerStage = "";
 	
@@ -85,7 +85,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 	@Override
 	public void start() throws TriggerManagerException{
 		
-		try{
+		try {
 			// expect loader to return valid triggers
 			List<Trigger> triggers = triggerLoader.loadTriggers();
 			for(Trigger t : triggers) {
@@ -185,7 +185,7 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 			triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
 			justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
 			this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
-			this.scannerInterval = scannerInterval;;
+			this.scannerInterval = scannerInterval;
 		}
 
 		public void shutdown() {
@@ -216,12 +216,12 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 			//while(stillAlive.get()) {
 			while(!shutdown) {
 				synchronized (syncObj) {
-					try{
+					try {
 						lastRunnerThreadCheckTime = System.currentTimeMillis();
 						
 						scannerStage = "Ready to start a new scan cycle at " + lastRunnerThreadCheckTime;
 						
-						try{
+						try {
 							checkAllTriggers();
 							justFinishedFlows.clear();
 						} catch(Exception e) {
@@ -231,11 +231,11 @@ public class TriggerManager extends EventHandler implements TriggerManagerAdapte
 							t.printStackTrace();
 							logger.error(t.getMessage());
 						}
-						
+					
 						scannerStage = "Done flipping all triggers.";
 						
 						runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
-
+	
 						if(runnerThreadIdleTime < 0) {
 							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
 						} else {
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 6cc913e..0f8750b 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -145,14 +145,13 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 				SlaOption sla;
 				try {
 				sla = parseSlaSetting(settings.get(set));
-				sla.getInfo().put(SlaOption.INFO_FLOW_NAME, sched.getFlowName());
-				sla.getInfo().put(SlaOption.INFO_EMAIL_LIST, slaEmails);
 				}
 				catch (Exception e) {
 					throw new ServletException(e);
 				}
 				if(sla != null) {
-					sla.getInfo().put("SlaEmails", slaEmails);
+					sla.getInfo().put(SlaOption.INFO_FLOW_NAME, sched.getFlowName());
+					sla.getInfo().put(SlaOption.INFO_EMAIL_LIST, slaEmails);
 					slaOptions.add(sla);
 				}
 			}
diff --git a/unit/java/azkaban/test/execapp/MockExecutorLoader.java b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
index 6c1246d..92c1dee 100644
--- a/unit/java/azkaban/test/execapp/MockExecutorLoader.java
+++ b/unit/java/azkaban/test/execapp/MockExecutorLoader.java
@@ -23,6 +23,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 	HashMap<Integer, ExecutionReference> refs = new HashMap<Integer, ExecutionReference>();
 	int flowUpdateCount = 0;
 	HashMap<String, Integer> jobUpdateCount = new HashMap<String,Integer>();
+	Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<Integer, Pair<ExecutionReference,ExecutableFlow>>();
 	
 	@Override
 	public void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -38,7 +39,7 @@ public class MockExecutorLoader implements ExecutorLoader {
 
 	@Override
 	public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
-		return null;
+		return activeFlows;
 	}
 
 	@Override
diff --git a/unit/java/azkaban/test/trigger/MockTriggerLoader.java b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
new file mode 100644
index 0000000..67ef5c7
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/MockTriggerLoader.java
@@ -0,0 +1,53 @@
+package azkaban.test.trigger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+
+public class MockTriggerLoader implements TriggerLoader {
+
+	Map<Integer, Trigger> triggers = new HashMap<Integer, Trigger>();
+	int triggerCount = 0;
+	
+	@Override
+	public synchronized void addTrigger(Trigger t) throws TriggerLoaderException {
+		t.setTriggerId(triggerCount);
+		t.setLastModifyTime(System.currentTimeMillis());
+		triggers.put(t.getTriggerId(), t);
+		triggerCount++;
+	}
+
+	@Override
+	public synchronized void removeTrigger(Trigger s) throws TriggerLoaderException {
+		triggers.remove(s);
+	}
+
+	@Override
+	public synchronized void updateTrigger(Trigger t) throws TriggerLoaderException {
+		t.setLastModifyTime(System.currentTimeMillis());
+		triggers.put(t.getTriggerId(), t);
+	}
+
+	@Override
+	public synchronized List<Trigger> loadTriggers() throws TriggerLoaderException {
+		return new ArrayList<Trigger>(triggers.values());
+	}
+
+	@Override
+	public synchronized Trigger loadTrigger(int triggerId) throws TriggerLoaderException {
+		return triggers.get(triggerId);
+	}
+
+	@Override
+	public List<Trigger> getUpdatedTriggers(long lastUpdateTime)
+			throws TriggerLoaderException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
new file mode 100644
index 0000000..02c7cc1
--- /dev/null
+++ b/unit/java/azkaban/test/trigger/TriggerManagerDeadlockTest.java
@@ -0,0 +1,186 @@
+package azkaban.test.trigger;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.alert.Alerter;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.test.execapp.MockExecutorLoader;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.TriggerLoaderException;
+import azkaban.trigger.TriggerManager;
+import azkaban.trigger.TriggerManagerException;
+import azkaban.trigger.builtin.CreateTriggerAction;
+import azkaban.utils.Props;
+
+public class TriggerManagerDeadlockTest {
+	
+	TriggerLoader loader;
+	TriggerManager triggerManager;
+	ExecutorLoader execLoader;
+	
+	@Before
+	public void setup() throws ExecutorManagerException, TriggerManagerException {
+		loader = new MockTriggerLoader();
+		Props props = new Props();
+		props.put("trigger.scan.interval", 1000);
+		props.put("executor.port", 12321);
+		execLoader = new MockExecutorLoader();
+		Map<String, Alerter> alerters = new HashMap<String, Alerter>();
+		ExecutorManager executorManager = new ExecutorManager(props, execLoader, alerters);
+		triggerManager = new TriggerManager(props, loader, executorManager);
+	}
+
+	@After
+	public void tearDown() {
+		
+	}
+	
+	@Test
+	public void deadlockTest() throws TriggerLoaderException, TriggerManagerException {
+		// this should well saturate it
+		for(int i = 0; i < 1000; i++) {
+			Trigger t = createSelfRegenTrigger();
+			loader.addTrigger(t);
+		}
+		// keep going and add more
+		for(int i = 0; i < 10000; i++) {
+			Trigger d = createDummyTrigger();
+			triggerManager.insertTrigger(d);
+			triggerManager.removeTrigger(d);
+		}
+		
+		System.out.println("No dead lock.");
+	}
+	
+	public class AlwaysOnChecker implements ConditionChecker {
+
+		public static final String type = "AlwaysOnChecker";
+		
+		private final String id;
+		private final Boolean alwaysOn;
+		
+		public AlwaysOnChecker(String id, Boolean alwaysOn) {
+			this.id = id;
+			this.alwaysOn = alwaysOn;
+		}
+		
+		@Override
+		public Object eval() {
+			// TODO Auto-generated method stub
+			return alwaysOn;
+		}
+
+		@Override
+		public Object getNum() {
+			// TODO Auto-generated method stub
+			return null;
+		}
+
+		@Override
+		public void reset() {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public String getId() {
+			return id;
+		}
+
+		@Override
+		public String getType() {
+			// TODO Auto-generated method stub
+			return type;
+		}
+
+		@Override
+		public ConditionChecker fromJson(Object obj) throws Exception {
+			// TODO Auto-generated method stub
+			return null;
+		}
+
+		@Override
+		public Object toJson() {
+			// TODO Auto-generated method stub
+			return null;
+		}
+
+		@Override
+		public void stopChecker() {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public void setContext(Map<String, Object> context) {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public long getNextCheckTime() {
+			// TODO Auto-generated method stub
+			return 0;
+		}
+		
+	}
+	
+	private Trigger createSelfRegenTrigger() {
+		ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+		String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+		triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+		Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+		
+		TriggerAction triggerAct = new CreateTriggerAction("dummyTrigger", createDummyTrigger());
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		actions.add(triggerAct);
+		
+		ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+		String expireExpr = alwaysOffChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+		expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+		Condition expireCond = new Condition(expireCheckers, expireExpr);
+			
+		Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+		return t;
+	}
+
+	private Trigger createDummyTrigger() {
+		ConditionChecker alwaysOnChecker = new AlwaysOnChecker("alwaysOn", Boolean.TRUE);
+		String triggerExpr = alwaysOnChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> triggerCheckers = new HashMap<String, ConditionChecker>();
+		triggerCheckers.put(alwaysOnChecker.getId(), alwaysOnChecker);
+		Condition triggerCond = new Condition(triggerCheckers, triggerExpr);
+		
+		TriggerAction triggerAct = new DummyTriggerAction("howdy!");
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		actions.add(triggerAct);
+		
+		ConditionChecker alwaysOffChecker = new AlwaysOnChecker("alwaysOff", Boolean.FALSE);
+		String expireExpr = alwaysOffChecker.getId() + ".eval()";
+		Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
+		expireCheckers.put(alwaysOffChecker.getId(), alwaysOffChecker);
+		Condition expireCond = new Condition(expireCheckers, expireExpr);
+
+		Trigger t = new Trigger("azkaban", "azkabanTest", triggerCond, expireCond, actions);
+		return t;
+	}
+	
+}