azkaban-aplcache

Enable RemoteFlowWatcherTest (#1244) * Enable RemoteFlowWatcherTest Fixed

9/6/2017 11:47:50 AM

Details

diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 9fb5da2..6514e97 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -30,6 +30,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
 
   private static final ConcurrentHashMap<String, InteractiveTestJob> testJobs =
       new ConcurrentHashMap<>();
+  private static volatile boolean quickSuccess = false;
   private Props generatedProperties = new Props();
   private boolean isWaiting = true;
   private boolean succeed = true;
@@ -69,6 +70,14 @@ public class InteractiveTestJob extends AbstractProcessJob {
     }
   }
 
+  public static void setQuickSuccess(final boolean quickSuccess) {
+    InteractiveTestJob.quickSuccess = quickSuccess;
+  }
+
+  public static void resetQuickSuccess() {
+    InteractiveTestJob.quickSuccess = false;
+  }
+
   @Override
   public void run() throws Exception {
     final String nestedFlowPath =
@@ -82,6 +91,9 @@ public class InteractiveTestJob extends AbstractProcessJob {
     synchronized (testJobs) {
       testJobs.notifyAll();
     }
+    if (quickSuccess) {
+      return;
+    }
 
     if (this.jobProps.getBoolean("fail", false)) {
       final int passRetry = this.jobProps.getInt("passRetry", -1);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index ee3fe23..5471c06 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -20,18 +20,21 @@ import static org.mockito.Mockito.mock;
 
 import azkaban.execapp.EventCollectorListener;
 import azkaban.execapp.FlowRunner;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
-import azkaban.executor.JavaJob;
+import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.MockExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 import java.io.File;
@@ -41,51 +44,44 @@ import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class RemoteFlowWatcherTest {
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private JobTypeManager jobtypeManager;
-  private int dirVal = 0;
 
   @Before
   public void setUp() throws Exception {
     this.jobtypeManager =
         new JobTypeManager(null, null, this.getClass().getClassLoader());
-    this.jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
+    this.jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
+    Utils.initServiceProvider();
+    JmxJobMBeanManager.getInstance().initialize(new Props());
+    InteractiveTestJob.setQuickSuccess(true);
   }
 
   @After
   public void tearDown() throws IOException {
+    InteractiveTestJob.resetQuickSuccess();
   }
 
-  public File setupDirectory() throws IOException {
-    System.out.println("Create temp dir");
-    final File workingDir = new File("_AzkabanTestDir_" + this.dirVal);
-    if (workingDir.exists()) {
-      FileUtils.deleteDirectory(workingDir);
-    }
-    workingDir.mkdirs();
-    this.dirVal++;
-
-    return workingDir;
-  }
-
-  @Ignore
   @Test
   public void testBasicRemoteFlowWatcher() throws Exception {
     final MockExecutorLoader loader = new MockExecutorLoader();
 
     final EventCollectorListener eventCollector = new EventCollectorListener();
 
-    final File workingDir1 = setupDirectory();
+    final File workingDir1 = this.temporaryFolder.newFolder();
     final FlowRunner runner1 =
         createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
             null);
     final Thread runner1Thread = new Thread(runner1);
 
-    final File workingDir2 = setupDirectory();
+    final File workingDir2 = this.temporaryFolder.newFolder();
     final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
     final FlowRunner runner2 =
         createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
@@ -98,26 +94,22 @@ public class RemoteFlowWatcherTest {
 
     runner2Thread.join();
 
-    FileUtils.deleteDirectory(workingDir1);
-    FileUtils.deleteDirectory(workingDir2);
-
-    testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+    assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
   }
 
-  @Ignore
   @Test
   public void testLevel1RemoteFlowWatcher() throws Exception {
     final MockExecutorLoader loader = new MockExecutorLoader();
 
     final EventCollectorListener eventCollector = new EventCollectorListener();
 
-    final File workingDir1 = setupDirectory();
+    final File workingDir1 = this.temporaryFolder.newFolder();
     final FlowRunner runner1 =
         createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
             null);
     final Thread runner1Thread = new Thread(runner1);
 
-    final File workingDir2 = setupDirectory();
+    final File workingDir2 = this.temporaryFolder.newFolder();
     final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
     final FlowRunner runner2 =
         createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
@@ -128,26 +120,22 @@ public class RemoteFlowWatcherTest {
     runner2Thread.start();
     runner2Thread.join();
 
-    FileUtils.deleteDirectory(workingDir1);
-    FileUtils.deleteDirectory(workingDir2);
-
     testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
   }
 
-  @Ignore
   @Test
   public void testLevel2DiffRemoteFlowWatcher() throws Exception {
     final MockExecutorLoader loader = new MockExecutorLoader();
 
     final EventCollectorListener eventCollector = new EventCollectorListener();
 
-    final File workingDir1 = setupDirectory();
+    final File workingDir1 = this.temporaryFolder.newFolder();
     final FlowRunner runner1 =
         createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
             null);
     final Thread runner1Thread = new Thread(runner1);
 
-    final File workingDir2 = setupDirectory();
+    final File workingDir2 = this.temporaryFolder.newFolder();
 
     final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
     final FlowRunner runner2 =
@@ -159,9 +147,6 @@ public class RemoteFlowWatcherTest {
     runner2Thread.start();
     runner2Thread.join();
 
-    FileUtils.deleteDirectory(workingDir1);
-    FileUtils.deleteDirectory(workingDir2);
-
     testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
   }
 
@@ -197,7 +182,7 @@ public class RemoteFlowWatcherTest {
     }
   }
 
-  private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
+  private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
     for (final ExecutableNode node : second.getExecutableNodes()) {
       Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
 
@@ -217,10 +202,10 @@ public class RemoteFlowWatcherTest {
         Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
         final long diff = node.getStartTime() - child.getEndTime();
         minDiff = Math.min(minDiff, diff);
-        System.out.println("Node " + node.getId() + " start: "
-            + node.getStartTime() + " dependent on " + watchedChild + " "
-            + child.getEndTime() + " diff: " + diff);
-        Assert.assertTrue(node.getStartTime() >= child.getEndTime());
+        Assert.assertTrue(
+            "Node " + node.getId() + " start: " + node.getStartTime() + " dependent on "
+                + watchedChild + " " + child.getEndTime() + " diff: " + diff,
+            node.getStartTime() >= child.getEndTime());
       }
 
       long minParentDiff = Long.MAX_VALUE;
@@ -229,9 +214,9 @@ public class RemoteFlowWatcherTest {
         final long diff = node.getStartTime() - parent.getEndTime();
         minParentDiff = Math.min(minParentDiff, diff);
       }
-      System.out.println("   minPipelineTimeDiff:" + minDiff
-          + " minDependencyTimeDiff:" + minParentDiff);
-      Assert.assertTrue(minParentDiff < 500 || minDiff < 500);
+      Assert.assertTrue("minPipelineTimeDiff:" + minDiff
+              + " minDependencyTimeDiff:" + minParentDiff,
+          minParentDiff < 5000 || minDiff < 5000);
     }
   }
 
@@ -246,7 +231,7 @@ public class RemoteFlowWatcherTest {
       final EventCollectorListener eventCollector, final String flowName, final int execId,
       final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
       throws Exception {
-    final File testDir = new File("unit/executions/exectest1");
+    final File testDir = ExecutionsTestUtil.getFlowDir("exectest1");
     final ExecutableFlow exFlow =
         prepareExecDir(workingDir, testDir, flowName, execId);
     final ExecutionOptions options = exFlow.getExecutionOptions();