azkaban-aplcache

Flow 2.0 Stubs. Reading YAML flow files (#1275) This patch

8/31/2017 7:34:05 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
new file mode 100644
index 0000000..d34743e
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import azkaban.utils.Props;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A flow in the Flow 2.0 object model: a named node that holds a set of child nodes, keyed by
+ * node name. Instances are created through {@link AzkabanFlowBuilder}.
+ */
+public class AzkabanFlow extends AzkabanNode {
+
+  private final Map<String, AzkabanNode> nodes;
+
+  private AzkabanFlow(final String name, final Props props,
+      final Map<String, AzkabanNode> nodes) {
+    super(name, props);
+    this.nodes = nodes;
+  }
+
+  /**
+   * Returns the child nodes of this flow as an immutable map keyed by node name.
+   */
+  public Map<String, AzkabanNode> getNodes() {
+    return nodes;
+  }
+
+  /**
+   * Looks up a child node by name and returns it as a job.
+   *
+   * @param name name of the node to look up
+   * @return the matching job, or {@code null} if this flow has no node with that name
+   * @throws ClassCastException if the node with that name is not an {@link AzkabanJob}
+   */
+  public AzkabanJob getJob(final String name) {
+    return (AzkabanJob) nodes.get(name);
+  }
+
+  /**
+   * Builder for {@link AzkabanFlow}. Name and props must be set before {@link #build()} because
+   * the {@link AzkabanNode} constructor rejects null values; nodes are optional.
+   */
+  public static class AzkabanFlowBuilder {
+
+    private String name;
+    private Props props;
+    // Start with no nodes so that a flow built without setNodes() exposes an empty map
+    // instead of null (which made getNodes() return null and getJob() throw).
+    private Map<String, AzkabanNode> nodes = ImmutableMap.of();
+
+    public AzkabanFlowBuilder setName(final String name) {
+      this.name = name;
+      return this;
+    }
+
+    public AzkabanFlowBuilder setProps(final Props props) {
+      this.props = props;
+      return this;
+    }
+
+    /**
+     * Replaces the nodes of the flow under construction. If several nodes share a name, the
+     * last one in iteration order wins.
+     *
+     * @param azkabanNodes nodes to index by name; must not be null
+     */
+    public AzkabanFlowBuilder setNodes(final Collection<? extends AzkabanNode> azkabanNodes) {
+      final Map<String, AzkabanNode> tempNodes = new HashMap<>();
+      for (final AzkabanNode node : azkabanNodes) {
+        tempNodes.put(node.getName(), node);
+      }
+      nodes = ImmutableMap.copyOf(tempNodes);
+      return this;
+    }
+
+    /**
+     * Creates the flow from the values set so far.
+     *
+     * @throws NullPointerException if the name or the props were never set
+     */
+    public AzkabanFlow build() {
+      return new AzkabanFlow(name, props, nodes);
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
new file mode 100644
index 0000000..ed59fe0
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import azkaban.utils.Props;
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A job in the Flow 2.0 object model: a named node with a job type and the names of the nodes
+ * it depends on. Instances are created through {@link AzkabanJobBuilder}.
+ */
+public class AzkabanJob extends AzkabanNode {
+
+  private final String type;
+  private final List<String> dependsOn;
+
+  private AzkabanJob(final String name, final String type, final Props props,
+      final List<String> dependsOn) {
+    super(name, props);
+    this.type = type;
+    this.dependsOn = dependsOn;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  /**
+   * Returns the names of the nodes this job depends on; empty if it has no dependencies.
+   */
+  public List<String> getDependsOn() {
+    return dependsOn;
+  }
+
+  /**
+   * Builder for {@link AzkabanJob}. Name and props must be set before {@link #build()} because
+   * the {@link AzkabanNode} constructor rejects null values.
+   */
+  public static class AzkabanJobBuilder {
+
+    private String name;
+    private String type;
+    private Props props;
+    // A node may or may not have dependencies, so default to none. Without this default a job
+    // built without calling setDependsOn() returned null from getDependsOn().
+    private List<String> dependsOn = Collections.emptyList();
+
+    public AzkabanJobBuilder setName(final String name) {
+      this.name = name;
+      return this;
+    }
+
+    public AzkabanJobBuilder setType(final String type) {
+      this.type = type;
+      return this;
+    }
+
+    public AzkabanJobBuilder setProps(final Props props) {
+      this.props = props;
+      return this;
+    }
+
+    /**
+     * Sets the names of the nodes the job depends on.
+     *
+     * @param dependsOn dependency names, copied into an immutable list; {@code null} means the
+     *     job has no dependencies
+     */
+    public AzkabanJobBuilder setDependsOn(final List<String> dependsOn) {
+      this.dependsOn = dependsOn == null
+          ? Collections.emptyList()
+          : ImmutableList.copyOf(dependsOn);
+      return this;
+    }
+
+    /**
+     * Creates the job from the values set so far.
+     *
+     * @throws NullPointerException if the name or the props were never set
+     */
+    public AzkabanJob build() {
+      return new AzkabanJob(name, type, props, dependsOn);
+    }
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
new file mode 100644
index 0000000..1b91ec8
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.utils.Props;
+
+/**
+ * Base class for the nodes of a Flow 2.0 DAG. Every node has a name and a set of props, and
+ * neither may be null.
+ */
+public abstract class AzkabanNode {
+
+  protected final String name;
+  protected final Props props;
+
+  /**
+   * @param name name of the node
+   * @param props properties of the node
+   * @throws NullPointerException if {@code name} or {@code props} is null
+   */
+  public AzkabanNode(final String name, final Props props) {
+    // Name the offending argument so that a failure is diagnosable from the message alone.
+    this.name = requireNonNull(name, "name must not be null");
+    this.props = requireNonNull(props, "props must not be null");
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public Props getProps() {
+    return props;
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBean.java b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
new file mode 100644
index 0000000..de4780c
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the top level class which is used by the YAML loader to deserialize a flow.yml file.
+ * It carries the flow-level config and the list of nodes; each is null until it has been set.
+ */
+public class FlowBean implements Serializable {
+
+  // Pin the serial version explicitly; otherwise it is derived from the class details and can
+  // change whenever the class is edited or compiled differently.
+  private static final long serialVersionUID = 1L;
+
+  private Map<String, String> config;
+  private List<NodeBean> nodes;
+
+  public Map<String, String> getConfig() {
+    return this.config;
+  }
+
+  public void setConfig(final Map<String, String> config) {
+    this.config = config;
+  }
+
+  public List<NodeBean> getNodes() {
+    return this.nodes;
+  }
+
+  public void setNodes(final List<NodeBean> nodes) {
+    this.nodes = nodes;
+  }
+
+  @Override
+  public String toString() {
+    return "Flow{nodes=" + this.nodes + ", config=" + this.config + '}';
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
new file mode 100644
index 0000000..34f401a
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import azkaban.utils.Props;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Loads a Flow 2.0 YAML file into a {@link FlowBean} and converts the bean into an
+ * {@link AzkabanFlow}.
+ */
+public class FlowBeanLoader {
+
+  /**
+   * Deserializes a flow YAML file.
+   *
+   * @param flowFile an existing file whose name ends with {@code .yml}
+   * @return the bean read from the file
+   * @throws FileNotFoundException if the file cannot be opened for reading
+   * @throws IllegalArgumentException if the file does not exist or is not a {@code .yml} file
+   * @throws UncheckedIOException if closing the file fails
+   */
+  public FlowBean load(final File flowFile) throws FileNotFoundException {
+    checkFlowFile(flowFile);
+
+    // try-with-resources so the stream is closed once parsing is done; previously it was leaked.
+    try (InputStream in = new FileInputStream(flowFile)) {
+      return new Yaml().loadAs(in, FlowBean.class);
+    } catch (final FileNotFoundException e) {
+      throw e;
+    } catch (final IOException e) {
+      // Only the implicit close() can get here. Wrap it to keep the declared exceptions of this
+      // method unchanged for existing callers.
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Checks that node names are unique and that every dependency refers to a node defined in the
+   * same flow. Nodes without a {@code dependsOn} list are treated as having no dependencies.
+   *
+   * @param flowBean the flow to check; its node list must not be null
+   * @return {@code true} if both checks pass, {@code false} otherwise
+   */
+  public boolean validate(final FlowBean flowBean) {
+    final Set<String> nodeNames = new HashSet<>();
+    for (final NodeBean n : flowBean.getNodes()) {
+      if (!nodeNames.add(n.getName())) {
+        // Duplicate jobs
+        return false;
+      }
+    }
+
+    for (final NodeBean n : flowBean.getNodes()) {
+      // dependsOn stays null unless it was set on the bean (e.g. a YAML node without a
+      // dependsOn section), and containsAll(null) throws a NullPointerException.
+      final List<String> dependsOn = n.getDependsOn();
+      if (dependsOn != null && !nodeNames.containsAll(dependsOn)) {
+        // Undefined reference to dependent job
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Converts a deserialized flow into an {@link AzkabanFlow}.
+   *
+   * @param flowName name to give the flow
+   * @param flowBean the deserialized flow; its node list must not be null
+   * @return the flow built from the config and the nodes of the bean
+   */
+  public AzkabanFlow toAzkabanFlow(final String flowName, final FlowBean flowBean) {
+    return new AzkabanFlow.AzkabanFlowBuilder()
+        .setName(flowName)
+        .setProps(new Props(null, flowBean.getConfig()))
+        .setNodes(
+            flowBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
+        .build();
+  }
+
+  private AzkabanNode toAzkabanNode(final NodeBean nodeBean) {
+    // Note: For now, all DAG nodes are assumed to be Jobs. The AzkabanNode generalization is
+    // for the future, so that flows can refer to flows within them.
+
+    return new AzkabanJob.AzkabanJobBuilder()
+        .setName(nodeBean.getName())
+        .setProps(new Props(null, nodeBean.getConfig()))
+        .setType(nodeBean.getType())
+        .setDependsOn(nodeBean.getDependsOn())
+        .build();
+  }
+
+  /**
+   * Derives the flow name from the flow file: the file name without its extension.
+   *
+   * @param flowFile an existing file whose name ends with {@code .yml}
+   * @throws IllegalArgumentException if the file does not exist or is not a {@code .yml} file
+   */
+  public String getFlowName(final File flowFile) {
+    checkFlowFile(flowFile);
+
+    return Files.getNameWithoutExtension(flowFile.getName());
+  }
+
+  // Shared precondition of load() and getFlowName(): an existing file named *.yml.
+  private static void checkFlowFile(final File flowFile) {
+    checkArgument(flowFile.exists(), "Flow file %s does not exist", flowFile);
+    checkArgument(flowFile.getName().endsWith(".yml"),
+        "Flow file %s does not have a .yml extension", flowFile);
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
new file mode 100644
index 0000000..b457053
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The node bean is used by the YAML loader to deserialize DAG nodes.
+ *
+ * <p>It is serializable because {@link FlowBean}, which is declared {@link Serializable}, holds
+ * a list of node beans; serializing such a flow fails if its nodes are not serializable too.
+ */
+public class NodeBean implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private String name;
+  private Map<String, String> config;
+  private List<String> dependsOn;
+  private String type;
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(final String name) {
+    this.name = name;
+  }
+
+  public Map<String, String> getConfig() {
+    return this.config;
+  }
+
+  public void setConfig(final Map<String, String> config) {
+    this.config = config;
+  }
+
+  public List<String> getDependsOn() {
+    return this.dependsOn;
+  }
+
+  public void setDependsOn(final List<String> dependsOn) {
+    this.dependsOn = dependsOn;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public void setType(final String type) {
+    this.type = type;
+  }
+
+  /**
+   * Returns a description that includes every field, so that nodes can be told apart.
+   */
+  @Override
+  public String toString() {
+    return "Node{name=" + this.name + ", type=" + this.type + ", config=" + this.config
+        + ", dependsOn=" + this.dependsOn + '}';
+  }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
new file mode 100644
index 0000000..cb51fcc
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package azkaban.project;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import org.junit.Test;
+
+/**
+ * Tests {@link FlowBeanLoader} against the {@code sample_flow.yml} test resource.
+ */
+public class FlowBeanLoaderTest {
+
+  public static final String TEST_FLOW_NAME = "sample_flow";
+  public static final String TEST_FLOW_YML_FILENAME = TEST_FLOW_NAME + ".yml";
+  public static final String SHELL_END = "shell_end";
+  public static final String SHELL_ECHO = "shell_echo";
+  public static final String SHELL_BASH = "shell_bash";
+  public static final String SHELL_PWD = "shell_pwd";
+  public static final String ECHO_COMMAND = "echo \"This is an echoed text.\"";
+
+  // The sample flow, looked up through the class loader of this test.
+  final File TEST_FLOW_YML_FILE =
+      new File(getClass().getClassLoader().getResource(TEST_FLOW_YML_FILENAME).getFile());
+
+  /** The bean must expose the flow config and the nodes in file order. */
+  @Test
+  public void testLoad() throws Exception {
+    final FlowBean bean = new FlowBeanLoader().load(TEST_FLOW_YML_FILE);
+
+    assertThat(bean.getConfig().get("flow-level-parameter")).isEqualTo("value");
+    assertThat(bean.getNodes()).hasSize(4);
+
+    final NodeBean first = bean.getNodes().get(0);
+    assertThat(first.getName()).isEqualTo(SHELL_END);
+    assertThat(first.getType()).isEqualTo("noop");
+    assertThat(first.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+    final NodeBean second = bean.getNodes().get(1);
+    assertThat(second.getName()).isEqualTo(SHELL_ECHO);
+    assertThat(second.getConfig().get("command")).isEqualTo(ECHO_COMMAND);
+  }
+
+  /** The converted flow must carry the name, props, and jobs of the sample file. */
+  @Test
+  public void testToAzkabanFlow() throws Exception {
+    final FlowBeanLoader loader = new FlowBeanLoader();
+    final FlowBean bean = loader.load(TEST_FLOW_YML_FILE);
+    final String flowName = loader.getFlowName(TEST_FLOW_YML_FILE);
+    final AzkabanFlow flow = loader.toAzkabanFlow(flowName, bean);
+
+    assertThat(flow.getName()).isEqualTo(TEST_FLOW_NAME);
+    assertThat(flow.getProps().get("flow-level-parameter")).isEqualTo("value");
+    assertThat(flow.getNodes()).hasSize(4);
+
+    final AzkabanJob endJob = flow.getJob(SHELL_END);
+    assertThat(endJob.getName()).isEqualTo(SHELL_END);
+    assertThat(endJob.getType()).isEqualTo("noop");
+    assertThat(endJob.getProps().size()).isEqualTo(0);
+    assertThat(endJob.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
+
+    final AzkabanJob echoJob = flow.getJob(SHELL_ECHO);
+    assertThat(echoJob.getName()).isEqualTo(SHELL_ECHO);
+    assertThat(echoJob.getType()).isEqualTo("command");
+    assertThat(echoJob.getProps().size()).isEqualTo(1);
+    assertThat(echoJob.getProps().get("command")).isEqualTo(ECHO_COMMAND);
+  }
+
+  /** The flow name is the file name without the {@code .yml} extension. */
+  @Test
+  public void testGetFlowName() throws Exception {
+    final String flowName = new FlowBeanLoader().getFlowName(TEST_FLOW_YML_FILE);
+    assertThat(flowName).isEqualTo(TEST_FLOW_NAME);
+  }
+}
diff --git a/azkaban-common/src/test/resources/sample_flow.yml b/azkaban-common/src/test/resources/sample_flow.yml
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/azkaban-common/src/test/resources/sample_flow.yml
@@ -0,0 +1,41 @@
+---
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of a properties file,
+  # with one major difference: all custom properties are now grouped together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    # Describe the type of the job
+    type: command
+    config:
+      command: bash ./sample_script.sh
diff --git a/azkaban-common/src/test/resources/sample_flow20_01/sample_flow20.yml b/azkaban-common/src/test/resources/sample_flow20_01/sample_flow20.yml
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/azkaban-common/src/test/resources/sample_flow20_01/sample_flow20.yml
@@ -0,0 +1,41 @@
+---
+# All flow level properties here
+config:
+  flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+  # Job definition
+  # The job definition is like a YAMLified version of a properties file,
+  # with one major difference: all custom properties are now grouped together
+  # in a config section in the definition.
+  # The first line describes the name of the job
+  - name: shell_end
+    # Describe the type of the job
+    type: noop
+
+    # List the dependencies of the job
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    # Describe the type of the job
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    # Describe the type of the job
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    # Describe the type of the job
+    type: command
+    config:
+      command: bash ./sample_script.sh