azkaban-aplcache

Details

diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
index 7597bd5..681acd5 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/LocalFlowWatcherTest.java
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.EventCollectorListener;
 import azkaban.execapp.FlowRunner;
+import azkaban.execapp.FlowRunnerTestUtil;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
@@ -27,15 +28,11 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.JavaJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
-import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
-import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -247,7 +244,7 @@ public class LocalFlowWatcherTest {
       throws Exception {
     final File testDir = new File("unit/executions/exectest1");
     final ExecutableFlow exFlow =
-        prepareExecDir(workingDir, testDir, flowName, execId);
+        FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
     final ExecutionOptions option = exFlow.getExecutionOptions();
     if (watcher != null) {
       option.setPipelineLevel(pipeline);
@@ -262,19 +259,4 @@ public class LocalFlowWatcherTest {
     return runner;
   }
 
-  private ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
-      final String flowName, final int execId) throws IOException {
-    FileUtils.copyDirectory(execDir, workingDir);
-
-    final File jsonFlowFile = new File(workingDir, flowName + ".flow");
-    final HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    final Project project = new Project(1, "test");
-    final Flow flow = Flow.flowFromObject(flowObj);
-    final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-    execFlow.setExecutionId(execId);
-    execFlow.setExecutionPath(workingDir.getPath());
-    return execFlow;
-  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index 5471c06..f5b195a 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.EventCollectorListener;
 import azkaban.execapp.FlowRunner;
+import azkaban.execapp.FlowRunnerTestUtil;
 import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
@@ -29,18 +30,13 @@ import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
-import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
-import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -233,7 +229,7 @@ public class RemoteFlowWatcherTest {
       throws Exception {
     final File testDir = ExecutionsTestUtil.getFlowDir("exectest1");
     final ExecutableFlow exFlow =
-        prepareExecDir(workingDir, testDir, flowName, execId);
+        FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
     final ExecutionOptions options = exFlow.getExecutionOptions();
     if (watcher != null) {
       options.setPipelineLevel(pipeline);
@@ -260,19 +256,4 @@ public class RemoteFlowWatcherTest {
     }
   }
 
-  private ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
-      final String flowName, final int execId) throws IOException {
-    FileUtils.copyDirectory(execDir, workingDir);
-
-    final File jsonFlowFile = new File(workingDir, flowName + ".flow");
-    final HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    final Project project = new Project(1, "test");
-    final Flow flow = Flow.flowFromObject(flowObj);
-    final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-    execFlow.setExecutionId(execId);
-    execFlow.setExecutionPath(workingDir.getPath());
-    return execFlow;
-  }
 }
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
index 8e15a9c..c26a68e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTest.java
@@ -29,20 +29,14 @@ import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.Status;
-import azkaban.flow.Flow;
 import azkaban.jobExecutor.AllJobExecutorTests;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.test.Utils;
 import azkaban.test.executions.ExecutionsTestUtil;
-import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -67,7 +61,7 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     when(this.loader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
-    this.workingDir = temporaryFolder.newFolder();
+    this.workingDir = this.temporaryFolder.newFolder();
     this.jobtypeManager =
         new JobTypeManager(null, null, this.getClass().getClassLoader());
     final JobTypePluginSet pluginSet = this.jobtypeManager.getJobTypePluginSet();
@@ -111,7 +105,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, "exec1", 1);
+    final ExecutableFlow exFlow = FlowRunnerTestUtil
+        .prepareExecDir(this.workingDir, TEST_DIR, "exec1", 1);
 
     // Disable couple in the middle and at the end.
     exFlow.getExecutableNode("job1").setStatus(Status.DISABLED);
@@ -150,7 +145,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
+    final ExecutableFlow flow = FlowRunnerTestUtil
+        .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
 
     this.runner = createFlowRunner(flow, this.loader, eventCollector);
 
@@ -180,7 +176,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec2", 1);
+    final ExecutableFlow flow = FlowRunnerTestUtil
+        .prepareExecDir(this.workingDir, TEST_DIR, "exec2", 1);
     flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
 
     this.runner = createFlowRunner(flow, this.loader, eventCollector);
@@ -211,7 +208,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     final EventCollectorListener eventCollector = new EventCollectorListener();
     eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED,
         Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
-    final ExecutableFlow flow = prepareExecDir(TEST_DIR, "exec3", 1);
+    final ExecutableFlow flow = FlowRunnerTestUtil
+        .prepareExecDir(this.workingDir, TEST_DIR, "exec3", 1);
     flow.getExecutionOptions().setFailureAction(
         FailureAction.FINISH_ALL_POSSIBLE);
     this.runner = createFlowRunner(flow, this.loader, eventCollector);
@@ -300,24 +298,6 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
     }
   }
 
-  private ExecutableFlow prepareExecDir(final File execDir, final String flowName,
-      final int execId) throws IOException {
-    FileUtils.copyDirectory(execDir, this.workingDir);
-
-    final File jsonFlowFile = new File(this.workingDir, flowName + ".flow");
-    final HashMap<String, Object> flowObj =
-        (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
-
-    final Project project = new Project(1, "myproject");
-    project.setVersion(2);
-
-    final Flow flow = Flow.flowFromObject(flowObj);
-    final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
-    execFlow.setExecutionId(execId);
-    execFlow.setExecutionPath(this.workingDir.getPath());
-    return execFlow;
-  }
-
   private void compareFinishedRuntime(final FlowRunner runner) throws Exception {
     final ExecutableFlow flow = runner.getExecutableFlow();
     for (final String flowName : flow.getStartNodes()) {
@@ -375,7 +355,8 @@ public class FlowRunnerTest extends FlowRunnerTestBase {
   private FlowRunner createFlowRunner(final ExecutorLoader loader,
       final EventCollectorListener eventCollector, final String flowName, final Props azkabanProps)
       throws Exception {
-    final ExecutableFlow exFlow = prepareExecDir(TEST_DIR, flowName, 1);
+    final ExecutableFlow exFlow = FlowRunnerTestUtil
+        .prepareExecDir(this.workingDir, TEST_DIR, flowName, 1);
 
     loader.uploadExecutableFlow(exFlow);
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 512064b..ef0413d 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -16,10 +16,12 @@
 
 package azkaban.execapp;
 
+import azkaban.executor.ExecutableFlow;
 import azkaban.flow.Flow;
 import azkaban.project.DirectoryFlowLoader;
 import azkaban.project.Project;
 import azkaban.project.ProjectManagerException;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.IOException;
@@ -27,7 +29,7 @@ import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
-class FlowRunnerTestUtil {
+public class FlowRunnerTestUtil {
 
   /**
    * Initialize the project with the flow definitions stored in the given source directory. Also
@@ -41,7 +43,7 @@ class FlowRunnerTestUtil {
    * @throws ProjectManagerException the project manager exception
    * @throws IOException the io exception
    */
-  static Map<String, Flow> prepareProject(final Project project, final File sourceDir,
+  public static Map<String, Flow> prepareProject(final Project project, final File sourceDir,
       final Logger logger, final File workingDir)
       throws ProjectManagerException, IOException {
     final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
@@ -62,4 +64,17 @@ class FlowRunnerTestUtil {
     return flowMap;
   }
 
+  public static ExecutableFlow prepareExecDir(final File workingDir, final File execDir,
+      final String flowName, final int execId) throws IOException {
+    FileUtils.copyDirectory(execDir, workingDir);
+    final File jsonFlowFile = new File(workingDir, flowName + ".flow");
+    final Object flowObj = JSONUtils.parseJSONFromFile(jsonFlowFile);
+    final Project project = new Project(1, "test");
+    final Flow flow = Flow.flowFromObject(flowObj);
+    final ExecutableFlow execFlow = new ExecutableFlow(project, flow);
+    execFlow.setExecutionId(execId);
+    execFlow.setExecutionPath(workingDir.getPath());
+    return execFlow;
+  }
+
 }