Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
index 0d98a70..8d230ba 100644
--- a/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
+++ b/azkaban-common/src/main/java/azkaban/executor/JdbcExecutorLoader.java
@@ -34,10 +34,9 @@ import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
-
import org.joda.time.DateTime;
import azkaban.database.AbstractJdbcLoader;
@@ -168,7 +167,11 @@ public class JdbcExecutorLoader extends AbstractJdbcLoader implements
List<ExecutableFlow> properties =
runner.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, flowHandler,
id);
- return properties.get(0);
+ if (properties.isEmpty()) {
+ return null;
+ } else {
+ return properties.get(0);
+ }
} catch (SQLException e) {
throw new ExecutorManagerException("Error fetching flow id " + id, e);
}
diff --git a/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
new file mode 100644
index 0000000..a7c85a4
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/ThreadPoolExecutingListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.utils;
+
+/**
+ * Interface for listener to get notified before and after a task has been
+ * executed.
+ *
+ * @author hluu
+ *
+ */
+public interface ThreadPoolExecutingListener {
+ public void beforeExecute(Runnable r);
+
+ public void afterExecute(Runnable r);
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
new file mode 100644
index 0000000..821d989
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/TrackingThreadPool.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.utils;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A simple subclass of {@link ThreadPoolExecutor} to keep track of in progress
+ * tasks as well as other interesting statistics.
+ *
+ * The content of this class is copied from article "Java theory and practice:
+ * Instrumenting applications with JMX"
+ *
+ * @author hluu
+ *
+ */
+public class TrackingThreadPool extends ThreadPoolExecutor {
+
+ private static Logger logger = Logger.getLogger(TrackingThreadPool.class);
+
+ private final Map<Runnable, Boolean> inProgress =
+ new ConcurrentHashMap<Runnable, Boolean>();
+ private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
+
+ private ThreadPoolExecutingListener executingListener =
+ new NoOpThreadPoolExecutingListener();
+
+ private long totalTime;
+ private int totalTasks;
+
+ public TrackingThreadPool(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ ThreadPoolExecutingListener listener) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ if (listener != null) {
+ executingListener = listener;
+ }
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ try {
+ executingListener.beforeExecute(r);
+ } catch (Throwable e) {
+ // to ensure the listener doesn't cause any issues
+ logger.warn("Listener threw exception", e);
+ }
+ super.beforeExecute(t, r);
+ inProgress.put(r, Boolean.TRUE);
+ startTime.set(new Long(System.currentTimeMillis()));
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ long time = System.currentTimeMillis() - startTime.get().longValue();
+ synchronized (this) {
+ totalTime += time;
+ ++totalTasks;
+ }
+ inProgress.remove(r);
+ super.afterExecute(r, t);
+ try {
+ executingListener.afterExecute(r);
+ } catch (Throwable e) {
+ // to ensure the listener doesn't cause any issues
+ logger.warn("Listener threw exception", e);
+ }
+ }
+
+ public Set<Runnable> getInProgressTasks() {
+ return Collections.unmodifiableSet(inProgress.keySet());
+ }
+
+ public synchronized int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public synchronized double getAverageTaskTime() {
+ return (totalTasks == 0) ? 0 : totalTime / totalTasks;
+ }
+
+ private static class NoOpThreadPoolExecutingListener implements
+ ThreadPoolExecutingListener {
+
+ @Override
+ public void beforeExecute(Runnable r) {
+ }
+
+ @Override
+ public void afterExecute(Runnable r) {
+ }
+ }
+}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index 123f9e4..f73a30a 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -60,7 +60,7 @@ import azkaban.utils.SwapQueue;
/**
* Class that handles the running of a ExecutableFlow DAG
- *
+ *
*/
public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
@@ -122,7 +122,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Constructor. This will create its own ExecutorService for thread pools
- *
+ *
* @param flow
* @param executorLoader
* @param projectLoader
@@ -138,7 +138,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Constructor. If executorService is null, then it will create it's own for
* thread pools.
- *
+ *
* @param flow
* @param executorLoader
* @param projectLoader
@@ -356,7 +356,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Main method that executes the jobs.
- *
+ *
* @throws Exception
*/
private void runFlow() throws Exception {
@@ -725,7 +725,7 @@ public class FlowRunner extends EventHandler implements Runnable {
/**
* Determines what the state of the next node should be. Returns null if the
* node should not be run.
- *
+ *
* @param node
* @return
*/
@@ -1094,4 +1094,8 @@ public class FlowRunner extends EventHandler implements Runnable {
public int getNumRunningJobs() {
return activeJobRunners.size();
}
+
+ public int getExecutionId() {
+ return execId;
+ }
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index a044f70..54d82e8 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -27,16 +27,16 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
-import azkaban.project.ProjectLoader;
import azkaban.event.Event;
import azkaban.event.EventListener;
import azkaban.execapp.event.FlowWatcher;
@@ -48,18 +48,47 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
+import azkaban.project.ProjectLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.ThreadPoolExecutingListener;
+import azkaban.utils.TrackingThreadPool;
/**
* Execution manager for the server side execution.
- *
+ *
+ * When a flow is submitted to FlowRunnerManager, it is the
+ * {@link Status.PREPARING} status. When a flow is about to be executed by
+ * FlowRunner, its status is updated to {@link Status.RUNNING}
+ *
+ * Two main data structures are used in this class to maintain flows.
+ *
+ * runningFlows: this is used as a bookkeeping for submitted flows in
+ * FlowRunnerManager. It has nothing to do with the executor service that is
+ * used to execute the flows. This bookkeeping is used at the time of canceling
+ * or killing a flow. The flows in this data structure is removed in the
+ * handleEvent method.
+ *
+ * submittedFlows: this is used to keep track the execution of the flows, so it
+ * has the mapping between a Future<?> and an execution id. This would allow us
+ * to find out the execution ids of the flows that are in the Status.PREPARING
+ * status. The entries in this map is removed once the flow execution is
+ * completed.
+ *
+ *
*/
-public class FlowRunnerManager implements EventListener {
+public class FlowRunnerManager implements EventListener,
+ ThreadPoolExecutingListener {
+ private static final String EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE =
+ "executor.use.bounded.threadpool.queue";
+ private static final String EXECUTOR_THREADPOOL_WORKQUEUE_SIZE =
+ "executor.threadpool.workqueue.size";
+ private static final String EXECUTOR_FLOW_THREADS = "executor.flow.threads";
+ private static final String FLOW_NUM_JOB_THREADS = "flow.num.job.threads";
private static Logger logger = Logger.getLogger(FlowRunnerManager.class);
private File executionDirectory;
private File projectDirectory;
@@ -68,20 +97,27 @@ public class FlowRunnerManager implements EventListener {
private static final long RECENTLY_FINISHED_TIME_TO_LIVE = 60 * 1000;
private static final int DEFAULT_NUM_EXECUTING_FLOWS = 30;
+ private static final int DEFAULT_FLOW_NUM_JOB_TREADS = 10;
+
private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
+ // this map is used to store the flows that have been submitted to
+ // the executor service. Once a flow has been submitted, it is either
+ // in the queue waiting to be executed or in executing state.
+ private Map<Future<?>, Integer> submittedFlows =
+ new ConcurrentHashMap<Future<?>, Integer>();
private Map<Integer, FlowRunner> runningFlows =
new ConcurrentHashMap<Integer, FlowRunner>();
private Map<Integer, ExecutableFlow> recentlyFinishedFlows =
new ConcurrentHashMap<Integer, ExecutableFlow>();
- private LinkedBlockingQueue<FlowRunner> flowQueue =
- new LinkedBlockingQueue<FlowRunner>();
+
private int numThreads = DEFAULT_NUM_EXECUTING_FLOWS;
+ private int threadPoolQueueSize = -1;
+
+ private TrackingThreadPool executorService;
- private ExecutorService executorService;
- private SubmitterThread submitterThread;
private CleanerThread cleanerThread;
- private int numJobThreadPerFlow = 10;
+ private int numJobThreadPerFlow = DEFAULT_FLOW_NUM_JOB_TREADS;
private ExecutorLoader executorLoader;
private ProjectLoader projectLoader;
@@ -92,7 +128,6 @@ public class FlowRunnerManager implements EventListener {
private final Props azkabanProps;
- private long lastSubmitterThreadCheckTime = -1;
private long lastCleanerThreadCheckTime = -1;
private long executionDirRetention = 1 * 24 * 60 * 60 * 1000;
@@ -132,10 +167,10 @@ public class FlowRunnerManager implements EventListener {
// azkaban.temp.dir
numThreads =
- props.getInt("executor.flow.threads", DEFAULT_NUM_EXECUTING_FLOWS);
+ props.getInt(EXECUTOR_FLOW_THREADS, DEFAULT_NUM_EXECUTING_FLOWS);
numJobThreadPerFlow =
- props.getInt("flow.num.job.threads", numJobThreadPerFlow);
- executorService = Executors.newFixedThreadPool(numThreads);
+ props.getInt(FLOW_NUM_JOB_THREADS, DEFAULT_FLOW_NUM_JOB_TREADS);
+ executorService = createExecutorService(numThreads);
this.executorLoader = executorLoader;
this.projectLoader = projectLoader;
@@ -146,9 +181,6 @@ public class FlowRunnerManager implements EventListener {
this.validateProxyUser =
azkabanProps.getBoolean("proxy.user.lock.down", false);
- submitterThread = new SubmitterThread(flowQueue);
- submitterThread.start();
-
cleanerThread = new CleanerThread();
cleanerThread.start();
@@ -165,6 +197,32 @@ public class FlowRunnerManager implements EventListener {
parentClassLoader);
}
+ private TrackingThreadPool createExecutorService(int nThreads) {
+ boolean useNewThreadPool =
+ azkabanProps.getBoolean(EXECUTOR_USE_BOUNDED_THREADPOOL_QUEUE, false);
+ logger.info("useNewThreadPool: " + useNewThreadPool);
+
+ if (useNewThreadPool) {
+ threadPoolQueueSize =
+ azkabanProps.getInt(EXECUTOR_THREADPOOL_WORKQUEUE_SIZE, nThreads);
+ logger.info("workQueueSize: " + threadPoolQueueSize);
+
+ // using a bounded queue for the work queue. The default rejection policy
+ // {@ThreadPoolExecutor.AbortPolicy} is used
+ TrackingThreadPool executor =
+ new TrackingThreadPool(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(threadPoolQueueSize), this);
+
+ return executor;
+ } else {
+ // the old way of using unbounded task queue.
+ // if the running tasks are taking a long time or stuck, this queue
+ // will be very very long.
+ return new TrackingThreadPool(nThreads, nThreads, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), this);
+ }
+ }
+
private Map<Pair<Integer, Integer>, ProjectVersion> loadExistingProjects() {
Map<Pair<Integer, Integer>, ProjectVersion> allProjects =
new HashMap<Pair<Integer, Integer>, ProjectVersion>();
@@ -202,34 +260,6 @@ public class FlowRunnerManager implements EventListener {
this.globalProps = globalProps;
}
- private class SubmitterThread extends Thread {
- private BlockingQueue<FlowRunner> queue;
- private boolean shutdown = false;
-
- public SubmitterThread(BlockingQueue<FlowRunner> queue) {
- this.setName("FlowRunnerManager-Submitter-Thread");
- this.queue = queue;
- }
-
- @SuppressWarnings("unused")
- public void shutdown() {
- shutdown = true;
- this.interrupt();
- }
-
- public void run() {
- while (!shutdown) {
- try {
- lastSubmitterThreadCheckTime = System.currentTimeMillis();
- FlowRunner flowRunner = queue.take();
- executorService.submit(flowRunner);
- } catch (InterruptedException e) {
- logger.info("Interrupted. Probably to shut down.");
- }
- }
- }
- }
-
private class CleanerThread extends Thread {
// Every hour, clean execution dir.
private static final long EXECUTION_DIR_CLEAN_INTERVAL_MS = 60 * 60 * 1000;
@@ -430,18 +460,18 @@ public class FlowRunnerManager implements EventListener {
}
int numJobThreads = numJobThreadPerFlow;
- if (options.getFlowParameters().containsKey("flow.num.job.threads")) {
+ if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
try {
int numJobs =
Integer.valueOf(options.getFlowParameters().get(
- "flow.num.job.threads"));
+ FLOW_NUM_JOB_THREADS));
if (numJobs > 0 && numJobs <= numJobThreads) {
numJobThreads = numJobs;
}
} catch (Exception e) {
throw new ExecutorManagerException(
"Failed to set the number of job threads "
- + options.getFlowParameters().get("flow.num.job.threads")
+ + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
+ " for flow " + execId, e);
}
}
@@ -461,7 +491,21 @@ public class FlowRunnerManager implements EventListener {
// Finally, queue the sucker.
runningFlows.put(execId, runner);
- flowQueue.add(runner);
+
+ try {
+ // The executorService already has a queue.
+ // The submit method below actually returns an instance of FutureTask,
+ // which implements interface RunnableFuture, which extends both
+ // Runnable and Future interfaces
+ Future<?> future = executorService.submit(runner);
+ // keep track of this future
+ submittedFlows.put(future, runner.getExecutionId());
+ } catch (RejectedExecutionException re) {
+ throw new ExecutorManagerException(
+ "Azkaban server can't execute any more flows. "
+ + "The number of running flows has reached the system configured limit."
+ + "Please notify Azkaban administrators");
+ }
}
private void setupFlow(ExecutableFlow flow) throws ExecutorManagerException {
@@ -713,22 +757,10 @@ public class FlowRunnerManager implements EventListener {
return lastCleanerThreadCheckTime;
}
- public long getLastSubmitterThreadCheckTime() {
- return lastSubmitterThreadCheckTime;
- }
-
- public boolean isSubmitterThreadActive() {
- return this.submitterThread.isAlive();
- }
-
public boolean isCleanerThreadActive() {
return this.cleanerThread.isAlive();
}
- public State getSubmitterThreadState() {
- return this.submitterThread.getState();
- }
-
public State getCleanerThreadState() {
return this.cleanerThread.getState();
}
@@ -737,26 +769,76 @@ public class FlowRunnerManager implements EventListener {
return executorService.isShutdown();
}
- public int getNumExecutingFlows() {
- return runningFlows.size();
+ public int getNumQueuedFlows() {
+ return executorService.getQueue().size();
+ }
+
+ public int getNumRunningFlows() {
+ return executorService.getActiveCount();
}
public String getRunningFlowIds() {
- ArrayList<Integer> ids = new ArrayList<Integer>(runningFlows.keySet());
- Collections.sort(ids);
- return ids.toString();
+ // The in progress tasks are actually of type FutureTask
+ Set<Runnable> inProgressTasks = executorService.getInProgressTasks();
+
+ List<Integer> runningFlowIds =
+ new ArrayList<Integer>(inProgressTasks.size());
+
+ for (Runnable task : inProgressTasks) {
+ // add casting here to ensure it matches the expected type in
+ // submittedFlows
+ Integer execId = submittedFlows.get((Future<?>) task);
+ if (execId != null) {
+ runningFlowIds.add(execId);
+ } else {
+ logger.warn("getRunningFlowIds: got null execId for task: " + task);
+ }
+ }
+
+ Collections.sort(runningFlowIds);
+ return runningFlowIds.toString();
}
- public int getNumExecutingJobs() {
- int jobCount = 0;
- for (FlowRunner runner : runningFlows.values()) {
- jobCount += runner.getNumRunningJobs();
+ public String getQueuedFlowIds() {
+ List<Integer> flowIdList =
+ new ArrayList<Integer>(executorService.getQueue().size());
+
+ for (Runnable task : executorService.getQueue()) {
+ Integer execId = submittedFlows.get(task);
+ if (execId != null) {
+ flowIdList.add(execId);
+ } else {
+ logger
+ .warn("getQueuedFlowIds: got null execId for queuedTask: " + task);
+ }
}
+ Collections.sort(flowIdList);
+ return flowIdList.toString();
+ }
- return jobCount;
+ public int getMaxNumRunningFlows() {
+ return numThreads;
+ }
+
+ public int getTheadPoolQueueSize() {
+ return threadPoolQueueSize;
}
public void reloadJobTypePlugins() throws JobTypeManagerException {
jobtypeManager.loadPlugins();
}
-}
+
+ public int getTotalNumExecutedFlows() {
+ return executorService.getTotalTasks();
+ }
+
+ @Override
+ public void beforeExecute(Runnable r) {
+ }
+
+ @Override
+ public void afterExecute(Runnable r) {
+ submittedFlows.remove(r);
+ }
+
+}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
index 2f4ccb2..3437065 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManager.java
@@ -31,48 +31,52 @@ public class JmxFlowRunnerManager implements JmxFlowRunnerManagerMBean {
}
@Override
- public long getLastSubmitterThreadCheckTime() {
- return manager.getLastSubmitterThreadCheckTime();
+ public boolean isCleanerThreadActive() {
+ return manager.isCleanerThreadActive();
}
@Override
- public boolean isSubmitterThreadActive() {
- return manager.isSubmitterThreadActive();
+ public String getCleanerThreadState() {
+ return manager.getCleanerThreadState().toString();
}
@Override
- public boolean isCleanerThreadActive() {
- return manager.isCleanerThreadActive();
+ public boolean isExecutorThreadPoolShutdown() {
+ return manager.isExecutorThreadPoolShutdown();
}
@Override
- public String getSubmitterThreadState() {
- return manager.getSubmitterThreadState().toString();
+ public int getNumRunningFlows() {
+ return manager.getNumRunningFlows();
}
@Override
- public String getCleanerThreadState() {
- return manager.getCleanerThreadState().toString();
+ public int getNumQueuedFlows() {
+ return manager.getNumQueuedFlows();
}
@Override
- public boolean isExecutorThreadPoolShutdown() {
- return manager.isExecutorThreadPoolShutdown();
+ public String getRunningFlows() {
+ return manager.getRunningFlowIds();
}
@Override
- public int getNumExecutingFlows() {
- return manager.getNumExecutingFlows();
+ public String getQueuedFlows() {
+ return manager.getQueuedFlowIds();
}
@Override
- public int countTotalNumRunningJobs() {
- return manager.getNumExecutingJobs();
+ public int getMaxNumRunningFlows() {
+ return manager.getMaxNumRunningFlows();
}
@Override
- public String getRunningFlows() {
- return manager.getRunningFlowIds();
+ public int getMaxQueuedFlows() {
+ return manager.getTheadPoolQueueSize();
}
-}
+ @Override
+ public int getTotalNumExecutedFlows() {
+ return manager.getTotalNumExecutedFlows();
+ }
+}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
index 1709aee..41f8f04 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/jmx/JmxFlowRunnerManagerMBean.java
@@ -22,30 +22,34 @@ public interface JmxFlowRunnerManagerMBean {
@DisplayName("OPERATION: getLastCleanerThreadCheckTime")
public long getLastCleanerThreadCheckTime();
- @DisplayName("OPERATION: getLastSubmitterThreadCheckTime")
- public long getLastSubmitterThreadCheckTime();
-
- @DisplayName("OPERATION: isSubmitterThreadActive")
- public boolean isSubmitterThreadActive();
-
@DisplayName("OPERATION: isCleanerThreadActive")
public boolean isCleanerThreadActive();
- @DisplayName("OPERATION: getSubmitterThreadState")
- public String getSubmitterThreadState();
-
@DisplayName("OPERATION: getCleanerThreadState")
public String getCleanerThreadState();
@DisplayName("OPERATION: isExecutorThreadPoolShutdown")
public boolean isExecutorThreadPoolShutdown();
- @DisplayName("OPERATION: getNumExecutingFlows")
- public int getNumExecutingFlows();
+ @DisplayName("OPERATION: getNumRunningFlows")
+ public int getNumRunningFlows();
+
+ @DisplayName("OPERATION: getNumQueuedFlows")
+ public int getNumQueuedFlows();
@DisplayName("OPERATION: getRunningFlows")
public String getRunningFlows();
- @DisplayName("OPERATION: getTotalNumRunningJobs")
- public int countTotalNumRunningJobs();
-}
+ @DisplayName("OPERATION: getQueuedFlows")
+ public String getQueuedFlows();
+
+ @DisplayName("OPERATION: getMaxNumRunningFlows")
+ public int getMaxNumRunningFlows();
+
+ @DisplayName("OPERATION: getMaxQueuedFlows")
+ public int getMaxQueuedFlows();
+
+ @DisplayName("OPERATION: getTotalNumExecutedFlows")
+ public int getTotalNumExecutedFlows();
+
+}
\ No newline at end of file