Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 77d2151..b21a4fe 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -32,6 +32,9 @@ package azkaban;
*/
public class Constants {
+ // Azkaban Flow Versions
+ public static final String AZKABAN_FLOW_VERSION_2_0 = "2.0";
+
// Names and paths of various file names to configure Azkaban
public static final String AZKABAN_PROPERTIES_FILE = "azkaban.properties";
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE = "azkaban.private.properties";
@@ -156,6 +159,12 @@ public class Constants {
// Job property that enables/disables using Kafka logging of user job logs
public static final String AZKABAN_JOB_LOGGING_KAFKA_ENABLE = "azkaban.job.logging.kafka.enable";
+
+ // Job properties that indicate maximum memory size
+ public static final String JOB_MAX_XMS = "job.max.Xms";
+ public static final String MAX_XMS_DEFAULT = "1G";
+ public static final String JOB_MAX_XMX = "job.max.Xmx";
+ public static final String MAX_XMX_DEFAULT = "2G";
}
public static class JobCallbackProperties {
diff --git a/azkaban-common/src/main/java/azkaban/flow/Flow.java b/azkaban-common/src/main/java/azkaban/flow/Flow.java
index fee2e29..991a954 100644
--- a/azkaban-common/src/main/java/azkaban/flow/Flow.java
+++ b/azkaban-common/src/main/java/azkaban/flow/Flow.java
@@ -17,6 +17,7 @@
package azkaban.flow;
import azkaban.executor.mail.DefaultMailCreator;
+import azkaban.project.AzkabanFlow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -48,6 +49,8 @@ public class Flow {
private boolean isLayedOut = false;
+ private AzkabanFlow azkabanFlow;
+
public Flow(final String id) {
this.id = id;
}
@@ -407,4 +410,12 @@ public class Flow {
public void setProjectId(final int projectId) {
this.projectId = projectId;
}
+
+ public AzkabanFlow getAzkabanFlow() {
+ return this.azkabanFlow;
+ }
+
+ public void setAzkabanFlow(final AzkabanFlow azkabanFlow) {
+ this.azkabanFlow = azkabanFlow;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 4f1a6c8..f87f7ab 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -16,7 +16,7 @@
package azkaban.jobExecutor;
-import azkaban.project.DirectoryFlowLoader;
+import azkaban.Constants;
import azkaban.server.AzkabanServer;
import azkaban.utils.Pair;
import azkaban.utils.Props;
@@ -161,9 +161,9 @@ public class JavaProcessJob extends ProcessJob {
final Props azkabanProperties = AzkabanServer.getAzkabanProperties();
if (azkabanProperties != null) {
final String maxXms = azkabanProperties
- .getString(DirectoryFlowLoader.JOB_MAX_XMS, DirectoryFlowLoader.MAX_XMS_DEFAULT);
+ .getString(Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
final String maxXmx = azkabanProperties
- .getString(DirectoryFlowLoader.JOB_MAX_XMX, DirectoryFlowLoader.MAX_XMX_DEFAULT);
+ .getString(Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
final long sizeMaxXms = Utils.parseMemString(maxXms);
final long sizeMaxXmx = Utils.parseMemString(maxXmx);
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index dd6cdea..110fe80 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -54,15 +54,17 @@ class AzkabanProjectLoader {
private final ProjectLoader projectLoader;
private final StorageManager storageManager;
+ private final FlowLoaderFactory flowLoaderFactory;
private final File tempDir;
private final int projectVersionRetention;
@Inject
AzkabanProjectLoader(final Props props, final ProjectLoader projectLoader,
- final StorageManager storageManager) {
+ final StorageManager storageManager, final FlowLoaderFactory flowLoaderFactory) {
this.props = requireNonNull(props, "Props is null");
this.projectLoader = requireNonNull(projectLoader, "project Loader is null");
this.storageManager = requireNonNull(storageManager, "Storage Manager is null");
+ this.flowLoaderFactory = requireNonNull(flowLoaderFactory, "Flow Loader Factory is null");
this.tempDir = new File(props.getString(ConfigurationKeys.PROJECT_TEMP_DIR, "temp"));
if (!this.tempDir.exists()) {
@@ -90,18 +92,16 @@ class AzkabanProjectLoader {
prop.putAll(additionalProps);
File file = null;
+ final FlowLoader loader;
+
try {
file = unzipProject(archive, fileType);
reports = validateProject(project, archive, file, prop);
- // Todo jamiesjc: in Flow 2.0, we need to create new flowLoader class and
- // call new method to load the project flows.
- // Need to guicify it later so that we can mock flowLoader in the tests.
- // Load the project flows.
- final DirectoryFlowLoader directoryFlowLoader = new DirectoryFlowLoader(prop);
- reports.put(DIRECTORY_FLOW_REPORT_KEY,
- directoryFlowLoader.loadProject(project, file));
+ loader = this.flowLoaderFactory.createFlowLoader(file);
+ reports.put(DIRECTORY_FLOW_REPORT_KEY, loader.loadProjectFlow(project, file));
+
} finally {
cleanUpProjectTempDir(file);
}
@@ -112,7 +112,7 @@ class AzkabanProjectLoader {
}
// Upload the project to DB and storage.
- persistProject(project, archive, uploader);
+ persistProject(project, loader, archive, uploader);
// Clean up project old installations after new project is uploaded successfully.
cleanUpProjectOldInstallations(project);
@@ -181,30 +181,38 @@ class AzkabanProjectLoader {
return true;
}
- private void persistProject(final Project project, final File archive, final User uploader)
- throws ProjectManagerException{
+ private void persistProject(final Project project, final FlowLoader loader, final File archive,
+ final User uploader) throws ProjectManagerException {
synchronized (project) {
- final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
- final Map<String, Flow> flows = project.getFlowMap();
- for (final Flow flow : flows.values()) {
- flow.setProjectId(project.getId());
- flow.setVersion(newVersion);
- }
+ if (loader instanceof DirectoryFlowLoader) {
+ final DirectoryFlowLoader directoryFlowLoader = (DirectoryFlowLoader) loader;
+ final int newVersion = this.projectLoader.getLatestProjectVersion(project) + 1;
+ final Map<String, Flow> flows = directoryFlowLoader.getFlowMap();
+ for (final Flow flow : flows.values()) {
+ flow.setProjectId(project.getId());
+ flow.setVersion(newVersion);
+ }
- this.storageManager.uploadProject(project, newVersion, archive, uploader);
-
- log.info("Uploading flow to db " + archive.getName());
- this.projectLoader.uploadFlows(project, newVersion, flows.values());
- log.info("Changing project versions " + archive.getName());
- this.projectLoader.changeProjectVersion(project, newVersion,
- uploader.getUserId());
- log.info("Uploading Job properties");
- this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
- project.getJobPropsMap().values()));
- log.info("Uploading Props properties");
- this.projectLoader.uploadProjectProperties(project, project.getPropsList());
- this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
- "Uploaded project files zip " + archive.getName());
+ this.storageManager.uploadProject(project, newVersion, archive, uploader);
+
+ log.info("Uploading flow to db " + archive.getName());
+ this.projectLoader.uploadFlows(project, newVersion, flows.values());
+ log.info("Changing project versions " + archive.getName());
+ this.projectLoader.changeProjectVersion(project, newVersion,
+ uploader.getUserId());
+ project.setFlows(flows);
+ log.info("Uploading Job properties");
+ this.projectLoader.uploadProjectProperties(project, new ArrayList<>(
+ directoryFlowLoader.getJobPropsMap().values()));
+ log.info("Uploading Props properties");
+ this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
+ this.projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
+ "Uploaded project files zip " + archive.getName());
+ } else if (loader instanceof DirectoryYamlFlowLoader) {
+ // Todo jamiesjc: upload yaml file to DB as a blob
+ } else {
+ throw new ProjectManagerException("Invalid type of flow loader.");
+ }
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index cc99174..a9388fa 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -16,6 +16,7 @@
package azkaban.project;
+import azkaban.Constants;
import azkaban.flow.CommonJobProperties;
import azkaban.flow.Edge;
import azkaban.flow.Flow;
@@ -23,6 +24,7 @@ import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
import azkaban.jobcallback.JobCallbackValidator;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
@@ -32,30 +34,27 @@ import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class DirectoryFlowLoader {
+public class DirectoryFlowLoader implements FlowLoader {
- public static final String JOB_MAX_XMS = "job.max.Xms";
- public static final String MAX_XMS_DEFAULT = "1G";
- public static final String JOB_MAX_XMX = "job.max.Xmx";
- public static final String MAX_XMX_DEFAULT = "2G";
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
private static final String JOB_SUFFIX = ".job";
private static final String XMS = "Xms";
private static final String XMX = "Xmx";
- private static final Logger logger = Logger.getLogger(DirectoryFlowLoader.class);
+ private static final Logger logger = LoggerFactory.getLogger(DirectoryFlowLoader.class);
private final Props props;
+ private final Set<String> errors = new HashSet<>();
+ private final Map<String, Flow> flowMap = new HashMap<>();
private HashSet<String> rootNodes;
- private HashMap<String, Flow> flowMap;
private HashMap<String, Node> nodeMap;
private HashMap<String, Map<String, Edge>> nodeDependencies;
private HashMap<String, Props> jobPropsMap;
@@ -65,7 +64,6 @@ public class DirectoryFlowLoader {
private ArrayList<FlowProps> flowPropsList;
private ArrayList<Props> propsList;
- private Set<String> errors;
private Set<String> duplicateJobs;
/**
@@ -78,6 +76,15 @@ public class DirectoryFlowLoader {
}
/**
+ * Returns the flow map constructed from the loaded flows.
+ *
+ * @return Map of flow name to Flow.
+ */
+ public Map<String, Flow> getFlowMap() {
+ return this.flowMap;
+ }
+
+ /**
* Returns errors caught when loading flows.
*
* @return Set of error strings.
@@ -87,25 +94,43 @@ public class DirectoryFlowLoader {
}
/**
- * Loads all flows from the directory into the project.
+ * Returns job properties.
+ *
+ * @return Map of job name to properties.
+ */
+ public HashMap<String, Props> getJobPropsMap() {
+ return this.jobPropsMap;
+ }
+
+ /**
+ * Returns list of properties.
*
- * @param project The project to load flows to.
- * @param baseDirectory The directory to load flows from.
+ * @return List of Props.
*/
- public void loadProjectFlow(final Project project, final File baseDirectory) {
+ public ArrayList<Props> getPropsList() {
+ return this.propsList;
+ }
+
+ /**
+ * Loads all project flows from the directory.
+ *
+ * @param project The project.
+ * @param projectDir The directory to load flows from.
+ * @return the validation report.
+ */
+ @Override
+ public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
this.propsList = new ArrayList<>();
this.flowPropsList = new ArrayList<>();
this.jobPropsMap = new HashMap<>();
this.nodeMap = new HashMap<>();
- this.flowMap = new HashMap<>();
- this.errors = new HashSet<>();
this.duplicateJobs = new HashSet<>();
this.nodeDependencies = new HashMap<>();
this.rootNodes = new HashSet<>();
this.flowDependencies = new HashMap<>();
// Load all the props files and create the Node objects
- loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
+ loadProjectFromDir(projectDir.getPath(), projectDir, null);
// Create edges and find missing dependencies
resolveDependencies();
@@ -116,9 +141,9 @@ public class DirectoryFlowLoader {
// Resolve embedded flows
resolveEmbeddedFlows();
- project.setFlows(this.flowMap);
- project.setPropsList(this.propsList);
- project.setJobPropsMap(this.jobPropsMap);
+ checkJobProperties(project);
+
+ return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
}
@@ -302,34 +327,7 @@ public class DirectoryFlowLoader {
final Flow flow = new Flow(base.getId());
final Props jobProp = this.jobPropsMap.get(base.getId());
- // Dedup with sets
- final List<String> successEmailList =
- jobProp.getStringList(CommonJobProperties.SUCCESS_EMAILS,
- Collections.EMPTY_LIST);
- final Set<String> successEmail = new HashSet<>();
- for (final String email : successEmailList) {
- successEmail.add(email.toLowerCase());
- }
-
- final List<String> failureEmailList =
- jobProp.getStringList(CommonJobProperties.FAILURE_EMAILS,
- Collections.EMPTY_LIST);
- final Set<String> failureEmail = new HashSet<>();
- for (final String email : failureEmailList) {
- failureEmail.add(email.toLowerCase());
- }
-
- final List<String> notifyEmailList =
- jobProp.getStringList(CommonJobProperties.NOTIFY_EMAILS,
- Collections.EMPTY_LIST);
- for (String email : notifyEmailList) {
- email = email.toLowerCase();
- successEmail.add(email);
- failureEmail.add(email);
- }
-
- flow.addFailureEmails(failureEmail);
- flow.addSuccessEmails(successEmail);
+ FlowLoaderUtils.addEmailPropsToFlow(flow, jobProp);
flow.addAllFlowProperties(this.flowPropsList);
constructFlow(flow, base, visitedNodes);
@@ -380,7 +378,7 @@ public class DirectoryFlowLoader {
visited.remove(node.getId());
}
- private void checkJobProperties(final Project project) {
+ public void checkJobProperties(final Project project) {
// if project is in the memory check whitelist, then we don't need to check
// its memory settings
if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
@@ -388,8 +386,10 @@ public class DirectoryFlowLoader {
return;
}
- final String maxXms = this.props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
- final String maxXmx = this.props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
+ final String maxXms = this.props.getString(
+ Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
+ final String maxXmx = this.props.getString(
+ Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
final long sizeMaxXms = Utils.parseMemString(maxXms);
final long sizeMaxXmx = Utils.parseMemString(maxXmx);
@@ -427,14 +427,6 @@ public class DirectoryFlowLoader {
return filePath.substring(basePath.length() + 1);
}
- public ValidationReport loadProject(final Project project, final File projectDir) {
- loadProjectFlow(project, projectDir);
- checkJobProperties(project);
- final ValidationReport report = new ValidationReport();
- report.addErrorMsgs(this.errors);
- return report;
- }
-
private static class DirFilter implements FileFilter {
@Override
@@ -443,20 +435,4 @@ public class DirectoryFlowLoader {
}
}
- private static class SuffixFilter implements FileFilter {
-
- private final String suffix;
-
- public SuffixFilter(final String suffix) {
- this.suffix = suffix;
- }
-
- @Override
- public boolean accept(final File pathname) {
- final String name = pathname.getName();
-
- return pathname.isFile() && !pathname.isHidden()
- && name.length() > this.suffix.length() && name.endsWith(this.suffix);
- }
- }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
new file mode 100644
index 0000000..058f8ce
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -0,0 +1,111 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.flow.Flow;
+import azkaban.project.FlowLoaderUtils.SuffixFilter;
+import azkaban.project.validator.ValidationReport;
+import azkaban.utils.Props;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads yaml files to flows from project directory.
+ */
+public class DirectoryYamlFlowLoader implements FlowLoader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectoryYamlFlowLoader.class);
+ private static final String PROJECT_FILE_SUFFIX = ".project";
+ private static final String FLOW_FILE_SUFFIX = ".flow";
+
+ private final Props props;
+ private final Set<String> errors = new HashSet<>();
+ private final Map<String, Flow> flowMap = new HashMap<>();
+
+ /**
+ * Creates a new DirectoryYamlFlowLoader.
+ *
+ * @param props Properties to add.
+ */
+ public DirectoryYamlFlowLoader(final Props props) {
+ this.props = props;
+ }
+
+ /**
+ * Returns the flow map constructed from the loaded flows.
+ *
+ * @return Map of flow name to Flow.
+ */
+ public Map<String, Flow> getFlowMap() {
+ return this.flowMap;
+ }
+
+ /**
+ * Returns errors caught when loading flows.
+ *
+ * @return Set of error strings.
+ */
+ public Set<String> getErrors() {
+ return this.errors;
+ }
+
+ /**
+ * Loads all project flows from the directory.
+ *
+ * @param project The project.
+ * @param projectDir The directory to load flows from.
+ * @return the validation report.
+ */
+ @Override
+ public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
+ convertYamlFiles(projectDir);
+ checkJobProperties(project);
+ return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
+ }
+
+ private void convertYamlFiles(final File projectDir) {
+ // Todo jamiesjc: convert project yaml file. It will contain properties for all flows.
+
+ //covert flow yaml files
+ final File[] flowFiles = projectDir.listFiles(new SuffixFilter(FLOW_FILE_SUFFIX));
+ for (final File file : flowFiles) {
+ final FlowBeanLoader loader = new FlowBeanLoader();
+ try {
+ final FlowBean flowBean = loader.load(file);
+ final AzkabanFlow azkabanFlow = loader.toAzkabanFlow(loader.getFlowName(file), flowBean);
+ final Flow flow = new Flow(azkabanFlow.getName());
+ flow.setAzkabanFlow(azkabanFlow);
+ this.flowMap.put(azkabanFlow.getName(), flow);
+ final Props flowProps = azkabanFlow.getProps();
+ FlowLoaderUtils.addEmailPropsToFlow(flow, flowProps);
+ } catch (final FileNotFoundException e) {
+ logger.error("Error loading flow yaml files", e);
+ }
+ }
+ }
+
+ public void checkJobProperties(final Project project) {
+ // Todo jamiesjc: implement the check later
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBean.java b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
index de4780c..4dc0b53 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBean.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Map;
/**
- * This is the top level class which is used by the YAML loader to deserialize a flow.yml file.
+ * Top level class used by the YAML loader to deserialize a flow yaml file.
*/
public class FlowBean implements Serializable {
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
index 34f401a..a10914b 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowBeanLoader.java
@@ -33,7 +33,7 @@ public class FlowBeanLoader {
public FlowBean load(final File flowFile) throws FileNotFoundException {
checkArgument(flowFile.exists());
- checkArgument(flowFile.getName().endsWith(".yml"));
+ checkArgument(flowFile.getName().endsWith(".flow"));
return new Yaml().loadAs(new FileInputStream(flowFile), FlowBean.class);
}
@@ -81,7 +81,7 @@ public class FlowBeanLoader {
public String getFlowName(final File flowFile) {
checkArgument(flowFile.exists());
- checkArgument(flowFile.getName().endsWith(".yml"));
+ checkArgument(flowFile.getName().endsWith(".flow"));
return Files.getNameWithoutExtension(flowFile.getName());
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
new file mode 100644
index 0000000..18b9303
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
@@ -0,0 +1,32 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.project.validator.ValidationReport;
+import java.io.File;
+
+public interface FlowLoader {
+
+ /**
+ * Loads all project flows from the directory.
+ *
+ * @param project The project.
+ * @param projectDir The directory to load flows from.
+ * @return the validation report.
+ */
+ ValidationReport loadProjectFlow(final Project project, final File projectDir);
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
new file mode 100644
index 0000000..9dc170d
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
@@ -0,0 +1,61 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import static java.util.Objects.requireNonNull;
+
+import azkaban.Constants;
+import azkaban.utils.Props;
+import java.io.File;
+import javax.inject.Inject;
+
+/**
+ * Factory class to generate flow loaders.
+ */
+public class FlowLoaderFactory {
+
+ private final Props props;
+
+ /**
+ * Instantiates a new Flow loader factory.
+ *
+ * @param props the props
+ */
+ @Inject
+ public FlowLoaderFactory(final Props props) {
+ this.props = requireNonNull(props, "Props is null");
+ }
+
+ /**
+ * Creates flow loader based on manifest file inside project directory.
+ *
+ * @param projectDir the project directory
+ * @return the flow loader
+ */
+ public FlowLoader createFlowLoader(final File projectDir) throws ProjectManagerException {
+ // Todo jamiesjc: need to check if manifest file exists in project directory,
+ // and create FlowLoader based on different flow versions specified in the manifest file.
+ final String flowVersion = null;
+ if (flowVersion == null) {
+ return new DirectoryFlowLoader(this.props);
+ } else if (flowVersion.equals(Constants.AZKABAN_FLOW_VERSION_2_0)) {
+ return new DirectoryYamlFlowLoader(this.props);
+ } else {
+ throw new ProjectManagerException("Flow version " + flowVersion + "is invalid.");
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
new file mode 100644
index 0000000..16ca468
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -0,0 +1,103 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+package azkaban.project;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.flow.Flow;
+import azkaban.project.validator.ValidationReport;
+import azkaban.utils.Props;
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class FlowLoaderUtils {
+
+ /**
+ * Adds email properties to a flow.
+ *
+ * @param flow the flow
+ * @param prop the prop
+ */
+ public static void addEmailPropsToFlow(final Flow flow, final Props prop) {
+ final List<String> successEmailList =
+ prop.getStringList(CommonJobProperties.SUCCESS_EMAILS,
+ Collections.EMPTY_LIST);
+ final Set<String> successEmail = new HashSet<>();
+ for (final String email : successEmailList) {
+ successEmail.add(email.toLowerCase());
+ }
+
+ final List<String> failureEmailList =
+ prop.getStringList(CommonJobProperties.FAILURE_EMAILS,
+ Collections.EMPTY_LIST);
+ final Set<String> failureEmail = new HashSet<>();
+ for (final String email : failureEmailList) {
+ failureEmail.add(email.toLowerCase());
+ }
+
+ final List<String> notifyEmailList =
+ prop.getStringList(CommonJobProperties.NOTIFY_EMAILS,
+ Collections.EMPTY_LIST);
+ for (String email : notifyEmailList) {
+ email = email.toLowerCase();
+ successEmail.add(email);
+ failureEmail.add(email);
+ }
+
+ flow.addFailureEmails(failureEmail);
+ flow.addSuccessEmails(successEmail);
+ }
+
+ /**
+ * Generate flow loader report validation report.
+ *
+ * @param errors the errors
+ * @return the validation report
+ */
+ public static ValidationReport generateFlowLoaderReport(final Set<String> errors) {
+ final ValidationReport report = new ValidationReport();
+ report.addErrorMsgs(errors);
+ return report;
+ }
+
+ /**
+ * Implements Suffix filter.
+ */
+ public static class SuffixFilter implements FileFilter {
+
+ private final String suffix;
+
+ /**
+ * Instantiates a new Suffix filter.
+ *
+ * @param suffix the suffix
+ */
+ public SuffixFilter(final String suffix) {
+ this.suffix = suffix;
+ }
+
+ @Override
+ public boolean accept(final File pathname) {
+ final String name = pathname.getName();
+
+ return pathname.isFile() && !pathname.isHidden()
+ && name.length() > this.suffix.length() && name.endsWith(this.suffix);
+ }
+ }
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/Project.java b/azkaban-common/src/main/java/azkaban/project/Project.java
index c2f1911..2148fa9 100644
--- a/azkaban-common/src/main/java/azkaban/project/Project.java
+++ b/azkaban-common/src/main/java/azkaban/project/Project.java
@@ -21,8 +21,6 @@ import azkaban.user.Permission;
import azkaban.user.Permission.Type;
import azkaban.user.User;
import azkaban.utils.Pair;
-import azkaban.utils.Props;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collection;
@@ -50,8 +48,6 @@ public class Project {
private String lastModifiedUser;
private String source;
private Map<String, Flow> flows = new HashMap<>();
- private Map<String, Props> jobPropsMap = new HashMap<>();
- private List<Props> propsList = new ArrayList<>();
private Map<String, Object> metadata = new HashMap<>();
public Project(final int id, final String name) {
@@ -455,20 +451,4 @@ public class Project {
public void setVersion(final int version) {
this.version = version;
}
-
- public Map<String, Props> getJobPropsMap() {
- return this.jobPropsMap;
- }
-
- public void setJobPropsMap(final Map<String, Props> jobPropsMap) {
- this.jobPropsMap = ImmutableMap.copyOf(jobPropsMap);
- }
-
- public List<Props> getPropsList() {
- return this.propsList;
- }
-
- public void setPropsList(final List<Props> propsList) {
- this.propsList = ImmutableList.copyOf(propsList);
- }
}
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index 11bbcaa..cd625d8 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -212,7 +212,7 @@ public class ExecutableFlowTest {
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
Assert.assertEquals(0, loader.getErrors().size());
-
+ this.project.setFlows(loader.getFlowMap());
this.project.setVersion(123);
}
diff --git a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
index 5f98984..61b020f 100644
--- a/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/AzkabanProjectLoaderTest.java
@@ -23,11 +23,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import azkaban.project.validator.ValidationReport;
+import azkaban.project.validator.ValidationStatus;
import azkaban.storage.StorageManager;
import azkaban.user.User;
import azkaban.utils.Props;
import java.io.File;
import java.net.URL;
+import java.util.Map;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -35,6 +39,8 @@ import org.junit.rules.TemporaryFolder;
public class AzkabanProjectLoaderTest {
+ private static final String DIRECTORY_FLOW_REPORT_KEY = "Directory Flow";
+
@Rule
public final TemporaryFolder TEMP_DIR = new TemporaryFolder();
@@ -55,7 +61,7 @@ public class AzkabanProjectLoaderTest {
this.projectLoader = mock(ProjectLoader.class);
this.azkabanProjectLoader = new AzkabanProjectLoader(props, this.projectLoader,
- this.storageManager);
+ this.storageManager, new FlowLoaderFactory(props));
}
@Test
@@ -67,7 +73,14 @@ public class AzkabanProjectLoaderTest {
final File projectZipFile = new File(resource.getPath());
final User uploader = new User("test_user");
- this.azkabanProjectLoader.uploadProject(this.project, projectZipFile, "zip", uploader, null);
+ final Map<String, ValidationReport> validationReportMap =
+ this.azkabanProjectLoader
+ .uploadProject(this.project, projectZipFile, "zip", uploader, null);
+
+ Assert.assertEquals(1, validationReportMap.size());
+ Assert.assertTrue(validationReportMap.containsKey(DIRECTORY_FLOW_REPORT_KEY));
+ Assert.assertEquals(ValidationStatus.PASS,
+ validationReportMap.get(DIRECTORY_FLOW_REPORT_KEY).getStatus());
verify(this.storageManager)
.uploadProject(this.project, this.VERSION + 1, projectZipFile, uploader);
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
index 10eccac..cd6ebec 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryFlowLoaderTest.java
@@ -37,9 +37,9 @@ public class DirectoryFlowLoaderTest {
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("exectest1"));
Assert.assertEquals(0, loader.getErrors().size());
- Assert.assertEquals(5, this.project.getFlowMap().size());
- Assert.assertEquals(2, this.project.getPropsList().size());
- Assert.assertEquals(14, this.project.getJobPropsMap().size());
+ Assert.assertEquals(5, loader.getFlowMap().size());
+ Assert.assertEquals(2, loader.getPropsList().size());
+ Assert.assertEquals(14, loader.getJobPropsMap().size());
}
@Test
@@ -48,9 +48,9 @@ public class DirectoryFlowLoaderTest {
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
Assert.assertEquals(0, loader.getErrors().size());
- Assert.assertEquals(2, this.project.getFlowMap().size());
- Assert.assertEquals(0, this.project.getPropsList().size());
- Assert.assertEquals(9, this.project.getJobPropsMap().size());
+ Assert.assertEquals(2, loader.getFlowMap().size());
+ Assert.assertEquals(0, loader.getPropsList().size());
+ Assert.assertEquals(9, loader.getJobPropsMap().size());
}
@Test
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
new file mode 100644
index 0000000..e55bbb7
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -0,0 +1,67 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.flow.Flow;
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DirectoryYamlFlowLoaderTest {
+
+ private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
+ private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
+ private static final String FLOW_NAME_1 = "basic_flow";
+ private static final String FLOW_NAME_2 = "basic_flow2";
+ private Project project;
+
+ @Before
+ public void setUp() {
+ this.project = new Project(12, "myTestProject");
+ }
+
+ @Test
+ public void testLoadYamlFileFromDirectory() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(BASIC_FLOW_YAML_DIR));
+ Assert.assertEquals(0, loader.getErrors().size());
+ Assert.assertEquals(1, loader.getFlowMap().size());
+ Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
+ final Flow flow = loader.getFlowMap().get(FLOW_NAME_1);
+ final AzkabanFlow azkabanFlow = flow.getAzkabanFlow();
+ Assert.assertEquals(FLOW_NAME_1, azkabanFlow.getName());
+ Assert.assertEquals(4, azkabanFlow.getNodes().size());
+ }
+
+ @Test
+ public void testLoadMultipleYamlFilesFromDirectory() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+
+ loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(MULTIPLE_FLOW_YAML_DIR));
+ Assert.assertEquals(0, loader.getErrors().size());
+ Assert.assertEquals(2, loader.getFlowMap().size());
+ Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_1));
+ Assert.assertTrue(loader.getFlowMap().containsKey(FLOW_NAME_2));
+ final Flow flow2 = loader.getFlowMap().get(FLOW_NAME_2);
+ final AzkabanFlow azkabanFlow2 = flow2.getAzkabanFlow();
+ Assert.assertEquals(FLOW_NAME_2, azkabanFlow2.getName());
+ Assert.assertEquals(3, azkabanFlow2.getNodes().size());
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
index cb51fcc..ce30c4f 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowBeanLoaderTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
public class FlowBeanLoaderTest {
public static final String TEST_FLOW_NAME = "sample_flow";
- public static final String TEST_FLOW_YML_FILENAME = TEST_FLOW_NAME + ".yml";
+ public static final String TEST_FLOW_YML_FILENAME = TEST_FLOW_NAME + ".flow";
public static final String SHELL_END = "shell_end";
public static final String SHELL_ECHO = "shell_echo";
public static final String SHELL_BASH = "shell_bash";
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
new file mode 100644
index 0000000..4aceded
--- /dev/null
+++ b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+* Copyright 2017 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the “License”); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an “AS IS” BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package azkaban.project;
+
+import azkaban.test.executions.ExecutionsTestUtil;
+import azkaban.utils.Props;
+import java.io.File;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class FlowLoaderFactoryTest {
+
+ private static final String FLOW_10_TEST_DIRECTORY = "exectest1";
+ private static final String FLOW_20_TEST_DIRECTORY = "basicflowyamltest";
+ private Project project;
+
+ @Before
+ public void setUp() throws Exception {
+ this.project = new Project(13, "myTestProject");
+ }
+
+ @Test
+ public void testCreateDirectoryFlowLoader() {
+ final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+ final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_10_TEST_DIRECTORY);
+ final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
+ Assert.assertTrue(loader instanceof DirectoryFlowLoader);
+ }
+
+ // Todo jamiesjc: add the manifest file with flow version 2.0 and enable this test
+ @Test
+ @Ignore
+ public void testCreateDirectoryYamlFlowLoader() {
+ final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+ final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_20_TEST_DIRECTORY);
+ final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
+ Assert.assertTrue(loader instanceof DirectoryYamlFlowLoader);
+ }
+}
diff --git a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
index 46394f3..ac2b049 100644
--- a/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/EmailerTest.java
@@ -55,6 +55,7 @@ public class EmailerTest {
final DirectoryFlowLoader loader = new DirectoryFlowLoader(this.props);
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir("embedded"));
Assert.assertEquals(0, loader.getErrors().size());
+ this.project.setFlows(loader.getFlowMap());
this.project.setVersion(123);
}
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index 465445d..8e0d9c9 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -55,7 +55,8 @@ public class FlowRunnerTestUtil {
project.getName(), sourceDir));
}
- final Map<String, Flow> flowMap = project.getFlowMap();
+ final Map<String, Flow> flowMap = loader.getFlowMap();
+ project.setFlows(flowMap);
FileUtils.copyDirectory(sourceDir, workingDir);
return flowMap;
diff --git a/test/execution-test-data/basicflowyamltest/basic_flow.flow b/test/execution-test-data/basicflowyamltest/basic_flow.flow
new file mode 100644
index 0000000..287f495
--- /dev/null
+++ b/test/execution-test-data/basicflowyamltest/basic_flow.flow
@@ -0,0 +1,40 @@
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ # Describe the type of the job
+ type: command
+ config:
+ command: bash ./sample_script.sh
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow.flow b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
new file mode 100644
index 0000000..287f495
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow.flow
@@ -0,0 +1,40 @@
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ # Describe the type of the job
+ type: command
+ config:
+ command: bash ./sample_script.sh
diff --git a/test/execution-test-data/multipleflowyamltest/basic_flow2.flow b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
new file mode 100644
index 0000000..e87b712
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/basic_flow2.flow
@@ -0,0 +1,33 @@
+# All flow level properties here
+config:
+ flow-level-parameter: value
+
+# This section defines the list of jobs
+# A node can be a job or a flow
+# In this example, all nodes are jobs
+nodes:
+ # Job definition
+ # The job definition is like a YAMLified version of properties file
+ # with one major difference. All custom properties are now clubbed together
+ # in a config section in the definition.
+ # The first line describes the name of the job
+ - name: shell_end
+ # Describe the type of the job
+ type: noop
+
+ # List the dependencies of the job
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ # Describe the type of the job
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ # Describe the type of the job
+ type: command
+ config:
+ command: pwd