azkaban-aplcache
Changes
azkaban-soloserver/.gitignore 3(+3 -0)
Details
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 9acb811..dc24544 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -47,6 +47,8 @@ import azkaban.utils.Utils;
public class ProjectManager {
private static final Logger logger = Logger.getLogger(ProjectManager.class);
+ public static final String PROJECT_ARCHIVE_FILE = "project.archive.file";
+
private ConcurrentHashMap<Integer, Project> projectsById =
new ConcurrentHashMap<Integer, Project>();
private ConcurrentHashMap<String, Project> projectsByName =
@@ -364,16 +366,16 @@ public class ProjectManager {
throw new ProjectManagerException("Error unzipping file.", e);
}
+ props.put(PROJECT_ARCHIVE_FILE, archive.getAbsolutePath());
+ validatorManager.loadValidators(props, logger);
logger.info("Validating project using the registered validators "
+ validatorManager.getValidatorsInfo().toString());
Map<String, ValidationReport> reports = validatorManager.validate(file);
Status status = Status.PASS;
for (Entry<String, ValidationReport> report : reports.entrySet()) {
- logger.info("Before: " + report.getValue().getStatus());
if (report.getValue().getStatus().compareTo(status) > 0) {
status = report.getValue().getStatus();
}
- logger.info("After: " + status);
}
if (status == Status.ERROR) {
logger.error("Error found in upload to " + project.getName()
@@ -389,30 +391,7 @@ public class ProjectManager {
return reports;
}
- logger.info("Validating Flow for upload " + archive.getName());
- DirectoryFlowLoader loader = new DirectoryFlowLoader(logger);
- loader.loadProjectFlow(file);
- if (!loader.getErrors().isEmpty()) {
- logger.error("Error found in upload to " + project.getName()
- + ". Cleaning up.");
-
- try {
- FileUtils.deleteDirectory(file);
- } catch (IOException e) {
- file.deleteOnExit();
- e.printStackTrace();
- }
-
- StringBuffer errorMessage = new StringBuffer();
- errorMessage.append("Error found in upload. Cannot upload.\n");
- for (String error : loader.getErrors()) {
- errorMessage.append(error);
- errorMessage.append('\n');
- }
-
- throw new ProjectManagerException(errorMessage.toString());
- }
-
+ DirectoryFlowLoader loader = (DirectoryFlowLoader) validatorManager.getDefaultValidator();
Map<String, Props> jobProps = loader.getJobProps();
List<Props> propProps = loader.getProps();
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
index 860fa2d..f89e150 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
@@ -1,10 +1,11 @@
package azkaban.project.validator;
import java.io.File;
-import java.util.Properties;
+
+import azkaban.utils.Props;
public interface ProjectValidator {
- boolean initialize(Properties configuration);
+ boolean initialize(Props configuration);
String getValidatorInfo();
ValidationReport validateProject(File projectDir);
}
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
index 819bc9a..0a87a1e 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidationReport.java
@@ -1,20 +1,54 @@
package azkaban.project.validator;
+import java.util.HashSet;
import java.util.Set;
-public interface ValidationReport {
- void addPassMsgs(Set<String> msgs);
-
- void addWarningMsgs(Set<String> msgs);
-
- void addErrorMsgs(Set<String> msgs);
-
- Status getStatus();
-
- Set<String> getPassMsgs();
-
- Set<String> getWarningMsgs();
-
- Set<String> getErrorMsgs();
+public class ValidationReport {
+
+ protected Status _status;
+ protected Set<String> _passMsgs;
+ protected Set<String> _warningMsgs;
+ protected Set<String> _errorMsgs;
+
+ public ValidationReport() {
+ _status = Status.PASS;
+ _passMsgs = new HashSet<String>();
+ _warningMsgs = new HashSet<String>();
+ _errorMsgs = new HashSet<String>();
+ }
+
+ public void addPassMsgs(Set<String> msgs) {
+ _passMsgs.addAll(msgs);
+ }
+
+ public void addWarningMsgs(Set<String> msgs) {
+ _warningMsgs.addAll(msgs);
+ if (!msgs.isEmpty() && _errorMsgs.isEmpty()) {
+ _status = Status.WARN;
+ }
+ }
+
+ public void addErrorMsgs(Set<String> msgs) {
+ _errorMsgs.addAll(msgs);
+ if (!msgs.isEmpty()) {
+ _status = Status.ERROR;
+ }
+ }
+
+ public Status getStatus() {
+ return _status;
+ }
+
+ public Set<String> getPassMsgs() {
+ return _passMsgs;
+ }
+
+ public Set<String> getWarningMsgs() {
+ return _warningMsgs;
+ }
+
+ public Set<String> getErrorMsgs() {
+ return _errorMsgs;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
index 2ddf671..d1b4ffa 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
@@ -4,12 +4,16 @@ import java.io.File;
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
import azkaban.utils.Props;
public interface ValidatorManager {
- void loadValidators(Props props);
+ void loadValidators(Props props, Logger logger);
Map<String, ValidationReport> validate(File projectDir);
+ ProjectValidator getDefaultValidator();
+
List<String> getValidatorsInfo();
}
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManagerException.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManagerException.java
new file mode 100644
index 0000000..8b04c2f
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManagerException.java
@@ -0,0 +1,18 @@
+package azkaban.project.validator;
+
+public class ValidatorManagerException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public ValidatorManagerException(String message) {
+ super(message);
+ }
+
+ public ValidatorManagerException(Throwable cause) {
+ super(cause);
+ }
+
+ public ValidatorManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index aa46e99..ebd12dc 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -3,6 +3,9 @@ package azkaban.project.validator;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -20,31 +23,70 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
+import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
public class XmlValidatorManager implements ValidatorManager {
private static final Logger logger = Logger.getLogger(XmlValidatorManager.class
.getName());
+ public static final String DEFAULT_VALIDATOR_DIR = "validators";
+ public static final String VALIDATOR_PLUGIN_DIR = "project.validators.dir";
public static final String XML_FILE_PARAM = "project.validators.xml.file";
public static final String AZKABAN_VALIDATOR_TAG = "azkaban-validators";
public static final String VALIDATOR_TAG = "validator";
public static final String CLASSNAME_ATTR = "classname";
+ public static final String ITEM_TAG = "item";
+ public static final String DEFAULT_VALIDATOR_KEY = "Directory Flow";
private Map<String, ProjectValidator> validators;
+ private ClassLoader validatorLoader;
public XmlValidatorManager(Props props) {
- validators = new LinkedHashMap<String, ProjectValidator>();
- loadValidators(props);
+ String validatorDirPath = props.getString(VALIDATOR_PLUGIN_DIR, DEFAULT_VALIDATOR_DIR);
+ File validatorDir = new File(validatorDirPath);
+ if (!validatorDir.canRead() || !validatorDir.isDirectory()) {
+ throw new ValidatorManagerException("Validator directory " + validatorDirPath
+ + " does not exist or is not a direcotry.");
+ }
+
+ List<URL> resources = new ArrayList<URL>();
+ try {
+ logger.info("Adding validator resources.");
+ for (File f : validatorDir.listFiles()) {
+ if (f.getName().endsWith(".jar")) {
+ resources.add(f.toURI().toURL());
+ logger.debug("adding to classpath " + f.toURI().toURL());
+ }
+ }
+ } catch (MalformedURLException e) {
+ throw new ValidatorManagerException(e);
+ }
+ validatorLoader = new URLClassLoader(resources.toArray(new URL[resources.size()]));
+
+ // Test loading the validators specified in the xml file.
+ try {
+ loadValidators(props, logger);
+ } catch (Exception e) {
+ logger.error("Cannot load all the validaotors.");
+ throw new ValidatorManagerException(e);
+ }
}
@Override
- public void loadValidators(Props props) {
+ public void loadValidators(Props props, Logger log) {
+ validators = new LinkedHashMap<String, ProjectValidator>();
+ // Add the default validator
+ DirectoryFlowLoader flowLoader = new DirectoryFlowLoader(log);
+ validators.put(flowLoader.getValidatorInfo(), flowLoader);
+
+ if (!props.containsKey(XML_FILE_PARAM)) {
+ return;
+ }
String xmlPath = props.get(XML_FILE_PARAM);
File file = new File(xmlPath);
if (!file.exists()) {
- throw new IllegalArgumentException("Validator xml file " + xmlPath
- + " doesn't exist.");
+ return;
}
// Creating the document builder to parse xml.
@@ -54,18 +96,18 @@ public class XmlValidatorManager implements ValidatorManager {
try {
builder = docBuilderFactory.newDocumentBuilder();
} catch (ParserConfigurationException e) {
- throw new IllegalArgumentException(
- "Exception while parsing user xml. Document builder not created.", e);
+ throw new ValidatorManagerException(
+ "Exception while parsing validator xml. Document builder not created.", e);
}
Document doc = null;
try {
doc = builder.parse(file);
} catch (SAXException e) {
- throw new IllegalArgumentException("Exception while parsing " + xmlPath
+ throw new ValidatorManagerException("Exception while parsing " + xmlPath
+ ". Invalid XML.", e);
} catch (IOException e) {
- throw new IllegalArgumentException("Exception while parsing " + xmlPath
+ throw new ValidatorManagerException("Exception while parsing " + xmlPath
+ ". Error reading file.", e);
}
@@ -77,45 +119,72 @@ public class XmlValidatorManager implements ValidatorManager {
Node node = azkabanValidatorsList.item(i);
if (node.getNodeType() == Node.ELEMENT_NODE) {
if (node.getNodeName().equals(VALIDATOR_TAG)) {
- parseValidatorTag(node, props);
+ parseValidatorTag(node, props, log);
}
}
}
}
- private void parseValidatorTag(Node node, Props props) {
+ @SuppressWarnings("unchecked")
+ private void parseValidatorTag(Node node, Props props, Logger log) {
NamedNodeMap validatorAttrMap = node.getAttributes();
Node classNameAttr = validatorAttrMap.getNamedItem(CLASSNAME_ATTR);
if (classNameAttr == null) {
- throw new RuntimeException(
+ throw new ValidatorManagerException(
"Error loading validator. The validator 'classname' attribute doesn't exist");
}
+ NodeList keyValueItemsList = node.getChildNodes();
+ for (int i = 0; i < keyValueItemsList.getLength(); i++) {
+ Node keyValuePair = keyValueItemsList.item(i);
+ if (keyValuePair.getNodeName().equals(ITEM_TAG)) {
+ parseItemTag(keyValuePair, props);
+ }
+ }
String className = classNameAttr.getNodeValue();
try {
- Class<?> validatorClass = Class.forName(className);
+ Class<? extends ProjectValidator> validatorClass =
+ (Class<? extends ProjectValidator>)validatorLoader.loadClass(className);
Constructor<?> validatorConstructor =
- validatorClass.getConstructor();
- ProjectValidator validator = (ProjectValidator) validatorConstructor.newInstance();
- validator.initialize(props.toProperties());
+ validatorClass.getConstructor(Logger.class);
+ ProjectValidator validator = (ProjectValidator) validatorConstructor.newInstance(log);
+ validator.initialize(props);
validators.put(validator.getValidatorInfo(), validator);
- logger.info("Added validator " + validator.getClass().getCanonicalName() + " to list of validators.");
+ logger.info("Added validator " + className + " to list of validators.");
} catch (Exception e) {
logger.error("Could not instantiate ProjectValidator " + className);
- throw new RuntimeException(e);
+ throw new ValidatorManagerException(e);
}
}
+ private void parseItemTag(Node node, Props props) {
+ NamedNodeMap keyValueMap = node.getAttributes();
+ Node keyAttr = keyValueMap.getNamedItem("key");
+ Node valueAttr = keyValueMap.getNamedItem("value");
+ if (keyAttr == null || valueAttr == null) {
+ throw new ValidatorManagerException("Error loading validator key/value "
+ + "pair. The 'key' or 'value' attribute doesn't exist");
+ }
+ props.put(keyAttr.getNodeValue(), valueAttr.getNodeValue());
+ }
+
@Override
public Map<String, ValidationReport> validate(File projectDir) {
Map<String, ValidationReport> reports = new LinkedHashMap<String, ValidationReport>();
for (Entry<String, ProjectValidator> validator : validators.entrySet()) {
reports.put(validator.getKey(), validator.getValue().validateProject(projectDir));
+ logger.info("Validation status of validator " + validator.getKey() + " is "
+ + reports.get(validator.getKey()).getStatus());
}
return reports;
}
@Override
+ public ProjectValidator getDefaultValidator() {
+ return validators.get(DEFAULT_VALIDATOR_KEY);
+ }
+
+ @Override
public List<String> getValidatorsInfo() {
List<String> info = new ArrayList<String>();
for (String key : validators.keySet()) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 0353d5f..5d27850 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -36,8 +36,11 @@ import azkaban.flow.Flow;
import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
+import azkaban.project.validator.ProjectValidator;
+import azkaban.project.validator.ValidationReport;
+import azkaban.project.validator.XmlValidatorManager;
-public class DirectoryFlowLoader {
+public class DirectoryFlowLoader implements ProjectValidator {
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
private static final String JOB_SUFFIX = ".job";
@@ -396,4 +399,22 @@ public class DirectoryFlowLoader {
&& name.length() > suffix.length() && name.endsWith(suffix);
}
}
+
+ @Override
+ public boolean initialize(Props configuration) {
+ return true;
+ }
+
+ @Override
+ public String getValidatorInfo() {
+ return XmlValidatorManager.DEFAULT_VALIDATOR_KEY;
+ }
+
+ @Override
+ public ValidationReport validateProject(File projectDir) {
+ loadProjectFlow(projectDir);
+ ValidationReport report = new ValidationReport();
+ report.addErrorMsgs(errors);
+ return report;
+ }
}
azkaban-soloserver/.gitignore 3(+3 -0)
diff --git a/azkaban-soloserver/.gitignore b/azkaban-soloserver/.gitignore
index 2b7a31c..46699ee 100644
--- a/azkaban-soloserver/.gitignore
+++ b/azkaban-soloserver/.gitignore
@@ -2,4 +2,7 @@ data/
conf/
temp/
plugins/
+validators/
+executions/
+projects/
*.log