Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 9482228..0de0713 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -249,13 +249,13 @@ public class ExecutionController extends EventHandler implements ExecutorManager
}
/**
- * Get all active (running, non-dispatched) flows from database. {@inheritDoc}
+ * Get all running (unfinished) flows from database. {@inheritDoc}
*/
@Override
public List<ExecutableFlow> getRunningFlows() {
final ArrayList<ExecutableFlow> flows = new ArrayList<>();
try {
- getActiveFlowHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
+ getFlowsHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
} catch (final ExecutorManagerException e) {
this.logger.error("Failed to get running flows.", e);
}
@@ -263,14 +263,58 @@ public class ExecutionController extends EventHandler implements ExecutorManager
}
/**
- * Helper method to get all running flows from a Pair<ExecutionReference,
- * ExecutableFlow collection
+ * Helper method to get all flows from collection.
*/
- private void getActiveFlowHelper(final ArrayList<ExecutableFlow> flows,
+ private void getFlowsHelper(final ArrayList<ExecutableFlow> flows,
final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
- for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
- flows.add(ref.getSecond());
+ collection.stream().forEach(ref -> flows.add(ref.getSecond()));
+ }
+
+ /**
+ * Get execution ids of all running (unfinished) flows from database.
+ */
+ public List<Integer> getRunningFlowIds() {
+ final List<Integer> allIds = new ArrayList<>();
+ try {
+ getExecutionIdsHelper(allIds, this.executorLoader.fetchUnfinishedFlows().values());
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get running flow ids.", e);
+ }
+ return allIds;
+ }
+
+ /**
+ * Get execution ids of all non-dispatched flows from database.
+ */
+ public List<Integer> getQueuedFlowIds() {
+ final List<Integer> allIds = new ArrayList<>();
+ try {
+ getExecutionIdsHelper(allIds, this.executorLoader.fetchQueuedFlows());
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get queued flow ids.", e);
+ }
+ return allIds;
+ }
+
+ /* Helper method to get all execution ids from collection in sorted order. */
+ private void getExecutionIdsHelper(final List<Integer> allIds,
+ final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+ collection.stream().forEach(ref -> allIds.add(ref.getSecond().getExecutionId()));
+ Collections.sort(allIds);
+ }
+
+ /**
+ * Get the number of non-dispatched flows from database. {@inheritDoc}
+ */
+ @Override
+ public long getQueuedFlowSize() {
+ long size = 0L;
+ try {
+ size = this.executorLoader.fetchQueuedFlows().size();
+ } catch (final ExecutorManagerException e) {
+ this.logger.error("Failed to get queued flow size.", e);
}
+ return size;
}
@Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 41146ad..0125778 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -559,11 +559,7 @@ public class ExecutorManager extends EventHandler implements
}
/**
- * Get execution Ids of all active (running, non-dispatched) flows
- *
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+ * Get execution Ids of all running (unfinished) flows
*/
public String getRunningFlowIds() {
final List<Integer> allIds = new ArrayList<>();
@@ -575,10 +571,6 @@ public class ExecutorManager extends EventHandler implements
/**
* Get execution Ids of all non-dispatched flows
- *
- * {@inheritDoc}
- *
- * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
*/
public String getQueuedFlowIds() {
final List<Integer> allIds = new ArrayList<>();
@@ -587,6 +579,10 @@ public class ExecutorManager extends EventHandler implements
return allIds.toString();
}
+ /**
+ * Get the number of non-dispatched flows. {@inheritDoc}
+ */
+ @Override
public long getQueuedFlowSize() {
return this.queuedFlows.size();
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index ae263ce..907ab1c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -36,7 +36,9 @@ public interface ExecutorManagerAdapter {
public List<Integer> getRunningFlows(int projectId, String flowId);
- public List<ExecutableFlow> getRunningFlows() throws IOException;
+ public List<ExecutableFlow> getRunningFlows();
+
+ public long getQueuedFlowSize();
/**
* <pre>
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index c1c376f..9b8ab3c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -43,6 +43,8 @@ public class ExecutionControllerTest {
private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = new
HashMap<>();
+ private List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows = new
+ ArrayList<>();
private List<Executor> activeExecutors = new ArrayList<>();
private List<Executor> allExecutors = new ArrayList<>();
private ExecutionController controller;
@@ -93,6 +95,8 @@ public class ExecutionControllerTest {
.of(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
+ this.queuedFlows = ImmutableList.of(new Pair<>(this.ref1, this.flow1));
+ when(this.loader.fetchQueuedFlows()).thenReturn(this.queuedFlows);
}
@After
@@ -111,6 +115,24 @@ public class ExecutionControllerTest {
}
@Test
+ public void testFetchAllActiveFlowIds() throws Exception {
+ initializeUnfinishedFlows();
+ assertThat(this.controller.getRunningFlowIds())
+ .isEqualTo(new ArrayList<>(this.unfinishedFlows.keySet()));
+ }
+
+ @Test
+ public void testFetchAllQueuedFlowIds() throws Exception {
+ assertThat(this.controller.getQueuedFlowIds())
+ .isEqualTo(ImmutableList.of(this.flow1.getExecutionId()));
+ }
+
+ @Test
+ public void testFetchQueuedFlowSize() throws Exception {
+ assertThat(this.controller.getQueuedFlowSize()).isEqualTo(this.queuedFlows.size());
+ }
+
+ @Test
public void testFetchActiveFlowByProject() throws Exception {
initializeUnfinishedFlows();
final List<Integer> executions = this.controller
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 76808b8..7c8863f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -23,10 +23,12 @@ import azkaban.AzkabanCommonModule;
import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.executor.ExecutionController;
import azkaban.executor.ExecutorManager;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.flowtrigger.FlowTriggerService;
import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
+import azkaban.jmx.JmxExecutionController;
import azkaban.jmx.JmxExecutorManager;
import azkaban.jmx.JmxJettyServer;
import azkaban.jmx.JmxTriggerManager;
@@ -597,8 +599,7 @@ public class AzkabanWebServer extends AzkabanServer {
private void startWebMetrics() throws Exception {
this.metricsManager
- .addGauge("WEB-NumQueuedFlows", ((ExecutorManager) this
- .executorManagerAdapter)::getQueuedFlowSize);
+ .addGauge("WEB-NumQueuedFlows", this.executorManagerAdapter::getQueuedFlowSize);
/*
* TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
* Originally we would like to do a subtraction between getRunningFlows and {@link ExecutorManager#getQueuedFlowSize()},
@@ -606,9 +607,8 @@ public class AzkabanWebServer extends AzkabanServer {
* However, both getRunningFlows and getQueuedFlowSize are not synchronized, such that we can not make
* a thread safe subtraction. We need to fix this in the future.
*/
- this.metricsManager
- .addGauge("WEB-NumRunningFlows",
- () -> ((ExecutorManager) this.executorManagerAdapter).getRunningFlows().size());
+ this.metricsManager.addGauge("WEB-NumRunningFlows",
+ () -> (this.executorManagerAdapter.getRunningFlows().size()));
logger.info("starting reporting Web Server Metrics");
this.metricsManager.startReporting("AZ-WEB", this.props);
@@ -691,11 +691,13 @@ public class AzkabanWebServer extends AzkabanServer {
registerMbean("jetty", new JmxJettyServer(this.server));
registerMbean("triggerManager", new JmxTriggerManager(this.triggerManager));
- // Todo jamiesjc: enable Jmx for executionController later
- if (this.executorManagerAdapter != null
- && this.executorManagerAdapter instanceof ExecutorManager) {
+
+ if (this.executorManagerAdapter instanceof ExecutorManager) {
registerMbean("executorManager",
new JmxExecutorManager((ExecutorManager) this.executorManagerAdapter));
+ } else if (this.executorManagerAdapter instanceof ExecutionController) {
+ registerMbean("executionController", new JmxExecutionController((ExecutionController) this
+ .executorManagerAdapter));
}
// Register Log4J loggers as JMX beans so the log level can be