azkaban-aplcache

Update code according to the code review comment, fixed a unit

9/4/2015 9:54:21 PM

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index b2c3766..efe00f9 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -24,14 +24,14 @@ import azkaban.utils.Utils;
  *
  * @author gaggarwa
  */
-public class Executor {
+public class Executor implements Comparable<Executor> {
   private final int id;
   private final String host;
   private final int port;
   private boolean isActive;
   // cached copy of the latest statistics from  the executor.
-  private Statistics cachedExecutorStats;
-  private Date lastUpdated;
+  private ServerStatistics cachedExecutorStats;
+  private Date lastStatsUpdatedTime;
 
   /**
    * <pre>
@@ -113,13 +113,13 @@ public class Executor {
     return id;
   }
 
-  public Statistics getExecutorStats() {
+  public ServerStatistics getExecutorStats() {
     return this.cachedExecutorStats;
   }
 
-  public void setExecutorStats(Statistics stats) {
+  public void setExecutorStats(ServerStatistics stats) {
     this.cachedExecutorStats = stats;
-    this.lastUpdated = new Date();
+    this.lastStatsUpdatedTime = new Date();
   }
 
   /**
@@ -128,10 +128,15 @@ public class Executor {
    *         specific object is never refreshed.
    * */
   public Date getLastStatsUpdatedTime(){
-    return this.lastUpdated;
+    return this.lastStatsUpdatedTime;
   }
 
   public void setActive(boolean isActive) {
     this.isActive = isActive;
   }
+
+  @Override
+  public int compareTo(Executor o) {
+    return null == o ? 1 : this.hashCode() - o.hashCode();
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 7a25891..53cb1bb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -26,7 +26,7 @@ import org.apache.log4j.Logger;
  *  @param K executor object type.
  *  @param V dispatching object type.
  * */
-public class CandidateSelector<K,V> implements Selector<K, V> {
+public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K, V> {
   private static Logger logger = Logger.getLogger(CandidateComparator.class);
 
   private CandidateFilter<K,V> filter;
@@ -70,16 +70,16 @@ public class CandidateSelector<K,V> implements Selector<K, V> {
 
      logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
      if (filteredList.size() == 0){
-       logger.info("failed to select candidate as the filted candidate list is empty.");
+       logger.info("failed to select candidate as the filtered candidate list is empty.");
        return null;
      }
 
      if (null == comparator){
-       logger.info("candidate comparator is not specified, default comparator from 'Collections' class will be used.");
+       logger.info("candidate comparator is not specified, default hash code comparator class will be used.");
      }
 
      // final work - find the best candidate from the filtered list.
-     K executor = Collections.max(filteredList,comparator);
+     K executor = Collections.max(filteredList, comparator);
      logger.info(String.format("candidate selected %s",
          null == executor ? "(null)" : executor.toString()));
      return executor;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index ce4e41a..1d48685 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -24,8 +24,12 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import azkaban.executor.Executor;
-import azkaban.executor.Statistics;
+import azkaban.executor.ServerStatistics;
 
+
+/**
+ * De-normalized version of the CandidateComparator, which also contains the implementation of the factor comparators.
+ * */
 public class ExecutorComparator extends CandidateComparator<Executor> {
   private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;
 
@@ -40,7 +44,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   // factor comparator names
   private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
   private static final String MEMORY_COMPARATOR_NAME = "Memory";
-  private static final String PRIORITY_COMPARATOR_NAME = "Priority";
   private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
   private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
 
@@ -60,10 +63,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
     comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
       public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
 
-    // register the creator for priority comparator.
-    comparatorCreatorRepository.put(PRIORITY_COMPARATOR_NAME, new ComparatorCreator(){
-      public FactorComparator<Executor> create(int weight) { return getPriorityComparator(weight); }});
-
     // register the creator for last dispatched time comparator.
     comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
       public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
@@ -81,8 +80,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
    * */
   public ExecutorComparator(Map<String,Integer> comparatorList) {
     if (null == comparatorList|| comparatorList.size() == 0){
-      logger.error("failed to initialize executor comparator as the passed comparator list is invalid or empty.");
-      throw new IllegalArgumentException("filterList");
+      throw new IllegalArgumentException("failed to initialize executor comparator" +
+                                         "as the passed comparator list is invalid or empty.");
     }
 
     // register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
@@ -92,10 +91,9 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
             get(entry.getKey()).
             create(entry.getValue()));
       } else {
-        logger.error(String.format("failed to initialize executor comparator "+
-                                   "as the comparator implementation for requested factor '%s' doesn't exist.",
-                                   entry.getKey()));
-        throw new IllegalArgumentException("comparatorList");
+        throw new IllegalArgumentException(String.format("failed to initialize executor comparator " +
+                                        "as the comparator implementation for requested factor '%s' doesn't exist.",
+                                        entry.getKey()));
       }
     }
   }
@@ -109,9 +107,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
     FactorComparator<Executor> create(int weight);
   }
 
-  /**
+  /**<pre>
    * helper function that does the object  on two statistics, comparator can leverage this function to provide
    * shortcuts if   the statistics object is missing from one or both sides of the executors.
+   * </pre>
    * @param stat1   the first statistics  object to be checked .
    * @param stat2   the second statistics object to be checked.
    * @param caller  the name of the calling function, for logging purpose.
@@ -119,8 +118,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
    * @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
    *         false otherwise.
    * */
-  private static boolean statisticsObjectCheck(Statistics statisticsObj1,
-                                               Statistics statisticsObj2, String caller, Integer result){
+  private static boolean statisticsObjectCheck(ServerStatistics statisticsObj1,
+                                               ServerStatistics statisticsObj2, String caller, Integer result){
     result = 0 ;
     // both doesn't expose the info
     if (null == statisticsObj1 && null == statisticsObj2){
@@ -158,8 +157,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        Statistics stat1 = o1.getExecutorStats();
-        Statistics stat2 = o2.getExecutorStats();
+        ServerStatistics stat1 = o1.getExecutorStats();
+        ServerStatistics stat2 = o2.getExecutorStats();
 
         Integer result = 0;
         if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
@@ -170,27 +169,6 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   }
 
   /**
-   * function defines the priority comparator.
-   * @param weight weight of the comparator.
-   * @return
-   * */
-  private static FactorComparator<Executor> getPriorityComparator(int weight){
-    return FactorComparator.create(PRIORITY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
-
-      @Override
-      public int compare(Executor o1, Executor o2) {
-        Statistics stat1 = o1.getExecutorStats();
-        Statistics stat2 = o2.getExecutorStats();
-
-        Integer result = 0;
-        if (statisticsObjectCheck(stat1,stat2,PRIORITY_COMPARATOR_NAME,result)){
-          return result;
-        }
-        return ((Integer)stat1.getPriority()).compareTo(stat2.getPriority());
-      }});
-  }
-
-  /**
    * function defines the cpuUsage comparator.
    * @param weight weight of the comparator.
    * @return
@@ -200,10 +178,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        Statistics stat1 = o1.getExecutorStats();
-        Statistics stat2 = o2.getExecutorStats();
+        ServerStatistics stat1 = o1.getExecutorStats();
+        ServerStatistics stat2 = o2.getExecutorStats();
 
-        Integer result = 0;
+        int result = 0;
         if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
           return result;
         }
@@ -224,10 +202,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-        Statistics stat1 = o1.getExecutorStats();
-        Statistics stat2 = o2.getExecutorStats();
+        ServerStatistics stat1 = o1.getExecutorStats();
+        ServerStatistics stat2 = o2.getExecutorStats();
 
-        Integer result = 0;
+        int result = 0;
         if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
           return result;
         }
@@ -256,11 +234,13 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   }
 
 
-  /**
+  /**<pre>
    * function defines the Memory comparator.
-   * @param weight weight of the comparator.
    * Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
    *       it go further to check the percent of the remaining memory.
+   * </pre>
+   * @param weight weight of the comparator.
+
    * @return
    * */
   private static FactorComparator<Executor> getMemoryComparator(int weight){
@@ -268,10 +248,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
 
       @Override
       public int compare(Executor o1, Executor o2) {
-       Statistics stat1 = o1.getExecutorStats();
-       Statistics stat2 = o2.getExecutorStats();
+       ServerStatistics stat1 = o1.getExecutorStats();
+       ServerStatistics stat2 = o2.getExecutorStats();
 
-       Integer result = 0;
+       int result = 0;
        if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
          return result;
        }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index c27b673..9adc0dc 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -23,9 +23,11 @@ import java.util.Set;
 
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.Executor;
-import azkaban.executor.Statistics;
-
+import azkaban.executor.ServerStatistics;
 
+/**
+ * De-normalized version of the candidateFilter, which also contains the implementation of the factor filters.
+ * */
 public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
   private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
 
@@ -84,11 +86,12 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
     return "ExecutorFilter";
   }
 
-  /**
+  /**<pre>
    * function to register the static remaining flow size filter.
    * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
    *        Coming for the passed flow.
    *        Ideally this filter will make sure only the executor hasn't reached the Max allowed # of executing flows.
+   *</pre>
    * */
   private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
     return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
@@ -98,7 +101,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        Statistics stats = filteringTarget.getExecutorStats();
+        ServerStatistics stats = filteringTarget.getExecutorStats();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               STATICREMAININGFLOWSIZE_FILTER_NAME,
@@ -125,7 +128,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        Statistics stats = filteringTarget.getExecutorStats();
+        ServerStatistics stats = filteringTarget.getExecutorStats();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
@@ -140,9 +143,10 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
 
   /**
    * function to register the static Minimum Reserved Memory filter.
-   * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
-   *        Coming for the passed flow.
+   * NOTE : <pre> this is a static filter which means the filter will be filtering based on the system standard which
+   *        is not Coming for the passed flow.
    *        This filter will filter out any executors that the current CPU usage exceed 95%
+   *        </pre>
    * */
   private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
     return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
@@ -153,7 +157,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
           return false;
         }
 
-        Statistics stats = filteringTarget.getExecutorStats();
+        ServerStatistics stats = filteringTarget.getExecutorStats();
         if (null == stats) {
           logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
               MINIMUMFREEMEMORY_FILTER_NAME,
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
index fd15ad1..d7236e8 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -22,11 +22,12 @@ import java.util.Map;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.Executor;
 
-/**
+/**<pre>
  * Executor selector class implementation.
  * NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
  *       clean and convenient constructor to take in filter and comparator name list and build
  *       the instance from that.
+ *</pre>
  * */
 public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {
 
@@ -38,7 +39,7 @@ public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow
    *                        again comparator feature is disabled if a null value is passed.
    * */
   public ExecutorSelector(List<String> filterList, Map<String,Integer> comparatorList) {
-    super(null == filterList || filterList.size() == 0 ?         null : new ExecutorFilter(filterList),
-          null == comparatorList || comparatorList.size() == 0 ? null : new ExecutorComparator(comparatorList));
+    super(null == filterList || filterList.isEmpty() ?         null : new ExecutorFilter(filterList),
+          null == comparatorList || comparatorList.isEmpty() ? null : new ExecutorComparator(comparatorList));
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
index 2dd76c7..6d4d2e0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -33,13 +33,12 @@ public final class FactorComparator<T>{
    *  method provided below.
    * @param factorName : the factor name .
    * @param weight : the weight of the comparator.
-   * @ comparator : function to be provided by user on how the comparison should be made.
+   * @param comparator : function to be provided by user on how the comparison should be made.
    * */
   private FactorComparator(String factorName, int weight, Comparator<T> comparator){
     this.factorName = factorName;
     this.weight = weight;
     this.comparator = comparator;
-    logger.info("comparator created for " + this.factorName);
   }
 
   /** static function to generate an instance of the class.
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index b277d45..04b04b7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -36,7 +36,6 @@ public final class FactorFilter<T,V>{
   private FactorFilter(String factorName, Filter<T,V> filter){
     this.factorName = factorName;
     this.filter = filter;
-    logger.info("filter created for " + this.factorName);
   }
 
   /** static function to generate an instance of the class.
@@ -66,8 +65,8 @@ public final class FactorFilter<T,V>{
   public interface Filter<T,V>{
 
     /**function to analyze the target item according to the reference object to decide whether the item should be filtered.
-     * @param filteringTarget:   object to be checked.
-     * @param referencingObject: object which contains statistics based on which a decision is made whether
+     * @param filteringTarget   object to be checked.
+     * @param referencingObject object which contains statistics based on which a decision is made whether
      *                      the object being checked need to be filtered or not.
      * @return true if the check passed, false if check failed, which means the item need to be filtered.
      * */
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
index 610f75a..605fd28 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -19,13 +19,15 @@ package azkaban.executor.selector;
 import java.util.List;
 
 
-/** Definition of the selector interface.
+/**<pre>
+ *  Definition of the selector interface.
  *  an implementation of the selector interface provides the functionality
  *  to return a candidate from the candidateList that suits best for the dispatchingObject.
+ * </pre>
  *  @param K : type of the candidate.
  *  @param V : type of the dispatching object.
  */
-public interface Selector <K,V> {
+public interface Selector <K extends Comparable<K>,V> {
 
   /** Function returns the next best suit candidate from the candidateList for the dispatching object.
    *  @param  candidateList : List of the candidates to select from .
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index 97b460a..ca43678 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -35,7 +35,7 @@ import azkaban.executor.selector.*;
 
 public class SelectorTest {
   // mock executor object.
-  protected class MockExecutorObject{
+  protected class MockExecutorObject implements Comparable <MockExecutorObject>{
     public String name;
     public int    port;
     public double percentOfRemainingMemory;
@@ -69,6 +69,11 @@ public class SelectorTest {
     {
       return this.name;
     }
+
+    @Override
+    public int compareTo(MockExecutorObject o) {
+      return null == o ? 1 : this.hashCode() - o.hashCode();
+    }
   }
 
   // Mock flow object.
@@ -674,6 +679,25 @@ public class SelectorTest {
   }
 
   @Test
+  public void testCreatingExecutorSelectorWithEmptyFilterComparatorList() throws Exception{
+    List<Executor> executorList = new ArrayList<Executor>();
+    executorList.add(new Executor(1, "host1", 80, true));
+    executorList.add(new Executor(2, "host2", 80, true));
+    executorList.add(new Executor(3, "host3", 80, true));
+
+    executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
+    executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90,  0));
+    executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90,  0));
+
+    ExecutableFlow flow = new ExecutableFlow();
+
+    ExecutorSelector selector = new ExecutorSelector(null , null);
+    Executor executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(2), executor);
+  }
+
+
+  @Test
   public void testExecutorSelectorE2E() throws Exception{
     List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
     Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
@@ -682,9 +706,9 @@ public class SelectorTest {
     executorList.add(new Executor(2, "host2", 80, true));
     executorList.add(new Executor(3, "host3", 80, true));
 
-    executorList.get(0).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 4095,89, 0, 0));
-    executorList.get(1).setExecutorStats(new Statistics(50, 14095, 50, new Date(), 4095,90, 0, 0));
-    executorList.get(2).setExecutorStats(new Statistics(99.9, 14095, 50, new Date(), 2048,90, 0, 0));
+    executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 89, 0));
+    executorList.get(1).setExecutorStats(new ServerStatistics(50, 14095, 50, new Date(), 90,  0));
+    executorList.get(2).setExecutorStats(new ServerStatistics(99.9, 14095, 50, new Date(), 90,  0));
 
     ExecutableFlow flow = new ExecutableFlow();
 
@@ -697,7 +721,7 @@ public class SelectorTest {
 
     // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
     // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
-    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,90, 0, 1));
+    executorList.get(0).setExecutorStats(new ServerStatistics(99.9, 4095, 50, new Date(), 90, 1));
     executor = selector.getBest(executorList, flow);
     Assert.assertEquals(executorList.get(2), executor);
   }
diff --git a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
index 905cf5e..125f1b3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
@@ -84,9 +84,9 @@ public class CacheTest {
 
   @Test
   public void testTimeToLiveExpiry() {
-    CacheManager.setUpdateFrequency(200);
     CacheManager manager = CacheManager.getInstance();
     Cache cache = manager.createCache();
+    CacheManager.setUpdateFrequency(200);
 
     cache.setUpdateFrequencyMs(200);
     cache.setEjectionPolicy(EjectionPolicy.FIFO);
@@ -122,9 +122,9 @@ public class CacheTest {
 
   @Test
   public void testIdleExpireExpiry() {
-    CacheManager.setUpdateFrequency(250);
     CacheManager manager = CacheManager.getInstance();
     Cache cache = manager.createCache();
+    CacheManager.setUpdateFrequency(250);
 
     cache.setUpdateFrequencyMs(250);
     cache.setEjectionPolicy(EjectionPolicy.FIFO);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 1060e83..235989d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -131,7 +131,7 @@ public class AzkabanExecutorServer {
     root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
     root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
-    root.addServlet(new ServletHolder(new StatisticsServlet()), "/stastics");
+    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverstastics");
 
     root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
 
@@ -176,7 +176,7 @@ public class AzkabanExecutorServer {
 
   /**
    * Configure Metric Reporting as per azkaban.properties settings
-   * 
+   *
    * @throws MetricException
    */
   private void configureMetricReports() throws MetricException {
@@ -225,13 +225,13 @@ public class AzkabanExecutorServer {
   /**
    * Load a custom class, which is provided by a configuration
    * CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
-   * 
+   *
    * This method will try to instantiate an instance of this custom class and
    * with given properties as the argument in the constructor.
-   * 
+   *
    * Basically the custom class must have a constructor that takes an argument
    * with type Properties.
-   * 
+   *
    * @param props
    */
   private void loadCustomJMXAttributeProcessor(Props props) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index 10371aa..71caaff 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -144,7 +144,7 @@ public class FlowRunnerManager implements EventListener,
   private Object executionDirDeletionSync = new Object();
 
   // date time of the the last flow submitted.
-  private Date lastFlowSubmitted = null;
+  private Date lastFlowSubmittedDate = null;
 
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, ClassLoader parentClassLoader)
@@ -262,7 +262,7 @@ public class FlowRunnerManager implements EventListener,
     // Note: this is not thread safe and may result in providing dirty data.
     //       we will provide this data as is for now and will revisit if there
     //       is a string justification for change.
-    return lastFlowSubmitted;
+    return lastFlowSubmittedDate;
   }
 
   public Props getGlobalProps() {
@@ -519,7 +519,7 @@ public class FlowRunnerManager implements EventListener,
       // keep track of this future
       submittedFlows.put(future, runner.getExecutionId());
       // update the last submitted time.
-      this.lastFlowSubmitted = new Date();
+      this.lastFlowSubmittedDate = new Date();
     } catch (RejectedExecutionException re) {
       throw new ExecutorManagerException(
           "Azkaban server can't execute any more flows. "
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index faf6abb..781973b 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -7,19 +7,19 @@ import java.util.Map;
 import org.junit.Assert;
 import org.junit.Test;
 
-import azkaban.executor.Statistics;
+import azkaban.executor.ServerStatistics;
 import azkaban.utils.JSONUtils;
 
 public class StatisticsServletTest {
-  private class MockStatisticsServlet extends StatisticsServlet{
+  private class MockStatisticsServlet extends ServerStatisticsServlet{
     /** */
     private static final long serialVersionUID = 1L;
 
-    public  Statistics getStastics(){
+    public  ServerStatistics getStastics(){
       return cachedstats;
     }
 
-    public  Date getUpdatedTime(){
+    public  long getUpdatedTime(){
       return lastRefreshedTime;
     }
 
@@ -27,17 +27,17 @@ public class StatisticsServletTest {
        this.populateStatistics(false);
     }
 
-    public void callFillCpuUsage(Statistics stats){
+    public void callFillCpuUsage(ServerStatistics stats){
       this.fillCpuUsage(stats);}
 
-    public void callFillRemainingMemoryPercent(Statistics stats){
+    public void callFillRemainingMemoryPercent(ServerStatistics stats){
         this.fillRemainingMemoryPercent(stats);}
   }
   private MockStatisticsServlet statServlet = new MockStatisticsServlet();
 
   @Test
   public void testFillMemory()  {
-    Statistics stats = new Statistics();
+    ServerStatistics stats = new ServerStatistics();
     this.statServlet.callFillRemainingMemoryPercent(stats);
     // assume any machine that runs this test should
     // have bash and top available and at least got some remaining memory.
@@ -47,7 +47,7 @@ public class StatisticsServletTest {
 
   @Test
   public void testFillCpu()  {
-    Statistics stats = new Statistics();
+    ServerStatistics stats = new ServerStatistics();
     this.statServlet.callFillCpuUsage(stats);
     Assert.assertTrue(stats.getCpuUsage() > 0);
   }
@@ -64,8 +64,8 @@ public class StatisticsServletTest {
   @Test
   public void testPopulateStatisticsCache()  {
     this.statServlet.callPopulateStatistics();
-    final Date updatedTime = this.statServlet.getUpdatedTime();
-    while (new Date().getTime() - updatedTime.getTime() < 1000){
+    final long updatedTime = this.statServlet.getUpdatedTime();
+    while (System.currentTimeMillis() - updatedTime < 1000){
       this.statServlet.callPopulateStatistics();
       Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
     }
@@ -81,21 +81,21 @@ public class StatisticsServletTest {
 
   @Test
   public void testStatisticsJsonParser() throws IOException  {
-    Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5,5);
+    ServerStatistics stat = new ServerStatistics(0.1,1,2,new Date(),3,5);
     String jSonStr = JSONUtils.toJSON(stat);
     @SuppressWarnings("unchecked")
     Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
-    Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+    ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
     Assert.assertTrue(stat.equals(stat2));
     }
 
   @Test
   public void testStatisticsJsonParserWNullDateValue() throws IOException  {
-    Statistics stat = new Statistics(0.1,1,2,null,3,4,5,5);
+    ServerStatistics stat = new ServerStatistics(0.1,1,2,null,3,5);
     String jSonStr = JSONUtils.toJSON(stat);
     @SuppressWarnings("unchecked")
     Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
-    Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+    ServerStatistics stat2 = ServerStatistics.fromJsonObject(jSonObj);
     Assert.assertTrue(stat.equals(stat2));
     }
 }