azkaban-uncached

-Adding initial Jmx support -fixed bug causing restart to

2/19/2013 11:39:07 PM

Details

diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 63925de..5246f9b 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -19,9 +19,14 @@ package azkaban.execapp;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.TimeZone;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.log4j.Logger;
 import org.joda.time.DateTimeZone;
 import org.mortbay.jetty.Server;
@@ -31,6 +36,10 @@ import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
+import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxJettyServer;
+import azkaban.jmx.JmxSLAManager;
+import azkaban.jmx.JmxScheduler;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectLoader;
 import azkaban.utils.Props;
@@ -64,6 +73,9 @@ public class AzkabanExecutorServer {
 	private Props executorGlobalProps;
 	private Server server;
 	
+	private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
+	private MBeanServer mbeanServer;
+
 	/**
 	 * Constructor
 	 * 
@@ -75,7 +87,7 @@ public class AzkabanExecutorServer {
 		int portNumber = props.getInt("executor.port", DEFAULT_PORT_NUMBER);
 		int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);
 
-		Server server = new Server(portNumber);
+		server = new Server(portNumber);
 		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
 		server.setThreadPool(httpThreadPool);
 
@@ -100,6 +112,8 @@ public class AzkabanExecutorServer {
 		}
 		runnerManager.setGlobalProps(executorGlobalProps);
 		
+		configureMBeanServer();
+		
 		try {
 			server.start();
 		} 
@@ -131,7 +145,7 @@ public class AzkabanExecutorServer {
 	public ExecutorLoader getExecutorLoader() {
 		return executionLoader;
 	}
-
+	
 	/**
 	 * Returns the global azkaban properties
 	 * 
@@ -277,4 +291,36 @@ public class AzkabanExecutorServer {
 		
 		return props;
 	}
+
+	private void configureMBeanServer() {
+		logger.info("Registering MBeans...");
+		mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+		registerMbean("jetty", new JmxJettyServer(server));
+	}
+	
+	public void close() {
+		try {
+			for (ObjectName name : registeredMBeans) {
+				mbeanServer.unregisterMBean(name);
+				logger.info("Jmx MBean " + name.getCanonicalName() + " unregistered.");
+			}
+		} catch (Exception e) {
+			logger.error("Failed to cleanup MBeanServer", e);
+		}
+	}
+	
+	private void registerMbean(String name, Object mbean) {
+		Class<?> mbeanClass = mbean.getClass();
+		ObjectName mbeanName;
+		try {
+			mbeanName = new ObjectName(mbeanClass.getName() + ":name=" + name);
+			mbeanServer.registerMBean(mbean, mbeanName);
+			logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
+			registeredMBeans.add(mbeanName);
+		} catch (Exception e) {
+			logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+		}
+
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 5054c48..7374b60 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -75,6 +76,9 @@ public class FlowRunnerManager implements EventListener {
 	
 	private Props globalProps;
 	
+	private long lastSubmitterThreadCheckTime = -1;
+	private long lastCleanerThreadCheckTime = -1;
+	
 	public FlowRunnerManager(Props props, ExecutorLoader executorLoader, ProjectLoader projectLoader, ClassLoader parentClassLoader) throws IOException {
 		executionDirectory = new File(props.getString("azkaban.execution.dir", "executions"));
 		projectDirectory = new File(props.getString("azkaban.project.dir", "projects"));
@@ -131,6 +135,7 @@ public class FlowRunnerManager implements EventListener {
 		public void run() {
 			while (!shutdown) {
 				try {
+					lastSubmitterThreadCheckTime = System.currentTimeMillis();
 					FlowRunner flowRunner = queue.take();
 					executorService.submit(flowRunner);
 				} catch (InterruptedException e) {
@@ -157,6 +162,7 @@ public class FlowRunnerManager implements EventListener {
 			while (!shutdown) {
 				synchronized (this) {
 					try {
+						lastCleanerThreadCheckTime = System.currentTimeMillis();
 						wait(RECENTLY_FINISHED_TIME_TO_LIVE);
 						// Cleanup old stuff.
 						cleanRecentlyFinished();
@@ -472,4 +478,43 @@ public class FlowRunnerManager implements EventListener {
 		
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
+	
+	/**
+	 * 	private ExecutorService executorService;
+	private SubmitterThread submitterThread;
+	private CleanerThread cleanerThread;
+	 * @return
+	 */
+	public long getLastCleanerThreadCheckTime() {
+		return lastCleanerThreadCheckTime;
+	}
+
+	public long getLastSubmitterThreadCheckTime() {
+		return lastSubmitterThreadCheckTime;
+	}
+
+	public boolean isSubmitterThreadActive() {
+		return this.submitterThread.isAlive();
+	}
+
+	public boolean isCleanerThreadActive() {
+		return this.cleanerThread.isAlive();
+	}
+	
+	public State getSubmitterThreadState() {
+		return this.submitterThread.getState();
+	}
+
+	public State getCleanerThreadState() {
+		return this.cleanerThread.getState();
+	}
+	
+	public boolean isExecutorThreadPoolShutdown() {
+		return executorService.isShutdown();
+	}
+	
+	public int getNumExecutingFlows() {
+		return runningFlows.size();
+	}
+
 }
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 35208cb..72c8e3d 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -17,6 +17,7 @@
 package azkaban.executor;
 
 import java.io.IOException;
+import java.lang.Thread.State;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -57,6 +58,8 @@ public class ExecutorManager {
 	private ExecutorMailer mailer;
 	private ExecutingManagerUpdaterThread executingManager;
 	
+	private long lastThreadCheckTime = -1;
+	
 	public ExecutorManager(Props props, ExecutorLoader loader) throws ExecutorManagerException {
 		this.executorLoader = loader;
 		this.loadRunningFlows();
@@ -68,6 +71,26 @@ public class ExecutorManager {
 		executingManager.start();
 	}
 	
+	public String getExecutorHost() {
+		return executorHost;
+	}
+	
+	public int getExecutorPort() {
+		return executorPort;
+	}
+	
+	public State getExecutorThreadState() {
+		return executingManager.getState();
+	}
+	
+	public boolean isThreadActive() {
+		return executingManager.isAlive();
+	}
+	
+	public long getLastThreadCheckTime() {
+		return lastThreadCheckTime;
+	}
+	
 	private void loadRunningFlows() throws ExecutorManagerException {
 		runningFlows.putAll(executorLoader.fetchActiveFlows());
 	}
@@ -394,6 +417,8 @@ public class ExecutorManager {
 		public void run() {
 			while(!shutdown) {
 				try {
+					lastThreadCheckTime = System.currentTimeMillis();
+					
 					Map<ConnectionInfo, List<ExecutableFlow>> exFlowMap = getFlowToExecutorMap();
 					ArrayList<ExecutableFlow> finishedFlows = new ArrayList<ExecutableFlow>();
 					ArrayList<ExecutableFlow> finalizeFlows = new ArrayList<ExecutableFlow>();
diff --git a/src/java/azkaban/jmx/DisplayName.java b/src/java/azkaban/jmx/DisplayName.java
new file mode 100644
index 0000000..7a55f94
--- /dev/null
+++ b/src/java/azkaban/jmx/DisplayName.java
@@ -0,0 +1,21 @@
+package azkaban.jmx;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.management.DescriptorKey;
+
+/**
+ * DisplayName - This annotation allows to supply a display name for a method in
+ * the MBean interface.
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DisplayName {
+	@DescriptorKey("displayName")
+	String value();
+}
diff --git a/src/java/azkaban/jmx/JmxExecutorManager.java b/src/java/azkaban/jmx/JmxExecutorManager.java
new file mode 100644
index 0000000..35b18a1
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManager.java
@@ -0,0 +1,31 @@
+package azkaban.jmx;
+
+import azkaban.executor.ExecutorManager;
+
+public class JmxExecutorManager implements JmxExecutorManagerMBean {
+	private ExecutorManager manager;
+
+	public JmxExecutorManager(ExecutorManager manager) {
+		this.manager = manager;
+	}
+
+	@Override
+	public int getNumRunningFlows() {
+		return this.manager.getRunningFlows().size();
+	}
+
+	@Override
+	public String getExecutorThreadState() {
+		return manager.getExecutorThreadState().toString();
+	}
+
+	@Override
+	public boolean isThreadActive() {
+		return manager.isThreadActive();
+	}
+
+	@Override
+	public Long getLastThreadCheckTime() {
+		return manager.getLastThreadCheckTime();
+	}
+}
diff --git a/src/java/azkaban/jmx/JmxExecutorManagerMBean.java b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
new file mode 100644
index 0000000..d5649a4
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxExecutorManagerMBean.java
@@ -0,0 +1,15 @@
+package azkaban.jmx;
+
+public interface JmxExecutorManagerMBean {
+	@DisplayName("OPERATION: getNumRunningFlows")
+	public int getNumRunningFlows();
+	
+	@DisplayName("OPERATION: getExecutorThreadState")
+	public String getExecutorThreadState();
+
+	@DisplayName("OPERATION: isThreadActive")
+	public boolean isThreadActive();
+
+	@DisplayName("OPERATION: getLastThreadCheckTime")
+	public Long getLastThreadCheckTime();
+}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
new file mode 100644
index 0000000..9bc3e2c
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -0,0 +1,11 @@
+package azkaban.jmx;
+
+import azkaban.execapp.FlowRunnerManager;
+
+public class JmxFlowRunnerManager {
+	private FlowRunnerManager manager;
+	
+	public JmxFlowRunnerManager(FlowRunnerManager manager) {
+		this.manager = manager;
+	}
+}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
new file mode 100644
index 0000000..5c865c7
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -0,0 +1,5 @@
+package azkaban.jmx;
+
+public interface JmxFlowRunnerManagerMBean {
+	
+}
diff --git a/src/java/azkaban/jmx/JmxJettyServer.java b/src/java/azkaban/jmx/JmxJettyServer.java
new file mode 100644
index 0000000..6a74376
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxJettyServer.java
@@ -0,0 +1,129 @@
+package azkaban.jmx;
+
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Server;
+
+public class JmxJettyServer implements JmxJettyServerMBean {
+	private Server server;
+	private Connector connector;
+	
+	public JmxJettyServer(Server server) {
+		this.server = server;
+		this.connector = server.getConnectors()[0];
+	}
+	
+	@Override
+	public boolean isRunning() {
+		return this.server.isRunning();
+	}
+	
+	@Override
+	public boolean isFailed() {
+		return this.server.isFailed();
+	}
+	
+	@Override
+	public boolean isStopped() {
+		return this.server.isStopped();
+	}
+	
+	@Override
+	public int getNumThreads() {
+		return this.server.getThreadPool().getThreads();
+	}
+	
+	@Override
+	public int getNumIdleThreads() {
+		return this.server.getThreadPool().getIdleThreads();
+	}
+	
+	@Override
+	public String getHost() {
+		return connector.getHost();
+	}
+	
+	@Override
+	public int getPort() {
+		return connector.getPort();
+	}
+
+	@Override
+	public int getConfidentialPort() {
+		return connector.getConfidentialPort();
+	}
+	
+	@Override
+	public int getConnections() {
+		return connector.getConnections();
+	}
+	
+	@Override
+	public int getConnectionsOpen() {
+		return connector.getConnectionsOpen();
+	}
+	
+	@Override
+	public int getConnectionsOpenMax() {
+		return connector.getConnectionsOpenMax();
+	}
+	
+	@Override
+	public int getConnectionsOpenMin() {
+		return connector.getConnectionsOpenMin();
+	}
+	
+	@Override
+	public long getConnectionsDurationAve() {
+		return connector.getConnectionsDurationAve();
+	}
+	
+	@Override
+	public long getConnectionsDurationMax() {
+		return connector.getConnectionsDurationMax();
+	}
+	
+	@Override
+	public long getConnectionsDurationMin() {
+		return connector.getConnectionsDurationMin();
+	}
+	
+	@Override
+	public long getConnectionsDurationTotal() {
+		return connector.getConnectionsDurationTotal();
+	}
+	
+	@Override
+	public long getConnectionsRequestAve() {
+		return connector.getConnectionsRequestsAve();
+	}
+	
+	@Override
+	public long getConnectionsRequestMax() {
+		return connector.getConnectionsRequestsMax();
+	}
+	
+	@Override
+	public long getConnectionsRequestMin() {
+		return connector.getConnectionsRequestsMin();
+	}
+	
+	@Override
+	public void turnStatsOn() {
+		connector.setStatsOn(true);
+	}
+	
+	@Override
+	public void turnStatsOff() {
+		connector.setStatsOn(false);
+	}
+	
+	@Override
+	public void resetStats() {
+		connector.statsReset();
+	}
+
+	@Override
+	public boolean isStatsOn() {
+		return connector.getStatsOn();
+	}
+}
diff --git a/src/java/azkaban/jmx/JmxJettyServerMBean.java b/src/java/azkaban/jmx/JmxJettyServerMBean.java
new file mode 100644
index 0000000..7b1f046
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxJettyServerMBean.java
@@ -0,0 +1,72 @@
+package azkaban.jmx;
+
+public interface JmxJettyServerMBean {
+	@DisplayName("OPERATION: isRunning")
+	public boolean isRunning();
+	
+	@DisplayName("OPERATION: isFailed")
+	public boolean isFailed();
+	
+	@DisplayName("OPERATION: isStopped")
+	public boolean isStopped();
+	
+	@DisplayName("OPERATION: getNumThreads")
+	public int getNumThreads();
+	
+	@DisplayName("OPERATION: getNumIdleThreads")
+	public int getNumIdleThreads();
+	
+	@DisplayName("OPERATION: getHost")
+	public String getHost();
+	
+	@DisplayName("OPERATION: getPort")
+	public int getPort();
+
+	@DisplayName("OPERATION: getConfidentialPort")
+	public int getConfidentialPort();
+	
+	@DisplayName("OPERATION: getConnections")
+	public int getConnections();
+	
+	@DisplayName("OPERATION: getConnectionsOpen")
+	public int getConnectionsOpen();
+	
+	@DisplayName("OPERATION: getConnectionsOpenMax")
+	public int getConnectionsOpenMax();
+	
+	@DisplayName("OPERATION: getConnectionsOpenMin")
+	public int getConnectionsOpenMin();
+	
+	@DisplayName("OPERATION: getConnectionsDurationAve")
+	public long getConnectionsDurationAve();
+	
+	@DisplayName("OPERATION: getConnectionsDurationMax")
+	public long getConnectionsDurationMax();
+	
+	@DisplayName("OPERATION: getConnectionsDurationMin")
+	public long getConnectionsDurationMin();
+	
+	@DisplayName("OPERATION: getConnectionsDurationTotal")
+	public long getConnectionsDurationTotal();
+	
+	@DisplayName("OPERATION: getConnectionsRequestAve")
+	public long getConnectionsRequestAve();
+	
+	@DisplayName("OPERATION: getConnectionsRequestMax")
+	public long getConnectionsRequestMax();
+	
+	@DisplayName("OPERATION: getConnectionsRequestMin")
+	public long getConnectionsRequestMin();
+	
+	@DisplayName("OPERATION: turnStatsOn")
+	public void turnStatsOn();
+	
+	@DisplayName("OPERATION: turnStatsOff")
+	public void turnStatsOff();
+	
+	@DisplayName("OPERATION: resetStats")
+	public void resetStats();
+		
+	@DisplayName("OPERATION: isStatsOn")
+	public boolean isStatsOn();
+}
diff --git a/src/java/azkaban/jmx/JmxScheduler.java b/src/java/azkaban/jmx/JmxScheduler.java
new file mode 100644
index 0000000..73bcf98
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxScheduler.java
@@ -0,0 +1,31 @@
+package azkaban.jmx;
+
+import azkaban.scheduler.ScheduleManager;
+
+public class JmxScheduler implements JmxSchedulerMBean {
+	private final ScheduleManager manager;
+	
+	public JmxScheduler(ScheduleManager manager) {
+		this.manager = manager;
+	}
+	
+	@Override
+	public String getScheduleThreadState() {
+		return manager.getThreadState().toString();
+	}
+
+	@Override
+	public Long getNextScheduleTime() {
+		return manager.getNextUpdateTime();
+	}
+
+	@Override
+	public Long getLastThreadCheckTime() {
+		return manager.getLastCheckTime();
+	}
+
+	@Override
+	public Boolean isThreadActive() {
+		return manager.isThreadActive();
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSchedulerMBean.java b/src/java/azkaban/jmx/JmxSchedulerMBean.java
new file mode 100644
index 0000000..19a8b70
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxSchedulerMBean.java
@@ -0,0 +1,15 @@
+package azkaban.jmx;
+
+public interface JmxSchedulerMBean {
+	@DisplayName("OPERATION: getScheduleThreadState")
+	String getScheduleThreadState();
+	
+	@DisplayName("OPERATION: getNextScheduleTime")
+	Long getNextScheduleTime();
+	
+	@DisplayName("OPERATION: getLastCheckTime")
+	Long getLastThreadCheckTime();
+	
+	@DisplayName("OPERATION: isThreadActive")
+	Boolean isThreadActive();
+}
\ No newline at end of file
diff --git a/src/java/azkaban/jmx/JmxSLAManager.java b/src/java/azkaban/jmx/JmxSLAManager.java
new file mode 100644
index 0000000..b8e89a4
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxSLAManager.java
@@ -0,0 +1,45 @@
+package azkaban.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import azkaban.sla.SLA;
+import azkaban.sla.SLAManager;
+
+public class JmxSLAManager implements JmxSLAManagerMBean {
+	private final SLAManager manager;
+	
+	public JmxSLAManager(SLAManager manager) {
+		this.manager = manager;
+	}
+
+	@Override
+	public String getSLAThreadState() {
+		return manager.getSLAThreadState().toString();
+	}
+
+	@Override
+	public boolean isThreadActive() {
+		return manager.isThreadActive();
+	}
+
+	@Override
+	public Long getLastThreadCheckTime() {
+		return manager.getLastCheckTime();
+	}
+
+	@Override
+	public int getNumActiveSLA() {
+		return manager.getNumActiveSLA();
+	}
+
+	@Override
+	public List<String> getSLASummary() {
+		ArrayList<String> summary = new ArrayList<String>();
+		for(SLA sla: manager.getSLAList()) {
+			summary.add(sla.toString());
+		}
+		return summary;
+	}
+
+}
diff --git a/src/java/azkaban/jmx/JmxSLAManagerMBean.java b/src/java/azkaban/jmx/JmxSLAManagerMBean.java
new file mode 100644
index 0000000..1378b57
--- /dev/null
+++ b/src/java/azkaban/jmx/JmxSLAManagerMBean.java
@@ -0,0 +1,20 @@
+package azkaban.jmx;
+
+import java.util.List;
+
+public interface JmxSLAManagerMBean {
+	@DisplayName("OPERATION: getSLAThreadState")
+	String getSLAThreadState();
+	
+	@DisplayName("OPERATION: isThreadActive")
+	boolean isThreadActive();
+	
+	@DisplayName("OPERATION: getLastThreadCheckTime")
+	Long getLastThreadCheckTime();
+	
+	@DisplayName("OPERATION: getNumActiveSLA")
+	int getNumActiveSLA();
+	
+	@DisplayName("OPERATION: getSLASummary")
+	List<String> getSLASummary();
+}
diff --git a/src/java/azkaban/jmx/ParameterName.java b/src/java/azkaban/jmx/ParameterName.java
new file mode 100644
index 0000000..3ee0b97
--- /dev/null
+++ b/src/java/azkaban/jmx/ParameterName.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2011 Adconion, Inc.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.jmx;
+
+/**
+ * ParameterName - This annotation allows to supply
+ * a parameter name for a method in the MBean interface.
+ */
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.management.DescriptorKey;
+
+@Documented
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ParameterName {
+	@DescriptorKey("parameterName")
+	String value();
+}
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
index 9caed47..276b10e 100644
--- a/src/java/azkaban/scheduler/ScheduleLoader.java
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -16,7 +16,6 @@
 
 package azkaban.scheduler;
 
-import java.sql.Connection;
 import java.util.List;
 
 public interface ScheduleLoader {
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ad67def..a15907b 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -16,8 +16,8 @@
 
 package azkaban.scheduler;
 
+import java.lang.Thread.State;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -35,15 +35,12 @@ import org.joda.time.format.DateTimeFormatter;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.ExecutableFlow.Status;
 
 import azkaban.flow.Flow;
-import azkaban.jobExecutor.utils.JobExecutionException;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 
-
 import azkaban.scheduler.Schedule.FlowOptions;
 import azkaban.scheduler.Schedule.SlaOptions;
 import azkaban.sla.SLA.SlaAction;
@@ -70,6 +67,10 @@ public class ScheduleManager {
 	private final ExecutorManager executorManager;
 	private final ProjectManager projectManager;
 	private final SLAManager slaManager;
+	
+	// Used for mbeans to query Scheduler status
+	private long lastCheckTime = -1;
+	private long nextWakupTime = -1;
 
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -336,6 +337,7 @@ public class ScheduleManager {
 			while (stillAlive.get()) {
 				synchronized (this) {
 					try {
+						lastCheckTime = System.currentTimeMillis();
 						// TODO clear up the exception handling
 						Schedule s = schedules.peek();
 
@@ -343,6 +345,7 @@ public class ScheduleManager {
 							// If null, wake up every minute or so to see if
 							// there's something to do. Most likely there will not be.
 							try {
+								nextWakupTime = System.currentTimeMillis() + TIMEOUT_MS;
 								this.wait(TIMEOUT_MS);
 							} catch (InterruptedException e) {
 								// interruption should occur when items are added or removed from the queue.
@@ -454,6 +457,7 @@ public class ScheduleManager {
 								// wait until flow run
 								long millisWait = Math.max(0, s.getNextExecTime() - (new DateTime()).getMillis());
 								try {
+									nextWakupTime = System.currentTimeMillis() + millisWait;
 									this.wait(Math.min(millisWait, TIMEOUT_MS));
 								} catch (InterruptedException e) {
 									// interruption should occur when items are
@@ -491,4 +495,20 @@ public class ScheduleManager {
 			}
 		}
 	}
+	
+	public long getLastCheckTime() {
+		return lastCheckTime;
+	}
+	
+	public long getNextUpdateTime() {
+		return nextWakupTime;
+	}
+	
+	public State getThreadState() {
+		return runner.getState();
+	}
+	
+	public boolean isThreadActive() {
+		return runner.isAlive();
+	}
 }
diff --git a/src/java/azkaban/sla/SLA.java b/src/java/azkaban/sla/SLA.java
index eb08229..8611b95 100644
--- a/src/java/azkaban/sla/SLA.java
+++ b/src/java/azkaban/sla/SLA.java
@@ -216,7 +216,6 @@ public class SLA {
 			}
 			slaObj.put("jobSettings", settingsObj);
 		}
-		
 
 		return slaObj;
 	}
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
index ae17595..967228c 100644
--- a/src/java/azkaban/sla/SLAManager.java
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -1,5 +1,6 @@
 package azkaban.sla;
 
+import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedHashMap;
@@ -64,6 +65,8 @@ public class SLAManager {
 	private final ExecutorManager executorManager;
 	private SlaMailer mailer;
 
+	private long lastCheckTime = -1;
+	
 	/**
 	 * Give the sla manager a loader class that will properly load the
 	 * sla.
@@ -195,6 +198,8 @@ public class SLAManager {
 			while (stillAlive.get()) {
 				synchronized (this) {
 					try {
+						lastCheckTime = System.currentTimeMillis();
+						
 						// TODO clear up the exception handling
 						SLA s = SLAs.peek();
 
@@ -463,5 +468,23 @@ public class SLAManager {
 		}
 	}
 
+	public int getNumActiveSLA() {
+		return runner.getRunnerSLAs().size();
+	}
 	
+	public State getSLAThreadState() {
+		return runner.getState();
+	}
+	
+	public boolean isThreadActive() {
+		return runner.isAlive();
+	}
+	
+	public List<SLA> getSLAList() {
+		return runner.getRunnerSLAs();
+	}
+	
+	public long getLastCheckTime() {
+		return lastCheckTime;
+	}
 }
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index e3cceae..bbca3cf 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -19,6 +19,7 @@ package azkaban.webapp;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -30,6 +31,9 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.TimeZone;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.velocity.app.VelocityEngine;
@@ -46,6 +50,10 @@ import org.mortbay.thread.QueuedThreadPool;
 
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.JdbcExecutorLoader;
+import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxJettyServer;
+import azkaban.jmx.JmxSLAManager;
+import azkaban.jmx.JmxScheduler;
 import azkaban.project.JdbcProjectLoader;
 import azkaban.project.ProjectManager;
 
@@ -97,7 +105,7 @@ import joptsimple.OptionSpec;
  */
 public class AzkabanWebServer implements AzkabanServer {
 	private static final Logger logger = Logger.getLogger(AzkabanWebServer.class);
-
+	
 	public static final String AZKABAN_HOME = "AZKABAN_HOME";
 	public static final String DEFAULT_CONF_PATH = "conf";
 	public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
@@ -117,6 +125,7 @@ public class AzkabanWebServer implements AzkabanServer {
 
 	private final VelocityEngine velocityEngine;
 
+	private final Server server;
 	private UserManager userManager;
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
@@ -129,20 +138,24 @@ public class AzkabanWebServer implements AzkabanServer {
 	private SessionCache sessionCache;
 	private File tempDir;
 	private List<ViewerPlugin> viewerPlugins;
+	
+	private MBeanServer mbeanServer;
+	private ArrayList<ObjectName> registeredMBeans = new ArrayList<ObjectName>();
 
 	/**
 	 * Constructor usually called by tomcat AzkabanServletContext to create the
 	 * initial server
 	 */
 	public AzkabanWebServer() throws Exception {
-		this(loadConfigurationFromAzkabanHome());
+		this(null, loadConfigurationFromAzkabanHome());
 	}
 
 	/**
 	 * Constructor
 	 */
-	public AzkabanWebServer(Props props) throws Exception {
+	public AzkabanWebServer(Server server, Props props) throws Exception {
 		this.props = props;
+		this.server = server;
 		velocityEngine = configureVelocityEngine(props.getBoolean(VELOCITY_DEV_MODE_PARAM, false));
 		sessionCache = new SessionCache(props);
 		userManager = loadUserManager(props);
@@ -162,6 +175,8 @@ public class AzkabanWebServer implements AzkabanServer {
 
 			logger.info("Setting timezone to " + timezone);
 		}
+		
+		configureMBeanServer();
 	}
 	
 	
@@ -378,7 +393,6 @@ public class AzkabanWebServer implements AzkabanServer {
 			logger.error("Exiting Azkaban...");
 			return;
 		}
-		app = new AzkabanWebServer(azkabanSettings);
 
 		// int portNumber =
 		// azkabanSettings.getInt("jetty.port",DEFAULT_PORT_NUMBER);
@@ -388,7 +402,6 @@ public class AzkabanWebServer implements AzkabanServer {
 				DEFAULT_THREAD_NUMBER);
 
 		logger.info("Setting up Jetty Server with port:" + sslPortNumber + " and numThreads:" + maxThreads);
-
 		final Server server = new Server();
 		SslSocketConnector secureConnector = new SslSocketConnector();
 		secureConnector.setPort(sslPortNumber);
@@ -399,7 +412,8 @@ public class AzkabanWebServer implements AzkabanServer {
 		secureConnector.setTrustPassword(azkabanSettings.getString("jetty.trustpassword"));
 		
 		server.addConnector(secureConnector);
-
+		app = new AzkabanWebServer(server, azkabanSettings);
+		
 		QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
 		server.setThreadPool(httpThreadPool);
 
@@ -445,6 +459,7 @@ public class AzkabanWebServer implements AzkabanServer {
 				logger.info("Shutting down http server...");
 				try {
 					app.getScheduleManager().shutdown();
+					app.close();
 					server.stop();
 					server.destroy();
 				} 
@@ -671,5 +686,38 @@ public class AzkabanWebServer implements AzkabanServer {
 		return props;
 	}
 
+	private void configureMBeanServer() {
+		logger.info("Registering MBeans...");
+		mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+		registerMbean("jetty", new JmxJettyServer(server));
+		registerMbean("scheduler", new JmxScheduler(scheduleManager));
+		registerMbean("slaManager", new JmxSLAManager(slaManager));
+		registerMbean("executorManager", new JmxExecutorManager(executorManager));
+	}
 	
+	public void close() {
+		try {
+			for (ObjectName name : registeredMBeans) {
+				mbeanServer.unregisterMBean(name);
+				logger.info("Jmx MBean " + name.getCanonicalName() + " unregistered.");
+			}
+		} catch (Exception e) {
+			logger.error("Failed to cleanup MBeanServer", e);
+		}
+	}
+	
+	private void registerMbean(String name, Object mbean) {
+		Class<?> mbeanClass = mbean.getClass();
+		ObjectName mbeanName;
+		try {
+			mbeanName = new ObjectName(mbeanClass.getName() + ":name=" + name);
+			mbeanServer.registerMBean(mbean, mbeanName);
+			logger.info("Bean " + mbeanClass.getCanonicalName() + " registered.");
+			registeredMBeans.add(mbeanName);
+		} catch (Exception e) {
+			logger.error("Error registering mbean " + mbeanClass.getCanonicalName(), e);
+		}
+
+	}
 }
diff --git a/src/sql/create_schedule_table.sql b/src/sql/create_schedule_table.sql
index 462e244..a2ca6d2 100644
--- a/src/sql/create_schedule_table.sql
+++ b/src/sql/create_schedule_table.sql
@@ -1,5 +1,3 @@
-DROP TABLE if exists schedules;
-
 CREATE TABLE schedules (
 	project_id INT NOT NULL,
 	project_name VARCHAR(128) NOT NULL,
@@ -13,7 +11,7 @@ CREATE TABLE schedules (
 	submit_time BIGINT,
 	submit_user VARCHAR(128),
 	enc_type TINYINT,
-    	schedule_options LONGBLOB,
+	schedule_options LONGBLOB,
 	primary key(project_id, flow_name)
 ) ENGINE=InnoDB;
 
diff --git a/src/web/js/azkaban.exflow.view.js b/src/web/js/azkaban.exflow.view.js
index cd6ff66..4fb7326 100644
--- a/src/web/js/azkaban.exflow.view.js
+++ b/src/web/js/azkaban.exflow.view.js
@@ -193,13 +193,24 @@ azkaban.FlowTabView= Backbone.View.extend({
 			// Nodes can be in a killed state, even if the parents have succeeded due to failure option Finish running
 			// We want to re-enable those.
 			var shouldAdd = true;
-			for(var key in node.in) {
-				var dependency = node.in[key];
-				if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
+			if (node.in) {
+				var size = 0;
+				for(var key in node.in) {
+					size++;
+					var dependency = node.in[key];
+					if (dependency.status != 'SUCCEEDED' && dependency.status!='SKIPPED') {
+						shouldAdd = false;
+						break;
+					}
+				}
+				
+				if (size == 0) {
 					shouldAdd = false;
-					break;
 				}
 			}
+			else {
+				shouldAdd = false;
+			}
 			
 			if (shouldAdd) {
 				failedJobs.push(node.id);