diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index e8ea268..be40548 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014 LinkedIn Corp.
+ * Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -85,7 +85,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
startThread(this.runner);
succeedJobs("job3", "job4", "job6");
- assertFlowStatus(Status.SUCCEEDED);
+ waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
compareFinishedRuntime(this.runner);
@@ -119,7 +119,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
this.runner = createFlowRunner(exFlow, this.loader, eventCollector);
Assert.assertTrue(!this.runner.isKilled());
- assertFlowStatus(Status.READY);
+ waitForAndAssertFlowStatus(Status.READY);
startThread(this.runner);
succeedJobs("job3", "job4");
@@ -127,7 +127,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertThreadShutDown();
compareFinishedRuntime(this.runner);
- assertFlowStatus(Status.SUCCEEDED);
+ waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertStatus("job1", Status.SKIPPED);
assertStatus("job2", Status.SUCCEEDED);
@@ -156,7 +156,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
succeedJobs("job6");
Assert.assertTrue(!this.runner.isKilled());
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2d", Status.FAILED);
@@ -189,7 +189,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
Assert.assertTrue(this.runner.isKilled());
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2d", Status.FAILED);
@@ -219,7 +219,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
startThread(this.runner);
succeedJobs("job3");
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2d", Status.FAILED);
@@ -261,7 +261,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertStatus("job6", Status.KILLED);
assertThreadShutDown();
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@@ -283,7 +283,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
assertAttempts("job-pass", 0);
assertAttempts("job-retry-fail", 2);
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
}
private void startThread(final FlowRunner runner) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 974df4c..06b4b2c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -269,7 +269,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("jobf").succeedJob();
assertStatus("jobf", Status.SUCCEEDED);
- assertFlowStatus(Status.SUCCEEDED);
+ waitForAndAssertFlowStatus(Status.SUCCEEDED);
}
/**
@@ -324,7 +324,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("jobf").succeedJob();
assertStatus("jobf", Status.SUCCEEDED);
- assertFlowStatus(Status.SUCCEEDED);
+ waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
}
@@ -347,7 +347,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
// 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
InteractiveTestJob.getTestJob("joba").failJob();
- assertFlowStatus(Status.FAILED_FINISHING);
+ waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
assertStatus("joba", Status.FAILED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.CANCELLED);
@@ -363,7 +363,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
/// innerJobA completes
InteractiveTestJob.getTestJob("joba1").succeedJob();
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
@@ -395,7 +395,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("joba1").failJob();
assertStatus("joba1", Status.FAILED);
- assertFlowStatus(Status.FAILED_FINISHING);
+ waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
// 3. joba completes, everything is killed
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
@@ -408,13 +408,13 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobb", Status.KILLED);
assertStatus("jobd", Status.KILLED);
- assertFlowStatus(Status.FAILED_FINISHING);
+ waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
InteractiveTestJob.getTestJob("jobc").succeedJob();
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
@@ -450,7 +450,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
assertStatus("jobb", Status.FAILED_FINISHING);
assertStatus("jobb:innerJobB", Status.FAILED);
- assertFlowStatus(Status.FAILED_FINISHING);
+ waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
@@ -466,7 +466,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
@@ -508,7 +508,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
assertStatus("jobb", Status.FAILED_FINISHING);
assertStatus("jobb:innerJobB", Status.FAILED);
- assertFlowStatus(Status.FAILED_FINISHING);
+ waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
@@ -527,7 +527,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
@@ -580,7 +580,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobf", Status.CANCELLED);
assertThreadShutDown();
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
}
/**
@@ -622,7 +622,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobd:innerJobA", Status.SUCCEEDED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobd", Status.KILLED);
- assertFlowStatus(Status.FAILED_FINISHING);
+ waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
final ExecutableNode node = this.runner.getExecutableFlow()
.getExecutableNodePath("jobd:innerFlow2");
@@ -641,7 +641,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobd", Status.RUNNING);
assertStatus("jobb:innerFlow", Status.DISABLED);
assertStatus("jobd:innerFlow2", Status.RUNNING);
- assertFlowStatus(Status.RUNNING);
+ waitForAndAssertFlowStatus(Status.RUNNING);
assertThreadRunning();
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
@@ -666,7 +666,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("jobf").succeedJob();
assertStatus("jobf", Status.SUCCEEDED);
- assertFlowStatus(Status.SUCCEEDED);
+ waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
}
@@ -717,7 +717,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
@@ -756,7 +756,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
assertStatus("jobb:innerJobB", Status.FAILED);
assertStatus("jobb", Status.FAILED_FINISHING);
- assertFlowStatus(Status.FAILED_FINISHING);
+ waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
this.runner.kill("me");
@@ -770,7 +770,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
@@ -793,11 +793,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("joba").succeedJob();
// 2.1 JOB A COMPLETES SUCCESSFULLY AFTER PAUSE
assertStatus("joba", Status.SUCCEEDED);
- assertFlowStatus(Status.PAUSED);
+ waitForAndAssertFlowStatus(Status.PAUSED);
// 2.2 Flow is unpaused
this.runner.resume("test");
- assertFlowStatus(Status.RUNNING);
+ waitForAndAssertFlowStatus(Status.RUNNING);
assertStatus("joba", Status.SUCCEEDED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
@@ -853,7 +853,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
InteractiveTestJob.getTestJob("jobf").succeedJob();
assertStatus("jobf", Status.SUCCEEDED);
- assertFlowStatus(Status.SUCCEEDED);
+ waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
}
@@ -884,7 +884,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
- assertFlowStatus(Status.PAUSED);
+ waitForAndAssertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
@@ -902,7 +902,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
@@ -934,7 +934,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
- assertFlowStatus(Status.PAUSED);
+ waitForAndAssertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
assertStatus("jobd:innerJobA", Status.FAILED);
@@ -945,7 +945,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
// If we would resume before the job failure has been completely processed, FlowRunner would be
// able to start some new jobs instead of cancelling everything.
waitEventFired("jobd:innerJobA", Status.FAILED);
- assertFlowStatus(Status.PAUSED);
+ waitForAndAssertFlowStatus(Status.PAUSED);
this.runner.resume("me");
assertStatus("jobb:innerJobB", Status.CANCELLED);
@@ -962,7 +962,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobf", Status.CANCELLED);
assertStatus("jobe", Status.CANCELLED);
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
@@ -995,7 +995,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
- assertFlowStatus(Status.PAUSED);
+ waitForAndAssertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
assertStatus("jobd:innerJobA", Status.FAILED);
@@ -1021,7 +1021,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
- assertFlowStatus(Status.FAILED);
+ waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
@@ -1046,7 +1046,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
}
}
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
@@ -1077,7 +1077,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
- assertFlowStatus(Status.PAUSED);
+ waitForAndAssertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
assertStatus("jobd:innerJobA", Status.FAILED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
@@ -1092,7 +1092,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertStatus("jobf", Status.CANCELLED);
assertStatus("joba1", Status.KILLED);
- assertFlowStatus(Status.KILLED);
+ waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}