azkaban-aplcache

Flow 2.0 design - Get props from flow YAML file. (#1549) * Get

11/7/2017 9:18:18 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 4424c15..3616edd 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -42,6 +42,9 @@ public class Constants {
   // Flow 2.0 node type
   public static final String FLOW_NODE_TYPE = "flow";
 
+  // Flow 2.0 flow and job path delimiter
+  public static final String PATH_DELIMITER = ":";
+
   // Names and paths of various file names to configure Azkaban
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index 0594717..c1775d9 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -47,6 +47,7 @@ public class Flow {
   private Map<String, Object> metadata = new HashMap<>();
 
   private boolean isLayedOut = false;
+  private boolean isEmbeddedFlow = false;
 
   public Flow(final String id) {
     this.id = id;
@@ -57,10 +58,16 @@ public class Flow {
 
     final String id = (String) flowObject.get("id");
     final Boolean layedout = (Boolean) flowObject.get("layedout");
+    final Boolean isEmbeddedFlow = (Boolean) flowObject.get("embeddedFlow");
     final Flow flow = new Flow(id);
     if (layedout != null) {
       flow.setLayedOut(layedout);
     }
+
+    if (isEmbeddedFlow != null) {
+      flow.setEmbeddedFlow(isEmbeddedFlow);
+    }
+
     final int projId = (Integer) flowObject.get("project.id");
     flow.setProjectId(projId);
 
@@ -320,6 +327,7 @@ public class Flow {
     flowObj.put("success.email", this.successEmail);
     flowObj.put("mailCreator", this.mailCreator);
     flowObj.put("layedout", this.isLayedOut);
+    flowObj.put("embeddedFlow", this.isEmbeddedFlow);
     if (this.errors != null) {
       flowObj.put("errors", this.errors);
     }
@@ -369,6 +377,14 @@ public class Flow {
     this.isLayedOut = layedOut;
   }
 
+  public boolean isEmbeddedFlow() {
+    return this.isEmbeddedFlow;
+  }
+
+  public void setEmbeddedFlow(final boolean embeddedFlow) {
+    this.isEmbeddedFlow = embeddedFlow;
+  }
+
   public Map<String, Object> getMetadata() {
     if (this.metadata == null) {
       this.metadata = new HashMap<>();
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 6431fc9..93ae347 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -19,10 +19,12 @@ package azkaban.project;
 import azkaban.Constants;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
+import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.util.ArrayList;
@@ -100,13 +102,12 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   private void convertYamlFiles(final File projectDir) {
     // Todo jamiesjc: convert project yaml file.
 
-    final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
-    for (final File file : flowFiles) {
+    for (final File file : projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX))) {
       final NodeBeanLoader loader = new NodeBeanLoader();
       try {
         final NodeBean nodeBean = loader.load(file);
         final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
-        final Flow flow = convertAzkabanFlowToFlow(azkabanFlow);
+        final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
         this.flowMap.put(flow.getId(), flow);
       } catch (final FileNotFoundException e) {
         logger.error("Error loading flow yaml files", e);
@@ -114,18 +115,23 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
     }
   }
 
-  private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow) {
-    final Flow flow = new Flow(azkabanFlow.getName());
+  private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow, final String flowName,
+      final File flowFile) {
+    final Flow flow = new Flow(flowName);
     final Props props = azkabanFlow.getProps();
     FlowLoaderUtils.addEmailPropsToFlow(flow, props);
+    props.setSource(flowFile.getName());
+
+    flow.addAllFlowProperties(ImmutableList.of(new FlowProps(props)));
 
     // Convert azkabanNodes to nodes inside the flow.
-    azkabanFlow.getNodes().values().stream().map(this::convertAzkabanNodeToNode)
+    azkabanFlow.getNodes().values().stream()
+        .map(n -> convertAzkabanNodeToNode(n, flowName, flowFile))
         .forEach(n -> flow.addNode(n));
 
     // Add edges for the flow.
-    buildFlowEdges(azkabanFlow);
-    flow.addAllEdges(this.edgeMap.get(flow.getId()));
+    buildFlowEdges(azkabanFlow, flowName);
+    flow.addAllEdges(this.edgeMap.get(flowName));
 
     // Todo jamiesjc: deprecate startNodes, endNodes and numLevels, and remove below method finally.
     // Blow method will construct startNodes, endNodes and numLevels for the flow.
@@ -134,40 +140,46 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
     return flow;
   }
 
-  private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode) {
+  private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode, final String flowName,
+      final File flowFile) {
     final Node node = new Node(azkabanNode.getName());
     node.setType(azkabanNode.getType());
+    node.setPropsSource(flowFile.getName());
+    node.setJobSource(flowFile.getName());
 
     if (azkabanNode.getType().equals(Constants.FLOW_NODE_TYPE)) {
-      node.setEmbeddedFlowId(azkabanNode.getName());
-      final Flow flow_node = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode);
-      this.flowMap.put(flow_node.getId(), flow_node);
+      final String embeddedFlowId = flowName + Constants.PATH_DELIMITER + node.getId();
+      node.setEmbeddedFlowId(embeddedFlowId);
+      final Flow flowNode = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode, embeddedFlowId,
+          flowFile);
+      flowNode.setEmbeddedFlow(true);
+      this.flowMap.put(flowNode.getId(), flowNode);
     }
     return node;
   }
 
-  private void buildFlowEdges(final AzkabanFlow azkabanFlow) {
+  private void buildFlowEdges(final AzkabanFlow azkabanFlow, final String flowName) {
     // Recursive stack to record searched nodes. Used for detecting dependency cycles.
     final HashSet<String> recStack = new HashSet<>();
     // Nodes that have already been visited and added edges.
     final HashSet<String> visited = new HashSet<>();
     for (final AzkabanNode node : azkabanFlow.getNodes().values()) {
-      addEdges(node, azkabanFlow, recStack, visited);
+      addEdges(node, azkabanFlow, flowName, recStack, visited);
     }
   }
 
   private void addEdges(final AzkabanNode node, final AzkabanFlow azkabanFlow,
-      final HashSet<String> recStack, final HashSet<String> visited) {
+      final String flowName, final HashSet<String> recStack, final HashSet<String> visited) {
     if (!visited.contains(node.getName())) {
       recStack.add(node.getName());
       visited.add(node.getName());
       final List<String> dependsOnList = node.getDependsOn();
       for (final String parent : dependsOnList) {
         final Edge edge = new Edge(parent, node.getName());
-        if (!this.edgeMap.containsKey(azkabanFlow.getName())) {
-          this.edgeMap.put(azkabanFlow.getName(), new ArrayList<>());
+        if (!this.edgeMap.containsKey(flowName)) {
+          this.edgeMap.put(flowName, new ArrayList<>());
         }
-        this.edgeMap.get(azkabanFlow.getName()).add(edge);
+        this.edgeMap.get(flowName).add(edge);
 
         if (recStack.contains(parent)) {
           // Cycles found, including self cycle.
@@ -180,7 +192,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
               + parent);
         } else {
           // Valid edge. Continue to process the parent node recursively.
-          addEdges(azkabanFlow.getNode(parent), azkabanFlow, recStack, visited);
+          addEdges(azkabanFlow.getNode(parent), azkabanFlow, flowName, recStack, visited);
         }
       }
       recStack.remove(node.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 207e06f..390888f 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -15,22 +15,82 @@
 */
 package azkaban.project;
 
+import azkaban.Constants;
 import azkaban.flow.CommonJobProperties;
 import azkaban.flow.Flow;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utils to help load flows.
  */
 public class FlowLoaderUtils {
 
+  private static final Logger logger = LoggerFactory.getLogger(FlowLoaderUtils.class);
+
+  /**
+   * Gets flow or job props from flow yaml file.
+   *
+   * @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
+   * @param flowFile the flow yaml file
+   * @return the props from yaml file
+   */
+  public static Props getPropsFromYamlFile(final String path, final File flowFile) {
+    final List<Props> propsList = new ArrayList<>();
+    final NodeBeanLoader loader = new NodeBeanLoader();
+
+    try {
+      final NodeBean nodeBean = loader.load(flowFile);
+      final String[] pathList = path.split(Constants.PATH_DELIMITER);
+      if (findPropsFromNodeBean(loader, nodeBean, pathList, 0, propsList)) {
+        if (!propsList.isEmpty()) {
+          return propsList.get(0);
+        } else {
+          logger.error("Error getting props for " + path);
+        }
+      }
+    } catch (final FileNotFoundException e) {
+      logger.error("Failed to get props, error loading flow YAML file " + flowFile);
+    }
+    return null;
+  }
+
+  /**
+   * Helper method to recursively find props from node bean.
+   *
+   * @param loader the loader
+   * @param nodeBean the node bean
+   * @param pathList the path list
+   * @param idx the idx
+   * @param propsList the props list
+   * @return the boolean
+   */
+  public static boolean findPropsFromNodeBean(final NodeBeanLoader loader, final NodeBean nodeBean,
+      final String[] pathList, final int idx, final List<Props> propsList) {
+    if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
+      if (idx == pathList.length - 1) {
+        propsList.add(loader.getNodeProps(nodeBean));
+        return true;
+      }
+      for (final NodeBean bean : nodeBean.getNodes()) {
+        if (findPropsFromNodeBean(loader, bean, pathList, idx + 1, propsList)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   /**
    * Adds email properties to a flow.
    *
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 4f6bfc6..00443aa 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -85,6 +85,10 @@ public class NodeBeanLoader {
     }
   }
 
+  public Props getNodeProps(final NodeBean nodeBean) {
+    return new Props(null, nodeBean.getConfig());
+  }
+
   public String getFlowName(final File flowFile) {
     checkArgument(flowFile.exists());
     checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 2669462..4766606 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -18,6 +18,7 @@ package azkaban.project;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import azkaban.Constants;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.test.executions.ExecutionsTestUtil;
@@ -35,13 +36,23 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
   private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
   private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
+  private static final String MULTIPLE_EMBEDDED_FLOW_YAML_DIR = "multipleembeddedflowyamltest";
   private static final String INVALID_FLOW_YAML_DIR = "invalidflowyamltest";
   private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
   private static final String BASIC_FLOW_1 = "basic_flow";
   private static final String BASIC_FLOW_2 = "basic_flow2";
   private static final String EMBEDDED_FLOW = "embedded_flow";
-  private static final String EMBEDDED_FLOW_1 = "embedded_flow1";
-  private static final String EMBEDDED_FLOW_2 = "embedded_flow2";
+  private static final String EMBEDDED_FLOW_1 = "embedded_flow" + Constants.PATH_DELIMITER +
+      "embedded_flow1";
+  private static final String EMBEDDED_FLOW_2 =
+      "embedded_flow" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
+          + "embedded_flow2";
+  private static final String EMBEDDED_FLOW_B = "embedded_flow_b";
+  private static final String EMBEDDED_FLOW_B1 =
+      "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1";
+  private static final String EMBEDDED_FLOW_B2 =
+      "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
+          + "embedded_flow2";
   private static final String INVALID_FLOW_1 = "dependency_not_found";
   private static final String INVALID_FLOW_2 = "cycle_found";
   private static final String DEPENDENCY_NOT_FOUND_ERROR = "Dependency not found.";
@@ -58,7 +69,7 @@ public class DirectoryYamlFlowLoaderTest {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(BASIC_FLOW_YAML_DIR));
     checkFlowLoaderProperties(loader, 0, 1, 1);
-    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
+    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 1, 3, null);
   }
 
   @Test
@@ -66,8 +77,8 @@ public class DirectoryYamlFlowLoaderTest {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(MULTIPLE_FLOW_YAML_DIR));
     checkFlowLoaderProperties(loader, 0, 2, 2);
-    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
-    checkFlowProperties(loader, BASIC_FLOW_2, 0, 3, 2, null);
+    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 1, 3, null);
+    checkFlowProperties(loader, BASIC_FLOW_2, 0, 3, 1, 2, null);
   }
 
   @Test
@@ -75,9 +86,23 @@ public class DirectoryYamlFlowLoaderTest {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(EMBEDDED_FLOW_YAML_DIR));
     checkFlowLoaderProperties(loader, 0, 3, 3);
-    checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 3, null);
-    checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 3, null);
-    checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 1, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 1, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, 1, null);
+  }
+
+  @Test
+  public void testLoadMultipleEmbeddedFlowYamlFiles() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project,
+        ExecutionsTestUtil.getFlowDir(MULTIPLE_EMBEDDED_FLOW_YAML_DIR));
+    checkFlowLoaderProperties(loader, 0, 6, 6);
+    checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 1, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 1, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, 1, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_B, 0, 4, 1, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_B1, 0, 4, 1, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_B2, 0, 2, 1, 1, null);
   }
 
   @Test
@@ -86,9 +111,9 @@ public class DirectoryYamlFlowLoaderTest {
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_FLOW_YAML_DIR));
     checkFlowLoaderProperties(loader, 2, 2, 2);
     // Invalid flow 1: Dependency not found.
-    checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 3, DEPENDENCY_NOT_FOUND_ERROR);
+    checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 1, 3, DEPENDENCY_NOT_FOUND_ERROR);
     // Invalid flow 2: Cycles found.
-    checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 4, CYCLE_FOUND_ERROR);
+    checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 1, 4, CYCLE_FOUND_ERROR);
   }
 
   @Test
@@ -106,13 +131,15 @@ public class DirectoryYamlFlowLoaderTest {
   }
 
   private void checkFlowProperties(final DirectoryYamlFlowLoader loader, final String flowName,
-      final int numError, final int numNode, final int numEdge, final String edgeError) {
+      final int numError, final int numNode, final int numFlowProps, final int numEdge, final
+  String edgeError) {
     assertThat(loader.getFlowMap().containsKey(flowName)).isTrue();
     final Flow flow = loader.getFlowMap().get(flowName);
     if (numError != 0) {
       assertThat(flow.getErrors().size()).isEqualTo(numError);
     }
     assertThat(flow.getNodes().size()).isEqualTo(numNode);
+    assertThat(flow.getAllFlowProps().size()).isEqualTo(numFlowProps);
 
     // Verify flow edges
     assertThat(loader.getEdgeMap().get(flowName).size()).isEqualTo(numEdge);
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 157c5d1..ba64698 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import azkaban.Constants;
 import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
 import org.junit.Test;
 
 public class NodeBeanLoaderTest {
@@ -38,6 +39,8 @@ public class NodeBeanLoaderTest {
   private static final String SHELL_BASH = "shell_bash";
   private static final String SHELL_PWD = "shell_pwd";
   private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
+  private static final String ECHO_COMMAND_1 = "echo \"This is an echoed text from embedded_flow1.\"";
+  private static final String PWD_COMMAND = "pwd";
   private static final String EMBEDDED_FLOW1 = "embedded_flow1";
   private static final String EMBEDDED_FLOW2 = "embedded_flow2";
   private static final String TYPE_NOOP = "noop";
@@ -177,8 +180,57 @@ public class NodeBeanLoaderTest {
   }
 
   @Test
-  public void testGetFlowName() throws Exception {
+  public void testGetFlowName() {
     assertThat(new NodeBeanLoader().getFlowName(ExecutionsTestUtil.getFlowFile(
         BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE))).isEqualTo(BASIC_FLOW_NAME);
   }
+
+  @Test
+  public void testGetFlowProps() {
+    final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(BASIC_FLOW_NAME,
+        ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+    assertThat(flowProps.size()).isEqualTo(1);
+    assertThat(flowProps.get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+  }
+
+  @Test
+  public void testGetJobPropsFromBasicFlow() {
+    final Props jobProps = FlowLoaderUtils
+        .getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO,
+            ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+  }
+
+  @Test
+  public void testGetJobPropsWithInvalidPath() {
+    final Props jobProps = FlowLoaderUtils
+        .getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + EMBEDDED_FLOW_NAME,
+            ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+    assertThat(jobProps).isNull();
+  }
+
+  @Test
+  public void testGetJobPropsFromEmbeddedFlow() {
+    // Get job props from parent flow
+    String jobPrefix = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER;
+    Props jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
+        ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+
+    // Get job props from first level embedded flow
+    jobPrefix = jobPrefix + EMBEDDED_FLOW1 + Constants.PATH_DELIMITER;
+    jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
+        ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND_1);
+
+    // Get job props from second level embedded flow
+    jobPrefix = jobPrefix + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER;
+    jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_PWD,
+        ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(PWD_COMMAND);
+  }
 }
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index acaed57..9bb3374 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -67,6 +67,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -757,9 +758,11 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     final ArrayList<Map<String, Object>> flowList =
         new ArrayList<>();
     for (final Flow flow : project.getFlows()) {
-      final HashMap<String, Object> flowObj = new HashMap<>();
-      flowObj.put("flowId", flow.getId());
-      flowList.add(flowObj);
+      if (!flow.isEmbeddedFlow()) {
+        final HashMap<String, Object> flowObj = new HashMap<>();
+        flowObj.put("flowId", flow.getId());
+        flowList.add(flowObj);
+      }
     }
 
     ret.put("flows", flowList);
@@ -1585,7 +1588,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
           page.add("exec", false);
         }
 
-        final List<Flow> flows = project.getFlows();
+        final List<Flow> flows = project.getFlows().stream().filter(flow -> !flow.isEmbeddedFlow())
+            .collect(Collectors.toList());
+
         if (!flows.isEmpty()) {
           Collections.sort(flows, FLOW_ID_COMPARATOR);
           page.add("flows", flows);
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.flow b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.flow
new file mode 100644
index 0000000..855ff3c
--- /dev/null
+++ b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.flow
@@ -0,0 +1,61 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - embedded_flow1
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: embedded_flow1
+    type: flow
+    config:
+      flow-level-parameter: value
+
+    nodes:
+      - name: shell_end
+        type: noop
+        dependsOn:
+          - shell_echo
+          - embedded_flow2
+
+      - name: shell_echo
+        type: command
+        config:
+          command: echo "This is an echoed text from embedded_flow1."
+
+      - name: embedded_flow2
+        type: flow
+        config:
+          flow-level-parameter: value
+        dependsOn:
+            - shell_bash
+
+        nodes:
+          - name: shell_end
+            type: noop
+            dependsOn:
+              - shell_pwd
+
+          - name: shell_pwd
+            type: command
+            config:
+              command: pwd
+
+      - name: shell_bash
+        type: command
+        config:
+          command: bash ./sample_script.sh
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.project b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow_b.flow b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow_b.flow
new file mode 100644
index 0000000..855ff3c
--- /dev/null
+++ b/test/execution-test-data/multipleembeddedflowyamltest/embedded_flow_b.flow
@@ -0,0 +1,61 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - embedded_flow1
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: embedded_flow1
+    type: flow
+    config:
+      flow-level-parameter: value
+
+    nodes:
+      - name: shell_end
+        type: noop
+        dependsOn:
+          - shell_echo
+          - embedded_flow2
+
+      - name: shell_echo
+        type: command
+        config:
+          command: echo "This is an echoed text from embedded_flow1."
+
+      - name: embedded_flow2
+        type: flow
+        config:
+          flow-level-parameter: value
+        dependsOn:
+            - shell_bash
+
+        nodes:
+          - name: shell_end
+            type: noop
+            dependsOn:
+              - shell_pwd
+
+          - name: shell_pwd
+            type: command
+            config:
+              command: pwd
+
+      - name: shell_bash
+        type: command
+        config:
+          command: bash ./sample_script.sh