azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 3eb335b..7706fe5 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -454,7 +454,8 @@ public class ExecutorManager extends EventHandler implements
 
   /**
    * Gets a list of all the active (running flows and non-dispatched flows)
-   * executions for a given project and flow {@inheritDoc}
+   * executions for a given project and flow {@inheritDoc}. Results should
+   * be sorted as we assume this while setting up pipelined execution Id.
    *
    * @see azkaban.executor.ExecutorManagerAdapter#getRunningFlows(int,
    *      java.lang.String)
@@ -466,6 +467,7 @@ public class ExecutorManager extends EventHandler implements
       queuedFlows.getAllEntries()));
     executionIds.addAll(getRunningFlowsHelper(projectId, flowId,
       runningFlows.values()));
+    Collections.sort(executionIds);
     return executionIds;
   }
 
@@ -1870,7 +1872,11 @@ public class ExecutorManager extends EventHandler implements
           // process flow with current snapshot of activeExecutors
           selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
         }
-        currentContinuousFlowProcessed++;
+
+        // do not count failed flow processsing (flows still in queue)
+        if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
+          currentContinuousFlowProcessed++;
+        }
       }
     }