Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 98586ae..7f0a42f 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -51,6 +51,9 @@ public class Constants {
public static final String CRON_SCHEDULE_TYPE = "cron";
public static final String SCHEDULE_VALUE = "value";
+ // Job properties override suffix
+ public static final String JOB_OVERRIDE_SUFFIX = ".jor";
+
// Names and paths of various file names to configure Azkaban
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index c1775d9..ab43888 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -48,6 +48,7 @@ public class Flow {
private boolean isLayedOut = false;
private boolean isEmbeddedFlow = false;
+ private double azkabanFlowVersion;
public Flow(final String id) {
this.id = id;
@@ -59,6 +60,7 @@ public class Flow {
final String id = (String) flowObject.get("id");
final Boolean layedout = (Boolean) flowObject.get("layedout");
final Boolean isEmbeddedFlow = (Boolean) flowObject.get("embeddedFlow");
+ final Double azkabanFlowVersion = (Double) flowObject.get("azkabanFlowVersion");
final Flow flow = new Flow(id);
if (layedout != null) {
flow.setLayedOut(layedout);
@@ -68,6 +70,10 @@ public class Flow {
flow.setEmbeddedFlow(isEmbeddedFlow);
}
+ if (azkabanFlowVersion != null) {
+ flow.setAzkabanFlowVersion(azkabanFlowVersion);
+ }
+
final int projId = (Integer) flowObject.get("project.id");
flow.setProjectId(projId);
@@ -328,6 +334,7 @@ public class Flow {
flowObj.put("mailCreator", this.mailCreator);
flowObj.put("layedout", this.isLayedOut);
flowObj.put("embeddedFlow", this.isEmbeddedFlow);
+ flowObj.put("azkabanFlowVersion", this.azkabanFlowVersion);
if (this.errors != null) {
flowObj.put("errors", this.errors);
}
@@ -385,6 +392,14 @@ public class Flow {
this.isEmbeddedFlow = embeddedFlow;
}
+ public double getAzkabanFlowVersion() {
+ return this.azkabanFlowVersion;
+ }
+
+ public void setAzkabanFlowVersion(final double azkabanFlowVersion) {
+ this.azkabanFlowVersion = azkabanFlowVersion;
+ }
+
public Map<String, Object> getMetadata() {
if (this.metadata == null) {
this.metadata = new HashMap<>();
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 2dba398..477ebb3 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -30,6 +30,7 @@ import java.util.Map;
/**
* Flow level definition of the DAG.
* Contains a list of AzkabanNodes and related flow properties.
+ * Introduced in Flow 2.0 design.
*/
public class AzkabanFlow extends AzkabanNode {
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index ee26a14..6148182 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -40,7 +40,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.ZipFile;
import javax.inject.Inject;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +105,7 @@ class AzkabanProjectLoader {
// Check the validation report.
if (!isReportStatusValid(reports, project)) {
- cleanUpProjectTempDir(file);
+ FlowLoaderUtils.cleanUpDir(file);
return reports;
}
@@ -114,7 +113,7 @@ class AzkabanProjectLoader {
persistProject(project, loader, archive, file, uploader);
} finally {
- cleanUpProjectTempDir(file);
+ FlowLoaderUtils.cleanUpDir(file);
}
// Clean up project old installations after new project is uploaded successfully.
@@ -239,18 +238,6 @@ class AzkabanProjectLoader {
this.storageManager.cleanupProjectArtifacts(project.getId());
}
- private void cleanUpProjectTempDir(final File file) {
- log.info("Cleaning up temp files.");
- try {
- if (file != null) {
- FileUtils.deleteDirectory(file);
- }
- } catch (final IOException e) {
- log.error("Failed to delete temp directory", e);
- file.deleteOnExit();
- }
- }
-
private File unzipFile(final File archiveFile) throws IOException {
final ZipFile zipfile = new ZipFile(archiveFile);
final File unzipped = Utils.createTempDir(this.tempDir);
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 8631aba..dade01f 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -118,6 +118,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow, final String flowName,
final File flowFile) {
final Flow flow = new Flow(flowName);
+ flow.setAzkabanFlowVersion(Constants.AZKABAN_FLOW_VERSION_2_0);
final Props props = azkabanFlow.getProps();
FlowLoaderUtils.addEmailPropsToFlow(flow, props);
props.setSource(flowFile.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 8c719d0..52d0d8b 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -20,15 +20,24 @@ import azkaban.flow.CommonJobProperties;
import azkaban.flow.Flow;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.DumperOptions.FlowStyle;
+import org.yaml.snakeyaml.Yaml;
/**
* Utils to help load flows.
@@ -38,6 +47,81 @@ public class FlowLoaderUtils {
private static final Logger logger = LoggerFactory.getLogger(FlowLoaderUtils.class);
/**
+ * Sets props in flow yaml file.
+ *
+ * @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
+ * @param flowFile the flow yaml file
+ * @param prop the props to set
+ */
+ public static void setPropsInYamlFile(final String path, final File flowFile, final Props prop) {
+ final DumperOptions options = new DumperOptions();
+ options.setDefaultFlowStyle(FlowStyle.BLOCK);
+ final NodeBean nodeBean = FlowLoaderUtils.setPropsInNodeBean(path, flowFile, prop);
+ try (final BufferedWriter writer = Files
+ .newBufferedWriter(flowFile.toPath(), StandardCharsets.UTF_8)) {
+ new Yaml(options).dump(nodeBean, writer);
+ } catch (final IOException e) {
+ throw new ProjectManagerException(
+ "Failed to set properties in flow file " + flowFile.getName());
+ }
+ }
+
+ /**
+ * Sets props in node bean.
+ *
+ * @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
+ * @param flowFile the flow yaml file
+ * @param prop the props to set
+ * @return the node bean
+ */
+ public static NodeBean setPropsInNodeBean(final String path, final File flowFile,
+ final Props prop) {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ try {
+ final NodeBean nodeBean = loader.load(flowFile);
+ final String[] pathList = path.split(Constants.PATH_DELIMITER);
+ if (overridePropsInNodeBean(nodeBean, pathList, 0, prop)) {
+ return nodeBean;
+ } else {
+ logger.error("Error setting props for " + path);
+ }
+ } catch (final Exception e) {
+ logger.error("Failed to set props, error loading flow YAML file " + flowFile);
+ }
+ return null;
+ }
+
+ /**
+ * Helper method to recursively find the node to override props.
+ *
+ * @param nodeBean the node bean
+ * @param pathList the path list
+ * @param idx the idx
+ * @param prop the props to override
+ * @return the boolean
+ */
+ private static boolean overridePropsInNodeBean(final NodeBean nodeBean, final String[] pathList,
+ final int idx, final Props prop) {
+ if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
+ if (idx == pathList.length - 1) {
+ if (prop.containsKey(Constants.NODE_TYPE)) {
+ nodeBean.setType(prop.get(Constants.NODE_TYPE));
+ }
+ final Map<String, String> config = prop.getFlattened();
+ config.remove(Constants.NODE_TYPE);
+ nodeBean.setConfig(config);
+ return true;
+ }
+ for (final NodeBean bean : nodeBean.getNodes()) {
+ if (overridePropsInNodeBean(bean, pathList, idx + 1, prop)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
* Gets flow or job props from flow yaml file.
*
* @param path the flow or job path delimited by ":", e.g. "flow:subflow1:subflow2:job3"
@@ -73,7 +157,7 @@ public class FlowLoaderUtils {
* @param propsList the props list
* @return the boolean
*/
- public static boolean findPropsFromNodeBean(final NodeBean nodeBean,
+ private static boolean findPropsFromNodeBean(final NodeBean nodeBean,
final String[] pathList, final int idx, final List<Props> propsList) {
if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
if (idx == pathList.length - 1) {
@@ -149,6 +233,32 @@ public class FlowLoaderUtils {
}
/**
+ * Clean up the directory.
+ *
+ * @param dir the directory to be deleted
+ */
+ public static void cleanUpDir(final File dir) {
+ try {
+ if (dir != null && dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ } catch (final IOException e) {
+ logger.error("Failed to delete the directory", e);
+ dir.deleteOnExit();
+ }
+ }
+
+ /**
+ * Check if azkaban flow version is 2.0.
+ *
+ * @param azkabanFlowVersion the azkaban flow version
+ * @return the boolean
+ */
+ public static boolean isAzkabanFlowVersion20(final double azkabanFlowVersion) {
+ return Double.compare(azkabanFlowVersion, Constants.AZKABAN_FLOW_VERSION_2_0) == 0;
+ }
+
+ /**
* Implements Suffix filter.
*/
public static class SuffixFilter implements FileFilter {
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 79caa97..c9df778 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -18,6 +18,7 @@ package azkaban.project;
import static java.util.Objects.requireNonNull;
+import azkaban.Constants;
import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.validator.ValidationReport;
@@ -29,8 +30,7 @@ import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
-import javax.inject.Inject;
-import javax.inject.Singleton;
+import com.google.common.io.Files;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
@@ -39,6 +39,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.apache.log4j.Logger;
@@ -319,27 +321,79 @@ public class ProjectManager {
return this.projectLoader.getProjectEvents(project, results, skip);
}
- public Props getProperties(final Project project, final String source)
- throws ProjectManagerException {
- return this.projectLoader.fetchProjectProperty(project, source);
+ public Props getPropertiesFromFlowFile(final Flow flow, final String jobName, final String
+ flowFileName, final int flowVersion) throws ProjectManagerException {
+ File tempDir = null;
+ Props props = null;
+ try {
+ tempDir = Files.createTempDir();
+ final File flowFile = this.projectLoader.getUploadedFlowFile(flow.getProjectId(), flow
+ .getVersion(), flowFileName, flowVersion, tempDir);
+ final String path =
+ jobName == null ? flow.getId() : flow.getId() + Constants.PATH_DELIMITER + jobName;
+ props = FlowLoaderUtils.getPropsFromYamlFile(path, flowFile);
+ } catch (final Exception e) {
+ this.logger.error("Failed to get props from flow file. " + e);
+ } finally {
+ FlowLoaderUtils.cleanUpDir(tempDir);
+ }
+ return props;
}
- public Props getJobOverrideProperty(final Project project, final String jobName)
- throws ProjectManagerException {
- return this.projectLoader.fetchProjectProperty(project, jobName + ".jor");
+ public Props getProperties(final Project project, final Flow flow, final String jobName,
+ final String source) throws ProjectManagerException {
+ if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+ // Return the properties from the original uploaded flow file.
+ return getPropertiesFromFlowFile(flow, jobName, source, 1);
+ } else {
+ return this.projectLoader.fetchProjectProperty(project, source);
+ }
}
- public void setJobOverrideProperty(final Project project, final Props prop, final String jobName,
- final User modifier)
- throws ProjectManagerException {
- prop.setSource(jobName + ".jor");
- final Props oldProps =
- this.projectLoader.fetchProjectProperty(project, prop.getSource());
+ public Props getJobOverrideProperty(final Project project, final Flow flow, final String jobName,
+ final String source) throws ProjectManagerException {
+ if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+ final int flowVersion = this.projectLoader
+ .getLatestFlowVersion(flow.getProjectId(), flow.getVersion(), source);
+ return getPropertiesFromFlowFile(flow, jobName, source, flowVersion);
+ } else {
+ return this.projectLoader
+ .fetchProjectProperty(project, jobName + Constants.JOB_OVERRIDE_SUFFIX);
+ }
+ }
- if (oldProps == null) {
- this.projectLoader.uploadProjectProperty(project, prop);
+ public void setJobOverrideProperty(final Project project, final Flow flow, final Props prop,
+ final String jobName, final String source, final User modifier)
+ throws ProjectManagerException {
+ File tempDir = null;
+ Props oldProps = null;
+ if (FlowLoaderUtils.isAzkabanFlowVersion20(flow.getAzkabanFlowVersion())) {
+ try {
+ tempDir = Files.createTempDir();
+ final int flowVersion = this.projectLoader.getLatestFlowVersion(flow.getProjectId(), flow
+ .getVersion(), source);
+ final File flowFile = this.projectLoader.getUploadedFlowFile(flow.getProjectId(), flow
+ .getVersion(), source, flowVersion, tempDir);
+ final String path = flow.getId() + Constants.PATH_DELIMITER + jobName;
+ oldProps = FlowLoaderUtils.getPropsFromYamlFile(path, flowFile);
+
+ FlowLoaderUtils.setPropsInYamlFile(path, flowFile, prop);
+ this.projectLoader
+ .uploadFlowFile(flow.getProjectId(), flow.getVersion(), flowFile, flowVersion + 1);
+ } catch (final Exception e) {
+ this.logger.error("Failed to set job override property. " + e);
+ } finally {
+ FlowLoaderUtils.cleanUpDir(tempDir);
+ }
} else {
- this.projectLoader.updateProjectProperty(project, prop);
+ prop.setSource(jobName + Constants.JOB_OVERRIDE_SUFFIX);
+ oldProps = this.projectLoader.fetchProjectProperty(project, prop.getSource());
+
+ if (oldProps == null) {
+ this.projectLoader.uploadProjectProperty(project, prop);
+ } else {
+ this.projectLoader.updateProjectProperty(project, prop);
+ }
}
final String diffMessage = PropsUtils.getPropertyDiff(oldProps, prop);
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index 2c39a88..f89b218 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -22,6 +22,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import azkaban.Constants;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
+import java.io.File;
+import org.apache.commons.io.FileUtils;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Arrays;
@@ -48,6 +50,7 @@ public class NodeBeanLoaderTest {
private static final String SHELL_PWD = "shell_pwd";
private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
private static final String ECHO_COMMAND_1 = "echo \"This is an echoed text from embedded_flow1.\"";
+ private static final String ECHO_OVERRIDE = "echo \"Override job properties.\"";
private static final String PWD_COMMAND = "pwd";
private static final String BASH_COMMAND = "bash ./sample_script.sh";
private static final String EMBEDDED_FLOW1 = "embedded_flow1";
@@ -277,6 +280,38 @@ public class NodeBeanLoaderTest {
}
@Test
+ public void testSetJobPropsInBasicFlow() throws Exception {
+ final String path = BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO;
+ final Props overrideProps = new Props();
+ overrideProps.put(Constants.NODE_TYPE, TYPE_COMMAND);
+ overrideProps.put(TYPE_COMMAND, ECHO_OVERRIDE);
+
+ final File newFile = new File(ExecutionsTestUtil.getDataRootDir(), BASIC_FLOW_YML_FILE);
+ newFile.deleteOnExit();
+ FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE),
+ newFile);
+ FlowLoaderUtils.setPropsInYamlFile(path, newFile, overrideProps);
+ assertThat(FlowLoaderUtils.getPropsFromYamlFile(path, newFile)).isEqualTo(overrideProps);
+ }
+
+ @Test
+ public void testSetJobPropsInEmbeddedFlow() throws Exception {
+ final String path = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER + EMBEDDED_FLOW1 +
+ Constants.PATH_DELIMITER + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER + SHELL_END;
+ final Props overrideProps = new Props();
+ overrideProps.put(Constants.NODE_TYPE, TYPE_COMMAND);
+ overrideProps.put(TYPE_COMMAND, ECHO_OVERRIDE);
+
+ final File newFile = new File(ExecutionsTestUtil.getDataRootDir(), EMBEDDED_FLOW_YML_FILE);
+ newFile.deleteOnExit();
+ FileUtils.copyFile(
+ ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE),
+ newFile);
+ FlowLoaderUtils.setPropsInYamlFile(path, newFile, overrideProps);
+ assertThat(FlowLoaderUtils.getPropsFromYamlFile(path, newFile)).isEqualTo(overrideProps);
+ }
+
+ @Test
public void testGetFlowTrigger() {
final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(
ExecutionsTestUtil.getFlowFile(TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 15383cc..efa641b 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -742,7 +742,7 @@ public class FlowRunner extends EventHandler implements Runnable {
try {
props =
this.projectLoader.fetchProjectProperty(this.flow.getProjectId(),
- this.flow.getVersion(), node.getId() + ".jor");
+ this.flow.getVersion(), node.getId() + Constants.JOB_OVERRIDE_SUFFIX);
} catch (final ProjectManagerException e) {
e.printStackTrace();
this.logger.error("Error loading job override property for job "
diff --git a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 630d61e..2a2e261 100644
--- a/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-web-server/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -688,33 +688,38 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return;
}
- final Props prop;
+ Props jobProp;
try {
- prop = this.projectManager.getProperties(project, node.getJobSource());
+ jobProp = this.projectManager.getProperties(project, flow, jobName, node.getJobSource());
} catch (final ProjectManagerException e) {
ret.put("error", "Failed to retrieve job properties!");
return;
}
+ if (jobProp == null) {
+ jobProp = new Props();
+ }
+
Props overrideProp;
try {
- overrideProp = this.projectManager.getJobOverrideProperty(project, jobName);
+ overrideProp = this.projectManager
+ .getJobOverrideProperty(project, flow, jobName, node.getJobSource());
} catch (final ProjectManagerException e) {
ret.put("error", "Failed to retrieve job override properties!");
return;
}
ret.put("jobName", node.getId());
- ret.put("jobType", prop.get("type"));
+ ret.put("jobType", jobProp.get("type"));
if (overrideProp == null) {
- overrideProp = new Props(prop);
+ overrideProp = new Props(jobProp);
}
final Map<String, String> generalParams = new HashMap<>();
final Map<String, String> overrideParams = new HashMap<>();
- for (final String ps : prop.getKeySet()) {
- generalParams.put(ps, prop.getString(ps));
+ for (final String ps : jobProp.getKeySet()) {
+ generalParams.put(ps, jobProp.getString(ps));
}
for (final String ops : overrideProp.getKeySet()) {
overrideParams.put(ops, overrideProp.getString(ops));
@@ -745,7 +750,9 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
final Map<String, String> jobParamGroup = this.getParamGroup(req, "jobOverride");
final Props overrideParams = new Props(null, jobParamGroup);
try {
- this.projectManager.setJobOverrideProperty(project, overrideParams, jobName, user);
+ this.projectManager
+ .setJobOverrideProperty(project, flow, overrideParams, jobName, node.getJobSource(),
+ user);
} catch (final ProjectManagerException e) {
ret.put("error", "Failed to upload job override property");
}
@@ -837,20 +844,20 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
ret.put("flow", flowId);
ret.put("type", node.getType());
- final Props props;
+ final Props jobProps;
try {
- props = this.projectManager.getProperties(project, node.getJobSource());
+ jobProps = this.projectManager.getProperties(project, flow, nodeId, node.getJobSource());
} catch (final ProjectManagerException e) {
ret.put("error", "Failed to upload job override property for " + nodeId);
return;
}
- if (props == null) {
+ if (jobProps == null) {
ret.put("error", "Properties for " + nodeId + " isn't found.");
return;
}
- final Map<String, String> properties = PropsUtils.toStringMap(props, true);
+ final Map<String, String> properties = PropsUtils.toStringMap(jobProps, true);
ret.put("props", properties);
if (node.getType().equals("flow")) {
@@ -1336,13 +1343,14 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return;
}
- final Props prop = this.projectManager.getProperties(project, node.getJobSource());
+ final Props jobProp = this.projectManager
+ .getProperties(project, flow, jobName, node.getJobSource());
Props overrideProp =
- this.projectManager.getJobOverrideProperty(project, jobName);
+ this.projectManager.getJobOverrideProperty(project, flow, jobName, node.getJobSource());
if (overrideProp == null) {
overrideProp = new Props();
}
- final Props comboProp = new Props(prop);
+ final Props comboProp = new Props(jobProp);
for (final String key : overrideProp.getKeySet()) {
comboProp.put(key, overrideProp.get(key));
}
@@ -1450,7 +1458,7 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return;
}
- final Props prop = this.projectManager.getProperties(project, propSource);
+ final Props prop = this.projectManager.getProperties(project, flow, null, propSource);
if (prop == null) {
page.add("errorMsg", "Property " + propSource + " not found.");
logger.info("Display project property. Project " + projectName +