Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
index e18528e..6db3fb8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutableNode.java
@@ -43,6 +43,7 @@ public class ExecutableNode {
public static final String INNODES_PARAM = "inNodes";
public static final String OUTNODES_PARAM = "outNodes";
public static final String TYPE_PARAM = "type";
+ public static final String CONDITION_PARAM = "condition";
public static final String PROPS_SOURCE_PARAM = "propSource";
public static final String JOB_SOURCE_PARAM = "jobSource";
public static final String OUTPUT_PROPS_PARAM = "outputProps";
@@ -66,6 +67,7 @@ public class ExecutableNode {
private Props outputProps;
private long delayExecution = 0;
private ArrayList<ExecutionAttempt> pastAttempts = null;
+ private String condition;
// Transient. These values aren't saved, but rediscovered.
private ExecutableFlowBase parentFlow;
@@ -77,16 +79,18 @@ public class ExecutableNode {
}
public ExecutableNode(final Node node, final ExecutableFlowBase parent) {
- this(node.getId(), node.getType(), node.getJobSource(), node
+ this(node.getId(), node.getType(), node.getCondition(), node.getJobSource(), node
.getPropsSource(), parent);
}
- public ExecutableNode(final String id, final String type, final String jobSource,
+ public ExecutableNode(final String id, final String type, final String condition, final String
+ jobSource,
final String propsSource, final ExecutableFlowBase parent) {
this.id = id;
this.jobSource = jobSource;
this.propsSource = propsSource;
this.type = type;
+ this.condition = condition;
setParentFlow(parent);
}
@@ -284,6 +288,7 @@ public class ExecutableNode {
objMap.put(ENDTIME_PARAM, this.endTime);
objMap.put(UPDATETIME_PARAM, this.updateTime);
objMap.put(TYPE_PARAM, this.type);
+ objMap.put(CONDITION_PARAM, this.condition);
objMap.put(ATTEMPT_PARAM, this.attempt);
if (this.inNodes != null && !this.inNodes.isEmpty()) {
@@ -318,6 +323,7 @@ public class ExecutableNode {
final TypedMapWrapper<String, Object> wrappedMap) {
this.id = wrappedMap.getString(ID_PARAM);
this.type = wrappedMap.getString(TYPE_PARAM);
+ this.condition = wrappedMap.getString(CONDITION_PARAM);
this.status = Status.valueOf(wrappedMap.getString(STATUS_PARAM));
this.startTime = wrappedMap.getLong(STARTTIME_PARAM);
this.endTime = wrappedMap.getLong(ENDTIME_PARAM);
@@ -454,4 +460,12 @@ public class ExecutableNode {
public long getRetryBackoff() {
return this.inputProps.getLong("retry.backoff", 0);
}
+
+ public String getCondition() {
+ return this.condition;
+ }
+
+ public void setCondition(final String condition) {
+ this.condition = condition;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/flow/Node.java b/azkaban-common/src/main/java/azkaban/flow/Node.java
index 7ea700a..71b68a3 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Node.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Node.java
@@ -34,6 +34,8 @@ public class Node {
private String embeddedFlowId;
+ private String condition = null;
+
public Node(final String id) {
this.id = id;
}
@@ -57,11 +59,13 @@ public class Node {
final String jobType = (String) mapObj.get("jobType");
final String embeddedFlowId = (String) mapObj.get("embeddedFlowId");
+ final String condition = (String) mapObj.get("condition");
node.setJobSource(jobSource);
node.setPropsSource(propSource);
node.setType(jobType);
node.setEmbeddedFlowId(embeddedFlowId);
+ node.setCondition(condition);
final Integer expectedRuntime = (Integer) mapObj.get("expectedRuntime");
if (expectedRuntime != null) {
@@ -175,7 +179,16 @@ public class Node {
}
layoutInfo.put("level", this.level);
objMap.put("layout", layoutInfo);
+ objMap.put("condition", this.condition);
return objMap;
}
+
+ public String getCondition() {
+ return this.condition;
+ }
+
+ public void setCondition(final String condition) {
+ this.condition = condition;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 477ebb3..62a6c32 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -37,9 +37,10 @@ public class AzkabanFlow extends AzkabanNode {
private final Map<String, AzkabanNode> nodes;
private final FlowTrigger flowTrigger;
- private AzkabanFlow(final String name, final Props props, final Map<String, AzkabanNode> nodes,
- final List<String> dependsOn, final FlowTrigger flowTrigger) {
- super(name, Constants.FLOW_NODE_TYPE, props, dependsOn);
+ private AzkabanFlow(final String name, final Props props, final String condition,
+ final Map<String, AzkabanNode> nodes, final List<String> dependsOn,
+ final FlowTrigger flowTrigger) {
+ super(name, Constants.FLOW_NODE_TYPE, props, condition, dependsOn);
this.nodes = nodes;
this.flowTrigger = flowTrigger;
}
@@ -60,6 +61,7 @@ public class AzkabanFlow extends AzkabanNode {
private String name;
private Props props;
+ private String condition;
private List<String> dependsOn;
private Map<String, AzkabanNode> nodes;
private FlowTrigger flowTrigger;
@@ -74,6 +76,11 @@ public class AzkabanFlow extends AzkabanNode {
return this;
}
+ public AzkabanFlowBuilder condition(final String condition) {
+ this.condition = condition;
+ return this;
+ }
+
public AzkabanFlowBuilder dependsOn(final List<String> dependsOn) {
this.dependsOn = dependsOn == null
? Collections.emptyList()
@@ -96,7 +103,8 @@ public class AzkabanFlow extends AzkabanNode {
}
public AzkabanFlow build() {
- return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn, this.flowTrigger);
+ return new AzkabanFlow(this.name, this.props, this.condition, this.nodes, this.dependsOn, this
+ .flowTrigger);
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index 128e93f..ff2e795 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -29,8 +29,8 @@ import java.util.List;
public class AzkabanJob extends AzkabanNode {
private AzkabanJob(final String name, final String type, final Props props,
- final List<String> dependsOn) {
- super(name, type, props, dependsOn);
+ final String condition, final List<String> dependsOn) {
+ super(name, type, props, condition, dependsOn);
}
public static class AzkabanJobBuilder {
@@ -38,6 +38,7 @@ public class AzkabanJob extends AzkabanNode {
private String name;
private String type;
private Props props;
+ private String condition;
private List<String> dependsOn;
public AzkabanJobBuilder name(final String name) {
@@ -55,6 +56,11 @@ public class AzkabanJob extends AzkabanNode {
return this;
}
+ public AzkabanJobBuilder condition(final String condition) {
+ this.condition = condition;
+ return this;
+ }
+
public AzkabanJobBuilder dependsOn(final List<String> dependsOn) {
// A node may or may not have dependencies.
this.dependsOn = dependsOn == null
@@ -64,7 +70,7 @@ public class AzkabanJob extends AzkabanNode {
}
public AzkabanJob build() {
- return new AzkabanJob(this.name, this.type, this.props, this.dependsOn);
+ return new AzkabanJob(this.name, this.type, this.props, this.condition, this.dependsOn);
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
index 079cf43..8cfc289 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -30,13 +30,16 @@ public abstract class AzkabanNode {
protected final String name;
protected final String type;
protected final Props props;
+ protected final String condition;
protected final List<String> dependsOn;
- public AzkabanNode(final String name, final String type, final Props props, final List<String>
+ public AzkabanNode(final String name, final String type, final Props props, final String
+ condition, final List<String>
dependsOn) {
this.name = requireNonNull(name);
this.type = requireNonNull(type);
this.props = requireNonNull(props);
+ this.condition = condition;
this.dependsOn = dependsOn;
}
@@ -52,6 +55,10 @@ public abstract class AzkabanNode {
return this.props;
}
+ public String getCondition() {
+ return this.condition;
+ }
+
public List<String> getDependsOn() {
return this.dependsOn;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 91bc74b..f4c6765 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -163,6 +163,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
final File flowFile) {
final Node node = new Node(azkabanNode.getName());
node.setType(azkabanNode.getType());
+ node.setCondition(azkabanNode.getCondition());
node.setPropsSource(flowFile.getName());
node.setJobSource(flowFile.getName());
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index 370a8b9..f518e35 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -32,6 +32,7 @@ public class NodeBean implements Serializable {
private Map<String, String> config;
private List<String> dependsOn;
private String type;
+ private String condition;
private List<NodeBean> nodes;
private FlowTriggerBean trigger;
@@ -67,6 +68,14 @@ public class NodeBean implements Serializable {
this.type = type;
}
+ public String getCondition() {
+ return this.condition;
+ }
+
+ public void setCondition(final String condition) {
+ this.condition = condition;
+ }
+
public List<NodeBean> getNodes() {
return this.nodes;
}
@@ -96,6 +105,7 @@ public class NodeBean implements Serializable {
", config=" + this.config +
", dependsOn=" + this.dependsOn +
", type='" + this.type + '\'' +
+ ", condition='" + this.condition + '\'' +
", nodes=" + this.nodes +
", trigger=" + this.trigger +
'}';
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 0375f64..323a68f 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -77,6 +77,7 @@ public class NodeBeanLoader {
return new AzkabanFlow.AzkabanFlowBuilder()
.name(nodeBean.getName())
.props(nodeBean.getProps())
+ .condition(nodeBean.getCondition())
.dependsOn(nodeBean.getDependsOn())
.nodes(nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
.flowTrigger(toFlowTrigger(nodeBean.getTrigger()))
@@ -85,6 +86,7 @@ public class NodeBeanLoader {
return new AzkabanJob.AzkabanJobBuilder()
.name(nodeBean.getName())
.props(nodeBean.getProps())
+ .condition(nodeBean.getCondition())
.type(nodeBean.getType())
.dependsOn(nodeBean.getDependsOn())
.build();
diff --git a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
index d886ffd..8f85c7b 100644
--- a/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
+++ b/azkaban-common/src/test/java/azkaban/executor/InteractiveTestJob.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.utils.Props;
+import java.io.File;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
@@ -82,6 +83,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
@Override
public void run() throws Exception {
+ final File[] propFiles = initPropsFiles();
final String nestedFlowPath =
this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
final String jobIdPrefix = this.getJobProps().getString(JOB_ID_PREFIX, null);
@@ -100,6 +102,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
if (this.jobProps.getBoolean("fail", false)) {
final int passRetry = this.jobProps.getInt("passRetry", -1);
if (passRetry > 0 && passRetry < this.jobProps.getInt(JOB_ATTEMPT)) {
+ generateProperties(propFiles[1]);
succeedJob();
} else {
failJob();
@@ -119,6 +122,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
}
}
if (this.jobProps.containsKey("fail")) {
+ generateProperties(propFiles[1]);
succeedJob();
}
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 978b4ea..1e0eb37 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -42,10 +42,13 @@ public class DirectoryYamlFlowLoaderTest {
private static final String DUPLICATE_NODENAME_YAML_DIR = "duplicatenodenamesyamltest";
private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
+ private static final String CONDITIONAL_FLOW_YAML_DIR = "conditionalflowyamltest";
private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
private static final String BASIC_FLOW_1 = "basic_flow";
private static final String BASIC_FLOW_2 = "basic_flow2";
private static final String EMBEDDED_FLOW = "embedded_flow";
+ private static final String CONDITIONAL_FLOW_1 = "conditional_flow1";
+ private static final String CONDITIONAL_FLOW_2 = "conditional_flow2";
private static final String EMBEDDED_FLOW_1 = "embedded_flow" + Constants.PATH_DELIMITER +
"embedded_flow1";
private static final String EMBEDDED_FLOW_2 =
@@ -168,6 +171,16 @@ public class DirectoryYamlFlowLoaderTest {
checkFlowLoaderProperties(loader, 0, 0, 0);
}
+ @Test
+ public void testLoadConditionalWorkflowYamlFile() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project,
+ ExecutionsTestUtil.getFlowDir(CONDITIONAL_FLOW_YAML_DIR));
+ checkFlowLoaderProperties(loader, 0, 2, 2);
+ checkFlowProperties(loader, CONDITIONAL_FLOW_1, 0, 4, 1, 4, null);
+ checkFlowProperties(loader, CONDITIONAL_FLOW_2, 0, 4, 1, 4, null);
+ }
+
private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
final int numFlowMap, final int numEdgeMap) {
assertThat(loader.getErrors().size()).isEqualTo(numError);
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 831a557..54b4342 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -55,6 +55,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -68,6 +69,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
@@ -83,6 +89,9 @@ public class FlowRunner extends EventHandler implements Runnable {
private static final Layout DEFAULT_LAYOUT = new PatternLayout(
"%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
+ // Pattern to match job variables in condition expressions: ${jobName:variable}
+ private static final Pattern VARIABLE_REPLACEMENT_PATTERN = Pattern
+ .compile("\\$\\{([^:{}]+):([^:{}]+)\\}");
// We check update every 5 minutes, just in case things get stuck. But for the
// most part, we'll be idling.
private static final long CHECK_WAIT_MS = 5 * 60 * 1000;
@@ -174,6 +183,21 @@ public class FlowRunner extends EventHandler implements Runnable {
this.azkabanEventReporter = azkabanEventReporter;
}
+ private File findPropsFileForJob(final String prefix) {
+ final File[] files = this.getExecutionDir().listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(final File dir, final String name) {
+ return name.startsWith(prefix) && name.endsWith("tmp");
+ }
+ });
+
+ if (files == null || files.length == 0) {
+ this.logger.info("Not able to find props file for job " + prefix);
+ return null;
+ }
+ return files[0];
+ }
+
public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
this.watcher = watcher;
return this;
@@ -852,6 +876,10 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ if (!isConditionMet(node)) {
+ return Status.CANCELLED;
+ }
+
// If it's disabled but ready to run, we want to make sure it continues
// being disabled.
if (node.getStatus() == Status.DISABLED
@@ -873,6 +901,64 @@ public class FlowRunner extends EventHandler implements Runnable {
return Status.READY;
}
+ private Boolean isConditionMet(final ExecutableNode node) {
+ final String condition = node.getCondition();
+ if (condition == null) {
+ return true;
+ }
+
+ final Matcher matcher = VARIABLE_REPLACEMENT_PATTERN.matcher(condition);
+ String replaced = condition;
+
+ while (matcher.find()) {
+ final String value = findValueForJobVariable(matcher.group(1), matcher.group(2));
+ if (value != null) {
+ replaced = replaced.replace(matcher.group(), "'" + value + "'");
+ }
+ this.logger.info("Condition is " + replaced);
+ }
+
+ // Evaluate string expression using script engine
+ return evaluateExpression(replaced);
+ }
+
+ private String findValueForJobVariable(final String jobName, final String variable) {
+ // Get props from job props tmp file
+ final File jobPropsFile = findPropsFileForJob(jobName + "_props");
+ if (jobPropsFile != null) {
+ try {
+ final Props jobProps = new Props(null, jobPropsFile);
+ if (jobProps.containsKey(variable)) {
+ return jobProps.get(variable);
+ }
+ } catch (final IOException e) {
+ this.logger.error("Not able to load props from job props file " + jobPropsFile
+ .getAbsolutePath());
+ }
+ }
+
+ // Get job output props
+ final Props outputProps = this.getExecutableFlow().getExecutableNode(jobName).getOutputProps();
+ if (outputProps != null && outputProps.containsKey(variable)) {
+ return outputProps.get(variable);
+ }
+
+ return null;
+ }
+
+ private boolean evaluateExpression(final String expression) {
+ boolean result = false;
+ try {
+ final ScriptEngineManager sem = new ScriptEngineManager();
+ final ScriptEngine se = sem.getEngineByName("JavaScript");
+ result = (boolean) se.eval(expression);
+ this.logger.info("Evaluate expression result: " + result);
+ } catch (final ScriptException e) {
+ this.logger.error("Invalid expression.", e);
+ }
+ return result;
+ }
+
private Props collectOutputProps(final ExecutableNode node) {
Props previousOutput = null;
// Iterate the in nodes again and create the dependencies
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
new file mode 100644
index 0000000..d33fb77
--- /dev/null
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerConditionalFlowTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.InteractiveTestJob;
+import azkaban.executor.Status;
+import azkaban.project.Project;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import java.io.File;
+import java.util.HashMap;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FlowRunnerConditionalFlowTest extends FlowRunnerTestBase {
+
+ private static final String FLOW_YAML_DIR = "conditionalflowyamltest";
+ private static final String FLOW_NAME_1 = "conditional_flow1";
+ private static final String FLOW_NAME_2 = "conditional_flow2";
+ private static final String FLOW_YAML_FILE_1 = FLOW_NAME_1 + ".flow";
+ private static final String FLOW_YAML_FILE_2 = FLOW_NAME_2 + ".flow";
+ private FlowRunnerTestUtil testUtil;
+ private Project project;
+
+ @Before
+ public void setUp() throws Exception {
+ this.testUtil = new FlowRunnerTestUtil(FLOW_YAML_DIR, this.temporaryFolder);
+ this.project = this.testUtil.getProject();
+ }
+
+ @Test
+ public void runFlowOnJobPropsCondition() throws Exception {
+
+ when(this.testUtil.getProjectLoader()
+ .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_1))
+ .thenReturn(1);
+ when(this.testUtil.getProjectLoader()
+ .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
+ eq(FLOW_YAML_FILE_1),
+ eq(1), any(File.class)))
+ .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_1));
+
+ final HashMap<String, String> flowProps = new HashMap<>();
+ flowProps.put("azkaban.server.name", "foo");
+ final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_1, flowProps);
+ final ExecutableFlow flow = runner.getExecutableFlow();
+
+ FlowRunnerTestUtil.startThread(runner);
+
+ assertStatus(flow, "jobA", Status.SUCCEEDED);
+ assertStatus(flow, "jobB", Status.SUCCEEDED);
+ assertStatus(flow, "jobC", Status.CANCELLED);
+ assertStatus(flow, "jobD", Status.CANCELLED);
+ assertFlowStatus(flow, Status.KILLED);
+ }
+
+ @Test
+ public void runFlowOnJobOutputCondition() throws Exception {
+
+ when(this.testUtil.getProjectLoader()
+ .getLatestFlowVersion(this.project.getId(), this.project.getVersion(), FLOW_YAML_FILE_2))
+ .thenReturn(1);
+ when(this.testUtil.getProjectLoader()
+ .getUploadedFlowFile(eq(this.project.getId()), eq(this.project.getVersion()),
+ eq(FLOW_YAML_FILE_2),
+ eq(1), any(File.class)))
+ .thenReturn(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE_2));
+
+ final HashMap<String, String> flowProps = new HashMap<>();
+ final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME_2, flowProps);
+ final ExecutableFlow flow = runner.getExecutableFlow();
+
+ FlowRunnerTestUtil.startThread(runner);
+
+ assertStatus(flow, "jobA", Status.RUNNING);
+ final Props generatedProperties = new Props();
+ generatedProperties.put("key1", "value1");
+ generatedProperties.put("key2", "value2");
+ InteractiveTestJob.getTestJob("jobA").succeedJob(generatedProperties);
+ assertStatus(flow, "jobA", Status.SUCCEEDED);
+ assertStatus(flow, "jobB", Status.SUCCEEDED);
+ assertStatus(flow, "jobC", Status.CANCELLED);
+ assertStatus(flow, "jobD", Status.CANCELLED);
+ assertFlowStatus(flow, Status.KILLED);
+ }
+}
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow.project b/test/execution-test-data/conditionalflowyamltest/conditional_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow1.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow1.flow
new file mode 100644
index 0000000..84f5eed
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow1.flow
@@ -0,0 +1,41 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: jobD
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ${jobA:azkaban.server.name} == 'foo'
+
+ dependsOn:
+ - jobB
+ - jobC
+
+ - name: jobA
+ type: test
+ config:
+ fail: false
+ seconds: 0
+
+ - name: jobB
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ${jobA:azkaban.server.name} == 'foo'
+
+ dependsOn:
+ - jobA
+
+ - name: jobC
+ type: test
+ config:
+ fail: false
+ seconds: 0
+ condition: ${jobA:azkaban.server.name} == 'bar'
+
+ dependsOn:
+ - jobA
diff --git a/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow b/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow
new file mode 100644
index 0000000..5a7c723
--- /dev/null
+++ b/test/execution-test-data/conditionalflowyamltest/conditional_flow2.flow
@@ -0,0 +1,40 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: jobD
+ type: test
+ config:
+ seconds: 0
+ fail: false
+ condition: ${jobA:key2} == 'value2'
+
+ dependsOn:
+ - jobB
+ - jobC
+
+ - name: jobA
+ type: test
+ config:
+ seconds: 0
+
+ - name: jobB
+ type: test
+ config:
+ seconds: 0
+ fail: false
+ condition: ${jobA:key1} == 'value1'
+
+ dependsOn:
+ - jobA
+
+ - name: jobC
+ type: test
+ config:
+ seconds: 0
+ fail: false
+ condition: ${jobA:key3} == 'value3'
+
+ dependsOn:
+ - jobA