azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 15(+15 -0)
Details
diff --git a/src/java/azkaban/execapp/AzkabanExecutorServer.java b/src/java/azkaban/execapp/AzkabanExecutorServer.java
index 5246f9b..8770e0e 100644
--- a/src/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/src/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -37,6 +37,7 @@ import org.mortbay.thread.QueuedThreadPool;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JdbcExecutorLoader;
import azkaban.jmx.JmxExecutorManager;
+import azkaban.jmx.JmxFlowRunnerManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxSLAManager;
import azkaban.jmx.JmxScheduler;
@@ -297,6 +298,7 @@ public class AzkabanExecutorServer {
mbeanServer = ManagementFactory.getPlatformMBeanServer();
registerMbean("jetty", new JmxJettyServer(server));
+ registerMbean("flowRunnerManager", new JmxFlowRunnerManager(runnerManager));
}
public void close() {
src/java/azkaban/execapp/FlowRunner.java 15(+15 -0)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index d912e8a..e046958 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -807,4 +807,19 @@ public class FlowRunner extends EventHandler implements Runnable {
return runner.getLogFile();
}
+
+ public boolean isRunnerThreadAlive() {
+ if (currentThread != null) {
+ return currentThread.isAlive();
+ }
+ return false;
+ }
+
+ public boolean isThreadPoolShutdown() {
+ return executorService.isShutdown();
+ }
+
+ public int getNumRunningJobs() {
+ return runningJob.size();
+ }
}
\ No newline at end of file
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 84805d8..0275778 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -478,13 +478,7 @@ public class FlowRunnerManager implements EventListener {
throw new ExecutorManagerException("Error reading file. Log directory doesn't exist.");
}
-
- /**
- * private ExecutorService executorService;
- private SubmitterThread submitterThread;
- private CleanerThread cleanerThread;
- * @return
- */
+
public long getLastCleanerThreadCheckTime() {
return lastCleanerThreadCheckTime;
}
@@ -517,4 +511,13 @@ public class FlowRunnerManager implements EventListener {
return runningFlows.size();
}
+ public int getNumExecutingJobs() {
+ int jobCount = 0;
+ for (FlowRunner runner: runningFlows.values()) {
+ jobCount += runner.getNumRunningJobs();
+ }
+
+ return jobCount;
+ }
+
}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManager.java b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
index 9bc3e2c..f4f59d3 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManager.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManager.java
@@ -2,10 +2,56 @@ package azkaban.jmx;
import azkaban.execapp.FlowRunnerManager;
-public class JmxFlowRunnerManager {
+public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
private FlowRunnerManager manager;
public JmxFlowRunnerManager(FlowRunnerManager manager) {
this.manager = manager;
}
+
+ @Override
+ public long getLastCleanerThreadCheckTime() {
+ return manager.getLastCleanerThreadCheckTime();
+ }
+
+ @Override
+ public long getLastSubmitterThreadCheckTime() {
+ return manager.getLastSubmitterThreadCheckTime();
+ }
+
+ @Override
+ public boolean isSubmitterThreadActive() {
+ return manager.isSubmitterThreadActive();
+ }
+
+ @Override
+ public boolean isCleanerThreadActive() {
+ return manager.isCleanerThreadActive();
+ }
+
+ @Override
+ public String getSubmitterThreadState() {
+ return manager.getSubmitterThreadState().toString();
+ }
+
+ @Override
+ public String getCleanerThreadState() {
+ return manager.getCleanerThreadState().toString();
+ }
+
+ @Override
+ public boolean isExecutorThreadPoolShutdown() {
+ return manager.isExecutorThreadPoolShutdown();
+ }
+
+ @Override
+ public int getNumExecutingFlows() {
+ return manager.getNumExecutingFlows();
+ }
+
+ @Override
+ public int countTotalNumRunningJobs() {
+ return manager.getNumExecutingJobs();
+ }
+
}
diff --git a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
index 5c865c7..47c6a02 100644
--- a/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
+++ b/src/java/azkaban/jmx/JmxFlowRunnerManagerMBean.java
@@ -1,5 +1,30 @@
package azkaban.jmx;
public interface JmxFlowRunnerManagerMBean {
+ @DisplayName("OPERATION: getLastCleanerThreadCheckTime")
+ public long getLastCleanerThreadCheckTime();
+
+ @DisplayName("OPERATION: getLastSubmitterThreadCheckTime")
+ public long getLastSubmitterThreadCheckTime();
+
+ @DisplayName("OPERATION: isSubmitterThreadActive")
+ public boolean isSubmitterThreadActive();
+
+ @DisplayName("OPERATION: isCleanerThreadActive")
+ public boolean isCleanerThreadActive();
+
+ @DisplayName("OPERATION: getSubmitterThreadState")
+ public String getSubmitterThreadState();
+
+ @DisplayName("OPERATION: getCleanerThreadState")
+ public String getCleanerThreadState();
+
+ @DisplayName("OPERATION: isExecutorThreadPoolShutdown")
+ public boolean isExecutorThreadPoolShutdown();
+
+ @DisplayName("OPERATION: getNumExecutingFlows")
+ public int getNumExecutingFlows();
+ @DisplayName("OPERATION: getTotalNumRunningJobs")
+ public int countTotalNumRunningJobs();
}