azkaban-aplcache

Flow 2.0 design - Load flow trigger from YAML file. (#1558) *

11/17/2017 10:34:17 PM

Details

diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 60f46ea..98586ae 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -46,6 +46,11 @@ public class Constants {
   // Flow 2.0 flow and job path delimiter
   public static final String PATH_DELIMITER = ":";
 
+  // Flow trigger props
+  public static final String SCHEDULE_TYPE = "type";
+  public static final String CRON_SCHEDULE_TYPE = "cron";
+  public static final String SCHEDULE_VALUE = "value";
+
   // Names and paths of various file names to configure Azkaban
   public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
   public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 997719e..2dba398 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -34,11 +34,13 @@ import java.util.Map;
 public class AzkabanFlow extends AzkabanNode {
 
   private final Map<String, AzkabanNode> nodes;
+  private final FlowTrigger flowTrigger;
 
-  private AzkabanFlow(final String name, final Props props,
-      final Map<String, AzkabanNode> nodes, final List<String> dependsOn) {
+  private AzkabanFlow(final String name, final Props props, final Map<String, AzkabanNode> nodes,
+      final List<String> dependsOn, final FlowTrigger flowTrigger) {
     super(name, Constants.FLOW_NODE_TYPE, props, dependsOn);
     this.nodes = nodes;
+    this.flowTrigger = flowTrigger;
   }
 
   public Map<String, AzkabanNode> getNodes() {
@@ -49,31 +51,36 @@ public class AzkabanFlow extends AzkabanNode {
     return this.nodes.get(name);
   }
 
+  public FlowTrigger getFlowTrigger() {
+    return this.flowTrigger;
+  }
+
   public static class AzkabanFlowBuilder {
 
     private String name;
     private Props props;
     private List<String> dependsOn;
     private Map<String, AzkabanNode> nodes;
+    private FlowTrigger flowTrigger;
 
-    public AzkabanFlowBuilder setName(final String name) {
+    public AzkabanFlowBuilder name(final String name) {
       this.name = name;
       return this;
     }
 
-    public AzkabanFlowBuilder setProps(final Props props) {
+    public AzkabanFlowBuilder props(final Props props) {
       this.props = props;
       return this;
     }
 
-    public AzkabanFlowBuilder setDependsOn(final List<String> dependsOn) {
+    public AzkabanFlowBuilder dependsOn(final List<String> dependsOn) {
       this.dependsOn = dependsOn == null
           ? Collections.emptyList()
           : ImmutableList.copyOf(dependsOn);
       return this;
     }
 
-    public AzkabanFlowBuilder setNodes(final Collection<? extends AzkabanNode> azkabanNodes) {
+    public AzkabanFlowBuilder nodes(final Collection<? extends AzkabanNode> azkabanNodes) {
       final Map<String, AzkabanNode> tempNodes = new HashMap<>();
       for (final AzkabanNode node : azkabanNodes) {
         tempNodes.put(node.getName(), node);
@@ -82,8 +89,13 @@ public class AzkabanFlow extends AzkabanNode {
       return this;
     }
 
+    public AzkabanFlowBuilder flowTrigger(final FlowTrigger flowTrigger) {
+      this.flowTrigger = flowTrigger;
+      return this;
+    }
+
     public AzkabanFlow build() {
-      return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn);
+      return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn, this.flowTrigger);
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index 9a60740..128e93f 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -40,22 +40,22 @@ public class AzkabanJob extends AzkabanNode {
     private Props props;
     private List<String> dependsOn;
 
-    public AzkabanJobBuilder setName(final String name) {
+    public AzkabanJobBuilder name(final String name) {
       this.name = name;
       return this;
     }
 
-    public AzkabanJobBuilder setType(final String type) {
+    public AzkabanJobBuilder type(final String type) {
       this.type = type;
       return this;
     }
 
-    public AzkabanJobBuilder setProps(final Props props) {
+    public AzkabanJobBuilder props(final Props props) {
       this.props = props;
       return this;
     }
 
-    public AzkabanJobBuilder setDependsOn(final List<String> dependsOn) {
+    public AzkabanJobBuilder dependsOn(final List<String> dependsOn) {
       // A node may or may not have dependencies.
       this.dependsOn = dependsOn == null
           ? Collections.emptyList()
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index aa083be..d16c7e4 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -90,6 +90,17 @@ public class FlowLoaderUtils {
     return false;
   }
 
+  public static FlowTrigger getFlowTriggerFromYamlFile(final File flowFile) {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    try {
+      final NodeBean nodeBean = loader.load(flowFile);
+      return loader.toFlowTrigger(nodeBean.getTrigger());
+    } catch (final FileNotFoundException e) {
+      logger.error("Failed to get flow trigger, error loading flow YAML file " + flowFile);
+    }
+    return null;
+  }
+
   /**
    * Adds email properties to a flow.
    *
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 98d5fb1..9a26c73 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -100,4 +100,7 @@ public class FlowTrigger {
     return this.maxWaitDuration;
   }
 
+  public CronSchedule getSchedule() {
+    return this.schedule;
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
new file mode 100644
index 0000000..11dbe8b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Java bean loaded from YAML file to represent a flow trigger.
+ */
+public class FlowTriggerBean {
+
+  private int maxWaitMins;
+  private Map<String, String> schedule;
+  private List<TriggerDependencyBean> triggerDependencies;
+
+  public int getMaxWaitMins() {
+    return this.maxWaitMins;
+  }
+
+  public void setMaxWaitMins(final int maxWaitMins) {
+    this.maxWaitMins = maxWaitMins;
+  }
+
+  public Map<String, String> getSchedule() {
+    return this.schedule;
+  }
+
+  public void setSchedule(final Map<String, String> schedule) {
+    this.schedule = schedule;
+  }
+
+  public List<TriggerDependencyBean> getTriggerDependencies() {
+    return this.triggerDependencies == null ? Collections.emptyList() : this.triggerDependencies;
+  }
+
+  public void setTriggerDependencies(
+      final List<TriggerDependencyBean> triggerDependencies) {
+    this.triggerDependencies = triggerDependencies;
+  }
+
+  @Override
+  public String toString() {
+    return "FlowTriggerBean{" +
+        "maxWaitMins='" + this.maxWaitMins + '\'' +
+        ", schedule=" + this.schedule +
+        ", triggerDependencies=" + this.triggerDependencies +
+        '}';
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index 5cafc37..370a8b9 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -33,6 +33,7 @@ public class NodeBean implements Serializable {
   private List<String> dependsOn;
   private String type;
   private List<NodeBean> nodes;
+  private FlowTriggerBean trigger;
 
   public String getName() {
     return this.name;
@@ -80,6 +81,14 @@ public class NodeBean implements Serializable {
     return props;
   }
 
+  public FlowTriggerBean getTrigger() {
+    return this.trigger;
+  }
+
+  public void setTrigger(final FlowTriggerBean trigger) {
+    this.trigger = trigger;
+  }
+
   @Override
   public String toString() {
     return "NodeBean{" +
@@ -88,6 +97,7 @@ public class NodeBean implements Serializable {
         ", dependsOn=" + this.dependsOn +
         ", type='" + this.type + '\'' +
         ", nodes=" + this.nodes +
+        ", trigger=" + this.trigger +
         '}';
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 079924a..945fff3 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -24,6 +24,7 @@ import com.google.common.io.Files;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -66,22 +67,33 @@ public class NodeBeanLoader {
   public AzkabanNode toAzkabanNode(final NodeBean nodeBean) {
     if (nodeBean.getType().equals(Constants.FLOW_NODE_TYPE)) {
       return new AzkabanFlow.AzkabanFlowBuilder()
-          .setName(nodeBean.getName())
-          .setProps(nodeBean.getProps())
-          .setDependsOn(nodeBean.getDependsOn())
-          .setNodes(
-              nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
+          .name(nodeBean.getName())
+          .props(nodeBean.getProps())
+          .dependsOn(nodeBean.getDependsOn())
+          .nodes(nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
+          .flowTrigger(toFlowTrigger(nodeBean.getTrigger()))
           .build();
     } else {
       return new AzkabanJob.AzkabanJobBuilder()
-          .setName(nodeBean.getName())
-          .setProps(nodeBean.getProps())
-          .setType(nodeBean.getType())
-          .setDependsOn(nodeBean.getDependsOn())
+          .name(nodeBean.getName())
+          .props(nodeBean.getProps())
+          .type(nodeBean.getType())
+          .dependsOn(nodeBean.getDependsOn())
           .build();
     }
   }
 
+  public FlowTrigger toFlowTrigger(final FlowTriggerBean flowTriggerBean) {
+    // Todo jamiesjc: need to validate flowTriggerBean
+    return flowTriggerBean == null ? null
+        : new FlowTrigger(
+            new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
+            flowTriggerBean.getTriggerDependencies().stream()
+                .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
+                .collect(Collectors.toList()),
+            Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+  }
+
   public String getFlowName(final File flowFile) {
     checkArgument(flowFile.exists());
     checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index ae332a7..5af9ef3 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -220,7 +220,7 @@ public interface ProjectLoader {
       throws ProjectManagerException;
 
   /**
-   * Gets all flow files that's uploaded.
+   * Check if flow file has been uploaded.
    */
   boolean isFlowFileUploaded(int projectId, int projectVersion)
       throws ProjectManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/project/TriggerDependencyBean.java b/azkaban-common/src/main/java/azkaban/project/TriggerDependencyBean.java
new file mode 100644
index 0000000..1b47c80
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/TriggerDependencyBean.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Java bean loaded from YAML file to represent a trigger dependency.
+ */
+public class TriggerDependencyBean {
+
+  private String name;
+  private String type;
+  private Map<String, String> params;
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(final String name) {
+    this.name = name;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public void setType(final String type) {
+    this.type = type;
+  }
+
+  public Map<String, String> getParams() {
+    return this.params == null ? Collections.emptyMap() : this.params;
+  }
+
+  public void setParams(final Map<String, String> params) {
+    this.params = params;
+  }
+
+  @Override
+  public String toString() {
+    return "TriggerDependencyBean{" +
+        "name='" + this.name + '\'' +
+        ", type='" + this.type + '\'' +
+        ", params=" + this.params +
+        '}';
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index ccdbbc0..2c39a88 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -22,6 +22,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import azkaban.Constants;
 import azkaban.test.executions.ExecutionsTestUtil;
 import azkaban.utils.Props;
+import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import org.junit.Test;
 
 public class NodeBeanLoaderTest {
@@ -32,6 +37,9 @@ public class NodeBeanLoaderTest {
   private static final String EMBEDDED_FLOW_YML_TEST_DIR = "embeddedflowyamltest";
   private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
   private static final String EMBEDDED_FLOW_YML_FILE = EMBEDDED_FLOW_NAME + ".flow";
+  private static final String TRIGGER_FLOW_YML_TEST_DIR = "flowtriggeryamltest";
+  private static final String TRIGGER_FLOW_NAME = "flow_trigger";
+  private static final String TRIGGER_FLOW_YML_FILE = TRIGGER_FLOW_NAME + ".flow";
   private static final String FLOW_CONFIG_KEY = "flow-level-parameter";
   private static final String FLOW_CONFIG_VALUE = "value";
   private static final String SHELL_END = "shell_end";
@@ -41,85 +49,87 @@ public class NodeBeanLoaderTest {
   private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
   private static final String ECHO_COMMAND_1 = "echo \"This is an echoed text from embedded_flow1.\"";
   private static final String PWD_COMMAND = "pwd";
+  private static final String BASH_COMMAND = "bash ./sample_script.sh";
   private static final String EMBEDDED_FLOW1 = "embedded_flow1";
   private static final String EMBEDDED_FLOW2 = "embedded_flow2";
   private static final String TYPE_NOOP = "noop";
   private static final String TYPE_COMMAND = "command";
+  private static final int MAX_WAIT_MINS = 5;
+  private static final String CRON_EXPRESSION = "0 0 1 ? * *";
+  private static final String TRIGGER_NAME_1 = "search-impression";
+  private static final String TRIGGER_NAME_2 = "other-name";
+  private static final String TRIGGER_TYPE = "dali-dataset";
+  private static final ImmutableMap<String, String> PARAMS_1 = ImmutableMap
+      .of("view", "search_mp_versioned.search_impression_event_0_0_47", "delay", "1", "window", "1",
+          "unit", "daily", "filter", "is_guest=0");
+  private static final ImmutableMap<String, String> PARAMS_2 = ImmutableMap
+      .of("view", "another dataset",
+          "delay", "1", "window", "7");
 
   @Test
   public void testLoadNodeBeanForBasicFlow() throws Exception {
-
     final NodeBeanLoader loader = new NodeBeanLoader();
     final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
         BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
 
-    assertThat(nodeBean.getName()).isEqualTo(BASIC_FLOW_NAME);
-    assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
-    assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
-    assertThat(nodeBean.getNodes().size()).isEqualTo(4);
-
-    final NodeBean node0 = nodeBean.getNodes().get(0);
-    assertThat(node0.getName()).isEqualTo(SHELL_END);
-    assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
-    assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
-
-    final NodeBean node1 = nodeBean.getNodes().get(1);
-    assertThat(node1.getName()).isEqualTo(SHELL_ECHO);
-    assertThat(node1.getType()).isEqualTo(TYPE_COMMAND);
-    assertThat(node1.getConfig().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+    validateNodeBean(nodeBean, BASIC_FLOW_NAME, Constants.FLOW_NODE_TYPE, FLOW_CONFIG_KEY,
+        FLOW_CONFIG_VALUE, 4, null);
+    validateNodeBean(nodeBean.getNodes().get(0), SHELL_END, TYPE_NOOP, null,
+        null, 0, Arrays.asList(SHELL_PWD, SHELL_ECHO, SHELL_BASH));
+    validateNodeBean(nodeBean.getNodes().get(1), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
+        ECHO_COMMAND, 0, null);
   }
 
   @Test
   public void testLoadNodeBeanForEmbeddedFlow() throws Exception {
-
     final NodeBeanLoader loader = new NodeBeanLoader();
     final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
         EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
 
-    assertThat(nodeBean.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
-    assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
-    assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
-    assertThat(nodeBean.getNodes().size()).isEqualTo(4);
-
-    final NodeBean node0 = nodeBean.getNodes().get(0);
-    assertThat(node0.getName()).isEqualTo(SHELL_END);
-    assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
-    assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
-
-    final NodeBean node1 = nodeBean.getNodes().get(1);
-    assertThat(node1.getName()).isEqualTo(SHELL_PWD);
-
-    final NodeBean node2 = nodeBean.getNodes().get(2);
-    assertThat(node2.getName()).isEqualTo(SHELL_ECHO);
-
-    final NodeBean node3 = nodeBean.getNodes().get(3);
-    assertThat(node3.getName()).isEqualTo(EMBEDDED_FLOW1);
-    assertThat(node3.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
-    assertThat(node3.getNodes().size()).isEqualTo(4);
+    validateNodeBean(nodeBean, EMBEDDED_FLOW_NAME, Constants.FLOW_NODE_TYPE, FLOW_CONFIG_KEY,
+        FLOW_CONFIG_VALUE, 4, null);
+    validateNodeBean(nodeBean.getNodes().get(0), SHELL_END, TYPE_NOOP, null,
+        null, 0, Arrays.asList(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1));
+    validateNodeBean(nodeBean.getNodes().get(1), SHELL_PWD, TYPE_COMMAND, TYPE_COMMAND,
+        PWD_COMMAND, 0, null);
+    validateNodeBean(nodeBean.getNodes().get(2), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
+        ECHO_COMMAND, 0, null);
+    validateNodeBean(nodeBean.getNodes().get(3), EMBEDDED_FLOW1, Constants.FLOW_NODE_TYPE,
+        FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE, 4, null);
 
     // Verify nodes in embedded_flow1 are loaded correctly.
-    final NodeBean node3_0 = node3.getNodes().get(0);
-    assertThat(node3_0.getName()).isEqualTo(SHELL_END);
-
-    final NodeBean node3_1 = node3.getNodes().get(1);
-    assertThat(node3_1.getName()).isEqualTo(SHELL_ECHO);
-
-    final NodeBean node3_2 = node3.getNodes().get(2);
-    assertThat(node3_2.getName()).isEqualTo(EMBEDDED_FLOW2);
-    assertThat(node3_2.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
-    assertThat(node3_2.getDependsOn()).contains(SHELL_BASH);
-    assertThat(node3_2.getNodes().size()).isEqualTo(2);
-
-    final NodeBean node3_3 = node3.getNodes().get(3);
-    assertThat(node3_3.getName()).isEqualTo(SHELL_BASH);
+    final NodeBean embeddedNodeBean1 = nodeBean.getNodes().get(3);
+    validateNodeBean(embeddedNodeBean1.getNodes().get(0), SHELL_END, TYPE_NOOP, null, null, 0,
+        Arrays.asList(SHELL_ECHO, EMBEDDED_FLOW2));
+    validateNodeBean(embeddedNodeBean1.getNodes().get(1), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
+        ECHO_COMMAND_1, 0, null);
+    validateNodeBean(embeddedNodeBean1.getNodes().get(2), EMBEDDED_FLOW2, Constants.FLOW_NODE_TYPE,
+        FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE, 2, Arrays.asList(SHELL_BASH));
+    validateNodeBean(embeddedNodeBean1.getNodes().get(3), SHELL_BASH, TYPE_COMMAND, TYPE_COMMAND,
+        BASH_COMMAND, 0, null);
 
     // Verify nodes in embedded_flow2 are loaded correctly.
-    final NodeBean node3_2_0 = node3_2.getNodes().get(0);
-    assertThat(node3_2_0.getName()).isEqualTo(SHELL_END);
-
-    final NodeBean node3_2_1 = node3_2.getNodes().get(1);
-    assertThat(node3_2_1.getName()).isEqualTo(SHELL_PWD);
+    validateNodeBean(embeddedNodeBean1.getNodes().get(2).getNodes().get(0), SHELL_END, TYPE_NOOP,
+        null,
+        null, 0, Arrays.asList(SHELL_PWD));
+    validateNodeBean(embeddedNodeBean1.getNodes().get(2).getNodes().get(1), SHELL_PWD, TYPE_COMMAND,
+        TYPE_COMMAND, PWD_COMMAND, 0, null);
+  }
 
+  @Test
+  public void testLoadNodeBeanForFlowTrigger() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+    final Map<String, String> schedule = ImmutableMap.of(Constants.SCHEDULE_TYPE, Constants
+        .CRON_SCHEDULE_TYPE, Constants.SCHEDULE_VALUE, CRON_EXPRESSION);
+    validateFlowTriggerBean(nodeBean.getTrigger(), MAX_WAIT_MINS, schedule, 2);
+    final List<TriggerDependencyBean> triggerDependencyBeans = nodeBean.getTrigger()
+        .getTriggerDependencies();
+    validateTriggerDependencyBean(triggerDependencyBeans.get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+        PARAMS_1);
+    validateTriggerDependencyBean(triggerDependencyBeans.get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+        PARAMS_2);
   }
 
   @Test
@@ -129,23 +139,22 @@ public class NodeBeanLoaderTest {
         BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
     final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
 
-    assertThat(flow.getName()).isEqualTo(BASIC_FLOW_NAME);
-    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
-    assertThat(flow.getNodes().size()).isEqualTo(4);
-
-    final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
-    assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
-    assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
-    assertThat(shellEnd.getProps().size()).isEqualTo(1);
-    assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
-    assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
-
-    final AzkabanJob shellEcho = (AzkabanJob) flow.getNode(SHELL_ECHO);
-    assertThat(shellEcho.getName()).isEqualTo(SHELL_ECHO);
-    assertThat(shellEcho.getType()).isEqualTo(TYPE_COMMAND);
-    assertThat(shellEcho.getProps().size()).isEqualTo(2);
-    assertThat(shellEcho.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
-    assertThat(shellEcho.getProps().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+    final Props props = new Props();
+    props.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
+    props.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
+    validateAzkabanNode(flow, BASIC_FLOW_NAME, Constants.FLOW_NODE_TYPE, props, Arrays.asList
+        (SHELL_END, SHELL_PWD, SHELL_ECHO, SHELL_BASH), null);
+
+    final Props props1 = new Props();
+    props1.put(Constants.NODE_TYPE, TYPE_NOOP);
+    validateAzkabanNode(flow.getNode(SHELL_END), SHELL_END, TYPE_NOOP, props1, null,
+        Arrays.asList(SHELL_PWD, SHELL_ECHO, SHELL_BASH));
+
+    final Props props2 = new Props();
+    props2.put(Constants.NODE_TYPE, TYPE_COMMAND);
+    props2.put(TYPE_COMMAND, ECHO_COMMAND);
+    validateAzkabanNode(flow.getNode(SHELL_ECHO), SHELL_ECHO, TYPE_COMMAND, props2, null,
+        null);
   }
 
   @Test
@@ -155,31 +164,56 @@ public class NodeBeanLoaderTest {
         EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
     final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
 
-    assertThat(flow.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
-    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
-    assertThat(flow.getNodes().size()).isEqualTo(4);
+    final Props props = new Props();
+    props.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
+    props.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
+    validateAzkabanNode(flow, EMBEDDED_FLOW_NAME, Constants.FLOW_NODE_TYPE, props, Arrays.asList
+        (SHELL_END, SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1), null);
 
-    final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
-    assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
-    assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
-    assertThat(shellEnd.getProps().size()).isEqualTo(1);
-    assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
-    assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
+    final Props props1 = new Props();
+    props1.put(Constants.NODE_TYPE, TYPE_NOOP);
+    validateAzkabanNode(flow.getNode(SHELL_END), SHELL_END, TYPE_NOOP, props1, null,
+        Arrays.asList(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1));
 
+    final Props props2 = new Props();
+    props2.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
+    props2.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
     final AzkabanFlow embeddedFlow1 = (AzkabanFlow) flow.getNode(EMBEDDED_FLOW1);
-    assertThat(embeddedFlow1.getName()).isEqualTo(EMBEDDED_FLOW1);
-    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
-    assertThat(embeddedFlow1.getNodes().size()).isEqualTo(4);
-    assertThat(embeddedFlow1.getNodes().containsKey(EMBEDDED_FLOW2)).isTrue();
+    validateAzkabanNode(embeddedFlow1, EMBEDDED_FLOW1, Constants.FLOW_NODE_TYPE, props2,
+        Arrays.asList
+            (SHELL_END, SHELL_BASH, SHELL_ECHO, EMBEDDED_FLOW2), null);
 
     final AzkabanFlow embeddedFlow2 = (AzkabanFlow) embeddedFlow1.getNode(EMBEDDED_FLOW2);
-    assertThat(embeddedFlow2.getName()).isEqualTo(EMBEDDED_FLOW2);
-    assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
-    assertThat(embeddedFlow2.getDependsOn()).contains(SHELL_BASH);
-    assertThat(embeddedFlow2.getNodes().size()).isEqualTo(2);
-    assertThat(embeddedFlow2.getNodes().containsKey(SHELL_END)).isTrue();
-    assertThat(embeddedFlow2.getNodes().containsKey(SHELL_PWD)).isTrue();
+    validateAzkabanNode(embeddedFlow2, EMBEDDED_FLOW2, Constants.FLOW_NODE_TYPE,
+        props2, Arrays.asList(SHELL_END, SHELL_PWD), Arrays.asList(SHELL_BASH));
+  }
 
+  @Test
+  public void testToFlowTrigger() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+    final FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
+    validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
+    validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+        PARAMS_1);
+    validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+        PARAMS_2);
+  }
+
+  @Test
+  public void testToAzkabanFlowWithFlowTrigger() throws Exception {
+    final NodeBeanLoader loader = new NodeBeanLoader();
+    final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+        TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+    final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+    validateFlowTrigger(flow.getFlowTrigger(), MAX_WAIT_MINS, CRON_EXPRESSION, 2);
+    validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(0), TRIGGER_NAME_1,
+        TRIGGER_TYPE,
+        PARAMS_1);
+    validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(1), TRIGGER_NAME_2,
+        TRIGGER_TYPE,
+        PARAMS_2);
   }
 
   @Test
@@ -241,4 +275,73 @@ public class NodeBeanLoaderTest {
     assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
     assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(PWD_COMMAND);
   }
+
+  @Test
+  public void testGetFlowTrigger() {
+    final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(
+        ExecutionsTestUtil.getFlowFile(TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+    validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
+    validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+        PARAMS_1);
+    validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+        PARAMS_2);
+  }
+
+  private void validateNodeBean(final NodeBean nodeBean, final String name, final String type,
+      final String configKey,
+      final String configValue, final int numNodes, final List<String> dependsOn) {
+    assertThat(nodeBean.getName()).isEqualTo(name);
+    assertThat(nodeBean.getType()).isEqualTo(type);
+    if (configKey != null) {
+      assertThat(nodeBean.getConfig().get(configKey)).isEqualTo(configValue);
+    }
+    if (numNodes != 0) {
+      assertThat(nodeBean.getNodes().size()).isEqualTo(numNodes);
+    }
+    if (dependsOn != null) {
+      assertThat(nodeBean.getDependsOn()).containsOnlyElementsOf(dependsOn);
+    }
+  }
+
+  private void validateAzkabanNode(final AzkabanNode azkabanNode, final String name,
+      final String type, final Props props, final List<String> nodeList, final List<String>
+      dependsOn) {
+    assertThat(azkabanNode.getName()).isEqualTo(name);
+    assertThat(azkabanNode.getType()).isEqualTo(type);
+    assertThat(azkabanNode.getProps()).isEqualTo(props);
+    if (azkabanNode instanceof AzkabanFlow) {
+      assertThat(((AzkabanFlow) azkabanNode).getNodes().keySet()).containsOnlyElementsOf(nodeList);
+    }
+    if (dependsOn != null) {
+      assertThat(azkabanNode.getDependsOn()).containsOnlyElementsOf(dependsOn);
+    }
+  }
+
+  private void validateFlowTriggerBean(final FlowTriggerBean flowTriggerBean, final int
+      maxWaitMins, final Map<String, String> schedule, final int numDependencies) {
+    assertThat(flowTriggerBean.getMaxWaitMins()).isEqualTo(maxWaitMins);
+    assertThat(flowTriggerBean.getSchedule()).isEqualTo(schedule);
+    assertThat(flowTriggerBean.getTriggerDependencies().size()).isEqualTo(numDependencies);
+  }
+
+  private void validateTriggerDependencyBean(final TriggerDependencyBean triggerDependencyBean,
+      final String name, final String type, final Map<String, String> params) {
+    assertThat(triggerDependencyBean.getName()).isEqualTo(name);
+    assertThat(triggerDependencyBean.getType()).isEqualTo(type);
+    assertThat(triggerDependencyBean.getParams()).isEqualTo(params);
+  }
+
+  private void validateFlowTrigger(final FlowTrigger flowTrigger, final long maxWaitMins, final
+  String cronExpression, final int numDependencies) {
+    assertThat(flowTrigger.getMaxWaitDuration()).isEqualTo(Duration.ofMinutes(maxWaitMins));
+    assertThat(flowTrigger.getSchedule().getCronExpression()).isEqualTo(cronExpression);
+    assertThat(flowTrigger.getDependencies().size()).isEqualTo(numDependencies);
+  }
+
+  private void validateTriggerDependency(final FlowTriggerDependency flowTriggerDependency, final
+  String name, final String type, final Map<String, String> params) {
+    assertThat(flowTriggerDependency.getName()).isEqualTo(name);
+    assertThat(flowTriggerDependency.getType()).isEqualTo(type);
+    assertThat(flowTriggerDependency.getProps()).isEqualTo(params);
+  }
 }
diff --git a/test/execution-test-data/basicflowyamltest/Archive.zip b/test/execution-test-data/basicflowyamltest/Archive.zip
index 5137202..1f9abe6 100644
Binary files a/test/execution-test-data/basicflowyamltest/Archive.zip and b/test/execution-test-data/basicflowyamltest/Archive.zip differ
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger.flow
new file mode 100644
index 0000000..671bfdc
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger.flow
@@ -0,0 +1,58 @@
+---
+# Flow trigger
+trigger:
+  maxWaitMins: 5
+  schedule:
+    type: cron
+    value: 0 0 1 ? * *
+
+  triggerDependencies:
+    - name: search-impression # an unique name to identify the dependency
+      type: dali-dataset
+      params:
+        view: search_mp_versioned.search_impression_event_0_0_47
+        delay: 1
+        window: 1
+        unit: daily
+        filter: is_guest=0
+
+    - name: other-name
+      type: dali-dataset
+      params:
+        view: another dataset
+        delay: 1
+        window: 7
+
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of properties file
+  # with one major difference. All custom properties are now clubbed together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger.project b/test/execution-test-data/flowtriggeryamltest/flow_trigger.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0