azkaban-aplcache

Rename assertFlowStatus method to waitForAndAssertFlowStatus

9/12/2017 5:05:57 PM

Details

diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index e8ea268..be40548 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -85,7 +85,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     startThread(this.runner);
     succeedJobs("job3", "job4", "job6");
 
-    assertFlowStatus(Status.SUCCEEDED);
+    waitForAndAssertFlowStatus(Status.SUCCEEDED);
     assertThreadShutDown();
     compareFinishedRuntime(this.runner);
 
@@ -119,7 +119,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     this.runner = createFlowRunner(exFlow, this.loader, eventCollector);
 
     Assert.assertTrue(!this.runner.isKilled());
-    assertFlowStatus(Status.READY);
+    waitForAndAssertFlowStatus(Status.READY);
 
     startThread(this.runner);
     succeedJobs("job3", "job4");
@@ -127,7 +127,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertThreadShutDown();
     compareFinishedRuntime(this.runner);
 
-    assertFlowStatus(Status.SUCCEEDED);
+    waitForAndAssertFlowStatus(Status.SUCCEEDED);
 
     assertStatus("job1", Status.SKIPPED);
     assertStatus("job2", Status.SUCCEEDED);
@@ -156,7 +156,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     succeedJobs("job6");
 
     Assert.assertTrue(!this.runner.isKilled());
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
 
     assertStatus("job1", Status.SUCCEEDED);
     assertStatus("job2d", Status.FAILED);
@@ -189,7 +189,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
 
     Assert.assertTrue(this.runner.isKilled());
 
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
 
     assertStatus("job1", Status.SUCCEEDED);
     assertStatus("job2d", Status.FAILED);
@@ -219,7 +219,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     startThread(this.runner);
     succeedJobs("job3");
 
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
 
     assertStatus("job1", Status.SUCCEEDED);
     assertStatus("job2d", Status.FAILED);
@@ -261,7 +261,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertStatus("job6", Status.KILLED);
     assertThreadShutDown();
 
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
 
     eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
   }
@@ -283,7 +283,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     assertAttempts("job-pass", 0);
     assertAttempts("job-retry-fail", 2);
 
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
   }
 
   private void startThread(final FlowRunner runner) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 974df4c..06b4b2c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -269,7 +269,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     InteractiveTestJob.getTestJob("jobf").succeedJob();
     assertStatus("jobf", Status.SUCCEEDED);
-    assertFlowStatus(Status.SUCCEEDED);
+    waitForAndAssertFlowStatus(Status.SUCCEEDED);
   }
 
   /**
@@ -324,7 +324,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     InteractiveTestJob.getTestJob("jobf").succeedJob();
     assertStatus("jobf", Status.SUCCEEDED);
 
-    assertFlowStatus(Status.SUCCEEDED);
+    waitForAndAssertFlowStatus(Status.SUCCEEDED);
     assertThreadShutDown();
   }
 
@@ -347,7 +347,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     // 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
     InteractiveTestJob.getTestJob("joba").failJob();
-    assertFlowStatus(Status.FAILED_FINISHING);
+    waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
     assertStatus("joba", Status.FAILED);
     assertStatus("joba1", Status.RUNNING);
     assertStatus("jobb", Status.CANCELLED);
@@ -363,7 +363,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     /// innerJobA completes
     InteractiveTestJob.getTestJob("joba1").succeedJob();
     assertStatus("jobf", Status.CANCELLED);
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
     assertThreadShutDown();
   }
 
@@ -395,7 +395,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     InteractiveTestJob.getTestJob("joba1").failJob();
     assertStatus("joba1", Status.FAILED);
-    assertFlowStatus(Status.FAILED_FINISHING);
+    waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
 
     // 3. joba completes, everything is killed
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
@@ -408,13 +408,13 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobd:innerFlow2", Status.CANCELLED);
     assertStatus("jobb", Status.KILLED);
     assertStatus("jobd", Status.KILLED);
-    assertFlowStatus(Status.FAILED_FINISHING);
+    waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
 
     InteractiveTestJob.getTestJob("jobc").succeedJob();
     assertStatus("jobc", Status.SUCCEEDED);
     assertStatus("jobe", Status.CANCELLED);
     assertStatus("jobf", Status.CANCELLED);
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
     assertThreadShutDown();
   }
 
@@ -450,7 +450,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
     assertStatus("jobb", Status.FAILED_FINISHING);
     assertStatus("jobb:innerJobB", Status.FAILED);
-    assertFlowStatus(Status.FAILED_FINISHING);
+    waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
@@ -466,7 +466,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobc", Status.SUCCEEDED);
     assertStatus("jobe", Status.CANCELLED);
     assertStatus("jobf", Status.CANCELLED);
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
     assertThreadShutDown();
   }
 
@@ -508,7 +508,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
     assertStatus("jobb", Status.FAILED_FINISHING);
     assertStatus("jobb:innerJobB", Status.FAILED);
-    assertFlowStatus(Status.FAILED_FINISHING);
+    waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
 
     InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
@@ -527,7 +527,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobc", Status.SUCCEEDED);
     assertStatus("jobe", Status.CANCELLED);
     assertStatus("jobf", Status.CANCELLED);
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
     assertThreadShutDown();
   }
 
@@ -580,7 +580,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobf", Status.CANCELLED);
 
     assertThreadShutDown();
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
   }
 
   /**
@@ -622,7 +622,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobd:innerJobA", Status.SUCCEEDED);
     assertStatus("jobd:innerFlow2", Status.CANCELLED);
     assertStatus("jobd", Status.KILLED);
-    assertFlowStatus(Status.FAILED_FINISHING);
+    waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
 
     final ExecutableNode node = this.runner.getExecutableFlow()
         .getExecutableNodePath("jobd:innerFlow2");
@@ -641,7 +641,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobd", Status.RUNNING);
     assertStatus("jobb:innerFlow", Status.DISABLED);
     assertStatus("jobd:innerFlow2", Status.RUNNING);
-    assertFlowStatus(Status.RUNNING);
+    waitForAndAssertFlowStatus(Status.RUNNING);
     assertThreadRunning();
 
     InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
@@ -666,7 +666,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     InteractiveTestJob.getTestJob("jobf").succeedJob();
     assertStatus("jobf", Status.SUCCEEDED);
-    assertFlowStatus(Status.SUCCEEDED);
+    waitForAndAssertFlowStatus(Status.SUCCEEDED);
     assertThreadShutDown();
   }
 
@@ -717,7 +717,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobe", Status.CANCELLED);
     assertStatus("jobf", Status.CANCELLED);
 
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
     assertThreadShutDown();
   }
 
@@ -756,7 +756,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
     assertStatus("jobb:innerJobB", Status.FAILED);
     assertStatus("jobb", Status.FAILED_FINISHING);
-    assertFlowStatus(Status.FAILED_FINISHING);
+    waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
 
     this.runner.kill("me");
 
@@ -770,7 +770,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobe", Status.CANCELLED);
     assertStatus("jobf", Status.CANCELLED);
 
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
     assertThreadShutDown();
   }
 
@@ -793,11 +793,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     InteractiveTestJob.getTestJob("joba").succeedJob();
     // 2.1 JOB A COMPLETES SUCCESSFULLY AFTER PAUSE
     assertStatus("joba", Status.SUCCEEDED);
-    assertFlowStatus(Status.PAUSED);
+    waitForAndAssertFlowStatus(Status.PAUSED);
 
     // 2.2 Flow is unpaused
     this.runner.resume("test");
-    assertFlowStatus(Status.RUNNING);
+    waitForAndAssertFlowStatus(Status.RUNNING);
     assertStatus("joba", Status.SUCCEEDED);
     assertStatus("joba1", Status.RUNNING);
     assertStatus("jobb", Status.RUNNING);
@@ -853,7 +853,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
 
     InteractiveTestJob.getTestJob("jobf").succeedJob();
     assertStatus("jobf", Status.SUCCEEDED);
-    assertFlowStatus(Status.SUCCEEDED);
+    waitForAndAssertFlowStatus(Status.SUCCEEDED);
     assertThreadShutDown();
   }
 
@@ -884,7 +884,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobb:innerJobA", Status.RUNNING);
 
     this.runner.pause("me");
-    assertFlowStatus(Status.PAUSED);
+    waitForAndAssertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
     assertStatus("jobb:innerJobA", Status.SUCCEEDED);
@@ -902,7 +902,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobe", Status.CANCELLED);
     assertStatus("jobf", Status.CANCELLED);
 
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
     assertThreadShutDown();
   }
 
@@ -934,7 +934,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobb:innerJobA", Status.RUNNING);
 
     this.runner.pause("me");
-    assertFlowStatus(Status.PAUSED);
+    waitForAndAssertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
     assertStatus("jobd:innerJobA", Status.FAILED);
@@ -945,7 +945,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     // If we would resume before the job failure has been completely processed, FlowRunner would be
     // able to start some new jobs instead of cancelling everything.
     waitEventFired("jobd:innerJobA", Status.FAILED);
-    assertFlowStatus(Status.PAUSED);
+    waitForAndAssertFlowStatus(Status.PAUSED);
 
     this.runner.resume("me");
     assertStatus("jobb:innerJobB", Status.CANCELLED);
@@ -962,7 +962,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobf", Status.CANCELLED);
     assertStatus("jobe", Status.CANCELLED);
 
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
     assertThreadShutDown();
   }
 
@@ -995,7 +995,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobb:innerJobA", Status.RUNNING);
 
     this.runner.pause("me");
-    assertFlowStatus(Status.PAUSED);
+    waitForAndAssertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
     InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
     assertStatus("jobd:innerJobA", Status.FAILED);
@@ -1021,7 +1021,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobe", Status.CANCELLED);
     assertStatus("jobf", Status.CANCELLED);
 
-    assertFlowStatus(Status.FAILED);
+    waitForAndAssertFlowStatus(Status.FAILED);
     assertThreadShutDown();
   }
 
@@ -1046,7 +1046,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
       }
     }
 
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
     assertThreadShutDown();
   }
 
@@ -1077,7 +1077,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobb:innerJobA", Status.RUNNING);
 
     this.runner.pause("me");
-    assertFlowStatus(Status.PAUSED);
+    waitForAndAssertFlowStatus(Status.PAUSED);
     InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
     assertStatus("jobd:innerJobA", Status.FAILED);
     assertStatus("jobd:innerFlow2", Status.CANCELLED);
@@ -1092,7 +1092,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
     assertStatus("jobf", Status.CANCELLED);
     assertStatus("joba1", Status.KILLED);
 
-    assertFlowStatus(Status.KILLED);
+    waitForAndAssertFlowStatus(Status.KILLED);
     assertThreadShutDown();
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 13350d2..55f61fa 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package azkaban.execapp;
 
 import static org.junit.Assert.assertEquals;
@@ -101,7 +117,7 @@ public class FlowRunnerTestBase {
     return true;
   }
 
-  protected void assertFlowStatus(final Status status) {
+  protected void waitForAndAssertFlowStatus(final Status status) {
     final ExecutableFlow flow = this.runner.getExecutableFlow();
     StatusTestUtils.waitForStatus(flow, status);
     printStatuses(status, flow);