azkaban-aplcache

Enable all the rest in JobRunnerTest (#1479) The tests that

10/5/2017 2:41:04 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index a82486a..21e6b50 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -50,8 +50,8 @@ public class ExecutableNode {
   private String id;
   private String type = null;
   private volatile Status status = Status.READY;
-  private long startTime = -1;
-  private long endTime = -1;
+  private volatile long startTime = -1;
+  private volatile long endTime = -1;
   private long updateTime = -1;
   private volatile boolean killedBySLA = false;
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index ea46cef..e7f0bcd 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -16,8 +16,6 @@
 
 package azkaban.execapp;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
 import azkaban.event.Event;
 import azkaban.event.EventData;
 import azkaban.executor.ExecutableFlow;
@@ -26,22 +24,27 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
-import azkaban.jobExecutor.ProcessJob;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
 import azkaban.spi.EventType;
+import azkaban.test.TestUtils;
 import azkaban.utils.Props;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Thread.State.TIMED_WAITING;
+import static java.lang.Thread.State.WAITING;
+import static org.assertj.core.api.Assertions.assertThat;
+
 public class JobRunnerTest {
 
   private final Logger logger = Logger.getLogger("JobRunnerTest");
@@ -107,13 +110,12 @@ public class JobRunnerTest {
         .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
-  @Ignore
   @Test
   public void testFailedRun() {
     final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final JobRunner runner =
-        createJobRunner(1, "testJob", 1, true, loader, eventCollector);
+        createJobRunner(1, "testJob", 0, true, loader, eventCollector);
     final ExecutableNode node = runner.getNode();
 
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
@@ -122,12 +124,11 @@ public class JobRunnerTest {
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue(node.getStatus() == Status.FAILED);
-    Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
-    Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
+    Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() >= 0);
 
     final File logFile = new File(runner.getLogFilePath());
     final Props outputProps = runner.getNode().getOutputProps();
-    Assert.assertTrue(outputProps == null);
+    Assert.assertEquals(0, outputProps.size());
     Assert.assertTrue(logFile.exists());
     Assert.assertTrue(eventCollector.checkOrdering());
     Assert.assertTrue(!runner.isKilled());
@@ -142,7 +143,7 @@ public class JobRunnerTest {
     final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final JobRunner runner =
-        createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+        createJobRunner(1, "testJob", 0, false, loader, eventCollector);
     final ExecutableNode node = runner.getNode();
 
     node.setStatus(Status.DISABLED);
@@ -154,8 +155,8 @@ public class JobRunnerTest {
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue(node.getStatus() == Status.SKIPPED);
     Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
-    // Give it 10 ms to fail.
-    Assert.assertTrue(node.getEndTime() - node.getStartTime() < 10);
+    // Give it 2000 ms to fail.
+    Assert.assertTrue(node.getEndTime() - node.getStartTime() < 2000);
 
     // Log file and output files should not exist.
     final Props outputProps = runner.getNode().getOutputProps();
@@ -173,7 +174,7 @@ public class JobRunnerTest {
     final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final JobRunner runner =
-        createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+        createJobRunner(1, "testJob", 0, false, loader, eventCollector);
     final ExecutableNode node = runner.getNode();
 
     node.setStatus(Status.KILLED);
@@ -186,8 +187,8 @@ public class JobRunnerTest {
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue(node.getStatus() == Status.KILLED);
     Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
-    // Give it 10 ms to fail.
-    Assert.assertTrue(node.getEndTime() - node.getStartTime() < 10);
+    // Give it 2000 ms to fail.
+    Assert.assertTrue(node.getEndTime() - node.getStartTime() < 2000);
 
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
 
@@ -199,11 +200,8 @@ public class JobRunnerTest {
     eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
   }
 
-  @Ignore
   @Test
-  // todo: HappyRay investigate if it is worth fixing this test. If not, remove it.
-  // The change history doesn't mention why this test was ignored.
-  public void testCancelRun() throws InterruptedException {
+  public void testCancelRun() throws Exception {
     final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final JobRunner runner =
@@ -216,22 +214,22 @@ public class JobRunnerTest {
     final Thread thread = new Thread(runner);
     thread.start();
 
-    Thread.sleep(2000);
+    StatusTestUtils.waitForStatus(node, Status.RUNNING);
     runner.kill();
-    Thread.sleep(500);
+    assertThreadIsNotAlive(thread);
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Status is " + node.getStatus(),
         node.getStatus() == Status.KILLED);
     Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
-    // Give it 10 ms to fail.
+    // Give it some time to fail.
     Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
     Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
 
     // Log file and output files should not exist.
     final File logFile = new File(runner.getLogFilePath());
     final Props outputProps = runner.getNode().getOutputProps();
-    Assert.assertTrue(outputProps == null);
+    Assert.assertEquals(0, outputProps.size());
     Assert.assertTrue(logFile.exists());
     Assert.assertTrue(eventCollector.checkOrdering());
     Assert.assertTrue(runner.isKilled());
@@ -239,29 +237,36 @@ public class JobRunnerTest {
         .assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
   }
 
-  @Ignore
   @Test
-  public void testDelayedExecutionJob() {
+  public void testDelayedExecutionJob() throws Exception {
     final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final JobRunner runner =
-        createJobRunner(1, "testJob", 1, false, loader, eventCollector);
-    runner.setDelayStart(5000);
+        createJobRunner(1, "testJob", 0, false, loader, eventCollector);
+    runner.setDelayStart(10_000);
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
     eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
     Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
 
-    runner.run();
+    final Thread thread = new Thread(runner);
+    thread.start();
+
+    // wait for job to get into delayExecution() -> wait()
+    assertThreadIsWaiting(thread);
+    // Wake up delayExecution() -> wait()
+    notifyWaiting(runner);
+    assertThreadIsNotAlive(thread);
+
     eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
 
     Assert.assertTrue(runner.getStatus() == node.getStatus());
     Assert.assertTrue("Node status is " + node.getStatus(),
         node.getStatus() == Status.SUCCEEDED);
     Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
-    Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
-    Assert.assertTrue(node.getStartTime() - startTime >= 5000);
+    Assert.assertTrue(node.getEndTime() - node.getStartTime() >= 0);
+    Assert.assertTrue(node.getStartTime() - startTime >= 0);
 
     final File logFile = new File(runner.getLogFilePath());
     final Props outputProps = runner.getNode().getOutputProps();
@@ -276,12 +281,12 @@ public class JobRunnerTest {
   }
 
   @Test
-  public void testDelayedExecutionCancelledJob() throws InterruptedException {
+  public void testDelayedExecutionCancelledJob() throws Exception {
     final MockExecutorLoader loader = new MockExecutorLoader();
     final EventCollectorListener eventCollector = new EventCollectorListener();
     final JobRunner runner =
-        createJobRunner(1, "testJob", 1, false, loader, eventCollector);
-    runner.setDelayStart(5000);
+        createJobRunner(1, "testJob", 0, false, loader, eventCollector);
+    runner.setDelayStart(10_000);
     final long startTime = System.currentTimeMillis();
     final ExecutableNode node = runner.getNode();
 
@@ -292,8 +297,8 @@ public class JobRunnerTest {
     thread.start();
 
     StatusTestUtils.waitForStatus(node, Status.READY);
-    // sleep so that job has time to get into delayExecution() -> wait()
-    Thread.sleep(2000L);
+    // wait for job to get into delayExecution() -> wait()
+    assertThreadIsWaiting(thread);
     runner.kill();
     StatusTestUtils.waitForStatus(node, Status.KILLED);
 
@@ -304,7 +309,7 @@ public class JobRunnerTest {
         node.getStatus() == Status.KILLED);
     Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
     Assert.assertTrue(node.getEndTime() - node.getStartTime() < 1000);
-    Assert.assertTrue(node.getStartTime() - startTime >= 2000);
+    Assert.assertTrue(node.getStartTime() - startTime >= 0);
     Assert.assertTrue(node.getStartTime() - startTime <= 5000);
     Assert.assertTrue(runner.isKilled());
 
@@ -314,7 +319,7 @@ public class JobRunnerTest {
     Assert.assertTrue(logFile.exists());
 
     // wait so that there's time to make the "DB update" for KILLED status
-    azkaban.test.TestUtils.await().untilAsserted(
+    TestUtils.await().untilAsserted(
         () -> assertThat(loader.getNodeUpdateCount("testJob")).isEqualTo(2));
     eventCollector.assertEvents(EventType.JOB_FINISHED);
   }
@@ -322,23 +327,14 @@ public class JobRunnerTest {
   private Props createProps(final int sleepSec, final boolean fail) {
     final Props props = new Props();
     props.put("type", "test");
-    // TODO always use 0 if "immediate succeess" and 10 if "wait until killed"
-    // 0 makes the tests as quick as possible and 10 provides stability when test run is slow
     props.put("seconds", sleepSec);
-    props.put(ProcessJob.WORKING_DIR, this.workingDir.getPath());
     props.put("fail", String.valueOf(fail));
-
     return props;
   }
 
   private JobRunner createJobRunner(final int execId, final String name, final int time,
       final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener) {
-    return createJobRunner(execId, name, time, fail, loader, listener, new Props());
-  }
-
-  private JobRunner createJobRunner(final int execId, final String name, final int time,
-      final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener,
-      final Props azkabanProps) {
+    final Props azkabanProps = new Props();
     final ExecutableFlow flow = new ExecutableFlow();
     flow.setExecutionId(execId);
     final ExecutableNode node = new ExecutableNode();
@@ -357,4 +353,20 @@ public class JobRunnerTest {
     return runner;
   }
 
+  private void assertThreadIsWaiting(final Thread thread) throws Exception {
+    TestUtils.await().until(
+        () -> thread.getState() == TIMED_WAITING || thread.getState() == WAITING);
+  }
+
+  private void assertThreadIsNotAlive(final Thread thread) throws Exception {
+    thread.join(9000L);
+    TestUtils.await().atMost(1000L, TimeUnit.MILLISECONDS).until(() -> !thread.isAlive());
+  }
+
+  private void notifyWaiting(final Object monitor) {
+    synchronized (monitor) {
+      monitor.notifyAll();
+    }
+  }
+
 }