azkaban-aplcache

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 7fd177e..2d598ee 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -35,6 +35,10 @@ public class Constants {
   // Azkaban Flow Versions
   public static final String AZKABAN_FLOW_VERSION_2_0 = "2.0";
 
+  // Flow 2.0 file suffix
+  public static final String PROJECT_FILE_SUFFIX = ".project";
+  public static final String FLOW_FILE_SUFFIX = ".flow";
+
   // Names and paths of various file names to configure Azkaban
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index d34743e..e2ad0c3 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -18,9 +18,12 @@
 package azkaban.project;
 
 import azkaban.utils.Props;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class AzkabanFlow extends AzkabanNode {
@@ -28,23 +31,24 @@ public class AzkabanFlow extends AzkabanNode {
   private final Map<String, AzkabanNode> nodes;
 
   private AzkabanFlow(final String name, final Props props,
-      final Map<String, AzkabanNode> nodes) {
-    super(name, props);
+      final Map<String, AzkabanNode> nodes, final List<String> dependsOn) {
+    super(name, props, dependsOn);
     this.nodes = nodes;
   }
 
   public Map<String, AzkabanNode> getNodes() {
-    return nodes;
+    return this.nodes;
   }
 
-  public AzkabanJob getJob(final String name) {
-    return (AzkabanJob) nodes.get(name);
+  public AzkabanNode getNode(final String name) {
+    return this.nodes.get(name);
   }
 
   public static class AzkabanFlowBuilder {
 
     private String name;
     private Props props;
+    private List<String> dependsOn;
     private Map<String, AzkabanNode> nodes;
 
     public AzkabanFlowBuilder setName(final String name) {
@@ -57,17 +61,24 @@ public class AzkabanFlow extends AzkabanNode {
       return this;
     }
 
+    public AzkabanFlowBuilder setDependsOn(final List<String> dependsOn) {
+      this.dependsOn = dependsOn == null
+          ? Collections.emptyList()
+          : ImmutableList.copyOf(dependsOn);
+      return this;
+    }
+
     public AzkabanFlowBuilder setNodes(final Collection<? extends AzkabanNode> azkabanNodes) {
       final Map<String, AzkabanNode> tempNodes = new HashMap<>();
       for (final AzkabanNode node : azkabanNodes) {
         tempNodes.put(node.getName(), node);
       }
-      nodes = ImmutableMap.copyOf(tempNodes);
+      this.nodes = ImmutableMap.copyOf(tempNodes);
       return this;
     }
 
     public AzkabanFlow build() {
-      return new AzkabanFlow(name, props, nodes);
+      return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn);
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index ed59fe0..03f78e2 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -25,21 +25,15 @@ import java.util.List;
 public class AzkabanJob extends AzkabanNode {
 
   private final String type;
-  private final List<String> dependsOn;
 
   private AzkabanJob(final String name, final String type, final Props props,
       final List<String> dependsOn) {
-    super(name, props);
+    super(name, props, dependsOn);
     this.type = type;
-    this.dependsOn = dependsOn;
   }
 
   public String getType() {
-    return type;
-  }
-
-  public List<String> getDependsOn() {
-    return dependsOn;
+    return this.type;
   }
 
   public static class AzkabanJobBuilder {
@@ -73,7 +67,7 @@ public class AzkabanJob extends AzkabanNode {
     }
 
     public AzkabanJob build() {
-      return new AzkabanJob(name, type, props, dependsOn);
+      return new AzkabanJob(this.name, this.type, this.props, this.dependsOn);
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
index 1b91ec8..d25afb1 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -20,22 +20,29 @@ package azkaban.project;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.utils.Props;
+import java.util.List;
 
 public abstract class AzkabanNode {
 
   protected final String name;
   protected final Props props;
+  protected final List<String> dependsOn;
 
-  public AzkabanNode(final String name, final Props props) {
+  public AzkabanNode(final String name, final Props props, final List<String> dependsOn) {
     this.name = requireNonNull(name);
     this.props = requireNonNull(props);
+    this.dependsOn = dependsOn;
   }
 
   public String getName() {
-    return name;
+    return this.name;
   }
 
   public Props getProps() {
-    return props;
+    return this.props;
+  }
+
+  public List<String> getDependsOn() {
+    return this.dependsOn;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 058f8ce..dbb7cce 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -16,6 +16,7 @@
 
 package azkaban.project;
 
+import azkaban.Constants;
 import azkaban.flow.Flow;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.validator.ValidationReport;
@@ -35,8 +36,6 @@ import org.slf4j.LoggerFactory;
 public class DirectoryYamlFlowLoader implements FlowLoader {
 
   private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
-  private static final String PROJECT_FILE_SUFFIX = ".project";
-  private static final String FLOW_FILE_SUFFIX = ".flow";
 
   private final Props props;
   private final Set<String> errors = new HashSet<>();
@@ -86,13 +85,12 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   private void convertYamlFiles(final File projectDir) {
     // Todo jamiesjc: convert project yaml file. It will contain properties for all flows.
 
-    //covert flow yaml files
-    final File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_FILE_SUFFIX));
+    final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
     for (final File file : flowFiles) {
-      final FlowBeanLoader loader = new FlowBeanLoader();
+      final NodeBeanLoader loader = new NodeBeanLoader();
       try {
-        final FlowBean flowBean = loader.load(file);
-        final AzkabanFlow azkabanFlow = loader.toAzkabanFlow(loader.getFlowName(file), flowBean);
+        final NodeBean nodeBean = loader.load(file);
+        final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
         final Flow flow = new Flow(azkabanFlow.getName());
         flow.setAzkabanFlow(azkabanFlow);
         this.flowMap.put(azkabanFlow.getName(), flow);
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index b457053..eb62481 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -17,18 +17,20 @@
 
 package azkaban.project;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
 /**
- * The node bean is used by the YAML loader to deserialize DAG nodes
+ * Used by the YAML loader to deserialize DAG nodes in the flow
  */
-public class NodeBean {
+public class NodeBean implements Serializable {
 
   private String name;
   private Map<String, String> config;
   private List<String> dependsOn;
   private String type;
+  private List<NodeBean> nodes;
 
   public String getName() {
     return this.name;
@@ -62,8 +64,22 @@ public class NodeBean {
     this.type = type;
   }
 
+  public List<NodeBean> getNodes() {
+    return this.nodes;
+  }
+
+  public void setNodes(final List<NodeBean> nodes) {
+    this.nodes = nodes;
+  }
+
   @Override
   public String toString() {
-    return "Node{config=" + this.config + ", dependsOn=" + this.dependsOn + '}';
+    return "NodeBean{" +
+        "name='" + this.name + '\'' +
+        ", config=" + this.config +
+        ", dependsOn=" + this.dependsOn +
+        ", type='" + this.type + '\'' +
+        ", nodes=" + this.nodes +
+        '}';
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
new file mode 100644
index 0000000..ddc3845
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.test.executions.ExecutionsTestUtil;
+import org.junit.Test;
+
+public class NodeBeanLoaderTest {
+
+  private static final String BASIC_FLOW_YML_TEST_DIR = "basicflowyamltest";
+  private static final String BASIC_FLOW_NAME = "basic_flow";
+  private static final String BASIC_FLOW_YML_FILE = BASIC_FLOW_NAME + ".flow";
+  private static final String EMBEDDED_FLOW_YML_TEST_DIR = "embeddedflowyamltest";
+  private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
+  private static final String EMBEDDED_FLOW_YML_FILE = EMBEDDED_FLOW_NAME + ".flow";
+  private static final String FLOW_CONFIG_KEY = "flow-level-parameter";
+  private static final String FLOW_CONFIG_VALUE = "value";
+  private static final String SHELL_END = "shell_end";
+  private static final String SHELL_ECHO = "shell_echo";
+  private static final String SHELL_BASH = "shell_bash";
+  private static final String SHELL_PWD = "shell_pwd";
+  private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
+  private static final String EMBEDDED_FLOW1 = "embedded_flow1";
+  private static final String EMBEDDED_FLOW2 = "embedded_flow2";
+  private static final String TYPE_NOOP = "noop";
+  private static final String TYPE_COMMAND = "command";
+  private static final String TYPE_FLOW = "flow";
+
+  @Test
+  public void testLoadNodeBeanForBasicFlow() throws Exception {
+
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+
+    assertThat(nodeBean.getName()).isEqualTo(BASIC_FLOW_NAME);
+    assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+    assertThat(nodeBean.getNodes().size()).isEqualTo(4);
+
+    final NodeBean node0 = nodeBean.getNodes().get(0);
+    assertThat(node0.getName()).isEqualTo(SHELL_END);
+    assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
+    assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+    final NodeBean node1 = nodeBean.getNodes().get(1);
+    assertThat(node1.getName()).isEqualTo(SHELL_ECHO);
+    assertThat(node1.getType()).isEqualTo(TYPE_COMMAND);
+    assertThat(node1.getConfig().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+  }
+
+  @Test
+  public void testLoadNodeBeanForEmbeddedFlow() throws Exception {
+
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+
+    assertThat(nodeBean.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
+    assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+    assertThat(nodeBean.getNodes().size()).isEqualTo(4);
+
+    final NodeBean node0 = nodeBean.getNodes().get(0);
+    assertThat(node0.getName()).isEqualTo(SHELL_END);
+    assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
+    assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
+
+    final NodeBean node1 = nodeBean.getNodes().get(1);
+    assertThat(node1.getName()).isEqualTo(SHELL_PWD);
+
+    final NodeBean node2 = nodeBean.getNodes().get(2);
+    assertThat(node2.getName()).isEqualTo(SHELL_ECHO);
+
+    final NodeBean node3 = nodeBean.getNodes().get(3);
+    assertThat(node3.getName()).isEqualTo(EMBEDDED_FLOW1);
+    assertThat(node3.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(node3.getNodes().size()).isEqualTo(4);
+
+    // Verify nodes in embedded_flow1 are loaded correctly.
+    final NodeBean node3_0 = node3.getNodes().get(0);
+    assertThat(node3_0.getName()).isEqualTo(SHELL_END);
+
+    final NodeBean node3_1 = node3.getNodes().get(1);
+    assertThat(node3_1.getName()).isEqualTo(SHELL_ECHO);
+
+    final NodeBean node3_2 = node3.getNodes().get(2);
+    assertThat(node3_2.getName()).isEqualTo(EMBEDDED_FLOW2);
+    assertThat(node3_2.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(node3_2.getDependsOn()).contains(SHELL_BASH);
+    assertThat(node3_2.getNodes().size()).isEqualTo(2);
+
+    final NodeBean node3_3 = node3.getNodes().get(3);
+    assertThat(node3_3.getName()).isEqualTo(SHELL_BASH);
+
+    // Verify nodes in embedded_flow2 are loaded correctly.
+    final NodeBean node3_2_0 = node3_2.getNodes().get(0);
+    assertThat(node3_2_0.getName()).isEqualTo(SHELL_END);
+
+    final NodeBean node3_2_1 = node3_2.getNodes().get(1);
+    assertThat(node3_2_1.getName()).isEqualTo(SHELL_PWD);
+
+  }
+
+  @Test
+  public void testToBasicAzkabanFlow() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+    final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+
+    assertThat(flow.getName()).isEqualTo(BASIC_FLOW_NAME);
+    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+    assertThat(flow.getNodes().size()).isEqualTo(4);
+
+    final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
+    assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
+    assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
+    assertThat(shellEnd.getProps().size()).isEqualTo(0);
+    assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+    final AzkabanJob shellEcho = (AzkabanJob) flow.getNode(SHELL_ECHO);
+    assertThat(shellEcho.getName()).isEqualTo(SHELL_ECHO);
+    assertThat(shellEcho.getType()).isEqualTo(TYPE_COMMAND);
+    assertThat(shellEcho.getProps().size()).isEqualTo(1);
+    assertThat(shellEcho.getProps().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+  }
+
+  @Test
+  public void testToEmbeddedAzkabanFlow() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+    final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+
+    assertThat(flow.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
+    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+    assertThat(flow.getNodes().size()).isEqualTo(4);
+
+    final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
+    assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
+    assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
+    assertThat(shellEnd.getProps().size()).isEqualTo(0);
+    assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
+
+    final AzkabanFlow embeddedFlow1 = (AzkabanFlow) flow.getNode(EMBEDDED_FLOW1);
+    assertThat(embeddedFlow1.getName()).isEqualTo(EMBEDDED_FLOW1);
+    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+    assertThat(embeddedFlow1.getNodes().size()).isEqualTo(4);
+    assertThat(embeddedFlow1.getNodes().containsKey(EMBEDDED_FLOW2)).isTrue();
+
+    final AzkabanFlow embeddedFlow2 = (AzkabanFlow) embeddedFlow1.getNode(EMBEDDED_FLOW2);
+    assertThat(embeddedFlow2.getName()).isEqualTo(EMBEDDED_FLOW2);
+    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+    assertThat(embeddedFlow2.getDependsOn()).contains(SHELL_BASH);
+    assertThat(embeddedFlow2.getNodes().size()).isEqualTo(2);
+    assertThat(embeddedFlow2.getNodes().containsKey(SHELL_END)).isTrue();
+    assertThat(embeddedFlow2.getNodes().containsKey(SHELL_PWD)).isTrue();
+
+  }
+
+  @Test
+  public void testGetFlowName() throws Exception {
+    assertThat(new NodeBeanLoader().getFlowName(ExecutionsTestUtil.getFlowFile(
+        BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE))).isEqualTo(BASIC_FLOW_NAME);
+  }
+}
diff --git a/azkaban-common/src/test/resources/sample_flow.flow b/azkaban-common/src/test/resources/sample_flow.flow
index 287f495..fa68e7d 100644
--- a/azkaban-common/src/test/resources/sample_flow.flow
+++ b/azkaban-common/src/test/resources/sample_flow.flow
@@ -1,3 +1,6 @@
+---
+name: sample_flow
+type: flow
 # All flow level properties here
 config:
   flow-level-parameter: value
diff --git a/test/execution-test-data/basicflowyamltest/basic_flow.flow b/test/execution-test-data/basicflowyamltest/basic_flow.flow
index 287f495..2b9caba 100644
--- a/test/execution-test-data/basicflowyamltest/basic_flow.flow
+++ b/test/execution-test-data/basicflowyamltest/basic_flow.flow
@@ -1,3 +1,6 @@
+---
+name: basic_flow
+type: flow
 # All flow level properties here
 config:
   flow-level-parameter: value
diff --git a/test/execution-test-data/embeddedflowyamltest/embedded_flow.flow b/test/execution-test-data/embeddedflowyamltest/embedded_flow.flow
new file mode 100644
index 0000000..7d8e54c
--- /dev/null
+++ b/test/execution-test-data/embeddedflowyamltest/embedded_flow.flow
@@ -0,0 +1,63 @@
+---
+name: embedded_flow
+type: flow
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - embedded_flow1
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: embedded_flow1
+    type: flow
+    config:
+      flow-level-parameter: value
+
+    nodes:
+      - name: shell_end
+        type: noop
+        dependsOn:
+          - shell_echo
+          - embedded_flow2
+
+      - name: shell_echo
+        type: command
+        config:
+          command: echo "This is an echoed text from embedded_flow1."
+
+      - name: embedded_flow2
+        type: flow
+        config:
+          flow-level-parameter: value
+        dependsOn:
+            - shell_bash
+
+        nodes:
+          - name: shell_end
+            type: noop
+            dependsOn:
+              - shell_pwd
+
+          - name: shell_pwd
+            type: command
+            config:
+              command: pwd
+
+      - name: shell_bash
+        type: command
+        config:
+          command: bash ./sample_script.sh
\ No newline at end of file
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow.flow b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
index 287f495..2b9caba 100644
--- a/test/execution-test-data/multipleflowyamltest/basic_flow.flow
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
@@ -1,3 +1,6 @@
+---
+name: basic_flow
+type: flow
 # All flow level properties here
 config:
   flow-level-parameter: value
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow2.flow b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
index e87b712..01246c7 100644
--- a/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
@@ -1,3 +1,6 @@
+---
+name: basic_flow2
+type: flow
 # All flow level properties here
 config:
   flow-level-parameter: value