diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index ea46cef..e7f0bcd 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -16,8 +16,6 @@
package azkaban.execapp;
-import static org.assertj.core.api.Assertions.assertThat;
-
import azkaban.event.Event;
import azkaban.event.EventData;
import azkaban.executor.ExecutableFlow;
@@ -26,22 +24,27 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
-import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.spi.EventType;
+import azkaban.test.TestUtils;
import azkaban.utils.Props;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Thread.State.TIMED_WAITING;
+import static java.lang.Thread.State.WAITING;
+import static org.assertj.core.api.Assertions.assertThat;
+
public class JobRunnerTest {
private final Logger logger = Logger.getLogger("JobRunnerTest");
@@ -107,13 +110,12 @@ public class JobRunnerTest {
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
- @Ignore
@Test
public void testFailedRun() {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
- createJobRunner(1, "testJob", 1, true, loader, eventCollector);
+ createJobRunner(1, "testJob", 0, true, loader, eventCollector);
final ExecutableNode node = runner.getNode();
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
@@ -122,12 +124,11 @@ public class JobRunnerTest {
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue(node.getStatus() == Status.FAILED);
- Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
- Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
+ Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() >= 0);
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
- Assert.assertTrue(outputProps == null);
+ Assert.assertEquals(0, outputProps.size());
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(!runner.isKilled());
@@ -142,7 +143,7 @@ public class JobRunnerTest {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
- createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+ createJobRunner(1, "testJob", 0, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
node.setStatus(Status.DISABLED);
@@ -154,8 +155,8 @@ public class JobRunnerTest {
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue(node.getStatus() == Status.SKIPPED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
- // Give it 10 ms to fail.
- Assert.assertTrue(node.getEndTime() - node.getStartTime() < 10);
+ // Give it 2000 ms to fail.
+ Assert.assertTrue(node.getEndTime() - node.getStartTime() < 2000);
// Log file and output files should not exist.
final Props outputProps = runner.getNode().getOutputProps();
@@ -173,7 +174,7 @@ public class JobRunnerTest {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
- createJobRunner(1, "testJob", 1, false, loader, eventCollector);
+ createJobRunner(1, "testJob", 0, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
node.setStatus(Status.KILLED);
@@ -186,8 +187,8 @@ public class JobRunnerTest {
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue(node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
- // Give it 10 ms to fail.
- Assert.assertTrue(node.getEndTime() - node.getStartTime() < 10);
+ // Give it 2000 ms to fail.
+ Assert.assertTrue(node.getEndTime() - node.getStartTime() < 2000);
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
@@ -199,11 +200,8 @@ public class JobRunnerTest {
eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
}
- @Ignore
@Test
- // todo: HappyRay investigate if it is worth fixing this test. If not, remove it.
- // The change history doesn't mention why this test was ignored.
- public void testCancelRun() throws InterruptedException {
+ public void testCancelRun() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
@@ -216,22 +214,22 @@ public class JobRunnerTest {
final Thread thread = new Thread(runner);
thread.start();
- Thread.sleep(2000);
+ StatusTestUtils.waitForStatus(node, Status.RUNNING);
runner.kill();
- Thread.sleep(500);
+ assertThreadIsNotAlive(thread);
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Status is " + node.getStatus(),
node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
- // Give it 10 ms to fail.
+ // Give it some time to fail.
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
// Log file and output files should not exist.
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
- Assert.assertTrue(outputProps == null);
+ Assert.assertEquals(0, outputProps.size());
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(runner.isKilled());
@@ -239,29 +237,36 @@ public class JobRunnerTest {
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
- @Ignore
@Test
- public void testDelayedExecutionJob() {
+ public void testDelayedExecutionJob() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
- createJobRunner(1, "testJob", 1, false, loader, eventCollector);
- runner.setDelayStart(5000);
+ createJobRunner(1, "testJob", 0, false, loader, eventCollector);
+ runner.setDelayStart(10_000);
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
- runner.run();
+ final Thread thread = new Thread(runner);
+ thread.start();
+
+ // wait for job to get into delayExecution() -> wait()
+ assertThreadIsWaiting(thread);
+ // Wake up delayExecution() -> wait()
+ notifyWaiting(runner);
+ assertThreadIsNotAlive(thread);
+
eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
node.getStatus() == Status.SUCCEEDED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
- Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
- Assert.assertTrue(node.getStartTime() - startTime >= 5000);
+ Assert.assertTrue(node.getEndTime() - node.getStartTime() >= 0);
+ Assert.assertTrue(node.getStartTime() - startTime >= 0);
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
@@ -276,12 +281,12 @@ public class JobRunnerTest {
}
@Test
- public void testDelayedExecutionCancelledJob() throws InterruptedException {
+ public void testDelayedExecutionCancelledJob() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
- createJobRunner(1, "testJob", 1, false, loader, eventCollector);
- runner.setDelayStart(5000);
+ createJobRunner(1, "testJob", 0, false, loader, eventCollector);
+ runner.setDelayStart(10_000);
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
@@ -292,8 +297,8 @@ public class JobRunnerTest {
thread.start();
StatusTestUtils.waitForStatus(node, Status.READY);
- // sleep so that job has time to get into delayExecution() -> wait()
- Thread.sleep(2000L);
+ // wait for job to get into delayExecution() -> wait()
+ assertThreadIsWaiting(thread);
runner.kill();
StatusTestUtils.waitForStatus(node, Status.KILLED);
@@ -304,7 +309,7 @@ public class JobRunnerTest {
node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 1000);
- Assert.assertTrue(node.getStartTime() - startTime >= 2000);
+ Assert.assertTrue(node.getStartTime() - startTime >= 0);
Assert.assertTrue(node.getStartTime() - startTime <= 5000);
Assert.assertTrue(runner.isKilled());
@@ -314,7 +319,7 @@ public class JobRunnerTest {
Assert.assertTrue(logFile.exists());
// wait so that there's time to make the "DB update" for KILLED status
- azkaban.test.TestUtils.await().untilAsserted(
+ TestUtils.await().untilAsserted(
() -> assertThat(loader.getNodeUpdateCount("testJob")).isEqualTo(2));
eventCollector.assertEvents(EventType.JOB_FINISHED);
}
@@ -322,23 +327,14 @@ public class JobRunnerTest {
private Props createProps(final int sleepSec, final boolean fail) {
final Props props = new Props();
props.put("type", "test");
- // TODO always use 0 if "immediate succeess" and 10 if "wait until killed"
- // 0 makes the tests as quick as possible and 10 provides stability when test run is slow
props.put("seconds", sleepSec);
- props.put(ProcessJob.WORKING_DIR, this.workingDir.getPath());
props.put("fail", String.valueOf(fail));
-
return props;
}
private JobRunner createJobRunner(final int execId, final String name, final int time,
final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener) {
- return createJobRunner(execId, name, time, fail, loader, listener, new Props());
- }
-
- private JobRunner createJobRunner(final int execId, final String name, final int time,
- final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener,
- final Props azkabanProps) {
+ final Props azkabanProps = new Props();
final ExecutableFlow flow = new ExecutableFlow();
flow.setExecutionId(execId);
final ExecutableNode node = new ExecutableNode();
@@ -357,4 +353,20 @@ public class JobRunnerTest {
return runner;
}
+ private void assertThreadIsWaiting(final Thread thread) throws Exception {
+ TestUtils.await().until(
+ () -> thread.getState() == TIMED_WAITING || thread.getState() == WAITING);
+ }
+
+ private void assertThreadIsNotAlive(final Thread thread) throws Exception {
+ thread.join(9000L);
+ TestUtils.await().atMost(1000L, TimeUnit.MILLISECONDS).until(() -> !thread.isAlive());
+ }
+
+ private void notifyWaiting(final Object monitor) {
+ synchronized (monitor) {
+ monitor.notifyAll();
+ }
+ }
+
}