azkaban-aplcache

Test stability: FlowRunnerTest2 (#1158) Calling ExecutableFlowBase#isFlowFinished

6/4/2017 4:56:26 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
index a714dec..97bbd5d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
@@ -389,62 +389,6 @@ public class ExecutableFlowBase extends ExecutableNode {
     }
   }
 
-  /**
-   * Only returns true if the status of all finished nodes is true.
-   */
-  public boolean isFlowFinished() {
-    for (final String end : getEndNodes()) {
-      final ExecutableNode node = getExecutableNode(end);
-      if (!Status.isStatusFinished(node.getStatus())) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * Finds all jobs which are ready to run. This occurs when all of its
-   * dependency nodes are finished running.
-   *
-   * It will also return any subflow that has been completed such that the
-   * FlowRunner can properly handle them.
-   */
-  public List<ExecutableNode> findNextJobsToRun() {
-    final ArrayList<ExecutableNode> jobsToRun = new ArrayList<>();
-
-    if (isFlowFinished() && !Status.isStatusFinished(getStatus())) {
-      jobsToRun.add(this);
-    } else {
-      nodeloop:
-      for (final ExecutableNode node : this.executableNodes.values()) {
-        if (Status.isStatusFinished(node.getStatus())) {
-          continue;
-        }
-
-        if ((node instanceof ExecutableFlowBase)
-            && Status.isStatusRunning(node.getStatus())) {
-          // If the flow is still running, we traverse into the flow
-          jobsToRun.addAll(((ExecutableFlowBase) node).findNextJobsToRun());
-        } else if (Status.isStatusRunning(node.getStatus())) {
-          continue;
-        } else {
-          for (final String dependency : node.getInNodes()) {
-            // We find that the outer-loop is unfinished.
-            if (!Status.isStatusFinished(getExecutableNode(dependency)
-                .getStatus())) {
-              continue nodeloop;
-            }
-          }
-
-          jobsToRun.add(node);
-        }
-      }
-    }
-
-    return jobsToRun;
-  }
-
   public String getFlowPath() {
     if (this.getParentFlow() == null) {
       return this.getFlowId();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 09e041a..c1138ea 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -29,12 +29,14 @@ public class FlowRunnerTestBase {
 
   public void assertThreadShutDown() {
     waitFlowRunner(
-        runner -> runner.getExecutableFlow().isFlowFinished() && !runner.isRunnerThreadAlive());
+        runner -> Status.isStatusFinished(runner.getExecutableFlow().getStatus())
+        && !runner.isRunnerThreadAlive());
   }
 
   public void assertThreadRunning() {
     waitFlowRunner(
-        runner -> !runner.getExecutableFlow().isFlowFinished() && runner.isRunnerThreadAlive());
+        runner -> Status.isStatusRunning(runner.getExecutableFlow().getStatus())
+        && runner.isRunnerThreadAlive());
   }
 
   public void waitFlowRunner(final Function<FlowRunner, Boolean> statusCheck) {