Details
diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 506cffd..d886ffd 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -28,6 +28,7 @@ import org.apache.log4j.Logger;
public class InteractiveTestJob extends AbstractProcessJob {
+ public static final String JOB_ID_PREFIX = "InteractiveTestJob.jobIdPrefix";
private static final ConcurrentHashMap<String, InteractiveTestJob> testJobs =
new ConcurrentHashMap<>();
private static volatile boolean quickSuccess = false;
@@ -83,10 +84,10 @@ public class InteractiveTestJob extends AbstractProcessJob {
public void run() throws Exception {
final String nestedFlowPath =
this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
- final String groupName = this.getJobProps().getString("group", null);
+ final String jobIdPrefix = this.getJobProps().getString(JOB_ID_PREFIX, null);
String id = nestedFlowPath == null ? this.getId() : nestedFlowPath;
- if (groupName != null) {
- id = groupName + ":" + id;
+ if (jobIdPrefix != null) {
+ id = jobIdPrefix + ":" + id;
}
testJobs.put(id, this);
synchronized (testJobs) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 8b37f50..2de4f0e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -16,34 +16,14 @@
package azkaban.execapp;
-import static org.mockito.Mockito.mock;
-
import azkaban.execapp.event.FlowWatcher;
import azkaban.execapp.event.LocalFlowWatcher;
-import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
-import azkaban.flow.Flow;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.utils.Props;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
/**
* Flows in this test: joba jobb joba1 jobc->joba jobd->joba jobe->jobb,jobc,jobd jobf->jobe,joba1
@@ -57,45 +37,18 @@ import org.junit.rules.TemporaryFolder;
*/
public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
- private static int id = 101;
- private final AzkabanEventReporter azkabanEventReporter = null;
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
- private File workingDir;
- private JobTypeManager jobtypeManager;
- private ExecutorLoader fakeExecutorLoader;
- private Project project;
- private Map<String, Flow> flowMap;
-
- public FlowRunnerPipelineTest() {
- }
+ private FlowRunnerTestUtil testUtil;
@Before
public void setUp() throws Exception {
- this.workingDir = this.temporaryFolder.newFolder();
- this.jobtypeManager =
- new JobTypeManager(null, null, this.getClass().getClassLoader());
- final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
-
- pluginSet.addPluginClass("java", JavaJob.class);
- pluginSet.addPluginClass("test", InteractiveTestJob.class);
- this.fakeExecutorLoader = new MockExecutorLoader();
- this.project = new Project(1, "testProject");
- Utils.initServiceProvider();
- JmxJobMBeanManager.getInstance().initialize(new Props());
-
- final File dir = ExecutionsTestUtil.getFlowDir("embedded2");
- this.flowMap = FlowRunnerTestUtil
- .prepareProject(this.project, dir, this.workingDir);
-
- InteractiveTestJob.clearTestJobs();
+ this.testUtil = new FlowRunnerTestUtil("embedded2", this.temporaryFolder);
}
@Test
public void testBasicPipelineLevel1RunDisabledJobs() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
final FlowRunner previousRunner =
- createFlowRunner(eventCollector, "jobf", "prev");
+ this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -103,7 +56,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
options.setPipelineLevel(1);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
final FlowRunner pipelineRunner =
- createFlowRunner(eventCollector, "jobf", "pipe", options);
+ this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
@@ -112,12 +65,12 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
// disable the innerFlow (entire sub-flow)
previousFlow.getExecutableNodePath("jobb").setStatus(Status.DISABLED);
- runFlowRunnerInThread(previousRunner);
+ FlowRunnerTestUtil.startThread(previousRunner);
assertStatus(previousFlow, "joba", Status.RUNNING);
assertStatus(previousFlow, "joba", Status.RUNNING);
assertStatus(previousFlow, "joba1", Status.RUNNING);
- runFlowRunnerInThread(pipelineRunner);
+ FlowRunnerTestUtil.startThread(pipelineRunner);
assertStatus(pipelineFlow, "joba", Status.QUEUED);
assertStatus(pipelineFlow, "joba1", Status.QUEUED);
@@ -214,7 +167,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
public void testBasicPipelineLevel1Run() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
final FlowRunner previousRunner =
- createFlowRunner(eventCollector, "jobf", "prev");
+ this.testUtil.createFromFlowMap(eventCollector, "jobf", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -222,19 +175,19 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
options.setPipelineLevel(1);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
final FlowRunner pipelineRunner =
- createFlowRunner(eventCollector, "jobf", "pipe", options);
+ this.testUtil.createFromFlowMap(eventCollector, "jobf", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
- runFlowRunnerInThread(previousRunner);
+ FlowRunnerTestUtil.startThread(previousRunner);
assertStatus(previousFlow, "joba", Status.RUNNING);
assertStatus(previousFlow, "joba", Status.RUNNING);
assertStatus(previousFlow, "joba1", Status.RUNNING);
- runFlowRunnerInThread(pipelineRunner);
+ FlowRunnerTestUtil.startThread(pipelineRunner);
assertStatus(pipelineFlow, "joba", Status.QUEUED);
assertStatus(pipelineFlow, "joba1", Status.QUEUED);
@@ -335,7 +288,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
public void testBasicPipelineLevel2Run() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
final FlowRunner previousRunner =
- createFlowRunner(eventCollector, "pipelineFlow", "prev");
+ this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -343,17 +296,17 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
options.setPipelineLevel(2);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
final FlowRunner pipelineRunner =
- createFlowRunner(eventCollector, "pipelineFlow", "pipe", options);
+ this.testUtil.createFromFlowMap(eventCollector, "pipelineFlow", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
- runFlowRunnerInThread(previousRunner);
+ FlowRunnerTestUtil.startThread(previousRunner);
assertStatus(previousFlow, "pipeline1", Status.RUNNING);
- runFlowRunnerInThread(pipelineRunner);
+ FlowRunnerTestUtil.startThread(pipelineRunner);
assertStatus(pipelineFlow, "pipeline1", Status.QUEUED);
InteractiveTestJob.getTestJob("prev:pipeline1").succeedJob();
@@ -467,7 +420,7 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
public void testBasicPipelineLevel2Run2() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
final FlowRunner previousRunner =
- createFlowRunner(eventCollector, "pipeline1_2", "prev");
+ this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "prev");
final ExecutionOptions options = new ExecutionOptions();
options.setPipelineExecutionId(previousRunner.getExecutableFlow()
@@ -475,18 +428,18 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
options.setPipelineLevel(2);
final FlowWatcher watcher = new LocalFlowWatcher(previousRunner);
final FlowRunner pipelineRunner =
- createFlowRunner(eventCollector, "pipeline1_2", "pipe", options);
+ this.testUtil.createFromFlowMap(eventCollector, "pipeline1_2", "pipe", options);
pipelineRunner.setFlowWatcher(watcher);
// 1. START FLOW
final ExecutableFlow pipelineFlow = pipelineRunner.getExecutableFlow();
final ExecutableFlow previousFlow = previousRunner.getExecutableFlow();
- runFlowRunnerInThread(previousRunner);
+ FlowRunnerTestUtil.startThread(previousRunner);
assertStatus(previousFlow, "pipeline1_1", Status.RUNNING);
assertStatus(previousFlow, "pipeline1_1:innerJobA", Status.RUNNING);
- runFlowRunnerInThread(pipelineRunner);
+ FlowRunnerTestUtil.startThread(pipelineRunner);
assertStatus(pipelineFlow, "pipeline1_1", Status.RUNNING);
assertStatus(pipelineFlow, "pipeline1_1:innerJobA", Status.QUEUED);
@@ -540,48 +493,4 @@ public class FlowRunnerPipelineTest extends FlowRunnerTestBase {
assertThreadShutDown(pipelineRunner);
}
- private void runFlowRunnerInThread(final FlowRunner runner) {
- final Thread thread = new Thread(runner);
- thread.start();
- }
-
- private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final String flowName, final String groupName) throws Exception {
- return createFlowRunner(eventCollector, flowName, groupName,
- new ExecutionOptions(), new Props());
- }
-
- private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final String flowName, final String groupName, final ExecutionOptions options)
- throws Exception {
- return createFlowRunner(eventCollector, flowName, groupName,
- options, new Props());
- }
-
- private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final String flowName, final String groupName, final ExecutionOptions options,
- final Props azkabanProps)
- throws Exception {
- final Flow flow = this.flowMap.get(flowName);
-
- final int exId = id++;
- final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
- exFlow.setExecutionPath(this.workingDir.getPath());
- exFlow.setExecutionId(exId);
-
- final Map<String, String> flowParam = new HashMap<>();
- flowParam.put("group", groupName);
- options.addAllFlowParameters(flowParam);
- exFlow.setExecutionOptions(options);
- this.fakeExecutorLoader.uploadExecutableFlow(exFlow);
-
- final FlowRunner runner =
- new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
- this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
- runner.addListener(eventCollector);
-
- return runner;
- }
-
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index ff82221..8c2feb9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -16,30 +16,16 @@
package azkaban.execapp;
-import static org.mockito.Mockito.mock;
-
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
-import azkaban.flow.Flow;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
+import azkaban.executor.Status;
import azkaban.utils.Props;
-import java.io.File;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
/**
@@ -57,70 +43,36 @@ import org.junit.Test;
*
* job2 and 4 are in nested directories so should have different shared properties than other jobs.
*/
-public class FlowRunnerPropertyResolutionTest {
+public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
- private static int id = 101;
- private final AzkabanEventReporter azkabanEventReporter = null;
- private File workingDir;
- private JobTypeManager jobtypeManager;
- private ExecutorLoader fakeExecutorLoader;
- private Project project;
- private Map<String, Flow> flowMap;
+ private FlowRunnerTestUtil testUtil;
@Before
public void setUp() throws Exception {
- System.out.println("Create temp dir");
- this.workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
- if (this.workingDir.exists()) {
- FileUtils.deleteDirectory(this.workingDir);
- }
- this.workingDir.mkdirs();
- this.jobtypeManager =
- new JobTypeManager(null, null, this.getClass().getClassLoader());
- this.jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
- this.jobtypeManager.getJobTypePluginSet().addPluginClass("test",
- InteractiveTestJob.class);
- this.fakeExecutorLoader = new MockExecutorLoader();
- this.project = new Project(1, "testProject");
-
- final File dir = new File("unit/executions/execpropstest");
- this.flowMap = FlowRunnerTestUtil
- .prepareProject(this.project, dir, this.workingDir);
-
- InteractiveTestJob.clearTestJobs();
- }
-
- @After
- public void tearDown() throws IOException {
- System.out.println("Teardown temp dir");
- if (this.workingDir != null) {
- FileUtils.deleteDirectory(this.workingDir);
- this.workingDir = null;
- }
+ this.testUtil = new FlowRunnerTestUtil("execpropstest", this.temporaryFolder);
}
/**
* Tests the basic flow resolution. Flow is defined in execpropstest
*/
- @Ignore
@Test
public void testPropertyResolution() throws Exception {
final HashMap<String, String> flowProps = new HashMap<>();
flowProps.put("props7", "flow7");
flowProps.put("props6", "flow6");
flowProps.put("props5", "flow5");
- final FlowRunner runner = createFlowRunner("job3", flowProps);
+ final FlowRunner runner = this.testUtil.createFromFlowMap("job3", flowProps);
final Map<String, ExecutableNode> nodeMap = new HashMap<>();
createNodeMap(runner.getExecutableFlow(), nodeMap);
+ final ExecutableFlow flow = runner.getExecutableFlow();
// 1. Start flow. Job 2 should start
- runFlowRunnerInThread(runner);
- pause(250);
+ FlowRunnerTestUtil.startThread(runner);
+ assertStatus(flow, "job2", Status.RUNNING);
// Job 2 is a normal job.
// Only the flow overrides and the shared properties matter
- ExecutableNode node = nodeMap.get("job2");
- final Props job2Props = node.getInputProps();
+ final Props job2Props = nodeMap.get("job2").getInputProps();
Assert.assertEquals("shared1", job2Props.get("props1"));
Assert.assertEquals("job2", job2Props.get("props2"));
Assert.assertEquals("moo3", job2Props.get("props3"));
@@ -138,9 +90,9 @@ public class FlowRunnerPropertyResolutionTest {
job2Generated.put("props9", "gjob9");
job2Generated.put("props10", "gjob10");
InteractiveTestJob.getTestJob("job2").succeedJob(job2Generated);
- pause(250);
- node = nodeMap.get("innerflow:job1");
- final Props job1Props = node.getInputProps();
+ assertStatus(flow, "innerflow:job1", Status.RUNNING);
+
+ final Props job1Props = nodeMap.get("innerflow:job1").getInputProps();
Assert.assertEquals("job1", job1Props.get("props1"));
Assert.assertEquals("job2", job1Props.get("props2"));
Assert.assertEquals("job8", job1Props.get("props8"));
@@ -161,9 +113,8 @@ public class FlowRunnerPropertyResolutionTest {
job1GeneratedProps.put("props7", "g2job7");
InteractiveTestJob.getTestJob("innerflow:job1").succeedJob(
job1GeneratedProps);
- pause(250);
- node = nodeMap.get("innerflow:job4");
- final Props job4Props = node.getInputProps();
+ assertStatus(flow, "innerflow:job4", Status.RUNNING);
+ final Props job4Props = nodeMap.get("innerflow:job4").getInputProps();
Assert.assertEquals("job8", job4Props.get("props8"));
Assert.assertEquals("job9", job4Props.get("props9"));
Assert.assertEquals("g2job7", job4Props.get("props7"));
@@ -183,9 +134,8 @@ public class FlowRunnerPropertyResolutionTest {
job4GeneratedProps.put("props6", "g4job6");
InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(
job4GeneratedProps);
- pause(250);
- node = nodeMap.get("job3");
- final Props job3Props = node.getInputProps();
+ assertStatus(flow, "job3", Status.RUNNING);
+ final Props job3Props = nodeMap.get("job3").getInputProps();
Assert.assertEquals("job3", job3Props.get("props3"));
Assert.assertEquals("g4job6", job3Props.get("props6"));
Assert.assertEquals("g4job9", job3Props.get("props9"));
@@ -196,30 +146,6 @@ public class FlowRunnerPropertyResolutionTest {
Assert.assertEquals("moo4", job3Props.get("props4"));
}
- private FlowRunner createFlowRunner(final String flowName,
- final HashMap<String, String> flowParams) throws Exception {
- return createFlowRunner(flowName, flowParams, new Props());
- }
-
- private FlowRunner createFlowRunner(final String flowName,
- final HashMap<String, String> flowParams, final Props azkabanProps) throws Exception {
- final Flow flow = this.flowMap.get(flowName);
-
- final int exId = id++;
- final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
- exFlow.setExecutionPath(this.workingDir.getPath());
- exFlow.setExecutionId(exId);
-
- exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
- this.fakeExecutorLoader.uploadExecutableFlow(exFlow);
-
- final FlowRunner runner =
- new FlowRunner(this.fakeExecutorLoader.fetchExecutableFlow(exId),
- this.fakeExecutorLoader, mock(ProjectLoader.class), this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
- return runner;
- }
-
private void createNodeMap(final ExecutableFlowBase flow,
final Map<String, ExecutableNode> nodeMap) {
for (final ExecutableNode node : flow.getExecutableNodes()) {
@@ -230,17 +156,4 @@ public class FlowRunnerPropertyResolutionTest {
}
}
}
-
- private Thread runFlowRunnerInThread(final FlowRunner runner) {
- final Thread thread = new Thread(runner);
- thread.start();
- return thread;
- }
-
- private void pause(final long millisec) {
- try {
- Thread.sleep(millisec);
- } catch (final InterruptedException e) {
- }
- }
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 64128df..e534171 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -16,62 +16,24 @@
package azkaban.execapp;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.when;
-
-import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
-import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
-import azkaban.jobExecutor.AllJobExecutorTests;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
import azkaban.spi.EventType;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.Props;
-import java.io.File;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
public class FlowRunnerTest extends FlowRunnerTestBase {
- private static final File TEST_DIR = ExecutionsTestUtil.getFlowDir("exectest1");
- private final AzkabanEventReporter azkabanEventReporter = null;
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
- private File workingDir;
- private JobTypeManager jobtypeManager;
- @Mock
- private ProjectLoader fakeProjectLoader;
- @Mock
- private ExecutorLoader loader;
+ private FlowRunnerTestUtil testUtil;
@Before
public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
- this.workingDir = this.temporaryFolder.newFolder();
- this.jobtypeManager =
- new JobTypeManager(null, null, this.getClass().getClassLoader());
- final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
- pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
- pluginSet.addPluginClass("test", InteractiveTestJob.class);
- Utils.initServiceProvider();
- JmxJobMBeanManager.getInstance().initialize(new Props());
-
- InteractiveTestJob.clearTestJobs();
+ this.testUtil = new FlowRunnerTestUtil("exectest1", this.temporaryFolder);
}
@Test
@@ -79,9 +41,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
- this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
+ this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec1");
- startThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
succeedJobs("job3", "job4", "job6");
waitForAndAssertFlowStatus(Status.SUCCEEDED);
@@ -106,8 +68,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
- final ExecutableFlow exFlow = FlowRunnerTestUtil
- .prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
+
+ this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec1");
+ final ExecutableFlow exFlow = this.runner.getExecutableFlow();
// Disable couple in the middle and at the end.
exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
@@ -115,12 +78,10 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
exFlow.getExecutableNode("job5").setStatus(Status.DISABLED);
exFlow.getExecutableNode("job10").setStatus(Status.DISABLED);
- this.runner = createFlowRunner(exFlow, this.loader, eventCollector);
-
Assert.assertTrue(!this.runner.isKilled());
waitForAndAssertFlowStatus(Status.READY);
- startThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
succeedJobs("job3", "job4");
assertThreadShutDown();
@@ -146,12 +107,10 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
- final ExecutableFlow flow = FlowRunnerTestUtil
- .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
- this.runner = createFlowRunner(flow, this.loader, eventCollector);
+ this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec2");
- startThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
succeedJobs("job6");
Assert.assertTrue(!this.runner.isKilled());
@@ -177,13 +136,12 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
- final ExecutableFlow flow = FlowRunnerTestUtil
- .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
- flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
+ final ExecutionOptions options = new ExecutionOptions();
+ options.setFailureAction(FailureAction.CANCEL_ALL);
- this.runner = createFlowRunner(flow, this.loader, eventCollector);
+ this.runner = this.testUtil.createFromFlowFile("exec2", eventCollector, options);
- startThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
assertThreadShutDown();
Assert.assertTrue(this.runner.isKilled());
@@ -209,13 +167,11 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
- final ExecutableFlow flow = FlowRunnerTestUtil
- .prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
- flow.getExecutionOptions().setFailureAction(
- FailureAction.FINISH_ALL_POSSIBLE);
- this.runner = createFlowRunner(flow, this.loader, eventCollector);
+ final ExecutionOptions options = new ExecutionOptions();
+ options.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+ this.runner = this.testUtil.createFromFlowFile("exec3", eventCollector, options);
- startThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
succeedJobs("job3");
waitForAndAssertFlowStatus(Status.FAILED);
@@ -240,9 +196,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
- this.runner = createFlowRunner(this.loader, eventCollector, "exec1");
+ this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec1");
- startThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
assertStatus("job1", Status.SUCCEEDED);
assertStatus("job2", Status.SUCCEEDED);
@@ -275,9 +231,9 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(EventType.JOB_FINISHED,
EventType.JOB_STARTED, EventType.JOB_STATUS_CHANGED);
- this.runner = createFlowRunner(this.loader, eventCollector, "exec4-retry");
+ this.runner = this.testUtil.createFromFlowFile(eventCollector, "exec4-retry");
- startThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
assertThreadShutDown();
assertStatus("job-retry", Status.SUCCEEDED);
@@ -290,12 +246,6 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
waitForAndAssertFlowStatus(Status.FAILED);
}
- private void startThread(final FlowRunner runner) {
- Assert.assertTrue(!runner.isKilled());
- final Thread thread = new Thread(runner);
- thread.start();
- }
-
private void assertAttempts(final String name, final int attempt) {
final ExecutableNode node = this.runner.getExecutableFlow().getExecutableNode(name);
if (node.getAttempt() != attempt) {
@@ -333,46 +283,4 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
compareStartFinishTimes(flow, childNode, endTime);
}
}
-
- private FlowRunner createFlowRunner(final ExecutableFlow flow,
- final ExecutorLoader loader, final EventCollectorListener eventCollector) throws Exception {
- return createFlowRunner(flow, loader, eventCollector, new Props());
- }
-
- private FlowRunner createFlowRunner(final ExecutableFlow flow,
- final ExecutorLoader loader, final EventCollectorListener eventCollector,
- final Props azkabanProps)
- throws Exception {
-
- loader.uploadExecutableFlow(flow);
- final FlowRunner runner =
- new FlowRunner(flow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
-
- runner.addListener(eventCollector);
-
- return runner;
- }
-
- private FlowRunner createFlowRunner(final ExecutorLoader loader,
- final EventCollectorListener eventCollector, final String flowName) throws Exception {
- return createFlowRunner(loader, eventCollector, flowName, new Props());
- }
-
- private FlowRunner createFlowRunner(final ExecutorLoader loader,
- final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
- throws Exception {
- final ExecutableFlow exFlow = FlowRunnerTestUtil
- .prepareExecDir(this.workingDir, TEST_DIR, flowName, 1);
-
- loader.uploadExecutableFlow(exFlow);
-
- final FlowRunner runner =
- new FlowRunner(exFlow, loader, this.fakeProjectLoader, this.jobtypeManager, azkabanProps,
- this.azkabanEventReporter);
-
- runner.addListener(eventCollector);
-
- return runner;
- }
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 0790935..54db1e6 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -17,35 +17,19 @@
package azkaban.execapp;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutionOptions.FailureAction;
-import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
-import azkaban.executor.JavaJob;
-import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
-import azkaban.flow.Flow;
-import azkaban.jobExecutor.AllJobExecutorTests;
-import azkaban.jobtype.JobTypeManager;
-import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.Project;
-import azkaban.project.ProjectLoader;
-import azkaban.spi.AzkabanEventReporter;
-import azkaban.test.Utils;
-import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
/**
* Test the flow run, especially with embedded flows.
@@ -95,36 +79,11 @@ import org.junit.rules.TemporaryFolder;
*/
public class FlowRunnerTest2 extends FlowRunnerTestBase {
- private static int id = 101;
- private final AzkabanEventReporter azkabanEventReporter = null;
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
- private File workingDir;
- private JobTypeManager jobtypeManager;
- private ExecutorLoader fakeExecutorLoader;
- private Project project;
- private Map<String, Flow> flowMap;
+ private FlowRunnerTestUtil testUtil;
@Before
public void setUp() throws Exception {
- this.workingDir = this.temporaryFolder.newFolder();
- this.jobtypeManager = new JobTypeManager(null, null,
- this.getClass().getClassLoader());
- final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
-
- pluginSet.setCommonPluginLoadProps(AllJobExecutorTests.setUpCommonProps());
- pluginSet.addPluginClass("java", JavaJob.class);
- pluginSet.addPluginClass("test", InteractiveTestJob.class);
- this.fakeExecutorLoader = new MockExecutorLoader();
- this.project = new Project(1, "testProject");
- Utils.initServiceProvider();
- JmxJobMBeanManager.getInstance().initialize(new Props());
-
- this.flowMap = FlowRunnerTestUtil
- .prepareProject(this.project, ExecutionsTestUtil.getFlowDir("embedded2"),
- this.workingDir);
-
- InteractiveTestJob.clearTestJobs();
+ this.testUtil = new FlowRunnerTestUtil("embedded2", this.temporaryFolder);
}
/**
@@ -133,10 +92,20 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testBasicRun() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+
+ final Map<String, String> flowParams = new HashMap<>();
+ flowParams.put("param4", "override.4");
+ flowParams.put("param10", "override.10");
+ flowParams.put("param11", "override.11");
+
+ final ExecutionOptions options = new ExecutionOptions();
+ options.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", options, flowParams, new Props());
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -275,14 +244,15 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testDisabledNormal() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
"innerJobA").setStatus(Status.DISABLED);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -333,10 +303,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testNormalFailure1() throws Exception {
// Test propagation of KILLED status to embedded flows.
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -371,10 +342,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testNormalFailure2() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -419,10 +391,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testNormalFailure3() throws Exception {
// Test propagation of CANCELLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -476,11 +449,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testFailedFinishingFailure3() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector,
+ this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
FailureAction.FINISH_ALL_POSSIBLE);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -538,11 +511,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector,
+ this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
FailureAction.CANCEL_ALL);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -587,14 +560,15 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testRetryOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("joba").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
"innerFlow").setStatus(Status.DISABLED);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
assertStatus("joba", Status.SKIPPED);
assertStatus("joba1", Status.RUNNING);
@@ -675,11 +649,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testCancel() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector,
+ this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
FailureAction.CANCEL_ALL);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -725,10 +699,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testManualCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -777,10 +752,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPause() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -861,10 +837,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector);
+ this.runner = this.testUtil
+ .createFromFlowMap(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -910,11 +887,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseFail() throws Exception {
this.eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(this.eventCollector,
+ this.runner = this.testUtil.createFromFlowMap(this.eventCollector, "jobf",
FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -970,11 +947,11 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseFailFinishAll() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector,
+ this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
FailureAction.FINISH_ALL_POSSIBLE);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
@@ -1029,10 +1006,10 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testFlowKilledByJobLevelSLA() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector,
+ this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf",
FailureAction.CANCEL_ALL);
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
@@ -1054,11 +1031,10 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseFailKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector,
- FailureAction.CANCEL_ALL);
+ this.runner = this.testUtil.createFromFlowMap(eventCollector, "jobf", FailureAction.CANCEL_ALL);
// 1. START FLOW
- runFlowRunnerInThread(this.runner);
+ FlowRunnerTestUtil.startThread(this.runner);
// After it starts up, only joba should be running
assertStatus("joba", Status.RUNNING);
assertStatus("joba1", Status.RUNNING);
@@ -1093,46 +1069,4 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertThreadShutDown();
}
- private void runFlowRunnerInThread(final FlowRunner runner) {
- final Thread thread = new Thread(runner);
- thread.start();
- }
-
- private FlowRunner createFlowRunner(final EventCollectorListener eventCollector)
- throws Exception {
- return createFlowRunner(eventCollector,
- FailureAction.FINISH_CURRENTLY_RUNNING);
- }
-
- private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final FailureAction action) throws Exception {
- return createFlowRunner(eventCollector, action, new Props());
- }
-
- private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final FailureAction action, final Props azkabanProps)
- throws Exception {
- final Flow flow = this.flowMap.get("jobf");
-
- final int exId = id++;
- final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
- exFlow.setExecutionPath(this.workingDir.getPath());
- exFlow.setExecutionId(exId);
-
- final Map<String, String> flowParam = new HashMap<>();
- flowParam.put("param4", "override.4");
- flowParam.put("param10", "override.10");
- flowParam.put("param11", "override.11");
- exFlow.getExecutionOptions().addAllFlowParameters(flowParam);
- exFlow.getExecutionOptions().setFailureAction(action);
- this.fakeExecutorLoader.uploadExecutableFlow(exFlow);
-
- final FlowRunner runner = new FlowRunner(
- this.fakeExecutorLoader.fetchExecutableFlow(exId), this.fakeExecutorLoader,
- mock(ProjectLoader.class), this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
- runner.addListener(eventCollector);
-
- return runner;
- }
-
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
index 029cbf3..c1aa7a2 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestBase.java
@@ -28,9 +28,14 @@ import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
import java.util.function.Function;
import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
public class FlowRunnerTestBase {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
protected FlowRunner runner;
protected EventCollectorListener eventCollector;
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 8e0d9c9..27158ab 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -16,20 +16,68 @@
package azkaban.execapp;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+import azkaban.executor.ExecutorLoader;
+import azkaban.executor.InteractiveTestJob;
import azkaban.flow.Flow;
+import azkaban.jobtype.JobTypeManager;
+import azkaban.jobtype.JobTypePluginSet;
import azkaban.project.DirectoryFlowLoader;
import azkaban.project.Project;
+import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.junit.rules.TemporaryFolder;
public class FlowRunnerTestUtil {
+ private static int id = 101;
+ private final Map<String, Flow> flowMap;
+ private final Project project;
+ private final File workingDir;
+ private final ExecutorLoader fakeExecutorLoader;
+ private final JobTypeManager jobtypeManager;
+ private final File projectDir;
+
+ public FlowRunnerTestUtil(final String flowName, final TemporaryFolder temporaryFolder)
+ throws Exception {
+
+ this.projectDir = ExecutionsTestUtil.getFlowDir(flowName);
+ this.workingDir = temporaryFolder.newFolder();
+ this.project = new Project(1, "testProject");
+
+ this.flowMap = FlowRunnerTestUtil
+ .prepareProject(this.project, ExecutionsTestUtil.getFlowDir(flowName), this.workingDir);
+
+ this.fakeExecutorLoader = mock(ExecutorLoader.class);
+ when(this.fakeExecutorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
+
+ Utils.initServiceProvider();
+ JmxJobMBeanManager.getInstance().initialize(new Props());
+
+ InteractiveTestJob.clearTestJobs();
+
+ this.jobtypeManager = new JobTypeManager(null, null, this.getClass().getClassLoader());
+ final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
+ pluginSet.addPluginClass("test", InteractiveTestJob.class);
+ }
+
/**
* Initialize the project with the flow definitions stored in the given source directory. Also
* copy the source directory to the working directory.
@@ -58,7 +106,6 @@ public class FlowRunnerTestUtil {
final Map<String, Flow> flowMap = loader.getFlowMap();
project.setFlows(flowMap);
FileUtils.copyDirectory(sourceDir, workingDir);
-
return flowMap;
}
@@ -75,4 +122,84 @@ public class FlowRunnerTestUtil {
return execFlow;
}
+ public static void startThread(final FlowRunner runner) {
+ new Thread(runner).start();
+ }
+
+ public FlowRunner createFromFlowFile(final EventCollectorListener eventCollector,
+ final String flowName) throws Exception {
+ return createFromFlowFile(flowName, eventCollector, new ExecutionOptions());
+ }
+
+ public FlowRunner createFromFlowFile(final String flowName,
+ final EventCollectorListener eventCollector,
+ final ExecutionOptions options) throws Exception {
+ final ExecutableFlow exFlow = FlowRunnerTestUtil
+ .prepareExecDir(this.workingDir, this.projectDir, flowName, 1);
+ return createFromExecutableFlow(eventCollector, exFlow, options, new HashMap<>(), new Props());
+ }
+
+ public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+ final String flowName, final String jobIdPrefix) throws Exception {
+ return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
+ new ExecutionOptions(), new Props());
+ }
+
+ public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+ final String flowName, final String jobIdPrefix, final ExecutionOptions options)
+ throws Exception {
+ return createFromFlowMap(eventCollector, flowName, jobIdPrefix,
+ options, new Props());
+ }
+
+ public FlowRunner createFromFlowMap(final String flowName,
+ final HashMap<String, String> flowParams) throws Exception {
+ return createFromFlowMap(null, flowName, null, flowParams, new Props());
+ }
+
+ public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+ final String flowName, final ExecutionOptions options,
+ final Map<String, String> flowParams, final Props azkabanProps)
+ throws Exception {
+ final Flow flow = this.flowMap.get(flowName);
+ final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);
+ return createFromExecutableFlow(eventCollector, exFlow, options, flowParams, azkabanProps);
+ }
+
+ public FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+ final String flowName, final FailureAction action)
+ throws Exception {
+ final ExecutionOptions options = new ExecutionOptions();
+ options.setFailureAction(action);
+ return createFromFlowMap(eventCollector, flowName, options, new HashMap<>(), new Props());
+ }
+
+ private FlowRunner createFromFlowMap(final EventCollectorListener eventCollector,
+ final String flowName, final String jobIdPrefix, final ExecutionOptions options,
+ final Props azkabanProps)
+ throws Exception {
+ final Map<String, String> flowParams = new HashMap<>();
+ flowParams.put(InteractiveTestJob.JOB_ID_PREFIX, jobIdPrefix);
+ return createFromFlowMap(eventCollector, flowName, options, flowParams, azkabanProps);
+ }
+
+ private FlowRunner createFromExecutableFlow(final EventCollectorListener eventCollector,
+ final ExecutableFlow exFlow, final ExecutionOptions options,
+ final Map<String, String> flowParams, final Props azkabanProps)
+ throws Exception {
+ final int exId = id++;
+ exFlow.setExecutionPath(this.workingDir.getPath());
+ exFlow.setExecutionId(exId);
+ if (options != null) {
+ exFlow.setExecutionOptions(options);
+ }
+ exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
+ final FlowRunner runner =
+ new FlowRunner(exFlow, this.fakeExecutorLoader, mock(ProjectLoader.class),
+ this.jobtypeManager, azkabanProps, null);
+ if (eventCollector != null) {
+ runner.addListener(eventCollector);
+ }
+ return runner;
+ }
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
index e7f0bcd..f177c65 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/JobRunnerTest.java
@@ -16,6 +16,10 @@
package azkaban.execapp;
+import static java.lang.Thread.State.TIMED_WAITING;
+import static java.lang.Thread.State.WAITING;
+import static org.assertj.core.api.Assertions.assertThat;
+
import azkaban.event.Event;
import azkaban.event.EventData;
import azkaban.executor.ExecutableFlow;
@@ -29,6 +33,10 @@ import azkaban.jobtype.JobTypePluginSet;
import azkaban.spi.EventType;
import azkaban.test.TestUtils;
import azkaban.utils.Props;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.After;
@@ -36,15 +44,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
-
-import static java.lang.Thread.State.TIMED_WAITING;
-import static java.lang.Thread.State.WAITING;
-import static org.assertj.core.api.Assertions.assertThat;
-
public class JobRunnerTest {
private final Logger logger = Logger.getLogger("JobRunnerTest");
@@ -211,8 +210,7 @@ public class JobRunnerTest {
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED
|| runner.getStatus() != Status.FAILED);
- final Thread thread = new Thread(runner);
- thread.start();
+ final Thread thread = startThread(runner);
StatusTestUtils.waitForStatus(node, Status.RUNNING);
runner.kill();
@@ -250,8 +248,7 @@ public class JobRunnerTest {
eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
- final Thread thread = new Thread(runner);
- thread.start();
+ final Thread thread = startThread(runner);
// wait for job to get into delayExecution() -> wait()
assertThreadIsWaiting(thread);
@@ -293,8 +290,7 @@ public class JobRunnerTest {
eventCollector.handleEvent(Event.create(null, EventType.JOB_STARTED, new EventData(node)));
Assert.assertTrue(runner.getStatus() != Status.SUCCEEDED);
- final Thread thread = new Thread(runner);
- thread.start();
+ final Thread thread = startThread(runner);
StatusTestUtils.waitForStatus(node, Status.READY);
// wait for job to get into delayExecution() -> wait()
@@ -369,4 +365,9 @@ public class JobRunnerTest {
}
}
+ private Thread startThread(final JobRunner runner) {
+ final Thread thread = new Thread(runner);
+ thread.start();
+ return thread;
+ }
}