/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.execapp;
import static org.assertj.core.api.Assertions.assertThat;
import azkaban.event.Event;
import azkaban.event.EventData;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.TestUtils;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class JobRunnerTest {
private final Logger logger = Logger.getLogger("JobRunnerTest");
private File workingDir;
private JobTypeManager jobtypeManager;
public JobRunnerTest() {
}
@Before
public void setUp() throws Exception {
System.out.println("Create temp dir");
this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
if (this.workingDir.exists()) {
FileUtils.deleteDirectory(this.workingDir);
}
this.workingDir.mkdirs();
this.jobtypeManager =
new JobTypeManager(null, null, this.getClass().getClassLoader());
final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
pluginSet.addPluginClass("test", InteractiveTestJob.class);
}
@After
public void tearDown() throws IOException {
System.out.println("Teardown temp dir");
if (this.workingDir != null) {
FileUtils.deleteDirectory(this.workingDir);
this.workingDir = null;
}
}
@Test
public void testBasicRun() {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
createJobRunner(1, "testJob", 0, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
runner.run();
eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
node.getStatus() == Status.SUCCEEDED);
Assert.assertTrue(node.getStartTime() >= 0 && node.getEndTime() >= 0);
Assert.assertTrue(node.getEndTime() - node.getStartTime() >= 0);
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
eventCollector
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Ignore
@Test
public void testFailedRun() {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
createJobRunner(1, "testJob", 1, true, loader, eventCollector);
final ExecutableNode node = runner.getNode();
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
runner.run();
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue(node.getStatus() == Status.FAILED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(!runner.isKilled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
eventCollector
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Test
public void testDisabledRun() {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
createJobRunner(1, "testJob", 1, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
node.setStatus(Status.DISABLED);
// Should be disabled.
Assert.assertTrue(runner.getStatus() == Status.DISABLED);
runner.run();
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue(node.getStatus() == Status.SKIPPED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
// Give it 10 ms to fail.
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 10);
// Log file and output files should not exist.
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
}
@Test
public void testPreKilledRun() {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
createJobRunner(1, "testJob", 1, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
node.setStatus(Status.KILLED);
// Should be killed.
Assert.assertTrue(runner.getStatus() == Status.KILLED);
runner.run();
// Should just skip the run and not change
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue(node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
// Give it 10 ms to fail.
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 10);
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == null);
// Log file and output files should not exist.
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(runner.getLogFilePath() == null);
Assert.assertTrue(!runner.isKilled());
eventCollector.assertEvents(EventType.JOB_STARTED, EventType.JOB_FINISHED);
}
@Ignore
@Test
// todo: HappyRay investigate if it is worth fixing this test. If not, remove it.
// The change history doesn't mention why this test was ignored.
public void testCancelRun() throws InterruptedException {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
createJobRunner(13, "testJob", 10, false, loader, eventCollector);
final ExecutableNode node = runner.getNode();
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
final Thread thread = new Thread(runner);
thread.start();
Thread.sleep(2000);
runner.kill();
Thread.sleep(500);
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Status is " + node.getStatus(),
node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
// Give it 10 ms to fail.
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 3000);
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
// Log file and output files should not exist.
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
Assert.assertTrue(eventCollector.checkOrdering());
Assert.assertTrue(runner.isKilled());
eventCollector
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Ignore
@Test
public void testDelayedExecutionJob() {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
createJobRunner(1, "testJob", 1, false, loader, eventCollector);
runner.setDelayStart(5000);
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
runner.run();
eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
node.getStatus() == Status.SUCCEEDED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
Assert.assertTrue(node.getEndTime() - node.getStartTime() > 1000);
Assert.assertTrue(node.getStartTime() - startTime >= 5000);
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps != null);
Assert.assertTrue(logFile.exists());
Assert.assertFalse(runner.isKilled());
Assert.assertTrue(loader.getNodeUpdateCount(node.getId()) == 3);
Assert.assertTrue(eventCollector.checkOrdering());
eventCollector
.assertEvents(EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED, EventType.JOB_FINISHED);
}
@Test
public void testDelayedExecutionCancelledJob() throws InterruptedException {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final JobRunner runner =
createJobRunner(1, "testJob", 1, false, loader, eventCollector);
runner.setDelayStart(5000);
final long startTime = System.currentTimeMillis();
final ExecutableNode node = runner.getNode();
eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
final Thread thread = new Thread(runner);
thread.start();
StatusTestUtils.waitForStatus(node, Status.READY);
// sleep so that job has time to get into delayExecution() -> wait()
Thread.sleep(2000L);
runner.kill();
StatusTestUtils.waitForStatus(node, Status.KILLED);
eventCollector.handleEvent(Event.create(null, EventType.JOB_FINISHED, new EventData(node)));
Assert.assertTrue(runner.getStatus() == node.getStatus());
Assert.assertTrue("Node status is " + node.getStatus(),
node.getStatus() == Status.KILLED);
Assert.assertTrue(node.getStartTime() > 0 && node.getEndTime() > 0);
Assert.assertTrue(node.getEndTime() - node.getStartTime() < 1000);
Assert.assertTrue(node.getStartTime() - startTime >= 2000);
Assert.assertTrue(node.getStartTime() - startTime <= 5000);
Assert.assertTrue(runner.isKilled());
final File logFile = new File(runner.getLogFilePath());
final Props outputProps = runner.getNode().getOutputProps();
Assert.assertTrue(outputProps == null);
Assert.assertTrue(logFile.exists());
// wait so that there's time to make the "DB update" for KILLED status
TestUtils.await().untilAsserted(
() -> assertThat(loader.getNodeUpdateCount("testJob")).isEqualTo(2));
eventCollector.assertEvents(EventType.JOB_FINISHED);
}
private Props createProps(final int sleepSec, final boolean fail) {
final Props props = new Props();
props.put("type", "test");
// TODO always use 0 if "immediate succeess" and 10 if "wait until killed"
// 0 makes the tests as quick as possible and 10 provides stability when test run is slow
props.put("seconds", sleepSec);
props.put(ProcessJob.WORKING_DIR, this.workingDir.getPath());
props.put("fail", String.valueOf(fail));
return props;
}
private JobRunner createJobRunner(final int execId, final String name, final int time,
final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener) {
return createJobRunner(execId, name, time, fail, loader, listener, new Props());
}
private JobRunner createJobRunner(final int execId, final String name, final int time,
final boolean fail, final ExecutorLoader loader, final EventCollectorListener listener,
final Props azkabanProps) {
final ExecutableFlow flow = new ExecutableFlow();
flow.setExecutionId(execId);
final ExecutableNode node = new ExecutableNode();
node.setId(name);
node.setParentFlow(flow);
final Props props = createProps(time, fail);
node.setInputProps(props);
final HashSet<String> proxyUsers = new HashSet<>();
proxyUsers.add(flow.getSubmitUser());
final JobRunner runner = new JobRunner(node, this.workingDir, loader, this.jobtypeManager,
azkabanProps);
runner.setLogSettings(this.logger, "5MB", 4);
runner.addListener(listener);
return runner;
}
}