azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
index b92a183..c53e5c1 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryFlowLoader.java
@@ -16,19 +16,15 @@
 
 package azkaban.project;
 
-import azkaban.Constants;
 import azkaban.flow.CommonJobProperties;
 import azkaban.flow.Edge;
 import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.flow.SpecialJobTypes;
-import azkaban.jobcallback.JobCallbackValidator;
 import azkaban.project.FlowLoaderUtils.SuffixFilter;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
-import azkaban.utils.PropsUtils;
-import azkaban.utils.Utils;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -50,8 +46,6 @@ public class DirectoryFlowLoader implements FlowLoader {
   private static final DirFilter DIR_FILTER = new DirFilter();
   private static final String PROPERTY_SUFFIX = ".properties";
   private static final String JOB_SUFFIX = ".job";
-  private static final String XMS = "Xms";
-  private static final String XMX = "Xmx";
 
   private static final Logger logger = LoggerFactory.getLogger(DirectoryFlowLoader.class);
   private final Props props;
@@ -146,7 +140,7 @@ public class DirectoryFlowLoader implements FlowLoader {
     // Resolve embedded flows
     resolveEmbeddedFlows();
 
-    checkJobProperties(project);
+    FlowLoaderUtils.checkJobProperties(project.getId(), this.props, this.jobPropsMap, this.errors);
 
     return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
 
@@ -383,44 +377,6 @@ public class DirectoryFlowLoader implements FlowLoader {
     visited.remove(node.getId());
   }
 
-  public void checkJobProperties(final Project project) {
-    // if project is in the memory check whitelist, then we don't need to check
-    // its memory settings
-    if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
-        ProjectWhitelist.WhitelistType.MemoryCheck)) {
-      return;
-    }
-
-    final String maxXms = this.props.getString(
-        Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
-    final String maxXmx = this.props.getString(
-        Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
-    final long sizeMaxXms = Utils.parseMemString(maxXms);
-    final long sizeMaxXmx = Utils.parseMemString(maxXmx);
-
-    for (final String jobName : this.jobPropsMap.keySet()) {
-
-      final Props jobProps = this.jobPropsMap.get(jobName);
-      final String xms = jobProps.getString(XMS, null);
-      if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
-          && Utils.parseMemString(xms) > sizeMaxXms) {
-        this.errors.add(String.format(
-            "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
-            jobName, maxXms));
-      }
-      final String xmx = jobProps.getString(XMX, null);
-      if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
-          && Utils.parseMemString(xmx) > sizeMaxXmx) {
-        this.errors.add(String.format(
-            "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
-            jobName, maxXmx));
-      }
-
-      // job callback properties check
-      JobCallbackValidator.validate(jobName, this.props, jobProps, this.errors);
-    }
-  }
-
   private String getNameWithoutExtension(final File file) {
     final String filename = file.getName();
     final int index = filename.lastIndexOf('.');
diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
index dade01f..858b6ee 100644
--- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java
@@ -46,6 +46,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   private final Set<String> errors = new HashSet<>();
   private final Map<String, Flow> flowMap = new HashMap<>();
   private final Map<String, List<Edge>> edgeMap = new HashMap<>();
+  private final Map<String, Props> jobPropsMap = new HashMap<>();
 
   /**
    * Creates a new DirectoryYamlFlowLoader.
@@ -95,7 +96,7 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
   @Override
   public ValidationReport loadProjectFlow(final Project project, final File projectDir) {
     convertYamlFiles(projectDir);
-    checkJobProperties(project);
+    FlowLoaderUtils.checkJobProperties(project.getId(), this.props, this.jobPropsMap, this.errors);
     return FlowLoaderUtils.generateFlowLoaderReport(this.errors);
   }
 
@@ -106,11 +107,17 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
       final NodeBeanLoader loader = new NodeBeanLoader();
       try {
         final NodeBean nodeBean = loader.load(file);
-        final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
-        final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
-        this.flowMap.put(flow.getId(), flow);
+        if (!loader.validate(nodeBean)) {
+          this.errors.add("Failed to validate nodeBean for " + file.getName()
+              + ". Duplicate nodes found or dependency undefined.");
+        } else {
+          final AzkabanFlow azkabanFlow = (AzkabanFlow) loader.toAzkabanNode(nodeBean);
+          final Flow flow = convertAzkabanFlowToFlow(azkabanFlow, azkabanFlow.getName(), file);
+          this.flowMap.put(flow.getId(), flow);
+        }
       } catch (final Exception e) {
-        logger.error("Error loading flow yaml file. ", e);
+        this.errors.add("Error loading flow yaml file " + file.getName() + ":"
+            + e.getMessage());
       }
     }
   }
@@ -156,6 +163,9 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
       flowNode.setEmbeddedFlow(true);
       this.flowMap.put(flowNode.getId(), flowNode);
     }
+
+    this.jobPropsMap
+        .put(flowName + Constants.PATH_DELIMITER + node.getId(), azkabanNode.getProps());
     return node;
   }
 
@@ -186,11 +196,6 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
           // Cycles found, including self cycle.
           edge.setError("Cycles found.");
           this.errors.add("Cycles found at " + edge.getId());
-        } else if (!azkabanFlow.getNodes().containsKey(parent)) {
-          // Dependency not found.
-          edge.setError("Dependency not found.");
-          this.errors.add(node.getName() + " cannot find dependency "
-              + parent);
         } else {
           // Valid edge. Continue to process the parent node recursively.
           addEdges(azkabanFlow.getNode(parent), azkabanFlow, flowName, recStack, visited);
@@ -200,8 +205,4 @@ public class DirectoryYamlFlowLoader implements FlowLoader {
     }
   }
 
-  public void checkJobProperties(final Project project) {
-    // Todo jamiesjc: implement the check later
-  }
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
index 52d0d8b..1fa4ac3 100644
--- a/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
+++ b/azkaban-common/src/main/java/azkaban/project/FlowLoaderUtils.java
@@ -18,8 +18,11 @@ package azkaban.project;
 import azkaban.Constants;
 import azkaban.flow.CommonJobProperties;
 import azkaban.flow.Flow;
+import azkaban.jobcallback.JobCallbackValidator;
 import azkaban.project.validator.ValidationReport;
 import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import azkaban.utils.Utils;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileFilter;
@@ -45,7 +48,8 @@ import org.yaml.snakeyaml.Yaml;
 public class FlowLoaderUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(FlowLoaderUtils.class);
-
+  private static final String XMS = "Xms";
+  private static final String XMX = "Xmx";
   /**
    * Sets props in flow yaml file.
    *
@@ -233,6 +237,52 @@ public class FlowLoaderUtils {
   }
 
   /**
+   * Check job properties.
+   *
+   * @param projectId the project id
+   * @param props the server props
+   * @param jobPropsMap the job props map
+   * @param errors the errors
+   */
+  public static void checkJobProperties(final int projectId, final Props props,
+      final Map<String, Props> jobPropsMap, final Set<String> errors) {
+    // if project is in the memory check whitelist, then we don't need to check
+    // its memory settings
+    if (ProjectWhitelist.isProjectWhitelisted(projectId,
+        ProjectWhitelist.WhitelistType.MemoryCheck)) {
+      return;
+    }
+
+    final String maxXms = props.getString(
+        Constants.JobProperties.JOB_MAX_XMS, Constants.JobProperties.MAX_XMS_DEFAULT);
+    final String maxXmx = props.getString(
+        Constants.JobProperties.JOB_MAX_XMX, Constants.JobProperties.MAX_XMX_DEFAULT);
+    final long sizeMaxXms = Utils.parseMemString(maxXms);
+    final long sizeMaxXmx = Utils.parseMemString(maxXmx);
+
+    for (final String jobName : jobPropsMap.keySet()) {
+      final Props jobProps = jobPropsMap.get(jobName);
+      final String xms = jobProps.getString(XMS, null);
+      if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
+          && Utils.parseMemString(xms) > sizeMaxXms) {
+        errors.add(String.format(
+            "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+            jobName, maxXms));
+      }
+      final String xmx = jobProps.getString(XMX, null);
+      if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
+          && Utils.parseMemString(xmx) > sizeMaxXmx) {
+        errors.add(String.format(
+            "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+            jobName, maxXmx));
+      }
+
+      // job callback properties check
+      JobCallbackValidator.validate(jobName, props, jobProps, errors);
+    }
+  }
+
+  /**
    * Clean up the directory.
    *
    * @param dir the directory to be deleted
diff --git a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
index 2b303d3..0ea5c15 100644
--- a/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
+++ b/azkaban-common/src/main/java/azkaban/project/NodeBeanLoader.java
@@ -58,7 +58,7 @@ public class NodeBeanLoader {
     }
 
     for (final NodeBean n : nodeBean.getNodes()) {
-      if (!nodeNames.containsAll(n.getDependsOn())) {
+      if (n.getDependsOn() != null && !nodeNames.containsAll(n.getDependsOn())) {
         // Undefined reference to dependent job
         return false;
       }
diff --git a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
index 4766606..96cd07d 100644
--- a/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/project/DirectoryYamlFlowLoaderTest.java
@@ -37,7 +37,10 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String MULTIPLE_FLOW_YAML_DIR = "multipleflowyamltest";
   private static final String EMBEDDED_FLOW_YAML_DIR = "embeddedflowyamltest";
   private static final String MULTIPLE_EMBEDDED_FLOW_YAML_DIR = "multipleembeddedflowyamltest";
-  private static final String INVALID_FLOW_YAML_DIR = "invalidflowyamltest";
+  private static final String CYCLE_FOUND_YAML_DIR = "cyclefoundyamltest";
+  private static final String DUPLICATE_NODENAME_YAML_DIR = "duplicatenodenamesyamltest";
+  private static final String DEPENDENCY_UNDEFINED_YAML_DIR = "dependencyundefinedyamltest";
+  private static final String INVALID_JOBPROPS_YAML_DIR = "invalidjobpropsyamltest";
   private static final String NO_FLOW_YAML_DIR = "noflowyamltest";
   private static final String BASIC_FLOW_1 = "basic_flow";
   private static final String BASIC_FLOW_2 = "basic_flow2";
@@ -53,10 +56,11 @@ public class DirectoryYamlFlowLoaderTest {
   private static final String EMBEDDED_FLOW_B2 =
       "embedded_flow_b" + Constants.PATH_DELIMITER + "embedded_flow1" + Constants.PATH_DELIMITER
           + "embedded_flow2";
-  private static final String INVALID_FLOW_1 = "dependency_not_found";
-  private static final String INVALID_FLOW_2 = "cycle_found";
-  private static final String DEPENDENCY_NOT_FOUND_ERROR = "Dependency not found.";
+  private static final String DUPLICATE_NODENAME_FLOW_FILE = "duplicate_nodename.flow";
+  private static final String DEPENDENCY_UNDEFINED_FLOW_FILE = "dependency_undefined.flow";
+  private static final String CYCLE_FOUND_FLOW = "cycle_found";
   private static final String CYCLE_FOUND_ERROR = "Cycles found.";
+  private static final String SHELL_PWD = "invalid_jobprops:shell_pwd";
   private Project project;
 
   @Before
@@ -106,14 +110,44 @@ public class DirectoryYamlFlowLoaderTest {
   }
 
   @Test
-  public void testLoadInvalidFlowYamlFiles() {
+  public void testLoadInvalidFlowYamlFileWithDuplicateNodeNames() {
     final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
-    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(INVALID_FLOW_YAML_DIR));
-    checkFlowLoaderProperties(loader, 2, 2, 2);
-    // Invalid flow 1: Dependency not found.
-    checkFlowProperties(loader, INVALID_FLOW_1, 1, 3, 1, 3, DEPENDENCY_NOT_FOUND_ERROR);
-    // Invalid flow 2: Cycles found.
-    checkFlowProperties(loader, INVALID_FLOW_2, 1, 4, 1, 4, CYCLE_FOUND_ERROR);
+    loader.loadProjectFlow(this.project,
+        ExecutionsTestUtil.getFlowDir(DUPLICATE_NODENAME_YAML_DIR));
+    checkFlowLoaderProperties(loader, 1, 0, 0);
+    assertThat(loader.getErrors()).containsExactly(
+        "Failed to validate nodeBean for " + DUPLICATE_NODENAME_FLOW_FILE
+            + ". Duplicate nodes found or dependency undefined.");
+  }
+
+  @Test
+  public void testLoadInvalidFlowYamlFileWithUndefinedDependency() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project,
+        ExecutionsTestUtil.getFlowDir(DEPENDENCY_UNDEFINED_YAML_DIR));
+    checkFlowLoaderProperties(loader, 1, 0, 0);
+    assertThat(loader.getErrors()).containsExactly(
+        "Failed to validate nodeBean for " + DEPENDENCY_UNDEFINED_FLOW_FILE
+            + ". Duplicate nodes found or dependency undefined.");
+  }
+
+  @Test
+  public void testLoadInvalidFlowYamlFileWithCycle() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project, ExecutionsTestUtil.getFlowDir(CYCLE_FOUND_YAML_DIR));
+    checkFlowLoaderProperties(loader, 1, 1, 1);
+    checkFlowProperties(loader, CYCLE_FOUND_FLOW, 1, 4, 1, 4, CYCLE_FOUND_ERROR);
+  }
+
+  @Test
+  public void testLoadFlowYamlFileWithInvalidJobProps() {
+    final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
+    loader.loadProjectFlow(this.project,
+        ExecutionsTestUtil.getFlowDir(INVALID_JOBPROPS_YAML_DIR));
+    checkFlowLoaderProperties(loader, 1, 1, 1);
+    assertThat(loader.getErrors()).containsExactly(
+        SHELL_PWD + ": Xms value has exceeded the allowed limit (max Xms = "
+            + Constants.JobProperties.MAX_XMS_DEFAULT + ")");
   }
 
   @Test
diff --git a/test/execution-test-data/dependencyundefinedyamltest/dependency_undefined.project b/test/execution-test-data/dependencyundefinedyamltest/dependency_undefined.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/dependencyundefinedyamltest/dependency_undefined.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.flow b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.flow
new file mode 100644
index 0000000..311f457
--- /dev/null
+++ b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.flow
@@ -0,0 +1,28 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_echo
+
+  - name: shell_echo
+    type: command
+    dependsOn:
+      - shell_pwd
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_echo
+    type: command
+    dependsOn:
+      - shell_pwd
+    config:
+      command: echo "This is a duplicate echoed text."
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
diff --git a/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.project b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/duplicatenodenamesyamltest/duplicate_nodename.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0
diff --git a/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.flow b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.flow
new file mode 100644
index 0000000..53867ae
--- /dev/null
+++ b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.flow
@@ -0,0 +1,22 @@
+---
+config:
+  flow-level-parameter: value
+
+nodes:
+  - name: shell_end
+    type: noop
+    dependsOn:
+      - shell_pwd
+      - shell_echo
+
+  - name: shell_echo
+    type: command
+    config:
+      command: echo "This is an echoed text."
+
+  - name: shell_pwd
+    type: command
+    config:
+      command: pwd
+      Xms: 2G
+
diff --git a/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.project b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.project
new file mode 100644
index 0000000..4929753
--- /dev/null
+++ b/test/execution-test-data/invalidjobpropsyamltest/invalid_jobprops.project
@@ -0,0 +1 @@
+azkaban-flow-version: 2.0