Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 037c947..337b0d7 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -17,6 +17,8 @@
package azkaban;
+import java.time.Duration;
+
/**
* Constants used in configuration files or shared among classes.
*
@@ -79,6 +81,11 @@ public class Constants {
// One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
+ // Default flow trigger max wait time
+ public static final Duration DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME = Duration.ofDays(10);
+
+ public static final Duration MIN_FLOW_TRIGGER_WAIT_TIME = Duration.ofMinutes(1);
+
// The flow exec id for a flow trigger instance which hasn't started a flow yet
public static final int UNASSIGNED_EXEC_ID = -1;
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
index 11dbe8b..e2c9756 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
@@ -17,6 +17,7 @@
package azkaban.project;
+import azkaban.Constants;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -26,15 +27,15 @@ import java.util.Map;
*/
public class FlowTriggerBean {
- private int maxWaitMins;
+ private long maxWaitMins = Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes();
private Map<String, String> schedule;
private List<TriggerDependencyBean> triggerDependencies;
- public int getMaxWaitMins() {
+ public long getMaxWaitMins() {
return this.maxWaitMins;
}
- public void setMaxWaitMins(final int maxWaitMins) {
+ public void setMaxWaitMins(final long maxWaitMins) {
this.maxWaitMins = maxWaitMins;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 0ea5c15..2c224e1 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -20,13 +20,17 @@ package azkaban.project;
import static com.google.common.base.Preconditions.checkArgument;
import azkaban.Constants;
+import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import java.io.File;
import java.io.FileInputStream;
import java.time.Duration;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.quartz.CronExpression;
import org.yaml.snakeyaml.Yaml;
/**
@@ -86,15 +90,99 @@ public class NodeBeanLoader {
}
}
+ private void validateSchedule(final FlowTriggerBean flowTriggerBean) {
+ final Map<String, String> scheduleMap = flowTriggerBean.getSchedule();
+
+ Preconditions.checkNotNull(scheduleMap, "flow trigger schedule must not be null");
+
+ Preconditions.checkArgument(
+ scheduleMap.containsKey(Constants.SCHEDULE_TYPE) && scheduleMap.get(Constants.SCHEDULE_TYPE)
+ .equals(Constants.CRON_SCHEDULE_TYPE), "flow trigger schedule type must be cron");
+
+ Preconditions.checkArgument(scheduleMap.containsKey(Constants.SCHEDULE_VALUE) && CronExpression
+ .isValidExpression(scheduleMap.get(Constants.SCHEDULE_VALUE)),
+ "flow trigger schedule value must be a valid cron expression");
+
+ Preconditions.checkArgument(scheduleMap.size() == 2, "flow trigger schedule must "
+ + "contain type and value only");
+ }
+
+ private void validateFlowTriggerBean(final FlowTriggerBean flowTriggerBean) {
+ // validate max wait mins
+ Preconditions.checkArgument(flowTriggerBean.getMaxWaitMins() >= Constants
+ .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes(), "max wait min must be longer than " + Constants
+ .MIN_FLOW_TRIGGER_WAIT_TIME + " min ");
+
+ validateSchedule(flowTriggerBean);
+ validateTriggerDependencies(flowTriggerBean.getTriggerDependencies());
+ }
+
+ /**
+ * check uniqueness of dependency.name
+ */
+ private void validateDepNameUniqueness(final List<TriggerDependencyBean> dependencies) {
+ final Set<String> seen = new HashSet<>();
+ for (final TriggerDependencyBean dep : dependencies) {
+ // set.add() returns false when there exists duplicate
+ Preconditions.checkArgument(seen.add(dep.getName()), String.format("duplicate dependency"
+ + ".name %s found, dependency.name should be unique", dep.getName()));
+ }
+ }
+
+ /**
+ * check uniqueness of dependency type and params
+ */
+ private void validateDepDefinitionUniqueness(final List<TriggerDependencyBean> dependencies) {
+ for (int i = 0; i < dependencies.size(); i++) {
+ for (int j = i + 1; j < dependencies.size(); j++) {
+ final boolean duplicateDepDefFound =
+ dependencies.get(i).getType().equals(dependencies.get(j)
+ .getType()) && dependencies.get(i).getParams()
+ .equals(dependencies.get(j).getParams());
+ Preconditions.checkArgument(!duplicateDepDefFound, String.format("duplicate dependency"
+ + "config %s found, dependency config should be unique",
+ dependencies.get(i).getName()));
+ }
+ }
+ }
+
+ /**
+ * validate name and type are present
+ */
+ private void validateNameAndTypeArePresent(final List<TriggerDependencyBean> dependencies) {
+ for (final TriggerDependencyBean dep : dependencies) {
+ Preconditions.checkNotNull(dep.getName(), "dependency name is required");
+ Preconditions.checkNotNull(dep.getType(), "dependency type is required for " + dep.getName());
+ }
+ }
+
+ private void validateTriggerDependencies(final List<TriggerDependencyBean> dependencies) {
+ validateNameAndTypeArePresent(dependencies);
+ validateDepNameUniqueness(dependencies);
+ validateDepDefinitionUniqueness(dependencies);
+ validateDepType(dependencies);
+ }
+
+ private void validateDepType(final List<TriggerDependencyBean> dependencies) {
+ //todo chengren311: validate dependencies are of valid dependency type
+ }
+
public FlowTrigger toFlowTrigger(final FlowTriggerBean flowTriggerBean) {
- // Todo jamiesjc: need to validate flowTriggerBean
- return flowTriggerBean == null ? null
- : new FlowTrigger(
- new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
- flowTriggerBean.getTriggerDependencies().stream()
- .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
- .collect(Collectors.toList()),
- Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+ if (flowTriggerBean == null) {
+ return null;
+ } else {
+ validateFlowTriggerBean(flowTriggerBean);
+ if (flowTriggerBean.getMaxWaitMins() > Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME
+ .toMinutes()) {
+ flowTriggerBean.setMaxWaitMins(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes());
+ }
+ return new FlowTrigger(
+ new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
+ flowTriggerBean.getTriggerDependencies().stream()
+ .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
+ .collect(Collectors.toList()),
+ Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+ }
}
public String getFlowName(final File flowFile) {
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index b999e35..00873b1 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -18,6 +18,7 @@
package azkaban.project;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import azkaban.Constants;
import azkaban.test.executions.ExecutionsTestUtil;
@@ -203,6 +204,74 @@ public class NodeBeanLoaderTest {
}
@Test
+ public void testFlowTriggerMaxWaitMinValidation() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+
+ NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_max_wait_min.flow"));
+ FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
+ assertThat(flowTrigger.getMaxWaitDuration())
+ .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+
+ nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_large_max_wait_min.flow"));
+ flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
+ assertThat(flowTrigger.getMaxWaitDuration())
+ .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+
+ final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_zero_max_wait_min.flow"));
+
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void testFlowTriggerDepDuplicationValidation() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_duplicate_dep_props.flow"));
+
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void testFlowTriggerRequireDepNameAndType() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_without_dep_name.flow"));
+
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+ .isInstanceOf(NullPointerException.class);
+
+ final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_without_dep_type.flow"));
+
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+ .isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ public void testFlowTriggerScheduleValidation() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_invalid_cron_expression.flow"));
+
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+ .isInstanceOf(IllegalArgumentException.class);
+
+ final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_schedule.flow"));
+
+ assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+ .isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
public void testToAzkabanFlowWithFlowTrigger() throws Exception {
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
@@ -378,3 +447,4 @@ public class NodeBeanLoaderTest {
assertThat(flowTrigger.getDependencyByName(name).getProps()).isEqualTo(params);
}
}
+
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_duplicate_dep_props.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_duplicate_dep_props.flow
new file mode 100644
index 0000000..455f1b5
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_duplicate_dep_props.flow
@@ -0,0 +1,60 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 5
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ delay: 1
+ window: 1
+ view: search_mp_versioned.search_impression_event_0_0_47
+ unit: daily
+ filter: is_guest=0
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_invalid_cron_expression.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_invalid_cron_expression.flow
new file mode 100644
index 0000000..be4e612
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_invalid_cron_expression.flow
@@ -0,0 +1,60 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 5
+ schedule:
+ type: cron
+ value: 0 0 3 ? * * * *
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ delay: 1
+ window: 1
+ view: search_mp_versioned.search_impression_event_0_0_47
+ unit: daily
+ filter: is_guest=0
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_large_max_wait_min.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_large_max_wait_min.flow
new file mode 100644
index 0000000..d77b021
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_large_max_wait_min.flow
@@ -0,0 +1,58 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 9999999999999
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ view: another dataset
+ delay: 1
+ window: 7
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min.flow
new file mode 100644
index 0000000..2bcd326
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min.flow
@@ -0,0 +1,57 @@
+---
+# Flow trigger
+trigger:
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ view: another dataset
+ delay: 1
+ window: 7
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_schedule.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_schedule.flow
new file mode 100644
index 0000000..22a27a7
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_schedule.flow
@@ -0,0 +1,57 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 5
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ delay: 1
+ window: 1
+ view: search_mp_versioned.search_impression_event_0_0_47
+ unit: daily
+ filter: is_guest=0
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_name.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_name.flow
new file mode 100644
index 0000000..47f3db1
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_name.flow
@@ -0,0 +1,59 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 5
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+ - type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ delay: 1
+ window: 1
+ view: search_mp_versioned.search_impression_event_0_0_47
+ unit: daily
+ filter: is_guest=0
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_type.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_type.flow
new file mode 100644
index 0000000..fb9f965
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_type.flow
@@ -0,0 +1,50 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 5
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+ - name: other-name
+ params:
+ delay: 1
+ window: 1
+ view: search_mp_versioned.search_impression_event_0_0_47
+ unit: daily
+ filter: is_guest=0
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_zero_max_wait_min.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_zero_max_wait_min.flow
new file mode 100644
index 0000000..05ca14a
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_zero_max_wait_min.flow
@@ -0,0 +1,58 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 0
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ view: another dataset
+ delay: 1
+ window: 7
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd