azkaban-aplcache

add filters as per the discussion from the sync up meeting, expose

9/3/2015 10:20:22 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index 6fc8f22..ce4e41a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -38,9 +38,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   }
 
   // factor comparator names
-  private static final String REMAININGFLOWSIZE_COMPARATOR_NAME = "RemainingFlowSize";
+  private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
   private static final String MEMORY_COMPARATOR_NAME = "Memory";
-  private static final String REMAININGTMPSIZE_COMPARATOR_NAME = "RemainingTmpSize";
   private static final String PRIORITY_COMPARATOR_NAME = "Priority";
   private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
   private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
@@ -53,29 +52,25 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   static {
     comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
 
-    // register the creator for remaining flow size comparator.
-    comparatorCreatorRepository.put(REMAININGFLOWSIZE_COMPARATOR_NAME, new ComparatorCreator(){
-      @Override public FactorComparator<Executor> create(int weight) { return getRemainingFlowSizeComparator(weight); }});
+    // register the creator for number of assigned flow comparator.
+    comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, new ComparatorCreator(){
+      public FactorComparator<Executor> create(int weight) { return getNumberOfAssignedFlowComparator(weight); }});
 
     // register the creator for memory comparator.
     comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
-      @Override public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+      public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
 
     // register the creator for priority comparator.
     comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
-      @Override public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
-
-    // register the creator for remaining TMP size comparator.
-    comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
-      @Override public FactorComparator<Executor> create(int weight) { return getRemainingTmpSizeComparator(weight); }});
+      public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
 
     // register the creator for last dispatched time comparator.
     comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
-      @Override public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+      public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
 
     // register the creator for CPU Usage comparator.
     comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
-      @Override public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
+      public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
   }
 
 
@@ -97,8 +92,9 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
             get(entry.getKey()).
             create(entry.getValue()));
       } else {
-        logger.error(String.format("failed to initialize executor comparator as the comparator implementation for requested factor '%s' doesn't exist.",
-            entry.getKey()));
+        logger.error(String.format("failed to initialize executor comparator "+
+                                   "as the comparator implementation for requested factor '%s' doesn't exist.",
+                                   entry.getKey()));
         throw new IllegalArgumentException("comparatorList");
       }
     }
@@ -123,7 +119,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
    * @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
    *         false otherwise.
    * */
-  private static boolean statisticsObjectCheck(Statistics statisticsObj1, Statistics statisticsObj2, String caller, Integer result){
+  private static boolean statisticsObjectCheck(Statistics statisticsObj1,
+                                               Statistics statisticsObj2, String caller, Integer result){
     result = 0 ;
     // both doesn't expose the info
     if (null == statisticsObj1 && null == statisticsObj2){
@@ -153,11 +150,11 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   }
 
   /**
-   * function defines the remaining flow size comparator.
+   * function defines the number of assigned flow comparator.
    * @param weight weight of the comparator.
    * */
-  private static FactorComparator<Executor> getRemainingFlowSizeComparator(int weight){
-    return FactorComparator.create(REMAININGFLOWSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+  private static FactorComparator<Executor> getNumberOfAssignedFlowComparator(int weight){
+    return FactorComparator.create(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, weight, new Comparator<Executor>(){
 
       @Override
       public int compare(Executor o1, Executor o2) {
@@ -165,7 +162,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
         Statistics stat2 = o2.getExecutorStats();
 
         Integer result = 0;
-        if (statisticsObjectCheck(stat1,stat2,REMAININGFLOWSIZE_COMPARATOR_NAME,result)){
+        if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
           return result;
         }
         return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
@@ -173,26 +170,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   }
 
   /**
-   * function defines the remaining folder size comparator.
-   * @param weight weight of the comparator.
-   * */
-  private static FactorComparator<Executor> getRemainingTmpSizeComparator(int weight){
-    return FactorComparator.create(REMAININGTMPSIZE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
-
-      @Override
-      public int compare(Executor o1, Executor o2) {
-        Statistics stat1 = o1.getExecutorStats();
-        Statistics stat2 = o2.getExecutorStats();
-
-        Integer result = 0;
-        if (statisticsObjectCheck(stat1,stat2,REMAININGTMPSIZE_COMPARATOR_NAME,result)){
-          return result;
-        }
-        return ((Long)stat1.getRemainingStorage()).compareTo(stat2.getRemainingStorage());
-      }});
-  }
-
-  /**
    * function defines the priority comparator.
    * @param weight weight of the comparator.
    * @return
@@ -230,7 +207,9 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
         if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
           return result;
         }
-        return ((Double)stat1.getCpuUsage()).compareTo(stat2.getCpuUsage());
+
+        // CPU usage , the lesser the value is, the better.
+        return ((Double)stat2.getCpuUsage()).compareTo(stat1.getCpuUsage());
       }});
   }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index f7e2c34..c27b673 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -40,6 +40,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
 
   // factor filter names.
   private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
+  private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimunFreeMemory";
+  private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
 
   /**
    * static initializer of the class.
@@ -49,6 +51,8 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
   static {
     filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
     filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
+    filterRepository.put(MINIMUMFREEMEMORY_FILTER_NAME, getMinimumReservedMemoryFilter());
+    filterRepository.put(CPUSTATUS_FILTER_NAME, getCpuStatusFilter());
   }
 
   /**
@@ -67,8 +71,9 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
       if (filterRepository.containsKey(filterName)){
         this.registerFactorFilter(filterRepository.get(filterName));
       } else {
-        logger.error(String.format("failed to initialize executor filter as the filter implementation for requested factor '%s' doesn't exist.",
-            filterName));
+        logger.error(String.format("failed to initialize executor filter "+
+                                   "as the filter implementation for requested factor '%s' doesn't exist.",
+                                   filterName));
         throw new IllegalArgumentException("filterList");
       }
     }
@@ -83,12 +88,10 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
    * function to register the static remaining flow size filter.
    * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
    *        Coming for the passed flow.
-   *        Ideally this filter will make sure only the executor has remaining
+   *        Ideally this filter will make sure only the executor hasn't reached the Max allowed # of executing flows.
    * */
   private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
     return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
-
-      @Override
       public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
         if (null == filteringTarget){
           logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
@@ -107,6 +110,58 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
     });
   }
 
-  // TO-DO
-  // Add more Filter definitions .
+  /**
+   * function to register the static Minimum Reserved Memory filter.
+   * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+   *        Coming for the passed flow.
+   *        This filter will filter out any executors that has the remaining  memory below 6G
+   * */
+  private static FactorFilter<Executor, ExecutableFlow> getMinimumReservedMemoryFilter(){
+    return FactorFilter.create(MINIMUMFREEMEMORY_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+      private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
+      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+        if (null == filteringTarget){
+          logger.info(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
+          return false;
+        }
+
+        Statistics stats = filteringTarget.getExecutorStats();
+        if (null == stats) {
+          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+              MINIMUMFREEMEMORY_FILTER_NAME,
+              filteringTarget.toString()));
+          return false;
+        }
+        return stats.getRemainingMemory() > MINIMUM_FREE_MEMORY ;
+       }
+    });
+  }
+
+
+  /**
+   * function to register the static Minimum Reserved Memory filter.
+   * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+   *        Coming for the passed flow.
+   *        This filter will filter out any executors that the current CPU usage exceed 95%
+   * */
+  private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
+    return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+      private static final int MAX_CPU_CURRENT_USAGE = 95;
+      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+        if (null == filteringTarget){
+          logger.info(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
+          return false;
+        }
+
+        Statistics stats = filteringTarget.getExecutorStats();
+        if (null == stats) {
+          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+              MINIMUMFREEMEMORY_FILTER_NAME,
+              filteringTarget.toString()));
+          return false;
+        }
+        return stats.getCpuUsage() < MAX_CPU_CURRENT_USAGE ;
+       }
+    });
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/Statistics.java b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
index 7c80f88..ba85154 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Statistics.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
@@ -23,6 +23,7 @@ import java.util.Map;
     private double remainingMemoryPercent;
     private long   remainingMemory;
     private int    remainingFlowCapacity;
+    private int    numberOfAssignedFlows;
     private Date   lastDispatchedTime;
     private long   remainingStorage;
     private double cpuUsage;
@@ -84,6 +85,14 @@ import java.util.Map;
       this.priority = value;
     }
 
+    public int getNumberOfAssignedFlows () {
+      return this.numberOfAssignedFlows;
+    }
+
+    public void setNumberOfAssignedFlows (int value) {
+      this.numberOfAssignedFlows = value;
+    }
+
     public Statistics(){}
 
     public Statistics (double remainingMemoryPercent,
@@ -92,7 +101,8 @@ import java.util.Map;
         Date lastDispatched,
         long remainingStorage,
         double cpuUsage,
-        int  priority ){
+        int  priority,
+        int numberOfAssignedFlows){
       this.remainingMemory = remainingMemory;
       this.cpuUsage = cpuUsage;
       this.remainingFlowCapacity = remainingFlowCapacity;
@@ -100,6 +110,7 @@ import java.util.Map;
       this.remainingMemoryPercent = remainingMemoryPercent;
       this.remainingStorage = remainingStorage;
       this.lastDispatchedTime = lastDispatched;
+      this.numberOfAssignedFlows = numberOfAssignedFlows;
     }
 
     @Override
@@ -116,6 +127,7 @@ import java.util.Map;
           result &=this.priority == stat.priority;
           result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
           result &=this.remainingStorage == stat.remainingStorage;
+          result &=this.numberOfAssignedFlows == stat.numberOfAssignedFlows;
           result &= null == this.lastDispatchedTime ? stat.lastDispatchedTime == null :
                             this.lastDispatchedTime.equals(stat.lastDispatchedTime);
           return result;
@@ -123,7 +135,7 @@ import java.util.Map;
         return false;
     }
 
-    
+
     // really ugly to have it home-made here for object-binding as base on the
     // current code base there is no any better ways to do that.
     public static Statistics fromJsonObject(Map<String,Object> mapObj){
@@ -150,6 +162,11 @@ import java.util.Map;
         stats.setPriority(Integer.parseInt(mapObj.get(priority).toString()));
       }
 
+      final String numberOfAssignedFlows = "numberOfAssignedFlows";
+      if (mapObj.containsKey(numberOfAssignedFlows) && null != mapObj.get(numberOfAssignedFlows)){
+        stats.setNumberOfAssignedFlows(Integer.parseInt(mapObj.get(numberOfAssignedFlows).toString()));
+      }
+
       final String remainingMemoryPercent = "remainingMemoryPercent";
       if (mapObj.containsKey(remainingMemoryPercent) && null != mapObj.get(remainingMemoryPercent)){
         stats.setRemainingMemoryPercent(Double.parseDouble(mapObj.get(remainingMemoryPercent).toString()));
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index 4177b70..97b460a 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -682,9 +682,9 @@ public class SelectorTest {
     executorList.add(new Executor(2, "host2", 80, true));
     executorList.add(new Executor(3, "host3", 80, true));
 
-    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 4095,99, 0));
-    executorList.get(1).setExecutorStats(new Statistics(50, 4095, 50, new Date(), 4095,99, 0));
-    executorList.get(2).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,99, 0));
+    executorList.get(0).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 4095,89, 0, 0));
+    executorList.get(1).setExecutorStats(new Statistics(50, 14095, 50, new Date(), 4095,90, 0, 0));
+    executorList.get(2).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 2048,90, 0, 0));
 
     ExecutableFlow flow = new ExecutableFlow();
 
@@ -697,7 +697,7 @@ public class SelectorTest {
 
     // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
     // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
-    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,99, 0));
+    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,90, 0, 1));
     executor = selector.getBest(executorList, flow);
     Assert.assertEquals(executorList.get(2), executor);
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
index 3e260b9..2298b88 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
@@ -38,6 +38,7 @@ public class StatisticsServlet extends HttpServlet  {
   private static final long serialVersionUID = 1L;
   private static final int  cacheTimeInMilliseconds = 1000;
   private static final Logger logger = Logger.getLogger(StatisticsServlet.class);
+  private static final String noCacheParamName = "nocache";
 
   protected static Date lastRefreshedTime = null;
   protected static Statistics cachedstats = null;
@@ -51,9 +52,11 @@ public class StatisticsServlet extends HttpServlet  {
   protected void doGet(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {
 
-    if (null == lastRefreshedTime ||
+    boolean noCache = null!= req && Boolean.getBoolean(req.getParameter(noCacheParamName));
+
+    if (noCache || null == lastRefreshedTime ||
         new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
-      this.populateStatistics();
+      this.populateStatistics(noCache);
     }
 
     JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
@@ -144,9 +147,9 @@ public class StatisticsServlet extends HttpServlet  {
    * call the data providers to fill the returning data container for statistics data.
    * This function refreshes the static cached copy of data in case if necessary.
    * */
-  protected synchronized void populateStatistics(){
+  protected synchronized void populateStatistics(boolean noCache){
     //check again before starting the work.
-    if (null == lastRefreshedTime ||
+    if (noCache || null == lastRefreshedTime ||
         new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
       final Statistics stats = new Statistics();
 
@@ -191,9 +194,9 @@ public class StatisticsServlet extends HttpServlet  {
     AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
     if (server != null){
       FlowRunnerManager runnerMgr =  AzkabanExecutorServer.getApp().getFlowRunnerManager();
-      stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() -
-                                     runnerMgr.getNumRunningFlows() -
-                                     runnerMgr.getNumQueuedFlows());
+      int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
+      stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
+      stats.setNumberOfAssignedFlows(assignedFlows);
       stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
     }else {
       logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index 85d38e6..faf6abb 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -24,7 +24,7 @@ public class StatisticsServletTest {
     }
 
     public void callPopulateStatistics(){
-       this.populateStatistics();
+       this.populateStatistics(false);
     }
 
     public void callFillCpuUsage(Statistics stats){
@@ -81,7 +81,17 @@ public class StatisticsServletTest {
 
   @Test
   public void testStatisticsJsonParser() throws IOException  {
-    Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5);
+    Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5,5);
+    String jSonStr = JSONUtils.toJSON(stat);
+    @SuppressWarnings("unchecked")
+    Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
+    Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+    Assert.assertTrue(stat.equals(stat2));
+    }
+
+  @Test
+  public void testStatisticsJsonParserWNullDateValue() throws IOException  {
+    Statistics stat = new Statistics(0.1,1,2,null,3,4,5,5);
     String jSonStr = JSONUtils.toJSON(stat);
     @SuppressWarnings("unchecked")
     Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);