Details
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index 7597bd5..681acd5 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
import azkaban.execapp.FlowRunner;
+import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
@@ -27,15 +28,11 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.JavaJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
-import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
-import azkaban.project.Project;
import azkaban.project.ProjectLoader;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -247,7 +244,7 @@ public class LocalFlowWatcherTest {
throws Exception {
final File testDir = new File("unit/executions/exectest1");
final ExecutableFlow exFlow =
- prepareExecDir(workingDir, testDir, flowName, execId);
+ FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
final ExecutionOptions option = exFlow.getExecutionOptions();
if (watcher != null) {
option.setPipelineLevel(pipeline);
@@ -262,19 +259,4 @@ public class LocalFlowWatcherTest {
return runner;
}
- private ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
- final String flowName, final int execId) throws IOException {
- FileUtils.copyDirectory(execDir, workingDir);
-
- final File jsonFlowFile = new File(workingDir, flowName + ".flow");
- final HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- final Project project = new Project(1, "test");
- final Flow flow = Flow.flowFromObject(flowObj);
- final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
- execFlow.setExecutionId(execId);
- execFlow.setExecutionPath(workingDir.getPath());
- return execFlow;
- }
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 5471c06..f5b195a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
import azkaban.execapp.FlowRunner;
+import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
@@ -29,18 +30,13 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
-import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
-import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
-import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -233,7 +229,7 @@ public class RemoteFlowWatcherTest {
throws Exception {
final File testDir = ExecutionsTestUtil.getFlowDir("exectest1");
final ExecutableFlow exFlow =
- prepareExecDir(workingDir, testDir, flowName, execId);
+ FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
final ExecutionOptions options = exFlow.getExecutionOptions();
if (watcher != null) {
options.setPipelineLevel(pipeline);
@@ -260,19 +256,4 @@ public class RemoteFlowWatcherTest {
}
}
- private ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
- final String flowName, final int execId) throws IOException {
- FileUtils.copyDirectory(execDir, workingDir);
-
- final File jsonFlowFile = new File(workingDir, flowName + ".flow");
- final HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- final Project project = new Project(1, "test");
- final Flow flow = Flow.flowFromObject(flowObj);
- final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
- execFlow.setExecutionId(execId);
- execFlow.setExecutionPath(workingDir.getPath());
- return execFlow;
- }
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 8e15a9c..c26a68e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -29,20 +29,14 @@ import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
-import azkaban.flow.Flow;
import azkaban.jobExecutor.AllJobExecutorTests;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -67,7 +61,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
- this.workingDir = temporaryFolder.newFolder();
+ this.workingDir = this.temporaryFolder.newFolder();
this.jobtypeManager =
new JobTypeManager(null, null, this.getClass().getClassLoader());
final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
@@ -111,7 +105,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, "exec1", 1);
+ final ExecutableFlow exFlow = FlowRunnerTestUtil
+ .prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
// Disable couple in the middle and at the end.
exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
@@ -150,7 +145,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
+ final ExecutableFlow flow = FlowRunnerTestUtil
+ .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
this.runner = createFlowRunner(flow, this.loader, eventCollector);
@@ -180,7 +176,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
+ final ExecutableFlow flow = FlowRunnerTestUtil
+ .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
this.runner = createFlowRunner(flow, this.loader, eventCollector);
@@ -211,7 +208,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
final EventCollectorListener eventCollector = new EventCollectorListener();
eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
- final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec3", 1);
+ final ExecutableFlow flow = FlowRunnerTestUtil
+ .prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
flow.getExecutionOptions().setFailureAction(
FailureAction.FINISH_ALL_POSSIBLE);
this.runner = createFlowRunner(flow, this.loader, eventCollector);
@@ -300,24 +298,6 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
}
}
- private ExecutableFlow prepareExecDir(final File execDir, final String flowName,
- final int execId) throws IOException {
- FileUtils.copyDirectory(execDir, this.workingDir);
-
- final File jsonFlowFile = new File(this.workingDir, flowName + ".flow");
- final HashMap<String, Object> flowObj =
- (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
- final Project project = new Project(1, "myproject");
- project.setVersion(2);
-
- final Flow flow = Flow.flowFromObject(flowObj);
- final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
- execFlow.setExecutionId(execId);
- execFlow.setExecutionPath(this.workingDir.getPath());
- return execFlow;
- }
-
private void compareFinishedRuntime(final FlowRunner runner) throws Exception {
final ExecutableFlow flow = runner.getExecutableFlow();
for (final String flowName : flow.getStartNodes()) {
@@ -375,7 +355,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
private FlowRunner createFlowRunner(final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
throws Exception {
- final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, flowName, 1);
+ final ExecutableFlow exFlow = FlowRunnerTestUtil
+ .prepareExecDir(this.workingDir, TEST_DIR, flowName, 1);
loader.uploadExecutableFlow(exFlow);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 512064b..ef0413d 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -16,10 +16,12 @@
package azkaban.execapp;
+import azkaban.executor.ExecutableFlow;
import azkaban.flow.Flow;
import azkaban.project.DirectoryFlowLoader;
import azkaban.project.Project;
import azkaban.project.ProjectManagerException;
+import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
@@ -27,7 +29,7 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
-class FlowRunnerTestUtil {
+public class FlowRunnerTestUtil {
/**
* Initialize the project with the flow definitions stored in the given source directory. Also
@@ -41,7 +43,7 @@ class FlowRunnerTestUtil {
* @throws ProjectManagerException the project manager exception
* @throws IOException the io exception
*/
- static Map<String, Flow> prepareProject(final Project project, final File sourceDir,
+ public static Map<String, Flow> prepareProject(final Project project, final File sourceDir,
final Logger logger, final File workingDir)
throws ProjectManagerException, IOException {
final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
@@ -62,4 +64,17 @@ class FlowRunnerTestUtil {
return flowMap;
}
+ public static ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
+ final String flowName, final int execId) throws IOException {
+ FileUtils.copyDirectory(execDir, workingDir);
+ final File jsonFlowFile = new File(workingDir, flowName + ".flow");
+ final Object flowObj = JSONUtils.parseJSONFromFile(jsonFlowFile);
+ final Project project = new Project(1, "test");
+ final Flow flow = Flow.flowFromObject(flowObj);
+ final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+ execFlow.setExecutionId(execId);
+ execFlow.setExecutionPath(workingDir.getPath());
+ return execFlow;
+ }
+
}