diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 7c08c12..b304d20 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -82,6 +82,9 @@ public class FlowRunnerManager implements EventListener,
private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
+ // this map is used to store the flows that have been submitted to
+ // the executor service. Once a flow has been submitted, it is either
+ // in the queue waiting to be executed or in executing state.
private Map<Future<?>, Integer> submittedFlows =
new ConcurrentHashMap<Future<?>, Integer>();
private Map<Integer, FlowRunner> runningFlows =
@@ -471,7 +474,10 @@ public class FlowRunnerManager implements EventListener,
runningFlows.put(execId, runner);
try {
- // The executorService already has a queue
+ // The executorService already has a queue.
+ // The submit method below actually returns an instance of FutureTask,
+ // which implements interface RunnableFuture, which extends both
+ // Runnable and Future interfaces
Future<?> future = executorService.submit(runner);
// keep track of this future
submittedFlows.put(future, runner.getExecutionId());
@@ -753,12 +759,15 @@ public class FlowRunnerManager implements EventListener,
}
public String getRunningFlowIds() {
+ // The in progress tasks are actually of type FutureTask
Set<Runnable> inProgressTasks = executorService.getInProgressTasks();
List<Integer> runningFlowIds =
new ArrayList<Integer>(inProgressTasks.size());
for (Runnable task : inProgressTasks) {
+ // add casting here to ensure it matches the expected type in
+ // submittedFlows
Integer execId = submittedFlows.get((Future<?>) task);
if (execId != null) {
runningFlowIds.add(execId);