azkaban-aplcache
Conditional workflow - Validate conditions (#1918) * Conditional …
8/27/2018 2:23:16 PM
3.52.03.53.0
Changes
Details
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 2af9ac8..86156d5 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -45,7 +45,17 @@ import org.slf4j.LoggerFactory;
*/
public class DirectoryYamlFlowLoader implements FlowLoader {
+ // Pattern to match job variables in condition expressions: ${jobName:variable}
+ public static final Pattern CONDITION_VARIABLE_REPLACEMENT_PATTERN = Pattern
+ .compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
+ // Pattern to match conditionOnJobStatus macros, e.g. one_success, all_done
+ private static final Pattern CONDITION_ON_JOB_STATUS_PATTERN =
+ Pattern.compile("(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b");
+ // Pattern to match a number or a string, e.g. 1234, "hello", 'foo'
+ private static final Pattern DIGIT_STRING_PATTERN = Pattern.compile("\\d+|'.*'|\".*\"");
+ // Valid operators in condition expressions: &&, ||, ==, !=, >, >=, <, <=
+ private static final String VALID_CONDITION_OPERATORS = "&&|\\|\\||==|!=|>|>=|<|<=";
private final Props props;
private final Set<String> errors = new HashSet<>();
@@ -147,7 +157,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
// Convert azkabanNodes to nodes inside the flow.
azkabanFlow.getNodes().values().stream()
- .map(n -> convertAzkabanNodeToNode(n, flowName, flowFile))
+ .map(n -> convertAzkabanNodeToNode(n, flowName, flowFile, azkabanFlow))
.forEach(n -> flow.addNode(n));
// Add edges for the flow.
@@ -164,11 +174,10 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
}
private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode, final String flowName,
- final File flowFile) {
+ final File flowFile, final AzkabanFlow azkabanFlow) {
final Node node = new Node(azkabanNode.getName());
node.setType(azkabanNode.getType());
- node.setCondition(azkabanNode.getCondition());
- setConditionOnJobStatus(node);
+ validateCondition(node, azkabanNode, azkabanFlow);
node.setPropsSource(flowFile.getName());
node.setJobSource(flowFile.getName());
@@ -222,25 +231,97 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
}
}
- private void setConditionOnJobStatus(final Node node) {
- String condition = node.getCondition();
- if (condition != null) {
- // Only values in the ConditionOnJobStatus enum can be matched by this pattern. Some examples:
- // Valid: all_done, one_success && ${jobA: param1} == 1, ALL_FAILED
- // Invalid: two_success, one_faileddd, {one_failed}
- final String patternString =
- "(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b";
- final Pattern pattern = Pattern.compile(patternString);
- final Matcher matcher = pattern.matcher(condition);
-
- // Todo jamiesjc: need to add validation for condition
- while (matcher.find()) {
- logger.info("Found conditionOnJobStatus: " + matcher.group(1));
+ private void validateCondition(final Node node, final AzkabanNode azkabanNode,
+ final AzkabanFlow azkabanFlow) {
+ boolean valid = true;
+ boolean foundConditionOnJobStatus = false;
+ String condition = azkabanNode.getCondition();
+ if (condition == null) {
+ return;
+ }
+ // First, remove all the whitespaces and parenthesis ().
+ final String replacedCondition = condition.replaceAll("\\s+|\\(|\\)", "");
+ // Second, split the condition by operators &&, ||, ==, !=, >, >=, <, <=
+ final String[] operands = replacedCondition.split(VALID_CONDITION_OPERATORS);
+ // Third, check whether all the operands are valid: only conditionOnJobStatus macros, numbers,
+ // strings, and variable substitution ${jobName:param} are allowed.
+ for (int i = 0; i < operands.length; i++) {
+ final Matcher matcher = CONDITION_ON_JOB_STATUS_PATTERN.matcher(operands[i]);
+ if (matcher.matches()) {
+ this.logger.info("Operand " + operands[i] + " is a condition on job status.");
+ if (foundConditionOnJobStatus) {
+ this.errors.add("Invalid condition for " + node.getId()
+ + ": cannot combine more than one conditionOnJobStatus macros.");
+ valid = false;
+ }
+ foundConditionOnJobStatus = true;
node.setConditionOnJobStatus(ConditionOnJobStatus.fromString(matcher.group(1)));
condition = condition.replace(matcher.group(1), "true");
- logger.info("condition is " + condition);
- node.setCondition(condition);
+ } else {
+ if (operands[i].startsWith("!")) {
+ // Remove the operator '!' from the operand.
+ operands[i] = operands[i].substring(1);
+ }
+ if (operands[i].equals("")) {
+ this.errors
+ .add("Invalid condition for " + node.getId() + ": operand is an empty string.");
+ valid = false;
+ } else if (!DIGIT_STRING_PATTERN.matcher(operands[i]).matches() &&
+ !isValidVariableSubstitution(operands[i], azkabanNode, azkabanFlow)) {
+ valid = false;
+ }
+ }
+ }
+ if (valid) {
+ node.setCondition(condition);
+ } else {
+ this.logger.info("Condition of " + node.getId() + ": " + azkabanNode.getCondition()
+ + " is invalid, set it to false");
+ node.setCondition("false");
+ }
+ }
+
+ private boolean isValidVariableSubstitution(final String operand, final AzkabanNode azkabanNode,
+ final AzkabanFlow azkabanFlow) {
+ boolean result = false;
+ final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(operand);
+ if (matcher.matches()) {
+ final String jobName = matcher.group(1);
+ final AzkabanNode conditionNode = azkabanFlow.getNode(jobName);
+ if (conditionNode == null) {
+ this.errors.add("Invalid condition for " + azkabanNode.getName() + ": " + jobName
+ + " doesn't exist in the flow.");
+ }
+ // If a job defines condition on its descendant nodes, then that condition is invalid.
+ else if (isDescendantNode(conditionNode, azkabanNode, azkabanFlow)) {
+ this.errors.add("Invalid condition for " + azkabanNode.getName()
+ + ": should not define condition on its descendant node " + jobName + ".");
+ } else {
+ result = true;
+ }
+ } else {
+ this.errors.add("Invalid condition for " + azkabanNode.getName()
+ + ": cannot resolve the condition. Please check the syntax for supported conditions.");
+ }
+ return result;
+ }
+
+ private boolean isDescendantNode(final AzkabanNode current, final AzkabanNode target,
+ final AzkabanFlow azkabanFlow) {
+ // Check if the current node is a descendant of the target node.
+ if (current == null || target == null) {
+ return false;
+ } else if (current.getDependsOn() == null) {
+ return false;
+ } else if (current.getDependsOn().contains(target.getName())) {
+ return true;
+ } else {
+ for (final String nodeName : current.getDependsOn()) {
+ if (isDescendantNode(azkabanFlow.getNode(nodeName), target, azkabanFlow)) {
+ return true;
+ }
}
}
+ return false;
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 978b4ea..acecbda 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -23,6 +23,7 @@ import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -43,6 +44,7 @@ public class DirectoryYamlFlowLoaderTest {
private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
+ private static final String INVALID_CONDITION_YAML_DIR = "invalidconditionalflowyamltest";
private static final String BASIC_FLOW_1 = "basic_flow";
private static final String BASIC_FLOW_2 = "basic_flow2";
private static final String EMBEDDED_FLOW = "embedded_flow";
@@ -168,6 +170,23 @@ public class DirectoryYamlFlowLoaderTest {
checkFlowLoaderProperties(loader, 0, 0, 0);
}
+ @Test
+ public void testFlowYamlFileWithInvalidConditions() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_CONDITION_YAML_DIR));
+ checkFlowLoaderProperties(loader, 5, 5, 5);
+ Assert.assertTrue(
+ loader.getErrors().contains("Invalid condition for jobB: jobC doesn't exist in the flow."));
+ Assert.assertTrue(loader.getErrors().contains(
+ "Invalid condition for jobA: should not define condition on its descendant node jobD."));
+ Assert.assertTrue(
+ loader.getErrors().contains("Invalid condition for jobB: operand is an empty string."));
+ Assert.assertTrue(loader.getErrors().contains(
+ "Invalid condition for jobB: cannot resolve the condition. Please check the syntax for supported conditions."));
+ Assert.assertTrue(loader.getErrors().contains(
+ "Invalid condition for jobB: cannot combine more than one conditionOnJobStatus macros."));
+ }
+
private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
final int numFlowMap, final int numEdgeMap) {
assertThat(loader.getErrors().size()).isEqualTo(numError);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 53ee640..1ba50c8 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -20,6 +20,7 @@ import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
+import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_VARIABLE_REPLACEMENT_PATTERN;
import azkaban.Constants;
import azkaban.Constants.JobProperties;
@@ -77,7 +78,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
@@ -96,9 +96,6 @@ public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
- // Pattern to match job variables in condition expressions: ${jobName:variable}
- private static final Pattern VARIABLE_REPLACEMENT_PATTERN = Pattern
- .compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
// We check update every 5 minutes, just in case things get stuck. But for the
// most part, we'll be idling.
private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -920,7 +917,7 @@ public class FlowRunner extends EventHandler implements Runnable {
return true;
}
- final Matcher matcher = VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
+ final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
String replaced = condition;
while (matcher.find()) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
index ca3565b..a87a5e1 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
@@ -154,6 +154,16 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
assertFlowStatus(flow, Status.FAILED);
}
+ /**
+ * JobB has defined "condition: var fImport = new JavaImporter(java.io.File); with(fImport) { var
+ * f = new File('new'); f.createNewFile(); }"
+ * Null ProtectionDomain will restrict this arbitrary code from creating a new file.
+ * However it will not kick in when the change for condition whitelisting is implemented.
+ * As a result, this test case will be ignored.
+ *
+ * @throws Exception the exception
+ */
+ @Ignore
@Test
public void runFlowOnArbitraryCondition() throws Exception {
final HashMap<String, String> flowProps = new HashMap<>();
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/condition_on_descendant.flow b/test/execution-test-data/invalidconditionalflowyamltest/condition_on_descendant.flow
new file mode 100644
index 0000000..5b972f7
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/condition_on_descendant.flow
@@ -0,0 +1,30 @@
+nodes:
+ - name: jobD
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ dependsOn:
+ - jobB
+ - jobC
+
+ - name: jobC
+ type: test
+ config:
+ fail: false
+ seconds: 0
+
+ - name: jobB
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ dependsOn:
+ - jobA
+
+ - name: jobA
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ${jobD:param} == 'foo'
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/conditional_flow.project b/test/execution-test-data/invalidconditionalflowyamltest/conditional_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/conditional_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/invalid_variable_substitution.flow b/test/execution-test-data/invalidconditionalflowyamltest/invalid_variable_substitution.flow
new file mode 100644
index 0000000..78e72ee
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/invalid_variable_substitution.flow
@@ -0,0 +1,16 @@
+nodes:
+ - name: jobB
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ($<jobA:param> == 'foo' || ${jobA:param.{param2}}) && !one_success && print("hello!")
+
+ dependsOn:
+ - jobA
+
+ - name: jobA
+ type: test
+ config:
+ fail: false
+ seconds: 0
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/job_not_exist.flow b/test/execution-test-data/invalidconditionalflowyamltest/job_not_exist.flow
new file mode 100644
index 0000000..56067e9
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/job_not_exist.flow
@@ -0,0 +1,16 @@
+nodes:
+ - name: jobB
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ${jobC:param} == 'foo'
+
+ dependsOn:
+ - jobA
+
+ - name: jobA
+ type: test
+ config:
+ fail: false
+ seconds: 0
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/multiple_condition_on_job_status_macros.flow b/test/execution-test-data/invalidconditionalflowyamltest/multiple_condition_on_job_status_macros.flow
new file mode 100644
index 0000000..aced2e2
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/multiple_condition_on_job_status_macros.flow
@@ -0,0 +1,17 @@
+nodes:
+ - name: jobB
+ type: test
+ config:
+ fail: false
+ seconds: 0
+
+ condition: one_success && all_success || all_failed
+
+ dependsOn:
+ - jobA
+
+ - name: jobA
+ type: test
+ config:
+ fail: false
+ seconds: 0