package azkaban.execapp;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.JavaJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.spi.AzkabanEventReporter;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class FlowRunnerTest2 extends FlowRunnerTestBase {
// Next execution id handed out by createFlowRunner; post-incremented on every call.
private static int id = 101;
// Event reporter passed to every FlowRunner constructed by this test class.
private final AzkabanEventReporter azkabanEventReporter =
EventReporterUtil.getTestAzkabanEventReporter();
// JUnit rule that supplies the folder used as the execution working directory.
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
// The remaining fields are assigned in setUp() before each test method.
private File workingDir;
private JobTypeManager jobtypeManager;
private ExecutorLoader fakeExecutorLoader;
private Project project;
// Result of FlowRunnerTestUtil.prepareProject; createFlowRunner looks up "jobf" in it.
private Map<String, Flow> flowMap;
/**
 * Builds the fixture shared by all tests: a fresh working directory, a job type manager with
 * the "java" and "test" job types registered, a mock executor loader, and the flows of the
 * "embedded2" test project.
 *
 * @throws Exception if any part of the fixture cannot be created
 */
@Before
public void setUp() throws Exception {
this.workingDir = this.temporaryFolder.newFolder();
this.jobtypeManager = new JobTypeManager(null, null,
this.getClass().getClassLoader());
final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
// "test" jobs are InteractiveTestJob instances, which the tests finish by hand through
// InteractiveTestJob.getTestJob(...).succeedJob() / failJob().
pluginSet.addPluginClass("java", JavaJob.class);
pluginSet.addPluginClass("test", InteractiveTestJob.class);
this.fakeExecutorLoader = new MockExecutorLoader();
this.project = new Project(1, "testProject");
Utils.initServiceProvider();
JmxJobMBeanManager.getInstance().initialize(new Props());
this.flowMap = FlowRunnerTestUtil
.prepareProject(this.project, ExecutionsTestUtil.getFlowDir("embedded2"),
this.workingDir);
// Presumably drops InteractiveTestJob instances registered by an earlier test -- confirm
// against InteractiveTestJob.
InteractiveTestJob.clearTestJobs();
}
/**
 * Drives the "jobf" flow down the all-success path with the default failure action and checks,
 * step by step, both the node statuses and the input/output properties seen by the jobs
 * (shared props, the "param4" flow-parameter override, and outputs of upstream jobs).
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testBasicRun() throws Exception {
  final EventCollectorListener eventCollector = new EventCollectorListener();
  this.runner = createFlowRunner(eventCollector);

  // 1. Start the flow; only the two root jobs are expected to run.
  runFlowRunnerInThread(this.runner);
  assertStatus("joba", Status.RUNNING);
  assertStatus("joba1", Status.RUNNING);

  // joba is expected to see its own param1, while param4 comes from the flow-parameter override.
  final Props joba = this.runner.getExecutableFlow().getExecutableNodePath("joba")
      .getInputProps();
  assertEquals("joba.1", joba.get("param1"));
  assertEquals("test1.2", joba.get("param2"));
  assertEquals("test1.3", joba.get("param3"));
  assertEquals("override.4", joba.get("param4"));
  assertEquals("test2.5", joba.get("param5"));
  assertEquals("test2.6", joba.get("param6"));
  assertEquals("test2.7", joba.get("param7"));
  assertEquals("test2.8", joba.get("param8"));

  final Props joba1 = this.runner.getExecutableFlow().getExecutableNodePath("joba1")
      .getInputProps();
  assertEquals("test1.1", joba1.get("param1"));
  assertEquals("test1.2", joba1.get("param2"));
  assertEquals("test1.3", joba1.get("param3"));
  assertEquals("override.4", joba1.get("param4"));
  assertEquals("test2.5", joba1.get("param5"));
  assertEquals("test2.6", joba1.get("param6"));
  assertEquals("test2.7", joba1.get("param7"));
  assertEquals("test2.8", joba1.get("param8"));

  // 2. Finish joba with output props; jobb, jobc and jobd are expected to start.
  InteractiveTestJob.getTestJob("joba").succeedJob(
      Props.of("output.joba", "joba", "output.override", "joba"));
  assertStatus("joba", Status.SUCCEEDED);
  assertStatus("joba1", Status.RUNNING);
  assertStatus("jobb", Status.RUNNING);
  assertStatus("jobc", Status.RUNNING);
  assertStatus("jobd", Status.RUNNING);
  assertStatus("jobd:innerJobA", Status.RUNNING);
  assertStatus("jobb:innerJobA", Status.RUNNING);

  // jobb is expected to see joba's output, with "output.override" resolved to "jobb".
  final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNodePath("jobb");
  assertEquals(Status.RUNNING, node.getStatus());
  final Props jobb = node.getInputProps();
  assertEquals("override.4", jobb.get("param4"));
  assertEquals("moo", jobb.get("testprops"));
  assertEquals("jobb", jobb.get("output.override"));
  assertEquals("joba", jobb.get("output.joba"));

  final Props jobbInnerJobA = this.runner.getExecutableFlow()
      .getExecutableNodePath("jobb:innerJobA")
      .getInputProps();
  assertEquals("test1.1", jobbInnerJobA.get("param1"));
  assertEquals("test1.2", jobbInnerJobA.get("param2"));
  assertEquals("test1.3", jobbInnerJobA.get("param3"));
  assertEquals("override.4", jobbInnerJobA.get("param4"));
  assertEquals("test2.5", jobbInnerJobA.get("param5"));
  assertEquals("test2.6", jobbInnerJobA.get("param6"));
  assertEquals("test2.7", jobbInnerJobA.get("param7"));
  assertEquals("test2.8", jobbInnerJobA.get("param8"));
  assertEquals("joba", jobbInnerJobA.get("output.joba"));

  // 3. Walk through the jobs nested in jobb: innerJobA -> innerJobB/innerJobC -> innerFlow.
  InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(
      Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
  assertStatus("jobb:innerJobA", Status.SUCCEEDED);
  assertStatus("jobb:innerJobB", Status.RUNNING);
  assertStatus("jobb:innerJobC", Status.RUNNING);
  final Props jobbInnerJobB = this.runner.getExecutableFlow()
      .getExecutableNodePath("jobb:innerJobB")
      .getInputProps();
  assertEquals("test1.1", jobbInnerJobB.get("param1"));
  assertEquals("override.4", jobbInnerJobB.get("param4"));
  assertEquals("jobb.innerJobA",
      jobbInnerJobB.get("output.jobb.innerJobA"));
  assertEquals("moo", jobbInnerJobB.get("testprops"));

  InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(
      Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
  InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(
      Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
  assertStatus("jobb:innerJobB", Status.SUCCEEDED);
  assertStatus("jobb:innerJobC", Status.SUCCEEDED);
  assertStatus("jobb:innerFlow", Status.RUNNING);

  final Props jobbInnerJobD = this.runner.getExecutableFlow()
      .getExecutableNodePath("jobb:innerFlow")
      .getInputProps();
  assertEquals("test1.1", jobbInnerJobD.get("param1"));
  assertEquals("override.4", jobbInnerJobD.get("param4"));
  assertEquals("jobb.innerJobB",
      jobbInnerJobD.get("output.jobb.innerJobB"));
  assertEquals("jobb.innerJobC",
      jobbInnerJobD.get("output.jobb.innerJobC"));

  // Finishing innerFlow completes jobb; innerFlow's outputs are expected as jobb's outputs.
  InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(
      Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
  assertStatus("jobb:innerFlow", Status.SUCCEEDED);
  assertStatus("jobb", Status.SUCCEEDED);
  final Props jobbOutput = this.runner.getExecutableFlow().getExecutableNodePath("jobb")
      .getOutputProps();
  assertEquals("test1", jobbOutput.get("output1.jobb"));
  assertEquals("test2", jobbOutput.get("output2.jobb"));

  // 4. Finish jobc and the jobs nested in jobd; jobe is expected to start afterwards.
  InteractiveTestJob.getTestJob("jobc").succeedJob(
      Props.of("output.jobc", "jobc"));
  assertStatus("jobc", Status.SUCCEEDED);

  InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
  InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
  assertStatus("jobd:innerJobA", Status.SUCCEEDED);
  assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
  assertStatus("jobd", Status.SUCCEEDED);
  assertStatus("jobe", Status.RUNNING);

  // jobe is expected to receive the outputs of jobb and jobc.
  final Props jobe = this.runner.getExecutableFlow().getExecutableNodePath("jobe")
      .getInputProps();
  assertEquals("test1", jobe.get("output1.jobb"));
  assertEquals("jobc", jobe.get("output.jobc"));

  // 5. Finish the remaining jobs; jobf only starts once both joba1 and jobe are done.
  InteractiveTestJob.getTestJob("joba1").succeedJob();
  InteractiveTestJob.getTestJob("jobe").succeedJob();
  assertStatus("joba1", Status.SUCCEEDED);
  assertStatus("jobe", Status.SUCCEEDED);
  assertStatus("jobf", Status.RUNNING);

  InteractiveTestJob.getTestJob("jobf").succeedJob();
  assertStatus("jobf", Status.SUCCEEDED);
  waitForAndAssertFlowStatus(Status.SUCCEEDED);
  // Like every other test in this class, also verify that the runner thread terminated.
  assertThreadShutDown();
}
/**
 * Marks jobb and jobd:innerJobA as DISABLED before the run and verifies that both are
 * reported as SKIPPED, that the jobs nested in jobb stay READY, and that the flow still
 * finishes with SUCCEEDED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testDisabledNormal() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector);
final ExecutableFlow flow = this.runner.getExecutableFlow();
// Disable one top-level node and one node nested inside jobd before the runner starts.
flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
"innerJobA").setStatus(Status.DISABLED);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.SKIPPED);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.SKIPPED);
assertStatus("jobd:innerFlow2", Status.RUNNING);
// The children of the skipped jobb are expected to be left untouched.
assertStatus("jobb:innerJobA", Status.READY);
assertStatus("jobb:innerJobB", Status.READY);
assertStatus("jobb:innerJobC", Status.READY);
assertStatus("jobb:innerFlow", Status.READY);
InteractiveTestJob.getTestJob("jobc").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
assertStatus("jobd", Status.SUCCEEDED);
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("jobe", Status.RUNNING);
InteractiveTestJob.getTestJob("jobe").succeedJob();
InteractiveTestJob.getTestJob("joba1").succeedJob();
assertStatus("jobe", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobf", Status.RUNNING);
InteractiveTestJob.getTestJob("jobf").succeedJob();
assertStatus("jobf", Status.SUCCEEDED);
waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
}
/**
 * Fails the root job joba under the default failure action (FINISH_CURRENTLY_RUNNING, see the
 * one-argument createFlowRunner) and verifies that the flow sits in FAILED_FINISHING while
 * joba1 is still running, that the not-yet-started nodes are CANCELLED, and that the flow
 * ends as FAILED once joba1 completes.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testNormalFailure1() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").failJob();
waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
assertStatus("joba", Status.FAILED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.CANCELLED);
assertStatus("jobc", Status.CANCELLED);
assertStatus("jobd", Status.CANCELLED);
// Jobs nested in the cancelled jobb/jobd are expected to remain READY.
assertStatus("jobd:innerJobA", Status.READY);
assertStatus("jobd:innerFlow2", Status.READY);
assertStatus("jobb:innerJobA", Status.READY);
assertStatus("jobb:innerFlow", Status.READY);
assertStatus("jobe", Status.CANCELLED);
// The still-running joba1 is allowed to finish before the flow is finalised.
InteractiveTestJob.getTestJob("joba1").succeedJob();
assertStatus("jobf", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
/**
 * Fails joba1 while jobb, jobc and jobd are already running, using the default failure action
 * (FINISH_CURRENTLY_RUNNING, see the one-argument createFlowRunner). Verifies that running
 * jobs may still finish, that not-yet-started nested jobs are CANCELLED, that jobb and jobd
 * end as KILLED, and that the flow ends as FAILED after jobc completes.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testNormalFailure2() throws Exception {
  final EventCollectorListener eventCollector = new EventCollectorListener();
  this.runner = createFlowRunner(eventCollector);

  // 1. Start the flow and let joba succeed so that the second level starts.
  runFlowRunnerInThread(this.runner);
  assertStatus("joba", Status.RUNNING);
  assertStatus("joba1", Status.RUNNING);

  InteractiveTestJob.getTestJob("joba").succeedJob();
  assertStatus("joba", Status.SUCCEEDED);
  assertStatus("jobb", Status.RUNNING);
  assertStatus("jobc", Status.RUNNING);
  assertStatus("jobd", Status.RUNNING);
  assertStatus("jobb:innerJobA", Status.RUNNING);
  assertStatus("jobd:innerJobA", Status.RUNNING);

  // 2. Fail joba1; the flow is expected to wait for the jobs that are still running.
  InteractiveTestJob.getTestJob("joba1").failJob();
  assertStatus("joba1", Status.FAILED);
  waitForAndAssertFlowStatus(Status.FAILED_FINISHING);

  // 3. Running nested jobs finish normally, but nothing downstream of them is started.
  InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
  InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
  assertStatus("jobb:innerJobA", Status.SUCCEEDED);
  assertStatus("jobd:innerJobA", Status.SUCCEEDED);
  assertStatus("jobb:innerJobB", Status.CANCELLED);
  assertStatus("jobb:innerJobC", Status.CANCELLED);
  assertStatus("jobb:innerFlow", Status.CANCELLED);
  assertStatus("jobd:innerFlow2", Status.CANCELLED);
  assertStatus("jobb", Status.KILLED);
  assertStatus("jobd", Status.KILLED);
  waitForAndAssertFlowStatus(Status.FAILED_FINISHING);

  // 4. Once the last running job (jobc) is done the flow is finalised as FAILED.
  InteractiveTestJob.getTestJob("jobc").succeedJob();
  assertStatus("jobc", Status.SUCCEEDED);
  assertStatus("jobe", Status.CANCELLED);
  assertStatus("jobf", Status.CANCELLED);
  waitForAndAssertFlowStatus(Status.FAILED);
  assertThreadShutDown();
}
/**
 * Fails the nested job jobb:innerJobB under the default failure action
 * (FINISH_CURRENTLY_RUNNING, see the one-argument createFlowRunner). Verifies that jobb goes
 * through FAILED_FINISHING to FAILED, that running jobs may finish, that jobd ends as KILLED
 * with its unstarted innerFlow2 CANCELLED, and that the flow ends as FAILED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testNormalFailure3() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
// Failure inside jobb: jobb itself stays FAILED_FINISHING while innerJobC is still running.
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
assertStatus("jobb", Status.FAILED_FINISHING);
assertStatus("jobb:innerJobB", Status.FAILED);
waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
// Let the jobs that were already running finish.
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
assertStatus("jobd:innerJobA", Status.SUCCEEDED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobd", Status.KILLED);
assertStatus("jobb:innerJobC", Status.SUCCEEDED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
assertStatus("jobb", Status.FAILED);
InteractiveTestJob.getTestJob("jobc").succeedJob();
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
/**
 * Same failure as testNormalFailure3 (jobb:innerJobB fails) but with
 * FailureAction.FINISH_ALL_POSSIBLE. Verifies that jobd is still allowed to start
 * innerFlow2 and end as SUCCEEDED, while jobb:innerFlow, jobe and jobf are CANCELLED and the
 * flow ends as FAILED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testFailedFinishingFailure3() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector,
FailureAction.FINISH_ALL_POSSIBLE);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
assertStatus("jobb", Status.FAILED_FINISHING);
assertStatus("jobb:innerJobB", Status.FAILED);
waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
assertStatus("jobb", Status.FAILED);
assertStatus("jobd:innerJobA", Status.SUCCEEDED);
// Unlike the FINISH_CURRENTLY_RUNNING case, jobd is expected to go on to innerFlow2.
assertStatus("jobd:innerFlow2", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.SUCCEEDED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
assertStatus("jobd", Status.SUCCEEDED);
InteractiveTestJob.getTestJob("jobc").succeedJob();
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
/**
 * Fails jobb:innerJobB with FailureAction.CANCEL_ALL and verifies that every job still
 * running is KILLED, every job not yet started is CANCELLED, and the flow ends as KILLED
 * without any further job being completed by the test.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testCancelOnFailure() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector,
FailureAction.CANCEL_ALL);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
// A single failure is expected to take down everything else in the flow.
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
assertStatus("jobb", Status.FAILED);
assertStatus("jobb:innerJobB", Status.FAILED);
assertStatus("jobb:innerJobC", Status.KILLED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
assertStatus("jobc", Status.KILLED);
assertStatus("jobd", Status.KILLED);
assertStatus("jobd:innerJobA", Status.KILLED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
// NOTE(review): the sibling tests check the flow status first and the thread second;
// the order here is reversed.
assertThreadShutDown();
waitForAndAssertFlowStatus(Status.KILLED);
}
/**
 * Verifies FlowRunner.retryFailures: with joba and jobb:innerFlow disabled, two jobs nested in
 * jobb are failed, then retryFailures is called while the flow is in FAILED_FINISHING. The
 * failed jobs and the previously cancelled/killed jobd branch are expected to run again and
 * the flow is expected to end as SUCCEEDED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testRetryOnFailure() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector);
final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("joba").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
"innerFlow").setStatus(Status.DISABLED);
runFlowRunnerInThread(this.runner);
// joba is disabled, so the second level is expected to start immediately.
assertStatus("joba", Status.SKIPPED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
InteractiveTestJob.getTestJob("jobb:innerJobC").failJob();
assertStatus("jobb:innerJobB", Status.FAILED);
assertStatus("jobb:innerJobC", Status.FAILED);
assertStatus("jobb", Status.FAILED);
assertStatus("jobb:innerFlow", Status.SKIPPED);
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
assertStatus("jobd:innerJobA", Status.SUCCEEDED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobd", Status.KILLED);
waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
// Diagnostic output only: prints the status of each in-node of jobd:innerFlow2.
// No assertion depends on it.
final ExecutableNode node = this.runner.getExecutableFlow()
.getExecutableNodePath("jobd:innerFlow2");
final ExecutableFlowBase base = node.getParentFlow();
for (final String nodeId : node.getInNodes()) {
final ExecutableNode inNode = base.getExecutableNode(nodeId);
System.out.println(inNode.getId() + " > " + inNode.getStatus());
}
assertStatus("jobb:innerFlow", Status.SKIPPED);
// Drop the two finished test jobs, presumably so that the re-run jobs can be looked up
// under the same names -- confirm against InteractiveTestJob.
InteractiveTestJob.clearTestJobs("jobb:innerJobB", "jobb:innerJobC");
this.runner.retryFailures("me");
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
// After the retry the skipped innerFlow is expected to be back in DISABLED.
assertStatus("jobb:innerFlow", Status.DISABLED);
assertStatus("jobd:innerFlow2", Status.RUNNING);
waitForAndAssertFlowStatus(Status.RUNNING);
assertThreadRunning();
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
InteractiveTestJob.getTestJob("jobc").succeedJob();
assertStatus("jobb:innerFlow", Status.SKIPPED);
assertStatus("jobb", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.SUCCEEDED);
assertStatus("jobb:innerJobC", Status.SUCCEEDED);
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("jobd", Status.SUCCEEDED);
assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
assertStatus("jobe", Status.RUNNING);
InteractiveTestJob.getTestJob("jobe").succeedJob();
assertStatus("jobe", Status.SUCCEEDED);
InteractiveTestJob.getTestJob("joba1").succeedJob();
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobf", Status.RUNNING);
InteractiveTestJob.getTestJob("jobf").succeedJob();
assertStatus("jobf", Status.SUCCEEDED);
waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
}
/**
 * Calls FlowRunner.kill while several jobs are running (no job has failed) and verifies that
 * running jobs and their enclosing flow nodes become KILLED, unstarted jobs become CANCELLED,
 * and the flow ends as KILLED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testCancel() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector,
FailureAction.CANCEL_ALL);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
// Manual kill issued by user "me" while the flow is healthy.
this.runner.kill("me");
assertStatus("jobb", Status.KILLED);
assertStatus("jobb:innerJobB", Status.KILLED);
assertStatus("jobb:innerJobC", Status.KILLED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
assertStatus("jobc", Status.KILLED);
assertStatus("jobd", Status.KILLED);
assertStatus("jobd:innerJobA", Status.KILLED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
/**
 * Fails jobb:innerJobB under the default failure action (FINISH_CURRENTLY_RUNNING, see the
 * one-argument createFlowRunner) so that the flow enters FAILED_FINISHING, then calls
 * FlowRunner.kill. Verifies that jobb ends as FAILED, the remaining running jobs are KILLED,
 * unstarted jobs are CANCELLED, and the flow ends as KILLED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testManualCancelOnFailure() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
assertStatus("jobb:innerJobB", Status.FAILED);
assertStatus("jobb", Status.FAILED_FINISHING);
waitForAndAssertFlowStatus(Status.FAILED_FINISHING);
// Kill while the flow is still waiting for running jobs to finish.
this.runner.kill("me");
assertStatus("jobb", Status.FAILED);
assertStatus("jobb:innerJobC", Status.KILLED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
assertStatus("jobc", Status.KILLED);
assertStatus("jobd", Status.KILLED);
assertStatus("jobd:innerJobA", Status.KILLED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
/**
 * Pauses and resumes the flow twice on an otherwise all-success run. Verifies that a job
 * finishing while the flow is paused is recorded as SUCCEEDED, that the flow reports PAUSED,
 * that the next jobs are running after resume, and that the flow ends as SUCCEEDED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testPause() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
// First pause: joba finishes while the flow is paused.
this.runner.pause("test");
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
waitForAndAssertFlowStatus(Status.PAUSED);
this.runner.resume("test");
waitForAndAssertFlowStatus(Status.RUNNING);
assertStatus("joba", Status.SUCCEEDED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
// Second pause: a job nested in jobb finishes while the flow is paused.
this.runner.pause("test");
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(
Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
this.runner.resume("test");
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
// From here on the test follows the same success path as testBasicRun.
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(
Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(
Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
assertStatus("jobb:innerJobB", Status.SUCCEEDED);
assertStatus("jobb:innerJobC", Status.SUCCEEDED);
assertStatus("jobb:innerFlow", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(
Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
assertStatus("jobb:innerFlow", Status.SUCCEEDED);
assertStatus("jobb", Status.SUCCEEDED);
InteractiveTestJob.getTestJob("jobc").succeedJob(
Props.of("output.jobc", "jobc"));
assertStatus("jobc", Status.SUCCEEDED);
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
assertStatus("jobd:innerJobA", Status.SUCCEEDED);
assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
assertStatus("jobd", Status.SUCCEEDED);
assertStatus("jobe", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobe").succeedJob();
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobe", Status.SUCCEEDED);
assertStatus("jobf", Status.RUNNING);
InteractiveTestJob.getTestJob("jobf").succeedJob();
assertStatus("jobf", Status.SUCCEEDED);
waitForAndAssertFlowStatus(Status.SUCCEEDED);
assertThreadShutDown();
}
/**
 * Pauses the flow, lets two nested jobs succeed while paused, then kills the flow without
 * resuming. Verifies that jobs still running are KILLED, jobs never started are CANCELLED,
 * and the flow ends as KILLED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testPauseKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
waitForAndAssertFlowStatus(Status.PAUSED);
// Jobs that complete during the pause are still recorded as SUCCEEDED.
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
assertStatus("jobd:innerJobA", Status.SUCCEEDED);
// Kill directly from the paused state.
this.runner.kill("me");
assertStatus("joba1", Status.KILLED);
assertStatus("jobb:innerJobB", Status.CANCELLED);
assertStatus("jobb:innerJobC", Status.CANCELLED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
assertStatus("jobb", Status.KILLED);
assertStatus("jobc", Status.KILLED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobd", Status.KILLED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
/**
 * Fails jobd:innerJobA while the flow is paused, with FailureAction.FINISH_CURRENTLY_RUNNING.
 * Verifies that the flow stays PAUSED after the failure, and that after resume the unstarted
 * jobs are CANCELLED, jobb is KILLED, jobd is FAILED, the still-running jobs may finish, and
 * the flow ends as FAILED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testPauseFail() throws Exception {
// Unlike the other tests, the listener is stored in this.eventCollector (a field not
// declared in this file); presumably waitEventFired below reads it -- confirm in the
// base class.
this.eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(this.eventCollector,
FailureAction.FINISH_CURRENTLY_RUNNING);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
waitForAndAssertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
assertStatus("jobd:innerJobA", Status.FAILED);
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
// Wait for the FAILED event of jobd:innerJobA, then check that the flow is still PAUSED.
waitEventFired("jobd:innerJobA", Status.FAILED);
waitForAndAssertFlowStatus(Status.PAUSED);
this.runner.resume("me");
assertStatus("jobb:innerJobB", Status.CANCELLED);
assertStatus("jobb:innerJobC", Status.CANCELLED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
assertStatus("jobb", Status.KILLED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobd", Status.FAILED);
InteractiveTestJob.getTestJob("jobc").succeedJob();
InteractiveTestJob.getTestJob("joba1").succeedJob();
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobf", Status.CANCELLED);
assertStatus("jobe", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
/**
 * Fails jobd:innerJobA while the flow is paused, with FailureAction.FINISH_ALL_POSSIBLE.
 * Verifies that after resume the jobb branch keeps running to SUCCEEDED, jobd is FAILED with
 * innerFlow2 CANCELLED, jobe and jobf are CANCELLED, and the flow ends as FAILED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testPauseFailFinishAll() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector,
FailureAction.FINISH_ALL_POSSIBLE);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
waitForAndAssertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
assertStatus("jobd:innerJobA", Status.FAILED);
assertStatus("jobb:innerJobA", Status.SUCCEEDED);
this.runner.resume("me");
// With FINISH_ALL_POSSIBLE the unaffected jobb branch is expected to continue.
assertStatus("jobb:innerJobB", Status.RUNNING);
assertStatus("jobb:innerJobC", Status.RUNNING);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobd", Status.FAILED);
InteractiveTestJob.getTestJob("jobc").succeedJob();
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob();
assertStatus("jobc", Status.SUCCEEDED);
assertStatus("joba1", Status.SUCCEEDED);
assertStatus("jobb:innerJobB", Status.SUCCEEDED);
assertStatus("jobb:innerJobC", Status.SUCCEEDED);
assertStatus("jobb:innerFlow", Status.SUCCEEDED);
assertStatus("jobb", Status.SUCCEEDED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
waitForAndAssertFlowStatus(Status.FAILED);
assertThreadShutDown();
}
/**
 * Calls JobRunner.killBySLA on the active runner of joba, with FailureAction.CANCEL_ALL, and
 * verifies that the whole flow ends as KILLED and the runner thread shuts down.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testFlowKilledByJobLevelSLA() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector,
FailureAction.CANCEL_ALL);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
// Find the active runner whose job id is "joba" and kill only that one via the SLA path.
for (final JobRunner jobRunner : this.runner.getActiveJobRunners()) {
if (jobRunner.getJobId().equals("joba")) {
jobRunner.killBySLA();
break;
}
}
waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
/**
 * Fails jobd:innerJobA while the flow is paused, with FailureAction.CANCEL_ALL. Verifies that,
 * without any resume, all other running jobs are KILLED, unstarted jobs are CANCELLED, jobd is
 * FAILED, and the flow ends as KILLED.
 *
 * @throws Exception if the runner cannot be created or an assertion helper fails
 */
@Test
public void testPauseFailKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
this.runner = createFlowRunner(eventCollector,
FailureAction.CANCEL_ALL);
runFlowRunnerInThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
InteractiveTestJob.getTestJob("joba").succeedJob();
assertStatus("joba", Status.SUCCEEDED);
assertStatus("joba1", Status.RUNNING);
assertStatus("jobb", Status.RUNNING);
assertStatus("jobc", Status.RUNNING);
assertStatus("jobd", Status.RUNNING);
assertStatus("jobd:innerJobA", Status.RUNNING);
assertStatus("jobb:innerJobA", Status.RUNNING);
this.runner.pause("me");
waitForAndAssertFlowStatus(Status.PAUSED);
// The failure arrives while paused; the test never calls resume() after this point.
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
assertStatus("jobd:innerJobA", Status.FAILED);
assertStatus("jobd:innerFlow2", Status.CANCELLED);
assertStatus("jobd", Status.FAILED);
assertStatus("jobb:innerJobA", Status.KILLED);
assertStatus("jobb:innerJobB", Status.CANCELLED);
assertStatus("jobb:innerJobC", Status.CANCELLED);
assertStatus("jobb:innerFlow", Status.CANCELLED);
assertStatus("jobb", Status.KILLED);
assertStatus("jobc", Status.KILLED);
assertStatus("jobe", Status.CANCELLED);
assertStatus("jobf", Status.CANCELLED);
assertStatus("joba1", Status.KILLED);
waitForAndAssertFlowStatus(Status.KILLED);
assertThreadShutDown();
}
/**
 * Starts the given runner on a new thread and returns immediately.
 *
 * @param runner the flow runner to execute in the background
 */
private void runFlowRunnerInThread(final FlowRunner runner) {
  new Thread(runner).start();
}
/**
 * Creates a runner that uses {@link FailureAction#FINISH_CURRENTLY_RUNNING}.
 *
 * @param eventCollector listener to attach to the new runner
 * @return the configured flow runner
 * @throws Exception if the runner cannot be created
 */
private FlowRunner createFlowRunner(final EventCollectorListener eventCollector)
    throws Exception {
  final FailureAction defaultAction = FailureAction.FINISH_CURRENTLY_RUNNING;
  return createFlowRunner(eventCollector, defaultAction);
}
/**
 * Creates a runner with the given failure action and an empty {@link Props} as Azkaban props.
 *
 * @param eventCollector listener to attach to the new runner
 * @param action failure action to set on the execution options
 * @return the configured flow runner
 * @throws Exception if the runner cannot be created
 */
private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
    final FailureAction action) throws Exception {
  final Props emptyAzkabanProps = new Props();
  return createFlowRunner(eventCollector, action, emptyAzkabanProps);
}
/**
 * Builds a {@link FlowRunner} for the "jobf" flow of the prepared project.
 *
 * <p>The executable flow gets a fresh execution id, the test working directory as execution
 * path, three flow-parameter overrides (param4, param10, param11) and the requested failure
 * action. It is uploaded to the mock executor loader and fetched back from it by id before
 * being handed to the runner.
 *
 * @param eventCollector listener to attach to the new runner
 * @param action failure action to set on the execution options
 * @param azkabanProps properties passed to the runner constructor
 * @return the configured flow runner
 * @throws Exception if the flow cannot be uploaded, fetched, or the runner cannot be created
 */
private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
    final FailureAction action, final Props azkabanProps)
    throws Exception {
  final Flow jobfFlow = this.flowMap.get("jobf");
  final int executionId = id++;

  final ExecutableFlow executableFlow = new ExecutableFlow(this.project, jobfFlow);
  executableFlow.setExecutionPath(this.workingDir.getPath());
  executableFlow.setExecutionId(executionId);

  // Flow-level parameter overrides; the tests assert on "param4" = "override.4".
  final Map<String, String> overrides = new HashMap<>();
  overrides.put("param4", "override.4");
  overrides.put("param10", "override.10");
  overrides.put("param11", "override.11");
  executableFlow.getExecutionOptions().addAllFlowParameters(overrides);
  executableFlow.getExecutionOptions().setFailureAction(action);

  this.fakeExecutorLoader.uploadExecutableFlow(executableFlow);

  final FlowRunner flowRunner = new FlowRunner(
      this.fakeExecutorLoader.fetchExecutableFlow(executionId), this.fakeExecutorLoader,
      mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
  flowRunner.addListener(eventCollector);
  return flowRunner;
}
}