Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 2d598ee..b3f4851 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -33,12 +33,15 @@ package azkaban;
public class Constants {
// Azkaban Flow Versions
- public static final String AZKABAN_FLOW_VERSION_2_0 = "2.0";
+ public static final double AZKABAN_FLOW_VERSION_2_0 = 2.0;
// Flow 2.0 file suffix
public static final String PROJECT_FILE_SUFFIX = ".project";
public static final String FLOW_FILE_SUFFIX = ".flow";
+ // Flow 2.0 node type
+ public static final String FLOW_NODE_TYPE = "flow";
+
// Names and paths of various file names to configure Azkaban
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
@@ -65,6 +68,9 @@ public class Constants {
public static class ConfigurationKeys {
+ // Configures Azkaban Flow Version in project YAML file
+ public static final String AZKABAN_FLOW_VERSION = "azkaban-flow-version";
+
// These properties are configurable through azkaban.properties
public static final String AZKABAN_PID_FILENAME = "azkaban.pid.filename";
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index 991a954..0594717 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -17,7 +17,6 @@
package azkaban.flow;
import azkaban.executor.mail.DefaultMailCreator;
-import azkaban.project.AzkabanFlow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -49,8 +48,6 @@ public class Flow {
private boolean isLayedOut = false;
- private AzkabanFlow azkabanFlow;
-
public Flow(final String id) {
this.id = id;
}
@@ -411,11 +408,4 @@ public class Flow {
this.projectId = projectId;
}
- public AzkabanFlow getAzkabanFlow() {
- return this.azkabanFlow;
- }
-
- public void setAzkabanFlow(final AzkabanFlow azkabanFlow) {
- this.azkabanFlow = azkabanFlow;
- }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
index 2dbbd5b..997719e 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanFlow.java
@@ -17,6 +17,7 @@
package azkaban.project;
+import azkaban.Constants;
import azkaban.utils.Props;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -36,7 +37,7 @@ public class AzkabanFlow extends AzkabanNode {
private AzkabanFlow(final String name, final Props props,
final Map<String, AzkabanNode> nodes, final List<String> dependsOn) {
- super(name, props, dependsOn);
+ super(name, Constants.FLOW_NODE_TYPE, props, dependsOn);
this.nodes = nodes;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
index 64d109b..9a60740 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanJob.java
@@ -28,16 +28,9 @@ import java.util.List;
*/
public class AzkabanJob extends AzkabanNode {
- private final String type;
-
private AzkabanJob(final String name, final String type, final Props props,
final List<String> dependsOn) {
- super(name, props, dependsOn);
- this.type = type;
- }
-
- public String getType() {
- return this.type;
+ super(name, type, props, dependsOn);
}
public static class AzkabanJobBuilder {
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
index 3e566dd..079cf43 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanNode.java
@@ -28,11 +28,14 @@ import java.util.List;
public abstract class AzkabanNode {
protected final String name;
+ protected final String type;
protected final Props props;
protected final List<String> dependsOn;
- public AzkabanNode(final String name, final Props props, final List<String> dependsOn) {
+ public AzkabanNode(final String name, final String type, final Props props, final List<String>
+ dependsOn) {
this.name = requireNonNull(name);
+ this.type = requireNonNull(type);
this.props = requireNonNull(props);
this.dependsOn = dependsOn;
}
@@ -41,6 +44,10 @@ public abstract class AzkabanNode {
return this.name;
}
+ public String getType() {
+ return this.type;
+ }
+
public Props getProps() {
return this.props;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 110fe80..3a7aec8 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -184,35 +184,39 @@ class AzkabanProjectLoader {
private void persistProject(final Project project, final FlowLoader loader, final File archive,
final User uploader) throws ProjectManagerException {
synchronized (project) {
+ final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+ final Map<String, Flow> flows = loader.getFlowMap();
+ for (final Flow flow : flows.values()) {
+ flow.setProjectId(project.getId());
+ flow.setVersion(newVersion);
+ }
+
+ this.storageManager.uploadProject(project, newVersion, archive, uploader);
+
+ log.info("Uploading flow to db for project " + archive.getName());
+ this.projectLoader.uploadFlows(project, newVersion, flows.values());
+ log.info("Changing project versions for project " + archive.getName());
+ this.projectLoader.changeProjectVersion(project, newVersion,
+ uploader.getUserId());
+ project.setFlows(flows);
+
if (loader instanceof DirectoryFlowLoader) {
final DirectoryFlowLoader directoryFlowLoader = (DirectoryFlowLoader) loader;
- final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
- final Map<String, Flow> flows = directoryFlowLoader.getFlowMap();
- for (final Flow flow : flows.values()) {
- flow.setProjectId(project.getId());
- flow.setVersion(newVersion);
- }
-
- this.storageManager.uploadProject(project, newVersion, archive, uploader);
-
- log.info("Uploading flow to db " + archive.getName());
- this.projectLoader.uploadFlows(project, newVersion, flows.values());
- log.info("Changing project versions " + archive.getName());
- this.projectLoader.changeProjectVersion(project, newVersion,
- uploader.getUserId());
- project.setFlows(flows);
log.info("Uploading Job properties");
this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
directoryFlowLoader.getJobPropsMap().values()));
log.info("Uploading Props properties");
this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
- this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
- "Uploaded project files zip " + archive.getName());
+
} else if (loader instanceof DirectoryYamlFlowLoader) {
// Todo jamiesjc: upload yaml file to DB as a blob
+
} else {
throw new ProjectManagerException("Invalid type of flow loader.");
}
+
+ this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
+ "Uploaded project files zip " + archive.getName());
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index e92dda9..edb5bf4 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -83,6 +83,7 @@ public class DirectoryFlowLoader implements FlowLoader {
*
* @return Map of flow name to Flow.
*/
+ @Override
public Map<String, Flow> getFlowMap() {
return this.flowMap;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index dbb7cce..6431fc9 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -17,14 +17,18 @@
package azkaban.project;
import azkaban.Constants;
+import azkaban.flow.Edge;
import azkaban.flow.Flow;
+import azkaban.flow.Node;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
import java.io.File;
import java.io.FileNotFoundException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
@@ -40,6 +44,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
private final Props props;
private final Set<String> errors = new HashSet<>();
private final Map<String, Flow> flowMap = new HashMap<>();
+ private final Map<String, List<Edge>> edgeMap = new HashMap<>();
/**
* Creates a new DirectoryYamlFlowLoader.
@@ -55,6 +60,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
*
* @return Map of flow name to Flow.
*/
+ @Override
public Map<String, Flow> getFlowMap() {
return this.flowMap;
}
@@ -69,6 +75,15 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
}
/**
+ * Returns the edge map constructed from the loaded flows.
+ *
+ * @return Map of flow name to all its Edges.
+ */
+ public Map<String, List<Edge>> getEdgeMap() {
+ return this.edgeMap;
+ }
+
+ /**
* Loads all project flows from the directory.
*
* @param project The project.
@@ -83,7 +98,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
}
private void convertYamlFiles(final File projectDir) {
- // Todo jamiesjc: convert project yaml file. It will contain properties for all flows.
+ // Todo jamiesjc: convert project yaml file.
final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
for (final File file : flowFiles) {
@@ -91,17 +106,87 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
try {
final NodeBean nodeBean = loader.load(file);
final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
- final Flow flow = new Flow(azkabanFlow.getName());
- flow.setAzkabanFlow(azkabanFlow);
- this.flowMap.put(azkabanFlow.getName(), flow);
- final Props flowProps = azkabanFlow.getProps();
- FlowLoaderUtils.addEmailPropsToFlow(flow, flowProps);
+ final Flow flow = convertAzkabanFlowToFlow(azkabanFlow);
+ this.flowMap.put(flow.getId(), flow);
} catch (final FileNotFoundException e) {
logger.error("Error loading flow yaml files", e);
}
}
}
+  /**
+   * Converts a loaded {@link AzkabanFlow} into a {@link Flow}: copies the email properties,
+   * converts every child node, attaches the edges and initializes the flow.
+   *
+   * <p>Child nodes of type flow are converted recursively through
+   * {@link #convertAzkabanNodeToNode}, which also stores them in {@code flowMap}. The flow
+   * returned here is not stored by this method; callers put it into {@code flowMap}.
+   *
+   * @param azkabanFlow the flow definition to convert.
+   * @return the converted flow.
+   */
+  private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow) {
+    final Flow flow = new Flow(azkabanFlow.getName());
+    final Props props = azkabanFlow.getProps();
+    FlowLoaderUtils.addEmailPropsToFlow(flow, props);
+
+    // Convert azkabanNodes to nodes inside the flow.
+    azkabanFlow.getNodes().values().stream().map(this::convertAzkabanNodeToNode)
+        .forEach(n -> flow.addNode(n));
+
+    // Add edges for the flow. addEdges only creates an edgeMap entry once a node declares a
+    // dependency, so a flow without any dependency has no entry and get() returns null.
+    buildFlowEdges(azkabanFlow);
+    final List<Edge> edges = this.edgeMap.get(flow.getId());
+    if (edges != null) {
+      flow.addAllEdges(edges);
+    }
+
+    // Todo jamiesjc: deprecate startNodes, endNodes and numLevels, and remove below method finally.
+    // Below method will construct startNodes, endNodes and numLevels for the flow.
+    flow.initialize();
+
+    return flow;
+  }
+
+  /**
+   * Builds the {@link Node} for one child of a flow, copying its name and type.
+   *
+   * <p>A child whose type equals {@code Constants.FLOW_NODE_TYPE} is an embedded flow: the node
+   * records the child's name as its embedded flow id, and the child itself is converted to a
+   * {@link Flow} and stored in {@code flowMap} under that flow's id.
+   *
+   * @param azkabanNode the child node definition.
+   * @return the converted node.
+   */
+  private Node convertAzkabanNodeToNode(final AzkabanNode azkabanNode) {
+    final String nodeName = azkabanNode.getName();
+    final String nodeType = azkabanNode.getType();
+
+    final Node node = new Node(nodeName);
+    node.setType(nodeType);
+
+    if (!nodeType.equals(Constants.FLOW_NODE_TYPE)) {
+      return node;
+    }
+
+    // Embedded flow: link the node to it and register the converted flow.
+    node.setEmbeddedFlowId(nodeName);
+    final Flow embeddedFlow = convertAzkabanFlowToFlow((AzkabanFlow) azkabanNode);
+    this.flowMap.put(embeddedFlow.getId(), embeddedFlow);
+    return node;
+  }
+
+  /**
+   * Records the dependency edges of every node in the given flow by running
+   * {@link #addEdges} from each of its nodes.
+   *
+   * @param azkabanFlow the flow whose edges are built.
+   */
+  private void buildFlowEdges(final AzkabanFlow azkabanFlow) {
+    // Names of the nodes on the current recursion path; a dependency found here is a cycle.
+    final HashSet<String> onPath = new HashSet<>();
+    // Names of the nodes whose dependency edges were already added.
+    final HashSet<String> processed = new HashSet<>();
+    azkabanFlow.getNodes().values()
+        .forEach(node -> addEdges(node, azkabanFlow, onPath, processed));
+  }
+
+  /**
+   * Adds an edge from each dependency of {@code node} to {@code node}, then recurses into the
+   * valid dependencies so that cycles are detected along the way.
+   *
+   * <p>Every declared dependency produces an edge in {@code edgeMap}, including invalid ones;
+   * an invalid edge carries an error message and adds an entry to {@code errors}.
+   *
+   * @param node the node whose dependencies are processed.
+   * @param azkabanFlow the flow that owns {@code node}.
+   * @param recStack names of the nodes on the current recursion path.
+   * @param visited names of the nodes that were already processed.
+   */
+  private void addEdges(final AzkabanNode node, final AzkabanFlow azkabanFlow,
+      final HashSet<String> recStack, final HashSet<String> visited) {
+    final String nodeName = node.getName();
+    // Set#add returns false when the node was already processed.
+    if (!visited.add(nodeName)) {
+      return;
+    }
+    // NOTE(review): the AzkabanNode constructor stores dependsOn without a null check, so a
+    // null list is treated as "no dependencies" instead of failing in the loop below.
+    final List<String> dependsOnList = node.getDependsOn();
+    if (dependsOnList == null) {
+      return;
+    }
+
+    recStack.add(nodeName);
+    for (final String parent : dependsOnList) {
+      final Edge edge = new Edge(parent, nodeName);
+      this.edgeMap.computeIfAbsent(azkabanFlow.getName(), key -> new ArrayList<>()).add(edge);
+
+      if (recStack.contains(parent)) {
+        // Cycles found, including self cycle.
+        edge.setError("Cycles found.");
+        this.errors.add("Cycles found at " + edge.getId());
+      } else if (!azkabanFlow.getNodes().containsKey(parent)) {
+        // Dependency not found.
+        edge.setError("Dependency not found.");
+        this.errors.add(nodeName + " cannot find dependency " + parent);
+      } else {
+        // Valid edge. Continue to process the parent node recursively.
+        addEdges(azkabanFlow.getNode(parent), azkabanFlow, recStack, visited);
+      }
+    }
+    recStack.remove(nodeName);
+  }
+
public void checkJobProperties(final Project project) {
// Todo jamiesjc: implement the check later
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
index ce796da..8fd4f91 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
@@ -16,8 +16,10 @@
package azkaban.project;
+import azkaban.flow.Flow;
import azkaban.project.validator.ValidationReport;
import java.io.File;
+import java.util.Map;
/**
* Interface to load project flows.
@@ -32,4 +34,11 @@ public interface FlowLoader {
* @return the validation report.
*/
ValidationReport loadProjectFlow(final Project project, final File projectDir);
+
+ /**
+ * Returns the flow map constructed from the loaded flows.
+ *
+ * @return Map of flow name to Flow.
+ */
+ Map<String, Flow> getFlowMap();
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
index 9dc170d..439f8c0 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
@@ -19,9 +19,15 @@ package azkaban.project;
import static java.util.Objects.requireNonNull;
import azkaban.Constants;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.utils.Props;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
import javax.inject.Inject;
+import org.apache.commons.lang.ArrayUtils;
+import org.yaml.snakeyaml.Yaml;
/**
* Factory class to generate flow loaders.
@@ -41,21 +47,48 @@ public class FlowLoaderFactory {
}
/**
- * Creates flow loader based on manifest file inside project directory.
+   * Creates flow loader based on project YAML file inside project directory.
+   *
+   * <p>A directory without a project YAML file gets a {@link DirectoryFlowLoader}. A directory
+   * whose single project YAML file declares flow version 2.0 gets a
+   * {@link DirectoryYamlFlowLoader}.
*
* @param projectDir the project directory
* @return the flow loader
+   * @throws ProjectManagerException if the directory cannot be listed, holds more than one
+   *     project YAML file, or that file cannot be read or has a missing or invalid
+   *     azkaban-flow-version
*/
public FlowLoader createFlowLoader(final File projectDir) throws ProjectManagerException {
- // Todo jamiesjc: need to check if manifest file exists in project directory,
- // and create FlowLoader based on different flow versions specified in the manifest file.
- final String flowVersion = null;
- if (flowVersion == null) {
- return new DirectoryFlowLoader(this.props);
- } else if (flowVersion.equals(Constants.AZKABAN_FLOW_VERSION_2_0)) {
- return new DirectoryYamlFlowLoader(this.props);
+
+    final File[] projectFileList = projectDir.listFiles(new SuffixFilter(Constants
+        .PROJECT_FILE_SUFFIX));
+
+    if (projectFileList == null) {
+      throw new ProjectManagerException("Error reading project directory. Input is not a "
+          + "directory or IO error happens.");
+    }
+
+    if (ArrayUtils.isNotEmpty(projectFileList)) {
+      if (projectFileList.length > 1) {
+        throw new ProjectManagerException("Duplicate project YAML files found in the project "
+            + "directory. Only one is allowed.");
+      }
+
+      // NOTE(review): if project directories can hold user-supplied files, consider loading
+      // with a restricted snakeyaml constructor instead of the default one. TODO confirm.
+      final Object loadedYaml;
+      try (FileInputStream fis = new FileInputStream(projectFileList[0])) {
+        loadedYaml = new Yaml().load(fis);
+      } catch (final IOException e) {
+        throw new ProjectManagerException("Error reading project YAML file.", e);
+      }
+
+      // An empty document loads as null and a scalar or list document is not a Map; both are
+      // reported as a missing version instead of escaping as a ClassCastException.
+      if (!(loadedYaml instanceof Map) || !((Map<?, ?>) loadedYaml)
+          .containsKey(Constants.ConfigurationKeys.AZKABAN_FLOW_VERSION)) {
+        throw new ProjectManagerException("azkaban-flow-version is not specified in the project "
+            + "YAML file.");
+      }
+
+      // Compare from the constant's side so that a key with an empty (null) value is rejected
+      // as invalid rather than throwing a NullPointerException.
+      final Object declaredVersion = ((Map<?, ?>) loadedYaml)
+          .get(Constants.ConfigurationKeys.AZKABAN_FLOW_VERSION);
+      if (Double.valueOf(Constants.AZKABAN_FLOW_VERSION_2_0).equals(declaredVersion)) {
+        return new DirectoryYamlFlowLoader(this.props);
+      } else {
+        throw new ProjectManagerException("Invalid azkaban-flow-version in the project YAML file.");
+      }
} else {
- throw new ProjectManagerException("Flow version " + flowVersion + "is invalid.");
+      return new DirectoryFlowLoader(this.props);
}
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index e55bbb7..2669462 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -16,19 +16,36 @@
package azkaban.project;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DirectoryYamlFlowLoaderTest {
+ private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoaderTest
+ .class);
+
private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
- private static final String FLOW_NAME_1 = "basic_flow";
- private static final String FLOW_NAME_2 = "basic_flow2";
+ private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
+ private static final String INVALID_FLOW_YAML_DIR = "invalidflowyamltest";
+ private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
+ private static final String BASIC_FLOW_1 = "basic_flow";
+ private static final String BASIC_FLOW_2 = "basic_flow2";
+ private static final String EMBEDDED_FLOW = "embedded_flow";
+ private static final String EMBEDDED_FLOW_1 = "embedded_flow1";
+ private static final String EMBEDDED_FLOW_2 = "embedded_flow2";
+ private static final String INVALID_FLOW_1 = "dependency_not_found";
+ private static final String INVALID_FLOW_2 = "cycle_found";
+ private static final String DEPENDENCY_NOT_FOUND_ERROR = "Dependency not found.";
+ private static final String CYCLE_FOUND_ERROR = "Cycles found.";
private Project project;
@Before
@@ -37,31 +54,74 @@ public class DirectoryYamlFlowLoaderTest {
}
@Test
- public void testLoadYamlFileFromDirectory() {
+ public void testLoadBasicYamlFile() {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
-
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(BASIC_FLOW_YAML_DIR));
- Assert.assertEquals(0, loader.getErrors().size());
- Assert.assertEquals(1, loader.getFlowMap().size());
- Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
- final Flow flow = loader.getFlowMap().get(FLOW_NAME_1);
- final AzkabanFlow azkabanFlow = flow.getAzkabanFlow();
- Assert.assertEquals(FLOW_NAME_1, azkabanFlow.getName());
- Assert.assertEquals(4, azkabanFlow.getNodes().size());
+ checkFlowLoaderProperties(loader, 0, 1, 1);
+ checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
}
@Test
- public void testLoadMultipleYamlFilesFromDirectory() {
+ public void testLoadMultipleYamlFiles() {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
-
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(MULTIPLE_FLOW_YAML_DIR));
- Assert.assertEquals(0, loader.getErrors().size());
- Assert.assertEquals(2, loader.getFlowMap().size());
- Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
- Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_2));
- final Flow flow2 = loader.getFlowMap().get(FLOW_NAME_2);
- final AzkabanFlow azkabanFlow2 = flow2.getAzkabanFlow();
- Assert.assertEquals(FLOW_NAME_2, azkabanFlow2.getName());
- Assert.assertEquals(3, azkabanFlow2.getNodes().size());
+ checkFlowLoaderProperties(loader, 0, 2, 2);
+ checkFlowProperties(loader, BASIC_FLOW_1, 0, 4, 3, null);
+ checkFlowProperties(loader, BASIC_FLOW_2, 0, 3, 2, null);
+ }
+
+ @Test
+ public void testLoadEmbeddedFlowYamlFile() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(EMBEDDED_FLOW_YAML_DIR));
+ checkFlowLoaderProperties(loader, 0, 3, 3);
+ checkFlowProperties(loader, EMBEDDED_FLOW, 0, 4, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_1, 0, 4, 3, null);
+ checkFlowProperties(loader, EMBEDDED_FLOW_2, 0, 2, 1, null);
+ }
+
+ @Test
+ public void testLoadInvalidFlowYamlFiles() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_FLOW_YAML_DIR));
+ checkFlowLoaderProperties(loader, 2, 2, 2);
+ // Invalid flow 1: Dependency not found.
+ checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 3, DEPENDENCY_NOT_FOUND_ERROR);
+ // Invalid flow 2: Cycles found.
+ checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 4, CYCLE_FOUND_ERROR);
+ }
+
+ @Test
+ public void testLoadNoFlowYamlFile() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(NO_FLOW_YAML_DIR));
+ checkFlowLoaderProperties(loader, 0, 0, 0);
+ }
+
+  /**
+   * Asserts the loader-wide counts: recorded errors, entries in the flow map and entries in
+   * the edge map.
+   */
+  private void checkFlowLoaderProperties(final DirectoryYamlFlowLoader loader, final int numError,
+      final int numFlowMap, final int numEdgeMap) {
+    final int errorCount = loader.getErrors().size();
+    assertThat(errorCount).isEqualTo(numError);
+
+    final int flowCount = loader.getFlowMap().size();
+    assertThat(flowCount).isEqualTo(numFlowMap);
+
+    final int edgeMapCount = loader.getEdgeMap().size();
+    assertThat(edgeMapCount).isEqualTo(numEdgeMap);
+  }
+
+  /**
+   * Asserts the node and edge counts of one loaded flow, its error count when errors are
+   * expected, and that any edge carrying an error carries the expected message.
+   */
+  private void checkFlowProperties(final DirectoryYamlFlowLoader loader, final String flowName,
+      final int numError, final int numNode, final int numEdge, final String edgeError) {
+    assertThat(loader.getFlowMap().containsKey(flowName)).isTrue();
+    final Flow loadedFlow = loader.getFlowMap().get(flowName);
+    // The error count is only checked for flows that are expected to be invalid.
+    if (numError != 0) {
+      assertThat(loadedFlow.getErrors().size()).isEqualTo(numError);
+    }
+    assertThat(loadedFlow.getNodes().size()).isEqualTo(numNode);
+
+    // Verify flow edges
+    assertThat(loader.getEdgeMap().get(flowName).size()).isEqualTo(numEdge);
+    assertThat(loadedFlow.getEdges().size()).isEqualTo(numEdge);
+    loader.getEdgeMap().get(flowName).forEach(edge -> {
+      logger.info(flowName + ".flow has edge: " + edge.getId());
+      final String actualError = edge.getError();
+      if (actualError != null) {
+        assertThat(actualError).isEqualTo(edgeError);
+      }
+    });
+  }
}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
index 4aceded..2cb6fac 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
@@ -16,40 +16,63 @@
package azkaban.project;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
import java.io.File;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
public class FlowLoaderFactoryTest {
private static final String FLOW_10_TEST_DIRECTORY = "exectest1";
private static final String FLOW_20_TEST_DIRECTORY = "basicflowyamltest";
- private Project project;
-
- @Before
- public void setUp() throws Exception {
- this.project = new Project(13, "myTestProject");
- }
+ private static final String DUPLICATE_PROJECT_DIRECTORY = "duplicateprojectyamltest";
+ private static final String INVALID_FLOW_VERSION_DIRECTORY = "invalidflowversiontest";
+ private static final String NO_FLOW_VERSION_DIRECTORY = "noflowversiontest";
@Test
public void testCreateDirectoryFlowLoader() {
final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_10_TEST_DIRECTORY);
final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
- Assert.assertTrue(loader instanceof DirectoryFlowLoader);
+ assertThat(loader instanceof DirectoryFlowLoader).isTrue();
}
- // Todo jamiesjc: add the manifest file with flow version 2.0 and enable this test
@Test
- @Ignore
public void testCreateDirectoryYamlFlowLoader() {
final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_20_TEST_DIRECTORY);
final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
- Assert.assertTrue(loader instanceof DirectoryYamlFlowLoader);
+ assertThat(loader instanceof DirectoryYamlFlowLoader).isTrue();
+ }
+
+ @Test
+ public void testDuplicateProjectYamlFilesException() {
+ final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+ final File projectDir = ExecutionsTestUtil.getFlowDir(DUPLICATE_PROJECT_DIRECTORY);
+ assertThatThrownBy(() -> loaderFactory.createFlowLoader(projectDir))
+ .isInstanceOf(ProjectManagerException.class)
+ .hasMessageContaining(
+ "Duplicate project YAML files found in the project directory. Only one is allowed.");
+ }
+
+ @Test
+ public void testNoFlowVersionException() {
+ final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+ final File projectDir = ExecutionsTestUtil.getFlowDir(NO_FLOW_VERSION_DIRECTORY);
+ assertThatThrownBy(() -> loaderFactory.createFlowLoader(projectDir))
+ .isInstanceOf(ProjectManagerException.class)
+ .hasMessageContaining("azkaban-flow-version is not specified in the project YAML file.");
+ }
+
+ @Test
+ public void testInvalidFlowVersionException() {
+ final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+ final File projectDir = ExecutionsTestUtil.getFlowDir(INVALID_FLOW_VERSION_DIRECTORY);
+ assertThatThrownBy(() -> loaderFactory.createFlowLoader(projectDir))
+ .isInstanceOf(ProjectManagerException.class)
+ .hasMessageContaining("Invalid azkaban-flow-version in the project YAML file.");
}
}
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index ddc3845..157c5d1 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -19,6 +19,7 @@ package azkaban.project;
import static org.assertj.core.api.Assertions.assertThat;
+import azkaban.Constants;
import azkaban.test.executions.ExecutionsTestUtil;
import org.junit.Test;
@@ -41,7 +42,6 @@ public class NodeBeanLoaderTest {
private static final String EMBEDDED_FLOW2 = "embedded_flow2";
private static final String TYPE_NOOP = "noop";
private static final String TYPE_COMMAND = "command";
- private static final String TYPE_FLOW = "flow";
@Test
public void testLoadNodeBeanForBasicFlow() throws Exception {
@@ -51,7 +51,7 @@ public class NodeBeanLoaderTest {
BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
assertThat(nodeBean.getName()).isEqualTo(BASIC_FLOW_NAME);
- assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
assertThat(nodeBean.getNodes().size()).isEqualTo(4);
@@ -74,7 +74,7 @@ public class NodeBeanLoaderTest {
EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
assertThat(nodeBean.getName()).isEqualTo(EMBEDDED_FLOW_NAME);
- assertThat(nodeBean.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(nodeBean.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
assertThat(nodeBean.getConfig().get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
assertThat(nodeBean.getNodes().size()).isEqualTo(4);
@@ -91,7 +91,7 @@ public class NodeBeanLoaderTest {
final NodeBean node3 = nodeBean.getNodes().get(3);
assertThat(node3.getName()).isEqualTo(EMBEDDED_FLOW1);
- assertThat(node3.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(node3.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
assertThat(node3.getNodes().size()).isEqualTo(4);
// Verify nodes in embedded_flow1 are loaded correctly.
@@ -103,7 +103,7 @@ public class NodeBeanLoaderTest {
final NodeBean node3_2 = node3.getNodes().get(2);
assertThat(node3_2.getName()).isEqualTo(EMBEDDED_FLOW2);
- assertThat(node3_2.getType()).isEqualTo(TYPE_FLOW);
+ assertThat(node3_2.getType()).isEqualTo(Constants.FLOW_NODE_TYPE);
assertThat(node3_2.getDependsOn()).contains(SHELL_BASH);
assertThat(node3_2.getNodes().size()).isEqualTo(2);
diff --git a/test/execution-test-data/basicflowyamltest/basic_flow.project b/test/execution-test-data/basicflowyamltest/basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/basicflowyamltest/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/duplicateprojectyamltest/basic_flow.flow b/test/execution-test-data/duplicateprojectyamltest/basic_flow.flow
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/test/execution-test-data/duplicateprojectyamltest/basic_flow.flow
@@ -0,0 +1,41 @@
+---
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ # Describe the type of the job
+ type: command
+ config:
+ command: bash ./sample_script.sh
diff --git a/test/execution-test-data/duplicateprojectyamltest/basic_flow.project b/test/execution-test-data/duplicateprojectyamltest/basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/duplicateprojectyamltest/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/duplicateprojectyamltest/basic_flow2.project b/test/execution-test-data/duplicateprojectyamltest/basic_flow2.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/duplicateprojectyamltest/basic_flow2.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/embeddedflowyamltest/embedded_flow.project b/test/execution-test-data/embeddedflowyamltest/embedded_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/embeddedflowyamltest/embedded_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/invalidflowversiontest/basic_flow.flow b/test/execution-test-data/invalidflowversiontest/basic_flow.flow
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/test/execution-test-data/invalidflowversiontest/basic_flow.flow
@@ -0,0 +1,41 @@
+---
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of a properties file,
+ # with one major difference: all custom properties are now grouped together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ # Describe the type of the job
+ type: command
+ config:
+ command: bash ./sample_script.sh
diff --git a/test/execution-test-data/invalidflowversiontest/basic_flow.project b/test/execution-test-data/invalidflowversiontest/basic_flow.project
new file mode 100644
index 0000000..678ff89
--- /dev/null
+++ b/test/execution-test-data/invalidflowversiontest/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 1.0
diff --git a/test/execution-test-data/invalidflowyamltest/cycle_found.flow b/test/execution-test-data/invalidflowyamltest/cycle_found.flow
new file mode 100644
index 0000000..f6943d9
--- /dev/null
+++ b/test/execution-test-data/invalidflowyamltest/cycle_found.flow
@@ -0,0 +1,30 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_echo
+
+ - name: shell_echo
+ type: command
+ dependsOn:
+ - shell_pwd
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ type: command
+ dependsOn:
+ - shell_bash
+ config:
+ command: pwd
+
+ - name: shell_bash
+ type: command
+ dependsOn:
+ - shell_echo
+ config:
+ command: bash ./sample_script.sh
diff --git a/test/execution-test-data/invalidflowyamltest/dependency_not_found.flow b/test/execution-test-data/invalidflowyamltest/dependency_not_found.flow
new file mode 100644
index 0000000..dc8ab6e
--- /dev/null
+++ b/test/execution-test-data/invalidflowyamltest/dependency_not_found.flow
@@ -0,0 +1,21 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/invalidflowyamltest/invalid_flow.project b/test/execution-test-data/invalidflowyamltest/invalid_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/invalidflowyamltest/invalid_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/multipleflowyamltest/multiple_flows.project b/test/execution-test-data/multipleflowyamltest/multiple_flows.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/multiple_flows.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/noflowversiontest/basic_flow.flow b/test/execution-test-data/noflowversiontest/basic_flow.flow
new file mode 100644
index 0000000..689f9fe
--- /dev/null
+++ b/test/execution-test-data/noflowversiontest/basic_flow.flow
@@ -0,0 +1,41 @@
+---
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ # Describe the type of the job
+ type: command
+ config:
+ command: bash ./sample_script.sh
diff --git a/test/execution-test-data/noflowversiontest/no_flow_version.project b/test/execution-test-data/noflowversiontest/no_flow_version.project
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/execution-test-data/noflowversiontest/no_flow_version.project
diff --git a/test/execution-test-data/noflowyamltest/no_flow.project b/test/execution-test-data/noflowyamltest/no_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/noflowyamltest/no_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0