package azkaban.execapp;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.when;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.ProjectLoader;
import azkaban.spi.AzkabanEventReporter;
import azkaban.spi.EventType;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
public class FlowRunnerTest extends FlowRunnerTestBase {
private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
private final AzkabanEventReporter azkabanEventReporter = null;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File workingDir;
private JobTypeManager jobtypeManager;
@Mock
private ProjectLoader fakeProjectLoader;
@Mock
private ExecutorLoader loader;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
this.workingDir = this.temporaryFolder.newFolder();
this.jobtypeManager =
new JobTypeManager(null, null, this.getClass().getClassLoader());
final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
pluginSet.addPluginClass("test", InteractiveTestJob.class);
Utils.initServiceProvider();
JmxJobMBeanManager.getInstance().initialize(new Props());
InteractiveTestJob.clearTestJobs();
}
@Test
public void exec1Normal() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
startThread(this.runner);
succeedJobs("job3", "job4", "job6");
waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
compareFinishedRuntime(this.runner);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2", Status.SUCCEEDED);
assertStatus("job3", Status.SUCCEEDED);
assertStatus("job4", Status.SUCCEEDED);
assertStatus("job5", Status.SUCCEEDED);
assertStatus("job6", Status.SUCCEEDED);
assertStatus("job7", Status.SUCCEEDED);
assertStatus("job8", Status.SUCCEEDED);
assertStatus("job10", Status.SUCCEEDED);
eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1Disabled() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow exFlow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
exFlow.getExecutableNode("job6").setStatus(Status.DISABLED);
exFlow.getExecutableNode("job5").setStatus(Status.DISABLED);
exFlow.getExecutableNode("job10").setStatus(Status.DISABLED);
this.runner = createFlowRunner(exFlow, this.loader, eventCollector);
Assert.assertTrue(!this.runner.isKilled());
waitForAndAssertFlowStatus(Status.READY);
startThread(this.runner);
succeedJobs("job3", "job4");
assertThreadShutDown();
compareFinishedRuntime(this.runner);
waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertStatus("job1", Status.SKIPPED);
assertStatus("job2", Status.SUCCEEDED);
assertStatus("job3", Status.SUCCEEDED);
assertStatus("job4", Status.SUCCEEDED);
assertStatus("job5", Status.SKIPPED);
assertStatus("job6", Status.SKIPPED);
assertStatus("job7", Status.SUCCEEDED);
assertStatus("job8", Status.SUCCEEDED);
assertStatus("job10", Status.SKIPPED);
eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1Failed() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
this.runner = createFlowRunner(flow, this.loader, eventCollector);
startThread(this.runner);
succeedJobs("job6");
Assert.assertTrue(!this.runner.isKilled());
waitForAndAssertFlowStatus(Status.FAILED);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2d", Status.FAILED);
assertStatus("job3", Status.CANCELLED);
assertStatus("job4", Status.CANCELLED);
assertStatus("job5", Status.CANCELLED);
assertStatus("job6", Status.SUCCEEDED);
assertStatus("job7", Status.CANCELLED);
assertStatus("job8", Status.CANCELLED);
assertStatus("job9", Status.CANCELLED);
assertStatus("job10", Status.CANCELLED);
assertThreadShutDown();
eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1FailedKillAll() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
this.runner = createFlowRunner(flow, this.loader, eventCollector);
startThread(this.runner);
assertThreadShutDown();
Assert.assertTrue(this.runner.isKilled());
waitForAndAssertFlowStatus(Status.KILLED);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2d", Status.FAILED);
assertStatus("job3", Status.CANCELLED);
assertStatus("job4", Status.CANCELLED);
assertStatus("job5", Status.CANCELLED);
assertStatus("job6", Status.KILLED);
assertStatus("job7", Status.CANCELLED);
assertStatus("job8", Status.CANCELLED);
assertStatus("job9", Status.CANCELLED);
assertStatus("job10", Status.CANCELLED);
eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void exec1FailedFinishRest() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
final ExecutableFlow flow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
flow.getExecutionOptions().setFailureAction(
FailureAction.FINISH_ALL_POSSIBLE);
this.runner = createFlowRunner(flow, this.loader, eventCollector);
startThread(this.runner);
succeedJobs("job3");
waitForAndAssertFlowStatus(Status.FAILED);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2d", Status.FAILED);
assertStatus("job3", Status.SUCCEEDED);
assertStatus("job4", Status.CANCELLED);
assertStatus("job5", Status.CANCELLED);
assertStatus("job6", Status.CANCELLED);
assertStatus("job7", Status.SUCCEEDED);
assertStatus("job8", Status.SUCCEEDED);
assertStatus("job9", Status.SUCCEEDED);
assertStatus("job10", Status.CANCELLED);
assertThreadShutDown();
eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void execAndCancel() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
startThread(this.runner);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2", Status.SUCCEEDED);
waitJobsStarted(this.runner, "job3", "job4", "job6");
InteractiveTestJob.getTestJob("job3").ignoreCancel();
this.runner.kill("me");
assertStatus("job3", Status.KILLING);
assertFlowStatus(this.runner.getExecutableFlow(), Status.KILLING);
InteractiveTestJob.getTestJob("job3").failJob();
Assert.assertTrue(this.runner.isKilled());
assertStatus("job5", Status.CANCELLED);
assertStatus("job7", Status.CANCELLED);
assertStatus("job8", Status.CANCELLED);
assertStatus("job10", Status.CANCELLED);
assertStatus("job3", Status.KILLED);
assertStatus("job4", Status.KILLED);
assertStatus("job6", Status.KILLED);
assertThreadShutDown();
waitForAndAssertFlowStatus(Status.KILLED);
eventCollector.assertEvents(EventType.FLOW_STARTED, EventType.FLOW_FINISHED);
}
@Test
public void execRetries() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
startThread(this.runner);
assertThreadShutDown();
assertStatus("job-retry", Status.SUCCEEDED);
assertStatus("job-pass", Status.SUCCEEDED);
assertStatus("job-retry-fail", Status.FAILED);
assertAttempts("job-retry", 3);
assertAttempts("job-pass", 0);
assertAttempts("job-retry-fail", 2);
waitForAndAssertFlowStatus(Status.FAILED);
}
private void startThread(final FlowRunner runner) {
Assert.assertTrue(!runner.isKilled());
final Thread thread = new Thread(runner);
thread.start();
}
private void assertAttempts(final String name, final int attempt) {
final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
if (node.getAttempt() != attempt) {
Assert.fail("Expected " + attempt + " got " + node.getAttempt()
+ " attempts " + name);
}
}
private void compareFinishedRuntime(final FlowRunner runner) throws Exception {
final ExecutableFlow flow = runner.getExecutableFlow();
for (final String flowName : flow.getStartNodes()) {
final ExecutableNode node = flow.getExecutableNode(flowName);
compareStartFinishTimes(flow, node, 0);
}
}
private void compareStartFinishTimes(final ExecutableFlow flow,
final ExecutableNode node, final long previousEndTime) throws Exception {
final long startTime = node.getStartTime();
final long endTime = node.getEndTime();
if (startTime <= 0) {
Assert.assertTrue(endTime <= 0);
return;
}
Assert.assertTrue("Checking start and end times", startTime > 0
&& endTime >= startTime);
Assert.assertTrue("Start time for " + node.getId() + " is " + startTime
+ " and less than " + previousEndTime, startTime >= previousEndTime);
for (final String outNode : node.getOutNodes()) {
final ExecutableNode childNode = flow.getExecutableNode(outNode);
compareStartFinishTimes(flow, childNode, endTime);
}
}
private FlowRunner createFlowRunner(final ExecutableFlow flow,
final ExecutorLoader loader, final EventCollectorListener eventCollector) throws Exception {
return createFlowRunner(flow, loader, eventCollector, new Props());
}
private FlowRunner createFlowRunner(final ExecutableFlow flow,
final ExecutorLoader loader, final EventCollectorListener eventCollector,
final Props azkabanProps)
throws Exception {
loader.uploadExecutableFlow(flow);
final FlowRunner runner =
new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
this.azkabanEventReporter);
runner.addListener(eventCollector);
return runner;
}
private FlowRunner createFlowRunner(final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName) throws Exception {
return createFlowRunner(loader, eventCollector, flowName, new Props());
}
private FlowRunner createFlowRunner(final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
throws Exception {
final ExecutableFlow exFlow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, TEST_DIR, flowName, 1);
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner =
new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
this.azkabanEventReporter);
runner.addListener(eventCollector);
return runner;
}
}