Details
diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java
index 3616edd..bf8b529 100644
--- a/az-core/src/main/java/azkaban/Constants.java
+++ b/az-core/src/main/java/azkaban/Constants.java
@@ -40,6 +40,7 @@ public class Constants {
public static final String FLOW_FILE_SUFFIX = ".flow";
// Flow 2.0 node type
+ public static final String NODE_TYPE = "type";
public static final String FLOW_NODE_TYPE = "flow";
// Flow 2.0 flow and job path delimiter
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index edb5bf4..b92a183 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -93,6 +93,7 @@ public class DirectoryFlowLoader implements FlowLoader {
*
* @return Set of error strings.
*/
+ @Override
public Set<String> getErrors() {
return this.errors;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 93ae347..ca3e1d7 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -72,6 +72,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
*
* @return Set of error strings.
*/
+ @Override
public Set<String> getErrors() {
return this.errors;
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
index 8fd4f91..0e7d11c 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoader.java
@@ -20,6 +20,7 @@ import azkaban.flow.Flow;
import azkaban.project.validator.ValidationReport;
import java.io.File;
import java.util.Map;
+import java.util.Set;
/**
* Interface to load project flows.
@@ -41,4 +42,11 @@ public interface FlowLoader {
* @return Map of flow name to Flow.
*/
Map<String, Flow> getFlowMap();
+
+ /**
+ * Returns errors caught when loading flows.
+ *
+ * @return Set of error strings.
+ */
+ Set<String> getErrors();
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 390888f..aa083be 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -52,7 +52,7 @@ public class FlowLoaderUtils {
try {
final NodeBean nodeBean = loader.load(flowFile);
final String[] pathList = path.split(Constants.PATH_DELIMITER);
- if (findPropsFromNodeBean(loader, nodeBean, pathList, 0, propsList)) {
+ if (findPropsFromNodeBean(nodeBean, pathList, 0, propsList)) {
if (!propsList.isEmpty()) {
return propsList.get(0);
} else {
@@ -68,22 +68,21 @@ public class FlowLoaderUtils {
/**
* Helper method to recursively find props from node bean.
*
- * @param loader the loader
* @param nodeBean the node bean
* @param pathList the path list
* @param idx the idx
* @param propsList the props list
* @return the boolean
*/
- public static boolean findPropsFromNodeBean(final NodeBeanLoader loader, final NodeBean nodeBean,
+ public static boolean findPropsFromNodeBean(final NodeBean nodeBean,
final String[] pathList, final int idx, final List<Props> propsList) {
if (idx < pathList.length && nodeBean.getName().equals(pathList[idx])) {
if (idx == pathList.length - 1) {
- propsList.add(loader.getNodeProps(nodeBean));
+ propsList.add(nodeBean.getProps());
return true;
}
for (final NodeBean bean : nodeBean.getNodes()) {
- if (findPropsFromNodeBean(loader, bean, pathList, idx + 1, propsList)) {
+ if (findPropsFromNodeBean(bean, pathList, idx + 1, propsList)) {
return true;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
index 2cc55d5..8e17609 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectHandlerSet.java
@@ -342,6 +342,10 @@ class JdbcProjectHandlerSet {
"SELECT flow_file FROM project_flow_files WHERE "
+ "project_id=? AND project_version=? AND flow_name=? AND flow_version=?";
+ public static String SELECT_ALL_FLOW_FILES =
+ "SELECT flow_file FROM project_flow_files WHERE "
+ + "project_id=? AND project_version=?";
+
@Override
public List<byte[]> handle(final ResultSet rs) throws SQLException {
if (!rs.next()) {
diff --git a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
index a69ca0f..2ca61bd 100644
--- a/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
+++ b/azkaban-common/src/main/java/azkaban/project/JdbcProjectImpl.java
@@ -1058,4 +1058,22 @@ public class JdbcProjectImpl implements ProjectLoader {
projectVersion + ", flow " + flowName + ".", e);
}
}
+
+ @Override
+ public boolean isFlowFileUploaded(final int projectId, final int projectVersion)
+ throws ProjectManagerException {
+ final FlowFileResultHandler handler = new FlowFileResultHandler();
+ final List<byte[]> data;
+
+ try {
+ data = this.dbOperator
+ .query(FlowFileResultHandler.SELECT_ALL_FLOW_FILES, handler,
+ projectId, projectVersion);
+ } catch (final SQLException e) {
+ logger.error(e);
+ throw new ProjectManagerException("Failed to query uploaded flow files ", e);
+ }
+
+ return !data.isEmpty();
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBean.java b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
index eb62481..5cafc37 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBean.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBean.java
@@ -17,6 +17,8 @@
package azkaban.project;
+import azkaban.Constants;
+import azkaban.utils.Props;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -72,6 +74,12 @@ public class NodeBean implements Serializable {
this.nodes = nodes;
}
+ public Props getProps() {
+ final Props props = new Props(null, this.getConfig());
+ props.put(Constants.NODE_TYPE, this.getType());
+ return props;
+ }
+
@Override
public String toString() {
return "NodeBean{" +
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 00443aa..079924a 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -20,7 +20,6 @@ package azkaban.project;
import static com.google.common.base.Preconditions.checkArgument;
import azkaban.Constants;
-import azkaban.utils.Props;
import com.google.common.io.Files;
import java.io.File;
import java.io.FileInputStream;
@@ -35,15 +34,13 @@ import org.yaml.snakeyaml.Yaml;
*/
public class NodeBeanLoader {
- private static final String NODE_BEAN_TYPE_FLOW = "flow";
-
public NodeBean load(final File flowFile) throws FileNotFoundException {
checkArgument(flowFile.exists());
checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
final NodeBean nodeBean = new Yaml().loadAs(new FileInputStream(flowFile), NodeBean.class);
nodeBean.setName(getFlowName(flowFile));
- nodeBean.setType(NODE_BEAN_TYPE_FLOW);
+ nodeBean.setType(Constants.FLOW_NODE_TYPE);
return nodeBean;
}
@@ -67,10 +64,10 @@ public class NodeBeanLoader {
}
public AzkabanNode toAzkabanNode(final NodeBean nodeBean) {
- if (nodeBean.getType().equals(NODE_BEAN_TYPE_FLOW)) {
+ if (nodeBean.getType().equals(Constants.FLOW_NODE_TYPE)) {
return new AzkabanFlow.AzkabanFlowBuilder()
.setName(nodeBean.getName())
- .setProps(new Props(null, nodeBean.getConfig()))
+ .setProps(nodeBean.getProps())
.setDependsOn(nodeBean.getDependsOn())
.setNodes(
nodeBean.getNodes().stream().map(this::toAzkabanNode).collect(Collectors.toList()))
@@ -78,17 +75,13 @@ public class NodeBeanLoader {
} else {
return new AzkabanJob.AzkabanJobBuilder()
.setName(nodeBean.getName())
- .setProps(new Props(null, nodeBean.getConfig()))
+ .setProps(nodeBean.getProps())
.setType(nodeBean.getType())
.setDependsOn(nodeBean.getDependsOn())
.build();
}
}
- public Props getNodeProps(final NodeBean nodeBean) {
- return new Props(null, nodeBean.getConfig());
- }
-
public String getFlowName(final File flowFile) {
checkArgument(flowFile.exists());
checkArgument(flowFile.getName().endsWith(Constants.FLOW_FILE_SUFFIX));
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
index 42062b6..ae332a7 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectLoader.java
@@ -219,4 +219,10 @@ public interface ProjectLoader {
int getLatestFlowVersion(int projectId, int projectVersion, String flowName)
throws ProjectManagerException;
+ /**
+ * Gets all flow files that's uploaded.
+ */
+ boolean isFlowFileUploaded(int projectId, int projectVersion)
+ throws ProjectManagerException;
+
}
diff --git a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
index ba64698..ccdbbc0 100644
--- a/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/NodeBeanLoaderTest.java
@@ -136,13 +136,15 @@ public class NodeBeanLoaderTest {
final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
- assertThat(shellEnd.getProps().size()).isEqualTo(0);
+ assertThat(shellEnd.getProps().size()).isEqualTo(1);
+ assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, SHELL_BASH);
final AzkabanJob shellEcho = (AzkabanJob) flow.getNode(SHELL_ECHO);
assertThat(shellEcho.getName()).isEqualTo(SHELL_ECHO);
assertThat(shellEcho.getType()).isEqualTo(TYPE_COMMAND);
- assertThat(shellEcho.getProps().size()).isEqualTo(1);
+ assertThat(shellEcho.getProps().size()).isEqualTo(2);
+ assertThat(shellEcho.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(shellEcho.getProps().get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
}
@@ -160,7 +162,8 @@ public class NodeBeanLoaderTest {
final AzkabanJob shellEnd = (AzkabanJob) flow.getNode(SHELL_END);
assertThat(shellEnd.getName()).isEqualTo(SHELL_END);
assertThat(shellEnd.getType()).isEqualTo(TYPE_NOOP);
- assertThat(shellEnd.getProps().size()).isEqualTo(0);
+ assertThat(shellEnd.getProps().size()).isEqualTo(1);
+ assertThat(shellEnd.getProps().get(Constants.NODE_TYPE)).isEqualTo(TYPE_NOOP);
assertThat(shellEnd.getDependsOn()).contains(SHELL_PWD, SHELL_ECHO, EMBEDDED_FLOW1);
final AzkabanFlow embeddedFlow1 = (AzkabanFlow) flow.getNode(EMBEDDED_FLOW1);
@@ -189,7 +192,8 @@ public class NodeBeanLoaderTest {
public void testGetFlowProps() {
final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(BASIC_FLOW_NAME,
ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
- assertThat(flowProps.size()).isEqualTo(1);
+ assertThat(flowProps.size()).isEqualTo(2);
+ assertThat(flowProps.get(Constants.NODE_TYPE)).isEqualTo(Constants.FLOW_NODE_TYPE);
assertThat(flowProps.get(FLOW_CONFIG_KEY)).isEqualTo(FLOW_CONFIG_VALUE);
}
@@ -198,7 +202,8 @@ public class NodeBeanLoaderTest {
final Props jobProps = FlowLoaderUtils
.getPropsFromYamlFile(BASIC_FLOW_NAME + Constants.PATH_DELIMITER + SHELL_ECHO,
ExecutionsTestUtil.getFlowFile(BASIC_FLOW_YML_TEST_DIR, BASIC_FLOW_YML_FILE));
- assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.size()).isEqualTo(2);
+ assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
}
@@ -216,21 +221,24 @@ public class NodeBeanLoaderTest {
String jobPrefix = EMBEDDED_FLOW_NAME + Constants.PATH_DELIMITER;
Props jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
- assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.size()).isEqualTo(2);
+ assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND);
// Get job props from first level embedded flow
jobPrefix = jobPrefix + EMBEDDED_FLOW1 + Constants.PATH_DELIMITER;
jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_ECHO,
ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
- assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.size()).isEqualTo(2);
+ assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(ECHO_COMMAND_1);
// Get job props from second level embedded flow
jobPrefix = jobPrefix + EMBEDDED_FLOW2 + Constants.PATH_DELIMITER;
jobProps = FlowLoaderUtils.getPropsFromYamlFile(jobPrefix + SHELL_PWD,
ExecutionsTestUtil.getFlowFile(EMBEDDED_FLOW_YML_TEST_DIR, EMBEDDED_FLOW_YML_FILE));
- assertThat(jobProps.size()).isEqualTo(1);
+ assertThat(jobProps.size()).isEqualTo(2);
+ assertThat(jobProps.get(Constants.NODE_TYPE)).isEqualTo(TYPE_COMMAND);
assertThat(jobProps.get(TYPE_COMMAND)).isEqualTo(PWD_COMMAND);
}
}
diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
index 9020f30..8b81297 100644
--- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java
@@ -18,6 +18,7 @@ package azkaban.execapp;
import static azkaban.Constants.ConfigurationKeys.AZKABAN_SERVER_HOST_NAME;
+import azkaban.Constants;
import azkaban.ServiceProvider;
import azkaban.event.Event;
import azkaban.event.EventData;
@@ -41,6 +42,7 @@ import azkaban.flow.FlowUtils;
import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
import azkaban.metric.MetricReportManager;
+import azkaban.project.FlowLoaderUtils;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
import azkaban.sla.SlaOption;
@@ -48,6 +50,7 @@ import azkaban.spi.AzkabanEventReporter;
import azkaban.spi.EventType;
import azkaban.utils.Props;
import azkaban.utils.SwapQueue;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
@@ -100,6 +103,8 @@ public class FlowRunner extends EventHandler implements Runnable {
// Thread safe swap queue for finishedExecutions.
private final SwapQueue<ExecutableNode> finishedNodes;
private final AzkabanEventReporter azkabanEventReporter;
+ // Flag to indicate whether to interpret the flow as the new Flow 2.0 definition.
+ private boolean isAzkabanFlowVersion20 = false;
private Logger logger;
private Appender flowAppender;
private File logFile;
@@ -166,6 +171,9 @@ public class FlowRunner extends EventHandler implements Runnable {
// where the uninitialized logger is used in flow preparing state
createLogger(this.flow.getFlowId());
this.azkabanEventReporter = azkabanEventReporter;
+
+ // Todo jamiesjc: Enable below check after DB change of project_flow_files table is rolled out.
+ // this.isAzkabanFlowVersion20 = checkAzkabanFlowVersion();
}
public FlowRunner setFlowWatcher(final FlowWatcher watcher) {
@@ -194,6 +202,10 @@ public class FlowRunner extends EventHandler implements Runnable {
return this.execDir;
}
+ public void setAzkabanFlowVersion20(final boolean azkabanFlowVersion20) {
+ this.isAzkabanFlowVersion20 = azkabanFlowVersion20;
+ }
+
@Override
public void run() {
try {
@@ -206,7 +218,9 @@ public class FlowRunner extends EventHandler implements Runnable {
this.logger.info("Updating initial flow directory.");
updateFlow();
this.logger.info("Fetching job and shared properties.");
- loadAllProperties();
+ if (!this.isAzkabanFlowVersion20) {
+ loadAllProperties();
+ }
this.fireEventListeners(
Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow())));
@@ -240,6 +254,10 @@ public class FlowRunner extends EventHandler implements Runnable {
}
}
+ private boolean checkAzkabanFlowVersion() {
+ return this.projectLoader.isFlowFileUploaded(this.flow.getProjectId(), this.flow.getVersion());
+ }
+
private void setupFlowExecution() {
final int projectId = this.flow.getProjectId();
final int version = this.flow.getVersion();
@@ -248,11 +266,19 @@ public class FlowRunner extends EventHandler implements Runnable {
// Add a bunch of common azkaban properties
Props commonFlowProps = FlowUtils.addCommonFlowProperties(null, this.flow);
- if (this.flow.getJobSource() != null) {
- final String source = this.flow.getJobSource();
- final Props flowProps = this.sharedProps.get(source);
- flowProps.setParent(commonFlowProps);
- commonFlowProps = flowProps;
+ if (this.isAzkabanFlowVersion20) {
+ final Props flowProps = loadFlowPropsFromYamlFile();
+ if (flowProps != null) {
+ flowProps.setParent(commonFlowProps);
+ commonFlowProps = flowProps;
+ }
+ } else {
+ if (this.flow.getJobSource() != null) {
+ final String source = this.flow.getJobSource();
+ final Props flowProps = this.sharedProps.get(source);
+ flowProps.setParent(commonFlowProps);
+ commonFlowProps = flowProps;
+ }
}
// If there are flow overrides, we apply them now.
@@ -646,13 +672,16 @@ public class FlowRunner extends EventHandler implements Runnable {
}
Props props = null;
- // 1. Shared properties (i.e. *.properties) for the jobs only. This takes
- // the
- // least precedence
- if (!(node instanceof ExecutableFlowBase)) {
- final String sharedProps = node.getPropsSource();
- if (sharedProps != null) {
- props = this.sharedProps.get(sharedProps);
+
+ if (!this.isAzkabanFlowVersion20) {
+ // 1. Shared properties (i.e. *.properties) for the jobs only. This takes
+ // the
+ // least precedence
+ if (!(node instanceof ExecutableFlowBase)) {
+ final String sharedProps = node.getPropsSource();
+ if (sharedProps != null) {
+ props = this.sharedProps.get(sharedProps);
+ }
}
}
@@ -693,37 +722,45 @@ public class FlowRunner extends EventHandler implements Runnable {
private Props loadJobProps(final ExecutableNode node) throws IOException {
Props props = null;
- final String source = node.getJobSource();
- if (source == null) {
- return null;
- }
-
- // load the override props if any
- try {
- props =
- this.projectLoader.fetchProjectProperty(this.flow.getProjectId(),
- this.flow.getVersion(), node.getId() + ".jor");
- } catch (final ProjectManagerException e) {
- e.printStackTrace();
- this.logger.error("Error loading job override property for job "
- + node.getId());
- }
+ if (this.isAzkabanFlowVersion20) {
+ props = loadJobPropsFromYamlFile(node);
+ if (props == null) {
+ this.logger.info("Job props loaded from yaml file is empty for job " + node.getId());
+ return props;
+ }
+ } else {
+ final String source = node.getJobSource();
+ if (source == null) {
+ return null;
+ }
- final File path = new File(this.execDir, source);
- if (props == null) {
- // if no override prop, load the original one on disk
+ // load the override props if any
try {
- props = new Props(null, path);
- } catch (final IOException e) {
+ props =
+ this.projectLoader.fetchProjectProperty(this.flow.getProjectId(),
+ this.flow.getVersion(), node.getId() + ".jor");
+ } catch (final ProjectManagerException e) {
e.printStackTrace();
- this.logger.error("Error loading job file " + source + " for job "
+ this.logger.error("Error loading job override property for job "
+ node.getId());
}
- }
- // setting this fake source as this will be used to determine the location
- // of log files.
- if (path.getPath() != null) {
- props.setSource(path.getPath());
+
+ final File path = new File(this.execDir, source);
+ if (props == null) {
+ // if no override prop, load the original one on disk
+ try {
+ props = new Props(null, path);
+ } catch (final IOException e) {
+ e.printStackTrace();
+ this.logger.error("Error loading job file " + source + " for job "
+ + node.getId());
+ }
+ }
+ // setting this fake source as this will be used to determine the location
+ // of log files.
+ if (path.getPath() != null) {
+ props.setSource(path.getPath());
+ }
}
customizeJobProperties(props);
@@ -731,6 +768,49 @@ public class FlowRunner extends EventHandler implements Runnable {
return props;
}
+ private Props loadFlowPropsFromYamlFile() {
+ final File flowFile = getFlowFile();
+ final Props flowProps = FlowLoaderUtils.getPropsFromYamlFile(this.flow.getId(), flowFile);
+ if (flowFile != null && flowFile.exists()) {
+ flowFile.delete();
+ }
+ return flowProps;
+ }
+
+ private Props loadJobPropsFromYamlFile(final ExecutableNode node) {
+ final File flowFile = getFlowFile();
+ final String jobPath =
+ node.getParentFlow().getFlowId() + Constants.PATH_DELIMITER + node.getId();
+ final Props props = FlowLoaderUtils.getPropsFromYamlFile(jobPath, flowFile);
+ if (flowFile != null && flowFile.exists()) {
+ flowFile.delete();
+ }
+ return props;
+ }
+
+ private File getFlowFile() {
+ File flowFile = null;
+ String source = null;
+ try {
+ final List<FlowProps> flowPropsList = ImmutableList.copyOf(this.flow.getFlowProps());
+ // There should be exact one source(file name) for each flow file.
+ if (!flowPropsList.isEmpty()) {
+ source = flowPropsList.get(0).getSource();
+ } else {
+ this.logger.error("Failed to get flow file source, flow props is empty.");
+ return null;
+ }
+ final int flowVersion = this.projectLoader
+ .getLatestFlowVersion(this.flow.getProjectId(), this.flow.getVersion(), source);
+ flowFile = this.projectLoader
+ .getUploadedFlowFile(this.flow.getProjectId(), this.flow.getVersion(), flowVersion,
+ source);
+ } catch (final ProjectManagerException e) {
+ this.logger.error("Failed to get flow file " + source, e);
+ }
+ return flowFile;
+ }
+
@SuppressWarnings("FutureReturnValueIgnored")
private void runExecutableNode(final ExecutableNode node) throws IOException {
// Collect output props from the job's dependencies.
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index 8c2feb9..5dd2fd4 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -16,16 +16,22 @@
package azkaban.execapp;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.InteractiveTestJob;
import azkaban.executor.Status;
+import azkaban.project.Project;
+import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.utils.Props;
+import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
/**
@@ -45,23 +51,52 @@ import org.junit.Test;
*/
public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
+ private static final String EXEC_FLOW_DIR = "execpropstest";
+ private static final String FLOW_YAML_DIR = "loadpropsflowyamltest";
+ private static final String FLOW_NAME = "job3";
+ private static final String FLOW_YAML_FILE = FLOW_NAME + ".flow";
private FlowRunnerTestUtil testUtil;
- @Before
- public void setUp() throws Exception {
- this.testUtil = new FlowRunnerTestUtil("execpropstest", this.temporaryFolder);
- }
-
/**
* Tests the basic flow resolution. Flow is defined in execpropstest
*/
@Test
public void testPropertyResolution() throws Exception {
+ this.testUtil = new FlowRunnerTestUtil(EXEC_FLOW_DIR, this.temporaryFolder);
+ assertProperties(false);
+ }
+
+ /**
+ * Tests the YAML flow resolution. Flow is defined in loadpropsflowyamltest
+ */
+ @Test
+ public void testYamlFilePropertyResolution() throws Exception {
+ this.testUtil = new FlowRunnerTestUtil(FLOW_YAML_DIR, this.temporaryFolder);
+ final Project project = this.testUtil.getProject();
+ when(this.testUtil.getProjectLoader().isFlowFileUploaded(project.getId(), project.getVersion()))
+ .thenReturn(true);
+ when(this.testUtil.getProjectLoader()
+ .getLatestFlowVersion(project.getId(), project.getVersion(), FLOW_YAML_FILE)).thenReturn(1);
+ doAnswer(invocation -> {
+ final File flowFile = this.temporaryFolder.newFile(FLOW_YAML_FILE);
+ FileUtils.copyFile(ExecutionsTestUtil.getFlowFile(FLOW_YAML_DIR, FLOW_YAML_FILE), flowFile);
+ return flowFile;
+ }).when(this.testUtil.getProjectLoader())
+ .getUploadedFlowFile(project.getId(), project.getVersion(), 1, FLOW_YAML_FILE);
+ assertProperties(true);
+ }
+
+ /**
+ * Helper method to test the flow property resolution.
+ */
+ private void assertProperties(final boolean isAzkabanFlowVersion20) throws Exception {
final HashMap<String, String> flowProps = new HashMap<>();
flowProps.put("props7", "flow7");
flowProps.put("props6", "flow6");
flowProps.put("props5", "flow5");
- final FlowRunner runner = this.testUtil.createFromFlowMap("job3", flowProps);
+ final FlowRunner runner = this.testUtil.createFromFlowMap(FLOW_NAME, flowProps);
+ // Todo jamiesjc: remove below line after project_flow_files DB change is rolled out.
+ runner.setAzkabanFlowVersion20(isAzkabanFlowVersion20);
final Map<String, ExecutableNode> nodeMap = new HashMap<>();
createNodeMap(runner.getExecutableFlow(), nodeMap);
final ExecutableFlow flow = runner.getExecutableFlow();
@@ -134,7 +169,7 @@ public class FlowRunnerPropertyResolutionTest extends FlowRunnerTestBase {
job4GeneratedProps.put("props6", "g4job6");
InteractiveTestJob.getTestJob("innerflow:job4").succeedJob(
job4GeneratedProps);
- assertStatus(flow, "job3", Status.RUNNING);
+ assertStatus(flow, FLOW_NAME, Status.RUNNING);
final Props job3Props = nodeMap.get("job3").getInputProps();
Assert.assertEquals("job3", job3Props.get("props3"));
Assert.assertEquals("g4job6", job3Props.get("props6"));
diff --git a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
index c170f6e..64d421e 100644
--- a/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
+++ b/azkaban-exec-server/src/test/java/azkaban/execapp/FlowRunnerTestUtil.java
@@ -35,7 +35,8 @@ import azkaban.executor.Status;
import azkaban.flow.Flow;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypePluginSet;
-import azkaban.project.DirectoryFlowLoader;
+import azkaban.project.FlowLoader;
+import azkaban.project.FlowLoaderFactory;
import azkaban.project.Project;
import azkaban.project.ProjectLoader;
import azkaban.project.ProjectManagerException;
@@ -58,6 +59,7 @@ public class FlowRunnerTestUtil {
private final File workingDir;
private final JobTypeManager jobtypeManager;
private final File projectDir;
+ private final ProjectLoader projectLoader;
private ExecutorLoader executorLoader;
public FlowRunnerTestUtil(final String flowName, final TemporaryFolder temporaryFolder)
@@ -68,9 +70,10 @@ public class FlowRunnerTestUtil {
this.project = new Project(1, "testProject");
this.flowMap = FlowRunnerTestUtil
- .prepareProject(this.project, ExecutionsTestUtil.getFlowDir(flowName), this.workingDir);
+ .prepareProject(this.project, this.projectDir, this.workingDir);
this.executorLoader = mock(ExecutorLoader.class);
+ this.projectLoader = mock(ProjectLoader.class);
when(this.executorLoader.updateExecutableReference(anyInt(), anyLong())).thenReturn(true);
Utils.initServiceProvider();
@@ -97,7 +100,9 @@ public class FlowRunnerTestUtil {
public static Map<String, Flow> prepareProject(final Project project, final File sourceDir,
final File workingDir)
throws ProjectManagerException, IOException {
- final DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props());
+ final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+ final FlowLoader loader = loaderFactory.createFlowLoader(sourceDir);
+
loader.loadProjectFlow(project, sourceDir);
if (!loader.getErrors().isEmpty()) {
for (final String error : loader.getErrors()) {
@@ -246,7 +251,7 @@ public class FlowRunnerTestUtil {
exFlow.getExecutionOptions().addAllFlowParameters(flowParams);
this.executorLoader.uploadExecutableFlow(exFlow);
final FlowRunner runner =
- new FlowRunner(exFlow, this.executorLoader, mock(ProjectLoader.class),
+ new FlowRunner(exFlow, this.executorLoader, this.projectLoader,
this.jobtypeManager, azkabanProps, null);
if (eventCollector != null) {
runner.addListener(eventCollector);
@@ -262,4 +267,11 @@ public class FlowRunnerTestUtil {
this.executorLoader = executorLoader;
}
+ public ProjectLoader getProjectLoader() {
+ return this.projectLoader;
+ }
+
+ public Project getProject() {
+ return this.project;
+ }
}
diff --git a/test/execution-test-data/basicflowyamltest/Archive.zip b/test/execution-test-data/basicflowyamltest/Archive.zip
index 7bc2077..5137202 100644
Binary files a/test/execution-test-data/basicflowyamltest/Archive.zip and b/test/execution-test-data/basicflowyamltest/Archive.zip differ
diff --git a/test/execution-test-data/basicflowyamltest/sample_script.sh b/test/execution-test-data/basicflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/basicflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/embeddedflowyamltest/sample_script.sh b/test/execution-test-data/embeddedflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/embeddedflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/loadpropsflowyamltest/job3.flow b/test/execution-test-data/loadpropsflowyamltest/job3.flow
new file mode 100644
index 0000000..5a6ca5e
--- /dev/null
+++ b/test/execution-test-data/loadpropsflowyamltest/job3.flow
@@ -0,0 +1,50 @@
+---
+config:
+ props3: moo3
+ props4: moo4
+ props5: moo5
+ props1: shared1
+ props2: shared2
+ props6: shared6
+
+nodes:
+ - name: job2
+ type: test
+ config:
+ props4: shared4
+ props8: shared8
+ props2: job2
+ props7: job7
+
+ - name: innerflow
+ type: flow
+ dependsOn:
+ - job2
+ config:
+ props5: innerflow5
+ props6: innerflow6
+ props8: innerflow8
+
+ nodes:
+ - name: job4
+ type: test
+ dependsOn:
+ - job1
+ config:
+ props4: shared4
+ props8: job8
+ props9: job9
+
+ - name: job1
+ type: test
+ config:
+ props1: job1
+ props2: job2
+ props8: job8
+
+ - name: job3
+ type: test
+ dependsOn:
+ - innerflow
+ config:
+ props3: job3
diff --git a/test/execution-test-data/loadpropsflowyamltest/loadpropsflow.project b/test/execution-test-data/loadpropsflowyamltest/loadpropsflow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/loadpropsflowyamltest/loadpropsflow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/loadpropsflowyamltest/sample_script.sh b/test/execution-test-data/loadpropsflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/loadpropsflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/multipleembeddedflowyamltest/sample_script.sh b/test/execution-test-data/multipleembeddedflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/multipleembeddedflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."
diff --git a/test/execution-test-data/multipleflowyamltest/sample_script.sh b/test/execution-test-data/multipleflowyamltest/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/multipleflowyamltest/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."