azkaban-aplcache

validate flow trigger definition when parsing the flow trigger

2/6/2018 5:05:25 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 037c947..337b0d7 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -17,6 +17,8 @@
 
 package azkaban;
 
+import java.time.Duration;
+
 /**
  * Constants used in configuration files or shared among classes.
  *
@@ -79,6 +81,11 @@ public class Constants {
   // One Schedule's default End Time: 01/01/2050, 00:00:00, UTC
   public static final long DEFAULT_SCHEDULE_END_EPOCH_TIME = 2524608000000L;
 
+  // Default flow trigger max wait time
+  public static final Duration DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME = Duration.ofDays(10);
+
+  public static final Duration MIN_FLOW_TRIGGER_WAIT_TIME = Duration.ofMinutes(1);
+
   // The flow exec id for a flow trigger instance which hasn't started a flow yet
   public static final int UNASSIGNED_EXEC_ID = -1;
 
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
index 11dbe8b..e2c9756 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
@@ -17,6 +17,7 @@
 
 package azkaban.project;
 
+import azkaban.Constants;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -26,15 +27,15 @@ import java.util.Map;
  */
 public class FlowTriggerBean {
 
-  private int maxWaitMins;
+  private long maxWaitMins = Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes();
   private Map<String, String> schedule;
   private List<TriggerDependencyBean> triggerDependencies;
 
-  public int getMaxWaitMins() {
+  public long getMaxWaitMins() {
     return this.maxWaitMins;
   }
 
-  public void setMaxWaitMins(final int maxWaitMins) {
+  public void setMaxWaitMins(final long maxWaitMins) {
     this.maxWaitMins = maxWaitMins;
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 0ea5c15..2c224e1 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -20,13 +20,17 @@ package azkaban.project;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import azkaban.Constants;
+import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
 import java.io.File;
 import java.io.FileInputStream;
 import java.time.Duration;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.quartz.CronExpression;
 import org.yaml.snakeyaml.Yaml;
 
 /**
@@ -86,15 +90,99 @@ public class NodeBeanLoader {
     }
   }
 
+  private void validateSchedule(final FlowTriggerBean flowTriggerBean) {
+    final Map<String, String> scheduleMap = flowTriggerBean.getSchedule();
+
+    Preconditions.checkNotNull(scheduleMap, "flow trigger schedule must not be null");
+
+    Preconditions.checkArgument(
+        scheduleMap.containsKey(Constants.SCHEDULE_TYPE) && scheduleMap.get(Constants.SCHEDULE_TYPE)
+            .equals(Constants.CRON_SCHEDULE_TYPE), "flow trigger schedule type must be cron");
+
+    Preconditions.checkArgument(scheduleMap.containsKey(Constants.SCHEDULE_VALUE) && CronExpression
+            .isValidExpression(scheduleMap.get(Constants.SCHEDULE_VALUE)),
+        "flow trigger schedule value must be a valid cron expression");
+
+    Preconditions.checkArgument(scheduleMap.size() == 2, "flow trigger schedule must "
+        + "contain type and value only");
+  }
+
+  private void validateFlowTriggerBean(final FlowTriggerBean flowTriggerBean) {
+    // validate max wait mins
+    Preconditions.checkArgument(flowTriggerBean.getMaxWaitMins() >= Constants
+        .MIN_FLOW_TRIGGER_WAIT_TIME.toMinutes(), "max wait min must be longer than " + Constants
+        .MIN_FLOW_TRIGGER_WAIT_TIME + " min ");
+
+    validateSchedule(flowTriggerBean);
+    validateTriggerDependencies(flowTriggerBean.getTriggerDependencies());
+  }
+
+  /**
+   * check uniqueness of dependency.name
+   */
+  private void validateDepNameUniqueness(final List<TriggerDependencyBean> dependencies) {
+    final Set<String> seen = new HashSet<>();
+    for (final TriggerDependencyBean dep : dependencies) {
+      // set.add() returns false when there exists duplicate
+      Preconditions.checkArgument(seen.add(dep.getName()), String.format("duplicate dependency"
+          + ".name %s found, dependency.name should be unique", dep.getName()));
+    }
+  }
+
+  /**
+   * check uniqueness of dependency type and params
+   */
+  private void validateDepDefinitionUniqueness(final List<TriggerDependencyBean> dependencies) {
+    for (int i = 0; i < dependencies.size(); i++) {
+      for (int j = i + 1; j < dependencies.size(); j++) {
+        final boolean duplicateDepDefFound =
+            dependencies.get(i).getType().equals(dependencies.get(j)
+                .getType()) && dependencies.get(i).getParams()
+                .equals(dependencies.get(j).getParams());
+        Preconditions.checkArgument(!duplicateDepDefFound, String.format("duplicate dependency"
+                + "config %s found, dependency config should be unique",
+            dependencies.get(i).getName()));
+      }
+    }
+  }
+
+  /**
+   * validate name and type are present
+   */
+  private void validateNameAndTypeArePresent(final List<TriggerDependencyBean> dependencies) {
+    for (final TriggerDependencyBean dep : dependencies) {
+      Preconditions.checkNotNull(dep.getName(), "dependency name is required");
+      Preconditions.checkNotNull(dep.getType(), "dependency type is required for " + dep.getName());
+    }
+  }
+
+  private void validateTriggerDependencies(final List<TriggerDependencyBean> dependencies) {
+    validateNameAndTypeArePresent(dependencies);
+    validateDepNameUniqueness(dependencies);
+    validateDepDefinitionUniqueness(dependencies);
+    validateDepType(dependencies);
+  }
+
+  private void validateDepType(final List<TriggerDependencyBean> dependencies) {
+    //todo chengren311: validate dependencies are of valid dependency type
+  }
+
   public FlowTrigger toFlowTrigger(final FlowTriggerBean flowTriggerBean) {
-    // Todo jamiesjc: need to validate flowTriggerBean
-    return flowTriggerBean == null ? null
-        : new FlowTrigger(
-            new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
-            flowTriggerBean.getTriggerDependencies().stream()
-                .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
-                .collect(Collectors.toList()),
-            Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+    if (flowTriggerBean == null) {
+      return null;
+    } else {
+      validateFlowTriggerBean(flowTriggerBean);
+      if (flowTriggerBean.getMaxWaitMins() > Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME
+          .toMinutes()) {
+        flowTriggerBean.setMaxWaitMins(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME.toMinutes());
+      }
+      return new FlowTrigger(
+          new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
+          flowTriggerBean.getTriggerDependencies().stream()
+              .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
+              .collect(Collectors.toList()),
+          Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+    }
   }
 
   public String getFlowName(final File flowFile) {
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index b999e35..00873b1 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -18,6 +18,7 @@
 package azkaban.project;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import azkaban.Constants;
 import azkaban.test.executions.ExecutionsTestUtil;
@@ -203,6 +204,74 @@ public class NodeBeanLoaderTest {
   }
 
   @Test
+  public void testFlowTriggerMaxWaitMinValidation() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+
+    NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_max_wait_min.flow"));
+    FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
+    assertThat(flowTrigger.getMaxWaitDuration())
+        .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+
+    nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_large_max_wait_min.flow"));
+    flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
+    assertThat(flowTrigger.getMaxWaitDuration())
+        .isEqualTo(Constants.DEFAULT_FLOW_TRIGGER_MAX_WAIT_TIME);
+
+    final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_zero_max_wait_min.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testFlowTriggerDepDuplicationValidation() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_duplicate_dep_props.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testFlowTriggerRequireDepNameAndType() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_without_dep_name.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+        .isInstanceOf(NullPointerException.class);
+
+    final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_without_dep_type.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+        .isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void testFlowTriggerScheduleValidation() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_invalid_cron_expression.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean.getTrigger()))
+        .isInstanceOf(IllegalArgumentException.class);
+
+    final NodeBean nodeBean2 = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, "flow_trigger_no_schedule.flow"));
+
+    assertThatThrownBy(() -> loader.toFlowTrigger(nodeBean2.getTrigger()))
+        .isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
   public void testToAzkabanFlowWithFlowTrigger() throws Exception {
     final NodeBeanLoader loader = new NodeBeanLoader();
     final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
@@ -378,3 +447,4 @@ public class NodeBeanLoaderTest {
     assertThat(flowTrigger.getDependencyByName(name).getProps()).isEqualTo(params);
   }
 }
+
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_duplicate_dep_props.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_duplicate_dep_props.flow
new file mode 100644
index 0000000..455f1b5
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_duplicate_dep_props.flow
@@ -0,0 +1,60 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+    - name: search-impression # an unique name to identify the dependency
+      type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        delay: 1
+        window: 1
+        view: search_mp_versioned.search_impression_event_0_0_47
+        unit: daily
+        filter: is_guest=0
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_invalid_cron_expression.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_invalid_cron_expression.flow
new file mode 100644
index 0000000..be4e612
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_invalid_cron_expression.flow
@@ -0,0 +1,60 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 0 0 3 ? * * * *
+
+  triggerDependencies:
+    - name: search-impression # an unique name to identify the dependency
+      type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        delay: 1
+        window: 1
+        view: search_mp_versioned.search_impression_event_0_0_47
+        unit: daily
+        filter: is_guest=0
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_large_max_wait_min.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_large_max_wait_min.flow
new file mode 100644
index 0000000..d77b021
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_large_max_wait_min.flow
@@ -0,0 +1,58 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 9999999999999
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+    - name: search-impression # an unique name to identify the dependency
+      type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        view: another dataset
+        delay: 1
+        window: 7
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min.flow
new file mode 100644
index 0000000..2bcd326
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_max_wait_min.flow
@@ -0,0 +1,57 @@
+---
+# Flow trigger
+trigger:
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+    - name: search-impression # an unique name to identify the dependency
+      type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        view: another dataset
+        delay: 1
+        window: 7
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_schedule.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_schedule.flow
new file mode 100644
index 0000000..22a27a7
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_no_schedule.flow
@@ -0,0 +1,57 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+
+  triggerDependencies:
+    - name: search-impression # an unique name to identify the dependency
+      type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        delay: 1
+        window: 1
+        view: search_mp_versioned.search_impression_event_0_0_47
+        unit: daily
+        filter: is_guest=0
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_name.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_name.flow
new file mode 100644
index 0000000..47f3db1
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_name.flow
@@ -0,0 +1,59 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+    - type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        delay: 1
+        window: 1
+        view: search_mp_versioned.search_impression_event_0_0_47
+        unit: daily
+        filter: is_guest=0
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_type.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_type.flow
new file mode 100644
index 0000000..fb9f965
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_without_dep_type.flow
@@ -0,0 +1,50 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+    - name: other-name
+      params:
+        delay: 1
+        window: 1
+        view: search_mp_versioned.search_impression_event_0_0_47
+        unit: daily
+        filter: is_guest=0
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger_zero_max_wait_min.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger_zero_max_wait_min.flow
new file mode 100644
index 0000000..05ca14a
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger_zero_max_wait_min.flow
@@ -0,0 +1,58 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 0
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+    - name: search-impression # an unique name to identify the dependency
+      type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        view: another dataset
+        delay: 1
+        window: 7
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd