diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index b0aa40e..39f2dfd 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -115,7 +115,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
if (this.workingDir.exists()) {
FileUtils.deleteDirectory(this.workingDir);
}
- this.workingDir.mkdirs();
+ FileUtils.forceMkdir(this.workingDir);
this.jobtypeManager = new JobTypeManager(null, null,
this.getClass().getClassLoader());
final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
@@ -149,7 +149,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testBasicRun() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
// 1. START FLOW
runFlowRunnerInThread(this.runner);
@@ -291,7 +291,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testDisabledNormal() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobd")).getExecutableNode(
@@ -349,7 +349,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testNormalFailure1() throws Exception {
// Test propagation of KILLED status to embedded flows.
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
// 1. START FLOW
runFlowRunnerInThread(this.runner);
@@ -387,7 +387,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testNormalFailure2() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
// 1. START FLOW
runFlowRunnerInThread(this.runner);
@@ -434,7 +434,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testNormalFailure3() throws Exception {
// Test propagation of CANCELLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
// 1. START FLOW
runFlowRunnerInThread(this.runner);
@@ -491,7 +491,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testFailedFinishingFailure3() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector,
FailureAction.FINISH_ALL_POSSIBLE);
// 1. START FLOW
@@ -553,7 +553,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector,
FailureAction.CANCEL_ALL);
// 1. START FLOW
@@ -602,7 +602,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testRetryOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
final ExecutableFlow flow = this.runner.getExecutableFlow();
flow.getExecutableNode("joba").setStatus(Status.DISABLED);
((ExecutableFlowBase) flow.getExecutableNode("jobb")).getExecutableNode(
@@ -688,7 +688,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testCancel() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector,
FailureAction.CANCEL_ALL);
// 1. START FLOW
@@ -738,7 +738,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
public void testManualCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
// 1. START FLOW
runFlowRunnerInThread(this.runner);
@@ -790,7 +790,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPause() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
// 1. START FLOW
runFlowRunnerInThread(this.runner);
@@ -874,7 +874,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf");
+ this.runner = createFlowRunner(eventCollector);
// 1. START FLOW
runFlowRunnerInThread(this.runner);
@@ -923,7 +923,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseFail() throws Exception {
this.eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(this.eventCollector, "jobf",
+ this.runner = createFlowRunner(this.eventCollector,
FailureAction.FINISH_CURRENTLY_RUNNING);
// 1. START FLOW
@@ -983,7 +983,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseFailFinishAll() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector,
FailureAction.FINISH_ALL_POSSIBLE);
// 1. START FLOW
@@ -1042,7 +1042,7 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
@Test
public void testPauseFailKill() throws Exception {
final EventCollectorListener eventCollector = new EventCollectorListener();
- this.runner = createFlowRunner(eventCollector, "jobf",
+ this.runner = createFlowRunner(eventCollector,
FailureAction.CANCEL_ALL);
// 1. START FLOW
@@ -1081,17 +1081,9 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
assertThreadShutDown();
}
- private Thread runFlowRunnerInThread(final FlowRunner runner) {
+ private void runFlowRunnerInThread(final FlowRunner runner) {
final Thread thread = new Thread(runner);
thread.start();
- return thread;
- }
-
- private void sleep(final long millisec) {
- try {
- Thread.sleep(millisec);
- } catch (final InterruptedException e) {
- }
}
private void prepareProject(final Project project, final File directory)
@@ -1110,21 +1102,21 @@ public class FlowRunnerTest2 extends FlowRunnerTestBase {
FileUtils.copyDirectory(directory, this.workingDir);
}
- private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final String flowName) throws Exception {
- return createFlowRunner(eventCollector, flowName,
+ private FlowRunner createFlowRunner(final EventCollectorListener eventCollector)
+ throws Exception {
+ return createFlowRunner(eventCollector,
FailureAction.FINISH_CURRENTLY_RUNNING);
}
private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final String flowName, final FailureAction action) throws Exception {
- return createFlowRunner(eventCollector, flowName, action, new Props());
+ final FailureAction action) throws Exception {
+ return createFlowRunner(eventCollector, action, new Props());
}
private FlowRunner createFlowRunner(final EventCollectorListener eventCollector,
- final String flowName, final FailureAction action, final Props azkabanProps)
+ final FailureAction action, final Props azkabanProps)
throws Exception {
- final Flow flow = this.flowMap.get(flowName);
+ final Flow flow = this.flowMap.get("jobf");
final int exId = id++;
final ExecutableFlow exFlow = new ExecutableFlow(this.project, flow);