azkaban-aplcache

Details

diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 7c08c12..b304d20 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -82,6 +82,9 @@ public class FlowRunnerManager implements EventListener,
 
   private Map<Pair<Integer, Integer>, ProjectVersion> installedProjects =
       new ConcurrentHashMap<Pair<Integer, Integer>, ProjectVersion>();
+  // this map is used to store the flows that have been submitted to
+  // the executor service. Once a flow has been submitted, it is either
+  // in the queue waiting to be executed or in executing state.
   private Map<Future<?>, Integer> submittedFlows =
       new ConcurrentHashMap<Future<?>, Integer>();
   private Map<Integer, FlowRunner> runningFlows =
@@ -471,7 +474,10 @@ public class FlowRunnerManager implements EventListener,
     runningFlows.put(execId, runner);
 
     try {
-      // The executorService already has a queue
+      // The executorService already has a queue.
+      // The submit method below actually returns an instance of FutureTask,
+      // which implements interface RunnableFuture, which extends both
+      // Runnable and Future interfaces
       Future<?> future = executorService.submit(runner);
       // keep track of this future
       submittedFlows.put(future, runner.getExecutionId());
@@ -753,12 +759,15 @@ public class FlowRunnerManager implements EventListener,
   }
 
   public String getRunningFlowIds() {
+    // The in progress tasks are actually of type FutureTask
     Set<Runnable> inProgressTasks = executorService.getInProgressTasks();
 
     List<Integer> runningFlowIds =
         new ArrayList<Integer>(inProgressTasks.size());
 
     for (Runnable task : inProgressTasks) {
+      // add casting here to ensure it matches the expected type in
+      // submittedFlows
       Integer execId = submittedFlows.get((Future<?>) task);
       if (execId != null) {
         runningFlowIds.add(execId);