Details
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 83c41bf..13350d2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -28,13 +28,13 @@ public class FlowRunnerTestBase {
public void assertThreadShutDown() {
waitFlowRunner(
runner -> Status.isStatusFinished(runner.getExecutableFlow().getStatus())
- && !runner.isRunnerThreadAlive());
+ && !runner.isRunnerThreadAlive());
}
public void assertThreadRunning() {
waitFlowRunner(
runner -> Status.isStatusRunning(runner.getExecutableFlow().getStatus())
- && runner.isRunnerThreadAlive());
+ && runner.isRunnerThreadAlive());
}
public void waitFlowRunner(final Function<FlowRunner, Boolean> statusCheck) {
@@ -103,7 +103,7 @@ public class FlowRunnerTestBase {
protected void assertFlowStatus(final Status status) {
final ExecutableFlow flow = this.runner.getExecutableFlow();
- waitForStatus(flow, status);
+ StatusTestUtils.waitForStatus(flow, status);
printStatuses(status, flow);
assertEquals(status, flow.getStatus());
}
@@ -112,26 +112,11 @@ public class FlowRunnerTestBase {
final ExecutableFlow exFlow = this.runner.getExecutableFlow();
final ExecutableNode node = exFlow.getExecutableNodePath(name);
assertNotNull(name + " wasn't found", node);
- waitForStatus(node, status);
+ StatusTestUtils.waitForStatus(node, status);
printStatuses(status, node);
assertEquals("Wrong status for [" + name + "]", status, node.getStatus());
}
- private void waitForStatus(ExecutableNode node, Status status) {
- for (int i = 0; i < 1000; i++) {
- if (node.getStatus() == status) {
- break;
- }
- synchronized (EventCollectorListener.handleEvent) {
- try {
- EventCollectorListener.handleEvent.wait(10L);
- } catch (final InterruptedException e) {
- i--;
- }
- }
- }
- }
-
protected void printStatuses(final Status status, final ExecutableNode node) {
if (status != node.getStatus()) {
printTestJobs();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index a513c8e..765b141 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -286,9 +286,11 @@ public class JobRunnerTest {
final Thread thread = new Thread(runner);
thread.start();
- Thread.sleep(2000);
+ StatusTestUtils.waitForStatus(node, Status.READY);
+ // sleep so that job has time to get into delayExecution() -> wait()
+ Thread.sleep(1000L);
runner.kill();
- Thread.sleep(500);
+ StatusTestUtils.waitForStatus(node, Status.KILLED);
eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
@@ -297,8 +299,8 @@ public class JobRunnerTest {
node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 1000);
- Assert.assertTrue(node.getStartTime() - startTime >= 2000);
- Assert.assertTrue(node.getStartTime() - startTime <= 5000);
+ Assert.assertTrue(node.getStartTime() - startTime >= 1000);
+ Assert.assertTrue(node.getStartTime() - startTime <= 4000);
Assert.assertTrue(runner.isKilled());
final File logFile = new File(runner.getLogFilePath());
@@ -306,6 +308,9 @@ public class JobRunnerTest {
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
+ // sleep so that there's time to make the "DB update" for KILLED status
+ Thread.sleep(1000L);
+ Assert.assertEquals(2L, loader.getNodeUpdateCount("testJob").longValue());
Assert.assertEquals(2L, (long) loader.getNodeUpdateCount("testJob"));
eventCollector.assertEvents(Type.JOB_FINISHED);
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/StatusTestUtils.java b/azkaban-exec-server/src/test/java/azkaban/execapp/StatusTestUtils.java
new file mode 100644
index 0000000..65645ab
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/StatusTestUtils.java
@@ -0,0 +1,23 @@
+package azkaban.execapp;
+
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+public class StatusTestUtils {
+
+ public static void waitForStatus(final ExecutableNode node, final Status status) {
+ for (int i = 0; i < 1000; i++) {
+ if (node.getStatus() == status) {
+ break;
+ }
+ synchronized (EventCollectorListener.handleEvent) {
+ try {
+ EventCollectorListener.handleEvent.wait(10L);
+ } catch (final InterruptedException e) {
+ i--;
+ }
+ }
+ }
+ }
+
+}