azkaban-uncached

Details

diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 5246f9b..8770e0e 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -37,6 +37,7 @@ import org.mortbay.thread.QueuedThreadPool;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JdbcExecutorLoader;
 import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxFlowRunnerManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxSLAManager;
 import azkaban.jmx.JmxScheduler;
@@ -297,6 +298,7 @@ public class AzkabanExecutorServer {
 		mbeanServer = ManagementFactory.getPlatformMBeanServer();
 
 		registerMbean("jetty", new JmxJettyServer(server));
+		registerMbean("flowRunnerManager", new JmxFlowRunnerManager(runnerManager));
 	}
 	
 	public void close() {
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index d912e8a..e046958 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -807,4 +807,19 @@ public class FlowRunner extends EventHandler implements Runnable {
 		
 		return runner.getLogFile();
 	}
+	
+	public boolean isRunnerThreadAlive() {
+		if (currentThread != null) {
+			return currentThread.isAlive();
+		}
+		return false;
+	}
+	
+	public boolean isThreadPoolShutdown() {
+		return executorService.isShutdown();
+	}
+	
+	public int getNumRunningJobs() {
+		return runningJob.size();
+	}
 }
\ No newline at end of file
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 84805d8..0275778 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -478,13 +478,7 @@ public class FlowRunnerManager implements EventListener {
 		
 		throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
 	}
-	
-	/**
-	 * 	private ExecutorService executorService;
-	private SubmitterThread submitterThread;
-	private CleanerThread cleanerThread;
-	 * @return
-	 */
+
 	public long getLastCleanerThreadCheckTime() {
 		return lastCleanerThreadCheckTime;
 	}
@@ -517,4 +511,13 @@ public class FlowRunnerManager implements EventListener {
 		return runningFlows.size();
 	}
 
+	public int getNumExecutingJobs() {
+		int jobCount = 0;
+		for (FlowRunner runner: runningFlows.values()) {
+			jobCount += runner.getNumRunningJobs();
+		}
+		
+		return jobCount;
+	}
+	
 }
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index 9bc3e2c..f4f59d3 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -2,10 +2,56 @@ package azkaban.jmx;
 
 import azkaban.execapp.FlowRunnerManager;
 
-public class JmxFlowRunnerManager {
+public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
 	private FlowRunnerManager manager;
 	
 	public JmxFlowRunnerManager(FlowRunnerManager manager) {
 		this.manager = manager;
 	}
+
+	@Override
+	public long getLastCleanerThreadCheckTime() {
+		return manager.getLastCleanerThreadCheckTime();
+	}
+
+	@Override
+	public long getLastSubmitterThreadCheckTime() {
+		return manager.getLastSubmitterThreadCheckTime();
+	}
+
+	@Override
+	public boolean isSubmitterThreadActive() {
+		return manager.isSubmitterThreadActive();
+	}
+
+	@Override
+	public boolean isCleanerThreadActive() {
+		return manager.isCleanerThreadActive();
+	}
+
+	@Override
+	public String getSubmitterThreadState() {
+		return manager.getSubmitterThreadState().toString();
+	}
+
+	@Override
+	public String getCleanerThreadState() {
+		return manager.getCleanerThreadState().toString();
+	}
+
+	@Override
+	public boolean isExecutorThreadPoolShutdown() {
+		return manager.isExecutorThreadPoolShutdown();
+	}
+
+	@Override
+	public int getNumExecutingFlows() {
+		return manager.getNumExecutingFlows();
+	}
+
+	@Override
+	public int countTotalNumRunningJobs() {
+		return manager.getNumExecutingJobs();
+	}
+
 }
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 5c865c7..47c6a02 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -1,5 +1,30 @@
 package azkaban.jmx;
 
 public interface JmxFlowRunnerManagerMBean {
+	@DisplayName("OPERATION: getLastCleanerThreadCheckTime")
+	public long getLastCleanerThreadCheckTime();
+
+	@DisplayName("OPERATION: getLastSubmitterThreadCheckTime")
+	public long getLastSubmitterThreadCheckTime();
+
+	@DisplayName("OPERATION: isSubmitterThreadActive")
+	public boolean isSubmitterThreadActive();
+
+	@DisplayName("OPERATION: isCleanerThreadActive")
+	public boolean isCleanerThreadActive();
+
+	@DisplayName("OPERATION: getSubmitterThreadState")
+	public String getSubmitterThreadState();
+
+	@DisplayName("OPERATION: getCleanerThreadState")
+	public String getCleanerThreadState();
+
+	@DisplayName("OPERATION: isExecutorThreadPoolShutdown")
+	public boolean isExecutorThreadPoolShutdown();
+
+	@DisplayName("OPERATION: getNumExecutingFlows")
+	public int getNumExecutingFlows();
 	
+	@DisplayName("OPERATION: getTotalNumRunningJobs")
+	public int countTotalNumRunningJobs();
 }