diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index a6b1a39..039ab3a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -22,14 +22,14 @@ import azkaban.utils.Props;
/**
* @author wkang
- *
+ *
* This class manages project whitelist defined in xml config file.
* An single xml config file contains different types of whitelisted
* projects. For additional type of whitelist, modify WhitelistType enum.
- *
+ *
* The xml config file should in the following format. Please note
* the tag <MemoryCheck> is same as the defined enum MemoryCheck
- *
+ *
* <ProjectWhitelist>
* <MemoryCheck>
* <project projectname="project1" />
@@ -84,7 +84,7 @@ public class ProjectWhitelist {
Map<WhitelistType, Set<Integer>> projsWhitelisted = new HashMap<WhitelistType, Set<Integer>>();
NodeList tagList = doc.getChildNodes();
if (!tagList.item(0).getNodeName().equals(PROJECT_WHITELIST_TAG)) {
- throw new RuntimeException("Cannot find tag '" + PROJECT_WHITELIST_TAG + "' in " + xmlFile);
+ throw new RuntimeException("Cannot find tag '" + PROJECT_WHITELIST_TAG + "' in " + xmlFile);
}
NodeList whitelist = tagList.item(0).getChildNodes();
@@ -114,7 +114,7 @@ public class ProjectWhitelist {
NamedNodeMap projectAttrMap = node.getAttributes();
Node projectIdAttr = projectAttrMap.getNamedItem(PROJECTID_ATTR);
if (projectIdAttr == null) {
- throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
+ throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
+ "' attribute doesn't exist");
}
@@ -127,7 +127,7 @@ public class ProjectWhitelist {
if (projsWhitelisted != null) {
Set<Integer> projs = projsWhitelisted.get(whitelistType);
if (projs != null) {
- return projs.contains(project);
+ return projs.contains(project);
}
}
return false;
@@ -138,6 +138,7 @@ public class ProjectWhitelist {
* the defined enums.
*/
public static enum WhitelistType {
- MemoryCheck
+ MemoryCheck,
+ NumJobPerFlow
}
}
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c8d7f1c..78de829 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -51,6 +51,8 @@ import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.metric.MetricReportManager;
import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectWhitelist;
+import azkaban.project.ProjectWhitelist.WhitelistType;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FileIOUtils.JobMetaData;
import azkaban.utils.FileIOUtils.LogData;
@@ -470,7 +472,9 @@ public class FlowRunnerManager implements EventListener,
int numJobs =
Integer.valueOf(options.getFlowParameters().get(
FLOW_NUM_JOB_THREADS));
- if (numJobs > 0 && numJobs <= numJobThreads) {
+ if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
+ .isProjectWhitelisted(flow.getProjectId(),
+ WhitelistType.NumJobPerFlow))) {
numJobThreads = numJobs;
}
} catch (Exception e) {
@@ -517,7 +521,7 @@ public class FlowRunnerManager implements EventListener,
/**
* Configure Azkaban metrics tracking for a new flowRunner instance
- *
+ *
* @param flowRunner
*/
private void configureFlowLevelMetrics(FlowRunner flowRunner) {