azkaban-uncached

bug fix to stop sla triggers being abused by users setting too

11/12/2013 8:52:58 PM

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fd8a16c..bf87720 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -41,6 +41,9 @@ import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 
 import azkaban.alert.Alerter;
+import azkaban.execapp.event.Event;
+import azkaban.execapp.event.Event.Type;
+import azkaban.execapp.event.EventHandler;
 import azkaban.project.Project;
 import azkaban.scheduler.ScheduleStatisticManager;
 import azkaban.utils.FileIOUtils.JobMetaData;
@@ -53,7 +56,7 @@ import azkaban.utils.Props;
  * Executor manager used to manage the client side job.
  *
  */
-public class ExecutorManager implements ExecutorManagerAdapter {
+public class ExecutorManager extends EventHandler implements ExecutorManagerAdapter {
 	private static Logger logger = Logger.getLogger(ExecutorManager.class);
 	private ExecutorLoader executorLoader;
 	private String executorHost;
@@ -723,6 +726,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 							if(flow.getScheduleId() >= 0 && flow.getStatus() == Status.SUCCEEDED){
 								ScheduleStatisticManager.invalidateCache(flow.getScheduleId(), cacheDir);
 							}
+							fireEventListeners(Event.create(flow, Type.FLOW_FINISHED));
 							recentlyFinished.put(flow.getExecutionId(), flow);
 						}
 						
@@ -789,6 +793,7 @@ public class ExecutorManager implements ExecutorManagerAdapter {
 			
 			updaterStage = "finalizing flow " + execId + " cleaning from memory";
 			runningFlows.remove(execId);
+			fireEventListeners(Event.create(dsFlow, Type.FLOW_FINISHED));
 			recentlyFinished.put(execId, dsFlow);
 
 		} catch (ExecutorManagerException e) {
diff --git a/src/java/azkaban/executor/ExecutorManagerAdapter.java b/src/java/azkaban/executor/ExecutorManagerAdapter.java
index 85b55df..cc81589 100644
--- a/src/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/src/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -10,7 +10,7 @@ import azkaban.project.Project;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 
-public interface ExecutorManagerAdapter {
+public interface ExecutorManagerAdapter{
 	
 	public static final String LOCAL_MODE = "local";
 	public static final String REMOTE_MODE = "remote";
diff --git a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
index 1cbbd61..2f3a5f4 100644
--- a/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
+++ b/src/java/azkaban/trigger/builtin/ExecuteFlowAction.java
@@ -247,15 +247,15 @@ public class ExecuteFlowAction implements TriggerAction {
 			int execId = exflow.getExecutionId();
 			for(SlaOption sla : slaOptions) {
 				logger.info("Adding sla trigger " + sla.toString() + " to execution " + execId);
-				SlaChecker slaChecker = new SlaChecker("slaChecker", sla, execId);
+				SlaChecker slaFailChecker = new SlaChecker("slaFailChecker", sla, execId);
 				Map<String, ConditionChecker> slaCheckers = new HashMap<String, ConditionChecker>();
-				slaCheckers.put(slaChecker.getId(), slaChecker);
-				Condition triggerCond = new Condition(slaCheckers, slaChecker.getId() + ".eval()");
-				// if whole flow finish before violate sla, just abort
-				ExecutionChecker execChecker = new ExecutionChecker("execChecker", execId, null, Status.SUCCEEDED);
+				slaCheckers.put(slaFailChecker.getId(), slaFailChecker);
+				Condition triggerCond = new Condition(slaCheckers, slaFailChecker.getId() + ".isSlaFailed()");
+				// if whole flow finish before violate sla, just expire
+				SlaChecker slaPassChecker = new SlaChecker("slaPassChecker", sla, execId);
 				Map<String, ConditionChecker> expireCheckers = new HashMap<String, ConditionChecker>();
-				expireCheckers.put(execChecker.getId(), execChecker);
-				Condition expireCond = new Condition(expireCheckers, execChecker.getId() + ".eval()");
+				expireCheckers.put(slaPassChecker.getId(), slaPassChecker);
+				Condition expireCond = new Condition(expireCheckers, slaPassChecker.getId() + ".isSlaPassed()");
 				List<TriggerAction> actions = new ArrayList<TriggerAction>();
 				List<String> slaActions = sla.getActions();
 				for(String act : slaActions) {
@@ -268,6 +268,7 @@ public class ExecuteFlowAction implements TriggerAction {
 					}
 				}
 				Trigger slaTrigger = new Trigger("azkaban_sla", "azkaban", triggerCond, expireCond, actions);
+				slaTrigger.getInfo().put("monitored.finished.execution", String.valueOf(execId));
 				slaTrigger.setResetOnTrigger(false);
 				slaTrigger.setResetOnExpire(false);
 				logger.info("Ready to put in the sla trigger");
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index eccb5b6..f8897a5 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -54,99 +54,136 @@ public class SlaChecker implements ConditionChecker{
 		executorManager = em;
 	}
 	
-	private Boolean violateSla(ExecutableFlow flow) {
+	private Boolean isSlaMissed(ExecutableFlow flow) {
 		String type = slaOption.getType();
-		logger.info("Checking for " + flow.getExecutionId() + " with sla " + type);
 		logger.info("flow is " + flow.getStatus());
 		if(flow.getStartTime() < 0) {
 			return Boolean.FALSE;
 		}
+		Status status;
 		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
-			ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
-			DateTime startTime = new DateTime(flow.getStartTime());
-			DateTime checkTime = startTime.plus(duration);
-			this.checkTime = checkTime.getMillis();
-			if(checkTime.isBeforeNow()) {
-				Status status = flow.getStatus();
-				if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
-					return Boolean.FALSE;
-				} else {
-					return Boolean.TRUE;
-				}
+			if(checkTime < flow.getStartTime()) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(flow.getStartTime());
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
+			}
+			status = flow.getStatus();
+			if(checkTime < DateTime.now().getMillis()) {
+				return !isFlowFinished(status);
 			}
 		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
-			ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
-			DateTime startTime = new DateTime(flow.getStartTime());
-			DateTime checkTime = startTime.plus(duration);
-			this.checkTime = checkTime.getMillis();
-			if(checkTime.isBeforeNow()) {
-				Status status = flow.getStatus();
-				if(status.equals(Status.SUCCEEDED)) {
-					return Boolean.FALSE;
-				} else {
-					return Boolean.TRUE;
-				}
+			if(checkTime < flow.getStartTime()) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(flow.getStartTime());
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
+			}
+			status = flow.getStatus();
+			if(checkTime < DateTime.now().getMillis()) {
+				return !isFlowSucceeded(status);
+			} else {
+				return status.equals(Status.FAILED) || status.equals(Status.KILLED);
 			}
 		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
 			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
 			ExecutableNode node = flow.getExecutableNode(jobName);
-			if(node.getStartTime() > 0) {
+			if(node.getStartTime() < 0) {
+				return Boolean.FALSE;
+			}
+			if(checkTime < node.getStartTime()) {
 				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
 				DateTime startTime = new DateTime(node.getStartTime());
-				DateTime checkTime = startTime.plus(duration);
-				this.checkTime = checkTime.getMillis();
-				if(checkTime.isBeforeNow()) {
-					Status status = node.getStatus();
-					if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
-						return Boolean.FALSE;
-					} else {
-						return Boolean.TRUE;
-					}
-				}
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
+			}
+			status = node.getStatus();
+			if(checkTime < DateTime.now().getMillis()) {
+				return !isJobFinished(status);
 			}
 		} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
 			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
 			ExecutableNode node = flow.getExecutableNode(jobName);
-			if(node.getStartTime() > 0) {
+			if(node.getStartTime() < 0) {
+				return Boolean.FALSE;
+			}
+			if(checkTime < node.getStartTime()) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(node.getStartTime());
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
+			}
+			status = node.getStatus();
+			if(checkTime < DateTime.now().getMillis()) {
+				return !isJobFinished(status);
+			} else {
+				return status.equals(Status.FAILED) || status.equals(Status.KILLED);
+			}
+		} 
+		return Boolean.FALSE;
+	}
+	
+	private Boolean isSlaGood(ExecutableFlow flow) {
+		String type = slaOption.getType();
+		logger.info("flow is " + flow.getStatus());
+		if(flow.getStartTime() < 0) {
+			return Boolean.FALSE;
+		}
+		Status status;
+		if(type.equals(SlaOption.TYPE_FLOW_FINISH)) {
+			if(checkTime < flow.getStartTime()) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(flow.getStartTime());
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
+			}
+			status = flow.getStatus();
+			return isFlowFinished(status);
+		} else if(type.equals(SlaOption.TYPE_FLOW_SUCCEED)) {
+			if(checkTime < flow.getStartTime()) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(flow.getStartTime());
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
+			}
+			status = flow.getStatus();
+			return isFlowSucceeded(status);
+		} else if(type.equals(SlaOption.TYPE_JOB_FINISH)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
+			ExecutableNode node = flow.getExecutableNode(jobName);
+			if(node.getStartTime() < 0) {
+				return Boolean.FALSE;
+			}
+			if(checkTime < node.getStartTime()) {
+				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
+				DateTime startTime = new DateTime(node.getStartTime());
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
+			}
+			status = node.getStatus();
+			return isJobFinished(status);
+		} else if(type.equals(SlaOption.TYPE_JOB_SUCCEED)) {
+			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
+			ExecutableNode node = flow.getExecutableNode(jobName);
+			if(node.getStartTime() < 0) {
+				return Boolean.FALSE;
+			}
+			if(checkTime < node.getStartTime()) {
 				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
 				DateTime startTime = new DateTime(node.getStartTime());
-				DateTime checkTime = startTime.plus(duration);
-				this.checkTime = checkTime.getMillis();
-				if(checkTime.isBeforeNow()) {
-					Status status = node.getStatus();
-					if(status.equals(Status.SUCCEEDED)) {
-						return Boolean.FALSE;
-					} else {
-						return Boolean.TRUE;
-					}
-				}
+				DateTime nextCheckTime = startTime.plus(duration);
+				this.checkTime = nextCheckTime.getMillis();
 			}
+			status = node.getStatus();
+			return isJobSucceeded(status);
 		} 
-//		else if(type.equals(SlaOption.TYPE_JOB_PROGRESS)) {
-//			String jobName = (String) slaOption.getInfo().get(SlaOption.INFO_JOB_NAME); 
-//			float targetProgress = Float.valueOf((String) slaOption.getInfo().get(SlaOption.INFO_PROGRESS_PERCENT));
-//			ExecutableNode node = flow.getExecutableNode(jobName);
-//			if(node.getStartTime() > 0) {
-//				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
-//				DateTime startTime = new DateTime(node.getStartTime());
-//				DateTime checkTime = startTime.plus(duration);
-//				if(checkTime.isBeforeNow()) {
-//					if(node.getProgress() > targetProgress) {
-//						return Boolean.FALSE;
-//					} else {
-//						return Boolean.TRUE;
-//					}
-//				}
-//			} else {
-//				return Boolean.FALSE;
-//			}
-//		}
 		return Boolean.FALSE;
 	}
 	
 	// return true to trigger sla action
 	@Override
 	public Object eval() {
+		logger.info("Checking sla for execution " + execId);
 		ExecutableFlow flow;
 		try {
 			flow = executorManager.getExecutableFlow(execId);
@@ -156,7 +193,35 @@ public class SlaChecker implements ConditionChecker{
 			// something wrong, send out alerts
 			return Boolean.TRUE;
 		}
-		return violateSla(flow);
+		return isSlaMissed(flow);
+	}
+	
+	public Object isSlaFailed() {
+		logger.info("Testing if sla failed for execution " + execId);
+		ExecutableFlow flow;
+		try {
+			flow = executorManager.getExecutableFlow(execId);
+		} catch (ExecutorManagerException e) {
+			logger.error("Can't get executable flow.", e);
+			e.printStackTrace();
+			// something wrong, send out alerts
+			return Boolean.TRUE;
+		}
+		return isSlaMissed(flow);
+	}
+	
+	public Object isSlaPassed() {
+		logger.info("Testing if sla is good for execution " + execId);
+		ExecutableFlow flow;
+		try {
+			flow = executorManager.getExecutableFlow(execId);
+		} catch (ExecutorManagerException e) {
+			logger.error("Can't get executable flow.", e);
+			e.printStackTrace();
+			// something wrong, send out alerts
+			return Boolean.TRUE;
+		}
+		return isSlaGood(flow);
 	}
 
 	@Override
@@ -223,5 +288,28 @@ public class SlaChecker implements ConditionChecker{
 	public long getNextCheckTime() {
 		return checkTime;
 	}
-
+	
+	private boolean isFlowFinished(Status status) {
+		if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+			return Boolean.TRUE;
+		} else {
+			return Boolean.FALSE;
+		}
+	}
+	
+	private boolean isFlowSucceeded(Status status) {
+		return status.equals(Status.SUCCEEDED);
+	}
+	
+	private boolean isJobFinished(Status status) {
+		if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
+			return Boolean.TRUE;
+		} else {
+			return Boolean.FALSE;
+		}
+	}
+	
+	private boolean isJobSucceeded(Status status) {
+		return status.equals(Status.SUCCEEDED);
+	}
 }
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 6df5503..1bead36 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -119,7 +119,7 @@ public class Condition {
 	}
 	
 	public boolean isMet() {
-		logger.info("Testing ondition " + expression);
+		logger.info("Testing condition " + expression);
 		return expression.evaluate(context).equals(Boolean.TRUE);
 	}
 	
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index e6e5b9d..496a9b3 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -28,9 +28,20 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.log4j.Logger;
 
+import azkaban.execapp.JobRunner;
+import azkaban.execapp.event.Event;
+import azkaban.execapp.event.EventHandler;
+import azkaban.execapp.event.EventListener;
+import azkaban.execapp.event.Event.Type;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.Status;
+import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.utils.Props;
 
-public class TriggerManager implements TriggerManagerAdapter{
+public class TriggerManager extends EventHandler implements TriggerManagerAdapter{
 	private static Logger logger = Logger.getLogger(TriggerManager.class);
 	public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
 
@@ -45,9 +56,11 @@ public class TriggerManager implements TriggerManagerAdapter{
 	private long runnerThreadIdleTime = -1;
 	private LocalTriggerJMX jmxStats = new LocalTriggerJMX();
 	
+	private ExecutorManagerEventListener listener = new ExecutorManagerEventListener();
+	
 	private String scannerStage = "";
 	
-	public TriggerManager(Props props, TriggerLoader triggerLoader) throws TriggerManagerException {
+	public TriggerManager(Props props, TriggerLoader triggerLoader, ExecutorManager executorManager) throws TriggerManagerException {
 
 		this.triggerLoader = triggerLoader;
 		
@@ -67,6 +80,8 @@ public class TriggerManager implements TriggerManagerAdapter{
 		Condition.setCheckerLoader(checkerTypeLoader);
 		Trigger.setActionTypeLoader(actionTypeLoader);
 		
+		executorManager.addListener(listener);
+		
 		logger.info("TriggerManager loaded.");
 	}
 
@@ -154,12 +169,14 @@ public class TriggerManager implements TriggerManagerAdapter{
 	
 	private class TriggerScannerThread extends Thread {
 		private BlockingQueue<Trigger> triggers;
+		private Map<Integer, ExecutableFlow> justFinishedFlows;
 		private boolean shutdown = false;
 		//private AtomicBoolean stillAlive = new AtomicBoolean(true);
 		private final long scannerInterval;
 		
 		public TriggerScannerThread(long scannerInterval) {
 			triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
+			justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
 			this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
 			this.scannerInterval = scannerInterval;;
 		}
@@ -171,6 +188,10 @@ public class TriggerManager implements TriggerManagerAdapter{
 			this.interrupt();
 		}
 		
+		public synchronized void addJustFinishedFlow(ExecutableFlow flow) {
+			justFinishedFlows.put(flow.getExecutionId(), flow);
+		}
+		
 		public synchronized void addTrigger(Trigger t) {
 			t.updateNextCheckTime();
 			triggers.add(t);
@@ -191,6 +212,7 @@ public class TriggerManager implements TriggerManagerAdapter{
 						
 						try{
 							checkAllTriggers();
+							justFinishedFlows.clear();
 						} catch(Exception e) {
 							e.printStackTrace();
 							logger.error(e.getMessage());
@@ -217,12 +239,27 @@ public class TriggerManager implements TriggerManagerAdapter{
 		
 		private void checkAllTriggers() throws TriggerManagerException {
 			long now = System.currentTimeMillis();
+			
+			// sweep through the rest of them
 			for(Trigger t : triggers) {
 				scannerStage = "Checking for trigger " + t.getTriggerId();
-				if(t.getNextCheckTime() > now) {
+				
+				boolean shouldSkip = true;
+				if(shouldSkip && t.getInfo() != null && t.getInfo().containsKey("monitored.finished.execution")) {
+					int execId = Integer.valueOf((String) t.getInfo().get("monitored.finished.execution"));
+					if(justFinishedFlows.containsKey(execId)) {
+						logger.info("Monitored execution has finished. Checking trigger earlier " + t.getTriggerId());
+						shouldSkip = false;
+					}
+				}
+				if(shouldSkip && t.getNextCheckTime() > now) {
+					shouldSkip = false;
+				}
+
+				if(shouldSkip) {
 					logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
-					continue;
 				}
+				
 				logger.info("Checking trigger " + t.getTriggerId());
 				if(t.getStatus().equals(TriggerStatus.READY)) {
 					if(t.triggerConditionMet()) {
@@ -473,5 +510,19 @@ public class TriggerManager implements TriggerManagerAdapter{
 		actionTypeLoader.registerActionType(name, action);
 	}
 	
+	private class ExecutorManagerEventListener implements EventListener {
+		public ExecutorManagerEventListener() {
+		}
+		
+		@Override
+		public synchronized void handleEvent(Event event) {
+			
+			ExecutableFlow flow = (ExecutableFlow) event.getRunner();
+			if (event.getType() == Type.FLOW_FINISHED) {
+				logger.info("Flow finish event received. " + flow.getExecutionId() );
+				runnerThread.addJustFinishedFlow(flow);
+			}
+		}
+	}
 	
 }
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 7bb0282..9684d85 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -54,7 +54,6 @@ import org.mortbay.thread.QueuedThreadPool;
 import azkaban.alert.Alerter;
 import azkaban.database.AzkabanDatabaseSetup;
 import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
@@ -143,7 +142,8 @@ public class AzkabanWebServer extends AzkabanServer {
 	private final Server server;
 	private UserManager userManager;
 	private ProjectManager projectManager;
-	private ExecutorManagerAdapter executorManager;
+//	private ExecutorManagerAdapter executorManager;
+	private ExecutorManager executorManager;
 	private ScheduleManager scheduleManager;
 	private TriggerManager triggerManager;
 	private Map<String, Alerter> alerters;
@@ -269,7 +269,7 @@ public class AzkabanWebServer extends AzkabanServer {
 
 	private TriggerManager loadTriggerManager(Props props) throws TriggerManagerException {
 		TriggerLoader loader = new JdbcTriggerLoader(props);
-		return new TriggerManager(props, loader);
+		return new TriggerManager(props, loader, executorManager);
 	}
 	
 	private void loadBuiltinCheckersAndActions() {
@@ -590,7 +590,7 @@ public class AzkabanWebServer extends AzkabanServer {
 	/**
      * 
      */
-	public ExecutorManagerAdapter getExecutorManager() {
+	public ExecutorManager getExecutorManager() {
 		return executorManager;
 	}
 	
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index d9481ec..ea1e6c8 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -646,11 +646,14 @@ var logUpdaterFunction = function() {
 var exNodeClickCallback = function(event) {
 	console.log("Node clicked callback");
 	var jobId = event.currentTarget.jobid;
+	var attempt = event.currentTarget.attempt;
 	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+	var visualizerURL = contextURL + "/visualizer?execid=" + execId + "&job=" + jobId + "&attempt=" + attempt;
 
 	var menu = [	
 			{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
-			{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}}
+			{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+			{title: "Open Job Visualizer...", callback: function() {window.open(visualizerURL);}}
 	];
 
 	contextMenuView.show(event, menu);
@@ -659,11 +662,14 @@ var exNodeClickCallback = function(event) {
 var exJobClickCallback = function(event) {
 	console.log("Node clicked callback");
 	var jobId = event.currentTarget.jobid;
+	var attempt = event.currentTarget.attempt;
 	var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowId + "&job=" + jobId;
+	var visualizerURL = contextURL + "/visualizer?execid=" + execId + "&job=" + jobId + "&attempt=" + attempt;
 
 	var menu = [	
 			{title: "Open Job...", callback: function() {window.location.href=requestURL;}},
-			{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}}
+			{title: "Open Job in New Window...", callback: function() {window.open(requestURL);}},
+			{title: "Open Job Visualizer...", callback: function() {window.open(visualizerURL);}}
 	];
 
 	contextMenuView.show(event, menu);
diff --git a/unit/java/azkaban/test/trigger/TriggerManagerTest.java b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
index fbd5da7..ebf1f38 100644
--- a/unit/java/azkaban/test/trigger/TriggerManagerTest.java
+++ b/unit/java/azkaban/test/trigger/TriggerManagerTest.java
@@ -12,6 +12,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import azkaban.executor.ExecutorManager;
 import azkaban.trigger.Condition;
 import azkaban.trigger.ConditionChecker;
 import azkaban.trigger.CheckerTypeLoader;
@@ -33,7 +34,6 @@ public class TriggerManagerTest {
 	public void setup() throws TriggerException, TriggerManagerException {
 		triggerLoader = new MockTriggerLoader();
 		
-		
 	}
 	
 	@After
@@ -47,7 +47,7 @@ public class TriggerManagerTest {
 		
 		Props props = new Props();
 		props.put("trigger.scan.interval", 4000);
-		TriggerManager triggerManager = new TriggerManager(props, triggerLoader);
+		TriggerManager triggerManager = new TriggerManager(props, triggerLoader, null);
 		
 		triggerManager.registerCheckerType(ThresholdChecker.type, ThresholdChecker.class);
 		triggerManager.registerActionType(DummyTriggerAction.type, DummyTriggerAction.class);