diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index 9fb5da2..6514e97 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -30,6 +30,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
private static final ConcurrentHashMap<String, InteractiveTestJob> testJobs =
new ConcurrentHashMap<>();
+ private static volatile boolean quickSuccess = false;
private Props generatedProperties = new Props();
private boolean isWaiting = true;
private boolean succeed = true;
@@ -69,6 +70,14 @@ public class InteractiveTestJob extends AbstractProcessJob {
}
}
+ public static void setQuickSuccess(final boolean quickSuccess) {
+ InteractiveTestJob.quickSuccess = quickSuccess;
+ }
+
+ public static void resetQuickSuccess() {
+ InteractiveTestJob.quickSuccess = false;
+ }
+
@Override
public void run() throws Exception {
final String nestedFlowPath =
@@ -82,6 +91,9 @@ public class InteractiveTestJob extends AbstractProcessJob {
synchronized (testJobs) {
testJobs.notifyAll();
}
+ if (quickSuccess) {
+ return;
+ }
if (this.jobProps.getBoolean("fail", false)) {
final int passRetry = this.jobProps.getInt("passRetry", -1);
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
index ee3fe23..5471c06 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/event/RemoteFlowWatcherTest.java
@@ -20,18 +20,21 @@ import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
import azkaban.execapp.FlowRunner;
+import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
-import azkaban.executor.JavaJob;
+import azkaban.executor.InteractiveTestJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
+import azkaban.test.Utils;
+import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import java.io.File;
@@ -41,51 +44,44 @@ import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class RemoteFlowWatcherTest {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
private JobTypeManager jobtypeManager;
- private int dirVal = 0;
@Before
public void setUp() throws Exception {
this.jobtypeManager =
new JobTypeManager(null, null, this.getClass().getClassLoader());
- this.jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
+ this.jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
+ Utils.initServiceProvider();
+ JmxJobMBeanManager.getInstance().initialize(new Props());
+ InteractiveTestJob.setQuickSuccess(true);
}
@After
public void tearDown() throws IOException {
+ InteractiveTestJob.resetQuickSuccess();
}
- public File setupDirectory() throws IOException {
- System.out.println("Create temp dir");
- final File workingDir = new File("_AzkabanTestDir_" + this.dirVal);
- if (workingDir.exists()) {
- FileUtils.deleteDirectory(workingDir);
- }
- workingDir.mkdirs();
- this.dirVal++;
-
- return workingDir;
- }
-
- @Ignore
@Test
public void testBasicRemoteFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
- final File workingDir1 = setupDirectory();
+ final File workingDir1 = this.temporaryFolder.newFolder();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
- final File workingDir2 = setupDirectory();
+ final File workingDir2 = this.temporaryFolder.newFolder();
final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
@@ -98,26 +94,22 @@ public class RemoteFlowWatcherTest {
runner2Thread.join();
- FileUtils.deleteDirectory(workingDir1);
- FileUtils.deleteDirectory(workingDir2);
-
- testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
+ assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
- @Ignore
@Test
public void testLevel1RemoteFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
- final File workingDir1 = setupDirectory();
+ final File workingDir1 = this.temporaryFolder.newFolder();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
- final File workingDir2 = setupDirectory();
+ final File workingDir2 = this.temporaryFolder.newFolder();
final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
@@ -128,26 +120,22 @@ public class RemoteFlowWatcherTest {
runner2Thread.start();
runner2Thread.join();
- FileUtils.deleteDirectory(workingDir1);
- FileUtils.deleteDirectory(workingDir2);
-
testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
- @Ignore
@Test
public void testLevel2DiffRemoteFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
- final File workingDir1 = setupDirectory();
+ final File workingDir1 = this.temporaryFolder.newFolder();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
- final File workingDir2 = setupDirectory();
+ final File workingDir2 = this.temporaryFolder.newFolder();
final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
final FlowRunner runner2 =
@@ -159,9 +147,6 @@ public class RemoteFlowWatcherTest {
runner2Thread.start();
runner2Thread.join();
- FileUtils.deleteDirectory(workingDir1);
- FileUtils.deleteDirectory(workingDir2);
-
testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
@@ -197,7 +182,7 @@ public class RemoteFlowWatcherTest {
}
}
- private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
+ private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
Assert.assertEquals(node.getStatus(), Status.SUCCEEDED);
@@ -217,10 +202,10 @@ public class RemoteFlowWatcherTest {
Assert.assertEquals(child.getStatus(), Status.SUCCEEDED);
final long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
- System.out.println("Node " + node.getId() + " start: "
- + node.getStartTime() + " dependent on " + watchedChild + " "
- + child.getEndTime() + " diff: " + diff);
- Assert.assertTrue(node.getStartTime() >= child.getEndTime());
+ Assert.assertTrue(
+ "Node " + node.getId() + " start: " + node.getStartTime() + " dependent on "
+ + watchedChild + " " + child.getEndTime() + " diff: " + diff,
+ node.getStartTime() >= child.getEndTime());
}
long minParentDiff = Long.MAX_VALUE;
@@ -229,9 +214,9 @@ public class RemoteFlowWatcherTest {
final long diff = node.getStartTime() - parent.getEndTime();
minParentDiff = Math.min(minParentDiff, diff);
}
- System.out.println(" minPipelineTimeDiff:" + minDiff
- + " minDependencyTimeDiff:" + minParentDiff);
- Assert.assertTrue(minParentDiff < 500 || minDiff < 500);
+ Assert.assertTrue("minPipelineTimeDiff:" + minDiff
+ + " minDependencyTimeDiff:" + minParentDiff,
+ minParentDiff < 5000 || minDiff < 5000);
}
}
@@ -246,7 +231,7 @@ public class RemoteFlowWatcherTest {
final EventCollectorListener eventCollector, final String flowName, final int execId,
final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
throws Exception {
- final File testDir = new File("unit/executions/exectest1");
+ final File testDir = ExecutionsTestUtil.getFlowDir("exectest1");
final ExecutableFlow exFlow =
prepareExecDir(workingDir, testDir, flowName, execId);
final ExecutionOptions options = exFlow.getExecutionOptions();