azkaban-aplcache

AZNewDispatchingLogic - Enable JMX for ExecutionController

1/16/2019 12:00:37 AM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
index 9482228..0de0713 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionController.java
@@ -249,13 +249,13 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   }
 
   /**
-   * Get all active (running, non-dispatched) flows from database. {@inheritDoc}
+   * Get all running (unfinished) flows from database. {@inheritDoc}
    */
   @Override
   public List<ExecutableFlow> getRunningFlows() {
     final ArrayList<ExecutableFlow> flows = new ArrayList<>();
     try {
-      getActiveFlowHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
+      getFlowsHelper(flows, this.executorLoader.fetchUnfinishedFlows().values());
     } catch (final ExecutorManagerException e) {
       this.logger.error("Failed to get running flows.", e);
     }
@@ -263,14 +263,58 @@ public class ExecutionController extends EventHandler implements ExecutorManager
   }
 
   /**
-   * Helper method to get all running flows from a Pair<ExecutionReference,
-   * ExecutableFlow collection
+   * Helper method to get all flows from collection.
    */
-  private void getActiveFlowHelper(final ArrayList<ExecutableFlow> flows,
+  private void getFlowsHelper(final ArrayList<ExecutableFlow> flows,
       final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
-    for (final Pair<ExecutionReference, ExecutableFlow> ref : collection) {
-      flows.add(ref.getSecond());
+    collection.stream().forEach(ref -> flows.add(ref.getSecond()));
+  }
+
+  /**
+   * Get execution ids of all running (unfinished) flows from database.
+   */
+  public List<Integer> getRunningFlowIds() {
+    final List<Integer> allIds = new ArrayList<>();
+    try {
+      getExecutionIdsHelper(allIds, this.executorLoader.fetchUnfinishedFlows().values());
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get running flow ids.", e);
+    }
+    return allIds;
+  }
+
+  /**
+   * Get execution ids of all non-dispatched flows from database.
+   */
+  public List<Integer> getQueuedFlowIds() {
+    final List<Integer> allIds = new ArrayList<>();
+    try {
+      getExecutionIdsHelper(allIds, this.executorLoader.fetchQueuedFlows());
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get queued flow ids.", e);
+    }
+    return allIds;
+  }
+
+  /* Helper method to get all execution ids from collection in sorted order. */
+  private void getExecutionIdsHelper(final List<Integer> allIds,
+      final Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
+    collection.stream().forEach(ref -> allIds.add(ref.getSecond().getExecutionId()));
+    Collections.sort(allIds);
+  }
+
+  /**
+   * Get the number of non-dispatched flows from database. {@inheritDoc}
+   */
+  @Override
+  public long getQueuedFlowSize() {
+    long size = 0L;
+    try {
+      size = this.executorLoader.fetchQueuedFlows().size();
+    } catch (final ExecutorManagerException e) {
+      this.logger.error("Failed to get queued flow size.", e);
     }
+    return size;
   }
 
   @Override
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 41146ad..0125778 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -559,11 +559,7 @@ public class ExecutorManager extends EventHandler implements
   }
 
   /**
-   * Get execution Ids of all active (running, non-dispatched) flows
-   *
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
+   * Get execution Ids of all running (unfinished) flows
    */
   public String getRunningFlowIds() {
     final List<Integer> allIds = new ArrayList<>();
@@ -575,10 +571,6 @@ public class ExecutorManager extends EventHandler implements
 
   /**
    * Get execution Ids of all non-dispatched flows
-   *
-   * {@inheritDoc}
-   *
-   * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows()
    */
   public String getQueuedFlowIds() {
     final List<Integer> allIds = new ArrayList<>();
@@ -587,6 +579,10 @@ public class ExecutorManager extends EventHandler implements
     return allIds.toString();
   }
 
+  /**
+   * Get the number of non-dispatched flows. {@inheritDoc}
+   */
+  @Override
   public long getQueuedFlowSize() {
     return this.queuedFlows.size();
   }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
index ae263ce..907ab1c 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManagerAdapter.java
@@ -36,7 +36,9 @@ public interface ExecutorManagerAdapter {
 
   public List<Integer> getRunningFlows(int projectId, String flowId);
 
-  public List<ExecutableFlow> getRunningFlows() throws IOException;
+  public List<ExecutableFlow> getRunningFlows();
+
+  public long getQueuedFlowSize();
 
   /**
    * <pre>
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
index c1c376f..9b8ab3c 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutionControllerTest.java
@@ -43,6 +43,8 @@ public class ExecutionControllerTest {
   private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> activeFlows = new HashMap<>();
   private Map<Integer, Pair<ExecutionReference, ExecutableFlow>> unfinishedFlows = new
       HashMap<>();
+  private List<Pair<ExecutionReference, ExecutableFlow>> queuedFlows = new
+      ArrayList<>();
   private List<Executor> activeExecutors = new ArrayList<>();
   private List<Executor> allExecutors = new ArrayList<>();
   private ExecutionController controller;
@@ -93,6 +95,8 @@ public class ExecutionControllerTest {
         .of(this.flow2.getExecutionId(), new Pair<>(this.ref2, this.flow2),
             this.flow3.getExecutionId(), new Pair<>(this.ref3, this.flow3));
     when(this.loader.fetchActiveFlows()).thenReturn(this.activeFlows);
+    this.queuedFlows = ImmutableList.of(new Pair<>(this.ref1, this.flow1));
+    when(this.loader.fetchQueuedFlows()).thenReturn(this.queuedFlows);
   }
 
   @After
@@ -111,6 +115,24 @@ public class ExecutionControllerTest {
   }
 
   @Test
+  public void testFetchAllActiveFlowIds() throws Exception {
+    initializeUnfinishedFlows();
+    assertThat(this.controller.getRunningFlowIds())
+        .isEqualTo(new ArrayList<>(this.unfinishedFlows.keySet()));
+  }
+
+  @Test
+  public void testFetchAllQueuedFlowIds() throws Exception {
+    assertThat(this.controller.getQueuedFlowIds())
+        .isEqualTo(ImmutableList.of(this.flow1.getExecutionId()));
+  }
+
+  @Test
+  public void testFetchQueuedFlowSize() throws Exception {
+    assertThat(this.controller.getQueuedFlowSize()).isEqualTo(this.queuedFlows.size());
+  }
+
+  @Test
   public void testFetchActiveFlowByProject() throws Exception {
     initializeUnfinishedFlows();
     final List<Integer> executions = this.controller
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
index 76808b8..7c8863f 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/AzkabanWebServer.java
@@ -23,10 +23,12 @@ import azkaban.AzkabanCommonModule;
 import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.database.AzkabanDatabaseSetup;
+import azkaban.executor.ExecutionController;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.flowtrigger.FlowTriggerService;
 import azkaban.flowtrigger.quartz.FlowTriggerScheduler;
+import azkaban.jmx.JmxExecutionController;
 import azkaban.jmx.JmxExecutorManager;
 import azkaban.jmx.JmxJettyServer;
 import azkaban.jmx.JmxTriggerManager;
@@ -597,8 +599,7 @@ public class AzkabanWebServer extends AzkabanServer {
 
   private void startWebMetrics() throws Exception {
     this.metricsManager
-        .addGauge("WEB-NumQueuedFlows", ((ExecutorManager) this
-            .executorManagerAdapter)::getQueuedFlowSize);
+        .addGauge("WEB-NumQueuedFlows", this.executorManagerAdapter::getQueuedFlowSize);
     /*
      * TODO: Currently {@link ExecutorManager#getRunningFlows()} includes both running and non-dispatched flows.
      * Originally we would like to do a subtraction between getRunningFlows and {@link ExecutorManager#getQueuedFlowSize()},
@@ -606,9 +607,8 @@ public class AzkabanWebServer extends AzkabanServer {
      * However, both getRunningFlows and getQueuedFlowSize are not synchronized, such that we can not make
      * a thread safe subtraction. We need to fix this in the future.
      */
-    this.metricsManager
-        .addGauge("WEB-NumRunningFlows",
-            () -> ((ExecutorManager) this.executorManagerAdapter).getRunningFlows().size());
+    this.metricsManager.addGauge("WEB-NumRunningFlows",
+        () -> (this.executorManagerAdapter.getRunningFlows().size()));
 
     logger.info("starting reporting Web Server Metrics");
     this.metricsManager.startReporting("AZ-WEB", this.props);
@@ -691,11 +691,13 @@ public class AzkabanWebServer extends AzkabanServer {
 
     registerMbean("jetty", new JmxJettyServer(this.server));
     registerMbean("triggerManager", new JmxTriggerManager(this.triggerManager));
-    // Todo jamiesjc: enable Jmx for executionController later
-    if (this.executorManagerAdapter != null
-        && this.executorManagerAdapter instanceof ExecutorManager) {
+
+    if (this.executorManagerAdapter instanceof ExecutorManager) {
       registerMbean("executorManager",
           new JmxExecutorManager((ExecutorManager) this.executorManagerAdapter));
+    } else if (this.executorManagerAdapter instanceof ExecutionController) {
+      registerMbean("executionController", new JmxExecutionController((ExecutionController) this
+          .executorManagerAdapter));
     }
 
     // Register Log4J loggers as JMX beans so the log level can be