azkaban-aplcache

POC for conditional workflow design. Condition on runtime

7/10/2018 1:56:37 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index e18528e..6db3fb8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -43,6 +43,7 @@ public class ExecutableNode {
   public static final String INNODES_PARAM = "inNodes";
   public static final String OUTNODES_PARAM = "outNodes";
   public static final String TYPE_PARAM = "type";
+  public static final String CONDITION_PARAM = "condition";
   public static final String PROPS_SOURCE_PARAM = "propSource";
   public static final String JOB_SOURCE_PARAM = "jobSource";
   public static final String OUTPUT_PROPS_PARAM = "outputProps";
@@ -66,6 +67,7 @@ public class ExecutableNode {
   private Props outputProps;
   private long delayExecution = 0;
   private ArrayList<ExecutionAttempt> pastAttempts = null;
+  private String condition;
 
   // Transient. These values aren't saved, but rediscovered.
   private ExecutableFlowBase parentFlow;
@@ -77,16 +79,18 @@ public class ExecutableNode {
   }
 
   public ExecutableNode(final Node node, final ExecutableFlowBase parent) {
-    this(node.getId(), node.getType(), node.getJobSource(), node
+    this(node.getId(), node.getType(), node.getCondition(), node.getJobSource(), node
         .getPropsSource(), parent);
   }
 
-  public ExecutableNode(final String id, final String type, final String jobSource,
+  public ExecutableNode(final String id, final String type, final String condition, final String
+      jobSource,
       final String propsSource, final ExecutableFlowBase parent) {
     this.id = id;
     this.jobSource = jobSource;
     this.propsSource = propsSource;
     this.type = type;
+    this.condition = condition;
     setParentFlow(parent);
   }
 
@@ -284,6 +288,7 @@ public class ExecutableNode {
     objMap.put(ENDTIME_PARAM, this.endTime);
     objMap.put(UPDATETIME_PARAM, this.updateTime);
     objMap.put(TYPE_PARAM, this.type);
+    objMap.put(CONDITION_PARAM, this.condition);
     objMap.put(ATTEMPT_PARAM, this.attempt);
 
     if (this.inNodes != null && !this.inNodes.isEmpty()) {
@@ -318,6 +323,7 @@ public class ExecutableNode {
       final TypedMapWrapper<String, Object> wrappedMap) {
     this.id = wrappedMap.getString(ID_PARAM);
     this.type = wrappedMap.getString(TYPE_PARAM);
+    this.condition = wrappedMap.getString(CONDITION_PARAM);
     this.status = Status.valueOf(wrappedMap.getString(STATUS_PARAM));
     this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
     this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
@@ -454,4 +460,12 @@ public class ExecutableNode {
   public long getRetryBackoff() {
     return this.inputProps.getLong("retry.backoff", 0);
   }
+
+  public String getCondition() {
+    return this.condition;
+  }
+
+  public void setCondition(final String condition) {
+    this.condition = condition;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/flow/Node.java b/azkaban-common/src/main/java/azkaban/flow/Node.java
index 7ea700a..71b68a3 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Node.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Node.java
@@ -34,6 +34,8 @@ public class Node {
 
   private String embeddedFlowId;
 
+  private String condition = null;
+
   public Node(final String id) {
     this.id = id;
   }
@@ -57,11 +59,13 @@ public class Node {
     final String jobType = (String) mapObj.get("jobType");
 
     final String embeddedFlowId = (String) mapObj.get("embeddedFlowId");
+    final String condition = (String) mapObj.get("condition");
 
     node.setJobSource(jobSource);
     node.setPropsSource(propSource);
     node.setType(jobType);
     node.setEmbeddedFlowId(embeddedFlowId);
+    node.setCondition(condition);
 
     final Integer expectedRuntime = (Integer) mapObj.get("expectedRuntime");
     if (expectedRuntime != null) {
@@ -175,7 +179,16 @@ public class Node {
     }
     layoutInfo.put("level", this.level);
     objMap.put("layout", layoutInfo);
+    objMap.put("condition", this.condition);
 
     return objMap;
   }
+
+  public String getCondition() {
+    return this.condition;
+  }
+
+  public void setCondition(final String condition) {
+    this.condition = condition;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 477ebb3..62a6c32 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -37,9 +37,10 @@ public class AzkabanFlow extends AzkabanNode {
   private final Map<String, AzkabanNode> nodes;
   private final FlowTrigger flowTrigger;
 
-  private AzkabanFlow(final String name, final Props props, final Map<String, AzkabanNode> nodes,
-      final List<String> dependsOn, final FlowTrigger flowTrigger) {
-    super(name, Constants.FLOW_NODE_TYPE, props, dependsOn);
+  private AzkabanFlow(final String name, final Props props, final String condition,
+      final Map<String, AzkabanNode> nodes, final List<String> dependsOn,
+      final FlowTrigger flowTrigger) {
+    super(name, Constants.FLOW_NODE_TYPE, props, condition, dependsOn);
     this.nodes = nodes;
     this.flowTrigger = flowTrigger;
   }
@@ -60,6 +61,7 @@ public class AzkabanFlow extends AzkabanNode {
 
     private String name;
     private Props props;
+    private String condition;
     private List<String> dependsOn;
     private Map<String, AzkabanNode> nodes;
     private FlowTrigger flowTrigger;
@@ -74,6 +76,11 @@ public class AzkabanFlow extends AzkabanNode {
       return this;
     }
 
+    public AzkabanFlowBuilder condition(final String condition) {
+      this.condition = condition;
+      return this;
+    }
+
     public AzkabanFlowBuilder dependsOn(final List<String> dependsOn) {
       this.dependsOn = dependsOn == null
           ? Collections.emptyList()
@@ -96,7 +103,8 @@ public class AzkabanFlow extends AzkabanNode {
     }
 
     public AzkabanFlow build() {
-      return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn, this.flowTrigger);
+      return new AzkabanFlow(this.name, this.props, this.condition, this.nodes, this.dependsOn, this
+          .flowTrigger);
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index 128e93f..ff2e795 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -29,8 +29,8 @@ import java.util.List;
 public class AzkabanJob extends AzkabanNode {
 
   private AzkabanJob(final String name, final String type, final Props props,
-      final List<String> dependsOn) {
-    super(name, type, props, dependsOn);
+      final String condition, final List<String> dependsOn) {
+    super(name, type, props, condition, dependsOn);
   }
 
   public static class AzkabanJobBuilder {
@@ -38,6 +38,7 @@ public class AzkabanJob extends AzkabanNode {
     private String name;
     private String type;
     private Props props;
+    private String condition;
     private List<String> dependsOn;
 
     public AzkabanJobBuilder name(final String name) {
@@ -55,6 +56,11 @@ public class AzkabanJob extends AzkabanNode {
       return this;
     }
 
+    public AzkabanJobBuilder condition(final String condition) {
+      this.condition = condition;
+      return this;
+    }
+
     public AzkabanJobBuilder dependsOn(final List<String> dependsOn) {
       // A node may or may not have dependencies.
       this.dependsOn = dependsOn == null
@@ -64,7 +70,7 @@ public class AzkabanJob extends AzkabanNode {
     }
 
     public AzkabanJob build() {
-      return new AzkabanJob(this.name, this.type, this.props, this.dependsOn);
+      return new AzkabanJob(this.name, this.type, this.props, this.condition, this.dependsOn);
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
index 079cf43..8cfc289 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -30,13 +30,16 @@ public abstract class AzkabanNode {
   protected final String name;
   protected final String type;
   protected final Props props;
+  protected final String condition;
   protected final List<String> dependsOn;
 
-  public AzkabanNode(final String name, final String type, final Props props, final List<String>
+  public AzkabanNode(final String name, final String type, final Props props, final String
+      condition, final List<String>
       dependsOn) {
     this.name = requireNonNull(name);
     this.type = requireNonNull(type);
     this.props = requireNonNull(props);
+    this.condition = condition;
     this.dependsOn = dependsOn;
   }
 
@@ -52,6 +55,10 @@ public abstract class AzkabanNode {
     return this.props;
   }
 
+  public String getCondition() {
+    return this.condition;
+  }
+
   public List<String> getDependsOn() {
     return this.dependsOn;
   }
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 91bc74b..f4c6765 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -163,6 +163,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
       final File flowFile) {
     final Node node = new Node(azkabanNode.getName());
     node.setType(azkabanNode.getType());
+    node.setCondition(azkabanNode.getCondition());
     node.setPropsSource(flowFile.getName());
     node.setJobSource(flowFile.getName());
 
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index 370a8b9..f518e35 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -32,6 +32,7 @@ public class NodeBean implements Serializable {
   private Map<String, String> config;
   private List<String> dependsOn;
   private String type;
+  private String condition;
   private List<NodeBean> nodes;
   private FlowTriggerBean trigger;
 
@@ -67,6 +68,14 @@ public class NodeBean implements Serializable {
     this.type = type;
   }
 
+  public String getCondition() {
+    return this.condition;
+  }
+
+  public void setCondition(final String condition) {
+    this.condition = condition;
+  }
+
   public List<NodeBean> getNodes() {
     return this.nodes;
   }
@@ -96,6 +105,7 @@ public class NodeBean implements Serializable {
         ", config=" + this.config +
         ", dependsOn=" + this.dependsOn +
         ", type='" + this.type + '\'' +
+        ", condition='" + this.condition + '\'' +
         ", nodes=" + this.nodes +
         ", trigger=" + this.trigger +
         '}';
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 0375f64..323a68f 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -77,6 +77,7 @@ public class NodeBeanLoader {
       return new AzkabanFlow.AzkabanFlowBuilder()
           .name(nodeBean.getName())
           .props(nodeBean.getProps())
+          .condition(nodeBean.getCondition())
           .dependsOn(nodeBean.getDependsOn())
           .nodes(nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
           .flowTrigger(toFlowTrigger(nodeBean.getTrigger()))
@@ -85,6 +86,7 @@ public class NodeBeanLoader {
       return new AzkabanJob.AzkabanJobBuilder()
           .name(nodeBean.getName())
           .props(nodeBean.getProps())
+          .condition(nodeBean.getCondition())
           .type(nodeBean.getType())
           .dependsOn(nodeBean.getDependsOn())
           .build();
diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index d886ffd..8f85c7b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import azkaban.flow.CommonJobProperties;
 import azkaban.jobExecutor.AbstractProcessJob;
 import azkaban.utils.Props;
+import java.io.File;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.log4j.Logger;
@@ -82,6 +83,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
 
   @Override
   public void run() throws Exception {
+    final File[] propFiles = initPropsFiles();
     final String nestedFlowPath =
         this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
     final String jobIdPrefix = this.getJobProps().getString(JOB_ID_PREFIX, null);
@@ -100,6 +102,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
     if (this.jobProps.getBoolean("fail", false)) {
       final int passRetry = this.jobProps.getInt("passRetry", -1);
       if (passRetry > 0 && passRetry < this.jobProps.getInt(JOB_ATTEMPT)) {
+        generateProperties(propFiles[1]);
         succeedJob();
       } else {
         failJob();
@@ -119,6 +122,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
           }
         }
         if (this.jobProps.containsKey("fail")) {
+          generateProperties(propFiles[1]);
           succeedJob();
         }
 
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 978b4ea..1e0eb37 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -42,10 +42,13 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String DUPLICATE_NODENAME_YAML_DIR = "duplicatenodenamesyamltest";
   private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
   private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
+  private static final String CONDITIONAL_FLOW_YAML_DIR = "conditionalflowyamltest";
   private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
   private static final String BASIC_FLOW_1 = "basic_flow";
   private static final String BASIC_FLOW_2 = "basic_flow2";
   private static final String EMBEDDED_FLOW = "embedded_flow";
+  private static final String CONDITIONAL_FLOW_1 = "conditional_flow1";
+  private static final String CONDITIONAL_FLOW_2 = "conditional_flow2";
   private static final String EMBEDDED_FLOW_1 = "embedded_flow" + Constants.PATH_DELIMITER +
       "embedded_flow1";
   private static final String EMBEDDED_FLOW_2 =
@@ -168,6 +171,16 @@ public class DirectoryYamlFlowLoaderTest {
     checkFlowLoaderProperties(loader, 0, 0, 0);
   }
 
+  @Test
+  public void testLoadConditionalWorkflowYamlFile() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project,
+        ExecutionsTestUtil.getFlowDir(CONDITIONAL_FLOW_YAML_DIR));
+    checkFlowLoaderProperties(loader, 0, 2, 2);
+    checkFlowProperties(loader, CONDITIONAL_FLOW_1, 0, 4, 1, 4, null);
+    checkFlowProperties(loader, CONDITIONAL_FLOW_2, 0, 4, 1, 4, null);
+  }
+
   private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
       final int numFlowMap, final int numEdgeMap) {
     assertThat(loader.getErrors().size()).isEqualTo(numError);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 831a557..54b4342 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -55,6 +55,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Files;
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -68,6 +69,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
@@ -83,6 +89,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 
   private static final Layout DEFAULT_LAYOUT = new PatternLayout(
       "%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+  // Pattern to match job variables in condition expressions: ${jobName:variable}
+  private static final Pattern VARIABLE_REPLACEMENT_PATTERN = Pattern
+      .compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
   // We check update every 5 minutes, just in case things get stuck. But for the
   // most part, we'll be idling.
   private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -174,6 +183,21 @@ public class FlowRunner extends EventHandler implements Runnable {
     this.azkabanEventReporter = azkabanEventReporter;
   }
 
+  private File findPropsFileForJob(final String prefix) {
+    final File[] files = this.getExecutionDir().listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(final File dir, final String name) {
+        return name.startsWith(prefix) && name.endsWith("tmp");
+      }
+    });
+
+    if (files == null || files.length == 0) {
+      this.logger.info("Not able to find props file for job " + prefix);
+      return null;
+    }
+    return files[0];
+  }
+
   public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
     this.watcher = watcher;
     return this;
@@ -852,6 +876,10 @@ public class FlowRunner extends EventHandler implements Runnable {
       }
     }
 
+    if (!isConditionMet(node)) {
+      return Status.CANCELLED;
+    }
+
     // If it's disabled but ready to run, we want to make sure it continues
     // being disabled.
     if (node.getStatus() == Status.DISABLED
@@ -873,6 +901,64 @@ public class FlowRunner extends EventHandler implements Runnable {
     return Status.READY;
   }
 
+  private Boolean isConditionMet(final ExecutableNode node) {
+    final String condition = node.getCondition();
+    if (condition == null) {
+      return true;
+    }
+
+    final Matcher matcher = VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
+    String replaced = condition;
+
+    while (matcher.find()) {
+      final String value = findValueForJobVariable(matcher.group(1), matcher.group(2));
+      if (value != null) {
+        replaced = replaced.replace(matcher.group(), "'" + value + "'");
+      }
+      this.logger.info("Condition is " + replaced);
+    }
+
+    // Evaluate string expression using script engine
+    return evaluateExpression(replaced);
+  }
+
+  private String findValueForJobVariable(final String jobName, final String variable) {
+    // Get props from job props tmp file
+    final File jobPropsFile = findPropsFileForJob(jobName + "_props");
+    if (jobPropsFile != null) {
+      try {
+        final Props jobProps = new Props(null, jobPropsFile);
+        if (jobProps.containsKey(variable)) {
+          return jobProps.get(variable);
+        }
+      } catch (final IOException e) {
+        this.logger.error("Not able to load props from job props file " + jobPropsFile
+            .getAbsolutePath());
+      }
+    }
+
+    // Get job output props
+    final Props outputProps = this.getExecutableFlow().getExecutableNode(jobName).getOutputProps();
+    if (outputProps != null && outputProps.containsKey(variable)) {
+      return outputProps.get(variable);
+    }
+
+    return null;
+  }
+
+  private boolean evaluateExpression(final String expression) {
+    boolean result = false;
+    try {
+      final ScriptEngineManager sem = new ScriptEngineManager();
+      final ScriptEngine se = sem.getEngineByName("JavaScript");
+      result = (boolean) se.eval(expression);
+      this.logger.info("Evaluate expression result: " + result);
+    } catch (final ScriptException e) {
+      this.logger.error("Invalid expression.", e);
+    }
+    return result;
+  }
+
   private Props collectOutputProps(final ExecutableNode node) {
     Props previousOutput = null;
     // Iterate the in nodes again and create the dependencies
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
new file mode 100644
index 0000000..d33fb77
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.InteractiveTestJob;
+import azkaban.executor.Status;
+import azkaban.project.Project;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import java.io.File;
+import java.util.HashMap;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
+
+  private static final String FLOW_YAML_DIR = "conditionalflowyamltest";
+  private static final String FLOW_NAME_1 = "conditional_flow1";
+  private static final String FLOW_NAME_2 = "conditional_flow2";
+  private static final String FLOW_YAML_FILE_1 = FLOW_NAME_1 + ".flow";
+  private static final String FLOW_YAML_FILE_2 = FLOW_NAME_2 + ".flow";
+  private FlowRunnerTestUtil testUtil;
+  private Project project;
+
+  @Before
+  public void setUp() throws Exception {
+    this.testUtil = new FlowRunnerTestUtil(FLOW_YAML_DIR, this.temporaryFolder);
+    this.project = this.testUtil.getProject();
+  }
+
+  @Test
+  public void runFlowOnJobPropsCondition() throws Exception {
+
+    when(this.testUtil.getProjectLoader()
+        .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_1))
+        .thenReturn(1);
+    when(this.testUtil.getProjectLoader()
+        .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
+            eq(FLOW_YAML_FILE_1),
+            eq(1), any(File.class)))
+        .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_1));
+
+    final HashMap<String, String> flowProps = new HashMap<>();
+    flowProps.put("azkaban.server.name", "foo");
+    final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_1, flowProps);
+    final ExecutableFlow flow = runner.getExecutableFlow();
+
+    FlowRunnerTestUtil.startThread(runner);
+
+    assertStatus(flow, "jobA", Status.SUCCEEDED);
+    assertStatus(flow, "jobB", Status.SUCCEEDED);
+    assertStatus(flow, "jobC", Status.CANCELLED);
+    assertStatus(flow, "jobD", Status.CANCELLED);
+    assertFlowStatus(flow, Status.KILLED);
+  }
+
+  @Test
+  public void runFlowOnJobOutputCondition() throws Exception {
+
+    when(this.testUtil.getProjectLoader()
+        .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_2))
+        .thenReturn(1);
+    when(this.testUtil.getProjectLoader()
+        .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
+            eq(FLOW_YAML_FILE_2),
+            eq(1), any(File.class)))
+        .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_2));
+
+    final HashMap<String, String> flowProps = new HashMap<>();
+    final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_2, flowProps);
+    final ExecutableFlow flow = runner.getExecutableFlow();
+
+    FlowRunnerTestUtil.startThread(runner);
+
+    assertStatus(flow, "jobA", Status.RUNNING);
+    final Props generatedProperties = new Props();
+    generatedProperties.put("key1", "value1");
+    generatedProperties.put("key2", "value2");
+    InteractiveTestJob.getTestJob("jobA").succeedJob(generatedProperties);
+    assertStatus(flow, "jobA", Status.SUCCEEDED);
+    assertStatus(flow, "jobB", Status.SUCCEEDED);
+    assertStatus(flow, "jobC", Status.CANCELLED);
+    assertStatus(flow, "jobD", Status.CANCELLED);
+    assertFlowStatus(flow, Status.KILLED);
+  }
+}
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow.project b/test/execution-test-data/conditionalflowyamltest/conditional_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow1.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow1.flow
new file mode 100644
index 0000000..84f5eed
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow1.flow
@@ -0,0 +1,41 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobD
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ${jobA:azkaban.server.name} == 'foo'
+
+    dependsOn:
+      - jobB
+      - jobC
+
+  - name: jobA
+    type: test
+    config:
+      fail: false
+      seconds: 0
+
+  - name: jobB
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ${jobA:azkaban.server.name} == 'foo'
+
+    dependsOn:
+      - jobA
+
+  - name: jobC
+    type: test
+    config:
+      fail: false
+      seconds: 0
+    condition: ${jobA:azkaban.server.name} == 'bar'
+
+    dependsOn:
+      - jobA
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow
new file mode 100644
index 0000000..5a7c723
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow
@@ -0,0 +1,40 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: jobD
+    type: test
+    config:
+      seconds: 0
+      fail: false
+    condition: ${jobA:key2} == 'value2'
+
+    dependsOn:
+      - jobB
+      - jobC
+
+  - name: jobA
+    type: test
+    config:
+      seconds: 0
+
+  - name: jobB
+    type: test
+    config:
+      seconds: 0
+      fail: false
+    condition: ${jobA:key1} == 'value1'
+
+    dependsOn:
+      - jobA
+
+  - name: jobC
+    type: test
+    config:
+      seconds: 0
+      fail: false
+    condition: ${jobA:key3} == 'value3'
+
+    dependsOn:
+      - jobA