Details
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/FlowWatcherTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/FlowWatcherTestUtil.java
new file mode 100644
index 0000000..782084f
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/FlowWatcherTestUtil.java
@@ -0,0 +1,105 @@
+package azkaban.execapp.event;
+
+import azkaban.execapp.FlowRunner;
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.Status;
+import org.junit.Assert;
+
+public class FlowWatcherTestUtil {
+
+ public static void assertPipelineLevel1(final FlowRunner runner1,
+ final FlowRunner runner2) throws Exception {
+
+ run(runner1, runner2);
+
+ final ExecutableFlow first = runner1.getExecutableFlow();
+ final ExecutableFlow second = runner2.getExecutableFlow();
+
+ for (final ExecutableNode node : second.getExecutableNodes()) {
+ Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+
+ // check it's start time is after the first's children.
+ final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
+ if (watchedNode == null) {
+ continue;
+ }
+ Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
+
+ System.out.println("Node " + node.getId() + " start: "
+ + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
+ + watchedNode.getEndTime() + " diff: "
+ + (node.getStartTime() - watchedNode.getEndTime()));
+
+ Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
+
+ long minParentDiff = 0;
+ if (node.getInNodes().size() > 0) {
+ minParentDiff = Long.MAX_VALUE;
+ for (final String dependency : node.getInNodes()) {
+ final ExecutableNode parent = second.getExecutableNode(dependency);
+ final long diff = node.getStartTime() - parent.getEndTime();
+ minParentDiff = Math.min(minParentDiff, diff);
+ }
+ }
+ final long diff = node.getStartTime() - watchedNode.getEndTime();
+ Assert.assertTrue(minParentDiff < 500 || diff < 500);
+ }
+ }
+
+ public static void assertPipelineLevel2(final FlowRunner runner1, final FlowRunner runner2,
+ final boolean job4Skipped) throws Exception {
+ run(runner1, runner2);
+ assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), job4Skipped);
+ }
+
+ public static void assertPipelineLevel2(final ExecutableFlow first,
+ final ExecutableFlow second, final boolean job4Skipped) {
+ for (final ExecutableNode node : second.getExecutableNodes()) {
+ Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
+
+ // check it's start time is after the first's children.
+ final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
+ if (watchedNode == null) {
+ continue;
+ }
+ Assert.assertEquals(watchedNode.getStatus(),
+ job4Skipped && watchedNode.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
+
+ long minDiff = Long.MAX_VALUE;
+ for (final String watchedChild : watchedNode.getOutNodes()) {
+ final ExecutableNode child = first.getExecutableNode(watchedChild);
+ if (child == null) {
+ continue;
+ }
+ Assert.assertEquals(child.getStatus(),
+ job4Skipped && child.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
+ final long diff = node.getStartTime() - child.getEndTime();
+ minDiff = Math.min(minDiff, diff);
+ Assert.assertTrue(
+ "Node " + node.getId() + " start: " + node.getStartTime() + " dependent on "
+ + watchedChild + " " + child.getEndTime() + " diff: " + diff,
+ node.getStartTime() >= child.getEndTime());
+ }
+
+ long minParentDiff = Long.MAX_VALUE;
+ for (final String dependency : node.getInNodes()) {
+ final ExecutableNode parent = second.getExecutableNode(dependency);
+ final long diff = node.getStartTime() - parent.getEndTime();
+ minParentDiff = Math.min(minParentDiff, diff);
+ }
+ Assert.assertTrue("minPipelineTimeDiff:" + minDiff
+ + " minDependencyTimeDiff:" + minParentDiff,
+ minParentDiff < 5000 || minDiff < 5000);
+ }
+ }
+
+ private static void run(final FlowRunner runner1, final FlowRunner runner2)
+ throws InterruptedException {
+ final Thread runner1Thread = new Thread(runner1);
+ final Thread runner2Thread = new Thread(runner2);
+ runner1Thread.start();
+ runner2Thread.start();
+ runner2Thread.join();
+ }
+}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index 2244ade..cf891a0 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -16,249 +16,55 @@
package azkaban.execapp.event;
-import static org.mockito.Mockito.mock;
-
-import azkaban.execapp.EventCollectorListener;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorLoader;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
-import azkaban.executor.Status;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.utils.Props;
-import java.io.File;
+import azkaban.executor.InteractiveTestJob;
import java.io.IOException;
-import org.apache.commons.io.FileUtils;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class LocalFlowWatcherTest {
- private final AzkabanEventReporter azkabanEventReporter = null;
- private JobTypeManager jobtypeManager;
- private int dirVal = 0;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private FlowRunnerTestUtil testUtil;
@Before
public void setUp() throws Exception {
- this.jobtypeManager =
- new JobTypeManager(null, null, this.getClass().getClassLoader());
- this.jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
+ this.testUtil = new FlowRunnerTestUtil("exectest1", this.temporaryFolder);
+ InteractiveTestJob.setQuickSuccess(true);
}
@After
public void tearDown() throws IOException {
+ InteractiveTestJob.resetQuickSuccess();
}
- public File setupDirectory() throws IOException {
- System.out.println("Create temp dir");
- final File workingDir = new File("_AzkabanTestDir_" + this.dirVal);
- if (workingDir.exists()) {
- FileUtils.deleteDirectory(workingDir);
- }
- workingDir.mkdirs();
- this.dirVal++;
-
- return workingDir;
- }
-
- @Ignore
@Test
public void testBasicLocalFlowWatcher() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
-
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
- final File workingDir1 = setupDirectory();
- final FlowRunner runner1 =
- createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
- null);
- final Thread runner1Thread = new Thread(runner1);
-
- final File workingDir2 = setupDirectory();
- final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
- final FlowRunner runner2 =
- createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
- watcher, 2);
- final Thread runner2Thread = new Thread(runner2);
-
- runner1Thread.start();
- runner2Thread.start();
- runner2Thread.join();
-
- FileUtils.deleteDirectory(workingDir1);
- FileUtils.deleteDirectory(workingDir2);
-
- testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+ final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 2);
+ FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
}
- @Ignore
@Test
public void testLevel1LocalFlowWatcher() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
-
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
- final File workingDir1 = setupDirectory();
- final FlowRunner runner1 =
- createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
- null);
- final Thread runner1Thread = new Thread(runner1);
-
- final File workingDir2 = setupDirectory();
- final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
- final FlowRunner runner2 =
- createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
- watcher, 1);
- final Thread runner2Thread = new Thread(runner2);
-
- runner1Thread.start();
- runner2Thread.start();
- runner2Thread.join();
-
- FileUtils.deleteDirectory(workingDir1);
- FileUtils.deleteDirectory(workingDir2);
-
- testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+ final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 1);
+ FlowWatcherTestUtil.assertPipelineLevel1(runner1, runner2);
}
- @Ignore
@Test
public void testLevel2DiffLocalFlowWatcher() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
-
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
- final File workingDir1 = setupDirectory();
- final FlowRunner runner1 =
- createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
- null);
- final Thread runner1Thread = new Thread(runner1);
-
- final File workingDir2 = setupDirectory();
- final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
- final FlowRunner runner2 =
- createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2,
- watcher, 1);
- final Thread runner2Thread = new Thread(runner2);
-
- runner1Thread.start();
- runner2Thread.start();
- runner2Thread.join();
-
- FileUtils.deleteDirectory(workingDir1);
- FileUtils.deleteDirectory(workingDir2);
-
- testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+ final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1-mod", watcher(runner1), 2);
+ FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
}
- private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
- for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
- // check it's start time is after the first's children.
- final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
- if (watchedNode == null) {
- continue;
- }
- Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
-
- System.out.println("Node " + node.getId() + " start: "
- + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
- + watchedNode.getEndTime() + " diff: "
- + (node.getStartTime() - watchedNode.getEndTime()));
-
- Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
-
- long minParentDiff = 0;
- if (node.getInNodes().size() > 0) {
- minParentDiff = Long.MAX_VALUE;
- for (final String dependency : node.getInNodes()) {
- final ExecutableNode parent = second.getExecutableNode(dependency);
- final long diff = node.getStartTime() - parent.getEndTime();
- minParentDiff = Math.min(minParentDiff, diff);
- }
- }
- final long diff = node.getStartTime() - watchedNode.getEndTime();
- System.out.println(" minPipelineTimeDiff:" + diff
- + " minDependencyTimeDiff:" + minParentDiff);
- Assert.assertTrue(minParentDiff < 100 || diff < 100);
- }
+ private LocalFlowWatcher watcher(final FlowRunner previousRunner) {
+ return new LocalFlowWatcher(previousRunner);
}
-
- private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
- for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
- // check it's start time is after the first's children.
- final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
- if (watchedNode == null) {
- continue;
- }
- Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
-
- long minDiff = Long.MAX_VALUE;
- for (final String watchedChild : watchedNode.getOutNodes()) {
- final ExecutableNode child = first.getExecutableNode(watchedChild);
- if (child == null) {
- continue;
- }
- Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
- final long diff = node.getStartTime() - child.getEndTime();
- minDiff = Math.min(minDiff, diff);
- System.out.println("Node " + node.getId() + " start: "
- + node.getStartTime() + " dependent on " + watchedChild + " "
- + child.getEndTime() + " diff: " + diff);
-
- Assert.assertTrue(node.getStartTime() >= child.getEndTime());
- }
-
- long minParentDiff = Long.MAX_VALUE;
- for (final String dependency : node.getInNodes()) {
- final ExecutableNode parent = second.getExecutableNode(dependency);
- final long diff = node.getStartTime() - parent.getEndTime();
- minParentDiff = Math.min(minParentDiff, diff);
- }
- System.out.println(" minPipelineTimeDiff:" + minDiff
- + " minDependencyTimeDiff:" + minParentDiff);
- Assert.assertTrue(minParentDiff < 100 || minDiff < 100);
- }
- }
-
- private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
- final EventCollectorListener eventCollector, final String flowName, final int execId,
- final FlowWatcher watcher, final Integer pipeline) throws Exception {
- return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline,
- new Props());
- }
-
- private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
- final EventCollectorListener eventCollector, final String flowName, final int execId,
- final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
- throws Exception {
- final File testDir = new File("unit/executions/exectest1");
- final ExecutableFlow exFlow =
- FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
- final ExecutionOptions option = exFlow.getExecutionOptions();
- if (watcher != null) {
- option.setPipelineLevel(pipeline);
- option.setPipelineExecutionId(watcher.getExecId());
- }
- loader.uploadExecutableFlow(exFlow);
- final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
- this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
- runner.setFlowWatcher(watcher);
- runner.addListener(eventCollector);
-
- return runner;
- }
-
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 49f443d..fbe6a6c 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -16,30 +16,13 @@
package azkaban.execapp.event;
-import static org.mockito.Mockito.mock;
-
-import azkaban.execapp.EventCollectorListener;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
-import azkaban.execapp.jmx.JmxJobMBeanManager;
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlowBase;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.Props;
-import java.io.File;
import java.io.IOException;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -47,18 +30,14 @@ import org.junit.rules.TemporaryFolder;
public class RemoteFlowWatcherTest {
- private final AzkabanEventReporter azkabanEventReporter = null;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
- private JobTypeManager jobtypeManager;
+ private FlowRunnerTestUtil testUtil;
@Before
public void setUp() throws Exception {
- this.jobtypeManager =
- new JobTypeManager(null, null, this.getClass().getClassLoader());
- this.jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
- Utils.initServiceProvider();
- JmxJobMBeanManager.getInstance().initialize(new Props());
+ this.testUtil = new FlowRunnerTestUtil("exectest1", this.temporaryFolder);
+ this.testUtil.setExecutorLoader(new MockExecutorLoader());
InteractiveTestJob.setQuickSuccess(true);
}
@@ -69,230 +48,50 @@ public class RemoteFlowWatcherTest {
@Test
public void testBasicRemoteFlowWatcher() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
-
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
- final File workingDir1 = this.temporaryFolder.newFolder();
- final FlowRunner runner1 =
- createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
- null);
- final Thread runner1Thread = new Thread(runner1);
-
- final File workingDir2 = this.temporaryFolder.newFolder();
- final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
- final FlowRunner runner2 =
- createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
- watcher, 2);
- final Thread runner2Thread = new Thread(runner2);
-
- printCurrentState("runner1 ", runner1.getExecutableFlow());
- runner1Thread.start();
- runner2Thread.start();
-
- runner2Thread.join();
-
- assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), false);
+ final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+ final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 2);
+ FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
}
@Test
public void testRemoteFlowWatcherBlockingJobsLeftInReadyState() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
+ final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+ final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 2);
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
- final File workingDir1 = this.temporaryFolder.newFolder();
- final FlowRunner runner1 =
- createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
- null);
final Thread runner1Thread = new Thread(runner1);
-
- final File workingDir2 = this.temporaryFolder.newFolder();
- final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
- final FlowRunner runner2 =
- createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
- watcher, 2);
- final Thread runner2Thread = new Thread(runner2);
-
- printCurrentState("runner1 ", runner1.getExecutableFlow());
runner1Thread.start();
runner1Thread.join();
+
// simulate a real life scenario - this can happen for disabled jobs inside subflows:
// flow has finished otherwise but a "blocking" job has READY status
// the test gets stuck here without the fix in FlowWatcher.peekStatus
runner1.getExecutableFlow().getExecutableNodePath("job4").setStatus(Status.READY);
- loader.updateExecutableFlow(runner1.getExecutableFlow());
+ this.testUtil.getExecutorLoader().updateExecutableFlow(runner1.getExecutableFlow());
+ final Thread runner2Thread = new Thread(runner2);
runner2Thread.start();
runner2Thread.join();
- assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), true);
+ FlowWatcherTestUtil
+ .assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), true);
}
@Test
public void testLevel1RemoteFlowWatcher() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
-
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
- final File workingDir1 = this.temporaryFolder.newFolder();
- final FlowRunner runner1 =
- createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
- null);
- final Thread runner1Thread = new Thread(runner1);
-
- final File workingDir2 = this.temporaryFolder.newFolder();
- final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
- final FlowRunner runner2 =
- createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
- watcher, 1);
- final Thread runner2Thread = new Thread(runner2);
-
- runner1Thread.start();
- runner2Thread.start();
- runner2Thread.join();
-
- testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+ final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1", watcher(runner1), 1);
+ FlowWatcherTestUtil.assertPipelineLevel1(runner1, runner2);
}
@Test
public void testLevel2DiffRemoteFlowWatcher() throws Exception {
- final MockExecutorLoader loader = new MockExecutorLoader();
-
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
- final File workingDir1 = this.temporaryFolder.newFolder();
- final FlowRunner runner1 =
- createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
- null);
- final Thread runner1Thread = new Thread(runner1);
-
- final File workingDir2 = this.temporaryFolder.newFolder();
-
- final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
- final FlowRunner runner2 =
- createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2,
- watcher, 1);
- final Thread runner2Thread = new Thread(runner2);
-
- runner1Thread.start();
- runner2Thread.start();
- runner2Thread.join();
-
- testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ final FlowRunner runner1 = this.testUtil.createFromFlowFile("exec1");
+ final FlowRunner runner2 = this.testUtil.createFromFlowFile("exec1-mod", watcher(runner1), 2);
+ FlowWatcherTestUtil.assertPipelineLevel2(runner1, runner2, false);
}
- private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
- for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
- // check it's start time is after the first's children.
- final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
- if (watchedNode == null) {
- continue;
- }
- Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
-
- System.out.println("Node " + node.getId() + " start: "
- + node.getStartTime() + " dependent on " + watchedNode.getId() + " "
- + watchedNode.getEndTime() + " diff: "
- + (node.getStartTime() - watchedNode.getEndTime()));
-
- Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
-
- long minParentDiff = 0;
- if (node.getInNodes().size() > 0) {
- minParentDiff = Long.MAX_VALUE;
- for (final String dependency : node.getInNodes()) {
- final ExecutableNode parent = second.getExecutableNode(dependency);
- final long diff = node.getStartTime() - parent.getEndTime();
- minParentDiff = Math.min(minParentDiff, diff);
- }
- }
- final long diff = node.getStartTime() - watchedNode.getEndTime();
- Assert.assertTrue(minParentDiff < 500 || diff < 500);
- }
+ private RemoteFlowWatcher watcher(final FlowRunner previousRunner) {
+ return new RemoteFlowWatcher(previousRunner.getExecutionId(),
+ this.testUtil.getExecutorLoader(), 10);
}
-
- private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
- final boolean job4Skipped) {
- for (final ExecutableNode node : second.getExecutableNodes()) {
- Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
-
- // check it's start time is after the first's children.
- final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
- if (watchedNode == null) {
- continue;
- }
- Assert.assertEquals(watchedNode.getStatus(),
- job4Skipped && watchedNode.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
-
- long minDiff = Long.MAX_VALUE;
- for (final String watchedChild : watchedNode.getOutNodes()) {
- final ExecutableNode child = first.getExecutableNode(watchedChild);
- if (child == null) {
- continue;
- }
- Assert.assertEquals(child.getStatus(),
- job4Skipped && child.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
- final long diff = node.getStartTime() - child.getEndTime();
- minDiff = Math.min(minDiff, diff);
- Assert.assertTrue(
- "Node " + node.getId() + " start: " + node.getStartTime() + " dependent on "
- + watchedChild + " " + child.getEndTime() + " diff: " + diff,
- node.getStartTime() >= child.getEndTime());
- }
-
- long minParentDiff = Long.MAX_VALUE;
- for (final String dependency : node.getInNodes()) {
- final ExecutableNode parent = second.getExecutableNode(dependency);
- final long diff = node.getStartTime() - parent.getEndTime();
- minParentDiff = Math.min(minParentDiff, diff);
- }
- Assert.assertTrue("minPipelineTimeDiff:" + minDiff
- + " minDependencyTimeDiff:" + minParentDiff,
- minParentDiff < 5000 || minDiff < 5000);
- }
- }
-
- private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
- final EventCollectorListener eventCollector, final String flowName, final int execId,
- final FlowWatcher watcher, final Integer pipeline) throws Exception {
- return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline,
- new Props());
- }
-
- private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
- final EventCollectorListener eventCollector, final String flowName, final int execId,
- final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
- throws Exception {
- final File testDir = ExecutionsTestUtil.getFlowDir("exectest1");
- final ExecutableFlow exFlow =
- FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
- final ExecutionOptions options = exFlow.getExecutionOptions();
- if (watcher != null) {
- options.setPipelineLevel(pipeline);
- options.setPipelineExecutionId(watcher.getExecId());
- }
-
- loader.uploadExecutableFlow(exFlow);
- final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
- this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
- runner.setFlowWatcher(watcher);
- runner.addListener(eventCollector);
-
- return runner;
- }
-
- private void printCurrentState(final String prefix, final ExecutableFlowBase flow) {
- for (final ExecutableNode node : flow.getExecutableNodes()) {
-
- System.err.println(prefix + node.getNestedId() + "->"
- + node.getStatus().name());
- if (node instanceof ExecutableFlowBase) {
- printCurrentState(prefix, (ExecutableFlowBase) node);
- }
- }
- }
-
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 2de4f0e..7a36289 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -46,17 +46,14 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
@Test
public void testBasicPipelineLevel1RunDisabledJobs() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner previousRunner =
- this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
+ final FlowRunner previousRunner = this.testUtil.createFromFlowMap("jobf", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
.getExecutionId());
options.setPipelineLevel(1);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
- final FlowRunner pipelineRunner =
- this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
+ final FlowRunner pipelineRunner = this.testUtil.createFromFlowMap("jobf", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
@@ -165,17 +162,14 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
@Test
public void testBasicPipelineLevel1Run() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner previousRunner =
- this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
+ final FlowRunner previousRunner = this.testUtil.createFromFlowMap("jobf", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
.getExecutionId());
options.setPipelineLevel(1);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
- final FlowRunner pipelineRunner =
- this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
+ final FlowRunner pipelineRunner = this.testUtil.createFromFlowMap("jobf", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
@@ -286,17 +280,15 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
@Test
public void testBasicPipelineLevel2Run() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner previousRunner =
- this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "prev");
+ final FlowRunner previousRunner = this.testUtil.createFromFlowMap("pipelineFlow", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
.getExecutionId());
options.setPipelineLevel(2);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
- final FlowRunner pipelineRunner =
- this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "pipe", options);
+ final FlowRunner pipelineRunner = this.testUtil
+ .createFromFlowMap("pipelineFlow", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
@@ -418,17 +410,15 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
@Test
public void testBasicPipelineLevel2Run2() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- final FlowRunner previousRunner =
- this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "prev");
+ final FlowRunner previousRunner = this.testUtil.createFromFlowMap("pipeline1_2", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
.getExecutionId());
options.setPipelineLevel(2);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
- final FlowRunner pipelineRunner =
- this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "pipe", options);
+ final FlowRunner pipelineRunner = this.testUtil
+ .createFromFlowMap("pipeline1_2", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 54db1e6..b3da436 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -91,8 +91,6 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testBasicRun() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
-
final Map<String, String> flowParams = new HashMap<>();
flowParams.put("param4", "override.4");
flowParams.put("param10", "override.10");
@@ -101,8 +99,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
final ExecutionOptions options = new ExecutionOptions();
options.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", options, flowParams, new Props());
+ this.runner = this.testUtil.createFromFlowMap("jobf", options, flowParams, new Props());
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -243,9 +240,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testDisabledNormal() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
@@ -302,9 +297,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testNormalFailure1() throws Exception {
// Test propagation of KILLED status to embedded flows.
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -341,9 +334,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testNormalFailure2() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -390,9 +381,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testNormalFailure3() throws Exception {
// Test propagation of CANCELLED status to embedded flows different branch
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -448,9 +437,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testFailedFinishingFailure3() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
- FailureAction.FINISH_ALL_POSSIBLE);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_ALL_POSSIBLE);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -510,9 +497,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
- FailureAction.CANCEL_ALL);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -559,9 +544,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testRetryOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("joba").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
@@ -648,9 +631,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testCancel() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
- FailureAction.CANCEL_ALL);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -698,9 +679,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testManualCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -751,9 +730,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testPause() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -836,9 +813,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testPauseKill() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil
- .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -886,9 +861,9 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testPauseFail() throws Exception {
- this.eventCollector = new EventCollectorListener();
- this.runner = this.testUtil.createFromFlowMap(this.eventCollector, "jobf",
- FailureAction.FINISH_CURRENTLY_RUNNING);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+ final EventCollectorListener eventCollector = new EventCollectorListener();
+ this.runner.addListener(eventCollector);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -918,7 +893,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
// Now, ensure that jobd:innerJobA has completely finished as failed before resuming.
// If we would resume before the job failure has been completely processed, FlowRunner would be
// able to start some new jobs instead of cancelling everything.
- waitEventFired("jobd:innerJobA", Status.FAILED);
+ FlowRunnerTestUtil.waitEventFired(eventCollector, "jobd:innerJobA", Status.FAILED);
waitForAndAssertFlowStatus(Status.PAUSED);
this.runner.resume("me");
@@ -946,9 +921,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testPauseFailFinishAll() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
- FailureAction.FINISH_ALL_POSSIBLE);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.FINISH_ALL_POSSIBLE);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
@@ -1005,9 +978,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testFlowKilledByJobLevelSLA() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
- FailureAction.CANCEL_ALL);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
FlowRunnerTestUtil.startThread(this.runner);
assertStatus("joba", Status.RUNNING);
@@ -1030,8 +1001,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
*/
@Test
public void testPauseFailKill() throws Exception {
- final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+ this.runner = this.testUtil.createFromFlowMap("jobf", FailureAction.CANCEL_ALL);
// 1. START FLOW
FlowRunnerTestUtil.startThread(this.runner);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index c1aa7a2..c15c2ce 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -18,9 +18,7 @@ package azkaban.execapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import azkaban.event.Event;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
@@ -37,7 +35,6 @@ public class FlowRunnerTestBase {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
protected FlowRunner runner;
- protected EventCollectorListener eventCollector;
public static boolean isStarted(final Status status) {
if (status == Status.QUEUED) {
@@ -104,22 +101,6 @@ public class FlowRunnerTestBase {
waitJobStatuses(FlowRunnerTest::isStarted, jobs);
}
- protected void waitEventFired(final String nestedId, final Status status)
- throws InterruptedException {
- for (int i = 0; i < 1000; i++) {
- for (final Event event : this.eventCollector.getEventList()) {
- if (event.getData().getStatus() == status && event.getData().getNestedId()
- .equals(nestedId)) {
- return;
- }
- }
- synchronized (EventCollectorListener.handleEvent) {
- EventCollectorListener.handleEvent.wait(10L);
- }
- }
- fail("Event wasn't fired with [" + nestedId + "], " + status);
- }
-
public boolean checkJobStatuses(final Function<Status, Boolean> statusCheck,
final String[] jobs) {
final ExecutableFlow exFlow = this.runner.getExecutableFlow();
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 27158ab..c170f6e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -16,17 +16,22 @@
package azkaban.execapp;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import azkaban.event.Event;
+import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
+import azkaban.executor.MockExecutorLoader;
+import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
@@ -51,9 +56,9 @@ public class FlowRunnerTestUtil {
private final Map<String, Flow> flowMap;
private final Project project;
private final File workingDir;
- private final ExecutorLoader fakeExecutorLoader;
private final JobTypeManager jobtypeManager;
private final File projectDir;
+ private ExecutorLoader executorLoader;
public FlowRunnerTestUtil(final String flowName, final TemporaryFolder temporaryFolder)
throws Exception {
@@ -65,8 +70,8 @@ public class FlowRunnerTestUtil {
this.flowMap = FlowRunnerTestUtil
.prepareProject(this.project, ExecutionsTestUtil.getFlowDir(flowName), this.workingDir);
- this.fakeExecutorLoader = mock(ExecutorLoader.class);
- when(this.fakeExecutorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
+ this.executorLoader = mock(ExecutorLoader.class);
+ when(this.executorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
Utils.initServiceProvider();
JmxJobMBeanManager.getInstance().initialize(new Props());
@@ -109,6 +114,23 @@ public class FlowRunnerTestUtil {
return flowMap;
}
+ public static void waitEventFired(final EventCollectorListener eventCollector,
+ final String nestedId, final Status status)
+ throws InterruptedException {
+ for (int i = 0; i < 1000; i++) {
+ for (final Event event : eventCollector.getEventList()) {
+ if (event.getData().getStatus() == status && event.getData().getNestedId()
+ .equals(nestedId)) {
+ return;
+ }
+ }
+ synchronized (EventCollectorListener.handleEvent) {
+ EventCollectorListener.handleEvent.wait(10L);
+ }
+ }
+ fail("Event wasn't fired with [" + nestedId + "], " + status);
+ }
+
public static ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
final String flowName, final int execId) throws IOException {
FileUtils.copyDirectory(execDir, workingDir);
@@ -126,30 +148,53 @@ public class FlowRunnerTestUtil {
new Thread(runner).start();
}
+ public FlowRunner createFromFlowFile(final String flowName) throws Exception {
+ return createFromFlowFile(new EventCollectorListener(), flowName);
+ }
+
public FlowRunner createFromFlowFile(final EventCollectorListener eventCollector,
final String flowName) throws Exception {
return createFromFlowFile(flowName, eventCollector, new ExecutionOptions());
}
public FlowRunner createFromFlowFile(final String flowName,
+ final EventCollectorListener eventCollector, final ExecutionOptions options)
+ throws Exception {
+ return createFromFlowFile(flowName, eventCollector, options, null, null);
+ }
+
+ public FlowRunner createFromFlowFile(final String flowName, final FlowWatcher watcher,
+ final Integer pipeline) throws Exception {
+ return createFromFlowFile(flowName, new EventCollectorListener(), new ExecutionOptions(),
+ watcher, pipeline);
+ }
+
+ public FlowRunner createFromFlowFile(final String flowName,
final EventCollectorListener eventCollector,
- final ExecutionOptions options) throws Exception {
+ final ExecutionOptions options, final FlowWatcher watcher, final Integer pipeline)
+ throws Exception {
final ExecutableFlow exFlow = FlowRunnerTestUtil
.prepareExecDir(this.workingDir, this.projectDir, flowName, 1);
- return createFromExecutableFlow(eventCollector, exFlow, options, new HashMap<>(), new Props());
+ if (watcher != null) {
+ options.setPipelineLevel(pipeline);
+ options.setPipelineExecutionId(watcher.getExecId());
+ }
+ final FlowRunner runner = createFromExecutableFlow(eventCollector, exFlow, options,
+ new HashMap<>(),
+ new Props());
+ runner.setFlowWatcher(watcher);
+ return runner;
}
- public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
- final String flowName, final String jobIdPrefix) throws Exception {
- return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
- new ExecutionOptions(), new Props());
+ public FlowRunner createFromFlowMap(final String flowName, final String jobIdPrefix)
+ throws Exception {
+ return createFromFlowMap(flowName, jobIdPrefix, new ExecutionOptions());
}
- public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
- final String flowName, final String jobIdPrefix, final ExecutionOptions options)
+ public FlowRunner createFromFlowMap(final String flowName, final String jobIdPrefix,
+ final ExecutionOptions options)
throws Exception {
- return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
- options, new Props());
+ return createFromFlowMap(flowName, jobIdPrefix, options, new Props());
}
public FlowRunner createFromFlowMap(final String flowName,
@@ -157,30 +202,35 @@ public class FlowRunnerTestUtil {
return createFromFlowMap(null, flowName, null, flowParams, new Props());
}
+ public FlowRunner createFromFlowMap(final String flowName, final ExecutionOptions options,
+ final Map<String, String> flowParams, final Props azkabanProps) throws Exception {
+ return createFromFlowMap(null, flowName, options, flowParams,
+ azkabanProps);
+ }
+
public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
final String flowName, final ExecutionOptions options,
final Map<String, String> flowParams, final Props azkabanProps)
throws Exception {
final Flow flow = this.flowMap.get(flowName);
final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
- return createFromExecutableFlow(eventCollector, exFlow, options, flowParams, azkabanProps);
+ return createFromExecutableFlow(eventCollector, exFlow, options, flowParams,
+ azkabanProps);
}
- public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
- final String flowName, final FailureAction action)
+ public FlowRunner createFromFlowMap(final String flowName, final FailureAction action)
throws Exception {
final ExecutionOptions options = new ExecutionOptions();
options.setFailureAction(action);
- return createFromFlowMap(eventCollector, flowName, options, new HashMap<>(), new Props());
+ return createFromFlowMap(flowName, options, new HashMap<>(), new Props());
}
- private FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
- final String flowName, final String jobIdPrefix, final ExecutionOptions options,
- final Props azkabanProps)
- throws Exception {
+ private FlowRunner createFromFlowMap(final String flowName, final String jobIdPrefix,
+ final ExecutionOptions options, final Props azkabanProps) throws Exception {
final Map<String, String> flowParams = new HashMap<>();
flowParams.put(InteractiveTestJob.JOB_ID_PREFIX, jobIdPrefix);
- return createFromFlowMap(eventCollector, flowName, options, flowParams, azkabanProps);
+ return createFromFlowMap(new EventCollectorListener(), flowName, options, flowParams,
+ azkabanProps);
}
private FlowRunner createFromExecutableFlow(final EventCollectorListener eventCollector,
@@ -194,12 +244,22 @@ public class FlowRunnerTestUtil {
exFlow.setExecutionOptions(options);
}
exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
+ this.executorLoader.uploadExecutableFlow(exFlow);
final FlowRunner runner =
- new FlowRunner(exFlow, this.fakeExecutorLoader, mock(ProjectLoader.class),
+ new FlowRunner(exFlow, this.executorLoader, mock(ProjectLoader.class),
this.jobtypeManager, azkabanProps, null);
if (eventCollector != null) {
runner.addListener(eventCollector);
}
return runner;
}
+
+ public ExecutorLoader getExecutorLoader() {
+ return this.executorLoader;
+ }
+
+ public void setExecutorLoader(final MockExecutorLoader executorLoader) {
+ this.executorLoader = executorLoader;
+ }
+
}