azkaban-aplcache
Test stability: FlowRunnerTest2 (#1158) Calling ExecutableFlowBase#isFlowFinished …
6/4/2017 4:56:26 PM
Changes
Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
index a714dec..97bbd5d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableFlowBase.java
@@ -389,62 +389,6 @@ public class ExecutableFlowBase extends ExecutableNode {
}
}
- /**
- * Only returns true if the status of all finished nodes is true.
- */
- public boolean isFlowFinished() {
- for (final String end : getEndNodes()) {
- final ExecutableNode node = getExecutableNode(end);
- if (!Status.isStatusFinished(node.getStatus())) {
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * Finds all jobs which are ready to run. This occurs when all of its
- * dependency nodes are finished running.
- *
- * It will also return any subflow that has been completed such that the
- * FlowRunner can properly handle them.
- */
- public List<ExecutableNode> findNextJobsToRun() {
- final ArrayList<ExecutableNode> jobsToRun = new ArrayList<>();
-
- if (isFlowFinished() && !Status.isStatusFinished(getStatus())) {
- jobsToRun.add(this);
- } else {
- nodeloop:
- for (final ExecutableNode node : this.executableNodes.values()) {
- if (Status.isStatusFinished(node.getStatus())) {
- continue;
- }
-
- if ((node instanceof ExecutableFlowBase)
- && Status.isStatusRunning(node.getStatus())) {
- // If the flow is still running, we traverse into the flow
- jobsToRun.addAll(((ExecutableFlowBase) node).findNextJobsToRun());
- } else if (Status.isStatusRunning(node.getStatus())) {
- continue;
- } else {
- for (final String dependency : node.getInNodes()) {
- // We find that the outer-loop is unfinished.
- if (!Status.isStatusFinished(getExecutableNode(dependency)
- .getStatus())) {
- continue nodeloop;
- }
- }
-
- jobsToRun.add(node);
- }
- }
- }
-
- return jobsToRun;
- }
-
public String getFlowPath() {
if (this.getParentFlow() == null) {
return this.getFlowId();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 09e041a..c1138ea 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -29,12 +29,14 @@ public class FlowRunnerTestBase {
public void assertThreadShutDown() {
waitFlowRunner(
- runner -> runner.getExecutableFlow().isFlowFinished() && !runner.isRunnerThreadAlive());
+ runner -> Status.isStatusFinished(runner.getExecutableFlow().getStatus())
+ && !runner.isRunnerThreadAlive());
}
public void assertThreadRunning() {
waitFlowRunner(
- runner -> !runner.getExecutableFlow().isFlowFinished() && runner.isRunnerThreadAlive());
+ runner -> Status.isStatusRunning(runner.getExecutableFlow().getStatus())
+ && runner.isRunnerThreadAlive());
}
public void waitFlowRunner(final Function<FlowRunner, Boolean> statusCheck) {