azkaban-aplcache

Changes

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 3616edd..bf8b529 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -40,6 +40,7 @@ public class Constants {
   public static final String FLOW_FILE_SUFFIX = ".flow";
 
   // Flow 2.0 node type
+  public static final String NODE_TYPE = "type";
   public static final String FLOW_NODE_TYPE = "flow";
 
   // Flow 2.0 flow and job path delimiter
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index edb5bf4..b92a183 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -93,6 +93,7 @@ public class DirectoryFlowLoader implements FlowLoader {
    *
    * @return Set of error strings.
    */
+  @Override
   public Set<String> getErrors() {
     return this.errors;
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 93ae347..ca3e1d7 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -72,6 +72,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
    *
    * @return Set of error strings.
    */
+  @Override
   public Set<String> getErrors() {
     return this.errors;
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
index 8fd4f91..0e7d11c 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
@@ -20,6 +20,7 @@ import azkaban.flow.Flow;
 import azkaban.project.validator.ValidationReport;
 import java.io.File;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Interface to load project flows.
@@ -41,4 +42,11 @@ public interface FlowLoader {
    * @return Map of flow name to Flow.
    */
   Map<String, Flow> getFlowMap();
+
+  /**
+   * Returns errors caught when loading flows.
+   *
+   * @return Set of error strings.
+   */
+  Set<String> getErrors();
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 390888f..aa083be 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -52,7 +52,7 @@ public class FlowLoaderUtils {
     try {
       final NodeBean nodeBean = loader.load(flowFile);
       final String[] pathList = path.split(Constants.PATH_DELIMITER);
-      if (findPropsFromNodeBean(loader, nodeBean, pathList, 0, propsList)) {
+      if (findPropsFromNodeBean(nodeBean, pathList, 0, propsList)) {
         if (!propsList.isEmpty()) {
           return propsList.get(0);
         } else {
@@ -68,22 +68,21 @@ public class FlowLoaderUtils {
   /**
    * Helper method to recursively find props from node bean.
    *
-   * @param loader the loader
    * @param nodeBean the node bean
    * @param pathList the path list
    * @param idx the idx
    * @param propsList the props list
    * @return the boolean
    */
-  public static boolean findPropsFromNodeBean(final NodeBeanLoader loader, final NodeBean nodeBean,
+  public static boolean findPropsFromNodeBean(final NodeBean nodeBean,
       final String[] pathList, final int idx, final List<Props> propsList) {
     if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
       if (idx == pathList.length - 1) {
-        propsList.add(loader.getNodeProps(nodeBean));
+        propsList.add(nodeBean.getProps());
         return true;
       }
       for (final NodeBean bean : nodeBean.getNodes()) {
-        if (findPropsFromNodeBean(loader, bean, pathList, idx + 1, propsList)) {
+        if (findPropsFromNodeBean(bean, pathList, idx + 1, propsList)) {
           return true;
         }
       }
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index 2cc55d5..8e17609 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -342,6 +342,10 @@ class JdbcProjectHandlerSet {
         "SELECT flow_file FROM project_flow_files WHERE "
             + "project_id=? AND project_version=? AND flow_name=? AND flow_version=?";
 
+    public static String SELECT_ALL_FLOW_FILES =
+        "SELECT flow_file FROM project_flow_files WHERE "
+            + "project_id=? AND project_version=?";
+
     @Override
     public List<byte[]> handle(final ResultSet rs) throws SQLException {
       if (!rs.next()) {
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index a69ca0f..2ca61bd 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -1058,4 +1058,22 @@ public class JdbcProjectImpl implements ProjectLoader {
               projectVersion + ", flow " + flowName + ".", e);
     }
   }
+
+  @Override
+  public boolean isFlowFileUploaded(final int projectId, final int projectVersion)
+      throws ProjectManagerException {
+    final FlowFileResultHandler handler = new FlowFileResultHandler();
+    final List<byte[]> data;
+
+    try {
+      data = this.dbOperator
+          .query(FlowFileResultHandler.SELECT_ALL_FLOW_FILES, handler,
+              projectId, projectVersion);
+    } catch (final SQLException e) {
+      logger.error(e);
+      throw new ProjectManagerException("Failed to query uploaded flow files ", e);
+    }
+
+    return !data.isEmpty();
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index eb62481..5cafc37 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -17,6 +17,8 @@
 
 package azkaban.project;
 
+import azkaban.Constants;
+import azkaban.utils.Props;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +74,12 @@ public class NodeBean implements Serializable {
     this.nodes = nodes;
   }
 
+  public Props getProps() {
+    final Props props = new Props(null, this.getConfig());
+    props.put(Constants.NODE_TYPE, this.getType());
+    return props;
+  }
+
   @Override
   public String toString() {
     return "NodeBean{" +
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 00443aa..079924a 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -20,7 +20,6 @@ package azkaban.project;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import azkaban.Constants;
-import azkaban.utils.Props;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.FileInputStream;
@@ -35,15 +34,13 @@ import org.yaml.snakeyaml.Yaml;
  */
 public class NodeBeanLoader {
 
-  private static final String NODE_BEAN_TYPE_FLOW = "flow";
-
   public NodeBean load(final File flowFile) throws FileNotFoundException {
     checkArgument(flowFile.exists());
     checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
 
     final NodeBean nodeBean = new Yaml().loadAs(new FileInputStream(flowFile), NodeBean.class);
     nodeBean.setName(getFlowName(flowFile));
-    nodeBean.setType(NODE_BEAN_TYPE_FLOW);
+    nodeBean.setType(Constants.FLOW_NODE_TYPE);
     return nodeBean;
   }
 
@@ -67,10 +64,10 @@ public class NodeBeanLoader {
   }
 
   public AzkabanNode toAzkabanNode(final NodeBean nodeBean) {
-    if (nodeBean.getType().equals(NODE_BEAN_TYPE_FLOW)) {
+    if (nodeBean.getType().equals(Constants.FLOW_NODE_TYPE)) {
       return new AzkabanFlow.AzkabanFlowBuilder()
           .setName(nodeBean.getName())
-          .setProps(new Props(null, nodeBean.getConfig()))
+          .setProps(nodeBean.getProps())
           .setDependsOn(nodeBean.getDependsOn())
           .setNodes(
               nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
@@ -78,17 +75,13 @@ public class NodeBeanLoader {
     } else {
       return new AzkabanJob.AzkabanJobBuilder()
           .setName(nodeBean.getName())
-          .setProps(new Props(null, nodeBean.getConfig()))
+          .setProps(nodeBean.getProps())
           .setType(nodeBean.getType())
           .setDependsOn(nodeBean.getDependsOn())
           .build();
     }
   }
 
-  public Props getNodeProps(final NodeBean nodeBean) {
-    return new Props(null, nodeBean.getConfig());
-  }
-
   public String getFlowName(final File flowFile) {
     checkArgument(flowFile.exists());
     checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 42062b6..ae332a7 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -219,4 +219,10 @@ public interface ProjectLoader {
   int getLatestFlowVersion(int projectId, int projectVersion, String flowName)
       throws ProjectManagerException;
 
+  /**
+   * Gets all flow files that's uploaded.
+   */
+  boolean isFlowFileUploaded(int projectId, int projectVersion)
+      throws ProjectManagerException;
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index ba64698..ccdbbc0 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -136,13 +136,15 @@ public class NodeBeanLoaderTest {
     final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
     assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
     assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
-    assertThat(shellEnd.getProps().size()).isEqualTo(0);
+    assertThat(shellEnd.getProps().size()).isEqualTo(1);
+    assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
     assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
 
     final AzkabanJob shellEcho = (AzkabanJob) flow.getNode(SHELL_ECHO);
     assertThat(shellEcho.getName()).isEqualTo(SHELL_ECHO);
     assertThat(shellEcho.getType()).isEqualTo(TYPE_COMMAND);
-    assertThat(shellEcho.getProps().size()).isEqualTo(1);
+    assertThat(shellEcho.getProps().size()).isEqualTo(2);
+    assertThat(shellEcho.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
     assertThat(shellEcho.getProps().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
   }
 
@@ -160,7 +162,8 @@ public class NodeBeanLoaderTest {
     final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
     assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
     assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
-    assertThat(shellEnd.getProps().size()).isEqualTo(0);
+    assertThat(shellEnd.getProps().size()).isEqualTo(1);
+    assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
     assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
 
     final AzkabanFlow embeddedFlow1 = (AzkabanFlow) flow.getNode(EMBEDDED_FLOW1);
@@ -189,7 +192,8 @@ public class NodeBeanLoaderTest {
   public void testGetFlowProps() {
     final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(BASIC_FLOW_NAME,
         ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
-    assertThat(flowProps.size()).isEqualTo(1);
+    assertThat(flowProps.size()).isEqualTo(2);
+    assertThat(flowProps.get(Constants.NODE_TYPE)).isEqualTo(Constants.FLOW_NODE_TYPE);
     assertThat(flowProps.get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
   }
 
@@ -198,7 +202,8 @@ public class NodeBeanLoaderTest {
     final Props jobProps = FlowLoaderUtils
         .getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO,
             ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
-    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.size()).isEqualTo(2);
+    assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
     assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
   }
 
@@ -216,21 +221,24 @@ public class NodeBeanLoaderTest {
     String jobPrefix = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER;
     Props jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
         ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
-    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.size()).isEqualTo(2);
+    assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
     assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
 
     // Get job props from first level embedded flow
     jobPrefix = jobPrefix + EMBEDDED_FLOW1 + Constants.PATH_DELIMITER;
     jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
         ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
-    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.size()).isEqualTo(2);
+    assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
     assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND_1);
 
     // Get job props from second level embedded flow
     jobPrefix = jobPrefix + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER;
     jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_PWD,
         ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
-    assertThat(jobProps.size()).isEqualTo(1);
+    assertThat(jobProps.size()).isEqualTo(2);
+    assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
     assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(PWD_COMMAND);
   }
 }
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 9020f30..8b81297 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
 
 import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
 
+import azkaban.Constants;
 import azkaban.ServiceProvider;
 import azkaban.event.Event;
 import azkaban.event.EventData;
@@ -41,6 +42,7 @@ import azkaban.flow.FlowUtils;
 import azkaban.jobExecutor.ProcessJob;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.metric.MetricReportManager;
+import azkaban.project.FlowLoaderUtils;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
 import azkaban.sla.SlaOption;
@@ -48,6 +50,7 @@ import azkaban.spi.AzkabanEventReporter;
 import azkaban.spi.EventType;
 import azkaban.utils.Props;
 import azkaban.utils.SwapQueue;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.io.IOException;
@@ -100,6 +103,8 @@ public class FlowRunner extends EventHandler implements Runnable {
   // Thread safe swap queue for finishedExecutions.
   private final SwapQueue<ExecutableNode> finishedNodes;
   private final AzkabanEventReporter azkabanEventReporter;
+  // Flag to indicate whether to interpret the flow as the new Flow 2.0 definition.
+  private boolean isAzkabanFlowVersion20 = false;
   private Logger logger;
   private Appender flowAppender;
   private File logFile;
@@ -166,6 +171,9 @@ public class FlowRunner extends EventHandler implements Runnable {
     // where the uninitialized logger is used in flow preparing state
     createLogger(this.flow.getFlowId());
     this.azkabanEventReporter = azkabanEventReporter;
+
+    // Todo jamiesjc: Enable below check after DB change of project_flow_files table is rolled out.
+    // this.isAzkabanFlowVersion20 = checkAzkabanFlowVersion();
   }
 
   public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -194,6 +202,10 @@ public class FlowRunner extends EventHandler implements Runnable {
     return this.execDir;
   }
 
+  public void setAzkabanFlowVersion20(final boolean azkabanFlowVersion20) {
+    this.isAzkabanFlowVersion20 = azkabanFlowVersion20;
+  }
+
   @Override
   public void run() {
     try {
@@ -206,7 +218,9 @@ public class FlowRunner extends EventHandler implements Runnable {
       this.logger.info("Updating initial flow directory.");
       updateFlow();
       this.logger.info("Fetching job and shared properties.");
-      loadAllProperties();
+      if (!this.isAzkabanFlowVersion20) {
+        loadAllProperties();
+      }
 
       this.fireEventListeners(
           Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
@@ -240,6 +254,10 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
   }
 
+  private boolean checkAzkabanFlowVersion() {
+    return this.projectLoader.isFlowFileUploaded(this.flow.getProjectId(), this.flow.getVersion());
+  }
+
   private void setupFlowExecution() {
     final int projectId = this.flow.getProjectId();
     final int version = this.flow.getVersion();
@@ -248,11 +266,19 @@ public class FlowRunner extends EventHandler implements Runnable {
     // Add a bunch of common azkaban properties
     Props commonFlowProps = FlowUtils.addCommonFlowProperties(null, this.flow);
 
-    if (this.flow.getJobSource() != null) {
-      final String source = this.flow.getJobSource();
-      final Props flowProps = this.sharedProps.get(source);
-      flowProps.setParent(commonFlowProps);
-      commonFlowProps = flowProps;
+    if (this.isAzkabanFlowVersion20) {
+      final Props flowProps = loadFlowPropsFromYamlFile();
+      if (flowProps != null) {
+        flowProps.setParent(commonFlowProps);
+        commonFlowProps = flowProps;
+      }
+    } else {
+      if (this.flow.getJobSource() != null) {
+        final String source = this.flow.getJobSource();
+        final Props flowProps = this.sharedProps.get(source);
+        flowProps.setParent(commonFlowProps);
+        commonFlowProps = flowProps;
+      }
     }
 
     // If there are flow overrides, we apply them now.
@@ -646,13 +672,16 @@ public class FlowRunner extends EventHandler implements Runnable {
     }
 
     Props props = null;
-    // 1. Shared properties (i.e. *.properties) for the jobs only. This takes
-    // the
-    // least precedence
-    if (!(node instanceof ExecutableFlowBase)) {
-      final String sharedProps = node.getPropsSource();
-      if (sharedProps != null) {
-        props = this.sharedProps.get(sharedProps);
+
+    if (!this.isAzkabanFlowVersion20) {
+      // 1. Shared properties (i.e. *.properties) for the jobs only. This takes
+      // the
+      // least precedence
+      if (!(node instanceof ExecutableFlowBase)) {
+        final String sharedProps = node.getPropsSource();
+        if (sharedProps != null) {
+          props = this.sharedProps.get(sharedProps);
+        }
       }
     }
 
@@ -693,37 +722,45 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private Props loadJobProps(final ExecutableNode node) throws IOException {
     Props props = null;
-    final String source = node.getJobSource();
-    if (source == null) {
-      return null;
-    }
-
-    // load the override props if any
-    try {
-      props =
-          this.projectLoader.fetchProjectProperty(this.flow.getProjectId(),
-              this.flow.getVersion(), node.getId() + ".jor");
-    } catch (final ProjectManagerException e) {
-      e.printStackTrace();
-      this.logger.error("Error loading job override property for job "
-          + node.getId());
-    }
+    if (this.isAzkabanFlowVersion20) {
+      props = loadJobPropsFromYamlFile(node);
+      if (props == null) {
+        this.logger.info("Job props loaded from yaml file is empty for job " + node.getId());
+        return props;
+      }
+    } else {
+      final String source = node.getJobSource();
+      if (source == null) {
+        return null;
+      }
 
-    final File path = new File(this.execDir, source);
-    if (props == null) {
-      // if no override prop, load the original one on disk
+      // load the override props if any
       try {
-        props = new Props(null, path);
-      } catch (final IOException e) {
+        props =
+            this.projectLoader.fetchProjectProperty(this.flow.getProjectId(),
+                this.flow.getVersion(), node.getId() + ".jor");
+      } catch (final ProjectManagerException e) {
         e.printStackTrace();
-        this.logger.error("Error loading job file " + source + " for job "
+        this.logger.error("Error loading job override property for job "
             + node.getId());
       }
-    }
-    // setting this fake source as this will be used to determine the location
-    // of log files.
-    if (path.getPath() != null) {
-      props.setSource(path.getPath());
+
+      final File path = new File(this.execDir, source);
+      if (props == null) {
+        // if no override prop, load the original one on disk
+        try {
+          props = new Props(null, path);
+        } catch (final IOException e) {
+          e.printStackTrace();
+          this.logger.error("Error loading job file " + source + " for job "
+              + node.getId());
+        }
+      }
+      // setting this fake source as this will be used to determine the location
+      // of log files.
+      if (path.getPath() != null) {
+        props.setSource(path.getPath());
+      }
     }
 
     customizeJobProperties(props);
@@ -731,6 +768,49 @@ public class FlowRunner extends EventHandler implements Runnable {
     return props;
   }
 
+  private Props loadFlowPropsFromYamlFile() {
+    final File flowFile = getFlowFile();
+    final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(this.flow.getId(), flowFile);
+    if (flowFile != null && flowFile.exists()) {
+      flowFile.delete();
+    }
+    return flowProps;
+  }
+
+  private Props loadJobPropsFromYamlFile(final ExecutableNode node) {
+    final File flowFile = getFlowFile();
+    final String jobPath =
+        node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
+    final Props props = FlowLoaderUtils.getPropsFromYamlFile(jobPath, flowFile);
+    if (flowFile != null && flowFile.exists()) {
+      flowFile.delete();
+    }
+    return props;
+  }
+
+  private File getFlowFile() {
+    File flowFile = null;
+    String source = null;
+    try {
+      final List<FlowProps> flowPropsList = ImmutableList.copyOf(this.flow.getFlowProps());
+      // There should be exact one source(file name) for each flow file.
+      if (!flowPropsList.isEmpty()) {
+        source = flowPropsList.get(0).getSource();
+      } else {
+        this.logger.error("Failed to get flow file source, flow props is empty.");
+        return null;
+      }
+      final int flowVersion = this.projectLoader
+          .getLatestFlowVersion(this.flow.getProjectId(), this.flow.getVersion(), source);
+      flowFile = this.projectLoader
+          .getUploadedFlowFile(this.flow.getProjectId(), this.flow.getVersion(), flowVersion,
+              source);
+    } catch (final ProjectManagerException e) {
+      this.logger.error("Failed to get flow file " + source, e);
+    }
+    return flowFile;
+  }
+
   @SuppressWarnings("FutureReturnValueIgnored")
   private void runExecutableNode(final ExecutableNode node) throws IOException {
     // Collect output props from the job's dependencies.
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 8c2feb9..5dd2fd4 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -16,16 +16,22 @@
 
 package azkaban.execapp;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.InteractiveTestJob;
 import azkaban.executor.Status;
+import azkaban.project.Project;
+import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -45,23 +51,52 @@ import org.junit.Test;
  */
 public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
 
+  private static final String EXEC_FLOW_DIR = "execpropstest";
+  private static final String FLOW_YAML_DIR = "loadpropsflowyamltest";
+  private static final String FLOW_NAME = "job3";
+  private static final String FLOW_YAML_FILE = FLOW_NAME + ".flow";
   private FlowRunnerTestUtil testUtil;
 
-  @Before
-  public void setUp() throws Exception {
-    this.testUtil = new FlowRunnerTestUtil("execpropstest", this.temporaryFolder);
-  }
-
   /**
    * Tests the basic flow resolution. Flow is defined in execpropstest
    */
   @Test
   public void testPropertyResolution() throws Exception {
+    this.testUtil = new FlowRunnerTestUtil(EXEC_FLOW_DIR, this.temporaryFolder);
+    assertProperties(false);
+  }
+
+  /**
+   * Tests the YAML flow resolution. Flow is defined in loadpropsflowyamltest
+   */
+  @Test
+  public void testYamlFilePropertyResolution() throws Exception {
+    this.testUtil = new FlowRunnerTestUtil(FLOW_YAML_DIR, this.temporaryFolder);
+    final Project project = this.testUtil.getProject();
+    when(this.testUtil.getProjectLoader().isFlowFileUploaded(project.getId(), project.getVersion()))
+        .thenReturn(true);
+    when(this.testUtil.getProjectLoader()
+        .getLatestFlowVersion(project.getId(), project.getVersion(), FLOW_YAML_FILE)).thenReturn(1);
+    doAnswer(invocation -> {
+      final File flowFile = this.temporaryFolder.newFile(FLOW_YAML_FILE);
+      FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE), flowFile);
+      return flowFile;
+    }).when(this.testUtil.getProjectLoader())
+        .getUploadedFlowFile(project.getId(), project.getVersion(), 1, FLOW_YAML_FILE);
+    assertProperties(true);
+  }
+
+  /**
+   * Helper method to test the flow property resolution.
+   */
+  private void assertProperties(final boolean isAzkabanFlowVersion20) throws Exception {
     final HashMap<String, String> flowProps = new HashMap<>();
     flowProps.put("props7", "flow7");
     flowProps.put("props6", "flow6");
     flowProps.put("props5", "flow5");
-    final FlowRunner runner = this.testUtil.createFromFlowMap("job3", flowProps);
+    final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME, flowProps);
+    // Todo jamiesjc: remove below line after project_flow_files DB change is rolled out.
+    runner.setAzkabanFlowVersion20(isAzkabanFlowVersion20);
     final Map<String, ExecutableNode> nodeMap = new HashMap<>();
     createNodeMap(runner.getExecutableFlow(), nodeMap);
     final ExecutableFlow flow = runner.getExecutableFlow();
@@ -134,7 +169,7 @@ public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
     job4GeneratedProps.put("props6", "g4job6");
     InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(
         job4GeneratedProps);
-    assertStatus(flow, "job3", Status.RUNNING);
+    assertStatus(flow, FLOW_NAME, Status.RUNNING);
     final Props job3Props = nodeMap.get("job3").getInputProps();
     Assert.assertEquals("job3", job3Props.get("props3"));
     Assert.assertEquals("g4job6", job3Props.get("props6"));
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index c170f6e..64d421e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -35,7 +35,8 @@ import azkaban.executor.Status;
 import azkaban.flow.Flow;
 import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.DirectoryFlowLoader;
+import azkaban.project.FlowLoader;
+import azkaban.project.FlowLoaderFactory;
 import azkaban.project.Project;
 import azkaban.project.ProjectLoader;
 import azkaban.project.ProjectManagerException;
@@ -58,6 +59,7 @@ public class FlowRunnerTestUtil {
   private final File workingDir;
   private final JobTypeManager jobtypeManager;
   private final File projectDir;
+  private final ProjectLoader projectLoader;
   private ExecutorLoader executorLoader;
 
   public FlowRunnerTestUtil(final String flowName, final TemporaryFolder temporaryFolder)
@@ -68,9 +70,10 @@ public class FlowRunnerTestUtil {
     this.project = new Project(1, "testProject");
 
     this.flowMap = FlowRunnerTestUtil
-        .prepareProject(this.project, ExecutionsTestUtil.getFlowDir(flowName), this.workingDir);
+        .prepareProject(this.project, this.projectDir, this.workingDir);
 
     this.executorLoader = mock(ExecutorLoader.class);
+    this.projectLoader = mock(ProjectLoader.class);
     when(this.executorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
 
     Utils.initServiceProvider();
@@ -97,7 +100,9 @@ public class FlowRunnerTestUtil {
   public static Map<String, Flow> prepareProject(final Project project, final File sourceDir,
       final File workingDir)
       throws ProjectManagerException, IOException {
-    final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
+    final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+    final FlowLoader loader = loaderFactory.createFlowLoader(sourceDir);
+
     loader.loadProjectFlow(project, sourceDir);
     if (!loader.getErrors().isEmpty()) {
       for (final String error : loader.getErrors()) {
@@ -246,7 +251,7 @@ public class FlowRunnerTestUtil {
     exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
     this.executorLoader.uploadExecutableFlow(exFlow);
     final FlowRunner runner =
-        new FlowRunner(exFlow, this.executorLoader, mock(ProjectLoader.class),
+        new FlowRunner(exFlow, this.executorLoader, this.projectLoader,
             this.jobtypeManager, azkabanProps, null);
     if (eventCollector != null) {
       runner.addListener(eventCollector);
@@ -262,4 +267,11 @@ public class FlowRunnerTestUtil {
     this.executorLoader = executorLoader;
   }
 
+  public ProjectLoader getProjectLoader() {
+    return this.projectLoader;
+  }
+
+  public Project getProject() {
+    return this.project;
+  }
 }
diff --git a/test/execution-test-data/basicflowyamltest/Archive.zip b/test/execution-test-data/basicflowyamltest/Archive.zip
index 7bc2077..5137202 100644
Binary files a/test/execution-test-data/basicflowyamltest/Archive.zip and b/test/execution-test-data/basicflowyamltest/Archive.zip differ
diff --git a/test/execution-test-data/basicflowyamltest/sample_script.sh b/test/execution-test-data/basicflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/basicflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/embeddedflowyamltest/sample_script.sh b/test/execution-test-data/embeddedflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/embeddedflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/loadpropsflowyamltest/job3.flow b/test/execution-test-data/loadpropsflowyamltest/job3.flow
new file mode 100644
index 0000000..5a6ca5e
--- /dev/null
+++ b/test/execution-test-data/loadpropsflowyamltest/job3.flow
@@ -0,0 +1,50 @@
+---
+config:
+  props3: moo3
+  props4: moo4
+  props5: moo5
+  props1: shared1
+  props2: shared2
+  props6: shared6
+
+nodes:
+  - name: job2
+    type: test
+    config:
+      props4: shared4
+      props8: shared8
+      props2: job2
+      props7: job7
+
+  - name: innerflow
+    type: flow
+    dependsOn:
+      - job2
+    config:
+      props5: innerflow5
+      props6: innerflow6
+      props8: innerflow8
+
+    nodes:
+      - name: job4
+        type: test
+        dependsOn:
+          - job1
+        config:
+          props4: shared4
+          props8: job8
+          props9: job9
+
+      - name: job1
+        type: test
+        config:
+          props1: job1
+          props2: job2
+          props8: job8
+
+  - name: job3
+    type: test
+    dependsOn:
+      - innerflow
+    config:
+      props3: job3
diff --git a/test/execution-test-data/loadpropsflowyamltest/loadpropsflow.project b/test/execution-test-data/loadpropsflowyamltest/loadpropsflow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/loadpropsflowyamltest/loadpropsflow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/loadpropsflowyamltest/sample_script.sh b/test/execution-test-data/loadpropsflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/loadpropsflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/sample_script.sh b/test/execution-test-data/multipleembeddedflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/multipleembeddedflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/multipleflowyamltest/sample_script.sh b/test/execution-test-data/multipleflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."