azkaban-aplcache

Flow 2.0 design - Override Job Properties (#1565) * overrideJobProperties *

12/19/2017 3:53:01 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 98586ae..7f0a42f 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -51,6 +51,9 @@ public class Constants {
   public static final String CRON_SCHEDULE_TYPE = "cron";
   public static final String SCHEDULE_VALUE = "value";
 
+  // Job properties override suffix
+  public static final String JOB_OVERRIDE_SUFFIX = ".jor";
+
   // Names and paths of various file names to configure Azkaban
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index c1775d9..ab43888 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -48,6 +48,7 @@ public class Flow {
 
   private boolean isLayedOut = false;
   private boolean isEmbeddedFlow = false;
+  private double azkabanFlowVersion;
 
   public Flow(final String id) {
     this.id = id;
@@ -59,6 +60,7 @@ public class Flow {
     final String id = (String) flowObject.get("id");
     final Boolean layedout = (Boolean) flowObject.get("layedout");
     final Boolean isEmbeddedFlow = (Boolean) flowObject.get("embeddedFlow");
+    final Double azkabanFlowVersion = (Double) flowObject.get("azkabanFlowVersion");
     final Flow flow = new Flow(id);
     if (layedout != null) {
       flow.setLayedOut(layedout);
@@ -68,6 +70,10 @@ public class Flow {
       flow.setEmbeddedFlow(isEmbeddedFlow);
     }
 
+    if (azkabanFlowVersion != null) {
+      flow.setAzkabanFlowVersion(azkabanFlowVersion);
+    }
+
     final int projId = (Integer) flowObject.get("project.id");
     flow.setProjectId(projId);
 
@@ -328,6 +334,7 @@ public class Flow {
     flowObj.put("mailCreator", this.mailCreator);
     flowObj.put("layedout", this.isLayedOut);
     flowObj.put("embeddedFlow", this.isEmbeddedFlow);
+    flowObj.put("azkabanFlowVersion", this.azkabanFlowVersion);
     if (this.errors != null) {
       flowObj.put("errors", this.errors);
     }
@@ -385,6 +392,14 @@ public class Flow {
     this.isEmbeddedFlow = embeddedFlow;
   }
 
+  public double getAzkabanFlowVersion() {
+    return this.azkabanFlowVersion;
+  }
+
+  public void setAzkabanFlowVersion(final double azkabanFlowVersion) {
+    this.azkabanFlowVersion = azkabanFlowVersion;
+  }
+
   public Map<String, Object> getMetadata() {
     if (this.metadata == null) {
       this.metadata = new HashMap<>();
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 2dba398..477ebb3 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -30,6 +30,7 @@ import java.util.Map;
 /**
  * Flow level definition of the DAG.
  * Contains a list of AzkabanNodes and related flow properties.
+ * Introduced in Flow 2.0 design.
  */
 public class AzkabanFlow extends AzkabanNode {
 
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index ee26a14..6148182 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -40,7 +40,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.zip.ZipFile;
 import javax.inject.Inject;
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,7 +105,7 @@ class AzkabanProjectLoader {
 
       // Check the validation report.
       if (!isReportStatusValid(reports, project)) {
-        cleanUpProjectTempDir(file);
+        FlowLoaderUtils.cleanUpDir(file);
         return reports;
       }
 
@@ -114,7 +113,7 @@ class AzkabanProjectLoader {
       persistProject(project, loader, archive, file, uploader);
 
     } finally {
-      cleanUpProjectTempDir(file);
+      FlowLoaderUtils.cleanUpDir(file);
     }
 
     // Clean up project old installations after new project is uploaded successfully.
@@ -239,18 +238,6 @@ class AzkabanProjectLoader {
     this.storageManager.cleanupProjectArtifacts(project.getId());
   }
 
-  private void cleanUpProjectTempDir(final File file) {
-    log.info("Cleaning up temp files.");
-    try {
-      if (file != null) {
-        FileUtils.deleteDirectory(file);
-      }
-    } catch (final IOException e) {
-      log.error("Failed to delete temp directory", e);
-      file.deleteOnExit();
-    }
-  }
-
   private File unzipFile(final File archiveFile) throws IOException {
     final ZipFile zipfile = new ZipFile(archiveFile);
     final File unzipped = Utils.createTempDir(this.tempDir);
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 8631aba..dade01f 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -118,6 +118,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow, final String flowName,
       final File flowFile) {
     final Flow flow = new Flow(flowName);
+    flow.setAzkabanFlowVersion(Constants.AZKABAN_FLOW_VERSION_2_0);
     final Props props = azkabanFlow.getProps();
     FlowLoaderUtils.addEmailPropsToFlow(flow, props);
     props.setSource(flowFile.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 8c719d0..52d0d8b 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -20,15 +20,24 @@ import azkaban.flow.CommonJobProperties;
 import azkaban.flow.Flow;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.DumperOptions.FlowStyle;
+import org.yaml.snakeyaml.Yaml;
 
 /**
  * Utils to help load flows.
@@ -38,6 +47,81 @@ public class FlowLoaderUtils {
   private static final Logger logger = LoggerFactory.getLogger(FlowLoaderUtils.class);
 
   /**
+   * Sets props in flow yaml file.
+   *
+   * @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
+   * @param flowFile the flow yaml file
+   * @param prop the props to set
+   */
+  public static void setPropsInYamlFile(final String path, final File flowFile, final Props prop) {
+    final DumperOptions options = new DumperOptions();
+    options.setDefaultFlowStyle(FlowStyle.BLOCK);
+    final NodeBean nodeBean = FlowLoaderUtils.setPropsInNodeBean(path, flowFile, prop);
+    try (final BufferedWriter writer = Files
+        .newBufferedWriter(flowFile.toPath(), StandardCharsets.UTF_8)) {
+      new Yaml(options).dump(nodeBean, writer);
+    } catch (final IOException e) {
+      throw new ProjectManagerException(
+          "Failed to set properties in flow file " + flowFile.getName());
+    }
+  }
+
+  /**
+   * Sets props in node bean.
+   *
+   * @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
+   * @param flowFile the flow yaml file
+   * @param prop the props to set
+   * @return the node bean
+   */
+  public static NodeBean setPropsInNodeBean(final String path, final File flowFile,
+      final Props prop) {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    try {
+      final NodeBean nodeBean = loader.load(flowFile);
+      final String[] pathList = path.split(Constants.PATH_DELIMITER);
+      if (overridePropsInNodeBean(nodeBean, pathList, 0, prop)) {
+        return nodeBean;
+      } else {
+        logger.error("Error setting props for " + path);
+      }
+    } catch (final Exception e) {
+      logger.error("Failed to set props, error loading flow YAML file " + flowFile);
+    }
+    return null;
+  }
+
+  /**
+   * Helper method to recursively find the node to override props.
+   *
+   * @param nodeBean the node bean
+   * @param pathList the path list
+   * @param idx the idx
+   * @param prop the props to override
+   * @return the boolean
+   */
+  private static boolean overridePropsInNodeBean(final NodeBean nodeBean, final String[] pathList,
+      final int idx, final Props prop) {
+    if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
+      if (idx == pathList.length - 1) {
+        if (prop.containsKey(Constants.NODE_TYPE)) {
+          nodeBean.setType(prop.get(Constants.NODE_TYPE));
+        }
+        final Map<String, String> config = prop.getFlattened();
+        config.remove(Constants.NODE_TYPE);
+        nodeBean.setConfig(config);
+        return true;
+      }
+      for (final NodeBean bean : nodeBean.getNodes()) {
+        if (overridePropsInNodeBean(bean, pathList, idx + 1, prop)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
    * Gets flow or job props from flow yaml file.
    *
    * @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
@@ -73,7 +157,7 @@ public class FlowLoaderUtils {
    * @param propsList the props list
    * @return the boolean
    */
-  public static boolean findPropsFromNodeBean(final NodeBean nodeBean,
+  private static boolean findPropsFromNodeBean(final NodeBean nodeBean,
       final String[] pathList, final int idx, final List<Props> propsList) {
     if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
       if (idx == pathList.length - 1) {
@@ -149,6 +233,32 @@ public class FlowLoaderUtils {
   }
 
   /**
+   * Clean up the directory.
+   *
+   * @param dir the directory to be deleted
+   */
+  public static void cleanUpDir(final File dir) {
+    try {
+      if (dir != null && dir.exists()) {
+        FileUtils.deleteDirectory(dir);
+      }
+    } catch (final IOException e) {
+      logger.error("Failed to delete the directory", e);
+      dir.deleteOnExit();
+    }
+  }
+
+  /**
+   * Check if azkaban flow version is 2.0.
+   *
+   * @param azkabanFlowVersion the azkaban flow version
+   * @return the boolean
+   */
+  public static boolean isAzkabanFlowVersion20(final double azkabanFlowVersion) {
+    return Double.compare(azkabanFlowVersion, Constants.AZKABAN_FLOW_VERSION_2_0) == 0;
+  }
+
+  /**
    * Implements Suffix filter.
    */
   public static class SuffixFilter implements FileFilter {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 79caa97..c9df778 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -18,6 +18,7 @@ package azkaban.project;
 
 import static java.util.Objects.requireNonNull;
 
+import azkaban.Constants;
 import azkaban.flow.Flow;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.validator.ValidationReport;
@@ -29,8 +30,7 @@ import azkaban.user.Permission.Type;
 import azkaban.user.User;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
-import javax.inject.Inject;
-import javax.inject.Singleton;
+import com.google.common.io.Files;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -39,6 +39,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.apache.log4j.Logger;
 
 
@@ -319,27 +321,79 @@ public class ProjectManager {
     return this.projectLoader.getProjectEvents(project, results, skip);
   }
 
-  public Props getProperties(final Project project, final String source)
-      throws ProjectManagerException {
-    return this.projectLoader.fetchProjectProperty(project, source);
+  public Props getPropertiesFromFlowFile(final Flow flow, final String jobName, final String
+      flowFileName, final int flowVersion) throws ProjectManagerException {
+    File tempDir = null;
+    Props props = null;
+    try {
+      tempDir = Files.createTempDir();
+      final File flowFile = this.projectLoader.getUploadedFlowFile(flow.getProjectId(), flow
+          .getVersion(), flowFileName, flowVersion, tempDir);
+      final String path =
+          jobName == null ? flow.getId() : flow.getId() + Constants.PATH_DELIMITER + jobName;
+      props = FlowLoaderUtils.getPropsFromYamlFile(path, flowFile);
+    } catch (final Exception e) {
+      this.logger.error("Failed to get props from flow file. " + e);
+    } finally {
+      FlowLoaderUtils.cleanUpDir(tempDir);
+    }
+    return props;
   }
 
-  public Props getJobOverrideProperty(final Project project, final String jobName)
-      throws ProjectManagerException {
-    return this.projectLoader.fetchProjectProperty(project, jobName + ".jor");
+  public Props getProperties(final Project project, final Flow flow, final String jobName,
+      final String source) throws ProjectManagerException {
+    if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+      // Return the properties from the original uploaded flow file.
+      return getPropertiesFromFlowFile(flow, jobName, source, 1);
+    } else {
+      return this.projectLoader.fetchProjectProperty(project, source);
+    }
   }
 
-  public void setJobOverrideProperty(final Project project, final Props prop, final String jobName,
-      final User modifier)
-      throws ProjectManagerException {
-    prop.setSource(jobName + ".jor");
-    final Props oldProps =
-        this.projectLoader.fetchProjectProperty(project, prop.getSource());
+  public Props getJobOverrideProperty(final Project project, final Flow flow, final String jobName,
+      final String source) throws ProjectManagerException {
+    if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+      final int flowVersion = this.projectLoader
+          .getLatestFlowVersion(flow.getProjectId(), flow.getVersion(), source);
+      return getPropertiesFromFlowFile(flow, jobName, source, flowVersion);
+    } else {
+      return this.projectLoader
+          .fetchProjectProperty(project, jobName + Constants.JOB_OVERRIDE_SUFFIX);
+    }
+  }
 
-    if (oldProps == null) {
-      this.projectLoader.uploadProjectProperty(project, prop);
+  public void setJobOverrideProperty(final Project project, final Flow flow, final Props prop,
+      final String jobName, final String source, final User modifier)
+      throws ProjectManagerException {
+    File tempDir = null;
+    Props oldProps = null;
+    if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+      try {
+        tempDir = Files.createTempDir();
+        final int flowVersion = this.projectLoader.getLatestFlowVersion(flow.getProjectId(), flow
+            .getVersion(), source);
+        final File flowFile = this.projectLoader.getUploadedFlowFile(flow.getProjectId(), flow
+            .getVersion(), source, flowVersion, tempDir);
+        final String path = flow.getId() + Constants.PATH_DELIMITER + jobName;
+        oldProps = FlowLoaderUtils.getPropsFromYamlFile(path, flowFile);
+
+        FlowLoaderUtils.setPropsInYamlFile(path, flowFile, prop);
+        this.projectLoader
+            .uploadFlowFile(flow.getProjectId(), flow.getVersion(), flowFile, flowVersion + 1);
+      } catch (final Exception e) {
+        this.logger.error("Failed to set job override property. " + e);
+      } finally {
+        FlowLoaderUtils.cleanUpDir(tempDir);
+      }
     } else {
-      this.projectLoader.updateProjectProperty(project, prop);
+      prop.setSource(jobName + Constants.JOB_OVERRIDE_SUFFIX);
+      oldProps = this.projectLoader.fetchProjectProperty(project, prop.getSource());
+
+      if (oldProps == null) {
+        this.projectLoader.uploadProjectProperty(project, prop);
+      } else {
+        this.projectLoader.updateProjectProperty(project, prop);
+      }
     }
 
     final String diffMessage = PropsUtils.getPropertyDiff(oldProps, prop);
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 2c39a88..f89b218 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -22,6 +22,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 import azkaban.Constants;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
+import java.io.File;
+import org.apache.commons.io.FileUtils;
 import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Arrays;
@@ -48,6 +50,7 @@ public class NodeBeanLoaderTest {
   private static final String SHELL_PWD = "shell_pwd";
   private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
   private static final String ECHO_COMMAND_1 = "echo \"This is an echoed text from embedded_flow1.\"";
+  private static final String ECHO_OVERRIDE = "echo \"Override job properties.\"";
   private static final String PWD_COMMAND = "pwd";
   private static final String BASH_COMMAND = "bash ./sample_script.sh";
   private static final String EMBEDDED_FLOW1 = "embedded_flow1";
@@ -277,6 +280,38 @@ public class NodeBeanLoaderTest {
   }
 
   @Test
+  public void testSetJobPropsInBasicFlow() throws Exception {
+    final String path = BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO;
+    final Props overrideProps = new Props();
+    overrideProps.put(Constants.NODE_TYPE, TYPE_COMMAND);
+    overrideProps.put(TYPE_COMMAND, ECHO_OVERRIDE);
+
+    final File newFile = new File(ExecutionsTestUtil.getDataRootDir(), BASIC_FLOW_YML_FILE);
+    newFile.deleteOnExit();
+    FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE),
+        newFile);
+    FlowLoaderUtils.setPropsInYamlFile(path, newFile, overrideProps);
+    assertThat(FlowLoaderUtils.getPropsFromYamlFile(path, newFile)).isEqualTo(overrideProps);
+  }
+
+  @Test
+  public void testSetJobPropsInEmbeddedFlow() throws Exception {
+    final String path = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER + EMBEDDED_FLOW1 +
+        Constants.PATH_DELIMITER + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER + SHELL_END;
+    final Props overrideProps = new Props();
+    overrideProps.put(Constants.NODE_TYPE, TYPE_COMMAND);
+    overrideProps.put(TYPE_COMMAND, ECHO_OVERRIDE);
+
+    final File newFile = new File(ExecutionsTestUtil.getDataRootDir(), EMBEDDED_FLOW_YML_FILE);
+    newFile.deleteOnExit();
+    FileUtils.copyFile(
+        ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE),
+        newFile);
+    FlowLoaderUtils.setPropsInYamlFile(path, newFile, overrideProps);
+    assertThat(FlowLoaderUtils.getPropsFromYamlFile(path, newFile)).isEqualTo(overrideProps);
+  }
+
+  @Test
   public void testGetFlowTrigger() {
     final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(
         ExecutionsTestUtil.getFlowFile(TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 15383cc..efa641b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -742,7 +742,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       try {
         props =
             this.projectLoader.fetchProjectProperty(this.flow.getProjectId(),
-                this.flow.getVersion(), node.getId() + ".jor");
+                this.flow.getVersion(), node.getId() + Constants.JOB_OVERRIDE_SUFFIX);
       } catch (final ProjectManagerException e) {
         e.printStackTrace();
         this.logger.error("Error loading job override property for job "
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 630d61e..2a2e261 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -688,33 +688,38 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
       return;
     }
 
-    final Props prop;
+    Props jobProp;
     try {
-      prop = this.projectManager.getProperties(project, node.getJobSource());
+      jobProp = this.projectManager.getProperties(project, flow, jobName, node.getJobSource());
     } catch (final ProjectManagerException e) {
       ret.put("error", "Failed to retrieve job properties!");
       return;
     }
 
+    if (jobProp == null) {
+      jobProp = new Props();
+    }
+
     Props overrideProp;
     try {
-      overrideProp = this.projectManager.getJobOverrideProperty(project, jobName);
+      overrideProp = this.projectManager
+          .getJobOverrideProperty(project, flow, jobName, node.getJobSource());
     } catch (final ProjectManagerException e) {
       ret.put("error", "Failed to retrieve job override properties!");
       return;
     }
 
     ret.put("jobName", node.getId());
-    ret.put("jobType", prop.get("type"));
+    ret.put("jobType", jobProp.get("type"));
 
     if (overrideProp == null) {
-      overrideProp = new Props(prop);
+      overrideProp = new Props(jobProp);
     }
 
     final Map<String, String> generalParams = new HashMap<>();
     final Map<String, String> overrideParams = new HashMap<>();
-    for (final String ps : prop.getKeySet()) {
-      generalParams.put(ps, prop.getString(ps));
+    for (final String ps : jobProp.getKeySet()) {
+      generalParams.put(ps, jobProp.getString(ps));
     }
     for (final String ops : overrideProp.getKeySet()) {
       overrideParams.put(ops, overrideProp.getString(ops));
@@ -745,7 +750,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     final Map<String, String> jobParamGroup = this.getParamGroup(req, "jobOverride");
     final Props overrideParams = new Props(null, jobParamGroup);
     try {
-      this.projectManager.setJobOverrideProperty(project, overrideParams, jobName, user);
+      this.projectManager
+          .setJobOverrideProperty(project, flow, overrideParams, jobName, node.getJobSource(),
+              user);
     } catch (final ProjectManagerException e) {
       ret.put("error", "Failed to upload job override property");
     }
@@ -837,20 +844,20 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
     ret.put("flow", flowId);
     ret.put("type", node.getType());
 
-    final Props props;
+    final Props jobProps;
     try {
-      props = this.projectManager.getProperties(project, node.getJobSource());
+      jobProps = this.projectManager.getProperties(project, flow, nodeId, node.getJobSource());
     } catch (final ProjectManagerException e) {
       ret.put("error", "Failed to upload job override property for " + nodeId);
       return;
     }
 
-    if (props == null) {
+    if (jobProps == null) {
       ret.put("error", "Properties for " + nodeId + " isn't found.");
       return;
     }
 
-    final Map<String, String> properties = PropsUtils.toStringMap(props, true);
+    final Map<String, String> properties = PropsUtils.toStringMap(jobProps, true);
     ret.put("props", properties);
 
     if (node.getType().equals("flow")) {
@@ -1336,13 +1343,14 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         return;
       }
 
-      final Props prop = this.projectManager.getProperties(project, node.getJobSource());
+      final Props jobProp = this.projectManager
+          .getProperties(project, flow, jobName, node.getJobSource());
       Props overrideProp =
-          this.projectManager.getJobOverrideProperty(project, jobName);
+          this.projectManager.getJobOverrideProperty(project, flow, jobName, node.getJobSource());
       if (overrideProp == null) {
         overrideProp = new Props();
       }
-      final Props comboProp = new Props(prop);
+      final Props comboProp = new Props(jobProp);
       for (final String key : overrideProp.getKeySet()) {
         comboProp.put(key, overrideProp.get(key));
       }
@@ -1450,7 +1458,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
         return;
       }
 
-      final Props prop = this.projectManager.getProperties(project, propSource);
+      final Props prop = this.projectManager.getProperties(project, flow, null, propSource);
       if (prop == null) {
         page.add("errorMsg", "Property " + propSource + " not found.");
         logger.info("Display project property. Project " + projectName +