azkaban-aplcache

Improve JobRunnerTest stability (#1373) - Remove some redundant

8/21/2017 12:13:37 PM

Details

diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 83c41bf..13350d2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -28,13 +28,13 @@ public class FlowRunnerTestBase {
   public void assertThreadShutDown() {
     waitFlowRunner(
         runner -> Status.isStatusFinished(runner.getExecutableFlow().getStatus())
-        && !runner.isRunnerThreadAlive());
+            && !runner.isRunnerThreadAlive());
   }
 
   public void assertThreadRunning() {
     waitFlowRunner(
         runner -> Status.isStatusRunning(runner.getExecutableFlow().getStatus())
-        && runner.isRunnerThreadAlive());
+            && runner.isRunnerThreadAlive());
   }
 
   public void waitFlowRunner(final Function<FlowRunner, Boolean> statusCheck) {
@@ -103,7 +103,7 @@ public class FlowRunnerTestBase {
 
   protected void assertFlowStatus(final Status status) {
     final ExecutableFlow flow = this.runner.getExecutableFlow();
-    waitForStatus(flow, status);
+    StatusTestUtils.waitForStatus(flow, status);
     printStatuses(status, flow);
     assertEquals(status, flow.getStatus());
   }
@@ -112,26 +112,11 @@ public class FlowRunnerTestBase {
     final ExecutableFlow exFlow = this.runner.getExecutableFlow();
     final ExecutableNode node = exFlow.getExecutableNodePath(name);
     assertNotNull(name + " wasn't found", node);
-    waitForStatus(node, status);
+    StatusTestUtils.waitForStatus(node, status);
     printStatuses(status, node);
     assertEquals("Wrong status for [" + name + "]", status, node.getStatus());
   }
 
-  private void waitForStatus(ExecutableNode node, Status status) {
-    for (int i = 0; i < 1000; i++) {
-      if (node.getStatus() == status) {
-        break;
-      }
-      synchronized (EventCollectorListener.handleEvent) {
-        try {
-          EventCollectorListener.handleEvent.wait(10L);
-        } catch (final InterruptedException e) {
-          i--;
-        }
-      }
-    }
-  }
-
   protected void printStatuses(final Status status, final ExecutableNode node) {
     if (status != node.getStatus()) {
       printTestJobs();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index a513c8e..765b141 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -286,9 +286,11 @@ public class JobRunnerTest {
     final Thread thread = new Thread(runner);
     thread.start();
 
-    Thread.sleep(2000);
+    StatusTestUtils.waitForStatus(node, Status.READY);
+    // sleep so that job has time to get into delayExecution() -> wait()
+    Thread.sleep(1000L);
     runner.kill();
-    Thread.sleep(500);
+    StatusTestUtils.waitForStatus(node, Status.KILLED);
 
     eventCollector.handleEvent(Event.create(null, Event.Type.JOB_FINISHED, new EventData(node)));
 
@@ -297,8 +299,8 @@ public class JobRunnerTest {
         node.getStatus() == Status.KILLED);
     Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
     Assert.assertTrue(node.getEndTime() - node.getStartTime() < 1000);
-    Assert.assertTrue(node.getStartTime() - startTime >= 2000);
-    Assert.assertTrue(node.getStartTime() - startTime <= 5000);
+    Assert.assertTrue(node.getStartTime() - startTime >= 1000);
+    Assert.assertTrue(node.getStartTime() - startTime <= 4000);
     Assert.assertTrue(runner.isKilled());
 
     final File logFile = new File(runner.getLogFilePath());
@@ -306,6 +308,9 @@ public class JobRunnerTest {
     Assert.assertTrue(outputProps == null);
     Assert.assertTrue(logFile.exists());
 
+    // sleep so that there's time to make the "DB update" for KILLED status
+    Thread.sleep(1000L);
+    Assert.assertEquals(2L, loader.getNodeUpdateCount("testJob").longValue());
     Assert.assertEquals(2L, (long) loader.getNodeUpdateCount("testJob"));
     eventCollector.assertEvents(Type.JOB_FINISHED);
   }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/StatusTestUtils.java b/azkaban-exec-server/src/test/java/azkaban/execapp/StatusTestUtils.java
new file mode 100644
index 0000000..65645ab
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/StatusTestUtils.java
@@ -0,0 +1,23 @@
+package azkaban.execapp;
+
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+
+public class StatusTestUtils {
+
+  public static void waitForStatus(final ExecutableNode node, final Status status) {
+    for (int i = 0; i < 1000; i++) {
+      if (node.getStatus() == status) {
+        break;
+      }
+      synchronized (EventCollectorListener.handleEvent) {
+        try {
+          EventCollectorListener.handleEvent.wait(10L);
+        } catch (final InterruptedException e) {
+          i--;
+        }
+      }
+    }
+  }
+
+}