azkaban-aplcache

Create directory flow loader for yaml files. Fixed an issue

9/29/2017 1:23:09 PM

Changes

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 77d2151..b21a4fe 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -32,6 +32,9 @@ package azkaban;
  */
 public class Constants {
 
+  // Azkaban Flow Versions
+  public static final String AZKABAN_FLOW_VERSION_2_0 = "2.0";
+
   // Names and paths of various file names to configure Azkaban
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
@@ -156,6 +159,12 @@ public class Constants {
 
     // Job property that enables/disables using Kafka logging of user job logs
     public static final String AZKABAN_JOB_LOGGING_KAFKA_ENABLE = "azkaban.job.logging.kafka.enable";
+
+    // Job properties that indicate maximum memory size
+    public static final String JOB_MAX_XMS = "job.max.Xms";
+    public static final String MAX_XMS_DEFAULT = "1G";
+    public static final String JOB_MAX_XMX = "job.max.Xmx";
+    public static final String MAX_XMX_DEFAULT = "2G";
   }
 
   public static class JobCallbackProperties {
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index fee2e29..991a954 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -17,6 +17,7 @@
 package azkaban.flow;
 
 import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.project.AzkabanFlow;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -48,6 +49,8 @@ public class Flow {
 
   private boolean isLayedOut = false;
 
+  private AzkabanFlow azkabanFlow;
+
   public Flow(final String id) {
     this.id = id;
   }
@@ -407,4 +410,12 @@ public class Flow {
   public void setProjectId(final int projectId) {
     this.projectId = projectId;
   }
+
+  public AzkabanFlow getAzkabanFlow() {
+    return this.azkabanFlow;
+  }
+
+  public void setAzkabanFlow(final AzkabanFlow azkabanFlow) {
+    this.azkabanFlow = azkabanFlow;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 4f1a6c8..f87f7ab 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -16,7 +16,7 @@
 
 package azkaban.jobExecutor;
 
-import azkaban.project.DirectoryFlowLoader;
+import azkaban.Constants;
 import azkaban.server.AzkabanServer;
 import azkaban.utils.Pair;
 import azkaban.utils.Props;
@@ -161,9 +161,9 @@ public class JavaProcessJob extends ProcessJob {
     final Props azkabanProperties = AzkabanServer.getAzkabanProperties();
     if (azkabanProperties != null) {
       final String maxXms = azkabanProperties
-          .getString(DirectoryFlowLoader.JOB_MAX_XMS, DirectoryFlowLoader.MAX_XMS_DEFAULT);
+          .getString(Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
       final String maxXmx = azkabanProperties
-          .getString(DirectoryFlowLoader.JOB_MAX_XMX, DirectoryFlowLoader.MAX_XMX_DEFAULT);
+          .getString(Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
       final long sizeMaxXms = Utils.parseMemString(maxXms);
       final long sizeMaxXmx = Utils.parseMemString(maxXmx);
 
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index dd6cdea..110fe80 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -54,15 +54,17 @@ class AzkabanProjectLoader {
 
   private final ProjectLoader projectLoader;
   private final StorageManager storageManager;
+  private final FlowLoaderFactory flowLoaderFactory;
   private final File tempDir;
   private final int projectVersionRetention;
 
   @Inject
   AzkabanProjectLoader(final Props props, final ProjectLoader projectLoader,
-      final StorageManager storageManager) {
+      final StorageManager storageManager, final FlowLoaderFactory flowLoaderFactory) {
     this.props = requireNonNull(props, "Props is null");
     this.projectLoader = requireNonNull(projectLoader, "project Loader is null");
     this.storageManager = requireNonNull(storageManager, "Storage Manager is null");
+    this.flowLoaderFactory = requireNonNull(flowLoaderFactory, "Flow Loader Factory is null");
 
     this.tempDir = new File(props.getString(ConfigurationKeys.PROJECT_TEMP_DIR, "temp"));
     if (!this.tempDir.exists()) {
@@ -90,18 +92,16 @@ class AzkabanProjectLoader {
     prop.putAll(additionalProps);
 
     File file = null;
+    final FlowLoader loader;
+
     try {
       file = unzipProject(archive, fileType);
 
       reports = validateProject(project, archive, file, prop);
 
-      // Todo jamiesjc: in Flow 2.0, we need to create new flowLoader class and
-      // call new method to load the project flows.
-      // Need to guicify it later so that we can mock flowLoader in the tests.
-      // Load the project flows.
-      final DirectoryFlowLoader directoryFlowLoader = new DirectoryFlowLoader(prop);
-      reports.put(DIRECTORY_FLOW_REPORT_KEY,
-          directoryFlowLoader.loadProject(project, file));
+      loader = this.flowLoaderFactory.createFlowLoader(file);
+      reports.put(DIRECTORY_FLOW_REPORT_KEY, loader.loadProjectFlow(project, file));
+
     } finally {
       cleanUpProjectTempDir(file);
     }
@@ -112,7 +112,7 @@ class AzkabanProjectLoader {
     }
 
     // Upload the project to DB and storage.
-    persistProject(project, archive, uploader);
+    persistProject(project, loader, archive, uploader);
 
     // Clean up project old installations after new project is uploaded successfully.
     cleanUpProjectOldInstallations(project);
@@ -181,30 +181,38 @@ class AzkabanProjectLoader {
     return true;
   }
 
-  private void persistProject(final Project project, final File archive, final User uploader)
-      throws ProjectManagerException{
+  private void persistProject(final Project project, final FlowLoader loader, final File archive,
+      final User uploader) throws ProjectManagerException {
     synchronized (project) {
-      final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
-      final Map<String, Flow> flows = project.getFlowMap();
-      for (final Flow flow : flows.values()) {
-        flow.setProjectId(project.getId());
-        flow.setVersion(newVersion);
-      }
+      if (loader instanceof DirectoryFlowLoader) {
+        final DirectoryFlowLoader directoryFlowLoader = (DirectoryFlowLoader) loader;
+        final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+        final Map<String, Flow> flows = directoryFlowLoader.getFlowMap();
+        for (final Flow flow : flows.values()) {
+          flow.setProjectId(project.getId());
+          flow.setVersion(newVersion);
+        }
 
-      this.storageManager.uploadProject(project, newVersion, archive, uploader);
-
-      log.info("Uploading flow to db " + archive.getName());
-      this.projectLoader.uploadFlows(project, newVersion, flows.values());
-      log.info("Changing project versions " + archive.getName());
-      this.projectLoader.changeProjectVersion(project, newVersion,
-          uploader.getUserId());
-      log.info("Uploading Job properties");
-      this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
-          project.getJobPropsMap().values()));
-      log.info("Uploading Props properties");
-      this.projectLoader.uploadProjectProperties(project, project.getPropsList());
-      this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
-          "Uploaded project files zip " + archive.getName());
+        this.storageManager.uploadProject(project, newVersion, archive, uploader);
+
+        log.info("Uploading flow to db " + archive.getName());
+        this.projectLoader.uploadFlows(project, newVersion, flows.values());
+        log.info("Changing project versions " + archive.getName());
+        this.projectLoader.changeProjectVersion(project, newVersion,
+            uploader.getUserId());
+        project.setFlows(flows);
+        log.info("Uploading Job properties");
+        this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
+            directoryFlowLoader.getJobPropsMap().values()));
+        log.info("Uploading Props properties");
+        this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
+        this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
+            "Uploaded project files zip " + archive.getName());
+      } else if (loader instanceof DirectoryYamlFlowLoader) {
+        // Todo jamiesjc: upload yaml file to DB as a blob
+      } else {
+        throw new ProjectManagerException("Invalid type of flow loader.");
+      }
     }
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index cc99174..a9388fa 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -16,6 +16,7 @@
 
 package azkaban.project;
 
+import azkaban.Constants;
 import azkaban.flow.CommonJobProperties;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
@@ -23,6 +24,7 @@ import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.flow.SpecialJobTypes;
 import azkaban.jobcallback.JobCallbackValidator;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
@@ -32,30 +34,27 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class DirectoryFlowLoader {
+public class DirectoryFlowLoader implements FlowLoader {
 
-  public static final String JOB_MAX_XMS = "job.max.Xms";
-  public static final String MAX_XMS_DEFAULT = "1G";
-  public static final String JOB_MAX_XMX = "job.max.Xmx";
-  public static final String MAX_XMX_DEFAULT = "2G";
   private static final DirFilter DIR_FILTER = new DirFilter();
   private static final String PROPERTY_SUFFIX = ".properties";
   private static final String JOB_SUFFIX = ".job";
   private static final String XMS = "Xms";
   private static final String XMX = "Xmx";
 
-  private static final Logger logger = Logger.getLogger(DirectoryFlowLoader.class);
+  private static final Logger logger = LoggerFactory.getLogger(DirectoryFlowLoader.class);
   private final Props props;
+  private final Set<String> errors = new HashSet<>();
+  private final Map<String, Flow> flowMap = new HashMap<>();
   private HashSet<String> rootNodes;
-  private HashMap<String, Flow> flowMap;
   private HashMap<String, Node> nodeMap;
   private HashMap<String, Map<String, Edge>> nodeDependencies;
   private HashMap<String, Props> jobPropsMap;
@@ -65,7 +64,6 @@ public class DirectoryFlowLoader {
 
   private ArrayList<FlowProps> flowPropsList;
   private ArrayList<Props> propsList;
-  private Set<String> errors;
   private Set<String> duplicateJobs;
 
   /**
@@ -78,6 +76,15 @@ public class DirectoryFlowLoader {
   }
 
   /**
+   * Returns the flow map constructed from the loaded flows.
+   *
+   * @return Map of flow name to Flow.
+   */
+  public Map<String, Flow> getFlowMap() {
+    return this.flowMap;
+  }
+
+  /**
    * Returns errors caught when loading flows.
    *
    * @return Set of error strings.
@@ -87,25 +94,43 @@ public class DirectoryFlowLoader {
   }
 
   /**
-   * Loads all flows from the directory into the project.
+   * Returns job properties.
+   *
+   * @return Map of job name to properties.
+   */
+  public HashMap<String, Props> getJobPropsMap() {
+    return this.jobPropsMap;
+  }
+
+  /**
+   * Returns list of properties.
    *
-   * @param project The project to load flows to.
-   * @param baseDirectory The directory to load flows from.
+   * @return List of Props.
    */
-  public void loadProjectFlow(final Project project, final File baseDirectory) {
+  public ArrayList<Props> getPropsList() {
+    return this.propsList;
+  }
+
+  /**
+   * Loads all project flows from the directory.
+   *
+   * @param project The project.
+   * @param projectDir The directory to load flows from.
+   * @return the validation report.
+   */
+  @Override
+  public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
     this.propsList = new ArrayList<>();
     this.flowPropsList = new ArrayList<>();
     this.jobPropsMap = new HashMap<>();
     this.nodeMap = new HashMap<>();
-    this.flowMap = new HashMap<>();
-    this.errors = new HashSet<>();
     this.duplicateJobs = new HashSet<>();
     this.nodeDependencies = new HashMap<>();
     this.rootNodes = new HashSet<>();
     this.flowDependencies = new HashMap<>();
 
     // Load all the props files and create the Node objects
-    loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
+    loadProjectFromDir(projectDir.getPath(), projectDir, null);
 
     // Create edges and find missing dependencies
     resolveDependencies();
@@ -116,9 +141,9 @@ public class DirectoryFlowLoader {
     // Resolve embedded flows
     resolveEmbeddedFlows();
 
-    project.setFlows(this.flowMap);
-    project.setPropsList(this.propsList);
-    project.setJobPropsMap(this.jobPropsMap);
+    checkJobProperties(project);
+
+    return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
 
   }
 
@@ -302,34 +327,7 @@ public class DirectoryFlowLoader {
         final Flow flow = new Flow(base.getId());
         final Props jobProp = this.jobPropsMap.get(base.getId());
 
-        // Dedup with sets
-        final List<String> successEmailList =
-            jobProp.getStringList(CommonJobProperties.SUCCESS_EMAILS,
-                Collections.EMPTY_LIST);
-        final Set<String> successEmail = new HashSet<>();
-        for (final String email : successEmailList) {
-          successEmail.add(email.toLowerCase());
-        }
-
-        final List<String> failureEmailList =
-            jobProp.getStringList(CommonJobProperties.FAILURE_EMAILS,
-                Collections.EMPTY_LIST);
-        final Set<String> failureEmail = new HashSet<>();
-        for (final String email : failureEmailList) {
-          failureEmail.add(email.toLowerCase());
-        }
-
-        final List<String> notifyEmailList =
-            jobProp.getStringList(CommonJobProperties.NOTIFY_EMAILS,
-                Collections.EMPTY_LIST);
-        for (String email : notifyEmailList) {
-          email = email.toLowerCase();
-          successEmail.add(email);
-          failureEmail.add(email);
-        }
-
-        flow.addFailureEmails(failureEmail);
-        flow.addSuccessEmails(successEmail);
+        FlowLoaderUtils.addEmailPropsToFlow(flow, jobProp);
 
         flow.addAllFlowProperties(this.flowPropsList);
         constructFlow(flow, base, visitedNodes);
@@ -380,7 +378,7 @@ public class DirectoryFlowLoader {
     visited.remove(node.getId());
   }
 
-  private void checkJobProperties(final Project project) {
+  public void checkJobProperties(final Project project) {
     // if project is in the memory check whitelist, then we don't need to check
     // its memory settings
     if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
@@ -388,8 +386,10 @@ public class DirectoryFlowLoader {
       return;
     }
 
-    final String maxXms = this.props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
-    final String maxXmx = this.props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
+    final String maxXms = this.props.getString(
+        Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
+    final String maxXmx = this.props.getString(
+        Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
     final long sizeMaxXms = Utils.parseMemString(maxXms);
     final long sizeMaxXmx = Utils.parseMemString(maxXmx);
 
@@ -427,14 +427,6 @@ public class DirectoryFlowLoader {
     return filePath.substring(basePath.length() + 1);
   }
 
-  public ValidationReport loadProject(final Project project, final File projectDir) {
-    loadProjectFlow(project, projectDir);
-    checkJobProperties(project);
-    final ValidationReport report = new ValidationReport();
-    report.addErrorMsgs(this.errors);
-    return report;
-  }
-
   private static class DirFilter implements FileFilter {
 
     @Override
@@ -443,20 +435,4 @@ public class DirectoryFlowLoader {
     }
   }
 
-  private static class SuffixFilter implements FileFilter {
-
-    private final String suffix;
-
-    public SuffixFilter(final String suffix) {
-      this.suffix = suffix;
-    }
-
-    @Override
-    public boolean accept(final File pathname) {
-      final String name = pathname.getName();
-
-      return pathname.isFile() && !pathname.isHidden()
-          && name.length() > this.suffix.length() && name.endsWith(this.suffix);
-    }
-  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
new file mode 100644
index 0000000..058f8ce
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -0,0 +1,111 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.flow.Flow;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
+import azkaban.project.validator.ValidationReport;
+import azkaban.utils.Props;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads yaml files to flows from project directory.
+ */
+public class DirectoryYamlFlowLoader implements FlowLoader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
+  private static final String PROJECT_FILE_SUFFIX = ".project";
+  private static final String FLOW_FILE_SUFFIX = ".flow";
+
+  private final Props props;
+  private final Set<String> errors = new HashSet<>();
+  private final Map<String, Flow> flowMap = new HashMap<>();
+
+  /**
+   * Creates a new DirectoryYamlFlowLoader.
+   *
+   * @param props Properties to add.
+   */
+  public DirectoryYamlFlowLoader(final Props props) {
+    this.props = props;
+  }
+
+  /**
+   * Returns the flow map constructed from the loaded flows.
+   *
+   * @return Map of flow name to Flow.
+   */
+  public Map<String, Flow> getFlowMap() {
+    return this.flowMap;
+  }
+
+  /**
+   * Returns errors caught when loading flows.
+   *
+   * @return Set of error strings.
+   */
+  public Set<String> getErrors() {
+    return this.errors;
+  }
+
+  /**
+   * Loads all project flows from the directory.
+   *
+   * @param project The project.
+   * @param projectDir The directory to load flows from.
+   * @return the validation report.
+   */
+  @Override
+  public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
+    convertYamlFiles(projectDir);
+    checkJobProperties(project);
+    return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
+  }
+
+  private void convertYamlFiles(final File projectDir) {
+    // Todo jamiesjc: convert project yaml file. It will contain properties for all flows.
+
+    //covert flow yaml files
+    final File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_FILE_SUFFIX));
+    for (final File file : flowFiles) {
+      final FlowBeanLoader loader = new FlowBeanLoader();
+      try {
+        final FlowBean flowBean = loader.load(file);
+        final AzkabanFlow azkabanFlow = loader.toAzkabanFlow(loader.getFlowName(file), flowBean);
+        final Flow flow = new Flow(azkabanFlow.getName());
+        flow.setAzkabanFlow(azkabanFlow);
+        this.flowMap.put(azkabanFlow.getName(), flow);
+        final Props flowProps = azkabanFlow.getProps();
+        FlowLoaderUtils.addEmailPropsToFlow(flow, flowProps);
+      } catch (final FileNotFoundException e) {
+        logger.error("Error loading flow yaml files", e);
+      }
+    }
+  }
+
+  public void checkJobProperties(final Project project) {
+    // Todo jamiesjc: implement the check later
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBean.java b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
index de4780c..4dc0b53 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * This is the top level class which is used by the YAML loader to deserialize a flow.yml file.
+ * Top level class used by the YAML loader to deserialize a flow yaml file.
  */
 public class FlowBean implements Serializable {
 
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
index 34f401a..a10914b 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
@@ -33,7 +33,7 @@ public class FlowBeanLoader {
 
   public FlowBean load(final File flowFile) throws FileNotFoundException {
     checkArgument(flowFile.exists());
-    checkArgument(flowFile.getName().endsWith(".yml"));
+    checkArgument(flowFile.getName().endsWith(".flow"));
 
     return new Yaml().loadAs(new FileInputStream(flowFile), FlowBean.class);
   }
@@ -81,7 +81,7 @@ public class FlowBeanLoader {
 
   public String getFlowName(final File flowFile) {
     checkArgument(flowFile.exists());
-    checkArgument(flowFile.getName().endsWith(".yml"));
+    checkArgument(flowFile.getName().endsWith(".flow"));
 
     return Files.getNameWithoutExtension(flowFile.getName());
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
new file mode 100644
index 0000000..18b9303
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
@@ -0,0 +1,32 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.project.validator.ValidationReport;
+import java.io.File;
+
+public interface FlowLoader {
+
+  /**
+   * Loads all project flows from the directory.
+   *
+   * @param project The project.
+   * @param projectDir The directory to load flows from.
+   * @return the validation report.
+   */
+  ValidationReport loadProjectFlow(final Project project, final File projectDir);
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
new file mode 100644
index 0000000..9dc170d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
@@ -0,0 +1,61 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.Constants;
+import azkaban.utils.Props;
+import java.io.File;
+import javax.inject.Inject;
+
+/**
+ * Factory class to generate flow loaders.
+ */
+public class FlowLoaderFactory {
+
+  private final Props props;
+
+  /**
+   * Instantiates a new Flow loader factory.
+   *
+   * @param props the props
+   */
+  @Inject
+  public FlowLoaderFactory(final Props props) {
+    this.props = requireNonNull(props, "Props is null");
+  }
+
+  /**
+   * Creates flow loader based on manifest file inside project directory.
+   *
+   * @param projectDir the project directory
+   * @return the flow loader
+   */
+  public FlowLoader createFlowLoader(final File projectDir) throws ProjectManagerException {
+    // Todo jamiesjc: need to check if manifest file exists in project directory,
+    // and create FlowLoader based on different flow versions specified in the manifest file.
+    final String flowVersion = null;
+    if (flowVersion == null) {
+      return new DirectoryFlowLoader(this.props);
+    } else if (flowVersion.equals(Constants.AZKABAN_FLOW_VERSION_2_0)) {
+      return new DirectoryYamlFlowLoader(this.props);
+    } else {
+      throw new ProjectManagerException("Flow version " + flowVersion + "is invalid.");
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
new file mode 100644
index 0000000..16ca468
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -0,0 +1,103 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.project;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.flow.Flow;
+import azkaban.project.validator.ValidationReport;
+import azkaban.utils.Props;
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class FlowLoaderUtils {
+
+  /**
+   * Adds email properties to a flow.
+   *
+   * @param flow the flow
+   * @param prop the prop
+   */
+  public static void addEmailPropsToFlow(final Flow flow, final Props prop) {
+    final List<String> successEmailList =
+        prop.getStringList(CommonJobProperties.SUCCESS_EMAILS,
+            Collections.EMPTY_LIST);
+    final Set<String> successEmail = new HashSet<>();
+    for (final String email : successEmailList) {
+      successEmail.add(email.toLowerCase());
+    }
+
+    final List<String> failureEmailList =
+        prop.getStringList(CommonJobProperties.FAILURE_EMAILS,
+            Collections.EMPTY_LIST);
+    final Set<String> failureEmail = new HashSet<>();
+    for (final String email : failureEmailList) {
+      failureEmail.add(email.toLowerCase());
+    }
+
+    final List<String> notifyEmailList =
+        prop.getStringList(CommonJobProperties.NOTIFY_EMAILS,
+            Collections.EMPTY_LIST);
+    for (String email : notifyEmailList) {
+      email = email.toLowerCase();
+      successEmail.add(email);
+      failureEmail.add(email);
+    }
+
+    flow.addFailureEmails(failureEmail);
+    flow.addSuccessEmails(successEmail);
+  }
+
+  /**
+   * Generate flow loader report validation report.
+   *
+   * @param errors the errors
+   * @return the validation report
+   */
+  public static ValidationReport generateFlowLoaderReport(final Set<String> errors) {
+    final ValidationReport report = new ValidationReport();
+    report.addErrorMsgs(errors);
+    return report;
+  }
+
+  /**
+   * Implements Suffix filter.
+   */
+  public static class SuffixFilter implements FileFilter {
+
+    private final String suffix;
+
+    /**
+     * Instantiates a new Suffix filter.
+     *
+     * @param suffix the suffix
+     */
+    public SuffixFilter(final String suffix) {
+      this.suffix = suffix;
+    }
+
+    @Override
+    public boolean accept(final File pathname) {
+      final String name = pathname.getName();
+
+      return pathname.isFile() && !pathname.isHidden()
+          && name.length() > this.suffix.length() && name.endsWith(this.suffix);
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/Project.java b/azkaban-common/src/main/java/azkaban/project/Project.java
index c2f1911..2148fa9 100644
--- a/azkaban-common/src/main/java/azkaban/project/Project.java
+++ b/azkaban-common/src/main/java/azkaban/project/Project.java
@@ -21,8 +21,6 @@ import azkaban.user.Permission;
 import azkaban.user.Permission.Type;
 import azkaban.user.User;
 import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -50,8 +48,6 @@ public class Project {
   private String lastModifiedUser;
   private String source;
   private Map<String, Flow> flows = new HashMap<>();
-  private Map<String, Props> jobPropsMap = new HashMap<>();
-  private List<Props> propsList = new ArrayList<>();
   private Map<String, Object> metadata = new HashMap<>();
 
   public Project(final int id, final String name) {
@@ -455,20 +451,4 @@ public class Project {
   public void setVersion(final int version) {
     this.version = version;
   }
-
-  public Map<String, Props> getJobPropsMap() {
-    return this.jobPropsMap;
-  }
-
-  public void setJobPropsMap(final Map<String, Props> jobPropsMap) {
-    this.jobPropsMap = ImmutableMap.copyOf(jobPropsMap);
-  }
-
-  public List<Props> getPropsList() {
-    return this.propsList;
-  }
-
-  public void setPropsList(final List<Props> propsList) {
-    this.propsList = ImmutableList.copyOf(propsList);
-  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index 11bbcaa..cd625d8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -212,7 +212,7 @@ public class ExecutableFlowTest {
 
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
-
+    this.project.setFlows(loader.getFlowMap());
     this.project.setVersion(123);
   }
 
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index 5f98984..61b020f 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -23,11 +23,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import azkaban.project.validator.ValidationReport;
+import azkaban.project.validator.ValidationStatus;
 import azkaban.storage.StorageManager;
 import azkaban.user.User;
 import azkaban.utils.Props;
 import java.io.File;
 import java.net.URL;
+import java.util.Map;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,6 +39,8 @@ import org.junit.rules.TemporaryFolder;
 
 public class AzkabanProjectLoaderTest {
 
+  private static final String DIRECTORY_FLOW_REPORT_KEY = "Directory Flow";
+
   @Rule
   public final TemporaryFolder TEMP_DIR = new TemporaryFolder();
 
@@ -55,7 +61,7 @@ public class AzkabanProjectLoaderTest {
     this.projectLoader = mock(ProjectLoader.class);
 
     this.azkabanProjectLoader = new AzkabanProjectLoader(props, this.projectLoader,
-        this.storageManager);
+        this.storageManager, new FlowLoaderFactory(props));
   }
 
   @Test
@@ -67,7 +73,14 @@ public class AzkabanProjectLoaderTest {
     final File projectZipFile = new File(resource.getPath());
     final User uploader = new User("test_user");
 
-    this.azkabanProjectLoader.uploadProject(this.project, projectZipFile, "zip", uploader, null);
+    final Map<String, ValidationReport> validationReportMap =
+        this.azkabanProjectLoader
+            .uploadProject(this.project, projectZipFile, "zip", uploader, null);
+
+    Assert.assertEquals(1, validationReportMap.size());
+    Assert.assertTrue(validationReportMap.containsKey(DIRECTORY_FLOW_REPORT_KEY));
+    Assert.assertEquals(ValidationStatus.PASS,
+        validationReportMap.get(DIRECTORY_FLOW_REPORT_KEY).getStatus());
 
     verify(this.storageManager)
         .uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
index 10eccac..cd6ebec 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
@@ -37,9 +37,9 @@ public class DirectoryFlowLoaderTest {
 
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("exectest1"));
     Assert.assertEquals(0, loader.getErrors().size());
-    Assert.assertEquals(5, this.project.getFlowMap().size());
-    Assert.assertEquals(2, this.project.getPropsList().size());
-    Assert.assertEquals(14, this.project.getJobPropsMap().size());
+    Assert.assertEquals(5, loader.getFlowMap().size());
+    Assert.assertEquals(2, loader.getPropsList().size());
+    Assert.assertEquals(14, loader.getJobPropsMap().size());
   }
 
   @Test
@@ -48,9 +48,9 @@ public class DirectoryFlowLoaderTest {
 
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
-    Assert.assertEquals(2, this.project.getFlowMap().size());
-    Assert.assertEquals(0, this.project.getPropsList().size());
-    Assert.assertEquals(9, this.project.getJobPropsMap().size());
+    Assert.assertEquals(2, loader.getFlowMap().size());
+    Assert.assertEquals(0, loader.getPropsList().size());
+    Assert.assertEquals(9, loader.getJobPropsMap().size());
   }
 
   @Test
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
new file mode 100644
index 0000000..e55bbb7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -0,0 +1,67 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.flow.Flow;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DirectoryYamlFlowLoaderTest {
+
+  private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
+  private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
+  private static final String FLOW_NAME_1 = "basic_flow";
+  private static final String FLOW_NAME_2 = "basic_flow2";
+  private Project project;
+
+  @Before
+  public void setUp() {
+    this.project = new Project(12, "myTestProject");
+  }
+
+  @Test
+  public void testLoadYamlFileFromDirectory() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(BASIC_FLOW_YAML_DIR));
+    Assert.assertEquals(0, loader.getErrors().size());
+    Assert.assertEquals(1, loader.getFlowMap().size());
+    Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
+    final Flow flow = loader.getFlowMap().get(FLOW_NAME_1);
+    final AzkabanFlow azkabanFlow = flow.getAzkabanFlow();
+    Assert.assertEquals(FLOW_NAME_1, azkabanFlow.getName());
+    Assert.assertEquals(4, azkabanFlow.getNodes().size());
+  }
+
+  @Test
+  public void testLoadMultipleYamlFilesFromDirectory() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(MULTIPLE_FLOW_YAML_DIR));
+    Assert.assertEquals(0, loader.getErrors().size());
+    Assert.assertEquals(2, loader.getFlowMap().size());
+    Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
+    Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_2));
+    final Flow flow2 = loader.getFlowMap().get(FLOW_NAME_2);
+    final AzkabanFlow azkabanFlow2 = flow2.getAzkabanFlow();
+    Assert.assertEquals(FLOW_NAME_2, azkabanFlow2.getName());
+    Assert.assertEquals(3, azkabanFlow2.getNodes().size());
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
index cb51fcc..ce30c4f 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 public class FlowBeanLoaderTest {
 
   public static final String TEST_FLOW_NAME = "sample_flow";
-  public static final String TEST_FLOW_YML_FILENAME = TEST_FLOW_NAME + ".yml";
+  public static final String TEST_FLOW_YML_FILENAME = TEST_FLOW_NAME + ".flow";
   public static final String SHELL_END = "shell_end";
   public static final String SHELL_ECHO = "shell_echo";
   public static final String SHELL_BASH = "shell_bash";
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
new file mode 100644
index 0000000..4aceded
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import java.io.File;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class FlowLoaderFactoryTest {
+
+  private static final String FLOW_10_TEST_DIRECTORY = "exectest1";
+  private static final String FLOW_20_TEST_DIRECTORY = "basicflowyamltest";
+  private Project project;
+
+  @Before
+  public void setUp() throws Exception {
+    this.project = new Project(13, "myTestProject");
+  }
+
+  @Test
+  public void testCreateDirectoryFlowLoader() {
+    final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+    final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_10_TEST_DIRECTORY);
+    final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
+    Assert.assertTrue(loader instanceof DirectoryFlowLoader);
+  }
+
+  // Todo jamiesjc: add the manifest file with flow version 2.0 and enable this test
+  @Test
+  @Ignore
+  public void testCreateDirectoryYamlFlowLoader() {
+    final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+    final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_20_TEST_DIRECTORY);
+    final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
+    Assert.assertTrue(loader instanceof DirectoryYamlFlowLoader);
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index 46394f3..ac2b049 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -55,6 +55,7 @@ public class EmailerTest {
     final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
     Assert.assertEquals(0, loader.getErrors().size());
+    this.project.setFlows(loader.getFlowMap());
     this.project.setVersion(123);
   }
 
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 465445d..8e0d9c9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -55,7 +55,8 @@ public class FlowRunnerTestUtil {
           project.getName(), sourceDir));
     }
 
-    final Map<String, Flow> flowMap = project.getFlowMap();
+    final Map<String, Flow> flowMap = loader.getFlowMap();
+    project.setFlows(flowMap);
     FileUtils.copyDirectory(sourceDir, workingDir);
 
     return flowMap;
diff --git a/test/execution-test-data/basicflowyamltest/basic_flow.flow b/test/execution-test-data/basicflowyamltest/basic_flow.flow
new file mode 100644
index 0000000..287f495
--- /dev/null
+++ b/test/execution-test-data/basicflowyamltest/basic_flow.flow
@@ -0,0 +1,40 @@
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    # Describe the type of the job
+    type: command
+    config:
+      command: bash ./sample_script.sh
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow.flow b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
new file mode 100644
index 0000000..287f495
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
@@ -0,0 +1,40 @@
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    # Describe the type of the job
+    type: command
+    config:
+      command: bash ./sample_script.sh
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow2.flow b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
new file mode 100644
index 0000000..e87b712
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
@@ -0,0 +1,33 @@
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd