azkaban-aplcache

Conditional workflow - Validate conditions (#1918) * Conditional

8/27/2018 2:23:16 PM
3.52.0 3.53.0

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 2af9ac8..86156d5 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -45,7 +45,17 @@ import org.slf4j.LoggerFactory;
  */
 public class DirectoryYamlFlowLoader implements FlowLoader {
 
+  // Pattern to match job variables in condition expressions: ${jobName:variable}
+  public static final Pattern CONDITION_VARIABLE_REPLACEMENT_PATTERN = Pattern
+      .compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
   private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
+  // Pattern to match conditionOnJobStatus macros, e.g. one_success, all_done
+  private static final Pattern CONDITION_ON_JOB_STATUS_PATTERN =
+      Pattern.compile("(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b");
+  // Pattern to match a number or a string, e.g. 1234, "hello", 'foo'
+  private static final Pattern DIGIT_STRING_PATTERN = Pattern.compile("\\d+|'.*'|\".*\"");
+  // Valid operators: &&, ||, ==, !=, >=, <=, >, < (">=" before ">": regex alternation is ordered)
+  private static final String VALID_CONDITION_OPERATORS = "&&|\\|\\||==|!=|>=|<=|>|<";
 
   private final Props props;
   private final Set<String> errors = new HashSet<>();
@@ -147,7 +157,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
 
     // Convert azkabanNodes to nodes inside the flow.
     azkabanFlow.getNodes().values().stream()
-        .map(n -> convertAzkabanNodeToNode(n, flowName, flowFile))
+        .map(n -> convertAzkabanNodeToNode(n, flowName, flowFile, azkabanFlow))
         .forEach(n -> flow.addNode(n));
 
     // Add edges for the flow.
@@ -164,11 +174,10 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   }
 
   private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode, final String flowName,
-      final File flowFile) {
+      final File flowFile, final AzkabanFlow azkabanFlow) {
     final Node node = new Node(azkabanNode.getName());
     node.setType(azkabanNode.getType());
-    node.setCondition(azkabanNode.getCondition());
-    setConditionOnJobStatus(node);
+    validateCondition(node, azkabanNode, azkabanFlow);
     node.setPropsSource(flowFile.getName());
     node.setJobSource(flowFile.getName());
 
@@ -222,25 +231,97 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
     }
   }
 
-  private void setConditionOnJobStatus(final Node node) {
-    String condition = node.getCondition();
-    if (condition != null) {
-      // Only values in the ConditionOnJobStatus enum can be matched by this pattern. Some examples:
-      // Valid: all_done, one_success && ${jobA: param1} == 1, ALL_FAILED
-      // Invalid: two_success, one_faileddd, {one_failed}
-      final String patternString =
-          "(?i)\\b(" + StringUtils.join(ConditionOnJobStatus.values(), "|") + ")\\b";
-      final Pattern pattern = Pattern.compile(patternString);
-      final Matcher matcher = pattern.matcher(condition);
-
-      // Todo jamiesjc: need to add validation for condition
-      while (matcher.find()) {
-        logger.info("Found conditionOnJobStatus: " + matcher.group(1));
+  private void validateCondition(final Node node, final AzkabanNode azkabanNode,
+      final AzkabanFlow azkabanFlow) {
+    boolean valid = true;
+    boolean foundConditionOnJobStatus = false;
+    String condition = azkabanNode.getCondition();
+    if (condition == null) {
+      return;
+    }
+    // First, remove all whitespace and parentheses ().
+    final String replacedCondition = condition.replaceAll("\\s+|\\(|\\)", "");
+    // Second, split the condition by operators &&, ||, ==, !=, >, >=, <, <=
+    final String[] operands = replacedCondition.split(VALID_CONDITION_OPERATORS);
+    // Third, check whether all the operands are valid: only conditionOnJobStatus macros, numbers,
+    // strings, and variable substitution ${jobName:param} are allowed.
+    for (int i = 0; i < operands.length; i++) {
+      final Matcher matcher = CONDITION_ON_JOB_STATUS_PATTERN.matcher(operands[i]);
+      if (matcher.matches()) {
+        this.logger.info("Operand " + operands[i] + " is a condition on job status.");
+        if (foundConditionOnJobStatus) {
+          this.errors.add("Invalid condition for " + node.getId()
+              + ": cannot combine more than one conditionOnJobStatus macros.");
+          valid = false;
+        }
+        foundConditionOnJobStatus = true;
         node.setConditionOnJobStatus(ConditionOnJobStatus.fromString(matcher.group(1)));
         condition = condition.replace(matcher.group(1), "true");
-        logger.info("condition is " + condition);
-        node.setCondition(condition);
+      } else {
+        if (operands[i].startsWith("!")) {
+          // Remove the operator '!' from the operand.
+          operands[i] = operands[i].substring(1);
+        }
+        if (operands[i].equals("")) {
+          this.errors
+              .add("Invalid condition for " + node.getId() + ": operand is an empty string.");
+          valid = false;
+        } else if (!DIGIT_STRING_PATTERN.matcher(operands[i]).matches() &&
+            !isValidVariableSubstitution(operands[i], azkabanNode, azkabanFlow)) {
+          valid = false;
+        }
+      }
+    }
+    if (valid) {
+      node.setCondition(condition);
+    } else {
+      this.logger.info("Condition of " + node.getId() + ": " + azkabanNode.getCondition()
+          + " is invalid, set it to false");
+      node.setCondition("false");
+    }
+  }
+
+  private boolean isValidVariableSubstitution(final String operand, final AzkabanNode azkabanNode,
+      final AzkabanFlow azkabanFlow) {
+    boolean result = false;
+    final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(operand);
+    if (matcher.matches()) {
+      final String jobName = matcher.group(1);
+      final AzkabanNode conditionNode = azkabanFlow.getNode(jobName);
+      if (conditionNode == null) {
+        this.errors.add("Invalid condition for " + azkabanNode.getName() + ": " + jobName
+            + " doesn't exist in the flow.");
+      }
+      // If a job defines condition on its descendant nodes, then that condition is invalid.
+      else if (isDescendantNode(conditionNode, azkabanNode, azkabanFlow)) {
+        this.errors.add("Invalid condition for " + azkabanNode.getName()
+            + ": should not define condition on its descendant node " + jobName + ".");
+      } else {
+        result = true;
+      }
+    } else {
+      this.errors.add("Invalid condition for " + azkabanNode.getName()
+          + ": cannot resolve the condition. Please check the syntax for supported conditions.");
+    }
+    return result;
+  }
+
+  private boolean isDescendantNode(final AzkabanNode current, final AzkabanNode target,
+      final AzkabanFlow azkabanFlow) {
+    // Check if the current node is a descendant of the target node.
+    if (current == null || target == null) {
+      return false;
+    } else if (current.getDependsOn() == null) {
+      return false;
+    } else if (current.getDependsOn().contains(target.getName())) {
+      return true;
+    } else {
+      for (final String nodeName : current.getDependsOn()) {
+        if (isDescendantNode(azkabanFlow.getNode(nodeName), target, azkabanFlow)) {
+          return true;
+        }
       }
     }
+    return false;
   }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 978b4ea..acecbda 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -23,6 +23,7 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -43,6 +44,7 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
   private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
   private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
+  private static final String INVALID_CONDITION_YAML_DIR = "invalidconditionalflowyamltest";
   private static final String BASIC_FLOW_1 = "basic_flow";
   private static final String BASIC_FLOW_2 = "basic_flow2";
   private static final String EMBEDDED_FLOW = "embedded_flow";
@@ -168,6 +170,23 @@ public class DirectoryYamlFlowLoaderTest {
     checkFlowLoaderProperties(loader, 0, 0, 0);
   }
 
+  @Test
+  public void testFlowYamlFileWithInvalidConditions() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_CONDITION_YAML_DIR));
+    checkFlowLoaderProperties(loader, 5, 5, 5);
+    Assert.assertTrue(
+        loader.getErrors().contains("Invalid condition for jobB: jobC doesn't exist in the flow."));
+    Assert.assertTrue(loader.getErrors().contains(
+        "Invalid condition for jobA: should not define condition on its descendant node jobD."));
+    Assert.assertTrue(
+        loader.getErrors().contains("Invalid condition for jobB: operand is an empty string."));
+    Assert.assertTrue(loader.getErrors().contains(
+        "Invalid condition for jobB: cannot resolve the condition. Please check the syntax for supported conditions."));
+    Assert.assertTrue(loader.getErrors().contains(
+        "Invalid condition for jobB: cannot combine more than one conditionOnJobStatus macros."));
+  }
+
   private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
       final int numFlowMap, final int numEdgeMap) {
     assertThat(loader.getErrors().size()).isEqualTo(numError);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 53ee640..1ba50c8 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -20,6 +20,7 @@ import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
 import static azkaban.execapp.ConditionalWorkflowUtils.FAILED;
 import static azkaban.execapp.ConditionalWorkflowUtils.PENDING;
 import static azkaban.execapp.ConditionalWorkflowUtils.checkConditionOnJobStatus;
+import static azkaban.project.DirectoryYamlFlowLoader.CONDITION_VARIABLE_REPLACEMENT_PATTERN;
 
 import azkaban.Constants;
 import azkaban.Constants.JobProperties;
@@ -77,7 +78,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import javax.script.ScriptEngine;
 import javax.script.ScriptEngineManager;
 import javax.script.ScriptException;
@@ -96,9 +96,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private static final Layout DEFAULT_LAYOUT = new PatternLayout(
       "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
-  // Pattern to match job variables in condition expressions: ${jobName:variable}
-  private static final Pattern VARIABLE_REPLACEMENT_PATTERN = Pattern
-      .compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
   // We check update every 5 minutes, just in case things get stuck. But for the
   // most part, we'll be idling.
   private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -920,7 +917,7 @@ public class FlowRunner extends EventHandler implements Runnable {
       return true;
     }
 
-    final Matcher matcher = VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
+    final Matcher matcher = CONDITION_VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
     String replaced = condition;
 
     while (matcher.find()) {
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
index ca3565b..a87a5e1 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
@@ -154,6 +154,16 @@ public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
     assertFlowStatus(flow, Status.FAILED);
   }
 
+  /**
+   * JobB has defined "condition: var fImport = new JavaImporter(java.io.File); with(fImport) { var
+   * f = new File('new'); f.createNewFile(); }"
+   * A null ProtectionDomain restricts this arbitrary code from creating a new file.
+   * However, that restriction will not come into play once condition whitelisting is implemented.
+   * As a result, this test case is ignored.
+   *
+   * @throws Exception the exception
+   */
+  @Ignore
   @Test
   public void runFlowOnArbitraryCondition() throws Exception {
     final HashMap<String, String> flowProps = new HashMap<>();
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/condition_on_descendant.flow b/test/execution-test-data/invalidconditionalflowyamltest/condition_on_descendant.flow
new file mode 100644
index 0000000..5b972f7
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/condition_on_descendant.flow
@@ -0,0 +1,30 @@
+nodes:
+  - name: jobD
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    dependsOn:
+      - jobB
+      - jobC
+
+  - name: jobC
+    type: test
+    config:
+      fail: false
+      seconds: 0
+
+  - name: jobB
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    dependsOn:
+      - jobA
+
+  - name: jobA
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ${jobD:param} == 'foo'
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/conditional_flow.project b/test/execution-test-data/invalidconditionalflowyamltest/conditional_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/conditional_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/invalid_variable_substitution.flow b/test/execution-test-data/invalidconditionalflowyamltest/invalid_variable_substitution.flow
new file mode 100644
index 0000000..78e72ee
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/invalid_variable_substitution.flow
@@ -0,0 +1,16 @@
+nodes:
+  - name: jobB
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ($<jobA:param> == 'foo' || ${jobA:param.{param2}}) && !one_success && print("hello!")
+
+    dependsOn:
+      - jobA
+
+  - name: jobA
+    type: test
+    config:
+      fail: false
+      seconds: 0
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/job_not_exist.flow b/test/execution-test-data/invalidconditionalflowyamltest/job_not_exist.flow
new file mode 100644
index 0000000..56067e9
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/job_not_exist.flow
@@ -0,0 +1,16 @@
+nodes:
+  - name: jobB
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ${jobC:param} == 'foo'
+
+    dependsOn:
+      - jobA
+
+  - name: jobA
+    type: test
+    config:
+      fail: false
+      seconds: 0
diff --git a/test/execution-test-data/invalidconditionalflowyamltest/multiple_condition_on_job_status_macros.flow b/test/execution-test-data/invalidconditionalflowyamltest/multiple_condition_on_job_status_macros.flow
new file mode 100644
index 0000000..aced2e2
--- /dev/null
+++ b/test/execution-test-data/invalidconditionalflowyamltest/multiple_condition_on_job_status_macros.flow
@@ -0,0 +1,17 @@
+nodes:
+  - name: jobB
+    type: test
+    config:
+      fail: false
+      seconds: 0
+
+    condition: one_success && all_success || all_failed
+
+    dependsOn:
+      - jobA
+
+  - name: jobA
+    type: test
+    config:
+      fail: false
+      seconds: 0