/*
* Copyright 2017 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package azkaban.project;
import static org.assertj.core.api.Assertions.assertThat;
import azkaban.Constants;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
import org.apache.commons.io.FileUtils;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.junit.Test;
public class NodeBeanLoaderTest {
private static final String BASIC_FLOW_YML_TEST_DIR = "basicflowyamltest";
private static final String BASIC_FLOW_NAME = "basic_flow";
private static final String BASIC_FLOW_YML_FILE = BASIC_FLOW_NAME + ".flow";
private static final String EMBEDDED_FLOW_YML_TEST_DIR = "embeddedflowyamltest";
private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
private static final String EMBEDDED_FLOW_YML_FILE = EMBEDDED_FLOW_NAME + ".flow";
private static final String TRIGGER_FLOW_YML_TEST_DIR = "flowtriggeryamltest";
private static final String TRIGGER_FLOW_NAME = "flow_trigger";
private static final String TRIGGER_FLOW_YML_FILE = TRIGGER_FLOW_NAME + ".flow";
private static final String FLOW_CONFIG_KEY = "flow-level-parameter";
private static final String FLOW_CONFIG_VALUE = "value";
private static final String SHELL_END = "shell_end";
private static final String SHELL_ECHO = "shell_echo";
private static final String SHELL_BASH = "shell_bash";
private static final String SHELL_PWD = "shell_pwd";
private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
private static final String ECHO_COMMAND_1 = "echo \"This is an echoed text from embedded_flow1.\"";
private static final String ECHO_OVERRIDE = "echo \"Override job properties.\"";
private static final String PWD_COMMAND = "pwd";
private static final String BASH_COMMAND = "bash ./sample_script.sh";
private static final String EMBEDDED_FLOW1 = "embedded_flow1";
private static final String EMBEDDED_FLOW2 = "embedded_flow2";
private static final String TYPE_NOOP = "noop";
private static final String TYPE_COMMAND = "command";
private static final int MAX_WAIT_MINS = 5;
private static final String CRON_EXPRESSION = "0 0 1 ? * *";
private static final String TRIGGER_NAME_1 = "search-impression";
private static final String TRIGGER_NAME_2 = "other-name";
private static final String TRIGGER_TYPE = "dali-dataset";
private static final ImmutableMap<String, String> PARAMS_1 = ImmutableMap
.of("view", "search_mp_versioned.search_impression_event_0_0_47", "delay", "1", "window", "1",
"unit", "daily", "filter", "is_guest=0");
private static final ImmutableMap<String, String> PARAMS_2 = ImmutableMap
.of("view", "another dataset",
"delay", "1", "window", "7");
@Test
public void testLoadNodeBeanForBasicFlow() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
validateNodeBean(nodeBean, BASIC_FLOW_NAME, Constants.FLOW_NODE_TYPE, FLOW_CONFIG_KEY,
FLOW_CONFIG_VALUE, 4, null);
validateNodeBean(nodeBean.getNodes().get(0), SHELL_END, TYPE_NOOP, null,
null, 0, Arrays.asList(SHELL_PWD, SHELL_ECHO, SHELL_BASH));
validateNodeBean(nodeBean.getNodes().get(1), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
ECHO_COMMAND, 0, null);
}
@Test
public void testLoadNodeBeanForEmbeddedFlow() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
validateNodeBean(nodeBean, EMBEDDED_FLOW_NAME, Constants.FLOW_NODE_TYPE, FLOW_CONFIG_KEY,
FLOW_CONFIG_VALUE, 4, null);
validateNodeBean(nodeBean.getNodes().get(0), SHELL_END, TYPE_NOOP, null,
null, 0, Arrays.asList(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1));
validateNodeBean(nodeBean.getNodes().get(1), SHELL_PWD, TYPE_COMMAND, TYPE_COMMAND,
PWD_COMMAND, 0, null);
validateNodeBean(nodeBean.getNodes().get(2), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
ECHO_COMMAND, 0, null);
validateNodeBean(nodeBean.getNodes().get(3), EMBEDDED_FLOW1, Constants.FLOW_NODE_TYPE,
FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE, 4, null);
// Verify nodes in embedded_flow1 are loaded correctly.
final NodeBean embeddedNodeBean1 = nodeBean.getNodes().get(3);
validateNodeBean(embeddedNodeBean1.getNodes().get(0), SHELL_END, TYPE_NOOP, null, null, 0,
Arrays.asList(SHELL_ECHO, EMBEDDED_FLOW2));
validateNodeBean(embeddedNodeBean1.getNodes().get(1), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
ECHO_COMMAND_1, 0, null);
validateNodeBean(embeddedNodeBean1.getNodes().get(2), EMBEDDED_FLOW2, Constants.FLOW_NODE_TYPE,
FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE, 2, Arrays.asList(SHELL_BASH));
validateNodeBean(embeddedNodeBean1.getNodes().get(3), SHELL_BASH, TYPE_COMMAND, TYPE_COMMAND,
BASH_COMMAND, 0, null);
// Verify nodes in embedded_flow2 are loaded correctly.
validateNodeBean(embeddedNodeBean1.getNodes().get(2).getNodes().get(0), SHELL_END, TYPE_NOOP,
null,
null, 0, Arrays.asList(SHELL_PWD));
validateNodeBean(embeddedNodeBean1.getNodes().get(2).getNodes().get(1), SHELL_PWD, TYPE_COMMAND,
TYPE_COMMAND, PWD_COMMAND, 0, null);
}
@Test
public void testLoadNodeBeanForFlowTrigger() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
final Map<String, String> schedule = ImmutableMap.of(Constants.SCHEDULE_TYPE, Constants
.CRON_SCHEDULE_TYPE, Constants.SCHEDULE_VALUE, CRON_EXPRESSION);
validateFlowTriggerBean(nodeBean.getTrigger(), MAX_WAIT_MINS, schedule, 2);
final List<TriggerDependencyBean> triggerDependencyBeans = nodeBean.getTrigger()
.getTriggerDependencies();
validateTriggerDependencyBean(triggerDependencyBeans.get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
PARAMS_1);
validateTriggerDependencyBean(triggerDependencyBeans.get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
PARAMS_2);
}
@Test
public void testToBasicAzkabanFlow() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
final Props props = new Props();
props.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
props.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
validateAzkabanNode(flow, BASIC_FLOW_NAME, Constants.FLOW_NODE_TYPE, props, Arrays.asList
(SHELL_END, SHELL_PWD, SHELL_ECHO, SHELL_BASH), null);
final Props props1 = new Props();
props1.put(Constants.NODE_TYPE, TYPE_NOOP);
validateAzkabanNode(flow.getNode(SHELL_END), SHELL_END, TYPE_NOOP, props1, null,
Arrays.asList(SHELL_PWD, SHELL_ECHO, SHELL_BASH));
final Props props2 = new Props();
props2.put(Constants.NODE_TYPE, TYPE_COMMAND);
props2.put(TYPE_COMMAND, ECHO_COMMAND);
validateAzkabanNode(flow.getNode(SHELL_ECHO), SHELL_ECHO, TYPE_COMMAND, props2, null,
null);
}
@Test
public void testToEmbeddedAzkabanFlow() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
final Props props = new Props();
props.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
props.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
validateAzkabanNode(flow, EMBEDDED_FLOW_NAME, Constants.FLOW_NODE_TYPE, props, Arrays.asList
(SHELL_END, SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1), null);
final Props props1 = new Props();
props1.put(Constants.NODE_TYPE, TYPE_NOOP);
validateAzkabanNode(flow.getNode(SHELL_END), SHELL_END, TYPE_NOOP, props1, null,
Arrays.asList(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1));
final Props props2 = new Props();
props2.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
props2.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
final AzkabanFlow embeddedFlow1 = (AzkabanFlow) flow.getNode(EMBEDDED_FLOW1);
validateAzkabanNode(embeddedFlow1, EMBEDDED_FLOW1, Constants.FLOW_NODE_TYPE, props2,
Arrays.asList
(SHELL_END, SHELL_BASH, SHELL_ECHO, EMBEDDED_FLOW2), null);
final AzkabanFlow embeddedFlow2 = (AzkabanFlow) embeddedFlow1.getNode(EMBEDDED_FLOW2);
validateAzkabanNode(embeddedFlow2, EMBEDDED_FLOW2, Constants.FLOW_NODE_TYPE,
props2, Arrays.asList(SHELL_END, SHELL_PWD), Arrays.asList(SHELL_BASH));
}
@Test
public void testToFlowTrigger() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
final FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
PARAMS_1);
validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
PARAMS_2);
}
@Test
public void testToAzkabanFlowWithFlowTrigger() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
validateFlowTrigger(flow.getFlowTrigger(), MAX_WAIT_MINS, CRON_EXPRESSION, 2);
validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(0), TRIGGER_NAME_1,
TRIGGER_TYPE,
PARAMS_1);
validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(1), TRIGGER_NAME_2,
TRIGGER_TYPE,
PARAMS_2);
}
@Test
public void testGetFlowName() {
assertThat(new NodeBeanLoader().getFlowName(ExecutionsTestUtil.getFlowFile(
BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE))).isEqualTo(BASIC_FLOW_NAME);
}
@Test
public void testGetFlowProps() {
final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(BASIC_FLOW_NAME,
ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
assertThat(flowProps.size()).isEqualTo(2);
assertThat(flowProps.get(Constants.NODE_TYPE)).isEqualTo(Constants.FLOW_NODE_TYPE);
assertThat(flowProps.get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
}
@Test
public void testGetJobPropsFromBasicFlow() {
final Props jobProps = FlowLoaderUtils
.getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO,
ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
assertThat(jobProps.size()).isEqualTo(2);
assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
}
@Test
public void testGetJobPropsWithInvalidPath() {
final Props jobProps = FlowLoaderUtils
.getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + EMBEDDED_FLOW_NAME,
ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
assertThat(jobProps).isNull();
}
@Test
public void testGetJobPropsFromEmbeddedFlow() {
// Get job props from parent flow
String jobPrefix = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER;
Props jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
assertThat(jobProps.size()).isEqualTo(2);
assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
// Get job props from first level embedded flow
jobPrefix = jobPrefix + EMBEDDED_FLOW1 + Constants.PATH_DELIMITER;
jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
assertThat(jobProps.size()).isEqualTo(2);
assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND_1);
// Get job props from second level embedded flow
jobPrefix = jobPrefix + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER;
jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_PWD,
ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
assertThat(jobProps.size()).isEqualTo(2);
assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(PWD_COMMAND);
}
@Test
public void testSetJobPropsInBasicFlow() throws Exception {
final String path = BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO;
final Props overrideProps = new Props();
overrideProps.put(Constants.NODE_TYPE, TYPE_COMMAND);
overrideProps.put(TYPE_COMMAND, ECHO_OVERRIDE);
final File newFile = new File(ExecutionsTestUtil.getDataRootDir(), BASIC_FLOW_YML_FILE);
newFile.deleteOnExit();
FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE),
newFile);
FlowLoaderUtils.setPropsInYamlFile(path, newFile, overrideProps);
assertThat(FlowLoaderUtils.getPropsFromYamlFile(path, newFile)).isEqualTo(overrideProps);
}
@Test
public void testSetJobPropsInEmbeddedFlow() throws Exception {
final String path = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER + EMBEDDED_FLOW1 +
Constants.PATH_DELIMITER + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER + SHELL_END;
final Props overrideProps = new Props();
overrideProps.put(Constants.NODE_TYPE, TYPE_COMMAND);
overrideProps.put(TYPE_COMMAND, ECHO_OVERRIDE);
final File newFile = new File(ExecutionsTestUtil.getDataRootDir(), EMBEDDED_FLOW_YML_FILE);
newFile.deleteOnExit();
FileUtils.copyFile(
ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE),
newFile);
FlowLoaderUtils.setPropsInYamlFile(path, newFile, overrideProps);
assertThat(FlowLoaderUtils.getPropsFromYamlFile(path, newFile)).isEqualTo(overrideProps);
}
@Test
public void testGetFlowTrigger() {
final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(
ExecutionsTestUtil.getFlowFile(TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
PARAMS_1);
validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
PARAMS_2);
}
private void validateNodeBean(final NodeBean nodeBean, final String name, final String type,
final String configKey,
final String configValue, final int numNodes, final List<String> dependsOn) {
assertThat(nodeBean.getName()).isEqualTo(name);
assertThat(nodeBean.getType()).isEqualTo(type);
if (configKey != null) {
assertThat(nodeBean.getConfig().get(configKey)).isEqualTo(configValue);
}
if (numNodes != 0) {
assertThat(nodeBean.getNodes().size()).isEqualTo(numNodes);
}
if (dependsOn != null) {
assertThat(nodeBean.getDependsOn()).containsOnlyElementsOf(dependsOn);
}
}
private void validateAzkabanNode(final AzkabanNode azkabanNode, final String name,
final String type, final Props props, final List<String> nodeList, final List<String>
dependsOn) {
assertThat(azkabanNode.getName()).isEqualTo(name);
assertThat(azkabanNode.getType()).isEqualTo(type);
assertThat(azkabanNode.getProps()).isEqualTo(props);
if (azkabanNode instanceof AzkabanFlow) {
assertThat(((AzkabanFlow) azkabanNode).getNodes().keySet()).containsOnlyElementsOf(nodeList);
}
if (dependsOn != null) {
assertThat(azkabanNode.getDependsOn()).containsOnlyElementsOf(dependsOn);
}
}
private void validateFlowTriggerBean(final FlowTriggerBean flowTriggerBean, final int
maxWaitMins, final Map<String, String> schedule, final int numDependencies) {
assertThat(flowTriggerBean.getMaxWaitMins()).isEqualTo(maxWaitMins);
assertThat(flowTriggerBean.getSchedule()).isEqualTo(schedule);
assertThat(flowTriggerBean.getTriggerDependencies().size()).isEqualTo(numDependencies);
}
private void validateTriggerDependencyBean(final TriggerDependencyBean triggerDependencyBean,
final String name, final String type, final Map<String, String> params) {
assertThat(triggerDependencyBean.getName()).isEqualTo(name);
assertThat(triggerDependencyBean.getType()).isEqualTo(type);
assertThat(triggerDependencyBean.getParams()).isEqualTo(params);
}
private void validateFlowTrigger(final FlowTrigger flowTrigger, final long maxWaitMins, final
String cronExpression, final int numDependencies) {
assertThat(flowTrigger.getMaxWaitDuration()).isEqualTo(Duration.ofMinutes(maxWaitMins));
assertThat(flowTrigger.getSchedule().getCronExpression()).isEqualTo(cronExpression);
assertThat(flowTrigger.getDependencies().size()).isEqualTo(numDependencies);
}
private void validateTriggerDependency(final FlowTriggerDependency flowTriggerDependency, final
String name, final String type, final Map<String, String> params) {
assertThat(flowTriggerDependency.getName()).isEqualTo(name);
assertThat(flowTriggerDependency.getType()).isEqualTo(type);
assertThat(flowTriggerDependency.getProps()).isEqualTo(params);
}
}