azkaban-aplcache

Merge pull request #281 from hluu/master flowRunnerManager

7/15/2014 6:16:31 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0d98a70..8d230ba 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -34,10 +34,9 @@ import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTime;
 
 import azkaban.database.AbstractJdbcLoader;
@@ -168,7 +167,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       List<ExecutableFlow> properties =
           runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
               id);
-      return properties.get(0);
+      if (properties.isEmpty()) {
+        return null;
+      } else {
+        return properties.get(0);
+      }
     } catch (SQLException e) {
       throw new ExecutorManagerException("Error fetching flow id " + id, e);
     }
diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
new file mode 100644
index 0000000..a7c85a4
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.utils;
+
+/**
+ * Interface for listener to get notified before and after a task has been
+ * executed.
+ * 
+ * @author hluu
+ * 
+ */
+public interface ThreadPoolExecutingListener {
+  public void beforeExecute(Runnable r);
+
+  public void afterExecute(Runnable r);
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
new file mode 100644
index 0000000..821d989
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.utils;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
+ * tasks as well as other interesting statistics.
+ * 
+ * The content of this class is copied from article "Java theory and practice:
+ * Instrumenting applications with JMX"
+ * 
+ * @author hluu
+ * 
+ */
+public class TrackingThreadPool extends ThreadPoolExecutor {
+
+  private static Logger logger = Logger.getLogger(TrackingThreadPool.class);
+
+  private final Map<Runnable, Boolean> inProgress =
+      new ConcurrentHashMap<Runnable, Boolean>();
+  private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
+
+  private ThreadPoolExecutingListener executingListener =
+      new NoOpThreadPoolExecutingListener();
+
+  private long totalTime;
+  private int totalTasks;
+
+  public TrackingThreadPool(int corePoolSize, int maximumPoolSize,
+      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+      ThreadPoolExecutingListener listener) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    if (listener != null) {
+      executingListener = listener;
+    }
+  }
+
+  @Override
+  protected void beforeExecute(Thread t, Runnable r) {
+    try {
+      executingListener.beforeExecute(r);
+    } catch (Throwable e) {
+      // to ensure the listener doesn't cause any issues
+      logger.warn("Listener threw exception", e);
+    }
+    super.beforeExecute(t, r);
+    inProgress.put(r, Boolean.TRUE);
+    startTime.set(new Long(System.currentTimeMillis()));
+  }
+
+  @Override
+  protected void afterExecute(Runnable r, Throwable t) {
+    long time = System.currentTimeMillis() - startTime.get().longValue();
+    synchronized (this) {
+      totalTime += time;
+      ++totalTasks;
+    }
+    inProgress.remove(r);
+    super.afterExecute(r, t);
+    try {
+      executingListener.afterExecute(r);
+    } catch (Throwable e) {
+      // to ensure the listener doesn't cause any issues
+      logger.warn("Listener threw exception", e);
+    }
+  }
+
+  public Set<Runnable> getInProgressTasks() {
+    return Collections.unmodifiableSet(inProgress.keySet());
+  }
+
+  public synchronized int getTotalTasks() {
+    return totalTasks;
+  }
+
+  public synchronized double getAverageTaskTime() {
+    return (totalTasks == 0) ? 0 : totalTime / totalTasks;
+  }
+
+  private static class NoOpThreadPoolExecutingListener implements
+      ThreadPoolExecutingListener {
+
+    @Override
+    public void beforeExecute(Runnable r) {
+    }
+
+    @Override
+    public void afterExecute(Runnable r) {
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index 123f9e4..f73a30a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -60,7 +60,7 @@ import azkaban.utils.SwapQueue;
 
 /**
  * Class that handles the running of a ExecutableFlow DAG
- *
+ * 
  */
 public class FlowRunner extends EventHandler implements Runnable {
   private static final Layout DEFAULT_LAYOUT = new PatternLayout(
@@ -122,7 +122,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Constructor. This will create its own ExecutorService for thread pools
-   *
+   * 
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -138,7 +138,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Constructor. If executorService is null, then it will create it's own for
    * thread pools.
-   *
+   * 
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -356,7 +356,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Main method that executes the jobs.
-   *
+   * 
    * @throws Exception
    */
   private void runFlow() throws Exception {
@@ -725,7 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Determines what the state of the next node should be. Returns null if the
    * node should not be run.
-   *
+   * 
    * @param node
    * @return
    */
@@ -1094,4 +1094,8 @@ public class FlowRunner extends EventHandler implements Runnable {
   public int getNumRunningJobs() {
     return activeJobRunners.size();
   }
+
+  public int getExecutionId() {
+    return execId;
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index a044f70..54d82e8 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -27,16 +27,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
-import azkaban.project.ProjectLoader;
 import azkaban.event.Event;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
@@ -48,18 +48,47 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
+import azkaban.project.ProjectLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.ThreadPoolExecutingListener;
+import azkaban.utils.TrackingThreadPool;
 
 /**
  * Execution manager for the server side execution.
- *
+ * 
+ * When a flow is submitted to FlowRunnerManager, it is the
+ * {@link Status.PREPARING} status. When a flow is about to be executed by
+ * FlowRunner, its status is updated to {@link Status.RUNNING}
+ * 
+ * Two main data structures are used in this class to maintain flows.
+ * 
+ * runningFlows: this is used as a bookkeeping for submitted flows in
+ * FlowRunnerManager. It has nothing to do with the executor service that is
+ * used to execute the flows. This bookkeeping is used at the time of canceling
+ * or killing a flow. The flows in this data structure is removed in the
+ * handleEvent method.
+ * 
+ * submittedFlows: this is used to keep track the execution of the flows, so it
+ * has the mapping between a Future<?> and an execution id. This would allow us
+ * to find out the execution ids of the flows that are in the Status.PREPARING
+ * status. The entries in this map is removed once the flow execution is
+ * completed.
+ * 
+ * 
  */
-public class FlowRunnerManager implements EventListener {
+public class FlowRunnerManager implements EventListener,
+    ThreadPoolExecutingListener {
+  private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE =
+      "executor.use.bounded.threadpool.queue";
+  private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE =
+      "executor.threadpool.workqueue.size";
+  private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
+  private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
   private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
   private File executionDirectory;
   private File projectDirectory;
@@ -68,20 +97,27 @@ public class FlowRunnerManager implements EventListener {
   private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
 
   private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
+  private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
+
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
       new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
+  // this map is used to store the flows that have been submitted to
+  // the executor service. Once a flow has been submitted, it is either
+  // in the queue waiting to be executed or in executing state.
+  private Map<Future<?>, Integer> submittedFlows =
+      new ConcurrentHashMap<Future<?>, Integer>();
   private Map<Integer, FlowRunner> runningFlows =
       new ConcurrentHashMap<Integer, FlowRunner>();
   private Map<Integer, ExecutableFlow> recentlyFinishedFlows =
       new ConcurrentHashMap<Integer, ExecutableFlow>();
-  private LinkedBlockingQueue<FlowRunner> flowQueue =
-      new LinkedBlockingQueue<FlowRunner>();
+
   private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+  private int threadPoolQueueSize = -1;
+
+  private TrackingThreadPool executorService;
 
-  private ExecutorService executorService;
-  private SubmitterThread submitterThread;
   private CleanerThread cleanerThread;
-  private int numJobThreadPerFlow = 10;
+  private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
 
   private ExecutorLoader executorLoader;
   private ProjectLoader projectLoader;
@@ -92,7 +128,6 @@ public class FlowRunnerManager implements EventListener {
 
   private final Props azkabanProps;
 
-  private long lastSubmitterThreadCheckTime = -1;
   private long lastCleanerThreadCheckTime = -1;
   private long executionDirRetention = 1 * 24 * 60 * 60 * 1000;
 
@@ -132,10 +167,10 @@ public class FlowRunnerManager implements EventListener {
 
     // azkaban.temp.dir
     numThreads =
-        props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
+        props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
     numJobThreadPerFlow =
-        props.getInt("flow.num.job.threads", numJobThreadPerFlow);
-    executorService = Executors.newFixedThreadPool(numThreads);
+        props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
+    executorService = createExecutorService(numThreads);
 
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
@@ -146,9 +181,6 @@ public class FlowRunnerManager implements EventListener {
     this.validateProxyUser =
         azkabanProps.getBoolean("proxy.user.lock.down", false);
 
-    submitterThread = new SubmitterThread(flowQueue);
-    submitterThread.start();
-
     cleanerThread = new CleanerThread();
     cleanerThread.start();
 
@@ -165,6 +197,32 @@ public class FlowRunnerManager implements EventListener {
             parentClassLoader);
   }
 
+  private TrackingThreadPool createExecutorService(int nThreads) {
+    boolean useNewThreadPool =
+        azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
+    logger.info("useNewThreadPool: " + useNewThreadPool);
+
+    if (useNewThreadPool) {
+      threadPoolQueueSize =
+          azkabanProps.getInt(EXECUTOR_THREADPOOL_WORKQUEUE_SIZE, nThreads);
+      logger.info("workQueueSize: " + threadPoolQueueSize);
+
+      // using a bounded queue for the work queue. The default rejection policy
+      // {@ThreadPoolExecutor.AbortPolicy} is used
+      TrackingThreadPool executor =
+          new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
+              new LinkedBlockingQueue<Runnable>(threadPoolQueueSize), this);
+
+      return executor;
+    } else {
+      // the old way of using unbounded task queue.
+      // if the running tasks are taking a long time or stuck, this queue
+      // will be very very long.
+      return new TrackingThreadPool(nThreads, nThreads, 0L,
+          TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), this);
+    }
+  }
+
   private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
     Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
         new HashMap<Pair<Integer, Integer>, ProjectVersion>();
@@ -202,34 +260,6 @@ public class FlowRunnerManager implements EventListener {
     this.globalProps = globalProps;
   }
 
-  private class SubmitterThread extends Thread {
-    private BlockingQueue<FlowRunner> queue;
-    private boolean shutdown = false;
-
-    public SubmitterThread(BlockingQueue<FlowRunner> queue) {
-      this.setName("FlowRunnerManager-Submitter-Thread");
-      this.queue = queue;
-    }
-
-    @SuppressWarnings("unused")
-    public void shutdown() {
-      shutdown = true;
-      this.interrupt();
-    }
-
-    public void run() {
-      while (!shutdown) {
-        try {
-          lastSubmitterThreadCheckTime = System.currentTimeMillis();
-          FlowRunner flowRunner = queue.take();
-          executorService.submit(flowRunner);
-        } catch (InterruptedException e) {
-          logger.info("Interrupted. Probably to shut down.");
-        }
-      }
-    }
-  }
-
   private class CleanerThread extends Thread {
     // Every hour, clean execution dir.
     private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60 * 60 * 1000;
@@ -430,18 +460,18 @@ public class FlowRunnerManager implements EventListener {
     }
 
     int numJobThreads = numJobThreadPerFlow;
-    if (options.getFlowParameters().containsKey("flow.num.job.threads")) {
+    if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
       try {
         int numJobs =
             Integer.valueOf(options.getFlowParameters().get(
-                "flow.num.job.threads"));
+                FLOW_NUM_JOB_THREADS));
         if (numJobs > 0 && numJobs <= numJobThreads) {
           numJobThreads = numJobs;
         }
       } catch (Exception e) {
         throw new ExecutorManagerException(
             "Failed to set the number of job threads "
-                + options.getFlowParameters().get("flow.num.job.threads")
+                + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
                 + " for flow " + execId, e);
       }
     }
@@ -461,7 +491,21 @@ public class FlowRunnerManager implements EventListener {
 
     // Finally, queue the sucker.
     runningFlows.put(execId, runner);
-    flowQueue.add(runner);
+
+    try {
+      // The executorService already has a queue.
+      // The submit method below actually returns an instance of FutureTask,
+      // which implements interface RunnableFuture, which extends both
+      // Runnable and Future interfaces
+      Future<?> future = executorService.submit(runner);
+      // keep track of this future
+      submittedFlows.put(future, runner.getExecutionId());
+    } catch (RejectedExecutionException re) {
+      throw new ExecutorManagerException(
+          "Azkaban server can't execute any more flows. "
+              + "The number of running flows has reached the system configured limit."
+              + "Please notify Azkaban administrators");
+    }
   }
 
   private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -713,22 +757,10 @@ public class FlowRunnerManager implements EventListener {
     return lastCleanerThreadCheckTime;
   }
 
-  public long getLastSubmitterThreadCheckTime() {
-    return lastSubmitterThreadCheckTime;
-  }
-
-  public boolean isSubmitterThreadActive() {
-    return this.submitterThread.isAlive();
-  }
-
   public boolean isCleanerThreadActive() {
     return this.cleanerThread.isAlive();
   }
 
-  public State getSubmitterThreadState() {
-    return this.submitterThread.getState();
-  }
-
   public State getCleanerThreadState() {
     return this.cleanerThread.getState();
   }
@@ -737,26 +769,76 @@ public class FlowRunnerManager implements EventListener {
     return executorService.isShutdown();
   }
 
-  public int getNumExecutingFlows() {
-    return runningFlows.size();
+  public int getNumQueuedFlows() {
+    return executorService.getQueue().size();
+  }
+
+  public int getNumRunningFlows() {
+    return executorService.getActiveCount();
   }
 
   public String getRunningFlowIds() {
-    ArrayList<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
-    Collections.sort(ids);
-    return ids.toString();
+    // The in progress tasks are actually of type FutureTask
+    Set<Runnable> inProgressTasks = executorService.getInProgressTasks();
+
+    List<Integer> runningFlowIds =
+        new ArrayList<Integer>(inProgressTasks.size());
+
+    for (Runnable task : inProgressTasks) {
+      // add casting here to ensure it matches the expected type in
+      // submittedFlows
+      Integer execId = submittedFlows.get((Future<?>) task);
+      if (execId != null) {
+        runningFlowIds.add(execId);
+      } else {
+        logger.warn("getRunningFlowIds: got null execId for task: " + task);
+      }
+    }
+
+    Collections.sort(runningFlowIds);
+    return runningFlowIds.toString();
   }
 
-  public int getNumExecutingJobs() {
-    int jobCount = 0;
-    for (FlowRunner runner : runningFlows.values()) {
-      jobCount += runner.getNumRunningJobs();
+  public String getQueuedFlowIds() {
+    List<Integer> flowIdList =
+        new ArrayList<Integer>(executorService.getQueue().size());
+
+    for (Runnable task : executorService.getQueue()) {
+      Integer execId = submittedFlows.get(task);
+      if (execId != null) {
+        flowIdList.add(execId);
+      } else {
+        logger
+            .warn("getQueuedFlowIds: got null execId for queuedTask: " + task);
+      }
     }
+    Collections.sort(flowIdList);
+    return flowIdList.toString();
+  }
 
-    return jobCount;
+  public int getMaxNumRunningFlows() {
+    return numThreads;
+  }
+
+  public int getTheadPoolQueueSize() {
+    return threadPoolQueueSize;
   }
 
   public void reloadJobTypePlugins() throws JobTypeManagerException {
     jobtypeManager.loadPlugins();
   }
-}
+
+  public int getTotalNumExecutedFlows() {
+    return executorService.getTotalTasks();
+  }
+
+  @Override
+  public void beforeExecute(Runnable r) {
+  }
+
+  @Override
+  public void afterExecute(Runnable r) {
+    submittedFlows.remove(r);
+  }
+
+}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
index 2f4ccb2..3437065 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
@@ -31,48 +31,52 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
   }
 
   @Override
-  public long getLastSubmitterThreadCheckTime() {
-    return manager.getLastSubmitterThreadCheckTime();
+  public boolean isCleanerThreadActive() {
+    return manager.isCleanerThreadActive();
   }
 
   @Override
-  public boolean isSubmitterThreadActive() {
-    return manager.isSubmitterThreadActive();
+  public String getCleanerThreadState() {
+    return manager.getCleanerThreadState().toString();
   }
 
   @Override
-  public boolean isCleanerThreadActive() {
-    return manager.isCleanerThreadActive();
+  public boolean isExecutorThreadPoolShutdown() {
+    return manager.isExecutorThreadPoolShutdown();
   }
 
   @Override
-  public String getSubmitterThreadState() {
-    return manager.getSubmitterThreadState().toString();
+  public int getNumRunningFlows() {
+    return manager.getNumRunningFlows();
   }
 
   @Override
-  public String getCleanerThreadState() {
-    return manager.getCleanerThreadState().toString();
+  public int getNumQueuedFlows() {
+    return manager.getNumQueuedFlows();
   }
 
   @Override
-  public boolean isExecutorThreadPoolShutdown() {
-    return manager.isExecutorThreadPoolShutdown();
+  public String getRunningFlows() {
+    return manager.getRunningFlowIds();
   }
 
   @Override
-  public int getNumExecutingFlows() {
-    return manager.getNumExecutingFlows();
+  public String getQueuedFlows() {
+    return manager.getQueuedFlowIds();
   }
 
   @Override
-  public int countTotalNumRunningJobs() {
-    return manager.getNumExecutingJobs();
+  public int getMaxNumRunningFlows() {
+    return manager.getMaxNumRunningFlows();
   }
 
   @Override
-  public String getRunningFlows() {
-    return manager.getRunningFlowIds();
+  public int getMaxQueuedFlows() {
+    return manager.getTheadPoolQueueSize();
   }
 
-}
+  @Override
+  public int getTotalNumExecutedFlows() {
+    return manager.getTotalNumExecutedFlows();
+  }
+}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
index 1709aee..41f8f04 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
@@ -22,30 +22,34 @@ public interface JmxFlowRunnerManagerMBean {
   @DisplayName("OPERATION: getLastCleanerThreadCheckTime")
   public long getLastCleanerThreadCheckTime();
 
-  @DisplayName("OPERATION: getLastSubmitterThreadCheckTime")
-  public long getLastSubmitterThreadCheckTime();
-
-  @DisplayName("OPERATION: isSubmitterThreadActive")
-  public boolean isSubmitterThreadActive();
-
   @DisplayName("OPERATION: isCleanerThreadActive")
   public boolean isCleanerThreadActive();
 
-  @DisplayName("OPERATION: getSubmitterThreadState")
-  public String getSubmitterThreadState();
-
   @DisplayName("OPERATION: getCleanerThreadState")
   public String getCleanerThreadState();
 
   @DisplayName("OPERATION: isExecutorThreadPoolShutdown")
   public boolean isExecutorThreadPoolShutdown();
 
-  @DisplayName("OPERATION: getNumExecutingFlows")
-  public int getNumExecutingFlows();
+  @DisplayName("OPERATION: getNumRunningFlows")
+  public int getNumRunningFlows();
+
+  @DisplayName("OPERATION: getNumQueuedFlows")
+  public int getNumQueuedFlows();
 
   @DisplayName("OPERATION: getRunningFlows")
   public String getRunningFlows();
 
-  @DisplayName("OPERATION: getTotalNumRunningJobs")
-  public int countTotalNumRunningJobs();
-}
+  @DisplayName("OPERATION: getQueuedFlows")
+  public String getQueuedFlows();
+
+  @DisplayName("OPERATION: getMaxNumRunningFlows")
+  public int getMaxNumRunningFlows();
+
+  @DisplayName("OPERATION: getMaxQueuedFlows")
+  public int getMaxQueuedFlows();
+
+  @DisplayName("OPERATION: getTotalNumExecutedFlows")
+  public int getTotalNumExecutedFlows();
+
+}
\ No newline at end of file