azkaban-aplcache

Flow 2.0 design - Convert AzkabanFlow to Flow. Load project

10/23/2017 11:15:56 PM

Changes

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 2d598ee..b3f4851 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -33,12 +33,15 @@ package azkaban;
 public class Constants {
 
   // Azkaban Flow Versions
-  public static final String AZKABAN_FLOW_VERSION_2_0 = "2.0";
+  public static final double AZKABAN_FLOW_VERSION_2_0 = 2.0;
 
   // Flow 2.0 file suffix
   public static final String PROJECT_FILE_SUFFIX = ".project";
   public static final String FLOW_FILE_SUFFIX = ".flow";
 
+  // Flow 2.0 node type
+  public static final String FLOW_NODE_TYPE = "flow";
+
   // Names and paths of various file names to configure Azkaban
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
@@ -65,6 +68,9 @@ public class Constants {
 
   public static class ConfigurationKeys {
 
+    // Configures Azkaban Flow Version in project YAML file
+    public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
+
     // These properties are configurable through azkaban.properties
     public static final String AZKABAN_PID_FILENAME = "azkaban.pid.filename";
 
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index 991a954..0594717 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -17,7 +17,6 @@
 package azkaban.flow;
 
 import azkaban.executor.mail.DefaultMailCreator;
-import azkaban.project.AzkabanFlow;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -49,8 +48,6 @@ public class Flow {
 
   private boolean isLayedOut = false;
 
-  private AzkabanFlow azkabanFlow;
-
   public Flow(final String id) {
     this.id = id;
   }
@@ -411,11 +408,4 @@ public class Flow {
     this.projectId = projectId;
   }
 
-  public AzkabanFlow getAzkabanFlow() {
-    return this.azkabanFlow;
-  }
-
-  public void setAzkabanFlow(final AzkabanFlow azkabanFlow) {
-    this.azkabanFlow = azkabanFlow;
-  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 2dbbd5b..997719e 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -17,6 +17,7 @@
 
 package azkaban.project;
 
+import azkaban.Constants;
 import azkaban.utils.Props;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -36,7 +37,7 @@ public class AzkabanFlow extends AzkabanNode {
 
   private AzkabanFlow(final String name, final Props props,
       final Map<String, AzkabanNode> nodes, final List<String> dependsOn) {
-    super(name, props, dependsOn);
+    super(name, Constants.FLOW_NODE_TYPE, props, dependsOn);
     this.nodes = nodes;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index 64d109b..9a60740 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -28,16 +28,9 @@ import java.util.List;
  */
 public class AzkabanJob extends AzkabanNode {
 
-  private final String type;
-
   private AzkabanJob(final String name, final String type, final Props props,
       final List<String> dependsOn) {
-    super(name, props, dependsOn);
-    this.type = type;
-  }
-
-  public String getType() {
-    return this.type;
+    super(name, type, props, dependsOn);
   }
 
   public static class AzkabanJobBuilder {
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
index 3e566dd..079cf43 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -28,11 +28,14 @@ import java.util.List;
 public abstract class AzkabanNode {
 
   protected final String name;
+  protected final String type;
   protected final Props props;
   protected final List<String> dependsOn;
 
-  public AzkabanNode(final String name, final Props props, final List<String> dependsOn) {
+  public AzkabanNode(final String name, final String type, final Props props, final List<String>
+      dependsOn) {
     this.name = requireNonNull(name);
+    this.type = requireNonNull(type);
     this.props = requireNonNull(props);
     this.dependsOn = dependsOn;
   }
@@ -41,6 +44,10 @@ public abstract class AzkabanNode {
     return this.name;
   }
 
+  public String getType() {
+    return this.type;
+  }
+
   public Props getProps() {
     return this.props;
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 110fe80..3a7aec8 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -184,35 +184,39 @@ class AzkabanProjectLoader {
   private void persistProject(final Project project, final FlowLoader loader, final File archive,
       final User uploader) throws ProjectManagerException {
     synchronized (project) {
+      final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+      final Map<String, Flow> flows = loader.getFlowMap();
+      for (final Flow flow : flows.values()) {
+        flow.setProjectId(project.getId());
+        flow.setVersion(newVersion);
+      }
+
+      this.storageManager.uploadProject(project, newVersion, archive, uploader);
+
+      log.info("Uploading flow to db for project " + archive.getName());
+      this.projectLoader.uploadFlows(project, newVersion, flows.values());
+      log.info("Changing project versions for project " + archive.getName());
+      this.projectLoader.changeProjectVersion(project, newVersion,
+          uploader.getUserId());
+      project.setFlows(flows);
+
       if (loader instanceof DirectoryFlowLoader) {
         final DirectoryFlowLoader directoryFlowLoader = (DirectoryFlowLoader) loader;
-        final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
-        final Map<String, Flow> flows = directoryFlowLoader.getFlowMap();
-        for (final Flow flow : flows.values()) {
-          flow.setProjectId(project.getId());
-          flow.setVersion(newVersion);
-        }
-
-        this.storageManager.uploadProject(project, newVersion, archive, uploader);
-
-        log.info("Uploading flow to db " + archive.getName());
-        this.projectLoader.uploadFlows(project, newVersion, flows.values());
-        log.info("Changing project versions " + archive.getName());
-        this.projectLoader.changeProjectVersion(project, newVersion,
-            uploader.getUserId());
-        project.setFlows(flows);
         log.info("Uploading Job properties");
         this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
             directoryFlowLoader.getJobPropsMap().values()));
         log.info("Uploading Props properties");
         this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
-        this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
-            "Uploaded project files zip " + archive.getName());
+
       } else if (loader instanceof DirectoryYamlFlowLoader) {
         // Todo jamiesjc: upload yaml file to DB as a blob
+
       } else {
         throw new ProjectManagerException("Invalid type of flow loader.");
       }
+
+      this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
+          "Uploaded project files zip " + archive.getName());
     }
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index e92dda9..edb5bf4 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -83,6 +83,7 @@ public class DirectoryFlowLoader implements FlowLoader {
    *
    * @return Map of flow name to Flow.
    */
+  @Override
   public Map<String, Flow> getFlowMap() {
     return this.flowMap;
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index dbb7cce..6431fc9 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -17,14 +17,18 @@
 package azkaban.project;
 
 import azkaban.Constants;
+import azkaban.flow.Edge;
 import azkaban.flow.Flow;
+import azkaban.flow.Node;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.slf4j.Logger;
@@ -40,6 +44,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   private final Props props;
   private final Set<String> errors = new HashSet<>();
   private final Map<String, Flow> flowMap = new HashMap<>();
+  private final Map<String, List<Edge>> edgeMap = new HashMap<>();
 
   /**
    * Creates a new DirectoryYamlFlowLoader.
@@ -55,6 +60,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
    *
    * @return Map of flow name to Flow.
    */
+  @Override
   public Map<String, Flow> getFlowMap() {
     return this.flowMap;
   }
@@ -69,6 +75,15 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   }
 
   /**
+   * Returns the edge map constructed from the loaded flows.
+   *
+   * @return Map of flow name to all its Edges.
+   */
+  public Map<String, List<Edge>> getEdgeMap() {
+    return this.edgeMap;
+  }
+
+  /**
    * Loads all project flows from the directory.
    *
    * @param project The project.
@@ -83,7 +98,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   }
 
   private void convertYamlFiles(final File projectDir) {
-    // Todo jamiesjc: convert project yaml file. It will contain properties for all flows.
+    // Todo jamiesjc: convert project yaml file.
 
     final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
     for (final File file : flowFiles) {
@@ -91,17 +106,87 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
       try {
         final NodeBean nodeBean = loader.load(file);
         final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
-        final Flow flow = new Flow(azkabanFlow.getName());
-        flow.setAzkabanFlow(azkabanFlow);
-        this.flowMap.put(azkabanFlow.getName(), flow);
-        final Props flowProps = azkabanFlow.getProps();
-        FlowLoaderUtils.addEmailPropsToFlow(flow, flowProps);
+        final Flow flow = convertAzkabanFlowToFlow(azkabanFlow);
+        this.flowMap.put(flow.getId(), flow);
       } catch (final FileNotFoundException e) {
         logger.error("Error loading flow yaml files", e);
       }
     }
   }
 
+  private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow) {
+    final Flow flow = new Flow(azkabanFlow.getName());
+    final Props props = azkabanFlow.getProps();
+    FlowLoaderUtils.addEmailPropsToFlow(flow, props);
+
+    // Convert azkabanNodes to nodes inside the flow.
+    azkabanFlow.getNodes().values().stream().map(this::convertAzkabanNodeToNode)
+        .forEach(n -> flow.addNode(n));
+
+    // Add edges for the flow.
+    buildFlowEdges(azkabanFlow);
+    flow.addAllEdges(this.edgeMap.get(flow.getId()));
+
+    // Todo jamiesjc: deprecate startNodes, endNodes and numLevels, then remove the call below.
+    // The call below constructs startNodes, endNodes and numLevels for the flow.
+    flow.initialize();
+
+    return flow;
+  }
+
+  private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode) {
+    final Node node = new Node(azkabanNode.getName());
+    node.setType(azkabanNode.getType());
+
+    if (azkabanNode.getType().equals(Constants.FLOW_NODE_TYPE)) {
+      node.setEmbeddedFlowId(azkabanNode.getName());
+      final Flow flow_node = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode);
+      this.flowMap.put(flow_node.getId(), flow_node);
+    }
+    return node;
+  }
+
+  private void buildFlowEdges(final AzkabanFlow azkabanFlow) {
+    // Recursion stack of nodes on the current search path. Used for detecting dependency cycles.
+    final HashSet<String> recStack = new HashSet<>();
+    // Nodes that have already been visited and had their edges added.
+    final HashSet<String> visited = new HashSet<>();
+    for (final AzkabanNode node : azkabanFlow.getNodes().values()) {
+      addEdges(node, azkabanFlow, recStack, visited);
+    }
+  }
+
+  private void addEdges(final AzkabanNode node, final AzkabanFlow azkabanFlow,
+      final HashSet<String> recStack, final HashSet<String> visited) {
+    if (!visited.contains(node.getName())) {
+      recStack.add(node.getName());
+      visited.add(node.getName());
+      final List<String> dependsOnList = node.getDependsOn();
+      for (final String parent : dependsOnList) {
+        final Edge edge = new Edge(parent, node.getName());
+        if (!this.edgeMap.containsKey(azkabanFlow.getName())) {
+          this.edgeMap.put(azkabanFlow.getName(), new ArrayList<>());
+        }
+        this.edgeMap.get(azkabanFlow.getName()).add(edge);
+
+        if (recStack.contains(parent)) {
+          // Cycles found, including self cycle.
+          edge.setError("Cycles found.");
+          this.errors.add("Cycles found at " + edge.getId());
+        } else if (!azkabanFlow.getNodes().containsKey(parent)) {
+          // Dependency not found.
+          edge.setError("Dependency not found.");
+          this.errors.add(node.getName() + " cannot find dependency "
+              + parent);
+        } else {
+          // Valid edge. Continue to process the parent node recursively.
+          addEdges(azkabanFlow.getNode(parent), azkabanFlow, recStack, visited);
+        }
+      }
+      recStack.remove(node.getName());
+    }
+  }
+
   public void checkJobProperties(final Project project) {
     // Todo jamiesjc: implement the check later
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
index ce796da..8fd4f91 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
@@ -16,8 +16,10 @@
 
 package azkaban.project;
 
+import azkaban.flow.Flow;
 import azkaban.project.validator.ValidationReport;
 import java.io.File;
+import java.util.Map;
 
 /**
  * Interface to load project flows.
@@ -32,4 +34,11 @@ public interface FlowLoader {
    * @return the validation report.
    */
   ValidationReport loadProjectFlow(final Project project, final File projectDir);
+
+  /**
+   * Returns the flow map constructed from the loaded flows.
+   *
+   * @return Map of flow name to Flow.
+   */
+  Map<String, Flow> getFlowMap();
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
index 9dc170d..439f8c0 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
@@ -19,9 +19,15 @@ package azkaban.project;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.Constants;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.utils.Props;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
 import javax.inject.Inject;
+import org.apache.commons.lang.ArrayUtils;
+import org.yaml.snakeyaml.Yaml;
 
 /**
  * Factory class to generate flow loaders.
@@ -41,21 +47,48 @@ public class FlowLoaderFactory {
   }
 
   /**
-   * Creates flow loader based on manifest file inside project directory.
+   * Creates flow loader based on project YAML file inside project directory.
    *
    * @param projectDir the project directory
    * @return the flow loader
    */
   public FlowLoader createFlowLoader(final File projectDir) throws ProjectManagerException {
-    // Todo jamiesjc: need to check if manifest file exists in project directory,
-    // and create FlowLoader based on different flow versions specified in the manifest file.
-    final String flowVersion = null;
-    if (flowVersion == null) {
-      return new DirectoryFlowLoader(this.props);
-    } else if (flowVersion.equals(Constants.AZKABAN_FLOW_VERSION_2_0)) {
-      return new DirectoryYamlFlowLoader(this.props);
+
+    final File[] projectFileList = projectDir.listFiles(new SuffixFilter(Constants
+        .PROJECT_FILE_SUFFIX));
+
+    if (projectFileList == null) {
+      throw new ProjectManagerException("Error reading project directory. Input is not a "
+          + "directory or IO error happens.");
+    }
+
+    if (ArrayUtils.isNotEmpty(projectFileList)) {
+      if (projectFileList.length > 1) {
+        throw new ProjectManagerException("Duplicate project YAML files found in the project "
+            + "directory. Only one is allowed.");
+      }
+
+      final Map<String, Object> azkabanProject;
+      try (FileInputStream fis = new FileInputStream(projectFileList[0])) {
+        azkabanProject = (Map<String, Object>) new Yaml().load(fis);
+      } catch (final IOException e) {
+        throw new ProjectManagerException("Error reading project YAML file.", e);
+      }
+
+      if (azkabanProject == null || !azkabanProject
+          .containsKey(Constants.ConfigurationKeys.AZKABAN_FLOW_VERSION)) {
+        throw new ProjectManagerException("azkaban-flow-version is not specified in the project "
+            + "YAML file.");
+      }
+
+      if (azkabanProject.get(Constants.ConfigurationKeys.AZKABAN_FLOW_VERSION).equals
+          (Constants.AZKABAN_FLOW_VERSION_2_0)) {
+        return new DirectoryYamlFlowLoader(this.props);
+      } else {
+        throw new ProjectManagerException("Invalid azkaban-flow-version in the project YAML file.");
+      }
     } else {
-      throw new ProjectManagerException("Flow version " + flowVersion + "is invalid.");
+      return new DirectoryFlowLoader(this.props);
     }
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index e55bbb7..2669462 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -16,19 +16,36 @@
 
 package azkaban.project;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DirectoryYamlFlowLoaderTest {
 
+  private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoaderTest
+      .class);
+
   private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
   private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
-  private static final String FLOW_NAME_1 = "basic_flow";
-  private static final String FLOW_NAME_2 = "basic_flow2";
+  private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
+  private static final String INVALID_FLOW_YAML_DIR = "invalidflowyamltest";
+  private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
+  private static final String BASIC_FLOW_1 = "basic_flow";
+  private static final String BASIC_FLOW_2 = "basic_flow2";
+  private static final String EMBEDDED_FLOW = "embedded_flow";
+  private static final String EMBEDDED_FLOW_1 = "embedded_flow1";
+  private static final String EMBEDDED_FLOW_2 = "embedded_flow2";
+  private static final String INVALID_FLOW_1 = "dependency_not_found";
+  private static final String INVALID_FLOW_2 = "cycle_found";
+  private static final String DEPENDENCY_NOT_FOUND_ERROR = "Dependency not found.";
+  private static final String CYCLE_FOUND_ERROR = "Cycles found.";
   private Project project;
 
   @Before
@@ -37,31 +54,74 @@ public class DirectoryYamlFlowLoaderTest {
   }
 
   @Test
-  public void testLoadYamlFileFromDirectory() {
+  public void testLoadBasicYamlFile() {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
-
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(BASIC_FLOW_YAML_DIR));
-    Assert.assertEquals(0, loader.getErrors().size());
-    Assert.assertEquals(1, loader.getFlowMap().size());
-    Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
-    final Flow flow = loader.getFlowMap().get(FLOW_NAME_1);
-    final AzkabanFlow azkabanFlow = flow.getAzkabanFlow();
-    Assert.assertEquals(FLOW_NAME_1, azkabanFlow.getName());
-    Assert.assertEquals(4, azkabanFlow.getNodes().size());
+    checkFlowLoaderProperties(loader, 0, 1, 1);
+    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
   }
 
   @Test
-  public void testLoadMultipleYamlFilesFromDirectory() {
+  public void testLoadMultipleYamlFiles() {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
-
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(MULTIPLE_FLOW_YAML_DIR));
-    Assert.assertEquals(0, loader.getErrors().size());
-    Assert.assertEquals(2, loader.getFlowMap().size());
-    Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
-    Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_2));
-    final Flow flow2 = loader.getFlowMap().get(FLOW_NAME_2);
-    final AzkabanFlow azkabanFlow2 = flow2.getAzkabanFlow();
-    Assert.assertEquals(FLOW_NAME_2, azkabanFlow2.getName());
-    Assert.assertEquals(3, azkabanFlow2.getNodes().size());
+    checkFlowLoaderProperties(loader, 0, 2, 2);
+    checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
+    checkFlowProperties(loader, BASIC_FLOW_2, 0, 3, 2, null);
+  }
+
+  @Test
+  public void testLoadEmbeddedFlowYamlFile() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(EMBEDDED_FLOW_YAML_DIR));
+    checkFlowLoaderProperties(loader, 0, 3, 3);
+    checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 3, null);
+    checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, null);
+  }
+
+  @Test
+  public void testLoadInvalidFlowYamlFiles() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_FLOW_YAML_DIR));
+    checkFlowLoaderProperties(loader, 2, 2, 2);
+    // Invalid flow 1: Dependency not found.
+    checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 3, DEPENDENCY_NOT_FOUND_ERROR);
+    // Invalid flow 2: Cycles found.
+    checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 4, CYCLE_FOUND_ERROR);
+  }
+
+  @Test
+  public void testLoadNoFlowYamlFile() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(NO_FLOW_YAML_DIR));
+    checkFlowLoaderProperties(loader, 0, 0, 0);
+  }
+
+  private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
+      final int numFlowMap, final int numEdgeMap) {
+    assertThat(loader.getErrors().size()).isEqualTo(numError);
+    assertThat(loader.getFlowMap().size()).isEqualTo(numFlowMap);
+    assertThat(loader.getEdgeMap().size()).isEqualTo(numEdgeMap);
+  }
+
+  private void checkFlowProperties(final DirectoryYamlFlowLoader loader, final String flowName,
+      final int numError, final int numNode, final int numEdge, final String edgeError) {
+    assertThat(loader.getFlowMap().containsKey(flowName)).isTrue();
+    final Flow flow = loader.getFlowMap().get(flowName);
+    if (numError != 0) {
+      assertThat(flow.getErrors().size()).isEqualTo(numError);
+    }
+    assertThat(flow.getNodes().size()).isEqualTo(numNode);
+
+    // Verify flow edges
+    assertThat(loader.getEdgeMap().get(flowName).size()).isEqualTo(numEdge);
+    assertThat(flow.getEdges().size()).isEqualTo(numEdge);
+    for (final Edge edge : loader.getEdgeMap().get(flowName)) {
+      this.logger.info(flowName + ".flow has edge: " + edge.getId());
+      if (edge.getError() != null) {
+        assertThat(edge.getError()).isEqualTo(edgeError);
+      }
+    }
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
index 4aceded..2cb6fac 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
@@ -16,40 +16,63 @@
 
 package azkaban.project;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
 import java.io.File;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class FlowLoaderFactoryTest {
 
   private static final String FLOW_10_TEST_DIRECTORY = "exectest1";
   private static final String FLOW_20_TEST_DIRECTORY = "basicflowyamltest";
-  private Project project;
-
-  @Before
-  public void setUp() throws Exception {
-    this.project = new Project(13, "myTestProject");
-  }
+  private static final String DUPLICATE_PROJECT_DIRECTORY = "duplicateprojectyamltest";
+  private static final String INVALID_FLOW_VERSION_DIRECTORY = "invalidflowversiontest";
+  private static final String NO_FLOW_VERSION_DIRECTORY = "noflowversiontest";
 
   @Test
   public void testCreateDirectoryFlowLoader() {
     final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
     final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_10_TEST_DIRECTORY);
     final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
-    Assert.assertTrue(loader instanceof DirectoryFlowLoader);
+    assertThat(loader instanceof DirectoryFlowLoader).isTrue();
   }
 
-  // Todo jamiesjc: add the manifest file with flow version 2.0 and enable this test
   @Test
-  @Ignore
   public void testCreateDirectoryYamlFlowLoader() {
     final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
     final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_20_TEST_DIRECTORY);
     final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
-    Assert.assertTrue(loader instanceof DirectoryYamlFlowLoader);
+    assertThat(loader instanceof DirectoryYamlFlowLoader).isTrue();
+  }
+
+  @Test
+  public void testDuplicateProjectYamlFilesException() {
+    final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+    final File projectDir = ExecutionsTestUtil.getFlowDir(DUPLICATE_PROJECT_DIRECTORY);
+    assertThatThrownBy(() -> loaderFactory.createFlowLoader(projectDir))
+        .isInstanceOf(ProjectManagerException.class)
+        .hasMessageContaining(
+            "Duplicate project YAML files found in the project directory. Only one is allowed.");
+  }
+
+  @Test
+  public void testNoFlowVersionException() {
+    final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+    final File projectDir = ExecutionsTestUtil.getFlowDir(NO_FLOW_VERSION_DIRECTORY);
+    assertThatThrownBy(() -> loaderFactory.createFlowLoader(projectDir))
+        .isInstanceOf(ProjectManagerException.class)
+        .hasMessageContaining("azkaban-flow-version is not specified in the project YAML file.");
+  }
+
+  @Test
+  public void testInvalidFlowVersionException() {
+    final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+    final File projectDir = ExecutionsTestUtil.getFlowDir(INVALID_FLOW_VERSION_DIRECTORY);
+    assertThatThrownBy(() -> loaderFactory.createFlowLoader(projectDir))
+        .isInstanceOf(ProjectManagerException.class)
+        .hasMessageContaining("Invalid azkaban-flow-version in the project YAML file.");
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index ddc3845..157c5d1 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -19,6 +19,7 @@ package azkaban.project;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import azkaban.Constants;
 import azkaban.test.executions.ExecutionsTestUtil;
 import org.junit.Test;
 
@@ -41,7 +42,6 @@ public class NodeBeanLoaderTest {
   private static final String EMBEDDED_FLOW2 = "embedded_flow2";
   private static final String TYPE_NOOP = "noop";
   private static final String TYPE_COMMAND = "command";
-  private static final String TYPE_FLOW = "flow";
 
   @Test
   public void testLoadNodeBeanForBasicFlow() throws Exception {
@@ -51,7 +51,7 @@ public class NodeBeanLoaderTest {
         BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
 
     assertThat(nodeBean.getName()).isEqualTo(BASIC_FLOW_NAME);
-    assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
     assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
     assertThat(nodeBean.getNodes().size()).isEqualTo(4);
 
@@ -74,7 +74,7 @@ public class NodeBeanLoaderTest {
         EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
 
     assertThat(nodeBean.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
-    assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
     assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
     assertThat(nodeBean.getNodes().size()).isEqualTo(4);
 
@@ -91,7 +91,7 @@ public class NodeBeanLoaderTest {
 
     final NodeBean node3 = nodeBean.getNodes().get(3);
     assertThat(node3.getName()).isEqualTo(EMBEDDED_FLOW1);
-    assertThat(node3.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(node3.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
     assertThat(node3.getNodes().size()).isEqualTo(4);
 
     // Verify nodes in embedded_flow1 are loaded correctly.
@@ -103,7 +103,7 @@ public class NodeBeanLoaderTest {
 
     final NodeBean node3_2 = node3.getNodes().get(2);
     assertThat(node3_2.getName()).isEqualTo(EMBEDDED_FLOW2);
-    assertThat(node3_2.getType()).isEqualTo(TYPE_FLOW);
+    assertThat(node3_2.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
     assertThat(node3_2.getDependsOn()).contains(SHELL_BASH);
     assertThat(node3_2.getNodes().size()).isEqualTo(2);
 
diff --git a/test/execution-test-data/basicflowyamltest/basic_flow.project b/test/execution-test-data/basicflowyamltest/basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/basicflowyamltest/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/duplicateprojectyamltest/basic_flow.flow b/test/execution-test-data/duplicateprojectyamltest/basic_flow.flow
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/test/execution-test-data/duplicateprojectyamltest/basic_flow.flow
@@ -0,0 +1,41 @@
+---
+# All flow-level properties go here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of a properties file,
+  # with one major difference: all custom properties are now grouped together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    # Describe the type of the job
+    type: command
+    config:
+      command: bash ./sample_script.sh
diff --git a/test/execution-test-data/duplicateprojectyamltest/basic_flow.project b/test/execution-test-data/duplicateprojectyamltest/basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/duplicateprojectyamltest/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/duplicateprojectyamltest/basic_flow2.project b/test/execution-test-data/duplicateprojectyamltest/basic_flow2.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/duplicateprojectyamltest/basic_flow2.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/embeddedflowyamltest/embedded_flow.project b/test/execution-test-data/embeddedflowyamltest/embedded_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/embeddedflowyamltest/embedded_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/invalidflowversiontest/basic_flow.flow b/test/execution-test-data/invalidflowversiontest/basic_flow.flow
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/test/execution-test-data/invalidflowversiontest/basic_flow.flow
@@ -0,0 +1,41 @@
+---
+# All flow-level properties go here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of a properties file,
+  # with one major difference: all custom properties are now grouped together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    # Describe the type of the job
+    type: command
+    config:
+      command: bash ./sample_script.sh
diff --git a/test/execution-test-data/invalidflowversiontest/basic_flow.project b/test/execution-test-data/invalidflowversiontest/basic_flow.project
new file mode 100644
index 0000000..678ff89
--- /dev/null
+++ b/test/execution-test-data/invalidflowversiontest/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 1.0
diff --git a/test/execution-test-data/invalidflowyamltest/cycle_found.flow b/test/execution-test-data/invalidflowyamltest/cycle_found.flow
new file mode 100644
index 0000000..f6943d9
--- /dev/null
+++ b/test/execution-test-data/invalidflowyamltest/cycle_found.flow
@@ -0,0 +1,30 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_echo
+
+  - name: shell_echo
+    type: command
+    dependsOn:
+      - shell_pwd
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    type: command
+    dependsOn:
+      - shell_bash
+    config:
+      command: pwd
+
+  - name: shell_bash
+    type: command
+    dependsOn:
+      - shell_echo
+    config:
+      command: bash ./sample_script.sh
diff --git a/test/execution-test-data/invalidflowyamltest/dependency_not_found.flow b/test/execution-test-data/invalidflowyamltest/dependency_not_found.flow
new file mode 100644
index 0000000..dc8ab6e
--- /dev/null
+++ b/test/execution-test-data/invalidflowyamltest/dependency_not_found.flow
@@ -0,0 +1,21 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/invalidflowyamltest/invalid_flow.project b/test/execution-test-data/invalidflowyamltest/invalid_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/invalidflowyamltest/invalid_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/multipleflowyamltest/multiple_flows.project b/test/execution-test-data/multipleflowyamltest/multiple_flows.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/multiple_flows.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/noflowversiontest/basic_flow.flow b/test/execution-test-data/noflowversiontest/basic_flow.flow
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/test/execution-test-data/noflowversiontest/basic_flow.flow
@@ -0,0 +1,41 @@
+---
+# All flow-level properties go here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of a properties file,
+  # with one major difference: all custom properties are now grouped together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    # Describe the type of the job
+    type: command
+    config:
+      command: bash ./sample_script.sh
diff --git a/test/execution-test-data/noflowversiontest/no_flow_version.project b/test/execution-test-data/noflowversiontest/no_flow_version.project
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/execution-test-data/noflowversiontest/no_flow_version.project
diff --git a/test/execution-test-data/noflowyamltest/no_flow.project b/test/execution-test-data/noflowyamltest/no_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/noflowyamltest/no_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0