azkaban-aplcache
Changes
test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow.project 1(+1 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 6148182..78588c5 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
import azkaban.Constants;
import azkaban.Constants.ConfigurationKeys;
import azkaban.flow.Flow;
+import azkaban.project.FlowLoaderUtils.DirFilter;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.ProjectLogEvent.EventType;
import azkaban.project.validator.ValidationReport;
@@ -211,13 +212,7 @@ class AzkabanProjectLoader {
this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
} else if (loader instanceof DirectoryYamlFlowLoader) {
- final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
- for (final File file : flowFiles) {
- final int newFlowVersion = this.projectLoader
- .getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
- this.projectLoader
- .uploadFlowFile(project.getId(), newProjectVersion, file, newFlowVersion);
- }
+ uploadFlowFilesRecursively(projectDir, project, newProjectVersion);
} else {
throw new ProjectManagerException("Invalid type of flow loader.");
}
@@ -227,6 +222,19 @@ class AzkabanProjectLoader {
}
}
+ private void uploadFlowFilesRecursively(final File projectDir, final Project project, final int
+ newProjectVersion) {
+ for (final File file : projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX))) {
+ final int newFlowVersion = this.projectLoader
+ .getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
+ this.projectLoader
+ .uploadFlowFile(project.getId(), newProjectVersion, file, newFlowVersion);
+ }
+ for (final File file : projectDir.listFiles(new DirFilter())) {
+ uploadFlowFilesRecursively(file, project, newProjectVersion);
+ }
+ }
+
private void cleanUpProjectOldInstallations(final Project project)
throws ProjectManagerException{
log.info("Cleaning up old install files older than "
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index db2c2bb..326c1bf 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -22,11 +22,11 @@ import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
+import azkaban.project.FlowLoaderUtils.DirFilter;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
*/
public class DirectoryFlowLoader implements FlowLoader {
- private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
private static final String JOB_SUFFIX = ".job";
@@ -211,8 +210,7 @@ public class DirectoryFlowLoader implements FlowLoader {
}
}
- final File[] subDirs = dir.listFiles(DIR_FILTER);
- for (final File file : subDirs) {
+ for (final File file : dir.listFiles(new DirFilter())) {
loadProjectFromDir(base, file, parent);
}
}
@@ -394,13 +392,4 @@ public class DirectoryFlowLoader implements FlowLoader {
private String getRelativeFilePath(final String basePath, final String filePath) {
return filePath.substring(basePath.length() + 1);
}
-
- private static class DirFilter implements FileFilter {
-
- @Override
- public boolean accept(final File pathname) {
- return pathname.isDirectory();
- }
- }
-
}
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 71599cb..91bc74b 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -21,6 +21,7 @@ import azkaban.flow.Edge;
import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
+import azkaban.project.FlowLoaderUtils.DirFilter;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.project.validator.ValidationReport;
import azkaban.utils.Props;
@@ -112,14 +113,22 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
+ ". Duplicate nodes found or dependency undefined.");
} else {
final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
- final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
- this.flowMap.put(flow.getId(), flow);
+ if (this.flowMap.containsKey(azkabanFlow.getName())) {
+ this.errors.add("Duplicate flows found in the project with name " + azkabanFlow
+ .getName());
+ } else {
+ final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
+ this.flowMap.put(flow.getId(), flow);
+ }
}
} catch (final Exception e) {
this.errors.add("Error loading flow yaml file " + file.getName() + ":"
+ e.getMessage());
}
}
+ for (final File file : projectDir.listFiles(new DirFilter())) {
+ convertYamlFiles(file);
+ }
}
private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow, final String flowName,
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
index 439f8c0..855ca6b 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
@@ -19,6 +19,7 @@ package azkaban.project;
import static java.util.Objects.requireNonNull;
import azkaban.Constants;
+import azkaban.project.FlowLoaderUtils.DirFilter;
import azkaban.project.FlowLoaderUtils.SuffixFilter;
import azkaban.utils.Props;
import java.io.File;
@@ -53,7 +54,15 @@ public class FlowLoaderFactory {
* @return the flow loader
*/
public FlowLoader createFlowLoader(final File projectDir) throws ProjectManagerException {
+ if (checkForValidProjectYamlFile(projectDir)) {
+ return new DirectoryYamlFlowLoader(this.props);
+ } else {
+ return new DirectoryFlowLoader(this.props);
+ }
+ }
+ private boolean checkForValidProjectYamlFile(final File projectDir) throws
+ ProjectManagerException {
final File[] projectFileList = projectDir.listFiles(new SuffixFilter(Constants
.PROJECT_FILE_SUFFIX));
@@ -83,12 +92,17 @@ public class FlowLoaderFactory {
if (azkabanProject.get(Constants.ConfigurationKeys.AZKABAN_FLOW_VERSION).equals
(Constants.AZKABAN_FLOW_VERSION_2_0)) {
- return new DirectoryYamlFlowLoader(this.props);
+ return true;
} else {
throw new ProjectManagerException("Invalid azkaban-flow-version in the project YAML file.");
}
} else {
- return new DirectoryFlowLoader(this.props);
+ for (final File file : projectDir.listFiles(new DirFilter())) {
+ if (checkForValidProjectYamlFile(file)) {
+ return true;
+ }
+ }
+ return false;
}
}
}
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index eee2df3..29b5ea4 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -332,4 +332,15 @@ public class FlowLoaderUtils {
&& name.length() > this.suffix.length() && name.endsWith(this.suffix);
}
}
+
+ /**
+ * Implements Directory filter.
+ */
+ public static class DirFilter implements FileFilter {
+
+ @Override
+ public boolean accept(final File pathname) {
+ return pathname.isDirectory();
+ }
+ }
}
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 96cd07d..978b4ea 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -35,6 +35,7 @@ public class DirectoryYamlFlowLoaderTest {
private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
+ private static final String RECURSIVE_DIRECTORY_FLOW_YAML_DIR = "recursivedirectoryyamltest";
private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
private static final String MULTIPLE_EMBEDDED_FLOW_YAML_DIR = "multipleembeddedflowyamltest";
private static final String CYCLE_FOUND_YAML_DIR = "cyclefoundyamltest";
@@ -86,6 +87,16 @@ public class DirectoryYamlFlowLoaderTest {
}
@Test
+ public void testLoadYamlFileRecursively() {
+ final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+ loader.loadProjectFlow(this.project,
+ ExecutionsTestUtil.getFlowDir(RECURSIVE_DIRECTORY_FLOW_YAML_DIR));
+ checkFlowLoaderProperties(loader, 0, 2, 2);
+ checkFlowProperties(loader, BASIC_FLOW_1, 0, 3, 1, 2, null);
+ checkFlowProperties(loader, BASIC_FLOW_2, 0, 4, 1, 3, null);
+ }
+
+ @Test
public void testLoadEmbeddedFlowYamlFile() {
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(EMBEDDED_FLOW_YAML_DIR));
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
index 2cb6fac..e40d0d1 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
@@ -28,6 +28,7 @@ public class FlowLoaderFactoryTest {
private static final String FLOW_10_TEST_DIRECTORY = "exectest1";
private static final String FLOW_20_TEST_DIRECTORY = "basicflowyamltest";
+ private static final String FLOW_20_TEST_RECURSIVE_DIRECTORY = "recursivedirectoryyamltest";
private static final String DUPLICATE_PROJECT_DIRECTORY = "duplicateprojectyamltest";
private static final String INVALID_FLOW_VERSION_DIRECTORY = "invalidflowversiontest";
private static final String NO_FLOW_VERSION_DIRECTORY = "noflowversiontest";
@@ -49,6 +50,14 @@ public class FlowLoaderFactoryTest {
}
@Test
+ public void testCreateDirectoryYamlFlowLoaderWithRecursiveDirectory() {
+ final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+ final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_20_TEST_RECURSIVE_DIRECTORY);
+ final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
+ assertThat(loader instanceof DirectoryYamlFlowLoader).isTrue();
+ }
+
+ @Test
public void testDuplicateProjectYamlFilesException() {
final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
final File projectDir = ExecutionsTestUtil.getFlowDir(DUPLICATE_PROJECT_DIRECTORY);
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/basic_flow.flow b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/basic_flow.flow
new file mode 100644
index 0000000..7551e7f
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/basic_flow.flow
@@ -0,0 +1,20 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow.project b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow2.flow b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow2.flow
new file mode 100644
index 0000000..e290878
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow2.flow
@@ -0,0 +1,26 @@
+---
+config:
+ flow-level-parameter: value
+
+nodes:
+ - name: shell_end
+ type: noop
+ dependsOn:
+ - shell_pwd
+ - shell_echo
+ - shell_bash
+
+ - name: shell_echo
+ type: command
+ config:
+ command: echo "This is an echoed text."
+
+ - name: shell_pwd
+ type: command
+ config:
+ command: pwd
+
+ - name: shell_bash
+ type: command
+ config:
+ command: bash ./subdirectory1/subdirectory2/sample_script.sh
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/sample_script.sh b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."