/*
* Copyright 2014 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.execapp.event;
import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
import azkaban.execapp.EventReporterUtil;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.execapp.jmx.JmxJobMBeanManager;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
import azkaban.spi.AzkabanEventReporter;
import azkaban.test.Utils;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class RemoteFlowWatcherTest {
private final AzkabanEventReporter azkabanEventReporter =
EventReporterUtil.getTestAzkabanEventReporter();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private JobTypeManager jobtypeManager;
@Before
public void setUp() throws Exception {
this.jobtypeManager =
new JobTypeManager(null, null, this.getClass().getClassLoader());
this.jobtypeManager.getJobTypePluginSet().addPluginClass("test", InteractiveTestJob.class);
Utils.initServiceProvider();
JmxJobMBeanManager.getInstance().initialize(new Props());
InteractiveTestJob.setQuickSuccess(true);
}
@After
public void tearDown() throws IOException {
InteractiveTestJob.resetQuickSuccess();
}
@Test
public void testBasicRemoteFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final File workingDir1 = this.temporaryFolder.newFolder();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
final File workingDir2 = this.temporaryFolder.newFolder();
final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
watcher, 2);
final Thread runner2Thread = new Thread(runner2);
printCurrentState("runner1 ", runner1.getExecutableFlow());
runner1Thread.start();
runner2Thread.start();
runner2Thread.join();
assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), false);
}
@Test
public void testRemoteFlowWatcherBlockingJobsLeftInReadyState() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final File workingDir1 = this.temporaryFolder.newFolder();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
final File workingDir2 = this.temporaryFolder.newFolder();
final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
watcher, 2);
final Thread runner2Thread = new Thread(runner2);
printCurrentState("runner1 ", runner1.getExecutableFlow());
runner1Thread.start();
runner1Thread.join();
// simulate a real life scenario - this can happen for disabled jobs inside subflows:
// flow has finished otherwise but a "blocking" job has READY status
// the test gets stuck here without the fix in FlowWatcher.peekStatus
runner1.getExecutableFlow().getExecutableNodePath("job4").setStatus(Status.READY);
loader.updateExecutableFlow(runner1.getExecutableFlow());
runner2Thread.start();
runner2Thread.join();
assertPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow(), true);
}
@Test
public void testLevel1RemoteFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final File workingDir1 = this.temporaryFolder.newFolder();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
final File workingDir2 = this.temporaryFolder.newFolder();
final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
watcher, 1);
final Thread runner2Thread = new Thread(runner2);
runner1Thread.start();
runner2Thread.start();
runner2Thread.join();
testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
@Test
public void testLevel2DiffRemoteFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final File workingDir1 = this.temporaryFolder.newFolder();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
final File workingDir2 = this.temporaryFolder.newFolder();
final RemoteFlowWatcher watcher = new RemoteFlowWatcher(1, loader, 100);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2,
watcher, 1);
final Thread runner2Thread = new Thread(runner2);
runner1Thread.start();
runner2Thread.start();
runner2Thread.join();
testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
System.out.println("Node " + node.getId() + " start: "
+ node.getStartTime() + " dependent on " + watchedNode.getId() + " "
+ watchedNode.getEndTime() + " diff: "
+ (node.getStartTime() - watchedNode.getEndTime()));
Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
long minParentDiff = 0;
if (node.getInNodes().size() > 0) {
minParentDiff = Long.MAX_VALUE;
for (final String dependency : node.getInNodes()) {
final ExecutableNode parent = second.getExecutableNode(dependency);
final long diff = node.getStartTime() - parent.getEndTime();
minParentDiff = Math.min(minParentDiff, diff);
}
}
final long diff = node.getStartTime() - watchedNode.getEndTime();
Assert.assertTrue(minParentDiff < 500 || diff < 500);
}
}
private void assertPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second,
final boolean job4Skipped) {
for (final ExecutableNode node : second.getExecutableNodes()) {
Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
Assert.assertEquals(watchedNode.getStatus(),
job4Skipped && watchedNode.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
long minDiff = Long.MAX_VALUE;
for (final String watchedChild : watchedNode.getOutNodes()) {
final ExecutableNode child = first.getExecutableNode(watchedChild);
if (child == null) {
continue;
}
Assert.assertEquals(child.getStatus(),
job4Skipped && child.getId().equals("job4") ? Status.READY : Status.SUCCEEDED);
final long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
Assert.assertTrue(
"Node " + node.getId() + " start: " + node.getStartTime() + " dependent on "
+ watchedChild + " " + child.getEndTime() + " diff: " + diff,
node.getStartTime() >= child.getEndTime());
}
long minParentDiff = Long.MAX_VALUE;
for (final String dependency : node.getInNodes()) {
final ExecutableNode parent = second.getExecutableNode(dependency);
final long diff = node.getStartTime() - parent.getEndTime();
minParentDiff = Math.min(minParentDiff, diff);
}
Assert.assertTrue("minPipelineTimeDiff:" + minDiff
+ " minDependencyTimeDiff:" + minParentDiff,
minParentDiff < 5000 || minDiff < 5000);
}
}
private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName, final int execId,
final FlowWatcher watcher, final Integer pipeline) throws Exception {
return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline,
new Props());
}
private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName, final int execId,
final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
throws Exception {
final File testDir = ExecutionsTestUtil.getFlowDir("exectest1");
final ExecutableFlow exFlow =
FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
final ExecutionOptions options = exFlow.getExecutionOptions();
if (watcher != null) {
options.setPipelineLevel(pipeline);
options.setPipelineExecutionId(watcher.getExecId());
}
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
return runner;
}
private void printCurrentState(final String prefix, final ExecutableFlowBase flow) {
for (final ExecutableNode node : flow.getExecutableNodes()) {
System.err.println(prefix + node.getNestedId() + "->"
+ node.getStatus().name());
if (node instanceof ExecutableFlowBase) {
printCurrentState(prefix, (ExecutableFlowBase) node);
}
}
}
}