diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 98b06cf..11709c6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -16,22 +16,27 @@
package azkaban.execapp;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;
+
import azkaban.event.Event;
import azkaban.event.Event.Type;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
+import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.MockProjectLoader;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.test.Utils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
@@ -41,35 +46,38 @@ import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
-public class FlowRunnerTest {
+public class FlowRunnerTest extends FlowRunnerTestBase {
+ private static final File TEST_DIR = new File(
+ "../azkaban-test/src/test/resources/azkaban/test/executions/exectest1");
private File workingDir;
private JobTypeManager jobtypeManager;
private ProjectLoader fakeProjectLoader;
-
- public FlowRunnerTest() {
-
- }
+ @Mock
+ private ExecutorLoader loader;
@Before
public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
System.out.println("Create temp dir");
- synchronized (this) {
- this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
- if (this.workingDir.exists()) {
- FileUtils.deleteDirectory(this.workingDir);
- }
- this.workingDir.mkdirs();
+ this.workingDir = new File("build/tmp/_AzkabanTestDir_" + System.currentTimeMillis());
+ if (this.workingDir.exists()) {
+ FileUtils.deleteDirectory(this.workingDir);
}
+ this.workingDir.mkdirs();
this.jobtypeManager =
new JobTypeManager(null, null, this.getClass().getClassLoader());
final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
- pluginSet.addPluginClass("java", JavaJob.class);
+ pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
pluginSet.addPluginClass("test", InteractiveTestJob.class);
this.fakeProjectLoader = new MockProjectLoader(this.workingDir);
+ Utils.initServiceProvider();
+ JmxJobMBeanManager.getInstance().initialize(new Props());
InteractiveTestJob.clearTestJobs();
}
@@ -85,32 +93,29 @@ public class FlowRunnerTest {
}
}
- @Ignore
@Test
public void exec1Normal() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
- // just making compile. may not work at all.
-
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
+ this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
- Assert.assertTrue(!runner.isKilled());
- runner.run();
- final ExecutableFlow exFlow = runner.getExecutableFlow();
- Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
- compareFinishedRuntime(runner);
-
- testStatus(exFlow, "job1", Status.SUCCEEDED);
- testStatus(exFlow, "job2", Status.SUCCEEDED);
- testStatus(exFlow, "job3", Status.SUCCEEDED);
- testStatus(exFlow, "job4", Status.SUCCEEDED);
- testStatus(exFlow, "job5", Status.SUCCEEDED);
- testStatus(exFlow, "job6", Status.SUCCEEDED);
- testStatus(exFlow, "job7", Status.SUCCEEDED);
- testStatus(exFlow, "job8", Status.SUCCEEDED);
- testStatus(exFlow, "job10", Status.SUCCEEDED);
+ startThread(this.runner);
+ succeedJobs("job3", "job4", "job6");
+
+ assertFlowStatus(Status.SUCCEEDED);
+ assertThreadShutDown();
+ compareFinishedRuntime(this.runner);
+
+ assertStatus("job1", Status.SUCCEEDED);
+ assertStatus("job2", Status.SUCCEEDED);
+ assertStatus("job3", Status.SUCCEEDED);
+ assertStatus("job4", Status.SUCCEEDED);
+ assertStatus("job5", Status.SUCCEEDED);
+ assertStatus("job6", Status.SUCCEEDED);
+ assertStatus("job7", Status.SUCCEEDED);
+ assertStatus("job8", Status.SUCCEEDED);
+ assertStatus("job10", Status.SUCCEEDED);
try {
eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -122,15 +127,12 @@ public class FlowRunnerTest {
}
}
- @Ignore
@Test
public void exec1Disabled() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final File testDir = new File("unit/executions/exectest1");
- ExecutableFlow exFlow = prepareExecDir(testDir, "exec1", 1);
+ final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, "exec1", 1);
// Disable couple in the middle and at the end.
exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
@@ -138,26 +140,28 @@ public class FlowRunnerTest {
exFlow.getExecutableNode("job5").setStatus(Status.DISABLED);
exFlow.getExecutableNode("job10").setStatus(Status.DISABLED);
- final FlowRunner runner = createFlowRunner(exFlow, loader, eventCollector);
+ this.runner = createFlowRunner(exFlow, this.loader, eventCollector);
- Assert.assertTrue(!runner.isKilled());
- Assert.assertTrue(exFlow.getStatus() == Status.READY);
- runner.run();
+ Assert.assertTrue(!this.runner.isKilled());
+ assertFlowStatus(Status.READY);
- exFlow = runner.getExecutableFlow();
- compareFinishedRuntime(runner);
+ startThread(this.runner);
+ succeedJobs("job3", "job4");
- Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
+ assertThreadShutDown();
+ compareFinishedRuntime(this.runner);
- testStatus(exFlow, "job1", Status.SKIPPED);
- testStatus(exFlow, "job2", Status.SUCCEEDED);
- testStatus(exFlow, "job3", Status.SUCCEEDED);
- testStatus(exFlow, "job4", Status.SUCCEEDED);
- testStatus(exFlow, "job5", Status.SKIPPED);
- testStatus(exFlow, "job6", Status.SKIPPED);
- testStatus(exFlow, "job7", Status.SUCCEEDED);
- testStatus(exFlow, "job8", Status.SUCCEEDED);
- testStatus(exFlow, "job10", Status.SKIPPED);
+ assertFlowStatus(Status.SUCCEEDED);
+
+ assertStatus("job1", Status.SKIPPED);
+ assertStatus("job2", Status.SUCCEEDED);
+ assertStatus("job3", Status.SUCCEEDED);
+ assertStatus("job4", Status.SUCCEEDED);
+ assertStatus("job5", Status.SKIPPED);
+ assertStatus("job6", Status.SKIPPED);
+ assertStatus("job7", Status.SUCCEEDED);
+ assertStatus("job8", Status.SUCCEEDED);
+ assertStatus("job10", Status.SKIPPED);
try {
eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -169,34 +173,32 @@ public class FlowRunnerTest {
}
}
- @Ignore
@Test
public void exec1Failed() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final File testDir = new File("unit/executions/exectest1");
- final ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
+ final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
- final FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
+ this.runner = createFlowRunner(flow, this.loader, eventCollector);
- runner.run();
- final ExecutableFlow exFlow = runner.getExecutableFlow();
- Assert.assertTrue(!runner.isKilled());
- Assert.assertTrue("Flow status " + exFlow.getStatus(),
- exFlow.getStatus() == Status.FAILED);
-
- testStatus(exFlow, "job1", Status.SUCCEEDED);
- testStatus(exFlow, "job2d", Status.FAILED);
- testStatus(exFlow, "job3", Status.CANCELLED);
- testStatus(exFlow, "job4", Status.CANCELLED);
- testStatus(exFlow, "job5", Status.CANCELLED);
- testStatus(exFlow, "job6", Status.SUCCEEDED);
- testStatus(exFlow, "job7", Status.CANCELLED);
- testStatus(exFlow, "job8", Status.CANCELLED);
- testStatus(exFlow, "job9", Status.CANCELLED);
- testStatus(exFlow, "job10", Status.CANCELLED);
+ startThread(this.runner);
+ succeedJobs("job6");
+
+ Assert.assertTrue(!this.runner.isKilled());
+ assertFlowStatus(Status.FAILED);
+
+ assertStatus("job1", Status.SUCCEEDED);
+ assertStatus("job2d", Status.FAILED);
+ assertStatus("job3", Status.CANCELLED);
+ assertStatus("job4", Status.CANCELLED);
+ assertStatus("job5", Status.CANCELLED);
+ assertStatus("job6", Status.SUCCEEDED);
+ assertStatus("job7", Status.CANCELLED);
+ assertStatus("job8", Status.CANCELLED);
+ assertStatus("job9", Status.CANCELLED);
+ assertStatus("job10", Status.CANCELLED);
+ assertThreadShutDown();
try {
eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -208,43 +210,33 @@ public class FlowRunnerTest {
}
}
- @Ignore
@Test
public void exec1FailedKillAll() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final File testDir = new File("unit/executions/exectest1");
- final ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
+ final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
- final FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
+ this.runner = createFlowRunner(flow, this.loader, eventCollector);
- runner.run();
- final ExecutableFlow exFlow = runner.getExecutableFlow();
+ startThread(this.runner);
+ assertThreadShutDown();
- Assert.assertTrue(runner.isKilled());
+ Assert.assertTrue(this.runner.isKilled());
- Assert.assertTrue(
- "Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(),
- exFlow.getStatus() == Status.FAILED);
-
- try {
- Thread.sleep(500);
- } catch (final InterruptedException e) {
- }
+ assertFlowStatus(Status.KILLED);
- testStatus(exFlow, "job1", Status.SUCCEEDED);
- testStatus(exFlow, "job2d", Status.FAILED);
- testStatus(exFlow, "job3", Status.CANCELLED);
- testStatus(exFlow, "job4", Status.CANCELLED);
- testStatus(exFlow, "job5", Status.CANCELLED);
- testStatus(exFlow, "job6", Status.KILLED);
- testStatus(exFlow, "job7", Status.CANCELLED);
- testStatus(exFlow, "job8", Status.CANCELLED);
- testStatus(exFlow, "job9", Status.CANCELLED);
- testStatus(exFlow, "job10", Status.CANCELLED);
+ assertStatus("job1", Status.SUCCEEDED);
+ assertStatus("job2d", Status.FAILED);
+ assertStatus("job3", Status.CANCELLED);
+ assertStatus("job4", Status.CANCELLED);
+ assertStatus("job5", Status.CANCELLED);
+ assertStatus("job6", Status.KILLED);
+ assertStatus("job7", Status.CANCELLED);
+ assertStatus("job8", Status.CANCELLED);
+ assertStatus("job9", Status.CANCELLED);
+ assertStatus("job10", Status.CANCELLED);
try {
eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -256,40 +248,32 @@ public class FlowRunnerTest {
}
}
- @Ignore
@Test
public void exec1FailedFinishRest() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final File testDir = new File("unit/executions/exectest1");
- final ExecutableFlow flow = prepareExecDir(testDir, "exec3", 1);
+ final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec3", 1);
flow.getExecutionOptions().setFailureAction(
FailureAction.FINISH_ALL_POSSIBLE);
- final FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
+ this.runner = createFlowRunner(flow, this.loader, eventCollector);
- runner.run();
- final ExecutableFlow exFlow = runner.getExecutableFlow();
- Assert.assertTrue(
- "Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(),
- exFlow.getStatus() == Status.FAILED);
+ startThread(this.runner);
+ succeedJobs("job3");
- try {
- Thread.sleep(500);
- } catch (final InterruptedException e) {
- }
+ assertFlowStatus(Status.FAILED);
- testStatus(exFlow, "job1", Status.SUCCEEDED);
- testStatus(exFlow, "job2d", Status.FAILED);
- testStatus(exFlow, "job3", Status.SUCCEEDED);
- testStatus(exFlow, "job4", Status.CANCELLED);
- testStatus(exFlow, "job5", Status.CANCELLED);
- testStatus(exFlow, "job6", Status.CANCELLED);
- testStatus(exFlow, "job7", Status.SUCCEEDED);
- testStatus(exFlow, "job8", Status.SUCCEEDED);
- testStatus(exFlow, "job9", Status.SUCCEEDED);
- testStatus(exFlow, "job10", Status.CANCELLED);
+ assertStatus("job1", Status.SUCCEEDED);
+ assertStatus("job2d", Status.FAILED);
+ assertStatus("job3", Status.SUCCEEDED);
+ assertStatus("job4", Status.CANCELLED);
+ assertStatus("job5", Status.CANCELLED);
+ assertStatus("job6", Status.CANCELLED);
+ assertStatus("job7", Status.SUCCEEDED);
+ assertStatus("job8", Status.SUCCEEDED);
+ assertStatus("job9", Status.SUCCEEDED);
+ assertStatus("job10", Status.CANCELLED);
+ assertThreadShutDown();
try {
eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -301,50 +285,32 @@ public class FlowRunnerTest {
}
}
- @Ignore
@Test
public void execAndCancel() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final FlowRunner runner = createFlowRunner(loader, eventCollector, "exec1");
+ this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
- Assert.assertTrue(!runner.isKilled());
- final Thread thread = new Thread(runner);
- thread.start();
+ startThread(this.runner);
- try {
- Thread.sleep(5000);
- } catch (final InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ assertStatus("job1", Status.SUCCEEDED);
+ assertStatus("job2", Status.SUCCEEDED);
+ waitJobsStarted(this.runner, "job3", "job4", "job6");
- runner.kill("me");
- Assert.assertTrue(runner.isKilled());
+ this.runner.kill("me");
+ Assert.assertTrue(this.runner.isKilled());
- try {
- Thread.sleep(2000);
- } catch (final InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ assertStatus("job5", Status.CANCELLED);
+ assertStatus("job7", Status.CANCELLED);
+ assertStatus("job8", Status.CANCELLED);
+ assertStatus("job10", Status.CANCELLED);
+ assertStatus("job3", Status.KILLED);
+ assertStatus("job4", Status.KILLED);
+ assertStatus("job6", Status.KILLED);
+ assertThreadShutDown();
- final ExecutableFlow exFlow = runner.getExecutableFlow();
- testStatus(exFlow, "job1", Status.SUCCEEDED);
- testStatus(exFlow, "job2", Status.SUCCEEDED);
- testStatus(exFlow, "job5", Status.CANCELLED);
- testStatus(exFlow, "job7", Status.CANCELLED);
- testStatus(exFlow, "job8", Status.CANCELLED);
- testStatus(exFlow, "job10", Status.CANCELLED);
- testStatus(exFlow, "job3", Status.KILLED);
- testStatus(exFlow, "job4", Status.KILLED);
- testStatus(exFlow, "job6", Status.KILLED);
-
- Assert.assertTrue(
- "Expected FAILED status instead got " + exFlow.getStatus(),
- exFlow.getStatus() == Status.KILLED);
+ assertFlowStatus(Status.KILLED);
try {
eventCollector.checkEventExists(new Type[]{Type.FLOW_STARTED,
@@ -356,42 +322,34 @@ public class FlowRunnerTest {
}
}
- @Ignore
@Test
public void execRetries() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final FlowRunner runner = createFlowRunner(loader, eventCollector, "exec4-retry");
+ this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
- runner.run();
+ startThread(this.runner);
+ assertThreadShutDown();
- final ExecutableFlow exFlow = runner.getExecutableFlow();
- testStatus(exFlow, "job-retry", Status.SUCCEEDED);
- testStatus(exFlow, "job-pass", Status.SUCCEEDED);
- testStatus(exFlow, "job-retry-fail", Status.FAILED);
- testAttempts(exFlow, "job-retry", 3);
- testAttempts(exFlow, "job-pass", 0);
- testAttempts(exFlow, "job-retry-fail", 2);
+ assertStatus("job-retry", Status.SUCCEEDED);
+ assertStatus("job-pass", Status.SUCCEEDED);
+ assertStatus("job-retry-fail", Status.FAILED);
+ assertAttempts("job-retry", 3);
+ assertAttempts("job-pass", 0);
+ assertAttempts("job-retry-fail", 2);
- Assert.assertTrue(
- "Expected FAILED status instead got " + exFlow.getStatus(),
- exFlow.getStatus() == Status.FAILED);
+ assertFlowStatus(Status.FAILED);
}
- private void testStatus(final ExecutableFlow flow, final String name, final Status status) {
- final ExecutableNode node = flow.getExecutableNode(name);
-
- if (node.getStatus() != status) {
- Assert.fail("Status of job " + node.getId() + " is " + node.getStatus()
- + " not " + status + " as expected.");
- }
+ private void startThread(final FlowRunner runner) {
+ Assert.assertTrue(!runner.isKilled());
+ final Thread thread = new Thread(runner);
+ thread.start();
}
- private void testAttempts(final ExecutableFlow flow, final String name, final int attempt) {
- final ExecutableNode node = flow.getExecutableNode(name);
-
+ private void assertAttempts(final String name, final int attempt) {
+ final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
if (node.getAttempt() != attempt) {
Assert.fail("Expected " + attempt + " got " + node.getAttempt()
+ " attempts " + name);
@@ -400,9 +358,7 @@ public class FlowRunnerTest {
private ExecutableFlow prepareExecDir(final File execDir, final String flowName,
final int execId) throws IOException {
- synchronized (this) {
- FileUtils.copyDirectory(execDir, this.workingDir);
- }
+ FileUtils.copyDirectory(execDir, this.workingDir);
final File jsonFlowFile = new File(this.workingDir, flowName + ".flow");
final HashMap<String, Object> flowObj =
@@ -437,8 +393,6 @@ public class FlowRunnerTest {
return;
}
- // System.out.println("Node " + node.getJobId() + " start:" + startTime +
- // " end:" + endTime + " previous:" + previousEndTime);
Assert.assertTrue("Checking start and end times", startTime > 0
&& endTime >= startTime);
Assert.assertTrue("Start time for " + node.getId() + " is " + startTime
@@ -459,9 +413,6 @@ public class FlowRunnerTest {
final ExecutorLoader loader, final EventCollectorListener eventCollector,
final Props azkabanProps)
throws Exception {
- // File testDir = new File("unit/executions/exectest1");
- // MockProjectLoader projectLoader = new MockProjectLoader(new
- // File(flow.getExecutionPath()));
loader.uploadExecutableFlow(flow);
final FlowRunner runner =
@@ -480,10 +431,7 @@ public class FlowRunnerTest {
private FlowRunner createFlowRunner(final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
throws Exception {
- final File testDir = new File("unit/executions/exectest1");
- final ExecutableFlow exFlow = prepareExecDir(testDir, flowName, 1);
- // MockProjectLoader projectLoader = new MockProjectLoader(new
- // File(exFlow.getExecutionPath()));
+ final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, flowName, 1);
loader.uploadExecutableFlow(exFlow);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index decc17a..540e9bb 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -16,6 +16,9 @@
package azkaban.execapp;
+import static org.junit.Assert.assertEquals;
+
+import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
@@ -26,6 +29,7 @@ import azkaban.executor.JavaJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
+import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.DirectoryFlowLoader;
@@ -33,6 +37,7 @@ import azkaban.project.MockProjectLoader;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
+import azkaban.test.Utils;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -41,56 +46,54 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
/**
* Test the flow run, especially with embedded flows.
*
- * This test uses executions/embedded2. It also mainly uses the flow named
- * jobf. The test is designed to control success/failures explicitly so we
- * don't have to time the flow exactly.
+ * This test uses executions/embedded2. It also mainly uses the flow named jobf. The test is
+ * designed to control success/failures explicitly so we don't have to time the flow exactly.
*
* Flow jobf looks like the following:
*
*
- * joba joba1
- * / | \ |
- * / | \ |
- * jobb jobd jobc |
- * \ | / /
- * \ | / /
- * jobe /
- * | /
- * | /
- * jobf
+ * joba joba1
+ * / | \ |
+ * / | \ |
+ * jobb jobd jobc |
+ * \ | / /
+ * \ | / /
+ * jobe /
+ * | /
+ * | /
+ * jobf
*
- * The job 'jobb' is an embedded flow:
+ * The job 'jobb' is an embedded flow:
*
- * jobb:innerFlow
+ * jobb:innerFlow
*
- * innerJobA
- * / \
- * innerJobB innerJobC
- * \ /
- * innerFlow
+ * innerJobA
+ * / \
+ * innerJobB innerJobC
+ * \ /
+ * innerFlow
*
*
- * The job 'jobd' is a simple embedded flow:
+ * The job 'jobd' is a simple embedded flow:
*
- * jobd:innerFlow2
+ * jobd:innerFlow2
*
- * innerJobA
- * |
- * innerFlow2
+ * innerJobA
+ * |
+ * innerFlow2
*
- * The following tests checks each stage of the flow run by forcing jobs to
- * succeed or fail.
+ * The following tests checks each stage of the flow run by forcing jobs to succeed or fail.
*/
-public class FlowRunnerTest2 {
+public class FlowRunnerTest2 extends FlowRunnerTestBase {
+ private static final File TEST_DIR = new File(
+ "../azkaban-test/src/test/resources/azkaban/test/executions/embedded2");
private static int id = 101;
private final Logger logger = Logger.getLogger(FlowRunnerTest2.class);
private File workingDir;
@@ -100,13 +103,10 @@ public class FlowRunnerTest2 {
private Project project;
private Map<String, Flow> flowMap;
- public FlowRunnerTest2() {
- }
-
@Before
public void setUp() throws Exception {
System.out.println("Create temp dir");
- this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+ this.workingDir = new File("build/tmp/_AzkabanTestDir_" + System.currentTimeMillis());
if (this.workingDir.exists()) {
FileUtils.deleteDirectory(this.workingDir);
}
@@ -115,14 +115,16 @@ public class FlowRunnerTest2 {
this.getClass().getClassLoader());
final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
+ pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
pluginSet.addPluginClass("java", JavaJob.class);
pluginSet.addPluginClass("test", InteractiveTestJob.class);
this.fakeProjectLoader = new MockProjectLoader(this.workingDir);
this.fakeExecutorLoader = new MockExecutorLoader();
this.project = new Project(1, "testProject");
+ Utils.initServiceProvider();
+ JmxJobMBeanManager.getInstance().initialize(new Props());
- final File dir = new File("unit/executions/embedded2");
- prepareProject(this.project, dir);
+ prepareProject(this.project, TEST_DIR);
InteractiveTestJob.clearTestJobs();
}
@@ -137,1231 +139,956 @@ public class FlowRunnerTest2 {
}
/**
- * Tests the basic successful flow run, and also tests all output variables
- * from each job.
+ * Tests the basic successful flow run, and also tests all output variables from each job.
*/
- @Ignore
@Test
public void testBasicRun() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
// 1. START FLOW
- final ExecutableFlow flow = runner.getExecutableFlow();
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
-
- compareStates(expectedStateMap, nodeMap);
- final Props joba = nodeMap.get("joba").getInputProps();
- Assert.assertEquals("joba.1", joba.get("param1"));
- Assert.assertEquals("test1.2", joba.get("param2"));
- Assert.assertEquals("test1.3", joba.get("param3"));
- Assert.assertEquals("override.4", joba.get("param4"));
- Assert.assertEquals("test2.5", joba.get("param5"));
- Assert.assertEquals("test2.6", joba.get("param6"));
- Assert.assertEquals("test2.7", joba.get("param7"));
- Assert.assertEquals("test2.8", joba.get("param8"));
-
- final Props joba1 = nodeMap.get("joba1").getInputProps();
- Assert.assertEquals("test1.1", joba1.get("param1"));
- Assert.assertEquals("test1.2", joba1.get("param2"));
- Assert.assertEquals("test1.3", joba1.get("param3"));
- Assert.assertEquals("override.4", joba1.get("param4"));
- Assert.assertEquals("test2.5", joba1.get("param5"));
- Assert.assertEquals("test2.6", joba1.get("param6"));
- Assert.assertEquals("test2.7", joba1.get("param7"));
- Assert.assertEquals("test2.8", joba1.get("param8"));
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
+
+ final Props joba = this.runner.getExecutableFlow().getExecutableNodePath("joba")
+ .getInputProps();
+ assertEquals("joba.1", joba.get("param1"));
+ assertEquals("test1.2", joba.get("param2"));
+ assertEquals("test1.3", joba.get("param3"));
+ assertEquals("override.4", joba.get("param4"));
+ assertEquals("test2.5", joba.get("param5"));
+ assertEquals("test2.6", joba.get("param6"));
+ assertEquals("test2.7", joba.get("param7"));
+ assertEquals("test2.8", joba.get("param8"));
+
+ final Props joba1 = this.runner.getExecutableFlow().getExecutableNodePath("joba1")
+ .getInputProps();
+ assertEquals("test1.1", joba1.get("param1"));
+ assertEquals("test1.2", joba1.get("param2"));
+ assertEquals("test1.3", joba1.get("param3"));
+ assertEquals("override.4", joba1.get("param4"));
+ assertEquals("test2.5", joba1.get("param5"));
+ assertEquals("test2.6", joba1.get("param6"));
+ assertEquals("test2.7", joba1.get("param7"));
+ assertEquals("test2.8", joba1.get("param8"));
// 2. JOB A COMPLETES SUCCESSFULLY
InteractiveTestJob.getTestJob("joba").succeedJob(
Props.of("output.joba", "joba", "output.override", "joba"));
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
-
- final ExecutableNode node = nodeMap.get("jobb");
- Assert.assertEquals(Status.RUNNING, node.getStatus());
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+
+ final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNodePath("jobb");
+ assertEquals(Status.RUNNING, node.getStatus());
final Props jobb = node.getInputProps();
- Assert.assertEquals("override.4", jobb.get("param4"));
+ assertEquals("override.4", jobb.get("param4"));
// Test that jobb properties overwrites the output properties
- Assert.assertEquals("moo", jobb.get("testprops"));
- Assert.assertEquals("jobb", jobb.get("output.override"));
- Assert.assertEquals("joba", jobb.get("output.joba"));
-
- final Props jobbInnerJobA = nodeMap.get("jobb:innerJobA").getInputProps();
- Assert.assertEquals("test1.1", jobbInnerJobA.get("param1"));
- Assert.assertEquals("test1.2", jobbInnerJobA.get("param2"));
- Assert.assertEquals("test1.3", jobbInnerJobA.get("param3"));
- Assert.assertEquals("override.4", jobbInnerJobA.get("param4"));
- Assert.assertEquals("test2.5", jobbInnerJobA.get("param5"));
- Assert.assertEquals("test2.6", jobbInnerJobA.get("param6"));
- Assert.assertEquals("test2.7", jobbInnerJobA.get("param7"));
- Assert.assertEquals("test2.8", jobbInnerJobA.get("param8"));
- Assert.assertEquals("joba", jobbInnerJobA.get("output.joba"));
+ assertEquals("moo", jobb.get("testprops"));
+ assertEquals("jobb", jobb.get("output.override"));
+ assertEquals("joba", jobb.get("output.joba"));
+
+ final Props jobbInnerJobA = this.runner.getExecutableFlow()
+ .getExecutableNodePath("jobb:innerJobA")
+ .getInputProps();
+ assertEquals("test1.1", jobbInnerJobA.get("param1"));
+ assertEquals("test1.2", jobbInnerJobA.get("param2"));
+ assertEquals("test1.3", jobbInnerJobA.get("param3"));
+ assertEquals("override.4", jobbInnerJobA.get("param4"));
+ assertEquals("test2.5", jobbInnerJobA.get("param5"));
+ assertEquals("test2.6", jobbInnerJobA.get("param6"));
+ assertEquals("test2.7", jobbInnerJobA.get("param7"));
+ assertEquals("test2.8", jobbInnerJobA.get("param8"));
+ assertEquals("joba", jobbInnerJobA.get("output.joba"));
// 3. jobb:Inner completes
/// innerJobA completes
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(
Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
- final Props jobbInnerJobB = nodeMap.get("jobb:innerJobB").getInputProps();
- Assert.assertEquals("test1.1", jobbInnerJobB.get("param1"));
- Assert.assertEquals("override.4", jobbInnerJobB.get("param4"));
- Assert.assertEquals("jobb.innerJobA",
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
+ final Props jobbInnerJobB = this.runner.getExecutableFlow()
+ .getExecutableNodePath("jobb:innerJobB")
+ .getInputProps();
+ assertEquals("test1.1", jobbInnerJobB.get("param1"));
+ assertEquals("override.4", jobbInnerJobB.get("param4"));
+ assertEquals("jobb.innerJobA",
jobbInnerJobB.get("output.jobb.innerJobA"));
- Assert.assertEquals("moo", jobbInnerJobB.get("testprops"));
+ assertEquals("moo", jobbInnerJobB.get("testprops"));
/// innerJobB, C completes
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(
Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(
Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
- pause(250);
- expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
-
- final Props jobbInnerJobD = nodeMap.get("jobb:innerFlow").getInputProps();
- Assert.assertEquals("test1.1", jobbInnerJobD.get("param1"));
- Assert.assertEquals("override.4", jobbInnerJobD.get("param4"));
- Assert.assertEquals("jobb.innerJobB",
+ assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+ assertStatus("jobb:innerFlow", Status.RUNNING);
+
+ final Props jobbInnerJobD = this.runner.getExecutableFlow()
+ .getExecutableNodePath("jobb:innerFlow")
+ .getInputProps();
+ assertEquals("test1.1", jobbInnerJobD.get("param1"));
+ assertEquals("override.4", jobbInnerJobD.get("param4"));
+ assertEquals("jobb.innerJobB",
jobbInnerJobD.get("output.jobb.innerJobB"));
- Assert.assertEquals("jobb.innerJobC",
+ assertEquals("jobb.innerJobC",
jobbInnerJobD.get("output.jobb.innerJobC"));
// 4. Finish up on inner flow for jobb
InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(
Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
- pause(250);
- expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
- final Props jobbOutput = nodeMap.get("jobb").getOutputProps();
- Assert.assertEquals("test1", jobbOutput.get("output1.jobb"));
- Assert.assertEquals("test2", jobbOutput.get("output2.jobb"));
+ assertStatus("jobb:innerFlow", Status.SUCCEEDED);
+ assertStatus("jobb", Status.SUCCEEDED);
+ final Props jobbOutput = this.runner.getExecutableFlow().getExecutableNodePath("jobb")
+ .getOutputProps();
+ assertEquals("test1", jobbOutput.get("output1.jobb"));
+ assertEquals("test2", jobbOutput.get("output2.jobb"));
// 5. Finish jobc, jobd
InteractiveTestJob.getTestJob("jobc").succeedJob(
Props.of("output.jobc", "jobc"));
- pause(250);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobc", Status.SUCCEEDED);
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
- pause(250);
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
- pause(250);
- expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
- expectedStateMap.put("jobd", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+ assertStatus("jobd", Status.SUCCEEDED);
+ assertStatus("jobe", Status.RUNNING);
- final Props jobd = nodeMap.get("jobe").getInputProps();
- Assert.assertEquals("test1", jobd.get("output1.jobb"));
- Assert.assertEquals("jobc", jobd.get("output.jobc"));
+ final Props jobd = this.runner.getExecutableFlow().getExecutableNodePath("jobe")
+ .getInputProps();
+ assertEquals("test1", jobd.get("output1.jobb"));
+ assertEquals("jobc", jobd.get("output.jobc"));
// 6. Finish off flow
InteractiveTestJob.getTestJob("joba1").succeedJob();
- pause(250);
InteractiveTestJob.getTestJob("jobe").succeedJob();
- pause(250);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.SUCCEEDED);
- expectedStateMap.put("jobf", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobe", Status.SUCCEEDED);
+ assertStatus("jobf", Status.RUNNING);
InteractiveTestJob.getTestJob("jobf").succeedJob();
- pause(250);
- expectedStateMap.put("jobf", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
-
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobf", Status.SUCCEEDED);
+ assertFlowStatus(Status.SUCCEEDED);
}
/**
- * Tests a flow with Disabled jobs and flows. They should properly SKIP
- * executions
+ * Tests a flow with Disabled jobs and flows. They should properly SKIP executions
*/
- @Ignore
@Test
public void testDisabledNormal() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
"innerJobA").setStatus(Status.DISABLED);
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.SKIPPED);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.SKIPPED);
- expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.READY);
- expectedStateMap.put("jobb:innerJobB", Status.READY);
- expectedStateMap.put("jobb:innerJobC", Status.READY);
- expectedStateMap.put("jobb:innerFlow", Status.READY);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.SKIPPED);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.SKIPPED);
+ assertStatus("jobd:innerFlow2", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.READY);
+ assertStatus("jobb:innerJobB", Status.READY);
+ assertStatus("jobb:innerJobC", Status.READY);
+ assertStatus("jobb:innerFlow", Status.READY);
// 3. jobb:Inner completes
/// innerJobA completes
InteractiveTestJob.getTestJob("jobc").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
- pause(250);
- expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
- expectedStateMap.put("jobd", Status.SUCCEEDED);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+ assertStatus("jobd", Status.SUCCEEDED);
+ assertStatus("jobc", Status.SUCCEEDED);
+ assertStatus("jobe", Status.RUNNING);
InteractiveTestJob.getTestJob("jobe").succeedJob();
InteractiveTestJob.getTestJob("joba1").succeedJob();
- pause(250);
- expectedStateMap.put("jobe", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobf", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobe", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobf", Status.RUNNING);
// 4. Finish up on inner flow for jobb
InteractiveTestJob.getTestJob("jobf").succeedJob();
- pause(250);
- expectedStateMap.put("jobf", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobf", Status.SUCCEEDED);
- Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertFlowStatus(Status.SUCCEEDED);
+ assertThreadShutDown();
}
/**
- * Tests a failure with the default FINISH_CURRENTLY_RUNNING.
- * After the first failure, every job that started should complete, and the
- * rest of the jobs should be skipped.
+ * Tests a failure with the default FINISH_CURRENTLY_RUNNING. After the first failure, every job
+ * that started should complete, and the rest of the jobs should be skipped.
*/
- @Ignore
@Test
public void testNormalFailure1() throws Exception {
// Test propagation of KILLED status to embedded flows.
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
InteractiveTestJob.getTestJob("joba").failJob();
- pause(250);
- Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
- expectedStateMap.put("joba", Status.FAILED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.CANCELLED);
- expectedStateMap.put("jobc", Status.CANCELLED);
- expectedStateMap.put("jobd", Status.CANCELLED);
- expectedStateMap.put("jobd:innerJobA", Status.READY);
- expectedStateMap.put("jobd:innerFlow2", Status.READY);
- expectedStateMap.put("jobb:innerJobA", Status.READY);
- expectedStateMap.put("jobb:innerFlow", Status.READY);
- expectedStateMap.put("jobe", Status.CANCELLED);
- compareStates(expectedStateMap, nodeMap);
+ assertFlowStatus(Status.FAILED_FINISHING);
+ assertStatus("joba", Status.FAILED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.CANCELLED);
+ assertStatus("jobc", Status.CANCELLED);
+ assertStatus("jobd", Status.CANCELLED);
+ assertStatus("jobd:innerJobA", Status.READY);
+ assertStatus("jobd:innerFlow2", Status.READY);
+ assertStatus("jobb:innerJobA", Status.READY);
+ assertStatus("jobb:innerFlow", Status.READY);
+ assertStatus("jobe", Status.CANCELLED);
// 3. jobb:Inner completes
/// innerJobA completes
InteractiveTestJob.getTestJob("joba1").succeedJob();
- pause(250);
- expectedStateMap.put("jobf", Status.CANCELLED);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobf", Status.CANCELLED);
+ assertFlowStatus(Status.FAILED);
+ assertThreadShutDown();
}
/**
* Test #2 on the default failure case.
*/
- @Ignore
@Test
public void testNormalFailure2() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").failJob();
- pause(250);
- expectedStateMap.put("joba1", Status.FAILED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba1", Status.FAILED);
// 3. joba completes, everything is killed
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
- expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobb", Status.KILLED);
- expectedStateMap.put("jobd", Status.KILLED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.CANCELLED);
+ assertStatus("jobb:innerJobC", Status.CANCELLED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobb", Status.KILLED);
+ assertStatus("jobd", Status.KILLED);
+ assertFlowStatus(Status.FAILED_FINISHING);
InteractiveTestJob.getTestJob("jobc").succeedJob();
- pause(250);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
-
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobc", Status.SUCCEEDED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+ assertFlowStatus(Status.FAILED);
+ assertThreadShutDown();
}
- @Ignore
@Test
public void testNormalFailure3() throws Exception {
// Test propagation of CANCELLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB in subflow FAILS
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
- pause(250);
- expectedStateMap.put("jobb", Status.FAILED_FINISHING);
- expectedStateMap.put("jobb:innerJobB", Status.FAILED);
- Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb", Status.FAILED_FINISHING);
+ assertStatus("jobb:innerJobB", Status.FAILED);
+ assertFlowStatus(Status.FAILED_FINISHING);
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobd", Status.KILLED);
- expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobb", Status.FAILED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobd", Status.KILLED);
+ assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobb", Status.FAILED);
// 3. jobc completes, everything is killed
InteractiveTestJob.getTestJob("jobc").succeedJob();
- pause(250);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
-
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobc", Status.SUCCEEDED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+ assertFlowStatus(Status.FAILED);
+ assertThreadShutDown();
}
/**
- * Tests failures when the fail behaviour is FINISH_ALL_POSSIBLE.
- * In this case, all jobs which have had its pre-requisite met can continue
- * to run. Finishes when the failure is propagated to the last node of the
- * flow.
+ * Tests failures when the fail behaviour is FINISH_ALL_POSSIBLE. In this case, all jobs which
+ * have had its pre-requisite met can continue to run. Finishes when the failure is propagated to
+ * the last node of the flow.
*/
- @Ignore
@Test
public void testFailedFinishingFailure3() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector, "jobf",
FailureAction.FINISH_ALL_POSSIBLE);
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB in subflow FAILS
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
- pause(250);
- expectedStateMap.put("jobb", Status.FAILED_FINISHING);
- expectedStateMap.put("jobb:innerJobB", Status.FAILED);
- Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb", Status.FAILED_FINISHING);
+ assertStatus("jobb:innerJobB", Status.FAILED);
+ assertFlowStatus(Status.FAILED_FINISHING);
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb", Status.FAILED);
- expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb", Status.FAILED);
+ assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobd:innerFlow2", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
- pause(250);
- expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
- expectedStateMap.put("jobd", Status.SUCCEEDED);
+ assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+ assertStatus("jobd", Status.SUCCEEDED);
// 3. jobc completes, everything is killed
InteractiveTestJob.getTestJob("jobc").succeedJob();
- pause(250);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobc", Status.SUCCEEDED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+ assertFlowStatus(Status.FAILED);
+ assertThreadShutDown();
}
/**
- * Tests the failure condition when a failure invokes a cancel (or killed)
- * on the flow.
+ * Tests the failure condition when a failure invokes a cancel (or killed) on the flow.
*
- * Any jobs that are running will be assigned a KILLED state, and any nodes
- * which were skipped due to prior errors will be given a CANCELLED state.
+ * Any jobs that are running will be assigned a KILLED state, and any nodes which were skipped due
+ * to prior errors will be given a CANCELLED state.
*/
- @Ignore
@Test
public void testCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector, "jobf",
FailureAction.CANCEL_ALL);
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB in subflow FAILS
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
- pause(250);
- expectedStateMap.put("jobb", Status.FAILED);
- expectedStateMap.put("jobb:innerJobB", Status.FAILED);
- expectedStateMap.put("jobb:innerJobC", Status.KILLED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobc", Status.KILLED);
- expectedStateMap.put("jobd", Status.KILLED);
- expectedStateMap.put("jobd:innerJobA", Status.KILLED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
- compareStates(expectedStateMap, nodeMap);
-
- Assert.assertFalse(thread.isAlive());
- Assert.assertEquals(Status.FAILED, flow.getStatus());
-
+ assertStatus("jobb", Status.FAILED);
+ assertStatus("jobb:innerJobB", Status.FAILED);
+ assertStatus("jobb:innerJobC", Status.KILLED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobc", Status.KILLED);
+ assertStatus("jobd", Status.KILLED);
+ assertStatus("jobd:innerJobA", Status.KILLED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+
+ assertThreadShutDown();
+ assertFlowStatus(Status.KILLED);
}
/**
* Tests retries after a failure
*/
- @Ignore
@Test
public void testRetryOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
+ final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("joba").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
"innerFlow").setStatus(Status.DISABLED);
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
- // After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.SKIPPED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.SKIPPED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
InteractiveTestJob.getTestJob("jobb:innerJobC").failJob();
- pause(250);
+ assertStatus("jobb:innerJobB", Status.FAILED);
+ assertStatus("jobb:innerJobC", Status.FAILED);
+ assertStatus("jobb", Status.FAILED);
+ assertStatus("jobb:innerFlow", Status.SKIPPED);
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb", Status.FAILED);
- expectedStateMap.put("jobb:innerJobB", Status.FAILED);
- expectedStateMap.put("jobb:innerJobC", Status.FAILED);
- expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd", Status.KILLED);
- Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
- compareStates(expectedStateMap, nodeMap);
-
- final ExecutableNode node = nodeMap.get("jobd:innerFlow2");
+ assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobd", Status.KILLED);
+ assertFlowStatus(Status.FAILED_FINISHING);
+
+ final ExecutableNode node = this.runner.getExecutableFlow()
+ .getExecutableNodePath("jobd:innerFlow2");
final ExecutableFlowBase base = node.getParentFlow();
for (final String nodeId : node.getInNodes()) {
final ExecutableNode inNode = base.getExecutableNode(nodeId);
System.out.println(inNode.getId() + " > " + inNode.getStatus());
}
- runner.retryFailures("me");
- pause(500);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerFlow", Status.DISABLED);
- expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
- Assert.assertEquals(Status.RUNNING, flow.getStatus());
- compareStates(expectedStateMap, nodeMap);
- Assert.assertTrue(thread.isAlive());
+ this.runner.retryFailures("me");
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerFlow", Status.DISABLED);
+ assertStatus("jobd:innerFlow2", Status.RUNNING);
+ assertFlowStatus(Status.RUNNING);
+ assertThreadRunning();
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
InteractiveTestJob.getTestJob("jobc").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
- expectedStateMap.put("jobb", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("jobd", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerFlow", Status.SKIPPED);
+ assertStatus("jobb", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+ assertStatus("jobc", Status.SUCCEEDED);
+ assertStatus("jobd", Status.SUCCEEDED);
+ assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+ assertStatus("jobe", Status.RUNNING);
InteractiveTestJob.getTestJob("jobe").succeedJob();
- pause(250);
- expectedStateMap.put("jobe", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobe", Status.SUCCEEDED);
InteractiveTestJob.getTestJob("joba1").succeedJob();
- pause(250);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobf", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobf", Status.RUNNING);
InteractiveTestJob.getTestJob("jobf").succeedJob();
- pause(250);
- expectedStateMap.put("jobf", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobf", Status.SUCCEEDED);
+ assertFlowStatus(Status.SUCCEEDED);
+ assertThreadShutDown();
}
/**
- * Tests the manual Killing of a flow. In this case, the flow is just fine
- * before the cancel
- * is called.
+ * Tests the manual Killing of a flow. In this case, the flow is just fine before the cancel is
+ * called.
*/
- @Ignore
@Test
public void testCancel() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector, "jobf",
FailureAction.CANCEL_ALL);
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(1000);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB in subflow FAILS
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
-
- runner.kill("me");
- pause(250);
-
- expectedStateMap.put("jobb", Status.KILLED);
- expectedStateMap.put("jobb:innerJobB", Status.KILLED);
- expectedStateMap.put("jobb:innerJobC", Status.KILLED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobc", Status.KILLED);
- expectedStateMap.put("jobd", Status.KILLED);
- expectedStateMap.put("jobd:innerJobA", Status.KILLED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
-
- Assert.assertEquals(Status.KILLED, flow.getStatus());
- compareStates(expectedStateMap, nodeMap);
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
+
+ this.runner.kill("me");
+
+ assertStatus("jobb", Status.KILLED);
+ assertStatus("jobb:innerJobB", Status.KILLED);
+ assertStatus("jobb:innerJobC", Status.KILLED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobc", Status.KILLED);
+ assertStatus("jobd", Status.KILLED);
+ assertStatus("jobd:innerJobA", Status.KILLED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+
+ assertFlowStatus(Status.KILLED);
+ assertThreadShutDown();
}
/**
* Tests the manual invocation of cancel on a flow that is FAILED_FINISHING
*/
- @Ignore
@Test
public void testManualCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
- final ExecutableFlow flow = runner.getExecutableFlow();
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
// 1. START FLOW
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB in subflow FAILS
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobB", Status.FAILED);
- expectedStateMap.put("jobb", Status.FAILED_FINISHING);
- Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
- compareStates(expectedStateMap, nodeMap);
-
- runner.kill("me");
- pause(1000);
-
- expectedStateMap.put("jobb", Status.FAILED);
- expectedStateMap.put("jobb:innerJobC", Status.KILLED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobc", Status.KILLED);
- expectedStateMap.put("jobd", Status.KILLED);
- expectedStateMap.put("jobd:innerJobA", Status.KILLED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
-
- Assert.assertEquals(Status.KILLED, flow.getStatus());
- compareStates(expectedStateMap, nodeMap);
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobb:innerJobB", Status.FAILED);
+ assertStatus("jobb", Status.FAILED_FINISHING);
+ assertFlowStatus(Status.FAILED_FINISHING);
+
+ this.runner.kill("me");
+
+ assertStatus("jobb", Status.FAILED);
+ assertStatus("jobb:innerJobC", Status.KILLED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobc", Status.KILLED);
+ assertStatus("jobd", Status.KILLED);
+ assertStatus("jobd:innerJobA", Status.KILLED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+
+ assertFlowStatus(Status.KILLED);
+ assertThreadShutDown();
}
/**
* Tests that pause and resume work
*/
- @Ignore
@Test
public void testPause() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
// 1. START FLOW
- final ExecutableFlow flow = runner.getExecutableFlow();
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
- runner.pause("test");
+ this.runner.pause("test");
InteractiveTestJob.getTestJob("joba").succeedJob();
// 2.1 JOB A COMPLETES SUCCESSFULLY AFTER PAUSE
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertFlowStatus(Status.PAUSED);
// 2.2 Flow is unpaused
- runner.resume("test");
- pause(250);
- Assert.assertEquals(flow.getStatus(), Status.RUNNING);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ this.runner.resume("test");
+ assertFlowStatus(Status.RUNNING);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
// 3. jobb:Inner completes
- runner.pause("test");
+ this.runner.pause("test");
/// innerJobA completes, but paused
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(
Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
- runner.resume("test");
- pause(250);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ this.runner.resume("test");
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
/// innerJobB, C completes
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(
Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(
Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
- pause(250);
- expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+ assertStatus("jobb:innerFlow", Status.RUNNING);
// 4. Finish up on inner flow for jobb
InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(
Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
- pause(250);
- expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobb:innerFlow", Status.SUCCEEDED);
+ assertStatus("jobb", Status.SUCCEEDED);
// 5. Finish jobc, jobd
InteractiveTestJob.getTestJob("jobc").succeedJob(
Props.of("output.jobc", "jobc"));
- pause(250);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobc", Status.SUCCEEDED);
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
- pause(250);
InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
- pause(250);
- expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
- expectedStateMap.put("jobd", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobd:innerFlow2", Status.SUCCEEDED);
+ assertStatus("jobd", Status.SUCCEEDED);
+ assertStatus("jobe", Status.RUNNING);
// 6. Finish off flow
InteractiveTestJob.getTestJob("joba1").succeedJob();
- pause(250);
InteractiveTestJob.getTestJob("jobe").succeedJob();
- pause(250);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.SUCCEEDED);
- expectedStateMap.put("jobf", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobe", Status.SUCCEEDED);
+ assertStatus("jobf", Status.RUNNING);
InteractiveTestJob.getTestJob("jobf").succeedJob();
- pause(250);
- expectedStateMap.put("jobf", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
-
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobf", Status.SUCCEEDED);
+ assertFlowStatus(Status.SUCCEEDED);
+ assertThreadShutDown();
}
/**
- * Test the condition for a manual invocation of a KILL (cancel) on a flow
- * that has been paused. The flow should unpause and be killed immediately.
+ * Test the condition for a manual invocation of a KILL (cancel) on a flow that has been paused.
+ * The flow should unpause and be killed immediately.
*/
- @Ignore
@Test
public void testPauseKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf");
-
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
+ this.runner = createFlowRunner(eventCollector, "jobf");
// 1. START FLOW
- final ExecutableFlow flow = runner.getExecutableFlow();
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB A COMPLETES SUCCESSFULLY
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
-
- runner.pause("me");
- pause(250);
- Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+
+ this.runner.pause("me");
+ assertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
- pause(250);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
-
- runner.kill("me");
- pause(250);
- expectedStateMap.put("joba1", Status.KILLED);
- expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
- expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobb", Status.KILLED);
- expectedStateMap.put("jobc", Status.KILLED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobd", Status.KILLED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
-
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.KILLED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ assertStatus("jobd:innerJobA", Status.SUCCEEDED);
+
+ this.runner.kill("me");
+ assertStatus("joba1", Status.KILLED);
+ assertStatus("jobb:innerJobB", Status.CANCELLED);
+ assertStatus("jobb:innerJobC", Status.CANCELLED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobb", Status.KILLED);
+ assertStatus("jobc", Status.KILLED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobd", Status.KILLED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+
+ assertFlowStatus(Status.KILLED);
+ assertThreadShutDown();
}
/**
- * Tests the case where a failure occurs on a Paused flow. In this case, the
- * flow should stay paused.
+ * Tests the case where a failure occurs on a Paused flow. In this case, the flow should stay
+ * paused.
*/
- @Ignore
@Test
public void testPauseFail() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+ this.eventCollector = new EventCollectorListener();
+ this.runner = createFlowRunner(this.eventCollector, "jobf",
FailureAction.FINISH_CURRENTLY_RUNNING);
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
-
// 1. START FLOW
- final ExecutableFlow flow = runner.getExecutableFlow();
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB A COMPLETES SUCCESSFULLY
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
-
- runner.pause("me");
- pause(250);
- Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+
+ this.runner.pause("me");
+ assertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
- pause(250);
- expectedStateMap.put("jobd:innerJobA", Status.FAILED);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(flow.getStatus(), Status.PAUSED);
-
- runner.resume("me");
- pause(250);
- expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
- expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobb", Status.KILLED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobd", Status.FAILED);
+ assertStatus("jobd:innerJobA", Status.FAILED);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+ // When flow is paused, no new jobs are started. So these two jobs that were already running
+ // are allowed to finish, but their dependencies aren't started.
+ // Now, ensure that jobd:innerJobA has completely finished as failed before resuming.
+ // If we would resume before the job failure has been completely processed, FlowRunner would be
+ // able to start some new jobs instead of cancelling everything.
+ waitEventFired("jobd:innerJobA", Status.FAILED);
+ assertFlowStatus(Status.PAUSED);
+
+ this.runner.resume("me");
+ assertStatus("jobb:innerJobB", Status.CANCELLED);
+ assertStatus("jobb:innerJobC", Status.CANCELLED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobb", Status.KILLED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobd", Status.FAILED);
InteractiveTestJob.getTestJob("jobc").succeedJob();
InteractiveTestJob.getTestJob("joba1").succeedJob();
- pause(250);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobf", Status.CANCELLED);
- expectedStateMap.put("jobe", Status.CANCELLED);
-
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobc", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobf", Status.CANCELLED);
+ assertStatus("jobe", Status.CANCELLED);
+
+ assertFlowStatus(Status.FAILED);
+ assertThreadShutDown();
}
/**
- * Test the condition when a Finish all possible is called during a pause.
- * The Failure is not acted upon until the flow is resumed.
+ * Test the condition when a Finish all possible is called during a pause. The Failure is not
+ * acted upon until the flow is resumed.
*/
- @Ignore
@Test
public void testPauseFailFinishAll() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector, "jobf",
FailureAction.FINISH_ALL_POSSIBLE);
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
-
// 1. START FLOW
- final ExecutableFlow flow = runner.getExecutableFlow();
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB A COMPLETES SUCCESSFULLY
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(250);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
-
- runner.pause("me");
- pause(250);
- Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+
+ this.runner.pause("me");
+ assertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
- pause(250);
- expectedStateMap.put("jobd:innerJobA", Status.FAILED);
- expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
- compareStates(expectedStateMap, nodeMap);
-
- runner.resume("me");
- pause(250);
- expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobd", Status.FAILED);
+ assertStatus("jobd:innerJobA", Status.FAILED);
+ assertStatus("jobb:innerJobA", Status.SUCCEEDED);
+
+ this.runner.resume("me");
+ assertStatus("jobb:innerJobB", Status.RUNNING);
+ assertStatus("jobb:innerJobC", Status.RUNNING);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobd", Status.FAILED);
InteractiveTestJob.getTestJob("jobc").succeedJob();
InteractiveTestJob.getTestJob("joba1").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
- pause(250);
InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob();
- pause(250);
- expectedStateMap.put("jobc", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
- expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
- expectedStateMap.put("jobb", Status.SUCCEEDED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
-
- compareStates(expectedStateMap, nodeMap);
- Assert.assertEquals(Status.FAILED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobc", Status.SUCCEEDED);
+ assertStatus("joba1", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobB", Status.SUCCEEDED);
+ assertStatus("jobb:innerJobC", Status.SUCCEEDED);
+ assertStatus("jobb:innerFlow", Status.SUCCEEDED);
+ assertStatus("jobb", Status.SUCCEEDED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+
+ assertFlowStatus(Status.FAILED);
+ assertThreadShutDown();
}
/**
- * Tests the case when a flow is paused and a failure causes a kill. The
- * flow should die immediately regardless of the 'paused' status.
+ * Tests the case when a flow is paused and a failure causes a kill. The flow should die
+ * immediately regardless of the 'paused' status.
*/
- @Ignore
@Test
public void testPauseFailKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector, "jobf",
FailureAction.CANCEL_ALL);
- final Map<String, Status> expectedStateMap = new HashMap<>();
- final Map<String, ExecutableNode> nodeMap = new HashMap<>();
-
// 1. START FLOW
- final ExecutableFlow flow = runner.getExecutableFlow();
- createExpectedStateMap(flow, expectedStateMap, nodeMap);
- final Thread thread = runFlowRunnerInThread(runner);
- pause(250);
+ runFlowRunnerInThread(this.runner);
// After it starts up, only joba should be running
- expectedStateMap.put("joba", Status.RUNNING);
- expectedStateMap.put("joba1", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
+ assertStatus("joba", Status.RUNNING);
+ assertStatus("joba1", Status.RUNNING);
// 2. JOB A COMPLETES SUCCESSFULLY
InteractiveTestJob.getTestJob("joba").succeedJob();
- pause(500);
- expectedStateMap.put("joba", Status.SUCCEEDED);
- expectedStateMap.put("joba1", Status.RUNNING);
- expectedStateMap.put("jobb", Status.RUNNING);
- expectedStateMap.put("jobc", Status.RUNNING);
- expectedStateMap.put("jobd", Status.RUNNING);
- expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
- expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
- compareStates(expectedStateMap, nodeMap);
-
- runner.pause("me");
- pause(250);
- Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ assertStatus("joba", Status.SUCCEEDED);
+ assertStatus("joba1", Status.RUNNING);
+ assertStatus("jobb", Status.RUNNING);
+ assertStatus("jobc", Status.RUNNING);
+ assertStatus("jobd", Status.RUNNING);
+ assertStatus("jobd:innerJobA", Status.RUNNING);
+ assertStatus("jobb:innerJobA", Status.RUNNING);
+
+ this.runner.pause("me");
+ assertFlowStatus(Status.PAUSED);
InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
- pause(250);
- expectedStateMap.put("jobd:innerJobA", Status.FAILED);
- expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
- expectedStateMap.put("jobd", Status.FAILED);
- expectedStateMap.put("jobb:innerJobA", Status.KILLED);
- expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
- expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
- expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
- expectedStateMap.put("jobb", Status.KILLED);
- expectedStateMap.put("jobc", Status.KILLED);
- expectedStateMap.put("jobe", Status.CANCELLED);
- expectedStateMap.put("jobf", Status.CANCELLED);
- expectedStateMap.put("joba1", Status.KILLED);
- compareStates(expectedStateMap, nodeMap);
-
- Assert.assertEquals(Status.FAILED, flow.getStatus());
- Assert.assertFalse(thread.isAlive());
+ assertStatus("jobd:innerJobA", Status.FAILED);
+ assertStatus("jobd:innerFlow2", Status.CANCELLED);
+ assertStatus("jobd", Status.FAILED);
+ assertStatus("jobb:innerJobA", Status.KILLED);
+ assertStatus("jobb:innerJobB", Status.CANCELLED);
+ assertStatus("jobb:innerJobC", Status.CANCELLED);
+ assertStatus("jobb:innerFlow", Status.CANCELLED);
+ assertStatus("jobb", Status.KILLED);
+ assertStatus("jobc", Status.KILLED);
+ assertStatus("jobe", Status.CANCELLED);
+ assertStatus("jobf", Status.CANCELLED);
+ assertStatus("joba1", Status.KILLED);
+
+ assertFlowStatus(Status.KILLED);
+ assertThreadShutDown();
}
-
private Thread runFlowRunnerInThread(final FlowRunner runner) {
final Thread thread = new Thread(runner);
thread.start();
return thread;
}
- private void pause(final long millisec) {
+ private void sleep(final long millisec) {
try {
Thread.sleep(millisec);
} catch (final InterruptedException e) {
}
}
- private void createExpectedStateMap(final ExecutableFlowBase flow,
- final Map<String, Status> expectedStateMap,
- final Map<String, ExecutableNode> nodeMap) {
- for (final ExecutableNode node : flow.getExecutableNodes()) {
- expectedStateMap.put(node.getNestedId(), node.getStatus());
- nodeMap.put(node.getNestedId(), node);
- if (node instanceof ExecutableFlowBase) {
- createExpectedStateMap((ExecutableFlowBase) node, expectedStateMap,
- nodeMap);
- }
- }
- }
-
- private void compareStates(final Map<String, Status> expectedStateMap,
- final Map<String, ExecutableNode> nodeMap) {
- for (final String printedId : expectedStateMap.keySet()) {
- final Status expectedStatus = expectedStateMap.get(printedId);
- final ExecutableNode node = nodeMap.get(printedId);
-
- if (expectedStatus != node.getStatus()) {
- Assert.fail("Expected values do not match for " + printedId
- + ". Expected " + expectedStatus + ", instead received "
- + node.getStatus());
- }
- }
- }
-
private void prepareProject(final Project project, final File directory)
throws ProjectManagerException, IOException {
final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), this.logger);
@@ -1370,7 +1097,6 @@ public class FlowRunnerTest2 {
for (final String error : loader.getErrors()) {
System.out.println(error);
}
-
throw new RuntimeException("Errors found in setup");
}