/*
* Copyright 2018 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/package azkaban.execapp;
importstatic org.assertj.core.api.Assertions.assertThat;
import azkaban.dag.Dag;
import azkaban.dag.DagBuilder;
import azkaban.dag.DagProcessor;
import azkaban.dag.DagService;
import azkaban.dag.Node;
import azkaban.dag.NodeProcessor;
import azkaban.dag.Status;
import azkaban.project.NodeBean;
import azkaban.project.NodeBeanLoader;
import azkaban.utils.ExecutorServiceUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
/**
* Tests for running flows.
*/publicclassFlowRunner2Test{
privatefinal DagService dagService = new DagService(new ExecutorServiceUtils());
privatefinal CountDownLatch flowFinishedLatch = new CountDownLatch(1);
// The recorded event sequence.privatefinal List<String> eventSequence = new ArrayList<>();
@TestpublicvoidrunSimpleV2Flow()throws Exception {
final NodeBean flowNode = loadFlowNode();
final Dag dag = createDag(flowNode);
this.dagService.startDag(dag);
this.flowFinishedLatch.await(2, TimeUnit.SECONDS);
assertThat(this.eventSequence).isEqualTo(Arrays.asList("n1", "n2"));
this.dagService.shutdownAndAwaitTermination();
}
private NodeBean loadFlowNode()throws Exception {
final File flowFile = loadFlowFileFromResource();
final NodeBeanLoader beanLoader = new NodeBeanLoader();
return beanLoader.load(flowFile);
}
private Dag createDag(final NodeBean flowNode){
final DagCreator creator = new DagCreator(flowNode);
return creator.create();
}
privateclassDagCreator{
privatefinal NodeBean flowNode;
privatefinal DagBuilder dagBuilder;
DagCreator(final NodeBean flowNode) {
final String flowName = flowNode.getName();
this.flowNode = flowNode;
this.dagBuilder = new DagBuilder(flowName, new SimpleDagProcessor());
}
Dag create(){
createNodes();
linkNodes();
returnthis.dagBuilder.build();
}
privatevoidcreateNodes(){
for (final NodeBean node : this.flowNode.getNodes()) {
createNode(node);
}
}
privatevoidcreateNode(final NodeBean node){
final String nodeName = node.getName();
final SimpleNodeProcessor nodeProcessor = new SimpleNodeProcessor(nodeName, node.getConfig());
this.dagBuilder.createNode(nodeName, nodeProcessor);
}
privatevoidlinkNodes(){
for (final NodeBean node : this.flowNode.getNodes()) {
linkNode(node);
}
}
privatevoidlinkNode(final NodeBean node){
final String name = node.getName();
final List<String> parents = node.getDependsOn();
if (parents == null) {
return;
}
for (final String parentNodeName : parents) {
this.dagBuilder.addParentNode(name, parentNodeName);
}
}
}
private File loadFlowFileFromResource(){
final ClassLoader loader = getClass().getClassLoader();
returnnew File(loader.getResource("hello_world_flow.flow").getFile());
}
classSimpleDagProcessorimplementsDagProcessor{
@OverridepublicvoidchangeStatus(final Dag dag, final Status status){
System.out.println(dag + " status changed to " + status);
if (status.isTerminal()) {
FlowRunner2Test.this.flowFinishedLatch.countDown();
}
}
}
classSimpleNodeProcessorimplementsNodeProcessor{
privatefinal String name;
privatefinal Map<String, String> config;
SimpleNodeProcessor(final String name, final Map<String, String> config) {
this.name = name;
this.config = config;
}
@OverridepublicvoidchangeStatus(final Node node, final Status status){
System.out.println(node + " status changed to " + status);
switch (status) {
case RUNNING:
System.out.println(String.format("Running with config: %s", this.config));
FlowRunner2Test.this.dagService.markNodeSuccess(node);
FlowRunner2Test.this.eventSequence.add(this.name);
break;
default:
break;
}
}
}
}