Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 4424c15..3616edd 100644
@@ -42,6 +42,9 @@ public class Constants {
// Flow 2.0 node type
public static final String FLOW_NODE_TYPE = "flow";
+ // Flow 2.0 flow and job path delimiter
+ public static final String PATH_DELIMITER = ":";
+
// Names and paths of various file names to configure Azkaban
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index 0594717..c1775d9 100644
@@ -47,6 +47,7 @@ public class Flow {
private Map<String, Object> metadata = new HashMap<>();
private boolean isLayedOut = false;
+ private boolean isEmbeddedFlow = false;
public Flow(final String id) {
this.id = id;
@@ -57,10 +58,16 @@ public class Flow {
final String id = (String) flowObject.get("id");
final Boolean layedout = (Boolean) flowObject.get("layedout");
+ final Boolean isEmbeddedFlow = (Boolean) flowObject.get("embeddedFlow");
final Flow flow = new Flow(id);
if (layedout != null) {
flow.setLayedOut(layedout);
}
+
+ if (isEmbeddedFlow != null) {
+ flow.setEmbeddedFlow(isEmbeddedFlow);
+ }
+
final int projId = (Integer) flowObject.get("project.id");
flow.setProjectId(projId);
@@ -320,6 +327,7 @@ public class Flow {
flowObj.put("success.email", this.successEmail);
flowObj.put("mailCreator", this.mailCreator);
flowObj.put("layedout", this.isLayedOut);
+ flowObj.put("embeddedFlow", this.isEmbeddedFlow);
if (this.errors != null) {
flowObj.put("errors", this.errors);
}
@@ -369,6 +377,14 @@ public class Flow {
this.isLayedOut = layedOut;
}
+ public boolean isEmbeddedFlow() {
+ return this.isEmbeddedFlow;
+ }
+
+ public void setEmbeddedFlow(final boolean embeddedFlow) {
+ this.isEmbeddedFlow = embeddedFlow;
+ }
+
public Map<String, Object> getMetadata() {
if (this.metadata == null) {
this.metadata = new HashMap<>();
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 6431fc9..93ae347 100644
@@ -19,10 +19,12 @@ package azkaban.project;
import azkaban.Constants;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
@@ -100,13 +102,12 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
private void convertYamlFiles(final File projectDir) {
// Todo jamiesjc: convert project yaml file.
- final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
- for (final File file : flowFiles) {
+ for (final File file : projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX))) {
final NodeBeanLoader loader = new NodeBeanLoader();
try {
final NodeBean nodeBean = loader.load(file);
final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
- final Flow flow = convertAzkabanFlowToFlow(azkabanFlow);
+ final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
this.flowMap.put(flow.getId(), flow);
} catch (final FileNotFoundException e) {
logger.error("Error loading flow yaml files", e);
@@ -114,18 +115,23 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
}
}
- private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow) {
- final Flow flow = new Flow(azkabanFlow.getName());
+ private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow, final String flowName,
+ final File flowFile) {
+ final Flow flow = new Flow(flowName);
final Props props = azkabanFlow.getProps();
FlowLoaderUtils.addEmailPropsToFlow(flow, props);
+ props.setSource(flowFile.getName());
+
+ flow.addAllFlowProperties(ImmutableList.of(new FlowProps(props)));
// Convert azkabanNodes to nodes inside the flow.
- azkabanFlow.getNodes().values().stream().map(this::convertAzkabanNodeToNode)
+ azkabanFlow.getNodes().values().stream()
+ .map(n -> convertAzkabanNodeToNode(n, flowName, flowFile))
.forEach(n -> flow.addNode(n));
// Add edges for the flow.
- buildFlowEdges(azkabanFlow);
- flow.addAllEdges(this.edgeMap.get(flow.getId()));
+ buildFlowEdges(azkabanFlow, flowName);
+ flow.addAllEdges(this.edgeMap.get(flowName));
// Todo jamiesjc: deprecate startNodes, endNodes and numLevels, and remove below method finally.
// Blow method will construct startNodes, endNodes and numLevels for the flow.
@@ -134,40 +140,46 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
return flow;
}
- private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode) {
+ private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode, final String flowName,
+ final File flowFile) {
final Node node = new Node(azkabanNode.getName());
node.setType(azkabanNode.getType());
+ node.setPropsSource(flowFile.getName());
+ node.setJobSource(flowFile.getName());
if (azkabanNode.getType().equals(Constants.FLOW_NODE_TYPE)) {
- node.setEmbeddedFlowId(azkabanNode.getName());
- final Flow flow_node = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode);
- this.flowMap.put(flow_node.getId(), flow_node);
+ final String embeddedFlowId = flowName + Constants.PATH_DELIMITER + node.getId();
+ node.setEmbeddedFlowId(embeddedFlowId);
+ final Flow flowNode = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode, embeddedFlowId,
+ flowFile);
+ flowNode.setEmbeddedFlow(true);
+ this.flowMap.put(flowNode.getId(), flowNode);
}
return node;
}
- private void buildFlowEdges(final AzkabanFlow azkabanFlow) {
+ private void buildFlowEdges(final AzkabanFlow azkabanFlow, final String flowName) {
// Recursive stack to record searched nodes. Used for detecting dependency cycles.
final HashSet<String> recStack = new HashSet<>();
// Nodes that have already been visited and added edges.
final HashSet<String> visited = new HashSet<>();
for (final AzkabanNode node : azkabanFlow.getNodes().values()) {
- addEdges(node, azkabanFlow, recStack, visited);
+ addEdges(node, azkabanFlow, flowName, recStack, visited);
}
}
private void addEdges(final AzkabanNode node, final AzkabanFlow azkabanFlow,
- final HashSet<String> recStack, final HashSet<String> visited) {
+ final String flowName, final HashSet<String> recStack, final HashSet<String> visited) {
if (!visited.contains(node.getName())) {
recStack.add(node.getName());
visited.add(node.getName());
final List<String> dependsOnList = node.getDependsOn();
for (final String parent : dependsOnList) {
final Edge edge = new Edge(parent, node.getName());
- if (!this.edgeMap.containsKey(azkabanFlow.getName())) {
- this.edgeMap.put(azkabanFlow.getName(), new ArrayList<>());
+ if (!this.edgeMap.containsKey(flowName)) {
+ this.edgeMap.put(flowName, new ArrayList<>());
}
- this.edgeMap.get(azkabanFlow.getName()).add(edge);
+ this.edgeMap.get(flowName).add(edge);
if (recStack.contains(parent)) {
// Cycles found, including self cycle.
@@ -180,7 +192,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
+ parent);
} else {
// Valid edge. Continue to process the parent node recursively.
- addEdges(azkabanFlow.getNode(parent), azkabanFlow, recStack, visited);
+ addEdges(azkabanFlow.getNode(parent), azkabanFlow, flowName, recStack, visited);
}
}
recStack.remove(node.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 207e06f..390888f 100644
@@ -15,22 +15,82 @@
*/
package azkaban.project;
+import azkaban.Constants;
import azkaban.flow.CommonJobProperties;
import azkaban.flow.Flow;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utils to help load flows.
*/
public class FlowLoaderUtils {
+ private static final Logger logger = LoggerFactory.getLogger(FlowLoaderUtils.class);
+
+ /**
+ * Gets flow or job props from flow yaml file.
+ *
+ * @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
+ * @param flowFile the flow yaml file
+ * @return the props from yaml file
+ */
+ public static Props getPropsFromYamlFile(final String path, final File flowFile) {
+ final List<Props> propsList = new ArrayList<>();
+ final NodeBeanLoader loader = new NodeBeanLoader();
+
+ try {
+ final NodeBean nodeBean = loader.load(flowFile);
+ final String[] pathList = path.split(Constants.PATH_DELIMITER);
+ if (findPropsFromNodeBean(loader, nodeBean, pathList, 0, propsList)) {
+ if (!propsList.isEmpty()) {
+ return propsList.get(0);
+ } else {
+ logger.error("Error getting props for " + path);
+ }
+ }
+ } catch (final FileNotFoundException e) {
+ logger.error("Failed to get props, error loading flow YAML file " + flowFile);
+ }
+ return null;
+ }
+
+ /**
+ * Helper method to recursively find props from node bean.
+ *
+ * @param loader the loader
+ * @param nodeBean the node bean
+ * @param pathList the path list
+ * @param idx the idx
+ * @param propsList the props list
+ * @return the boolean
+ */
+ public static boolean findPropsFromNodeBean(final NodeBeanLoader loader, final NodeBean nodeBean,
+ final String[] pathList, final int idx, final List<Props> propsList) {
+ if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
+ if (idx == pathList.length - 1) {
+ propsList.add(loader.getNodeProps(nodeBean));
+ return true;
+ }
+ for (final NodeBean bean : nodeBean.getNodes()) {
+ if (findPropsFromNodeBean(loader, bean, pathList, idx + 1, propsList)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
/**
* Adds email properties to a flow.
*
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 4f6bfc6..00443aa 100644
@@ -85,6 +85,10 @@ public class NodeBeanLoader {
}
}
+ public Props getNodeProps(final NodeBean nodeBean) {
+ return new Props(null, nodeBean.getConfig());
+ }
+
public String getFlowName(final File flowFile) {
checkArgument(flowFile.exists());
checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 2669462..4766606 100644
@@ -18,6 +18,7 @@ package azkaban.project;
import static org.assertj.core.api.Assertions.assertThat;
+import azkaban.Constants;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.test.executions.ExecutionsTestUtil;
@@ -35,13 +36,23 @@ public class DirectoryYamlFlowLoaderTest {
private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
+ private static final String MULTIPLE_EMBEDDED_FLOW_YAML_DIR = "multipleembeddedflowyamltest";
private static final String INVALID_FLOW_YAML_DIR = "invalidflowyamltest";
private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
private static final String BASIC_FLOW_1 = "basic_flow";
private static final String BASIC_FLOW_2 = "basic_flow2";
private static final String EMBEDDED_FLOW = "embedded_flow";
- private static final String EMBEDDED_FLOW_1 = "embedded_flow1";
- private static final String EMBEDDED_FLOW_2 = "embedded_flow2";
+ private static final String EMBEDDED_FLOW_1 = "embedded_flow" + Constants.PATH_DELIMITER +
+ "embedded_flow1";
+ private static final String EMBEDDED_FLOW_2 =
+ "embedded_flow" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
+ + "embedded_flow2";
+ private static final String EMBEDDED_FLOW_B = "embedded_flow_b";
+ private static final String EMBEDDED_FLOW_B1 =
+ "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1";
+ private static final String EMBEDDED_FLOW_B2 =
+ "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
+ + "embedded_flow2";
private static final String INVALID_FLOW_1 = "dependency_not_found";
private static final String INVALID_FLOW_2 = "cycle_found";
private static final String DEPENDENCY_NOT_FOUND_ERROR = "Dependency not found.";
@@ -58,7 +69,7 @@ public class DirectoryYamlFlowLoaderTest {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(BASIC_FLOW_YAML_DIR));
checkFlowLoaderProperties(loader, 0, 1, 1);
- checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
+ checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 1, 3, null);
}
@Test
@@ -66,8 +77,8 @@ public class DirectoryYamlFlowLoaderTest {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(MULTIPLE_FLOW_YAML_DIR));
checkFlowLoaderProperties(loader, 0, 2, 2);
- checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
- checkFlowProperties(loader, BASIC_FLOW_2, 0, 3, 2, null);
+ checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 1, 3, null);
+ checkFlowProperties(loader, BASIC_FLOW_2, 0, 3, 1, 2, null);
}
@Test
@@ -75,9 +86,23 @@ public class DirectoryYamlFlowLoaderTest {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(EMBEDDED_FLOW_YAML_DIR));
checkFlowLoaderProperties(loader, 0, 3, 3);
- checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 3, null);
- checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 3, null);
- checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 1, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 1, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, 1, null);
+ }
+
+ @Test
+ public void testLoadMultipleEmbeddedFlowYamlFiles() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project,
+ ExecutionsTestUtil.getFlowDir(MULTIPLE_EMBEDDED_FLOW_YAML_DIR));
+ checkFlowLoaderProperties(loader, 0, 6, 6);
+ checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 1, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 1, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, 1, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_B, 0, 4, 1, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_B1, 0, 4, 1, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_B2, 0, 2, 1, 1, null);
}
@Test
@@ -86,9 +111,9 @@ public class DirectoryYamlFlowLoaderTest {
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_FLOW_YAML_DIR));
checkFlowLoaderProperties(loader, 2, 2, 2);
// Invalid flow 1: Dependency not found.
- checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 3, DEPENDENCY_NOT_FOUND_ERROR);
+ checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 1, 3, DEPENDENCY_NOT_FOUND_ERROR);
// Invalid flow 2: Cycles found.
- checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 4, CYCLE_FOUND_ERROR);
+ checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 1, 4, CYCLE_FOUND_ERROR);
}
@Test
@@ -106,13 +131,15 @@ public class DirectoryYamlFlowLoaderTest {
}
private void checkFlowProperties(final DirectoryYamlFlowLoader loader, final String flowName,
- final int numError, final int numNode, final int numEdge, final String edgeError) {
+ final int numError, final int numNode, final int numFlowProps, final int numEdge, final
+ String edgeError) {
assertThat(loader.getFlowMap().containsKey(flowName)).isTrue();
final Flow flow = loader.getFlowMap().get(flowName);
if (numError != 0) {
assertThat(flow.getErrors().size()).isEqualTo(numError);
}
assertThat(flow.getNodes().size()).isEqualTo(numNode);
+ assertThat(flow.getAllFlowProps().size()).isEqualTo(numFlowProps);
// Verify flow edges
assertThat(loader.getEdgeMap().get(flowName).size()).isEqualTo(numEdge);
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 157c5d1..ba64698 100644
@@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import azkaban.Constants;
import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
import org.junit.Test;
public class NodeBeanLoaderTest {
@@ -38,6 +39,8 @@ public class NodeBeanLoaderTest {
private static final String SHELL_BASH = "shell_bash";
private static final String SHELL_PWD = "shell_pwd";
private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
+ private static final String ECHO_COMMAND_1 = "echo \"This is an echoed text from embedded_flow1.\"";
+ private static final String PWD_COMMAND = "pwd";
private static final String EMBEDDED_FLOW1 = "embedded_flow1";
private static final String EMBEDDED_FLOW2 = "embedded_flow2";
private static final String TYPE_NOOP = "noop";
@@ -177,8 +180,57 @@ public class NodeBeanLoaderTest {
}
@Test
- public void testGetFlowName() throws Exception {
+ public void testGetFlowName() {
assertThat(new NodeBeanLoader().getFlowName(ExecutionsTestUtil.getFlowFile(
BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE))).isEqualTo(BASIC_FLOW_NAME);
}
+
+ @Test
+ public void testGetFlowProps() {
+ final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(BASIC_FLOW_NAME,
+ ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+ assertThat(flowProps.size()).isEqualTo(1);
+ assertThat(flowProps.get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+ }
+
+ @Test
+ public void testGetJobPropsFromBasicFlow() {
+ final Props jobProps = FlowLoaderUtils
+ .getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO,
+ ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+ assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+ }
+
+ @Test
+ public void testGetJobPropsWithInvalidPath() {
+ final Props jobProps = FlowLoaderUtils
+ .getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + EMBEDDED_FLOW_NAME,
+ ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+ assertThat(jobProps).isNull();
+ }
+
+ @Test
+ public void testGetJobPropsFromEmbeddedFlow() {
+ // Get job props from parent flow
+ String jobPrefix = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER;
+ Props jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
+ ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+ assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+
+ // Get job props from first level embedded flow
+ jobPrefix = jobPrefix + EMBEDDED_FLOW1 + Constants.PATH_DELIMITER;
+ jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
+ ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+ assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND_1);
+
+ // Get job props from second level embedded flow
+ jobPrefix = jobPrefix + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER;
+ jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_PWD,
+ ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+ assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(PWD_COMMAND);
+ }
}
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index acaed57..9bb3374 100644
@@ -67,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -757,9 +758,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
final ArrayList<Map<String, Object>> flowList =
new ArrayList<>();
for (final Flow flow : project.getFlows()) {
- final HashMap<String, Object> flowObj = new HashMap<>();
- flowObj.put("flowId", flow.getId());
- flowList.add(flowObj);
+ if (!flow.isEmbeddedFlow()) {
+ final HashMap<String, Object> flowObj = new HashMap<>();
+ flowObj.put("flowId", flow.getId());
+ flowList.add(flowObj);
+ }
}
ret.put("flows", flowList);
@@ -1585,7 +1588,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
page.add("exec", false);
}
- final List<Flow> flows = project.getFlows();
+ final List<Flow> flows = project.getFlows().stream().filter(flow -> !flow.isEmbeddedFlow())
+ .collect(Collectors.toList());
+
if (!flows.isEmpty()) {
Collections.sort(flows, FLOW_ID_COMPARATOR);
page.add("flows", flows);
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.flow b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.flow
new file mode 100644
index 0000000..855ff3c
@@ -0,0 +1,61 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - embedded_flow1
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: embedded_flow1
+ type: flow
+ config:
+ flow-level-parameter: value
+
+ nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_echo
+ - embedded_flow2
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text from embedded_flow1."
+
+ - name: embedded_flow2
+ type: flow
+ config:
+ flow-level-parameter: value
+ dependsOn:
+ - shell_bash
+
+ nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ type: command
+ config:
+ command: bash ./sample_script.sh
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.project b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.project
new file mode 100644
index 0000000..4929753
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow_b.flow b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow_b.flow
new file mode 100644
index 0000000..855ff3c
@@ -0,0 +1,61 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - embedded_flow1
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: embedded_flow1
+ type: flow
+ config:
+ flow-level-parameter: value
+
+ nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_echo
+ - embedded_flow2
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text from embedded_flow1."
+
+ - name: embedded_flow2
+ type: flow
+ config:
+ flow-level-parameter: value
+ dependsOn:
+ - shell_bash
+
+ nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ type: command
+ config:
+ command: bash ./sample_script.sh