azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
index 6148182..78588c5 100644
--- a/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/AzkabanProjectLoader.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
 import azkaban.Constants;
 import azkaban.Constants.ConfigurationKeys;
 import azkaban.flow.Flow;
+import azkaban.project.FlowLoaderUtils.DirFilter;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.project.validator.ValidationReport;
@@ -211,13 +212,7 @@ class AzkabanProjectLoader {
         this.projectLoader.uploadProjectProperties(project, directoryFlowLoader.getPropsList());
 
       } else if (loader instanceof DirectoryYamlFlowLoader) {
-        final File[] flowFiles = projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX));
-        for (final File file : flowFiles) {
-          final int newFlowVersion = this.projectLoader
-              .getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
-          this.projectLoader
-              .uploadFlowFile(project.getId(), newProjectVersion, file, newFlowVersion);
-        }
+        uploadFlowFilesRecursively(projectDir, project, newProjectVersion);
       } else {
         throw new ProjectManagerException("Invalid type of flow loader.");
       }
@@ -227,6 +222,19 @@ class AzkabanProjectLoader {
     }
   }
 
+  private void uploadFlowFilesRecursively(final File projectDir, final Project project, final int
+      newProjectVersion) {
+    for (final File file : projectDir.listFiles(new SuffixFilter(Constants.FLOW_FILE_SUFFIX))) {
+      final int newFlowVersion = this.projectLoader
+          .getLatestFlowVersion(project.getId(), newProjectVersion, file.getName()) + 1;
+      this.projectLoader
+          .uploadFlowFile(project.getId(), newProjectVersion, file, newFlowVersion);
+    }
+    for (final File file : projectDir.listFiles(new DirFilter())) {
+      uploadFlowFilesRecursively(file, project, newProjectVersion);
+    }
+  }
+
   private void cleanUpProjectOldInstallations(final Project project)
       throws ProjectManagerException{
     log.info("Cleaning up old install files older than "
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index db2c2bb..326c1bf 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -22,11 +22,11 @@ import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.flow.SpecialJobTypes;
+import azkaban.project.FlowLoaderUtils.DirFilter;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
  */
 public class DirectoryFlowLoader implements FlowLoader {
 
-  private static final DirFilter DIR_FILTER = new DirFilter();
   private static final String PROPERTY_SUFFIX = ".properties";
   private static final String JOB_SUFFIX = ".job";
 
@@ -211,8 +210,7 @@ public class DirectoryFlowLoader implements FlowLoader {
       }
     }
 
-    final File[] subDirs = dir.listFiles(DIR_FILTER);
-    for (final File file : subDirs) {
+    for (final File file : dir.listFiles(new DirFilter())) {
       loadProjectFromDir(base, file, parent);
     }
   }
@@ -394,13 +392,4 @@ public class DirectoryFlowLoader implements FlowLoader {
   private String getRelativeFilePath(final String basePath, final String filePath) {
     return filePath.substring(basePath.length() + 1);
   }
-
-  private static class DirFilter implements FileFilter {
-
-    @Override
-    public boolean accept(final File pathname) {
-      return pathname.isDirectory();
-    }
-  }
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index 71599cb..91bc74b 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -21,6 +21,7 @@ import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
+import azkaban.project.FlowLoaderUtils.DirFilter;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
@@ -112,14 +113,22 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
               + ". Duplicate nodes found or dependency undefined.");
         } else {
           final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
-          final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
-          this.flowMap.put(flow.getId(), flow);
+          if (this.flowMap.containsKey(azkabanFlow.getName())) {
+            this.errors.add("Duplicate flows found in the project with name " + azkabanFlow
+                .getName());
+          } else {
+            final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
+            this.flowMap.put(flow.getId(), flow);
+          }
         }
       } catch (final Exception e) {
         this.errors.add("Error loading flow yaml file " + file.getName() + ":"
             + e.getMessage());
       }
     }
+    for (final File file : projectDir.listFiles(new DirFilter())) {
+      convertYamlFiles(file);
+    }
   }
 
   private Flow convertAzkabanFlowToFlow(final AzkabanFlow azkabanFlow, final String flowName,
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
index 439f8c0..855ca6b 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderFactory.java
@@ -19,6 +19,7 @@ package azkaban.project;
 import static java.util.Objects.requireNonNull;
 
 import azkaban.Constants;
+import azkaban.project.FlowLoaderUtils.DirFilter;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.utils.Props;
 import java.io.File;
@@ -53,7 +54,15 @@ public class FlowLoaderFactory {
    * @return the flow loader
    */
   public FlowLoader createFlowLoader(final File projectDir) throws ProjectManagerException {
+    if (checkForValidProjectYamlFile(projectDir)) {
+      return new DirectoryYamlFlowLoader(this.props);
+    } else {
+      return new DirectoryFlowLoader(this.props);
+    }
+  }
 
+  private boolean checkForValidProjectYamlFile(final File projectDir) throws
+      ProjectManagerException {
     final File[] projectFileList = projectDir.listFiles(new SuffixFilter(Constants
         .PROJECT_FILE_SUFFIX));
 
@@ -83,12 +92,17 @@ public class FlowLoaderFactory {
 
       if (azkabanProject.get(Constants.ConfigurationKeys.AZKABAN_FLOW_VERSION).equals
           (Constants.AZKABAN_FLOW_VERSION_2_0)) {
-        return new DirectoryYamlFlowLoader(this.props);
+        return true;
       } else {
         throw new ProjectManagerException("Invalid azkaban-flow-version in the project YAML file.");
       }
     } else {
-      return new DirectoryFlowLoader(this.props);
+      for (final File file : projectDir.listFiles(new DirFilter())) {
+        if (checkForValidProjectYamlFile(file)) {
+          return true;
+        }
+      }
+      return false;
     }
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index eee2df3..29b5ea4 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -332,4 +332,15 @@ public class FlowLoaderUtils {
           && name.length() > this.suffix.length() && name.endsWith(this.suffix);
     }
   }
+
+  /**
+   * Implements Directory filter.
+   */
+  public static class DirFilter implements FileFilter {
+
+    @Override
+    public boolean accept(final File pathname) {
+      return pathname.isDirectory();
+    }
+  }
 }
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 96cd07d..978b4ea 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -35,6 +35,7 @@ public class DirectoryYamlFlowLoaderTest {
 
   private static final String BASIC_FLOW_YAML_DIR = "basicflowyamltest";
   private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
+  private static final String RECURSIVE_DIRECTORY_FLOW_YAML_DIR = "recursivedirectoryyamltest";
   private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
   private static final String MULTIPLE_EMBEDDED_FLOW_YAML_DIR = "multipleembeddedflowyamltest";
   private static final String CYCLE_FOUND_YAML_DIR = "cyclefoundyamltest";
@@ -86,6 +87,16 @@ public class DirectoryYamlFlowLoaderTest {
   }
 
   @Test
+  public void testLoadYamlFileRecursively() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project,
+        ExecutionsTestUtil.getFlowDir(RECURSIVE_DIRECTORY_FLOW_YAML_DIR));
+    checkFlowLoaderProperties(loader, 0, 2, 2);
+    checkFlowProperties(loader, BASIC_FLOW_1, 0, 3, 1, 2, null);
+    checkFlowProperties(loader, BASIC_FLOW_2, 0, 4, 1, 3, null);
+  }
+
+  @Test
   public void testLoadEmbeddedFlowYamlFile() {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
     loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(EMBEDDED_FLOW_YAML_DIR));
diff --git a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
index 2cb6fac..e40d0d1 100644
--- a/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/FlowLoaderFactoryTest.java
@@ -28,6 +28,7 @@ public class FlowLoaderFactoryTest {
 
   private static final String FLOW_10_TEST_DIRECTORY = "exectest1";
   private static final String FLOW_20_TEST_DIRECTORY = "basicflowyamltest";
+  private static final String FLOW_20_TEST_RECURSIVE_DIRECTORY = "recursivedirectoryyamltest";
   private static final String DUPLICATE_PROJECT_DIRECTORY = "duplicateprojectyamltest";
   private static final String INVALID_FLOW_VERSION_DIRECTORY = "invalidflowversiontest";
   private static final String NO_FLOW_VERSION_DIRECTORY = "noflowversiontest";
@@ -49,6 +50,14 @@ public class FlowLoaderFactoryTest {
   }
 
   @Test
+  public void testCreateDirectoryYamlFlowLoaderWithRecursiveDirectory() {
+    final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
+    final File projectDir = ExecutionsTestUtil.getFlowDir(FLOW_20_TEST_RECURSIVE_DIRECTORY);
+    final FlowLoader loader = loaderFactory.createFlowLoader(projectDir);
+    assertThat(loader instanceof DirectoryYamlFlowLoader).isTrue();
+  }
+
+  @Test
   public void testDuplicateProjectYamlFilesException() {
     final FlowLoaderFactory loaderFactory = new FlowLoaderFactory(new Props(null));
     final File projectDir = ExecutionsTestUtil.getFlowDir(DUPLICATE_PROJECT_DIRECTORY);
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/basic_flow.flow b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/basic_flow.flow
new file mode 100644
index 0000000..7551e7f
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/basic_flow.flow
@@ -0,0 +1,20 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow.project b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow2.flow b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow2.flow
new file mode 100644
index 0000000..e290878
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/basic_flow2.flow
@@ -0,0 +1,26 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+      - shell_bash
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
+
+  - name: shell_bash
+    type: command
+    config:
+      command: bash ./subdirectory1/subdirectory2/sample_script.sh
diff --git a/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/sample_script.sh b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/sample_script.sh
new file mode 100644
index 0000000..9e6570d
--- /dev/null
+++ b/test/execution-test-data/recursivedirectoryyamltest/subdirectory1/subdirectory2/sample_script.sh
@@ -0,0 +1 @@
+echo "This is a bash script."