azkaban-aplcache
Changes
Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 60f46ea..98586ae 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -46,6 +46,11 @@ public class Constants {
// Flow 2.0 flow and job path delimiter
public static final String PATH_DELIMITER = ":";
+ // Flow trigger props
+ public static final String SCHEDULE_TYPE = "type";
+ public static final String CRON_SCHEDULE_TYPE = "cron";
+ public static final String SCHEDULE_VALUE = "value";
+
// Names and paths of various file names to configure Azkaban
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 997719e..2dba398 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -34,11 +34,13 @@ import java.util.Map;
public class AzkabanFlow extends AzkabanNode {
private final Map<String, AzkabanNode> nodes;
+ private final FlowTrigger flowTrigger;
- private AzkabanFlow(final String name, final Props props,
- final Map<String, AzkabanNode> nodes, final List<String> dependsOn) {
+ private AzkabanFlow(final String name, final Props props, final Map<String, AzkabanNode> nodes,
+ final List<String> dependsOn, final FlowTrigger flowTrigger) {
super(name, Constants.FLOW_NODE_TYPE, props, dependsOn);
this.nodes = nodes;
+ this.flowTrigger = flowTrigger;
}
public Map<String, AzkabanNode> getNodes() {
@@ -49,31 +51,36 @@ public class AzkabanFlow extends AzkabanNode {
return this.nodes.get(name);
}
+ public FlowTrigger getFlowTrigger() {
+ return this.flowTrigger;
+ }
+
public static class AzkabanFlowBuilder {
private String name;
private Props props;
private List<String> dependsOn;
private Map<String, AzkabanNode> nodes;
+ private FlowTrigger flowTrigger;
- public AzkabanFlowBuilder setName(final String name) {
+ public AzkabanFlowBuilder name(final String name) {
this.name = name;
return this;
}
- public AzkabanFlowBuilder setProps(final Props props) {
+ public AzkabanFlowBuilder props(final Props props) {
this.props = props;
return this;
}
- public AzkabanFlowBuilder setDependsOn(final List<String> dependsOn) {
+ public AzkabanFlowBuilder dependsOn(final List<String> dependsOn) {
this.dependsOn = dependsOn == null
? Collections.emptyList()
: ImmutableList.copyOf(dependsOn);
return this;
}
- public AzkabanFlowBuilder setNodes(final Collection<? extends AzkabanNode> azkabanNodes) {
+ public AzkabanFlowBuilder nodes(final Collection<? extends AzkabanNode> azkabanNodes) {
final Map<String, AzkabanNode> tempNodes = new HashMap<>();
for (final AzkabanNode node : azkabanNodes) {
tempNodes.put(node.getName(), node);
@@ -82,8 +89,13 @@ public class AzkabanFlow extends AzkabanNode {
return this;
}
+ public AzkabanFlowBuilder flowTrigger(final FlowTrigger flowTrigger) {
+ this.flowTrigger = flowTrigger;
+ return this;
+ }
+
public AzkabanFlow build() {
- return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn);
+ return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn, this.flowTrigger);
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index 9a60740..128e93f 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -40,22 +40,22 @@ public class AzkabanJob extends AzkabanNode {
private Props props;
private List<String> dependsOn;
- public AzkabanJobBuilder setName(final String name) {
+ public AzkabanJobBuilder name(final String name) {
this.name = name;
return this;
}
- public AzkabanJobBuilder setType(final String type) {
+ public AzkabanJobBuilder type(final String type) {
this.type = type;
return this;
}
- public AzkabanJobBuilder setProps(final Props props) {
+ public AzkabanJobBuilder props(final Props props) {
this.props = props;
return this;
}
- public AzkabanJobBuilder setDependsOn(final List<String> dependsOn) {
+ public AzkabanJobBuilder dependsOn(final List<String> dependsOn) {
// A node may or may not have dependencies.
this.dependsOn = dependsOn == null
? Collections.emptyList()
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index aa083be..d16c7e4 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -90,6 +90,17 @@ public class FlowLoaderUtils {
return false;
}
+ public static FlowTrigger getFlowTriggerFromYamlFile(final File flowFile) {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ try {
+ final NodeBean nodeBean = loader.load(flowFile);
+ return loader.toFlowTrigger(nodeBean.getTrigger());
+ } catch (final FileNotFoundException e) {
+ logger.error("Failed to get flow trigger, error loading flow YAML file " + flowFile);
+ }
+ return null;
+ }
+
/**
* Adds email properties to a flow.
*
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
index 98d5fb1..9a26c73 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTrigger.java
@@ -100,4 +100,7 @@ public class FlowTrigger {
return this.maxWaitDuration;
}
+ public CronSchedule getSchedule() {
+ return this.schedule;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
new file mode 100644
index 0000000..11dbe8b
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowTriggerBean.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Java bean loaded from YAML file to represent a flow trigger.
+ */
+public class FlowTriggerBean {
+
+ private int maxWaitMins;
+ private Map<String, String> schedule;
+ private List<TriggerDependencyBean> triggerDependencies;
+
+ public int getMaxWaitMins() {
+ return this.maxWaitMins;
+ }
+
+ public void setMaxWaitMins(final int maxWaitMins) {
+ this.maxWaitMins = maxWaitMins;
+ }
+
+ public Map<String, String> getSchedule() {
+ return this.schedule;
+ }
+
+ public void setSchedule(final Map<String, String> schedule) {
+ this.schedule = schedule;
+ }
+
+ public List<TriggerDependencyBean> getTriggerDependencies() {
+ return this.triggerDependencies == null ? Collections.emptyList() : this.triggerDependencies;
+ }
+
+ public void setTriggerDependencies(
+ final List<TriggerDependencyBean> triggerDependencies) {
+ this.triggerDependencies = triggerDependencies;
+ }
+
+ @Override
+ public String toString() {
+ return "FlowTriggerBean{" +
+ "maxWaitMins='" + this.maxWaitMins + '\'' +
+ ", schedule=" + this.schedule +
+ ", triggerDependencies=" + this.triggerDependencies +
+ '}';
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index 5cafc37..370a8b9 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -33,6 +33,7 @@ public class NodeBean implements Serializable {
private List<String> dependsOn;
private String type;
private List<NodeBean> nodes;
+ private FlowTriggerBean trigger;
public String getName() {
return this.name;
@@ -80,6 +81,14 @@ public class NodeBean implements Serializable {
return props;
}
+ public FlowTriggerBean getTrigger() {
+ return this.trigger;
+ }
+
+ public void setTrigger(final FlowTriggerBean trigger) {
+ this.trigger = trigger;
+ }
+
@Override
public String toString() {
return "NodeBean{" +
@@ -88,6 +97,7 @@ public class NodeBean implements Serializable {
", dependsOn=" + this.dependsOn +
", type='" + this.type + '\'' +
", nodes=" + this.nodes +
+ ", trigger=" + this.trigger +
'}';
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 079924a..945fff3 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -24,6 +24,7 @@ import com.google.common.io.Files;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -66,22 +67,33 @@ public class NodeBeanLoader {
public AzkabanNode toAzkabanNode(final NodeBean nodeBean) {
if (nodeBean.getType().equals(Constants.FLOW_NODE_TYPE)) {
return new AzkabanFlow.AzkabanFlowBuilder()
- .setName(nodeBean.getName())
- .setProps(nodeBean.getProps())
- .setDependsOn(nodeBean.getDependsOn())
- .setNodes(
- nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
+ .name(nodeBean.getName())
+ .props(nodeBean.getProps())
+ .dependsOn(nodeBean.getDependsOn())
+ .nodes(nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
+ .flowTrigger(toFlowTrigger(nodeBean.getTrigger()))
.build();
} else {
return new AzkabanJob.AzkabanJobBuilder()
- .setName(nodeBean.getName())
- .setProps(nodeBean.getProps())
- .setType(nodeBean.getType())
- .setDependsOn(nodeBean.getDependsOn())
+ .name(nodeBean.getName())
+ .props(nodeBean.getProps())
+ .type(nodeBean.getType())
+ .dependsOn(nodeBean.getDependsOn())
.build();
}
}
+ public FlowTrigger toFlowTrigger(final FlowTriggerBean flowTriggerBean) {
+ // Todo jamiesjc: need to validate flowTriggerBean
+ return flowTriggerBean == null ? null
+ : new FlowTrigger(
+ new CronSchedule(flowTriggerBean.getSchedule().get(Constants.SCHEDULE_VALUE)),
+ flowTriggerBean.getTriggerDependencies().stream()
+ .map(d -> new FlowTriggerDependency(d.getName(), d.getType(), d.getParams()))
+ .collect(Collectors.toList()),
+ Duration.ofMinutes(flowTriggerBean.getMaxWaitMins()));
+ }
+
public String getFlowName(final File flowFile) {
checkArgument(flowFile.exists());
checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index ae332a7..5af9ef3 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -220,7 +220,7 @@ public interface ProjectLoader {
throws ProjectManagerException;
/**
- * Gets all flow files that's uploaded.
+ * Check if flow file has been uploaded.
*/
boolean isFlowFileUploaded(int projectId, int projectVersion)
throws ProjectManagerException;
diff --git a/azkaban-common/src/main/java/azkaban/project/TriggerDependencyBean.java b/azkaban-common/src/main/java/azkaban/project/TriggerDependencyBean.java
new file mode 100644
index 0000000..1b47c80
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/TriggerDependencyBean.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Java bean loaded from YAML file to represent a trigger dependency.
+ */
+public class TriggerDependencyBean {
+
+ private String name;
+ private String type;
+ private Map<String, String> params;
+
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public String getType() {
+ return this.type;
+ }
+
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ public Map<String, String> getParams() {
+ return this.params == null ? Collections.emptyMap() : this.params;
+ }
+
+ public void setParams(final Map<String, String> params) {
+ this.params = params;
+ }
+
+ @Override
+ public String toString() {
+ return "TriggerDependencyBean{" +
+ "name='" + this.name + '\'' +
+ ", type='" + this.type + '\'' +
+ ", params=" + this.params +
+ '}';
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index ccdbbc0..2c39a88 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -22,6 +22,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import azkaban.Constants;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
+import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import org.junit.Test;
public class NodeBeanLoaderTest {
@@ -32,6 +37,9 @@ public class NodeBeanLoaderTest {
private static final String EMBEDDED_FLOW_YML_TEST_DIR = "embeddedflowyamltest";
private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
private static final String EMBEDDED_FLOW_YML_FILE = EMBEDDED_FLOW_NAME + ".flow";
+ private static final String TRIGGER_FLOW_YML_TEST_DIR = "flowtriggeryamltest";
+ private static final String TRIGGER_FLOW_NAME = "flow_trigger";
+ private static final String TRIGGER_FLOW_YML_FILE = TRIGGER_FLOW_NAME + ".flow";
private static final String FLOW_CONFIG_KEY = "flow-level-parameter";
private static final String FLOW_CONFIG_VALUE = "value";
private static final String SHELL_END = "shell_end";
@@ -41,85 +49,87 @@ public class NodeBeanLoaderTest {
private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
private static final String ECHO_COMMAND_1 = "echo \"This is an echoed text from embedded_flow1.\"";
private static final String PWD_COMMAND = "pwd";
+ private static final String BASH_COMMAND = "bash ./sample_script.sh";
private static final String EMBEDDED_FLOW1 = "embedded_flow1";
private static final String EMBEDDED_FLOW2 = "embedded_flow2";
private static final String TYPE_NOOP = "noop";
private static final String TYPE_COMMAND = "command";
+ private static final int MAX_WAIT_MINS = 5;
+ private static final String CRON_EXPRESSION = "0 0 1 ? * *";
+ private static final String TRIGGER_NAME_1 = "search-impression";
+ private static final String TRIGGER_NAME_2 = "other-name";
+ private static final String TRIGGER_TYPE = "dali-dataset";
+ private static final ImmutableMap<String, String> PARAMS_1 = ImmutableMap
+ .of("view", "search_mp_versioned.search_impression_event_0_0_47", "delay", "1", "window", "1",
+ "unit", "daily", "filter", "is_guest=0");
+ private static final ImmutableMap<String, String> PARAMS_2 = ImmutableMap
+ .of("view", "another dataset",
+ "delay", "1", "window", "7");
@Test
public void testLoadNodeBeanForBasicFlow() throws Exception {
-
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
- assertThat(nodeBean.getName()).isEqualTo(BASIC_FLOW_NAME);
- assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
- assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
- assertThat(nodeBean.getNodes().size()).isEqualTo(4);
-
- final NodeBean node0 = nodeBean.getNodes().get(0);
- assertThat(node0.getName()).isEqualTo(SHELL_END);
- assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
- assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
-
- final NodeBean node1 = nodeBean.getNodes().get(1);
- assertThat(node1.getName()).isEqualTo(SHELL_ECHO);
- assertThat(node1.getType()).isEqualTo(TYPE_COMMAND);
- assertThat(node1.getConfig().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+ validateNodeBean(nodeBean, BASIC_FLOW_NAME, Constants.FLOW_NODE_TYPE, FLOW_CONFIG_KEY,
+ FLOW_CONFIG_VALUE, 4, null);
+ validateNodeBean(nodeBean.getNodes().get(0), SHELL_END, TYPE_NOOP, null,
+ null, 0, Arrays.asList(SHELL_PWD, SHELL_ECHO, SHELL_BASH));
+ validateNodeBean(nodeBean.getNodes().get(1), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
+ ECHO_COMMAND, 0, null);
}
@Test
public void testLoadNodeBeanForEmbeddedFlow() throws Exception {
-
final NodeBeanLoader loader = new NodeBeanLoader();
final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
- assertThat(nodeBean.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
- assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
- assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
- assertThat(nodeBean.getNodes().size()).isEqualTo(4);
-
- final NodeBean node0 = nodeBean.getNodes().get(0);
- assertThat(node0.getName()).isEqualTo(SHELL_END);
- assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
- assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
-
- final NodeBean node1 = nodeBean.getNodes().get(1);
- assertThat(node1.getName()).isEqualTo(SHELL_PWD);
-
- final NodeBean node2 = nodeBean.getNodes().get(2);
- assertThat(node2.getName()).isEqualTo(SHELL_ECHO);
-
- final NodeBean node3 = nodeBean.getNodes().get(3);
- assertThat(node3.getName()).isEqualTo(EMBEDDED_FLOW1);
- assertThat(node3.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
- assertThat(node3.getNodes().size()).isEqualTo(4);
+ validateNodeBean(nodeBean, EMBEDDED_FLOW_NAME, Constants.FLOW_NODE_TYPE, FLOW_CONFIG_KEY,
+ FLOW_CONFIG_VALUE, 4, null);
+ validateNodeBean(nodeBean.getNodes().get(0), SHELL_END, TYPE_NOOP, null,
+ null, 0, Arrays.asList(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1));
+ validateNodeBean(nodeBean.getNodes().get(1), SHELL_PWD, TYPE_COMMAND, TYPE_COMMAND,
+ PWD_COMMAND, 0, null);
+ validateNodeBean(nodeBean.getNodes().get(2), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
+ ECHO_COMMAND, 0, null);
+ validateNodeBean(nodeBean.getNodes().get(3), EMBEDDED_FLOW1, Constants.FLOW_NODE_TYPE,
+ FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE, 4, null);
// Verify nodes in embedded_flow1 are loaded correctly.
- final NodeBean node3_0 = node3.getNodes().get(0);
- assertThat(node3_0.getName()).isEqualTo(SHELL_END);
-
- final NodeBean node3_1 = node3.getNodes().get(1);
- assertThat(node3_1.getName()).isEqualTo(SHELL_ECHO);
-
- final NodeBean node3_2 = node3.getNodes().get(2);
- assertThat(node3_2.getName()).isEqualTo(EMBEDDED_FLOW2);
- assertThat(node3_2.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
- assertThat(node3_2.getDependsOn()).contains(SHELL_BASH);
- assertThat(node3_2.getNodes().size()).isEqualTo(2);
-
- final NodeBean node3_3 = node3.getNodes().get(3);
- assertThat(node3_3.getName()).isEqualTo(SHELL_BASH);
+ final NodeBean embeddedNodeBean1 = nodeBean.getNodes().get(3);
+ validateNodeBean(embeddedNodeBean1.getNodes().get(0), SHELL_END, TYPE_NOOP, null, null, 0,
+ Arrays.asList(SHELL_ECHO, EMBEDDED_FLOW2));
+ validateNodeBean(embeddedNodeBean1.getNodes().get(1), SHELL_ECHO, TYPE_COMMAND, TYPE_COMMAND,
+ ECHO_COMMAND_1, 0, null);
+ validateNodeBean(embeddedNodeBean1.getNodes().get(2), EMBEDDED_FLOW2, Constants.FLOW_NODE_TYPE,
+ FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE, 2, Arrays.asList(SHELL_BASH));
+ validateNodeBean(embeddedNodeBean1.getNodes().get(3), SHELL_BASH, TYPE_COMMAND, TYPE_COMMAND,
+ BASH_COMMAND, 0, null);
// Verify nodes in embedded_flow2 are loaded correctly.
- final NodeBean node3_2_0 = node3_2.getNodes().get(0);
- assertThat(node3_2_0.getName()).isEqualTo(SHELL_END);
-
- final NodeBean node3_2_1 = node3_2.getNodes().get(1);
- assertThat(node3_2_1.getName()).isEqualTo(SHELL_PWD);
+ validateNodeBean(embeddedNodeBean1.getNodes().get(2).getNodes().get(0), SHELL_END, TYPE_NOOP,
+ null,
+ null, 0, Arrays.asList(SHELL_PWD));
+ validateNodeBean(embeddedNodeBean1.getNodes().get(2).getNodes().get(1), SHELL_PWD, TYPE_COMMAND,
+ TYPE_COMMAND, PWD_COMMAND, 0, null);
+ }
+ @Test
+ public void testLoadNodeBeanForFlowTrigger() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+ final Map<String, String> schedule = ImmutableMap.of(Constants.SCHEDULE_TYPE, Constants
+ .CRON_SCHEDULE_TYPE, Constants.SCHEDULE_VALUE, CRON_EXPRESSION);
+ validateFlowTriggerBean(nodeBean.getTrigger(), MAX_WAIT_MINS, schedule, 2);
+ final List<TriggerDependencyBean> triggerDependencyBeans = nodeBean.getTrigger()
+ .getTriggerDependencies();
+ validateTriggerDependencyBean(triggerDependencyBeans.get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+ PARAMS_1);
+ validateTriggerDependencyBean(triggerDependencyBeans.get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+ PARAMS_2);
}
@Test
@@ -129,23 +139,22 @@ public class NodeBeanLoaderTest {
BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
- assertThat(flow.getName()).isEqualTo(BASIC_FLOW_NAME);
- assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
- assertThat(flow.getNodes().size()).isEqualTo(4);
-
- final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
- assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
- assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
- assertThat(shellEnd.getProps().size()).isEqualTo(1);
- assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
- assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
-
- final AzkabanJob shellEcho = (AzkabanJob) flow.getNode(SHELL_ECHO);
- assertThat(shellEcho.getName()).isEqualTo(SHELL_ECHO);
- assertThat(shellEcho.getType()).isEqualTo(TYPE_COMMAND);
- assertThat(shellEcho.getProps().size()).isEqualTo(2);
- assertThat(shellEcho.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
- assertThat(shellEcho.getProps().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+ final Props props = new Props();
+ props.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
+ props.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
+ validateAzkabanNode(flow, BASIC_FLOW_NAME, Constants.FLOW_NODE_TYPE, props, Arrays.asList
+ (SHELL_END, SHELL_PWD, SHELL_ECHO, SHELL_BASH), null);
+
+ final Props props1 = new Props();
+ props1.put(Constants.NODE_TYPE, TYPE_NOOP);
+ validateAzkabanNode(flow.getNode(SHELL_END), SHELL_END, TYPE_NOOP, props1, null,
+ Arrays.asList(SHELL_PWD, SHELL_ECHO, SHELL_BASH));
+
+ final Props props2 = new Props();
+ props2.put(Constants.NODE_TYPE, TYPE_COMMAND);
+ props2.put(TYPE_COMMAND, ECHO_COMMAND);
+ validateAzkabanNode(flow.getNode(SHELL_ECHO), SHELL_ECHO, TYPE_COMMAND, props2, null,
+ null);
}
@Test
@@ -155,31 +164,56 @@ public class NodeBeanLoaderTest {
EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
- assertThat(flow.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
- assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
- assertThat(flow.getNodes().size()).isEqualTo(4);
+ final Props props = new Props();
+ props.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
+ props.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
+ validateAzkabanNode(flow, EMBEDDED_FLOW_NAME, Constants.FLOW_NODE_TYPE, props, Arrays.asList
+ (SHELL_END, SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1), null);
- final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
- assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
- assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
- assertThat(shellEnd.getProps().size()).isEqualTo(1);
- assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
- assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
+ final Props props1 = new Props();
+ props1.put(Constants.NODE_TYPE, TYPE_NOOP);
+ validateAzkabanNode(flow.getNode(SHELL_END), SHELL_END, TYPE_NOOP, props1, null,
+ Arrays.asList(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1));
+ final Props props2 = new Props();
+ props2.put(Constants.NODE_TYPE, Constants.FLOW_NODE_TYPE);
+ props2.put(FLOW_CONFIG_KEY, FLOW_CONFIG_VALUE);
final AzkabanFlow embeddedFlow1 = (AzkabanFlow) flow.getNode(EMBEDDED_FLOW1);
- assertThat(embeddedFlow1.getName()).isEqualTo(EMBEDDED_FLOW1);
- assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
- assertThat(embeddedFlow1.getNodes().size()).isEqualTo(4);
- assertThat(embeddedFlow1.getNodes().containsKey(EMBEDDED_FLOW2)).isTrue();
+ validateAzkabanNode(embeddedFlow1, EMBEDDED_FLOW1, Constants.FLOW_NODE_TYPE, props2,
+ Arrays.asList
+ (SHELL_END, SHELL_BASH, SHELL_ECHO, EMBEDDED_FLOW2), null);
final AzkabanFlow embeddedFlow2 = (AzkabanFlow) embeddedFlow1.getNode(EMBEDDED_FLOW2);
- assertThat(embeddedFlow2.getName()).isEqualTo(EMBEDDED_FLOW2);
- assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
- assertThat(embeddedFlow2.getDependsOn()).contains(SHELL_BASH);
- assertThat(embeddedFlow2.getNodes().size()).isEqualTo(2);
- assertThat(embeddedFlow2.getNodes().containsKey(SHELL_END)).isTrue();
- assertThat(embeddedFlow2.getNodes().containsKey(SHELL_PWD)).isTrue();
+ validateAzkabanNode(embeddedFlow2, EMBEDDED_FLOW2, Constants.FLOW_NODE_TYPE,
+ props2, Arrays.asList(SHELL_END, SHELL_PWD), Arrays.asList(SHELL_BASH));
+ }
+ @Test
+ public void testToFlowTrigger() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+ final FlowTrigger flowTrigger = loader.toFlowTrigger(nodeBean.getTrigger());
+ validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
+ validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+ PARAMS_1);
+ validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+ PARAMS_2);
+ }
+
+ @Test
+ public void testToAzkabanFlowWithFlowTrigger() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+ final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+ validateFlowTrigger(flow.getFlowTrigger(), MAX_WAIT_MINS, CRON_EXPRESSION, 2);
+ validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(0), TRIGGER_NAME_1,
+ TRIGGER_TYPE,
+ PARAMS_1);
+ validateTriggerDependency(flow.getFlowTrigger().getDependencies().get(1), TRIGGER_NAME_2,
+ TRIGGER_TYPE,
+ PARAMS_2);
}
@Test
@@ -241,4 +275,73 @@ public class NodeBeanLoaderTest {
assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(PWD_COMMAND);
}
+
+ @Test
+ public void testGetFlowTrigger() {
+ final FlowTrigger flowTrigger = FlowLoaderUtils.getFlowTriggerFromYamlFile(
+ ExecutionsTestUtil.getFlowFile(TRIGGER_FLOW_YML_TEST_DIR, TRIGGER_FLOW_YML_FILE));
+ validateFlowTrigger(flowTrigger, MAX_WAIT_MINS, CRON_EXPRESSION, 2);
+ validateTriggerDependency(flowTrigger.getDependencies().get(0), TRIGGER_NAME_1, TRIGGER_TYPE,
+ PARAMS_1);
+ validateTriggerDependency(flowTrigger.getDependencies().get(1), TRIGGER_NAME_2, TRIGGER_TYPE,
+ PARAMS_2);
+ }
+
+ private void validateNodeBean(final NodeBean nodeBean, final String name, final String type,
+ final String configKey,
+ final String configValue, final int numNodes, final List<String> dependsOn) {
+ assertThat(nodeBean.getName()).isEqualTo(name);
+ assertThat(nodeBean.getType()).isEqualTo(type);
+ if (configKey != null) {
+ assertThat(nodeBean.getConfig().get(configKey)).isEqualTo(configValue);
+ }
+ if (numNodes != 0) {
+ assertThat(nodeBean.getNodes().size()).isEqualTo(numNodes);
+ }
+ if (dependsOn != null) {
+ assertThat(nodeBean.getDependsOn()).containsOnlyElementsOf(dependsOn);
+ }
+ }
+
+ private void validateAzkabanNode(final AzkabanNode azkabanNode, final String name,
+ final String type, final Props props, final List<String> nodeList, final List<String>
+ dependsOn) {
+ assertThat(azkabanNode.getName()).isEqualTo(name);
+ assertThat(azkabanNode.getType()).isEqualTo(type);
+ assertThat(azkabanNode.getProps()).isEqualTo(props);
+ if (azkabanNode instanceof AzkabanFlow) {
+ assertThat(((AzkabanFlow) azkabanNode).getNodes().keySet()).containsOnlyElementsOf(nodeList);
+ }
+ if (dependsOn != null) {
+ assertThat(azkabanNode.getDependsOn()).containsOnlyElementsOf(dependsOn);
+ }
+ }
+
+ private void validateFlowTriggerBean(final FlowTriggerBean flowTriggerBean, final int
+ maxWaitMins, final Map<String, String> schedule, final int numDependencies) {
+ assertThat(flowTriggerBean.getMaxWaitMins()).isEqualTo(maxWaitMins);
+ assertThat(flowTriggerBean.getSchedule()).isEqualTo(schedule);
+ assertThat(flowTriggerBean.getTriggerDependencies().size()).isEqualTo(numDependencies);
+ }
+
+ private void validateTriggerDependencyBean(final TriggerDependencyBean triggerDependencyBean,
+ final String name, final String type, final Map<String, String> params) {
+ assertThat(triggerDependencyBean.getName()).isEqualTo(name);
+ assertThat(triggerDependencyBean.getType()).isEqualTo(type);
+ assertThat(triggerDependencyBean.getParams()).isEqualTo(params);
+ }
+
+ private void validateFlowTrigger(final FlowTrigger flowTrigger, final long maxWaitMins, final
+ String cronExpression, final int numDependencies) {
+ assertThat(flowTrigger.getMaxWaitDuration()).isEqualTo(Duration.ofMinutes(maxWaitMins));
+ assertThat(flowTrigger.getSchedule().getCronExpression()).isEqualTo(cronExpression);
+ assertThat(flowTrigger.getDependencies().size()).isEqualTo(numDependencies);
+ }
+
+ private void validateTriggerDependency(final FlowTriggerDependency flowTriggerDependency, final
+ String name, final String type, final Map<String, String> params) {
+ assertThat(flowTriggerDependency.getName()).isEqualTo(name);
+ assertThat(flowTriggerDependency.getType()).isEqualTo(type);
+ assertThat(flowTriggerDependency.getProps()).isEqualTo(params);
+ }
}
diff --git a/test/execution-test-data/basicflowyamltest/Archive.zip b/test/execution-test-data/basicflowyamltest/Archive.zip
index 5137202..1f9abe6 100644
Binary files a/test/execution-test-data/basicflowyamltest/Archive.zip and b/test/execution-test-data/basicflowyamltest/Archive.zip differ
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger.flow b/test/execution-test-data/flowtriggeryamltest/flow_trigger.flow
new file mode 100644
index 0000000..671bfdc
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger.flow
@@ -0,0 +1,58 @@
+---
+# Flow trigger
+trigger:
+ maxWaitMins: 5
+ schedule:
+ type: cron
+ value: 0 0 1 ? * *
+
+ triggerDependencies:
+ - name: search-impression # an unique name to identify the dependency
+ type: dali-dataset
+ params:
+ view: search_mp_versioned.search_impression_event_0_0_47
+ delay: 1
+ window: 1
+ unit: daily
+ filter: is_guest=0
+
+ - name: other-name
+ type: dali-dataset
+ params:
+ view: another dataset
+ delay: 1
+ window: 7
+
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/flowtriggeryamltest/flow_trigger.project b/test/execution-test-data/flowtriggeryamltest/flow_trigger.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/flowtriggeryamltest/flow_trigger.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0