/*
* Copyright 2014 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.execapp.event;
import static org.mockito.Mockito.mock;
import azkaban.execapp.EventCollectorListener;
import azkaban.execapp.EventReporterUtil;
import azkaban.execapp.FlowRunner;
import azkaban.execapp.FlowRunnerTestUtil;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutionOptions;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.JavaJob;
import azkaban.executor.MockExecutorLoader;
import azkaban.executor.Status;
import azkaban.jobtype.JobTypeManager;
import azkaban.project.ProjectLoader;
import azkaban.spi.AzkabanEventReporter;
import azkaban.utils.Props;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class LocalFlowWatcherTest {
private final AzkabanEventReporter azkabanEventReporter =
EventReporterUtil.getTestAzkabanEventReporter();
private JobTypeManager jobtypeManager;
private int dirVal = 0;
@Before
public void setUp() throws Exception {
this.jobtypeManager =
new JobTypeManager(null, null, this.getClass().getClassLoader());
this.jobtypeManager.getJobTypePluginSet().addPluginClass("java", JavaJob.class);
}
@After
public void tearDown() throws IOException {
}
public File setupDirectory() throws IOException {
System.out.println("Create temp dir");
final File workingDir = new File("_AzkabanTestDir_" + this.dirVal);
if (workingDir.exists()) {
FileUtils.deleteDirectory(workingDir);
}
workingDir.mkdirs();
this.dirVal++;
return workingDir;
}
@Ignore
@Test
public void testBasicLocalFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final File workingDir1 = setupDirectory();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
final File workingDir2 = setupDirectory();
final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
watcher, 2);
final Thread runner2Thread = new Thread(runner2);
runner1Thread.start();
runner2Thread.start();
runner2Thread.join();
FileUtils.deleteDirectory(workingDir1);
FileUtils.deleteDirectory(workingDir2);
testPipelineLevel2(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
@Ignore
@Test
public void testLevel1LocalFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final File workingDir1 = setupDirectory();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
final File workingDir2 = setupDirectory();
final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2,
watcher, 1);
final Thread runner2Thread = new Thread(runner2);
runner1Thread.start();
runner2Thread.start();
runner2Thread.join();
FileUtils.deleteDirectory(workingDir1);
FileUtils.deleteDirectory(workingDir2);
testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
@Ignore
@Test
public void testLevel2DiffLocalFlowWatcher() throws Exception {
final MockExecutorLoader loader = new MockExecutorLoader();
final EventCollectorListener eventCollector = new EventCollectorListener();
final File workingDir1 = setupDirectory();
final FlowRunner runner1 =
createFlowRunner(workingDir1, loader, eventCollector, "exec1", 1, null,
null);
final Thread runner1Thread = new Thread(runner1);
final File workingDir2 = setupDirectory();
final LocalFlowWatcher watcher = new LocalFlowWatcher(runner1);
final FlowRunner runner2 =
createFlowRunner(workingDir2, loader, eventCollector, "exec1-mod", 2,
watcher, 1);
final Thread runner2Thread = new Thread(runner2);
runner1Thread.start();
runner2Thread.start();
runner2Thread.join();
FileUtils.deleteDirectory(workingDir1);
FileUtils.deleteDirectory(workingDir2);
testPipelineLevel1(runner1.getExecutableFlow(), runner2.getExecutableFlow());
}
private void testPipelineLevel1(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
System.out.println("Node " + node.getId() + " start: "
+ node.getStartTime() + " dependent on " + watchedNode.getId() + " "
+ watchedNode.getEndTime() + " diff: "
+ (node.getStartTime() - watchedNode.getEndTime()));
Assert.assertTrue(node.getStartTime() >= watchedNode.getEndTime());
long minParentDiff = 0;
if (node.getInNodes().size() > 0) {
minParentDiff = Long.MAX_VALUE;
for (final String dependency : node.getInNodes()) {
final ExecutableNode parent = second.getExecutableNode(dependency);
final long diff = node.getStartTime() - parent.getEndTime();
minParentDiff = Math.min(minParentDiff, diff);
}
}
final long diff = node.getStartTime() - watchedNode.getEndTime();
System.out.println(" minPipelineTimeDiff:" + diff
+ " minDependencyTimeDiff:" + minParentDiff);
Assert.assertTrue(minParentDiff < 100 || diff < 100);
}
}
private void testPipelineLevel2(final ExecutableFlow first, final ExecutableFlow second) {
for (final ExecutableNode node : second.getExecutableNodes()) {
Assert.assertEquals(Status.SUCCEEDED, node.getStatus());
// check it's start time is after the first's children.
final ExecutableNode watchedNode = first.getExecutableNode(node.getId());
if (watchedNode == null) {
continue;
}
Assert.assertEquals(Status.SUCCEEDED, watchedNode.getStatus());
long minDiff = Long.MAX_VALUE;
for (final String watchedChild : watchedNode.getOutNodes()) {
final ExecutableNode child = first.getExecutableNode(watchedChild);
if (child == null) {
continue;
}
Assert.assertEquals(Status.SUCCEEDED, child.getStatus());
final long diff = node.getStartTime() - child.getEndTime();
minDiff = Math.min(minDiff, diff);
System.out.println("Node " + node.getId() + " start: "
+ node.getStartTime() + " dependent on " + watchedChild + " "
+ child.getEndTime() + " diff: " + diff);
Assert.assertTrue(node.getStartTime() >= child.getEndTime());
}
long minParentDiff = Long.MAX_VALUE;
for (final String dependency : node.getInNodes()) {
final ExecutableNode parent = second.getExecutableNode(dependency);
final long diff = node.getStartTime() - parent.getEndTime();
minParentDiff = Math.min(minParentDiff, diff);
}
System.out.println(" minPipelineTimeDiff:" + minDiff
+ " minDependencyTimeDiff:" + minParentDiff);
Assert.assertTrue(minParentDiff < 100 || minDiff < 100);
}
}
private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName, final int execId,
final FlowWatcher watcher, final Integer pipeline) throws Exception {
return createFlowRunner(workingDir, loader, eventCollector, flowName, execId, watcher, pipeline,
new Props());
}
private FlowRunner createFlowRunner(final File workingDir, final ExecutorLoader loader,
final EventCollectorListener eventCollector, final String flowName, final int execId,
final FlowWatcher watcher, final Integer pipeline, final Props azkabanProps)
throws Exception {
final File testDir = new File("unit/executions/exectest1");
final ExecutableFlow exFlow =
FlowRunnerTestUtil.prepareExecDir(workingDir, testDir, flowName, execId);
final ExecutionOptions option = exFlow.getExecutionOptions();
if (watcher != null) {
option.setPipelineLevel(pipeline);
option.setPipelineExecutionId(watcher.getExecId());
}
loader.uploadExecutableFlow(exFlow);
final FlowRunner runner = new FlowRunner(exFlow, loader, mock(ProjectLoader.class),
this.jobtypeManager, azkabanProps, this.azkabanEventReporter);
runner.setFlowWatcher(watcher);
runner.addListener(eventCollector);
return runner;
}
}