azkaban-aplcache

Details

diff --git a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
index a6b1a39..039ab3a 100644
--- a/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
+++ b/azkaban-common/src/main/java/azkaban/project/ProjectWhitelist.java
@@ -22,14 +22,14 @@ import azkaban.utils.Props;
 
 /**
  * @author wkang
- * 
+ *
  * This class manages project whitelist defined in xml config file.
  * An single xml config file contains different types of whitelisted
  * projects. For additional type of whitelist, modify WhitelistType enum.
- * 
+ *
  * The xml config file should in the following format. Please note
  * the tag <MemoryCheck> is same as the defined enum MemoryCheck
- * 
+ *
  * <ProjectWhitelist>
  *  <MemoryCheck>
  *      <project projectname="project1" />
@@ -84,7 +84,7 @@ public class ProjectWhitelist {
     Map<WhitelistType, Set<Integer>> projsWhitelisted = new HashMap<WhitelistType, Set<Integer>>();
     NodeList tagList = doc.getChildNodes();
     if (!tagList.item(0).getNodeName().equals(PROJECT_WHITELIST_TAG)) {
-      throw new RuntimeException("Cannot find tag '" +  PROJECT_WHITELIST_TAG + "' in " + xmlFile);      
+      throw new RuntimeException("Cannot find tag '" +  PROJECT_WHITELIST_TAG + "' in " + xmlFile);
     }
 
     NodeList whitelist = tagList.item(0).getChildNodes();
@@ -114,7 +114,7 @@ public class ProjectWhitelist {
     NamedNodeMap projectAttrMap = node.getAttributes();
     Node projectIdAttr = projectAttrMap.getNamedItem(PROJECTID_ATTR);
     if (projectIdAttr == null) {
-      throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR 
+      throw new RuntimeException("Error loading project. The '" + PROJECTID_ATTR
               + "' attribute doesn't exist");
     }
 
@@ -127,7 +127,7 @@ public class ProjectWhitelist {
     if (projsWhitelisted != null) {
       Set<Integer> projs = projsWhitelisted.get(whitelistType);
       if (projs != null) {
-        return projs.contains(project); 
+        return projs.contains(project);
       }
     }
     return false;
@@ -138,6 +138,7 @@ public class ProjectWhitelist {
    * the defined enums.
    */
   public static enum WhitelistType {
-    MemoryCheck
+    MemoryCheck,
+    NumJobPerFlow
   }
 }
\ No newline at end of file
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c8d7f1c..78de829 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -51,6 +51,8 @@ import azkaban.jobtype.JobTypeManager;
 import azkaban.jobtype.JobTypeManagerException;
 import azkaban.metric.MetricReportManager;
 import azkaban.project.ProjectLoader;
+import azkaban.project.ProjectWhitelist;
+import azkaban.project.ProjectWhitelist.WhitelistType;
 import azkaban.utils.FileIOUtils;
 import azkaban.utils.FileIOUtils.JobMetaData;
 import azkaban.utils.FileIOUtils.LogData;
@@ -470,7 +472,9 @@ public class FlowRunnerManager implements EventListener,
         int numJobs =
             Integer.valueOf(options.getFlowParameters().get(
                 FLOW_NUM_JOB_THREADS));
-        if (numJobs > 0 && numJobs <= numJobThreads) {
+        if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
+                .isProjectWhitelisted(flow.getProjectId(),
+                    WhitelistType.NumJobPerFlow))) {
           numJobThreads = numJobs;
         }
       } catch (Exception e) {
@@ -517,7 +521,7 @@ public class FlowRunnerManager implements EventListener,
 
   /**
    * Configure Azkaban metrics tracking for a new flowRunner instance
-   * 
+   *
    * @param flowRunner
    */
   private void configureFlowLevelMetrics(FlowRunner flowRunner) {