azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 9acb811..dc24544 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -47,6 +47,8 @@ import azkaban.utils.Utils;
 public class ProjectManager {
   private static final Logger logger = Logger.getLogger(ProjectManager.class);
 
+  public static final String PROJECT_ARCHIVE_FILE = "project.archive.file";
+
   private ConcurrentHashMap<Integer, Project> projectsById =
       new ConcurrentHashMap<Integer, Project>();
   private ConcurrentHashMap<String, Project> projectsByName =
@@ -364,16 +366,16 @@ public class ProjectManager {
       throw new ProjectManagerException("Error unzipping file.", e);
     }
 
+    props.put(PROJECT_ARCHIVE_FILE, archive.getAbsolutePath());
+    validatorManager.loadValidators(props, logger);
     logger.info("Validating project using the registered validators "
         + validatorManager.getValidatorsInfo().toString());
     Map<String, ValidationReport> reports = validatorManager.validate(file);
     Status status = Status.PASS;
     for (Entry<String, ValidationReport> report : reports.entrySet()) {
-      logger.info("Before: " + report.getValue().getStatus());
       if (report.getValue().getStatus().compareTo(status) > 0) {
         status = report.getValue().getStatus();
       }
-      logger.info("After: " + status);
     }
     if (status == Status.ERROR) {
       logger.error("Error found in upload to " + project.getName()
@@ -389,30 +391,7 @@ public class ProjectManager {
       return reports;
     }
 
-    logger.info("Validating Flow for upload " + archive.getName());
-    DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
-    loader.loadProjectFlow(file);
-    if (!loader.getErrors().isEmpty()) {
-      logger.error("Error found in upload to " + project.getName()
-          + ". Cleaning up.");
-
-      try {
-        FileUtils.deleteDirectory(file);
-      } catch (IOException e) {
-        file.deleteOnExit();
-        e.printStackTrace();
-      }
-
-      StringBuffer errorMessage = new StringBuffer();
-      errorMessage.append("Error found in upload. Cannot upload.\n");
-      for (String error : loader.getErrors()) {
-        errorMessage.append(error);
-        errorMessage.append('\n');
-      }
-
-      throw new ProjectManagerException(errorMessage.toString());
-    }
-
+    DirectoryFlowLoader loader = (DirectoryFlowLoader) validatorManager.getDefaultValidator();
     Map<String, Props> jobProps = loader.getJobProps();
     List<Props> propProps = loader.getProps();
 
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
index 860fa2d..f89e150 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
@@ -1,10 +1,11 @@
 package azkaban.project.validator;
 
 import java.io.File;
-import java.util.Properties;
+
+import azkaban.utils.Props;
 
 public interface ProjectValidator {
-  boolean initialize(Properties configuration);
+  boolean initialize(Props configuration);
   String getValidatorInfo();
   ValidationReport validateProject(File projectDir);
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
index 819bc9a..0a87a1e 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
@@ -1,20 +1,54 @@
 package azkaban.project.validator;
 
+import java.util.HashSet;
 import java.util.Set;
 
-public interface ValidationReport {
-  void addPassMsgs(Set<String> msgs);
-
-  void addWarningMsgs(Set<String> msgs);
-
-  void addErrorMsgs(Set<String> msgs);
-
-  Status getStatus();
-
-  Set<String> getPassMsgs();
-
-  Set<String> getWarningMsgs();
-
-  Set<String> getErrorMsgs();
+public class ValidationReport {
+
+  protected Status _status;
+  protected Set<String> _passMsgs;
+  protected Set<String> _warningMsgs;
+  protected Set<String> _errorMsgs;
+
+  public ValidationReport() {
+    _status = Status.PASS;
+    _passMsgs = new HashSet<String>();
+    _warningMsgs = new HashSet<String>();
+    _errorMsgs = new HashSet<String>();
+  }
+
+  public void addPassMsgs(Set<String> msgs) {
+    _passMsgs.addAll(msgs);
+  }
+
+  public void addWarningMsgs(Set<String> msgs) {
+    _warningMsgs.addAll(msgs);
+    if (!msgs.isEmpty() && _errorMsgs.isEmpty()) {
+      _status = Status.WARN;
+    }
+  }
+
+  public void addErrorMsgs(Set<String> msgs) {
+    _errorMsgs.addAll(msgs);
+    if (!msgs.isEmpty()) {
+      _status = Status.ERROR;
+    }
+  }
+
+  public Status getStatus() {
+    return _status;
+  }
+
+  public Set<String> getPassMsgs() {
+    return _passMsgs;
+  }
+
+  public Set<String> getWarningMsgs() {
+    return _warningMsgs;
+  }
+
+  public Set<String> getErrorMsgs() {
+    return _errorMsgs;
+  }
 
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
index 2ddf671..d1b4ffa 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
@@ -4,12 +4,16 @@ import java.io.File;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
+
 import azkaban.utils.Props;
 
 public interface ValidatorManager {
-  void loadValidators(Props props);
+  void loadValidators(Props props, Logger logger);
 
   Map<String, ValidationReport> validate(File projectDir);
 
+  ProjectValidator getDefaultValidator();
+
   List<String> getValidatorsInfo();
 }
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManagerException.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManagerException.java
new file mode 100644
index 0000000..8b04c2f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManagerException.java
@@ -0,0 +1,18 @@
+package azkaban.project.validator;
+
+public class ValidatorManagerException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public ValidatorManagerException(String message) {
+    super(message);
+  }
+
+  public ValidatorManagerException(Throwable cause) {
+    super(cause);
+  }
+
+  public ValidatorManagerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index aa46e99..ebd12dc 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -3,6 +3,9 @@ package azkaban.project.validator;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -20,31 +23,70 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
+import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Props;
 
 public class XmlValidatorManager implements ValidatorManager {
   private static final Logger logger = Logger.getLogger(XmlValidatorManager.class
       .getName());
 
+  public static final String DEFAULT_VALIDATOR_DIR = "validators";
+  public static final String VALIDATOR_PLUGIN_DIR = "project.validators.dir";
   public static final String XML_FILE_PARAM = "project.validators.xml.file";
   public static final String AZKABAN_VALIDATOR_TAG = "azkaban-validators";
   public static final String VALIDATOR_TAG = "validator";
   public static final String CLASSNAME_ATTR = "classname";
+  public static final String ITEM_TAG = "item";
+  public static final String DEFAULT_VALIDATOR_KEY = "Directory Flow";
 
   private Map<String, ProjectValidator> validators;
+  private ClassLoader validatorLoader;
 
   public XmlValidatorManager(Props props) {
-    validators = new LinkedHashMap<String, ProjectValidator>();
-    loadValidators(props);
+    String validatorDirPath = props.getString(VALIDATOR_PLUGIN_DIR, DEFAULT_VALIDATOR_DIR);
+    File validatorDir = new File(validatorDirPath);
+    if (!validatorDir.canRead() || !validatorDir.isDirectory()) {
+      throw new ValidatorManagerException("Validator directory " + validatorDirPath
+          + " does not exist or is not a direcotry.");
+    }
+
+    List<URL> resources = new ArrayList<URL>();
+    try {
+      logger.info("Adding validator resources.");
+      for (File f : validatorDir.listFiles()) {
+        if (f.getName().endsWith(".jar")) {
+          resources.add(f.toURI().toURL());
+          logger.debug("adding to classpath " + f.toURI().toURL());
+        }
+      }
+    } catch (MalformedURLException e) {
+      throw new ValidatorManagerException(e);
+    }
+    validatorLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]));
+
+    // Test loading the validators specified in the xml file.
+    try {
+      loadValidators(props, logger);
+    } catch (Exception e) {
+      logger.error("Cannot load all the validaotors.");
+      throw new ValidatorManagerException(e);
+    }
   }
 
   @Override
-  public void loadValidators(Props props) {
+  public void loadValidators(Props props, Logger log) {
+    validators = new LinkedHashMap<String, ProjectValidator>();
+    // Add the default validator
+    DirectoryFlowLoader flowLoader = new DirectoryFlowLoader(log);
+    validators.put(flowLoader.getValidatorInfo(), flowLoader);
+
+    if (!props.containsKey(XML_FILE_PARAM)) {
+      return;
+    }
     String xmlPath = props.get(XML_FILE_PARAM);
     File file = new File(xmlPath);
     if (!file.exists()) {
-      throw new IllegalArgumentException("Validator xml file " + xmlPath
-          + " doesn't exist.");
+      return;
     }
 
     // Creating the document builder to parse xml.
@@ -54,18 +96,18 @@ public class XmlValidatorManager implements ValidatorManager {
     try {
       builder = docBuilderFactory.newDocumentBuilder();
     } catch (ParserConfigurationException e) {
-      throw new IllegalArgumentException(
-          "Exception while parsing user xml. Document builder not created.", e);
+      throw new ValidatorManagerException(
+          "Exception while parsing validator xml. Document builder not created.", e);
     }
 
     Document doc = null;
     try {
       doc = builder.parse(file);
     } catch (SAXException e) {
-      throw new IllegalArgumentException("Exception while parsing " + xmlPath
+      throw new ValidatorManagerException("Exception while parsing " + xmlPath
           + ". Invalid XML.", e);
     } catch (IOException e) {
-      throw new IllegalArgumentException("Exception while parsing " + xmlPath
+      throw new ValidatorManagerException("Exception while parsing " + xmlPath
           + ". Error reading file.", e);
     }
 
@@ -77,45 +119,72 @@ public class XmlValidatorManager implements ValidatorManager {
       Node node = azkabanValidatorsList.item(i);
       if (node.getNodeType() == Node.ELEMENT_NODE) {
         if (node.getNodeName().equals(VALIDATOR_TAG)) {
-          parseValidatorTag(node, props);
+          parseValidatorTag(node, props, log);
         }
       }
     }
   }
 
-  private void parseValidatorTag(Node node, Props props) {
+  @SuppressWarnings("unchecked")
+  private void parseValidatorTag(Node node, Props props, Logger log) {
     NamedNodeMap validatorAttrMap = node.getAttributes();
     Node classNameAttr = validatorAttrMap.getNamedItem(CLASSNAME_ATTR);
     if (classNameAttr == null) {
-      throw new RuntimeException(
+      throw new ValidatorManagerException(
           "Error loading validator. The validator 'classname' attribute doesn't exist");
     }
 
+    NodeList keyValueItemsList = node.getChildNodes();
+    for (int i = 0; i < keyValueItemsList.getLength(); i++) {
+      Node keyValuePair = keyValueItemsList.item(i);
+      if (keyValuePair.getNodeName().equals(ITEM_TAG)) {
+        parseItemTag(keyValuePair, props);
+      }
+    }
     String className = classNameAttr.getNodeValue();
     try {
-      Class<?> validatorClass = Class.forName(className);
+      Class<? extends ProjectValidator> validatorClass =
+          (Class<? extends ProjectValidator>)validatorLoader.loadClass(className);
       Constructor<?> validatorConstructor =
-          validatorClass.getConstructor();
-      ProjectValidator validator = (ProjectValidator) validatorConstructor.newInstance();
-      validator.initialize(props.toProperties());
+          validatorClass.getConstructor(Logger.class);
+      ProjectValidator validator = (ProjectValidator) validatorConstructor.newInstance(log);
+      validator.initialize(props);
       validators.put(validator.getValidatorInfo(), validator);
-      logger.info("Added validator " + validator.getClass().getCanonicalName() + " to list of validators.");
+      logger.info("Added validator " + className + " to list of validators.");
     } catch (Exception e) {
       logger.error("Could not instantiate ProjectValidator " + className);
-      throw new RuntimeException(e);
+      throw new ValidatorManagerException(e);
     }
   }
 
+  private void parseItemTag(Node node, Props props) {
+    NamedNodeMap keyValueMap = node.getAttributes();
+    Node keyAttr = keyValueMap.getNamedItem("key");
+    Node valueAttr = keyValueMap.getNamedItem("value");
+    if (keyAttr == null || valueAttr == null) {
+      throw new ValidatorManagerException("Error loading validator key/value "
+          + "pair. The 'key' or 'value' attribute doesn't exist");
+    }
+    props.put(keyAttr.getNodeValue(), valueAttr.getNodeValue());
+  }
+
   @Override
   public Map<String, ValidationReport> validate(File projectDir) {
     Map<String, ValidationReport> reports = new LinkedHashMap<String, ValidationReport>();
     for (Entry<String, ProjectValidator> validator : validators.entrySet()) {
       reports.put(validator.getKey(), validator.getValue().validateProject(projectDir));
+      logger.info("Validation status of validator " + validator.getKey() + " is "
+          + reports.get(validator.getKey()).getStatus());
     }
     return reports;
   }
 
   @Override
+  public ProjectValidator getDefaultValidator() {
+    return validators.get(DEFAULT_VALIDATOR_KEY);
+  }
+
+  @Override
   public List<String> getValidatorsInfo() {
     List<String> info = new ArrayList<String>();
     for (String key : validators.keySet()) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 0353d5f..5d27850 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -36,8 +36,11 @@ import azkaban.flow.Flow;
 import azkaban.flow.FlowProps;
 import azkaban.flow.Node;
 import azkaban.flow.SpecialJobTypes;
+import azkaban.project.validator.ProjectValidator;
+import azkaban.project.validator.ValidationReport;
+import azkaban.project.validator.XmlValidatorManager;
 
-public class DirectoryFlowLoader {
+public class DirectoryFlowLoader implements ProjectValidator {
   private static final DirFilter DIR_FILTER = new DirFilter();
   private static final String PROPERTY_SUFFIX = ".properties";
   private static final String JOB_SUFFIX = ".job";
@@ -396,4 +399,22 @@ public class DirectoryFlowLoader {
           && name.length() > suffix.length() && name.endsWith(suffix);
     }
   }
+
+  @Override
+  public boolean initialize(Props configuration) {
+    return true;
+  }
+
+  @Override
+  public String getValidatorInfo() {
+    return XmlValidatorManager.DEFAULT_VALIDATOR_KEY;
+  }
+
+  @Override
+  public ValidationReport validateProject(File projectDir) {
+    loadProjectFlow(projectDir);
+    ValidationReport report = new ValidationReport();
+    report.addErrorMsgs(errors);
+    return report;
+  }
 }
diff --git a/azkaban-soloserver/.gitignore b/azkaban-soloserver/.gitignore
index 2b7a31c..46699ee 100644
--- a/azkaban-soloserver/.gitignore
+++ b/azkaban-soloserver/.gitignore
@@ -2,4 +2,7 @@ data/
 conf/
 temp/
 plugins/
+validators/
+executions/
+projects/
 *.log