Details
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index b92a183..c53e5c1 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -16,19 +16,15 @@
package azkaban.project;
-import azkaban.Constants;
import azkaban.flow.CommonJobProperties;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
-import azkaban.jobcallback.JobCallbackValidator;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
-import azkaban.utils.Utils;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
@@ -50,8 +46,6 @@ public class DirectoryFlowLoader implements FlowLoader {
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
private static final String JOB_SUFFIX = ".job";
- private static final String XMS = "Xms";
- private static final String XMX = "Xmx";
private static final Logger logger = LoggerFactory.getLogger(DirectoryFlowLoader.class);
private final Props props;
@@ -146,7 +140,7 @@ public class DirectoryFlowLoader implements FlowLoader {
// Resolve embedded flows
resolveEmbeddedFlows();
- checkJobProperties(project);
+ FlowLoaderUtils.checkJobProperties(project.getId(), this.props, this.jobPropsMap, this.errors);
return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
@@ -383,44 +377,6 @@ public class DirectoryFlowLoader implements FlowLoader {
visited.remove(node.getId());
}
- public void checkJobProperties(final Project project) {
- // if project is in the memory check whitelist, then we don't need to check
- // its memory settings
- if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
- ProjectWhitelist.WhitelistType.MemoryCheck)) {
- return;
- }
-
- final String maxXms = this.props.getString(
- Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
- final String maxXmx = this.props.getString(
- Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
- final long sizeMaxXms = Utils.parseMemString(maxXms);
- final long sizeMaxXmx = Utils.parseMemString(maxXmx);
-
- for (final String jobName : this.jobPropsMap.keySet()) {
-
- final Props jobProps = this.jobPropsMap.get(jobName);
- final String xms = jobProps.getString(XMS, null);
- if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
- && Utils.parseMemString(xms) > sizeMaxXms) {
- this.errors.add(String.format(
- "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
- jobName, maxXms));
- }
- final String xmx = jobProps.getString(XMX, null);
- if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
- && Utils.parseMemString(xmx) > sizeMaxXmx) {
- this.errors.add(String.format(
- "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
- jobName, maxXmx));
- }
-
- // job callback properties check
- JobCallbackValidator.validate(jobName, this.props, jobProps, this.errors);
- }
- }
-
private String getNameWithoutExtension(final File file) {
final String filename = file.getName();
final int index = filename.lastIndexOf('.');
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index dade01f..858b6ee 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -46,6 +46,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
private final Set<String> errors = new HashSet<>();
private final Map<String, Flow> flowMap = new HashMap<>();
private final Map<String, List<Edge>> edgeMap = new HashMap<>();
+ private final Map<String, Props> jobPropsMap = new HashMap<>();
/**
* Creates a new DirectoryYamlFlowLoader.
@@ -95,7 +96,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
@Override
public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
convertYamlFiles(projectDir);
- checkJobProperties(project);
+ FlowLoaderUtils.checkJobProperties(project.getId(), this.props, this.jobPropsMap, this.errors);
return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
}
@@ -106,11 +107,17 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
final NodeBeanLoader loader = new NodeBeanLoader();
try {
final NodeBean nodeBean = loader.load(file);
- final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
- final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
- this.flowMap.put(flow.getId(), flow);
+ if (!loader.validate(nodeBean)) {
+ this.errors.add("Failed to validate nodeBean for " + file.getName()
+ + ". Duplicate nodes found or dependency undefined.");
+ } else {
+ final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+ final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
+ this.flowMap.put(flow.getId(), flow);
+ }
} catch (final Exception e) {
- logger.error("Error loading flow yaml file. ", e);
+ this.errors.add("Error loading flow yaml file " + file.getName() + ":"
+ + e.getMessage());
}
}
}
@@ -156,6 +163,9 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
flowNode.setEmbeddedFlow(true);
this.flowMap.put(flowNode.getId(), flowNode);
}
+
+ this.jobPropsMap
+ .put(flowName + Constants.PATH_DELIMITER + node.getId(), azkabanNode.getProps());
return node;
}
@@ -186,11 +196,6 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
// Cycles found, including self cycle.
edge.setError("Cycles found.");
this.errors.add("Cycles found at " + edge.getId());
- } else if (!azkabanFlow.getNodes().containsKey(parent)) {
- // Dependency not found.
- edge.setError("Dependency not found.");
- this.errors.add(node.getName() + " cannot find dependency "
- + parent);
} else {
// Valid edge. Continue to process the parent node recursively.
addEdges(azkabanFlow.getNode(parent), azkabanFlow, flowName, recStack, visited);
@@ -200,8 +205,4 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
}
}
- public void checkJobProperties(final Project project) {
- // Todo jamiesjc: implement the check later
- }
-
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 52d0d8b..1fa4ac3 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -18,8 +18,11 @@ package azkaban.project;
import azkaban.Constants;
import azkaban.flow.CommonJobProperties;
import azkaban.flow.Flow;
+import azkaban.jobcallback.JobCallbackValidator;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Utils;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
@@ -45,7 +48,8 @@ import org.yaml.snakeyaml.Yaml;
public class FlowLoaderUtils {
private static final Logger logger = LoggerFactory.getLogger(FlowLoaderUtils.class);
-
+ private static final String XMS = "Xms";
+ private static final String XMX = "Xmx";
/**
* Sets props in flow yaml file.
*
@@ -233,6 +237,52 @@ public class FlowLoaderUtils {
}
/**
+ * Check job properties.
+ *
+ * @param projectId the project id
+ * @param props the server props
+ * @param jobPropsMap the job props map
+ * @param errors the errors
+ */
+ public static void checkJobProperties(final int projectId, final Props props,
+ final Map<String, Props> jobPropsMap, final Set<String> errors) {
+ // if project is in the memory check whitelist, then we don't need to check
+ // its memory settings
+ if (ProjectWhitelist.isProjectWhitelisted(projectId,
+ ProjectWhitelist.WhitelistType.MemoryCheck)) {
+ return;
+ }
+
+ final String maxXms = props.getString(
+ Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
+ final String maxXmx = props.getString(
+ Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
+ final long sizeMaxXms = Utils.parseMemString(maxXms);
+ final long sizeMaxXmx = Utils.parseMemString(maxXmx);
+
+ for (final String jobName : jobPropsMap.keySet()) {
+ final Props jobProps = jobPropsMap.get(jobName);
+ final String xms = jobProps.getString(XMS, null);
+ if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
+ && Utils.parseMemString(xms) > sizeMaxXms) {
+ errors.add(String.format(
+ "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+ jobName, maxXms));
+ }
+ final String xmx = jobProps.getString(XMX, null);
+ if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
+ && Utils.parseMemString(xmx) > sizeMaxXmx) {
+ errors.add(String.format(
+ "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+ jobName, maxXmx));
+ }
+
+ // job callback properties check
+ JobCallbackValidator.validate(jobName, props, jobProps, errors);
+ }
+ }
+
+ /**
* Clean up the directory.
*
* @param dir the directory to be deleted
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 2b303d3..0ea5c15 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -58,7 +58,7 @@ public class NodeBeanLoader {
}
for (final NodeBean n : nodeBean.getNodes()) {
- if (!nodeNames.containsAll(n.getDependsOn())) {
+ if (n.getDependsOn() != null && !nodeNames.containsAll(n.getDependsOn())) {
// Undefined reference to dependent job
return false;
}
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 4766606..96cd07d 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -37,7 +37,10 @@ public class DirectoryYamlFlowLoaderTest {
private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
private static final String MULTIPLE_EMBEDDED_FLOW_YAML_DIR = "multipleembeddedflowyamltest";
- private static final String INVALID_FLOW_YAML_DIR = "invalidflowyamltest";
+ private static final String CYCLE_FOUND_YAML_DIR = "cyclefoundyamltest";
+ private static final String DUPLICATE_NODENAME_YAML_DIR = "duplicatenodenamesyamltest";
+ private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
+ private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
private static final String BASIC_FLOW_1 = "basic_flow";
private static final String BASIC_FLOW_2 = "basic_flow2";
@@ -53,10 +56,11 @@ public class DirectoryYamlFlowLoaderTest {
private static final String EMBEDDED_FLOW_B2 =
"embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
+ "embedded_flow2";
- private static final String INVALID_FLOW_1 = "dependency_not_found";
- private static final String INVALID_FLOW_2 = "cycle_found";
- private static final String DEPENDENCY_NOT_FOUND_ERROR = "Dependency not found.";
+ private static final String DUPLICATE_NODENAME_FLOW_FILE = "duplicate_nodename.flow";
+ private static final String DEPENDENCY_UNDEFINED_FLOW_FILE = "dependency_undefined.flow";
+ private static final String CYCLE_FOUND_FLOW = "cycle_found";
private static final String CYCLE_FOUND_ERROR = "Cycles found.";
+ private static final String SHELL_PWD = "invalid_jobprops:shell_pwd";
private Project project;
@Before
@@ -106,14 +110,44 @@ public class DirectoryYamlFlowLoaderTest {
}
@Test
- public void testLoadInvalidFlowYamlFiles() {
+ public void testLoadInvalidFlowYamlFileWithDuplicateNodeNames() {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
- loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_FLOW_YAML_DIR));
- checkFlowLoaderProperties(loader, 2, 2, 2);
- // Invalid flow 1: Dependency not found.
- checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 1, 3, DEPENDENCY_NOT_FOUND_ERROR);
- // Invalid flow 2: Cycles found.
- checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 1, 4, CYCLE_FOUND_ERROR);
+ loader.loadProjectFlow(this.project,
+ ExecutionsTestUtil.getFlowDir(DUPLICATE_NODENAME_YAML_DIR));
+ checkFlowLoaderProperties(loader, 1, 0, 0);
+ assertThat(loader.getErrors()).containsExactly(
+ "Failed to validate nodeBean for " + DUPLICATE_NODENAME_FLOW_FILE
+ + ". Duplicate nodes found or dependency undefined.");
+ }
+
+ @Test
+ public void testLoadInvalidFlowYamlFileWithUndefinedDependency() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project,
+ ExecutionsTestUtil.getFlowDir(DEPENDENCY_UNDEFINED_YAML_DIR));
+ checkFlowLoaderProperties(loader, 1, 0, 0);
+ assertThat(loader.getErrors()).containsExactly(
+ "Failed to validate nodeBean for " + DEPENDENCY_UNDEFINED_FLOW_FILE
+ + ". Duplicate nodes found or dependency undefined.");
+ }
+
+ @Test
+ public void testLoadInvalidFlowYamlFileWithCycle() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(CYCLE_FOUND_YAML_DIR));
+ checkFlowLoaderProperties(loader, 1, 1, 1);
+ checkFlowProperties(loader, CYCLE_FOUND_FLOW, 1, 4, 1, 4, CYCLE_FOUND_ERROR);
+ }
+
+ @Test
+ public void testLoadFlowYamlFileWithInvalidJobProps() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project,
+ ExecutionsTestUtil.getFlowDir(INVALID_JOBPROPS_YAML_DIR));
+ checkFlowLoaderProperties(loader, 1, 1, 1);
+ assertThat(loader.getErrors()).containsExactly(
+ SHELL_PWD + ": Xms value has exceeded the allowed limit (max Xms = "
+ + Constants.JobProperties.MAX_XMS_DEFAULT + ")");
}
@Test
diff --git a/test/execution-test-data/dependencyundefinedyamltest/dependency_undefined.project b/test/execution-test-data/dependencyundefinedyamltest/dependency_undefined.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/dependencyundefinedyamltest/dependency_undefined.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.flow b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.flow
new file mode 100644
index 0000000..311f457
--- /dev/null
+++ b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.flow
@@ -0,0 +1,28 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_echo
+
+ - name: shell_echo
+ type: command
+ dependsOn:
+ - shell_pwd
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_echo
+ type: command
+ dependsOn:
+ - shell_pwd
+ config:
+ command: echo "This is a duplicate echoed text."
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.project b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.flow b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.flow
new file mode 100644
index 0000000..53867ae
--- /dev/null
+++ b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.flow
@@ -0,0 +1,22 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+ Xms: 2G
+
diff --git a/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.project b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0