azkaban-developers

Issue #280: flowRunnerManager JMX bean : the NumExecutingFlows

7/11/2014 6:07:01 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0d98a70..8d230ba 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -34,10 +34,9 @@ import java.util.Map;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
-
 import org.joda.time.DateTime;
 
 import azkaban.database.AbstractJdbcLoader;
@@ -168,7 +167,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
       List<ExecutableFlow> properties =
           runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
               id);
-      return properties.get(0);
+      if (properties.isEmpty()) {
+        return null;
+      } else {
+        return properties.get(0);
+      }
     } catch (SQLException e) {
       throw new ExecutorManagerException("Error fetching flow id " + id, e);
     }
diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
new file mode 100644
index 0000000..c731146
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -0,0 +1,14 @@
+package azkaban.utils;
+
+/**
+ * Interface for listener to get notified before and after a task has been
+ * executed.
+ * 
+ * @author hluu
+ * 
+ */
+public interface ThreadPoolExecutingListener {
+  public void beforeExecute(Runnable r);
+
+  public void afterExecute(Runnable r);
+}
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
new file mode 100644
index 0000000..2b19eed
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -0,0 +1,91 @@
+package azkaban.utils;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
+ * tasks as well as other interesting statistics.
+ * 
+ * The content of this class is copied from article "Java theory and practice:
+ * Instrumenting applications with JMX"
+ * 
+ * @author hluu
+ * 
+ */
+public class TrackingThreadPool extends ThreadPoolExecutor {
+
+  private final Map<Runnable, Boolean> inProgress =
+      new ConcurrentHashMap<Runnable, Boolean>();
+  private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
+
+  private ThreadPoolExecutingListener executingListener =
+      new NoOpThreadPoolExecutingListener();
+
+  private long totalTime;
+  private int totalTasks;
+
+  public TrackingThreadPool(int corePoolSize, int maximumPoolSize,
+      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+      ThreadPoolExecutingListener listener) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    if (listener != null) {
+      executingListener = listener;
+    }
+  }
+
+  protected void beforeExecute(Thread t, Runnable r) {
+    try {
+      executingListener.beforeExecute(r);
+    } finally {
+      // to ensure the listener doesn't cause any issues
+    }
+    super.beforeExecute(t, r);
+    inProgress.put(r, Boolean.TRUE);
+    startTime.set(new Long(System.currentTimeMillis()));
+  }
+
+  protected void afterExecute(Runnable r, Throwable t) {
+    long time = System.currentTimeMillis() - startTime.get().longValue();
+    synchronized (this) {
+      totalTime += time;
+      ++totalTasks;
+    }
+    inProgress.remove(r);
+    super.afterExecute(r, t);
+    try {
+      executingListener.afterExecute(r);
+    } finally {
+      // to ensure the listener doesn't cause any issues
+    }
+  }
+
+  public Set<Runnable> getInProgressTasks() {
+    return Collections.unmodifiableSet(inProgress.keySet());
+  }
+
+  public synchronized int getTotalTasks() {
+    return totalTasks;
+  }
+
+  public synchronized double getAverageTaskTime() {
+    return (totalTasks == 0) ? 0 : totalTime / totalTasks;
+  }
+
+  private static class NoOpThreadPoolExecutingListener implements
+      ThreadPoolExecutingListener {
+
+    @Override
+    public void beforeExecute(Runnable r) {
+    }
+
+    @Override
+    public void afterExecute(Runnable r) {
+    }
+  }
+}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index 123f9e4..f73a30a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -60,7 +60,7 @@ import azkaban.utils.SwapQueue;
 
 /**
  * Class that handles the running of a ExecutableFlow DAG
- *
+ * 
  */
 public class FlowRunner extends EventHandler implements Runnable {
   private static final Layout DEFAULT_LAYOUT = new PatternLayout(
@@ -122,7 +122,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Constructor. This will create its own ExecutorService for thread pools
-   *
+   * 
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -138,7 +138,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Constructor. If executorService is null, then it will create it's own for
    * thread pools.
-   *
+   * 
    * @param flow
    * @param executorLoader
    * @param projectLoader
@@ -356,7 +356,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   /**
    * Main method that executes the jobs.
-   *
+   * 
    * @throws Exception
    */
   private void runFlow() throws Exception {
@@ -725,7 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
   /**
    * Determines what the state of the next node should be. Returns null if the
    * node should not be run.
-   *
+   * 
    * @param node
    * @return
    */
@@ -1094,4 +1094,8 @@ public class FlowRunner extends EventHandler implements Runnable {
   public int getNumRunningJobs() {
     return activeJobRunners.size();
   }
+
+  public int getExecutionId() {
+    return execId;
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index a044f70..0f4c840 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -27,16 +27,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
-import azkaban.project.ProjectLoader;
 import azkaban.event.Event;
 import azkaban.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
@@ -48,18 +48,28 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
+import azkaban.project.ProjectLoader;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
+import azkaban.utils.ThreadPoolExecutingListener;
+import azkaban.utils.TrackingThreadPool;
 
 /**
  * Execution manager for the server side execution.
- *
+ * 
  */
-public class FlowRunnerManager implements EventListener {
+public class FlowRunnerManager implements EventListener,
+    ThreadPoolExecutingListener {
+  private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE =
+      "executor.use.bounded.threadpool.queue";
+  private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE =
+      "executor.threadpool.workqueue.size";
+  private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
+  private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
   private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
   private File executionDirectory;
   private File projectDirectory;
@@ -70,16 +80,18 @@ public class FlowRunnerManager implements EventListener {
   private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
       new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
+  private Map<Future<?>, Integer> submittedFlows =
+      new ConcurrentHashMap<Future<?>, Integer>();
   private Map<Integer, FlowRunner> runningFlows =
       new ConcurrentHashMap<Integer, FlowRunner>();
   private Map<Integer, ExecutableFlow> recentlyFinishedFlows =
       new ConcurrentHashMap<Integer, ExecutableFlow>();
-  private LinkedBlockingQueue<FlowRunner> flowQueue =
-      new LinkedBlockingQueue<FlowRunner>();
+
   private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+  private int threadPoolQueueSize = -1;
+
+  private TrackingThreadPool executorService;
 
-  private ExecutorService executorService;
-  private SubmitterThread submitterThread;
   private CleanerThread cleanerThread;
   private int numJobThreadPerFlow = 10;
 
@@ -92,7 +104,6 @@ public class FlowRunnerManager implements EventListener {
 
   private final Props azkabanProps;
 
-  private long lastSubmitterThreadCheckTime = -1;
   private long lastCleanerThreadCheckTime = -1;
   private long executionDirRetention = 1 * 24 * 60 * 60 * 1000;
 
@@ -132,10 +143,10 @@ public class FlowRunnerManager implements EventListener {
 
     // azkaban.temp.dir
     numThreads =
-        props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
+        props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
     numJobThreadPerFlow =
-        props.getInt("flow.num.job.threads", numJobThreadPerFlow);
-    executorService = Executors.newFixedThreadPool(numThreads);
+        props.getInt(FLOW_NUM_JOB_THREADS, numJobThreadPerFlow);
+    executorService = createExecutorService(numThreads);
 
     this.executorLoader = executorLoader;
     this.projectLoader = projectLoader;
@@ -146,9 +157,6 @@ public class FlowRunnerManager implements EventListener {
     this.validateProxyUser =
         azkabanProps.getBoolean("proxy.user.lock.down", false);
 
-    submitterThread = new SubmitterThread(flowQueue);
-    submitterThread.start();
-
     cleanerThread = new CleanerThread();
     cleanerThread.start();
 
@@ -165,6 +173,32 @@ public class FlowRunnerManager implements EventListener {
             parentClassLoader);
   }
 
+  private TrackingThreadPool createExecutorService(int nThreads) {
+    boolean useNewThreadPool =
+        azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
+    logger.info("useNewThreadPool: " + useNewThreadPool);
+
+    if (useNewThreadPool) {
+      threadPoolQueueSize =
+          azkabanProps.getInt(EXECUTOR_THREADPOOL_WORKQUEUE_SIZE, nThreads);
+      logger.info("workQueueSize: " + threadPoolQueueSize);
+
+      // using a bounded queue for the work queue. The default rejection policy
+      // {@ThreadPoolExecutor.AbortPolicy} is used
+      TrackingThreadPool executor =
+          new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
+              new LinkedBlockingQueue<Runnable>(threadPoolQueueSize), this);
+
+      return executor;
+    } else {
+      // the old way of using unbounded task queue.
+      // if the running tasks are taking a long time or stuck, this queue
+      // will be very very long.
+      return new TrackingThreadPool(nThreads, nThreads, 0L,
+          TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), this);
+    }
+  }
+
   private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
     Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
         new HashMap<Pair<Integer, Integer>, ProjectVersion>();
@@ -202,34 +236,6 @@ public class FlowRunnerManager implements EventListener {
     this.globalProps = globalProps;
   }
 
-  private class SubmitterThread extends Thread {
-    private BlockingQueue<FlowRunner> queue;
-    private boolean shutdown = false;
-
-    public SubmitterThread(BlockingQueue<FlowRunner> queue) {
-      this.setName("FlowRunnerManager-Submitter-Thread");
-      this.queue = queue;
-    }
-
-    @SuppressWarnings("unused")
-    public void shutdown() {
-      shutdown = true;
-      this.interrupt();
-    }
-
-    public void run() {
-      while (!shutdown) {
-        try {
-          lastSubmitterThreadCheckTime = System.currentTimeMillis();
-          FlowRunner flowRunner = queue.take();
-          executorService.submit(flowRunner);
-        } catch (InterruptedException e) {
-          logger.info("Interrupted. Probably to shut down.");
-        }
-      }
-    }
-  }
-
   private class CleanerThread extends Thread {
     // Every hour, clean execution dir.
     private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60 * 60 * 1000;
@@ -430,18 +436,18 @@ public class FlowRunnerManager implements EventListener {
     }
 
     int numJobThreads = numJobThreadPerFlow;
-    if (options.getFlowParameters().containsKey("flow.num.job.threads")) {
+    if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
       try {
         int numJobs =
             Integer.valueOf(options.getFlowParameters().get(
-                "flow.num.job.threads"));
+                FLOW_NUM_JOB_THREADS));
         if (numJobs > 0 && numJobs <= numJobThreads) {
           numJobThreads = numJobs;
         }
       } catch (Exception e) {
         throw new ExecutorManagerException(
             "Failed to set the number of job threads "
-                + options.getFlowParameters().get("flow.num.job.threads")
+                + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
                 + " for flow " + execId, e);
       }
     }
@@ -461,7 +467,18 @@ public class FlowRunnerManager implements EventListener {
 
     // Finally, queue the sucker.
     runningFlows.put(execId, runner);
-    flowQueue.add(runner);
+
+    try {
+      // The executorService already has a queue
+      Future<?> future = executorService.submit(runner);
+      // keep track of this future
+      submittedFlows.put(future, runner.getExecutionId());
+    } catch (RejectedExecutionException re) {
+      throw new ExecutorManagerException(
+          "Azkaban server can't execute any more flows. "
+              + "The number of running flows has reached the system configured limit."
+              + "Please notify Azkaban administrators");
+    }
   }
 
   private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -571,6 +588,7 @@ public class FlowRunnerManager implements EventListener {
       logger.info("Flow " + flow.getExecutionId()
           + " is finished. Adding it to recently finished flows list.");
       runningFlows.remove(flow.getExecutionId());
+      submittedFlows.remove(flow.getExecutionId());
     }
   }
 
@@ -713,22 +731,10 @@ public class FlowRunnerManager implements EventListener {
     return lastCleanerThreadCheckTime;
   }
 
-  public long getLastSubmitterThreadCheckTime() {
-    return lastSubmitterThreadCheckTime;
-  }
-
-  public boolean isSubmitterThreadActive() {
-    return this.submitterThread.isAlive();
-  }
-
   public boolean isCleanerThreadActive() {
     return this.cleanerThread.isAlive();
   }
 
-  public State getSubmitterThreadState() {
-    return this.submitterThread.getState();
-  }
-
   public State getCleanerThreadState() {
     return this.cleanerThread.getState();
   }
@@ -737,26 +743,73 @@ public class FlowRunnerManager implements EventListener {
     return executorService.isShutdown();
   }
 
-  public int getNumExecutingFlows() {
-    return runningFlows.size();
+  public int getNumQueuedFlows() {
+    return executorService.getQueue().size();
+  }
+
+  public int getNumRunningFlows() {
+    return executorService.getActiveCount();
   }
 
   public String getRunningFlowIds() {
-    ArrayList<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
-    Collections.sort(ids);
-    return ids.toString();
+    Set<Runnable> inProgressTasks = executorService.getInProgressTasks();
+
+    List<Integer> runningFlowIds =
+        new ArrayList<Integer>(inProgressTasks.size());
+
+    for (Runnable task : inProgressTasks) {
+      Integer execId = submittedFlows.get(task);
+      if (execId != null) {
+        runningFlowIds.add(execId);
+      } else {
+        logger.warn("getRunningFlowIds: got null execId for task: " + task);
+      }
+    }
+
+    Collections.sort(runningFlowIds);
+    return runningFlowIds.toString();
   }
 
-  public int getNumExecutingJobs() {
-    int jobCount = 0;
-    for (FlowRunner runner : runningFlows.values()) {
-      jobCount += runner.getNumRunningJobs();
+  public String getQueuedFlowIds() {
+    List<Integer> flowIdList =
+        new ArrayList<Integer>(executorService.getQueue().size());
+
+    for (Runnable task : executorService.getQueue()) {
+      Integer execId = submittedFlows.get(task);
+      if (execId != null) {
+        flowIdList.add(execId);
+      } else {
+        logger
+            .warn("getQueuedFlowIds: got null execId for queuedTask: " + task);
+      }
     }
+    Collections.sort(flowIdList);
+    return flowIdList.toString();
+  }
 
-    return jobCount;
+  public int getMaxNumRunningFlows() {
+    return numThreads;
+  }
+
+  public int getTheadPoolQueueSize() {
+    return threadPoolQueueSize;
   }
 
   public void reloadJobTypePlugins() throws JobTypeManagerException {
     jobtypeManager.loadPlugins();
   }
+
+  public int getTotalNumExecutedFlows() {
+    return executorService.getTotalTasks();
+  }
+
+  @Override
+  public void beforeExecute(Runnable r) {
+  }
+
+  @Override
+  public void afterExecute(Runnable r) {
+    submittedFlows.remove(r);
+  }
+
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
index 2f4ccb2..02ab2be 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
@@ -31,48 +31,52 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
   }
 
   @Override
-  public long getLastSubmitterThreadCheckTime() {
-    return manager.getLastSubmitterThreadCheckTime();
+  public boolean isCleanerThreadActive() {
+    return manager.isCleanerThreadActive();
   }
 
   @Override
-  public boolean isSubmitterThreadActive() {
-    return manager.isSubmitterThreadActive();
+  public String getCleanerThreadState() {
+    return manager.getCleanerThreadState().toString();
   }
 
   @Override
-  public boolean isCleanerThreadActive() {
-    return manager.isCleanerThreadActive();
+  public boolean isExecutorThreadPoolShutdown() {
+    return manager.isExecutorThreadPoolShutdown();
   }
 
   @Override
-  public String getSubmitterThreadState() {
-    return manager.getSubmitterThreadState().toString();
+  public int getNumRunningFlows() {
+    return manager.getNumRunningFlows();
   }
 
   @Override
-  public String getCleanerThreadState() {
-    return manager.getCleanerThreadState().toString();
+  public int getNumQueuedFlows() {
+    return manager.getNumQueuedFlows();
   }
 
   @Override
-  public boolean isExecutorThreadPoolShutdown() {
-    return manager.isExecutorThreadPoolShutdown();
+  public String getRunningFlows() {
+    return manager.getRunningFlowIds();
   }
 
   @Override
-  public int getNumExecutingFlows() {
-    return manager.getNumExecutingFlows();
+  public String getQueuedFlows() {
+    return manager.getQueuedFlowIds();
   }
 
   @Override
-  public int countTotalNumRunningJobs() {
-    return manager.getNumExecutingJobs();
+  public int getMaxNumRunningFlows() {
+    return manager.getMaxNumRunningFlows();
   }
 
   @Override
-  public String getRunningFlows() {
-    return manager.getRunningFlowIds();
+  public int getMaxQueuedFlows() {
+    return manager.getTheadPoolQueueSize();
   }
 
+  @Override
+  public int getTotalNumExecutedFlows() {
+    return manager.getTotalNumExecutedFlows();
+  }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
index 1709aee..71212a8 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
@@ -22,30 +22,34 @@ public interface JmxFlowRunnerManagerMBean {
   @DisplayName("OPERATION: getLastCleanerThreadCheckTime")
   public long getLastCleanerThreadCheckTime();
 
-  @DisplayName("OPERATION: getLastSubmitterThreadCheckTime")
-  public long getLastSubmitterThreadCheckTime();
-
-  @DisplayName("OPERATION: isSubmitterThreadActive")
-  public boolean isSubmitterThreadActive();
-
   @DisplayName("OPERATION: isCleanerThreadActive")
   public boolean isCleanerThreadActive();
 
-  @DisplayName("OPERATION: getSubmitterThreadState")
-  public String getSubmitterThreadState();
-
   @DisplayName("OPERATION: getCleanerThreadState")
   public String getCleanerThreadState();
 
   @DisplayName("OPERATION: isExecutorThreadPoolShutdown")
   public boolean isExecutorThreadPoolShutdown();
 
-  @DisplayName("OPERATION: getNumExecutingFlows")
-  public int getNumExecutingFlows();
+  @DisplayName("OPERATION: getNumRunningFlows")
+  public int getNumRunningFlows();
+
+  @DisplayName("OPERATION: getNumQueuedFlows")
+  public int getNumQueuedFlows();
 
   @DisplayName("OPERATION: getRunningFlows")
   public String getRunningFlows();
 
-  @DisplayName("OPERATION: getTotalNumRunningJobs")
-  public int countTotalNumRunningJobs();
+  @DisplayName("OPERATION: getQueuedFlows")
+  public String getQueuedFlows();
+
+  @DisplayName("OPERATION: getMaxNumRunningFlows")
+  public int getMaxNumRunningFlows();
+
+  @DisplayName("OPERATION: getMaxQueuedFlows")
+  public int getMaxQueuedFlows();
+
+  @DisplayName("OPERATION: getTotalNumExecutedFlows")
+  public int getTotalNumExecutedFlows();
+
 }
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
index c5ae77c..21319d7 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -27,12 +27,14 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.log4j.Logger;
+
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorManagerAdapter;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
@@ -44,8 +46,8 @@ import azkaban.scheduler.ScheduleManagerException;
 import azkaban.server.HttpRequestUtils;
 import azkaban.server.session.Session;
 import azkaban.user.Permission;
-import azkaban.user.User;
 import azkaban.user.Permission.Type;
+import azkaban.user.User;
 import azkaban.utils.FileIOUtils.LogData;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.plugin.PluginRegistry;
@@ -53,6 +55,9 @@ import azkaban.webapp.plugin.ViewerPlugin;
 
 public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private static final long serialVersionUID = 1L;
+
+  private static final Logger logger = Logger.getLogger(ExecutorServlet.class);
+
   private ProjectManager projectManager;
   private ExecutorManagerAdapter executorManager;
   private ScheduleManager scheduleManager;
@@ -349,7 +354,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
   /**
    * Gets the logs through plain text stream to reduce memory overhead.
-   *
+   * 
    * @param req
    * @param resp
    * @param user
@@ -389,7 +394,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 
   /**
    * Gets the logs through ajax plain text stream to reduce memory overhead.
-   *
+   * 
    * @param req
    * @param resp
    * @param user
@@ -709,7 +714,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
       HttpServletResponse resp, HashMap<String, Object> ret, User user,
       ExecutableFlow exFlow) throws ServletException {
     Long lastUpdateTime = Long.parseLong(getParam(req, "lastUpdateTime"));
-    System.out.println("Fetching " + exFlow.getExecutionId());
+    logger.info("Fetching " + exFlow.getExecutionId());
 
     Project project =
         getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);
@@ -729,7 +734,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
   private void ajaxFetchExecutableFlow(HttpServletRequest req,
       HttpServletResponse resp, HashMap<String, Object> ret, User user,
       ExecutableFlow exFlow) throws ServletException {
-    System.out.println("Fetching " + exFlow.getExecutionId());
+    logger.info("Fetching " + exFlow.getExecutionId());
 
     Project project =
         getProjectAjaxByPermission(ret, exFlow.getProjectId(), user, Type.READ);