Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
index 90991f6..d4cb262 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutionOptions.java
@@ -48,6 +48,7 @@ public class ExecutionOptions {
private static final String FAILURE_EMAILS_OVERRIDE = "failureEmailsOverride";
private static final String SUCCESS_EMAILS_OVERRIDE = "successEmailsOverride";
private static final String MAIL_CREATOR = "mailCreator";
+ private static final String MEMORY_CHECK = "memoryCheck";
private boolean notifyOnFirstFailure = true;
private boolean notifyOnLastFailure = false;
@@ -61,6 +62,7 @@ public class ExecutionOptions {
private Integer queueLevel = 0;
private String concurrentOption = CONCURRENT_OPTION_IGNORE;
private String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;
+ private boolean memoryCheck = true;
private Map<String, String> flowParameters = new HashMap<String, String>();
public enum FailureAction {
@@ -179,6 +181,14 @@ public class ExecutionOptions {
initiallyDisabledJobs = disabledJobs;
}
+ public boolean getMemoryCheck() {
+ return memoryCheck;
+ }
+
+ public void setMemoryCheck(boolean memoryCheck) {
+ this.memoryCheck = memoryCheck;
+ }
+
public Map<String, Object> toObject() {
HashMap<String, Object> flowOptionObj = new HashMap<String, Object>();
@@ -196,6 +206,7 @@ public class ExecutionOptions {
flowOptionObj.put(FAILURE_EMAILS_OVERRIDE, failureEmailsOverride);
flowOptionObj.put(SUCCESS_EMAILS_OVERRIDE, successEmailsOverride);
flowOptionObj.put(MAIL_CREATOR, mailCreator);
+ flowOptionObj.put(MEMORY_CHECK, memoryCheck);
return flowOptionObj;
}
@@ -252,6 +263,8 @@ public class ExecutionOptions {
options.setFailureEmailsOverridden(wrapper.getBool(FAILURE_EMAILS_OVERRIDE,
false));
+ options.setMemoryCheck(wrapper.getBool(MEMORY_CHECK, true));
+
return options;
}
}
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
index 5f7bcd5..c86648e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorManager.java
@@ -45,6 +45,7 @@ import azkaban.event.Event;
import azkaban.event.Event.Type;
import azkaban.event.EventHandler;
import azkaban.project.Project;
+import azkaban.project.ProjectWhitelist;
import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
@@ -585,6 +586,10 @@ public class ExecutorManager extends EventHandler implements
}
}
+ boolean memoryCheck = !ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck);
+ options.setMemoryCheck(memoryCheck);
+
// The exflow id is set by the loader. So it's unavailable until after
// this call.
executorLoader.uploadExecutableFlow(exflow);
diff --git a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
index 322d80f..b4848d2 100644
--- a/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
+++ b/azkaban-common/src/main/java/azkaban/jobcallback/JobCallbackValidator.java
@@ -77,7 +77,6 @@ public class JobCallbackValidator {
String callbackUrlValue = jobProps.get(callbackUrlKey);
if (callbackUrlValue == null || callbackUrlValue.length() == 0) {
- // no more needs to done
break;
} else {
String requestMethodKey =
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
index 13959f8..134602b 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/JavaProcessJob.java
@@ -22,8 +22,11 @@ import java.util.List;
import org.apache.log4j.Logger;
+import azkaban.server.AzkabanServer;
+import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Pair;
import azkaban.utils.Props;
+import azkaban.utils.Utils;
public class JavaProcessJob extends ProcessJob {
public static final String CLASSPATH = "classpath";
@@ -149,12 +152,30 @@ public class JavaProcessJob extends ProcessJob {
return "";
}
- protected Pair<Long, Long> getProcMemoryRequirement() {
- String strInitMemSize = getInitialMemorySize();
- String strMaxMemSize = getMaxMemorySize();
- long initMemSize = azkaban.utils.Utils.parseMemString(strInitMemSize);
- long maxMemSize = azkaban.utils.Utils.parseMemString(strMaxMemSize);
+ protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
+ String strXms = getInitialMemorySize();
+ String strXmx = getMaxMemorySize();
+ long xms = Utils.parseMemString(strXms);
+ long xmx = Utils.parseMemString(strXmx);
+
+ Props azkabanProperties = AzkabanServer.getAzkabanProperties();
+ if (azkabanProperties != null) {
+ String maxXms = azkabanProperties.getString(DirectoryFlowLoader.JOB_MAX_XMS, DirectoryFlowLoader.MAX_XMS_DEFAULT);
+ String maxXmx = azkabanProperties.getString(DirectoryFlowLoader.JOB_MAX_XMX, DirectoryFlowLoader.MAX_XMX_DEFAULT);
+ long sizeMaxXms = Utils.parseMemString(maxXms);
+ long sizeMaxXmx = Utils.parseMemString(maxXmx);
+
+ if (xms > sizeMaxXms) {
+ throw new Exception(String.format("%s: Xms value has exceeded the allowed limit (max Xms = %s)",
+ getId(), maxXms));
+ }
+
+ if (xmx > sizeMaxXmx) {
+ throw new Exception(String.format("%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
+ getId(), maxXms));
+ }
+ }
- return new Pair<Long, Long>(initMemSize, maxMemSize);
+ return new Pair<Long, Long>(xms, xmx);
}
}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
index c6e0260..18b7ffa 100644
--- a/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
+++ b/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
@@ -40,6 +40,7 @@ public class ProcessJob extends AbstractProcessJob {
private volatile AzkabanProcess process;
private static final String MEMCHECK_ENABLED = "memCheck.enabled";
private static final String MEMCHECK_FREEMEMDECRAMT = "memCheck.freeMemDecrAmt";
+ public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";
public ProcessJob(final String jobId, final Props sysProps,
final Props jobProps, final Logger log) {
@@ -54,7 +55,7 @@ public class ProcessJob extends AbstractProcessJob {
handleError("Bad property definition! " + e.getMessage(), e);
}
- if (sysProps.getBoolean(MEMCHECK_ENABLED, true)) {
+ if (sysProps.getBoolean(MEMCHECK_ENABLED, true) && jobProps.getBoolean(AZKABAN_MEMORY_CHECK, true)) {
long freeMemDecrAmt = sysProps.getLong(MEMCHECK_FREEMEMDECRAMT, 0);
Pair<Long, Long> memPair = getProcMemoryRequirement();
boolean isMemGranted = SystemMemoryInfo.canSystemGrantMemory(memPair.getFirst(), memPair.getSecond(), freeMemDecrAmt);
@@ -122,7 +123,7 @@ public class ProcessJob extends AbstractProcessJob {
*
* @return pair of min/max memory size
*/
- protected Pair<Long, Long> getProcMemoryRequirement() {
+ protected Pair<Long, Long> getProcMemoryRequirement() throws Exception {
return new Pair<Long, Long>(0L, 0L);
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
index 4af1c9e..89c6539 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectManager.java
@@ -33,6 +33,7 @@ import org.apache.log4j.Logger;
import azkaban.flow.Flow;
import azkaban.project.ProjectLogEvent.EventType;
+import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.ValidationStatus;
import azkaban.project.validator.ValidatorConfigs;
@@ -84,6 +85,7 @@ public class ProjectManager {
// config files for the validators.
new XmlValidatorManager(prop);
loadAllProjects();
+ loadProjectWhiteList();
}
private void loadAllProjects() {
@@ -429,7 +431,7 @@ public class ProjectManager {
logger.info("Validating project " + archive.getName()
+ " using the registered validators "
+ validatorManager.getValidatorsInfo().toString());
- Map<String, ValidationReport> reports = validatorManager.validate(file);
+ Map<String, ValidationReport> reports = validatorManager.validate(project, file);
ValidationStatus status = ValidationStatus.PASS;
for (Entry<String, ValidationReport> report : reports.entrySet()) {
if (report.getValue().getStatus().compareTo(status) > 0) {
@@ -515,4 +517,11 @@ public class ProjectManager {
projectLoader.postEvent(project, type, user, message);
}
+ public boolean loadProjectWhiteList() {
+ if (props.containsKey(ProjectWhitelist.XML_FILE_PARAM)) {
+ ProjectWhitelist.load(props);
+ return true;
+ }
+ return false;
+ }
}
diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
new file mode 100644
index 0000000..a6b1a39
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -0,0 +1,143 @@
+package azkaban.project;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+import azkaban.utils.Props;
+
+/**
+ * @author wkang
+ *
+ * This class manages project whitelist defined in xml config file.
+ * An single xml config file contains different types of whitelisted
+ * projects. For additional type of whitelist, modify WhitelistType enum.
+ *
+ * The xml config file should in the following format. Please note
+ * the tag <MemoryCheck> is same as the defined enum MemoryCheck
+ *
+ * <ProjectWhitelist>
+ * <MemoryCheck>
+ * <project projectname="project1" />
+ * <project projectname="project2" />
+ * </MemoryCheck>
+ * <ProjectWhitelist>
+ *
+ */
+public class ProjectWhitelist {
+ public static final String XML_FILE_PARAM = "project.whitelist.xml.file";
+ private static final String PROJECT_WHITELIST_TAG = "ProjectWhitelist";
+ private static final String PROJECT_TAG = "project";
+ private static final String PROJECTID_ATTR = "projectid";
+
+ private static AtomicReference<Map<WhitelistType, Set<Integer>>> projectsWhitelisted =
+ new AtomicReference<Map<WhitelistType, Set<Integer>>>();
+
+ static void load(Props props) {
+ String xmlFile = props.getString(XML_FILE_PARAM);
+ parseXMLFile(xmlFile);
+ }
+
+ private static void parseXMLFile(String xmlFile) {
+ File file = new File(xmlFile);
+ if (!file.exists()) {
+ throw new IllegalArgumentException("Project whitelist xml file " + xmlFile
+ + " doesn't exist.");
+ }
+
+ // Creating the document builder to parse xml.
+ DocumentBuilderFactory docBuilderFactory =
+ DocumentBuilderFactory.newInstance();
+ DocumentBuilder builder = null;
+ try {
+ builder = docBuilderFactory.newDocumentBuilder();
+ } catch (ParserConfigurationException e) {
+ throw new IllegalArgumentException(
+ "Exception while parsing project whitelist xml. Document builder not created.", e);
+ }
+
+ Document doc = null;
+ try {
+ doc = builder.parse(file);
+ } catch (SAXException e) {
+ throw new IllegalArgumentException("Exception while parsing " + xmlFile
+ + ". Invalid XML.", e);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Exception while parsing " + xmlFile
+ + ". Error reading file.", e);
+ }
+
+ Map<WhitelistType, Set<Integer>> projsWhitelisted = new HashMap<WhitelistType, Set<Integer>>();
+ NodeList tagList = doc.getChildNodes();
+ if (!tagList.item(0).getNodeName().equals(PROJECT_WHITELIST_TAG)) {
+ throw new RuntimeException("Cannot find tag '" + PROJECT_WHITELIST_TAG + "' in " + xmlFile);
+ }
+
+ NodeList whitelist = tagList.item(0).getChildNodes();
+ for (int n = 0; n < whitelist.getLength(); ++n) {
+ if (whitelist.item(n).getNodeType() != Node.ELEMENT_NODE) {
+ continue;
+ }
+
+ String whitelistType = whitelist.item(n).getNodeName();
+ Set<Integer> projs = new HashSet<Integer>();
+
+ NodeList projectsList = whitelist.item(n).getChildNodes();
+ for (int i = 0; i < projectsList.getLength(); ++i) {
+ Node node = projectsList.item(i);
+ if (node.getNodeType() == Node.ELEMENT_NODE) {
+ if (node.getNodeName().equals(PROJECT_TAG)) {
+ parseProjectTag(node, projs);
+ }
+ }
+ }
+ projsWhitelisted.put(WhitelistType.valueOf(whitelistType), projs);
+ }
+ projectsWhitelisted.set(projsWhitelisted);
+ }
+
+ private static void parseProjectTag(Node node, Set<Integer> projects) {
+ NamedNodeMap projectAttrMap = node.getAttributes();
+ Node projectIdAttr = projectAttrMap.getNamedItem(PROJECTID_ATTR);
+ if (projectIdAttr == null) {
+ throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
+ + "' attribute doesn't exist");
+ }
+
+ String projectId = projectIdAttr.getNodeValue();
+ projects.add(Integer.parseInt(projectId));
+ }
+
+ public static boolean isProjectWhitelisted(int project, WhitelistType whitelistType) {
+ Map<WhitelistType, Set<Integer>> projsWhitelisted = projectsWhitelisted.get();
+ if (projsWhitelisted != null) {
+ Set<Integer> projs = projsWhitelisted.get(whitelistType);
+ if (projs != null) {
+ return projs.contains(project);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * The tag in the project whitelist xml config file should be same as
+ * the defined enums.
+ */
+ public static enum WhitelistType {
+ MemoryCheck
+ }
+}
\ No newline at end of file
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
index b1b10b8..3c4b0e2 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ProjectValidator.java
@@ -2,6 +2,7 @@ package azkaban.project.validator;
import java.io.File;
+import azkaban.project.Project;
import azkaban.utils.Props;
/**
@@ -33,5 +34,5 @@ public interface ProjectValidator {
* @param projectDir
* @return
*/
- ValidationReport validateProject(File projectDir);
+ ValidationReport validateProject(Project project, File projectDir);
}
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
index 1be62fb..e759ad2 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/ValidatorManager.java
@@ -6,6 +6,7 @@ import java.util.Map;
import org.apache.log4j.Logger;
+import azkaban.project.Project;
import azkaban.utils.Props;
/**
@@ -31,7 +32,7 @@ public interface ValidatorManager {
* @param projectDir
* @return
*/
- Map<String, ValidationReport> validate(File projectDir);
+ Map<String, ValidationReport> validate(Project project, File projectDir);
/**
* The ValidatorManager should have a default validator which checks for the most essential
diff --git a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
index 52d376c..0f240da 100644
--- a/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
+++ b/azkaban-common/src/main/java/azkaban/project/validator/XmlValidatorManager.java
@@ -23,6 +23,7 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
+import azkaban.project.Project;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
@@ -226,10 +227,10 @@ public class XmlValidatorManager implements ValidatorManager {
}
@Override
- public Map<String, ValidationReport> validate(File projectDir) {
+ public Map<String, ValidationReport> validate(Project project, File projectDir) {
Map<String, ValidationReport> reports = new LinkedHashMap<String, ValidationReport>();
for (Entry<String, ProjectValidator> validator : validators.entrySet()) {
- reports.put(validator.getKey(), validator.getValue().validateProject(projectDir));
+ reports.put(validator.getKey(), validator.getValue().validateProject(project, projectDir));
logger.info("Validation status of validator " + validator.getKey() + " is "
+ reports.get(validator.getKey()).getStatus());
}
diff --git a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
index 45f49f2..7f50989 100644
--- a/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
+++ b/azkaban-common/src/main/java/azkaban/server/AzkabanServer.java
@@ -38,9 +38,15 @@ public abstract class AzkabanServer {
public static final String AZKABAN_PRIVATE_PROPERTIES_FILE =
"azkaban.private.properties";
public static final String DEFAULT_CONF_PATH = "conf";
+ private static Props azkabanProperties = null;
public static Props loadProps(String[] args) {
- return loadProps(args, new OptionParser());
+ azkabanProperties = loadProps(args, new OptionParser());
+ return azkabanProperties;
+ }
+
+ public static Props getAzkabanProperties() {
+ return azkabanProperties;
}
public static Props loadProps(String[] args, OptionParser parser) {
diff --git a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
index 37df66e..6279099 100644
--- a/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
+++ b/azkaban-common/src/main/java/azkaban/utils/DirectoryFlowLoader.java
@@ -37,6 +37,8 @@ import azkaban.flow.FlowProps;
import azkaban.flow.Node;
import azkaban.flow.SpecialJobTypes;
import azkaban.jobcallback.JobCallbackValidator;
+import azkaban.project.Project;
+import azkaban.project.ProjectWhitelist;
import azkaban.project.validator.ProjectValidator;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.XmlValidatorManager;
@@ -45,10 +47,10 @@ public class DirectoryFlowLoader implements ProjectValidator {
private static final DirFilter DIR_FILTER = new DirFilter();
private static final String PROPERTY_SUFFIX = ".properties";
private static final String JOB_SUFFIX = ".job";
- private static final String JOB_MAX_XMS = "job.max.Xms";
- private static final String MAX_XMS_DEFAULT = "1G";
- private static final String JOB_MAX_XMX = "job.max.Xmx";
- private static final String MAX_XMX_DEFAULT = "2G";
+ public static final String JOB_MAX_XMS = "job.max.Xms";
+ public static final String MAX_XMS_DEFAULT = "1G";
+ public static final String JOB_MAX_XMX = "job.max.Xmx";
+ public static final String MAX_XMX_DEFAULT = "2G";
private static final String XMS = "Xms";
private static final String XMX = "Xmx";
@@ -89,7 +91,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
return propsList;
}
- public void loadProjectFlow(File baseDirectory) {
+ public void loadProjectFlow(Project project, File baseDirectory) {
propsList = new ArrayList<Props>();
flowPropsList = new ArrayList<FlowProps>();
jobPropsMap = new HashMap<String, Props>();
@@ -104,7 +106,7 @@ public class DirectoryFlowLoader implements ProjectValidator {
// Load all the props files and create the Node objects
loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
- jobPropertiesCheck();
+ jobPropertiesCheck(project);
// Create edges and find missing dependencies
resolveDependencies();
@@ -378,30 +380,39 @@ public class DirectoryFlowLoader implements ProjectValidator {
visited.remove(node.getId());
}
- private void jobPropertiesCheck() {
+ private void jobPropertiesCheck(Project project) {
+ // if project is in the memory check whitelist, then we don't need to check
+ // its memory settings
+ if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
+ ProjectWhitelist.WhitelistType.MemoryCheck)) {
+ return;
+ }
+
String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
- long sizeMaxXms = azkaban.utils.Utils.parseMemString(maxXms);
- long sizeMaxXmx = azkaban.utils.Utils.parseMemString(maxXmx);
+ long sizeMaxXms = Utils.parseMemString(maxXms);
+ long sizeMaxXmx = Utils.parseMemString(maxXmx);
for (String jobName : jobPropsMap.keySet()) {
- Props resolvedJobProps =
- PropsUtils.resolveProps(jobPropsMap.get(jobName));
- String xms = resolvedJobProps.getString(XMS, null);
- if (xms != null && azkaban.utils.Utils.parseMemString(xms) > sizeMaxXms) {
+
+ Props jobProps = jobPropsMap.get(jobName);
+ String xms = jobProps.getString(XMS, null);
+ if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
+ && Utils.parseMemString(xms) > sizeMaxXms) {
errors.add(String.format(
"%s: Xms value has exceeded the allowed limit (max Xms = %s)",
jobName, maxXms));
}
- String xmx = resolvedJobProps.getString(XMX, null);
- if (xmx != null && azkaban.utils.Utils.parseMemString(xmx) > sizeMaxXmx) {
+ String xmx = jobProps.getString(XMX, null);
+ if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
+ && Utils.parseMemString(xmx) > sizeMaxXmx) {
errors.add(String.format(
"%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
jobName, maxXmx));
}
// job callback properties check
- JobCallbackValidator.validate(jobName, props, resolvedJobProps, errors);
+ JobCallbackValidator.validate(jobName, props, jobProps, errors);
}
}
@@ -450,8 +461,8 @@ public class DirectoryFlowLoader implements ProjectValidator {
}
@Override
- public ValidationReport validateProject(File projectDir) {
- loadProjectFlow(projectDir);
+ public ValidationReport validateProject(Project project, File projectDir) {
+ loadProjectFlow(project, projectDir);
ValidationReport report = new ValidationReport();
report.addErrorMsgs(errors);
return report;
diff --git a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
index 6fe0ac7..596e8b3 100644
--- a/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
+++ b/azkaban-common/src/main/java/azkaban/utils/PropsUtils.java
@@ -142,6 +142,11 @@ public class PropsUtils {
private static final Pattern VARIABLE_REPLACEMENT_PATTERN = Pattern
.compile("\\$\\{([a-zA-Z_.0-9]+)\\}");
+ public static boolean isVarialbeReplacementPattern(String str) {
+ Matcher matcher = VARIABLE_REPLACEMENT_PATTERN.matcher(str);
+ return matcher.matches();
+ }
+
public static Props resolveProps(Props props) {
if (props == null)
return null;
diff --git a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
index 5948c7b..91a4290 100644
--- a/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/ExecutableFlowTest.java
@@ -43,12 +43,13 @@ public class ExecutableFlowTest {
@Before
public void setUp() throws Exception {
+ project = new Project(11, "myTestProject");
+
Logger logger = Logger.getLogger(this.getClass());
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(new File("unit/executions/embedded"));
+ loader.loadProjectFlow(project, new File("unit/executions/embedded"));
Assert.assertEquals(0, loader.getErrors().size());
- project = new Project(11, "myTestProject");
project.setFlows(loader.getFlowMap());
project.setVersion(123);
}
diff --git a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
index ec067a0..f20497a 100644
--- a/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
+++ b/azkaban-common/src/test/java/azkaban/jobcallback/JobCallbackValidatorTest.java
@@ -107,7 +107,7 @@ public class JobCallbackValidatorTest {
}
@Test
- public void oneBadPostJobCallback() {
+ public void noPostBodyJobCallback() {
Props jobProps = new Props();
jobProps.put("job.notification."
+ JobCallbackStatusEnum.FAILURE.name().toLowerCase() + ".1.url",
diff --git a/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java b/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
index b45b853..814744c 100644
--- a/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/DirectoryFlowLoaderTest.java
@@ -19,19 +19,22 @@ package azkaban.utils;
import java.io.File;
import org.apache.log4j.Logger;
-
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import azkaban.project.Project;
+
public class DirectoryFlowLoaderTest {
+ Project project = new Project(11, "myTestProject");
+
@Ignore @Test
public void testDirectoryLoad() {
Logger logger = Logger.getLogger(this.getClass());
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(new File("unit/executions/exectest1"));
+ loader.loadProjectFlow(project, new File("unit/executions/exectest1"));
logger.info(loader.getFlowMap().size());
}
@@ -40,7 +43,7 @@ public class DirectoryFlowLoaderTest {
Logger logger = Logger.getLogger(this.getClass());
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(new File("unit/executions/embedded"));
+ loader.loadProjectFlow(project, new File("unit/executions/embedded"));
Assert.assertEquals(0, loader.getErrors().size());
}
@@ -49,7 +52,7 @@ public class DirectoryFlowLoaderTest {
Logger logger = Logger.getLogger(this.getClass());
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(new File("unit/executions/embeddedBad"));
+ loader.loadProjectFlow(project, new File("unit/executions/embeddedBad"));
for (String error : loader.getErrors()) {
System.out.println(error);
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index a241c3c..6fb4540 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -37,6 +37,7 @@ import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.log.Log;
import org.mortbay.thread.QueuedThreadPool;
import azkaban.execapp.event.JobCallbackManager;
@@ -78,6 +79,7 @@ public class AzkabanExecutorServer {
public static final String METRIC_INTERVAL =
"executor.metric.milisecinterval.";
public static final int DEFAULT_PORT_NUMBER = 12321;
+ public static final int DEFAULT_HEADER_BUFFER_SIZE = 4096;
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
private static final int DEFAULT_THREAD_NUMBER = 50;
@@ -111,9 +113,14 @@ public class AzkabanExecutorServer {
boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
logger.info("Setting up connector with stats on: " + isStatsOn);
-
- for (Connector connector : server.getConnectors()) {
+
+ for (Connector connector : server.getConnectors()) {
connector.setStatsOn(isStatsOn);
+ logger.info(String.format("Jetty connector name: %s, default header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
+ connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize", DEFAULT_HEADER_BUFFER_SIZE));
+ logger.info(String.format("Jetty connector name: %s, (if) new header buffer size: %d",
+ connector.getName(), connector.getHeaderBufferSize()));
}
Context root = new Context(server, "/", Context.SESSIONS);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
index 124b58b..da98b99 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunner.java
@@ -55,6 +55,7 @@ import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.FlowProps;
+import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.JobTypeManager;
import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
@@ -674,6 +675,16 @@ public class FlowRunner extends EventHandler implements Runnable {
node.setInputProps(props);
}
+ /**
+ * @param props
+ * This method is to put in any job properties customization before feeding
+ * to the job.
+ */
+ private void customizeJobProperties(Props props) {
+ boolean memoryCheck = flow.getExecutionOptions().getMemoryCheck();
+ props.put(ProcessJob.AZKABAN_MEMORY_CHECK, Boolean.toString(memoryCheck));
+ }
+
private Props loadJobProps(ExecutableNode node) throws IOException {
Props props = null;
String source = node.getJobSource();
@@ -708,6 +719,9 @@ public class FlowRunner extends EventHandler implements Runnable {
if (path.getPath() != null) {
props.setSource(path.getPath());
}
+
+ customizeJobProperties(props);
+
return props;
}
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
index 15cb90a..360d4f4 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPipelineTest.java
@@ -102,7 +102,7 @@ public class FlowRunnerPipelineTest {
project = new Project(1, "testProject");
File dir = new File("unit/executions/embedded2");
- prepareProject(dir);
+ prepareProject(project, dir);
InteractiveTestJob.clearTestJobs();
}
@@ -647,10 +647,10 @@ public class FlowRunnerPipelineTest {
}
}
- private void prepareProject(File directory) throws ProjectManagerException,
+ private void prepareProject(Project project, File directory) throws ProjectManagerException,
IOException {
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(directory);
+ loader.loadProjectFlow(project, directory);
if (!loader.getErrors().isEmpty()) {
for (String error : loader.getErrors()) {
System.out.println(error);
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
index a586f64..d5988bb 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerPropertyResolutionTest.java
@@ -95,7 +95,7 @@ public class FlowRunnerPropertyResolutionTest {
project = new Project(1, "testProject");
File dir = new File("unit/executions/execpropstest");
- prepareProject(dir);
+ prepareProject(project, dir);
InteractiveTestJob.clearTestJobs();
}
@@ -207,10 +207,10 @@ public class FlowRunnerPropertyResolutionTest {
Assert.assertEquals("moo4", job3Props.get("props4"));
}
- private void prepareProject(File directory) throws ProjectManagerException,
+ private void prepareProject(Project project, File directory) throws ProjectManagerException,
IOException {
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(directory);
+ loader.loadProjectFlow(project, directory);
if (!loader.getErrors().isEmpty()) {
for (String error : loader.getErrors()) {
System.out.println(error);
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
index 00f61b4..c78abaa 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/FlowRunnerTest2.java
@@ -124,7 +124,7 @@ public class FlowRunnerTest2 {
project = new Project(1, "testProject");
File dir = new File("unit/executions/embedded2");
- prepareProject(dir);
+ prepareProject(project, dir);
InteractiveTestJob.clearTestJobs();
}
@@ -1378,10 +1378,10 @@ public class FlowRunnerTest2 {
}
}
- private void prepareProject(File directory)
+ private void prepareProject(Project project, File directory)
throws ProjectManagerException, IOException {
DirectoryFlowLoader loader = new DirectoryFlowLoader(new Props(), logger);
- loader.loadProjectFlow(directory);
+ loader.loadProjectFlow(project, directory);
if (!loader.getErrors().isEmpty()) {
for (String error: loader.getErrors()) {
System.out.println(error);
diff --git a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
index 34a8fd8..435e1f9 100644
--- a/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
+++ b/azkaban-webserver/src/main/java/azkaban/webapp/servlet/ProjectManagerServlet.java
@@ -59,6 +59,7 @@ import azkaban.project.ProjectFileHandler;
import azkaban.project.ProjectLogEvent;
import azkaban.project.ProjectManager;
import azkaban.project.ProjectManagerException;
+import azkaban.project.ProjectWhitelist;
import azkaban.project.validator.ValidationReport;
import azkaban.project.validator.ValidatorConfigs;
import azkaban.scheduler.Schedule;
@@ -153,6 +154,8 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
handleProjectPage(req, resp, session);
}
return;
+ } else if (hasParam(req, "reloadProjectWhitelist")) {
+ handleReloadProjectWhitelist(req, resp, session);
}
Page page =
@@ -1738,4 +1741,37 @@ public class ProjectManagerServlet extends LoginAbstractAzkabanServlet {
return false;
}
+
+ private void handleReloadProjectWhitelist(HttpServletRequest req,
+ HttpServletResponse resp, Session session) throws IOException {
+ HashMap<String, Object> ret = new HashMap<String, Object>();
+
+ if (hasPermission(session.getUser(), Permission.Type.ADMIN)) {
+ try {
+ if (projectManager.loadProjectWhiteList()) {
+ ret.put("success", "Project whitelist re-loaded!");
+ } else {
+ ret.put("error", "azkaban.properties doesn't contain property " + ProjectWhitelist.XML_FILE_PARAM);
+ }
+ } catch(Exception e) {
+ ret.put("error", "Exception occurred while trying to re-load project whitelist: " + e);
+ }
+ } else {
+ ret.put("error", "Provided session doesn't have admin privilege.");
+ }
+
+ this.writeJSON(resp, ret);
+ }
+
+ protected boolean hasPermission(User user, Permission.Type type) {
+ for (String roleName : user.getRoles()) {
+ Role role = userManager.getRole(roleName);
+ if (role.getPermission().isPermissionSet(type)
+ || role.getPermission().isPermissionSet(Permission.Type.ADMIN)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
}