Details
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
new file mode 100644
index 0000000..d34743e
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import azkaban.utils.Props;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AzkabanFlow extends AzkabanNode {
+
+ private final Map<String, AzkabanNode> nodes;
+
+ private AzkabanFlow(final String name, final Props props,
+ final Map<String, AzkabanNode> nodes) {
+ super(name, props);
+ this.nodes = nodes;
+ }
+
+ public Map<String, AzkabanNode> getNodes() {
+ return nodes;
+ }
+
+ public AzkabanJob getJob(final String name) {
+ return (AzkabanJob) nodes.get(name);
+ }
+
+ public static class AzkabanFlowBuilder {
+
+ private String name;
+ private Props props;
+ private Map<String, AzkabanNode> nodes;
+
+ public AzkabanFlowBuilder setName(final String name) {
+ this.name = name;
+ return this;
+ }
+
+ public AzkabanFlowBuilder setProps(final Props props) {
+ this.props = props;
+ return this;
+ }
+
+ public AzkabanFlowBuilder setNodes(final Collection<? extends AzkabanNode> azkabanNodes) {
+ final Map<String, AzkabanNode> tempNodes = new HashMap<>();
+ for (final AzkabanNode node : azkabanNodes) {
+ tempNodes.put(node.getName(), node);
+ }
+ nodes = ImmutableMap.copyOf(tempNodes);
+ return this;
+ }
+
+ public AzkabanFlow build() {
+ return new AzkabanFlow(name, props, nodes);
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
new file mode 100644
index 0000000..ed59fe0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import azkaban.utils.Props;
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+
+public class AzkabanJob extends AzkabanNode {
+
+ private final String type;
+ private final List<String> dependsOn;
+
+ private AzkabanJob(final String name, final String type, final Props props,
+ final List<String> dependsOn) {
+ super(name, props);
+ this.type = type;
+ this.dependsOn = dependsOn;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public List<String> getDependsOn() {
+ return dependsOn;
+ }
+
+ public static class AzkabanJobBuilder {
+
+ private String name;
+ private String type;
+ private Props props;
+ private List<String> dependsOn;
+
+ public AzkabanJobBuilder setName(final String name) {
+ this.name = name;
+ return this;
+ }
+
+ public AzkabanJobBuilder setType(final String type) {
+ this.type = type;
+ return this;
+ }
+
+ public AzkabanJobBuilder setProps(final Props props) {
+ this.props = props;
+ return this;
+ }
+
+ public AzkabanJobBuilder setDependsOn(final List<String> dependsOn) {
+ // A node may or may not have dependencies.
+ this.dependsOn = dependsOn == null
+ ? Collections.emptyList()
+ : ImmutableList.copyOf(dependsOn);
+ return this;
+ }
+
+ public AzkabanJob build() {
+ return new AzkabanJob(name, type, props, dependsOn);
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
new file mode 100644
index 0000000..1b91ec8
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.utils.Props;
+
+public abstract class AzkabanNode {
+
+ protected final String name;
+ protected final Props props;
+
+ public AzkabanNode(final String name, final Props props) {
+ this.name = requireNonNull(name);
+ this.props = requireNonNull(props);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Props getProps() {
+ return props;
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBean.java b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
new file mode 100644
index 0000000..de4780c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the top level class which is used by the YAML loader to deserialize a flow.yml file.
+ */
+public class FlowBean implements Serializable {
+
+ private Map<String, String> config;
+ private List<NodeBean> nodes;
+
+ public Map<String, String> getConfig() {
+ return this.config;
+ }
+
+ public void setConfig(final Map<String, String> config) {
+ this.config = config;
+ }
+
+ public List<NodeBean> getNodes() {
+ return this.nodes;
+ }
+
+ public void setNodes(final List<NodeBean> nodes) {
+ this.nodes = nodes;
+ }
+
+ @Override
+ public String toString() {
+ return "Flow{nodes=" + this.nodes + ", config=" + this.config + '}';
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
new file mode 100644
index 0000000..34f401a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import azkaban.utils.Props;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.yaml.snakeyaml.Yaml;
+
+public class FlowBeanLoader {
+
+ public FlowBean load(final File flowFile) throws FileNotFoundException {
+ checkArgument(flowFile.exists());
+ checkArgument(flowFile.getName().endsWith(".yml"));
+
+ return new Yaml().loadAs(new FileInputStream(flowFile), FlowBean.class);
+ }
+
+ public boolean validate(final FlowBean flowBean) {
+ final Set<String> nodeNames = new HashSet<>();
+ for (final NodeBean n : flowBean.getNodes()) {
+ if (!nodeNames.add(n.getName())) {
+ // Duplicate jobs
+ return false;
+ }
+ }
+
+ for (final NodeBean n : flowBean.getNodes()) {
+ if (!nodeNames.containsAll(n.getDependsOn())) {
+ // Undefined reference to dependent job
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public AzkabanFlow toAzkabanFlow(final String flowName, final FlowBean flowBean) {
+ final AzkabanFlow flow = new AzkabanFlow.AzkabanFlowBuilder()
+ .setName(flowName)
+ .setProps(new Props(null, flowBean.getConfig()))
+ .setNodes(
+ flowBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
+ .build();
+ return flow;
+ }
+
+ private AzkabanNode toAzkabanNode(final NodeBean nodeBean) {
+ // Note: For now, all DAG nodes are assumed to be Jobs. The AzkabanNode generalize is for
+ // future so that flows can refer to flows within it.
+
+ return new AzkabanJob.AzkabanJobBuilder()
+ .setName(nodeBean.getName())
+ .setProps(new Props(null, nodeBean.getConfig()))
+ .setType(nodeBean.getType())
+ .setDependsOn(nodeBean.getDependsOn())
+ .build();
+ }
+
+ public String getFlowName(final File flowFile) {
+ checkArgument(flowFile.exists());
+ checkArgument(flowFile.getName().endsWith(".yml"));
+
+ return Files.getNameWithoutExtension(flowFile.getName());
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
new file mode 100644
index 0000000..b457053
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The node bean is used by the YAML loader to deserialize DAG nodes
+ */
+public class NodeBean {
+
+ private String name;
+ private Map<String, String> config;
+ private List<String> dependsOn;
+ private String type;
+
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public Map<String, String> getConfig() {
+ return this.config;
+ }
+
+ public void setConfig(final Map<String, String> config) {
+ this.config = config;
+ }
+
+ public List<String> getDependsOn() {
+ return this.dependsOn;
+ }
+
+ public void setDependsOn(final List<String> dependsOn) {
+ this.dependsOn = dependsOn;
+ }
+
+ public String getType() {
+ return this.type;
+ }
+
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ @Override
+ public String toString() {
+ return "Node{config=" + this.config + ", dependsOn=" + this.dependsOn + '}';
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
new file mode 100644
index 0000000..cb51fcc
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import org.junit.Test;
+
+public class FlowBeanLoaderTest {
+
+ public static final String TEST_FLOW_NAME = "sample_flow";
+ public static final String TEST_FLOW_YML_FILENAME = TEST_FLOW_NAME + ".yml";
+ public static final String SHELL_END = "shell_end";
+ public static final String SHELL_ECHO = "shell_echo";
+ public static final String SHELL_BASH = "shell_bash";
+ public static final String SHELL_PWD = "shell_pwd";
+ public static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
+
+ final File TEST_FLOW_YML_FILE =
+ new File(getClass().getClassLoader().getResource(TEST_FLOW_YML_FILENAME).getFile());
+
+ @Test
+ public void testLoad() throws Exception {
+
+ final FlowBeanLoader loader = new FlowBeanLoader();
+ final FlowBean flowBean = loader.load(TEST_FLOW_YML_FILE);
+
+ assertThat(flowBean.getConfig().get("flow-level-parameter")).isEqualTo("value");
+ assertThat(flowBean.getNodes().size()).isEqualTo(4);
+
+ final NodeBean node0 = flowBean.getNodes().get(0);
+ assertThat(node0.getName()).isEqualTo(SHELL_END);
+ assertThat(node0.getType()).isEqualTo("noop");
+ assertThat(node0.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+ final NodeBean node1 = flowBean.getNodes().get(1);
+ assertThat(node1.getName()).isEqualTo(SHELL_ECHO);
+ assertThat(node1.getConfig().get("command")).isEqualTo(ECHO_COMMAND);
+ }
+
+ @Test
+ public void testToAzkabanFlow() throws Exception {
+ final FlowBeanLoader loader = new FlowBeanLoader();
+ final FlowBean flowBean = loader.load(TEST_FLOW_YML_FILE);
+ final AzkabanFlow flow = loader
+ .toAzkabanFlow(loader.getFlowName(TEST_FLOW_YML_FILE), flowBean);
+
+ assertThat(flow.getName()).isEqualTo(TEST_FLOW_NAME);
+ assertThat(flow.getProps().get("flow-level-parameter")).isEqualTo("value");
+ assertThat(flow.getNodes().size()).isEqualTo(4);
+
+ final AzkabanJob shellEnd = flow.getJob(SHELL_END);
+ assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
+ assertThat(shellEnd.getType()).isEqualTo("noop");
+ assertThat(shellEnd.getProps().size()).isEqualTo(0);
+ assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+ final AzkabanJob shellEcho = flow.getJob(SHELL_ECHO);
+ assertThat(shellEcho.getName()).isEqualTo(SHELL_ECHO);
+ assertThat(shellEcho.getType()).isEqualTo("command");
+ assertThat(shellEcho.getProps().size()).isEqualTo(1);
+ assertThat(shellEcho.getProps().get("command")).isEqualTo(ECHO_COMMAND);
+ }
+
+ @Test
+ public void testGetFlowName() throws Exception {
+ assertThat(new FlowBeanLoader().getFlowName(TEST_FLOW_YML_FILE)).isEqualTo(TEST_FLOW_NAME);
+ }
+}
diff --git a/azkaban-common/src/test/resources/sample_flow.yml b/azkaban-common/src/test/resources/sample_flow.yml
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/azkaban-common/src/test/resources/sample_flow.yml
@@ -0,0 +1,41 @@
+---
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ # Describe the type of the job
+ type: command
+ config:
+ command: bash ./sample_script.sh
diff --git a/azkaban-common/src/test/resources/sample_flow20_01/sample_flow20.yml b/azkaban-common/src/test/resources/sample_flow20_01/sample_flow20.yml
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/azkaban-common/src/test/resources/sample_flow20_01/sample_flow20.yml
@@ -0,0 +1,41 @@
+---
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ # Describe the type of the job
+ type: command
+ config:
+ command: bash ./sample_script.sh