Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 7fd177e..2d598ee 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -35,6 +35,10 @@ public class Constants {
// Azkaban Flow Versions
public static final String AZKABAN_FLOW_VERSION_2_0 = "2.0";
+ // Flow 2.0 file suffix
+ public static final String PROJECT_FILE_SUFFIX = ".project";
+ public static final String FLOW_FILE_SUFFIX = ".flow";
+
// Names and paths of various file names to configure Azkaban
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index d34743e..e2ad0c3 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -18,9 +18,12 @@
package azkaban.project;
import azkaban.utils.Props;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class AzkabanFlow extends AzkabanNode {
@@ -28,23 +31,24 @@ public class AzkabanFlow extends AzkabanNode {
private final Map<String, AzkabanNode> nodes;
private AzkabanFlow(final String name, final Props props,
- final Map<String, AzkabanNode> nodes) {
- super(name, props);
+ final Map<String, AzkabanNode> nodes, final List<String> dependsOn) {
+ super(name, props, dependsOn);
this.nodes = nodes;
}
public Map<String, AzkabanNode> getNodes() {
- return nodes;
+ return this.nodes;
}
- public AzkabanJob getJob(final String name) {
- return (AzkabanJob) nodes.get(name);
+ public AzkabanNode getNode(final String name) {
+ return this.nodes.get(name);
}
public static class AzkabanFlowBuilder {
private String name;
private Props props;
+ private List<String> dependsOn;
private Map<String, AzkabanNode> nodes;
public AzkabanFlowBuilder setName(final String name) {
@@ -57,17 +61,24 @@ public class AzkabanFlow extends AzkabanNode {
return this;
}
+ public AzkabanFlowBuilder setDependsOn(final List<String> dependsOn) {
+ this.dependsOn = dependsOn == null
+ ? Collections.emptyList()
+ : ImmutableList.copyOf(dependsOn);
+ return this;
+ }
+
public AzkabanFlowBuilder setNodes(final Collection<? extends AzkabanNode> azkabanNodes) {
final Map<String, AzkabanNode> tempNodes = new HashMap<>();
for (final AzkabanNode node : azkabanNodes) {
tempNodes.put(node.getName(), node);
}
- nodes = ImmutableMap.copyOf(tempNodes);
+ this.nodes = ImmutableMap.copyOf(tempNodes);
return this;
}
public AzkabanFlow build() {
- return new AzkabanFlow(name, props, nodes);
+ return new AzkabanFlow(this.name, this.props, this.nodes, this.dependsOn);
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index ed59fe0..03f78e2 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -25,21 +25,15 @@ import java.util.List;
public class AzkabanJob extends AzkabanNode {
private final String type;
- private final List<String> dependsOn;
private AzkabanJob(final String name, final String type, final Props props,
final List<String> dependsOn) {
- super(name, props);
+ super(name, props, dependsOn);
this.type = type;
- this.dependsOn = dependsOn;
}
public String getType() {
- return type;
- }
-
- public List<String> getDependsOn() {
- return dependsOn;
+ return this.type;
}
public static class AzkabanJobBuilder {
@@ -73,7 +67,7 @@ public class AzkabanJob extends AzkabanNode {
}
public AzkabanJob build() {
- return new AzkabanJob(name, type, props, dependsOn);
+ return new AzkabanJob(this.name, this.type, this.props, this.dependsOn);
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
index 1b91ec8..d25afb1 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -20,22 +20,29 @@ package azkaban.project;
import static java.util.Objects.requireNonNull;
import azkaban.utils.Props;
+import java.util.List;
public abstract class AzkabanNode {
protected final String name;
protected final Props props;
+ protected final List<String> dependsOn;
- public AzkabanNode(final String name, final Props props) {
+ public AzkabanNode(final String name, final Props props, final List<String> dependsOn) {
this.name = requireNonNull(name);
this.props = requireNonNull(props);
+ this.dependsOn = dependsOn;
}
public String getName() {
- return name;
+ return this.name;
}
public Props getProps() {
- return props;
+ return this.props;
+ }
+
+ public List<String> getDependsOn() {
+ return this.dependsOn;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 058f8ce..dbb7cce 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -16,6 +16,7 @@
package azkaban.project;
+import azkaban.Constants;
import azkaban.flow.Flow;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.validator.ValidationReport;
@@ -35,8 +36,6 @@ import org.slf4j.LoggerFactory;
public class DirectoryYamlFlowLoader implements FlowLoader {
private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
- private static final String PROJECT_FILE_SUFFIX = ".project";
- private static final String FLOW_FILE_SUFFIX = ".flow";
private final Props props;
private final Set<String> errors = new HashSet<>();
@@ -86,13 +85,12 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
private void convertYamlFiles(final File projectDir) {
// Todo jamiesjc: convert project yaml file. It will contain properties for all flows.
- //covert flow yaml files
- final File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_FILE_SUFFIX));
+ final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
for (final File file : flowFiles) {
- final FlowBeanLoader loader = new FlowBeanLoader();
+ final NodeBeanLoader loader = new NodeBeanLoader();
try {
- final FlowBean flowBean = loader.load(file);
- final AzkabanFlow azkabanFlow = loader.toAzkabanFlow(loader.getFlowName(file), flowBean);
+ final NodeBean nodeBean = loader.load(file);
+ final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
final Flow flow = new Flow(azkabanFlow.getName());
flow.setAzkabanFlow(azkabanFlow);
this.flowMap.put(azkabanFlow.getName(), flow);
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index b457053..eb62481 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -17,18 +17,20 @@
package azkaban.project;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
- * The node bean is used by the YAML loader to deserialize DAG nodes
+ * Used by the YAML loader to deserialize DAG nodes in the flow
*/
-public class NodeBean {
+public class NodeBean implements Serializable {
private String name;
private Map<String, String> config;
private List<String> dependsOn;
private String type;
+ private List<NodeBean> nodes;
public String getName() {
return this.name;
@@ -62,8 +64,22 @@ public class NodeBean {
this.type = type;
}
+ public List<NodeBean> getNodes() {
+ return this.nodes;
+ }
+
+ public void setNodes(final List<NodeBean> nodes) {
+ this.nodes = nodes;
+ }
+
@Override
public String toString() {
- return "Node{config=" + this.config + ", dependsOn=" + this.dependsOn + '}';
+ return "NodeBean{" +
+ "name='" + this.name + '\'' +
+ ", config=" + this.config +
+ ", dependsOn=" + this.dependsOn +
+ ", type='" + this.type + '\'' +
+ ", nodes=" + this.nodes +
+ '}';
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
new file mode 100644
index 0000000..ddc3845
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.test.executions.ExecutionsTestUtil;
+import org.junit.Test;
+
+public class NodeBeanLoaderTest {
+
+ private static final String BASIC_FLOW_YML_TEST_DIR = "basicflowyamltest";
+ private static final String BASIC_FLOW_NAME = "basic_flow";
+ private static final String BASIC_FLOW_YML_FILE = BASIC_FLOW_NAME + ".flow";
+ private static final String EMBEDDED_FLOW_YML_TEST_DIR = "embeddedflowyamltest";
+ private static final String EMBEDDED_FLOW_NAME = "embedded_flow";
+ private static final String EMBEDDED_FLOW_YML_FILE = EMBEDDED_FLOW_NAME + ".flow";
+ private static final String FLOW_CONFIG_KEY = "flow-level-parameter";
+ private static final String FLOW_CONFIG_VALUE = "value";
+ private static final String SHELL_END = "shell_end";
+ private static final String SHELL_ECHO = "shell_echo";
+ private static final String SHELL_BASH = "shell_bash";
+ private static final String SHELL_PWD = "shell_pwd";
+ private static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
+ private static final String EMBEDDED_FLOW1 = "embedded_flow1";
+ private static final String EMBEDDED_FLOW2 = "embedded_flow2";
+ private static final String TYPE_NOOP = "noop";
+ private static final String TYPE_COMMAND = "command";
+ private static final String TYPE_FLOW = "flow";
+
+ @Test
+ public void testLoadNodeBeanForBasicFlow() throws Exception {
+
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+
+ assertThat(nodeBean.getName()).isEqualTo(BASIC_FLOW_NAME);
+ assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+ assertThat(nodeBean.getNodes().size()).isEqualTo(4);
+
+ final NodeBean node0 = nodeBean.getNodes().get(0);
+ assertThat(node0.getName()).isEqualTo(SHELL_END);
+ assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
+ assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+ final NodeBean node1 = nodeBean.getNodes().get(1);
+ assertThat(node1.getName()).isEqualTo(SHELL_ECHO);
+ assertThat(node1.getType()).isEqualTo(TYPE_COMMAND);
+ assertThat(node1.getConfig().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+ }
+
+ @Test
+ public void testLoadNodeBeanForEmbeddedFlow() throws Exception {
+
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+
+ assertThat(nodeBean.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
+ assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+ assertThat(nodeBean.getNodes().size()).isEqualTo(4);
+
+ final NodeBean node0 = nodeBean.getNodes().get(0);
+ assertThat(node0.getName()).isEqualTo(SHELL_END);
+ assertThat(node0.getType()).isEqualTo(TYPE_NOOP);
+ assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
+
+ final NodeBean node1 = nodeBean.getNodes().get(1);
+ assertThat(node1.getName()).isEqualTo(SHELL_PWD);
+
+ final NodeBean node2 = nodeBean.getNodes().get(2);
+ assertThat(node2.getName()).isEqualTo(SHELL_ECHO);
+
+ final NodeBean node3 = nodeBean.getNodes().get(3);
+ assertThat(node3.getName()).isEqualTo(EMBEDDED_FLOW1);
+ assertThat(node3.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(node3.getNodes().size()).isEqualTo(4);
+
+ // Verify nodes in embedded_flow1 are loaded correctly.
+ final NodeBean node3_0 = node3.getNodes().get(0);
+ assertThat(node3_0.getName()).isEqualTo(SHELL_END);
+
+ final NodeBean node3_1 = node3.getNodes().get(1);
+ assertThat(node3_1.getName()).isEqualTo(SHELL_ECHO);
+
+ final NodeBean node3_2 = node3.getNodes().get(2);
+ assertThat(node3_2.getName()).isEqualTo(EMBEDDED_FLOW2);
+ assertThat(node3_2.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(node3_2.getDependsOn()).contains(SHELL_BASH);
+ assertThat(node3_2.getNodes().size()).isEqualTo(2);
+
+ final NodeBean node3_3 = node3.getNodes().get(3);
+ assertThat(node3_3.getName()).isEqualTo(SHELL_BASH);
+
+ // Verify nodes in embedded_flow2 are loaded correctly.
+ final NodeBean node3_2_0 = node3_2.getNodes().get(0);
+ assertThat(node3_2_0.getName()).isEqualTo(SHELL_END);
+
+ final NodeBean node3_2_1 = node3_2.getNodes().get(1);
+ assertThat(node3_2_1.getName()).isEqualTo(SHELL_PWD);
+
+ }
+
+ @Test
+ public void testToBasicAzkabanFlow() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
+ final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+
+ assertThat(flow.getName()).isEqualTo(BASIC_FLOW_NAME);
+ assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+ assertThat(flow.getNodes().size()).isEqualTo(4);
+
+ final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
+ assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
+ assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
+ assertThat(shellEnd.getProps().size()).isEqualTo(0);
+ assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+ final AzkabanJob shellEcho = (AzkabanJob) flow.getNode(SHELL_ECHO);
+ assertThat(shellEcho.getName()).isEqualTo(SHELL_ECHO);
+ assertThat(shellEcho.getType()).isEqualTo(TYPE_COMMAND);
+ assertThat(shellEcho.getProps().size()).isEqualTo(1);
+ assertThat(shellEcho.getProps().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
+ }
+
+ @Test
+ public void testToEmbeddedAzkabanFlow() throws Exception {
+ final NodeBeanLoader loader = new NodeBeanLoader();
+ final NodeBean nodeBean = loader.load(ExecutionsTestUtil.getFlowFile(
+ EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
+ final AzkabanFlow flow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+
+ assertThat(flow.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
+ assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+ assertThat(flow.getNodes().size()).isEqualTo(4);
+
+ final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
+ assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
+ assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
+ assertThat(shellEnd.getProps().size()).isEqualTo(0);
+ assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
+
+ final AzkabanFlow embeddedFlow1 = (AzkabanFlow) flow.getNode(EMBEDDED_FLOW1);
+ assertThat(embeddedFlow1.getName()).isEqualTo(EMBEDDED_FLOW1);
+ assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+ assertThat(embeddedFlow1.getNodes().size()).isEqualTo(4);
+ assertThat(embeddedFlow1.getNodes().containsKey(EMBEDDED_FLOW2)).isTrue();
+
+ final AzkabanFlow embeddedFlow2 = (AzkabanFlow) embeddedFlow1.getNode(EMBEDDED_FLOW2);
+ assertThat(embeddedFlow2.getName()).isEqualTo(EMBEDDED_FLOW2);
+ assertThat(flow.getProps().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
+ assertThat(embeddedFlow2.getDependsOn()).contains(SHELL_BASH);
+ assertThat(embeddedFlow2.getNodes().size()).isEqualTo(2);
+ assertThat(embeddedFlow2.getNodes().containsKey(SHELL_END)).isTrue();
+ assertThat(embeddedFlow2.getNodes().containsKey(SHELL_PWD)).isTrue();
+
+ }
+
+ @Test
+ public void testGetFlowName() throws Exception {
+ assertThat(new NodeBeanLoader().getFlowName(ExecutionsTestUtil.getFlowFile(
+ BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE))).isEqualTo(BASIC_FLOW_NAME);
+ }
+}
diff --git a/azkaban-common/src/test/resources/sample_flow.flow b/azkaban-common/src/test/resources/sample_flow.flow
index 287f495..fa68e7d 100644
--- a/azkaban-common/src/test/resources/sample_flow.flow
+++ b/azkaban-common/src/test/resources/sample_flow.flow
@@ -1,3 +1,6 @@
+---
+name: sample_flow
+type: flow
# All flow level properties here
config:
flow-level-parameter: value
diff --git a/test/execution-test-data/basicflowyamltest/basic_flow.flow b/test/execution-test-data/basicflowyamltest/basic_flow.flow
index 287f495..2b9caba 100644
--- a/test/execution-test-data/basicflowyamltest/basic_flow.flow
+++ b/test/execution-test-data/basicflowyamltest/basic_flow.flow
@@ -1,3 +1,6 @@
+---
+name: basic_flow
+type: flow
# All flow level properties here
config:
flow-level-parameter: value
diff --git a/test/execution-test-data/embeddedflowyamltest/embedded_flow.flow b/test/execution-test-data/embeddedflowyamltest/embedded_flow.flow
new file mode 100644
index 0000000..7d8e54c
--- /dev/null
+++ b/test/execution-test-data/embeddedflowyamltest/embedded_flow.flow
@@ -0,0 +1,63 @@
+---
+name: embedded_flow
+type: flow
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - embedded_flow1
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: embedded_flow1
+ type: flow
+ config:
+ flow-level-parameter: value
+
+ nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_echo
+ - embedded_flow2
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text from embedded_flow1."
+
+ - name: embedded_flow2
+ type: flow
+ config:
+ flow-level-parameter: value
+ dependsOn:
+ - shell_bash
+
+ nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ type: command
+ config:
+ command: bash ./sample_script.sh
\ No newline at end of file
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow.flow b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
index 287f495..2b9caba 100644
--- a/test/execution-test-data/multipleflowyamltest/basic_flow.flow
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
@@ -1,3 +1,6 @@
+---
+name: basic_flow
+type: flow
# All flow level properties here
config:
flow-level-parameter: value
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow2.flow b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
index e87b712..01246c7 100644
--- a/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
@@ -1,3 +1,6 @@
+---
+name: basic_flow2
+type: flow
# All flow level properties here
config:
flow-level-parameter: value