azkaban-uncached

fixes to trigger scanners

9/11/2013 7:21:20 PM

Details

diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 9096b2f..fdb7688 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -157,6 +157,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				fileAppender.setMaxFileSize(jobLogChunkSize);
 				jobAppender = fileAppender;
 				logger.addAppender(jobAppender);
+				logger.setAdditivity(false);
 			} catch (IOException e) {
 				flowLogger.error("Could not open log file in " + workingDir + " for job " + node.getJobId(), e);
 			}
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
index 37f52f8..e3acefb 100644
--- a/src/java/azkaban/jmx/JmxExecutorManager.java
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -19,7 +19,7 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 
 	@Override
 	public String getExecutorThreadState() {
-		return manager.getExecutorThreadState().toString();
+		return manager.getExecutorManagerThreadState().toString();
 	}
 	
 	@Override
@@ -29,12 +29,12 @@ public class JmxExecutorManager implements JmxExecutorManagerMBean {
 
 	@Override
 	public boolean isThreadActive() {
-		return manager.isThreadActive();
+		return manager.isExecutorManagerThreadActive();
 	}
 
 	@Override
 	public Long getLastThreadCheckTime() {
-		return manager.getLastThreadCheckTime();
+		return manager.getLastExecutorManagerThreadCheckTime();
 	}
 	
 	@Override 
diff --git a/src/java/azkaban/jmx/JmxTriggerManager.java b/src/java/azkaban/jmx/JmxTriggerManager.java
index dbee790..e183558 100644
--- a/src/java/azkaban/jmx/JmxTriggerManager.java
+++ b/src/java/azkaban/jmx/JmxTriggerManager.java
@@ -55,6 +55,12 @@ public class JmxTriggerManager implements JmxTriggerManagerMBean {
 	public long getScannerIdleTime() {
 		return jmxStats.getScannerIdleTime();
 	}
+
+	@Override
+	public String getScannerThreadStage() {
+		// TODO Auto-generated method stub
+		return jmxStats.getScannerThreadStage();
+	}
 	
 	
 	
diff --git a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
index 6302885..ecf9c21 100644
--- a/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxTriggerManagerMBean.java
@@ -27,4 +27,7 @@ public interface JmxTriggerManagerMBean {
 	
 	@DisplayName("OPERATION: getScannerIdleTime")
 	public long getScannerIdleTime();
+	
+	@DisplayName("OPERATION: getScannerThreadStage")
+	public String getScannerThreadStage();
 }
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 29dbdeb..878eb28 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -69,9 +69,6 @@ public class ScheduleManager implements TriggerAgent {
 	
 	private ProjectManager projectManager = null;
 	
-	private final boolean useExternalRunner;
-	private final ScheduleRunner runner;
-	
 	// Used for mbeans to query Scheduler status
 //<<<<<<< HEAD
 //	
@@ -88,18 +85,10 @@ public class ScheduleManager implements TriggerAgent {
 	 * @param loader
 	 */
 	public ScheduleManager (ExecutorManagerAdapter executorManager,
-							ScheduleLoader loader,
-							boolean useExternalRunner) 
+							ScheduleLoader loader) 
 	{
 		this.executorManager = executorManager;
 		this.loader = loader;
-		this.useExternalRunner = useExternalRunner;
-		
-		if(!useExternalRunner) {
-			this.runner = new ScheduleRunner();
-		} else {
-			this.runner = null;
-		}
 		
 	}
 	
@@ -126,20 +115,11 @@ public class ScheduleManager implements TriggerAgent {
 			}
 		}
 
-		if(!useExternalRunner) {
-			if(projectManager == null) {
-				throw new ScheduleManagerException("Project Manager must be initialized when using internal schedule runner!");
-			}
-			this.runner.start();
-		}
 	}
 	
 	// only do this when using external runner
 	public synchronized void updateLocal() throws ScheduleManagerException {
-		if(!useExternalRunner) {
-			//throw new ScheduleManagerException("Should not change schedule states when using local runner!");
-			return;
-		}
+
 		List<Schedule> updates = loader.loadUpdatedSchedules();
 		for(Schedule s : updates) {
 			if(s.getStatus().equals(TriggerStatus.EXPIRED.toString())) {
@@ -159,9 +139,7 @@ public class ScheduleManager implements TriggerAgent {
 	 * it again.
 	 */
 	public void shutdown() {
-		if(!useExternalRunner) {
-			this.runner.shutdown();
-		}
+
 	}
 
 	/**
@@ -265,10 +243,7 @@ public class ScheduleManager implements TriggerAgent {
 			// TODO Auto-generated catch block
 			e.printStackTrace();
 		}
-		
-		if(!useExternalRunner) {
-			runner.removeRunnerSchedule(sched);
-		}
+
 		
 	}
 
@@ -347,13 +322,7 @@ public class ScheduleManager implements TriggerAgent {
 		if(scheduleIdentityPairMap.get(s.getScheduleIdentityPair()) != null) {
 			existing = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
 		}
-		if(!useExternalRunner) {
-			if (existing != null) {
-				this.runner.removeRunnerSchedule(existing);
-			}
-			s.updateTime();
-			this.runner.addRunnerSchedule(s);
-		}
+
 		scheduleIDMap.put(s.getScheduleId(), s);
 //		Set<Schedule> schedules = scheduleIdentityPairMap.get(s.getScheduleIdentityPair());
 //		if(schedules == null) {
@@ -404,295 +373,5 @@ public class ScheduleManager implements TriggerAgent {
 		return triggerSource;
 	}
 	
-	/**
-	 * Thread that simply invokes the running of flows when the schedule is
-	 * ready.
-	 * 
-	 * @author Richard Park
-	 * 
-	 */
-	public class ScheduleRunner extends Thread {
-		
-		private long lastCheckTime = -1;
-		private long nextWakupTime = -1;
-		private final PriorityBlockingQueue<Schedule> schedules;
-		private AtomicBoolean stillAlive = new AtomicBoolean(true);
-
-		// Five minute minimum intervals
-		private static final int TIMEOUT_MS = 300000;
-
-		public ScheduleRunner() {
-			schedules = new PriorityBlockingQueue<Schedule>(1,new ScheduleComparator());
-		}
-
-		public void shutdown() {
-			logger.error("Shutting down scheduler thread");
-			stillAlive.set(false);
-			this.interrupt();
-		}
-
-		public long getLastCheckTime() {
-			return lastCheckTime;
-		}
-		
-		public long getNextWakeupTime() {
-			return nextWakupTime;
-		}
-		
-		/**
-		 * Return a list of scheduled flow
-		 * 
-		 * @return
-		 */
-		public synchronized List<Schedule> getRunnerSchedules() {
-			return new ArrayList<Schedule>(schedules);
-		}
-
-		/**
-		 * Adds the flow to the schedule and then interrupts so it will update
-		 * its wait time.
-		 * 
-		 * @param flow
-		 */
-		public synchronized void addRunnerSchedule(Schedule s) {
-			logger.info("Adding " + s + " to schedule runner.");
-			schedules.add(s);
-			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-			// System.currentTimeMillis(),
-			// WorkflowAction.SCHEDULE_WORKFLOW,
-			// WorkflowState.NOP,
-			// flow.getId());
-
-			this.interrupt();
-		}
-
-		/**
-		 * Remove scheduled flows. Does not interrupt.
-		 * 
-		 * @param flow
-		 */
-		public synchronized void removeRunnerSchedule(Schedule s) {
-			logger.info("Removing " + s + " from the schedule runner.");
-			schedules.remove(s);
-			// MonitorImpl.getInternalMonitorInterface().workflowEvent(null,
-			// System.currentTimeMillis(),
-			// WorkflowAction.UNSCHEDULE_WORKFLOW,
-			// WorkflowState.NOP,
-			// flow.getId());
-			// Don't need to interrupt, because if this is originally on the top
-			// of the queue,
-			// it'll just skip it.
-		}
-
-		public void run() {
-			while (stillAlive.get()) {
-				synchronized (this) {
-					try {
-						lastCheckTime = System.currentTimeMillis();
-						
-//						runnerStage = "Starting schedule scan.";
-						// TODO clear up the exception handling
-						Schedule s = schedules.peek();
-
-						if (s == null) {
-							// If null, wake up every minute or so to see if
-							// there's something to do. Most likely there will not be.
-							try {
-								logger.info("Nothing scheduled to run. Checking again soon.");
-//								runnerStage = "Waiting for next round scan.";
-								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
-								this.wait(TIMEOUT_MS);
-							} catch (InterruptedException e) {
-								// interruption should occur when items are added or removed from the queue.
-							}
-						} else {
-							// We've passed the flow execution time, so we will run.
-							if (!(new DateTime(s.getNextExecTime())).isAfterNow()) {
-								// Run flow. The invocation of flows should be quick.
-								Schedule runningSched = schedules.poll();
-
-//								runnerStage = "Ready to run schedule " + runningSched.toString();
-								
-								logger.info("Scheduler ready to run " + runningSched.toString());
-								// Execute the flow here
-								try {
-									Project project = projectManager.getProject(runningSched.getProjectId());
-									if (project == null) {
-										logger.error("Scheduled Project " + runningSched.getProjectId() + " does not exist!");
-										throw new RuntimeException("Error finding the scheduled project. "+ runningSched.getProjectId());
-									}	
-									//TODO It is possible that the project is there, but the flow doesn't exist because upload a version that changes flow structure
-
-									Flow flow = project.getFlow(runningSched.getFlowName());
-									if (flow == null) {
-										logger.error("Flow " + runningSched.getScheduleName() + " cannot be found in project " + project.getName());
-										throw new RuntimeException("Error finding the scheduled flow. " + runningSched.getScheduleName());
-									}
-									
-									// Create ExecutableFlow
-									ExecutableFlow exflow = new ExecutableFlow(flow);
-									System.out.println("ScheduleManager: creating schedule: " +runningSched.getScheduleId());
-									exflow.setScheduleId(runningSched.getScheduleId());
-									exflow.setSubmitUser(runningSched.getSubmitUser());
-									exflow.addAllProxyUsers(project.getProxyUsers());
-									
-									ExecutionOptions flowOptions = runningSched.getExecutionOptions();
-									if(flowOptions == null) {
-										flowOptions = new ExecutionOptions();
-										flowOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
-									}
-									exflow.setExecutionOptions(flowOptions);
-									
-									if (!flowOptions.isFailureEmailsOverridden()) {
-										flowOptions.setFailureEmails(flow.getFailureEmails());
-									}
-									if (!flowOptions.isSuccessEmailsOverridden()) {
-										flowOptions.setSuccessEmails(flow.getSuccessEmails());
-									}
-									
-//									runnerStage = "Submitting flow " + exflow.getFlowId();
-									
-									try {
-										executorManager.submitExecutableFlow(exflow, s.getSubmitUser());
-										logger.info("Scheduler has invoked " + exflow.getExecutionId());
-									} 
-									catch (ExecutorManagerException e) {
-										throw e;
-									}
-									catch (Exception e) {	
-										e.printStackTrace();
-										throw new ScheduleManagerException("Scheduler invoked flow " + exflow.getExecutionId() + " has failed.", e);
-									}
-
-//									SlaOptions slaOptions = runningSched.getSlaOptions();
-//									if(slaOptions != null) {
-//										logger.info("Submitting SLA checkings for " + runningSched.getFlowName());
-//										// submit flow slas
-//										List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
-//										for(SlaSetting set : slaOptions.getSettings()) {
-//											if(set.getId().equals("")) {
-//												DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
-//												slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
-//											}
-//											else {
-//												jobsettings.add(set);
-//											}
-//										}
-//										if(jobsettings.size() > 0) {
-//											slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
-//										}
-//									}
-								} 
-								catch (ExecutorManagerException e) {
-									if (e.getReason() != null && e.getReason() == ExecutorManagerException.Reason.SkippedExecution) {
-										logger.info(e.getMessage());
-									}
-									else {
-										e.printStackTrace();
-									}
-								}
-								catch (Exception e) {
-									logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
-								}
-
-//								runnerStage = "Done running schedule for " + runningSched.toString();
-								removeRunnerSchedule(runningSched);
-
-								// Immediately reschedule if it's possible. Let
-								// the execution manager
-								// handle any duplicate runs.
-								if (runningSched.updateTime()) {
-									addRunnerSchedule(runningSched);
-									loader.updateSchedule(runningSched);
-								}
-								else {
-									removeSchedule(runningSched);
-								}								
-							} else {
-//								runnerStage = "Waiting for next round scan.";
-								// wait until flow run
-								long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
-								try {
-									nextWakupTime = System.currentTimeMillis() + millisWait;
-									this.wait(Math.min(millisWait, TIMEOUT_MS));
-								} catch (InterruptedException e) {
-									// interruption should occur when items are
-									// added or removed from the queue.
-								}
-							}
-						}
-					} catch (Exception e) {
-						logger.error("Unexpected exception has been thrown in scheduler", e);
-					} catch (Throwable e) {
-						logger.error("Unexpected throwable has been thrown in scheduler", e);
-					}
-				}
-			}
-		}
-
-		/**
-		 * Class to sort the schedule based on time.
-		 * 
-		 * @author Richard Park
-		 */
-		private class ScheduleComparator implements Comparator<Schedule> {
-			@Override
-			public int compare(Schedule arg0, Schedule arg1) {
-				long first = arg1.getNextExecTime();
-				long second = arg0.getNextExecTime();
-
-				if (first == second) {
-					return 0;
-				} else if (first < second) {
-					return 1;
-				}
-
-				return -1;
-			}
-		}
-	}
-	
-	public long getLastCheckTime() {
-		if(useExternalRunner) {
-			return -1;
-		} else {
-			return runner.getLastCheckTime();
-		}
-	}
-	
-	public long getNextUpdateTime() {
-		if(useExternalRunner) {
-			return -1;
-		} else {
-			return runner.getNextWakeupTime();
-		}
-	}
-	
-	public State getThreadState() {
-		if(useExternalRunner) {
-			return null;
-		} else {
-			return runner.getState();
-		}
-	}
-	
-	public boolean isThreadActive() {
-		if(useExternalRunner) {
-			return false;
-		} else {
-			return runner.isAlive();
-		}
-	}
 
-//<<<<<<< HEAD
-//	
-//
-//	
-//
-//=======
-//	public String getThreadStage() {
-//		return runnerStage;
-//	}
-	
-//>>>>>>> 10830aeb8ac819473873cac3bb4e07b4aeda67e8
 }
diff --git a/src/java/azkaban/trigger/builtin/SlaChecker.java b/src/java/azkaban/trigger/builtin/SlaChecker.java
index 05a00ac..25ca9b3 100644
--- a/src/java/azkaban/trigger/builtin/SlaChecker.java
+++ b/src/java/azkaban/trigger/builtin/SlaChecker.java
@@ -5,12 +5,10 @@ import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.joda.time.ReadablePeriod;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
@@ -28,6 +26,7 @@ public class SlaChecker implements ConditionChecker{
 	private int execId;
 	private Map<String, Object> context;
 	private boolean passChecker = true;
+	private long checkTime = -1;
 	
 	private static ExecutorManagerAdapter executorManager;
 	
@@ -62,6 +61,7 @@ public class SlaChecker implements ConditionChecker{
 			ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
 			DateTime startTime = new DateTime(flow.getStartTime());
 			DateTime checkTime = startTime.plus(duration);
+			this.checkTime = checkTime.getMillis();
 			if(checkTime.isBeforeNow()) {
 				Status status = flow.getStatus();
 				if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
@@ -74,6 +74,7 @@ public class SlaChecker implements ConditionChecker{
 			ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
 			DateTime startTime = new DateTime(flow.getStartTime());
 			DateTime checkTime = startTime.plus(duration);
+			this.checkTime = checkTime.getMillis();
 			if(checkTime.isBeforeNow()) {
 				Status status = flow.getStatus();
 				if(status.equals(Status.SUCCEEDED)) {
@@ -89,6 +90,7 @@ public class SlaChecker implements ConditionChecker{
 				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
 				DateTime startTime = new DateTime(node.getStartTime());
 				DateTime checkTime = startTime.plus(duration);
+				this.checkTime = checkTime.getMillis();
 				if(checkTime.isBeforeNow()) {
 					Status status = node.getStatus();
 					if(status.equals(Status.FAILED) || status.equals(Status.KILLED) || status.equals(Status.SUCCEEDED)) {
@@ -105,6 +107,7 @@ public class SlaChecker implements ConditionChecker{
 				ReadablePeriod duration = Utils.parsePeriodString((String) slaOption.getInfo().get(SlaOption.INFO_DURATION));
 				DateTime startTime = new DateTime(node.getStartTime());
 				DateTime checkTime = startTime.plus(duration);
+				this.checkTime = checkTime.getMillis();
 				if(checkTime.isBeforeNow()) {
 					Status status = node.getStatus();
 					if(status.equals(Status.SUCCEEDED)) {
@@ -229,8 +232,7 @@ public class SlaChecker implements ConditionChecker{
 
 	@Override
 	public long getNextCheckTime() {
-		// TODO Auto-generated method stub
-		return 0;
+		return checkTime;
 	}
 
 }
diff --git a/src/java/azkaban/trigger/Condition.java b/src/java/azkaban/trigger/Condition.java
index 0311b54..d032527 100644
--- a/src/java/azkaban/trigger/Condition.java
+++ b/src/java/azkaban/trigger/Condition.java
@@ -26,6 +26,7 @@ public class Condition {
 	public Condition(Map<String, ConditionChecker> checkers, String expr) {
 		setCheckers(checkers);
 		this.expression = jexl.createExpression(expr);
+		updateNextCheckTime();
 	}
 	
 	public Condition(Map<String, ConditionChecker> checkers, String expr, long nextCheckTime) {
@@ -49,9 +50,11 @@ public class Condition {
 	public void registerChecker(ConditionChecker checker) {
 		checkers.put(checker.getId(), checker);
 		context.set(checker.getId(), checker);
+		updateNextCheckTime();
 	}
 	
 	public long getNextCheckTime() {
+		updateNextCheckTime();
 		return nextCheckTime;
 	}
 	
@@ -64,18 +67,25 @@ public class Condition {
 		for(ConditionChecker checker : checkers.values()) {
 			this.context.set(checker.getId(), checker);
 		}
+		updateNextCheckTime();
 	}
 	
-	public void resetCheckers() {
+	private void updateNextCheckTime() {
 		long time = Long.MAX_VALUE;
 		for(ConditionChecker checker : checkers.values()) {
-			checker.reset();
 			time = Math.min(time, checker.getNextCheckTime());
 		}
-		logger.error("Done resetting checkers. The next check time will be " + new DateTime(time));
 		this.nextCheckTime = time;
 	}
 	
+	public void resetCheckers() {
+		for(ConditionChecker checker : checkers.values()) {
+			checker.reset();
+		}
+		updateNextCheckTime();
+		logger.error("Done resetting checkers. The next check time will be " + new DateTime(nextCheckTime));
+	}
+	
 	public String getExpression() {
 		return this.expression.getExpression();
 	}
diff --git a/src/java/azkaban/trigger/Trigger.java b/src/java/azkaban/trigger/Trigger.java
index 013a548..60e58f8 100644
--- a/src/java/azkaban/trigger/Trigger.java
+++ b/src/java/azkaban/trigger/Trigger.java
@@ -310,10 +310,12 @@ public class Trigger {
 	
 	public void resetTriggerConditions() {
 		triggerCondition.resetCheckers();
+		updateNextCheckTime();
 	}
 	
 	public void resetExpireCondition() {
 		expireCondition.resetCheckers();
+		updateNextCheckTime();
 	}
 	
 	public List<TriggerAction> getTriggerActions () {
diff --git a/src/java/azkaban/trigger/TriggerManager.java b/src/java/azkaban/trigger/TriggerManager.java
index bdcd30f..e964ecf 100644
--- a/src/java/azkaban/trigger/TriggerManager.java
+++ b/src/java/azkaban/trigger/TriggerManager.java
@@ -10,12 +10,13 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
 
 import azkaban.utils.Props;
 
 public class TriggerManager implements TriggerManagerAdapter{
 	private static Logger logger = Logger.getLogger(TriggerManager.class);
-	private static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
+	public static final long DEFAULT_SCANNER_INTERVAL_MS = 60000;
 
 	private static Map<Integer, Trigger> triggerIdMap = new HashMap<Integer, Trigger>();
 	
@@ -28,6 +29,8 @@ public class TriggerManager implements TriggerManagerAdapter{
 	private long runnerThreadIdleTime = -1;
 	private LocalTriggerJMX jmxStats = new LocalTriggerJMX();
 	
+	private String scannerStage = "";
+	
 	public TriggerManager(Props props, TriggerLoader triggerLoader) throws TriggerManagerException {
 
 		this.triggerLoader = triggerLoader;
@@ -165,6 +168,8 @@ public class TriggerManager implements TriggerManagerAdapter{
 					try{
 						lastRunnerThreadCheckTime = System.currentTimeMillis();
 						
+						scannerStage = "Ready to start a new scan cycle at " + lastRunnerThreadCheckTime;
+						
 						try{
 							checkAllTriggers();
 						} catch(Exception e) {
@@ -175,7 +180,10 @@ public class TriggerManager implements TriggerManagerAdapter{
 							logger.error(t.getMessage());
 						}
 						
+						scannerStage = "Done flipping all triggers.";
+						
 						runnerThreadIdleTime = scannerInterval - (System.currentTimeMillis() - lastRunnerThreadCheckTime);
+
 						if(runnerThreadIdleTime < 0) {
 							logger.error("Trigger manager thread " + this.getName() + " is too busy!");
 						} else {
@@ -192,6 +200,7 @@ public class TriggerManager implements TriggerManagerAdapter{
 		private void checkAllTriggers() throws TriggerManagerException {
 			long now = System.currentTimeMillis();
 			for(Trigger t : triggers) {
+				scannerStage = "Checking for trigger " + t.getTriggerId();
 				if(t.getNextCheckTime() > now) {
 					logger.info("Skipping trigger" + t.getTriggerId() + " until " + t.getNextCheckTime());
 					continue;
@@ -433,6 +442,11 @@ public class TriggerManager implements TriggerManagerAdapter{
 		public Map<String, Object> getAllJMXMbeans() {
 			return new HashMap<String, Object>();
 		}
+
+		@Override
+		public String getScannerThreadStage() {
+			return scannerStage;
+		}
 		
 	}
 
diff --git a/src/java/azkaban/trigger/TriggerManagerAdapter.java b/src/java/azkaban/trigger/TriggerManagerAdapter.java
index 2ce5df5..0934985 100644
--- a/src/java/azkaban/trigger/TriggerManagerAdapter.java
+++ b/src/java/azkaban/trigger/TriggerManagerAdapter.java
@@ -42,6 +42,7 @@ public interface TriggerManagerAdapter {
 		public String getTriggerIds();
 		public long getScannerIdleTime();
 		public Map<String, Object> getAllJMXMbeans();
+		public String getScannerThreadStage();
 	}
 	
 }
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index d0731ed..2af906b 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -42,7 +42,6 @@ import org.apache.velocity.app.VelocityEngine;
 import org.apache.velocity.runtime.log.Log4JLogChute;
 import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
 import org.apache.velocity.runtime.resource.loader.JarResourceLoader;
-import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
@@ -61,21 +60,16 @@ import azkaban.executor.JdbcExecutorLoader;
 import azkaban.executor.ExecutorManager.Alerter;
 import azkaban.jmx.JmxExecutorManagerAdapter;
 import azkaban.jmx.JmxJettyServer;
-import azkaban.jmx.JmxScheduler;
 import azkaban.jmx.JmxTriggerManager;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
 
-import azkaban.scheduler.JdbcScheduleLoader;
 import azkaban.scheduler.ScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.scheduler.TriggerBasedScheduleLoader;
-import azkaban.trigger.ActionTypeLoader;
-import azkaban.trigger.CheckerTypeLoader;
 import azkaban.trigger.JdbcTriggerLoader;
 import azkaban.trigger.TriggerLoader;
 import azkaban.trigger.TriggerManager;
-import azkaban.trigger.TriggerAgent;
 import azkaban.trigger.TriggerManagerException;
 import azkaban.trigger.builtin.BasicTimeChecker;
 import azkaban.trigger.builtin.CreateTriggerAction;
@@ -83,7 +77,6 @@ import azkaban.trigger.builtin.ExecuteFlowAction;
 import azkaban.trigger.builtin.KillExecutionAction;
 import azkaban.trigger.builtin.SlaAlertAction;
 import azkaban.trigger.builtin.SlaChecker;
-import azkaban.triggerapp.AzkabanTriggerServer;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.FileIOUtils;
@@ -292,7 +285,7 @@ public class AzkabanWebServer extends AzkabanServer {
 	private ScheduleManager loadScheduleManager(ExecutorManagerAdapter executorManager, TriggerManager tm, Props props ) throws Exception {
 		logger.info("Loading trigger based scheduler");
 		ScheduleLoader loader = new TriggerBasedScheduleLoader(tm, executorManager, null, ScheduleManager.triggerSource);
-		return new ScheduleManager(executorManager, loader, true);
+		return new ScheduleManager(executorManager, loader);
 	}
 //	private TriggerBasedScheduler loadScheduler(ExecutorManager executorManager, ProjectManager projectManager, TriggerManager triggerManager) {
 //		TriggerBasedScheduleLoader loader = new TriggerBasedScheduleLoader(triggerManager, executorManager, projectManager);
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 0bbd585..fa4471b 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -32,7 +32,6 @@ import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutionOptions.FailureAction;
-import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;