azkaban-aplcache

Merge pull request #477 from evlstyle/multipleexecutor modify

9/9/2015 6:42:30 PM

Changes

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 31070bf..e21ae2a 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -16,6 +16,7 @@
 
 package azkaban.executor;
 
+import java.util.Date;
 import azkaban.utils.Utils;
 
 /**
@@ -23,13 +24,14 @@ import azkaban.utils.Utils;
  *
  * @author gaggarwa
  */
-public class Executor {
+public class Executor implements Comparable<Executor> {
   private final int id;
   private final String host;
   private final int port;
   private boolean isActive;
-
-  // TODO: ExecutorStats to be added
+  // cached copy of the latest statistics from  the executor.
+  private ExecutorInfo cachedExecutorStats;
+  private Date lastStatsUpdatedTime;
 
   /**
    * <pre>
@@ -88,6 +90,13 @@ public class Executor {
     return true;
   }
 
+  @Override
+  public String toString(){
+    return String.format("%s (id: %s)",
+        null == this.host || this.host.length() == 0 ? "(empty)" : this.host,
+        this.id);
+  }
+
   public String getHost() {
     return host;
   }
@@ -104,7 +113,30 @@ public class Executor {
     return id;
   }
 
+  public ExecutorInfo getExecutorInfo() {
+    return this.cachedExecutorStats;
+  }
+
+  public void setExecutorInfo(ExecutorInfo info) {
+    this.cachedExecutorStats = info;
+    this.lastStatsUpdatedTime = new Date();
+  }
+
+  /**
+   * Gets the timestamp when the executor info is last updated.
+   * @return date object represents the timestamp, null if the executor info of this
+   *         specific executor is never refreshed.
+   * */
+  public Date getLastStatsUpdatedTime(){
+    return this.lastStatsUpdatedTime;
+  }
+
   public void setActive(boolean isActive) {
     this.isActive = isActive;
   }
+
+  @Override
+  public int compareTo(Executor o) {
+    return null == o ? 1 : this.hashCode() - o.hashCode();
+  }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
index 4d77897..2ab814d 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorApiClient.java
@@ -17,37 +17,35 @@
 package azkaban.executor;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
 import org.apache.http.client.HttpResponseException;
 import org.apache.http.util.EntityUtils;
-
-import azkaban.utils.JSONUtils;
 import azkaban.utils.RestfulApiClient;
 
 /** Client class that will be used to handle all Restful API calls between Executor and the host application.
  * */
-public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
+public class ExecutorApiClient extends RestfulApiClient<String> {
+  private static ExecutorApiClient instance = null;
   private ExecutorApiClient(){}
-  private static ExecutorApiClient instance = new ExecutorApiClient();
 
-  /** Function to return the instance of the ExecutorApiClient class.
+  /**
+   * Singleton method to return the instance of the current object.
    * */
-  public static ExecutorApiClient getInstance() {
+  public static ExecutorApiClient getInstance(){
+    if (null == instance){
+      instance = new ExecutorApiClient();
+    }
+
     return instance;
   }
 
   /**Implementing the parseResponse function to return de-serialized Json object.
    * @param response  the returned response from the HttpClient.
-   * @return de-serialized object from Json or empty object if the response doesn't have a body.
+   * @return de-serialized object from Json or null if the response doesn't have a body.
    * */
-  @SuppressWarnings("unchecked")
   @Override
-  protected Map<String, Object> parseResponse(HttpResponse response)
+  protected String parseResponse(HttpResponse response)
       throws HttpResponseException, IOException {
     final StatusLine statusLine = response.getStatusLine();
     String responseBody = response.getEntity() != null ?
@@ -61,14 +59,6 @@ public class ExecutorApiClient extends RestfulApiClient<Map<String, Object>> {
         throw new HttpResponseException(statusLine.getStatusCode(),responseBody);
     }
 
-    final HttpEntity entity = response.getEntity();
-    if (null != entity){
-      Object returnVal = JSONUtils.parseJSONFromString(EntityUtils.toString(entity));
-      if (null!= returnVal){
-        return (Map<String, Object>) returnVal;
-      }
-    }
-
-    return new HashMap<String, Object>() ;
+    return responseBody;
   }
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
new file mode 100644
index 0000000..03de432
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+ /** Class that exposes the statistics from the executor server.
+  *  List of the statistics -
+  *  remainingMemoryPercent;
+  *  remainingMemory;
+  *  remainingFlowCapacity;
+  *  numberOfAssignedFlows;
+  *  lastDispatchedTime;
+  *  cpuUsage;
+  *
+  * */
+  public class ExecutorInfo implements java.io.Serializable{
+    private static final long serialVersionUID = 3009746603773371263L;
+    private double remainingMemoryPercent;
+    private long   remainingMemoryInMB;
+    private int    remainingFlowCapacity;
+    private int    numberOfAssignedFlows;
+    private long   lastDispatchedTime;
+    private double cpuUsage;
+
+    public double getCpuUsage() {
+      return this.cpuUsage;
+    }
+
+    public void setCpuUpsage(double value){
+      this.cpuUsage = value;
+    }
+
+    public double getRemainingMemoryPercent() {
+      return this.remainingMemoryPercent;
+    }
+
+    public void setRemainingMemoryPercent(double value){
+      this.remainingMemoryPercent = value;
+    }
+
+    public long getRemainingMemoryInMB(){
+      return this.remainingMemoryInMB;
+    }
+
+    public void setRemainingMemoryInMB(long value){
+      this.remainingMemoryInMB = value;
+    }
+
+    public int getRemainingFlowCapacity(){
+      return this.remainingFlowCapacity;
+    }
+
+    public void setRemainingFlowCapacity(int value){
+      this.remainingFlowCapacity = value;
+    }
+
+    public long getLastDispatchedTime(){
+      return this.lastDispatchedTime;
+    }
+
+    public void setLastDispatchedTime(long value){
+      this.lastDispatchedTime = value;
+    }
+
+    public int getNumberOfAssignedFlows () {
+      return this.numberOfAssignedFlows;
+    }
+
+    public void setNumberOfAssignedFlows (int value) {
+      this.numberOfAssignedFlows = value;
+    }
+
+    public ExecutorInfo(){}
+
+    public ExecutorInfo (double remainingMemoryPercent,
+        long remainingMemory,
+        int remainingFlowCapacity,
+        long lastDispatched,
+        double cpuUsage,
+        int numberOfAssignedFlows){
+      this.remainingMemoryInMB = remainingMemory;
+      this.cpuUsage = cpuUsage;
+      this.remainingFlowCapacity = remainingFlowCapacity;
+      this.remainingMemoryPercent = remainingMemoryPercent;
+      this.lastDispatchedTime = lastDispatched;
+      this.numberOfAssignedFlows = numberOfAssignedFlows;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj instanceof ExecutorInfo)
+        {
+          boolean result = true;
+          ExecutorInfo stat = (ExecutorInfo) obj;
+
+          result &=this.remainingMemoryInMB == stat.remainingMemoryInMB;
+          result &=this.cpuUsage == stat.cpuUsage;
+          result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
+          result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
+          result &=this.numberOfAssignedFlows == stat.numberOfAssignedFlows;
+          result &= this.lastDispatchedTime == stat.lastDispatchedTime;
+          return result;
+        }
+        return false;
+    }
+
+    /**
+     * Helper function to get an ExecutorInfo instance from the JSon String serialized from another object.
+     * @param  jsonString the string that will be de-serialized from.
+     * @return instance of the object if the parsing is successful, null other wise.
+     * @throws JsonParseException,JsonMappingException,IOException
+     * */
+    public static ExecutorInfo fromJSONString(String jsonString) throws
+    JsonParseException,
+    JsonMappingException,
+    IOException{
+      if (null == jsonString || jsonString.length() == 0) return null;
+      return new ObjectMapper().readValue(jsonString, ExecutorInfo.class);
+    }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
index 8234f7d..8ef2c76 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateComparator.java
@@ -31,7 +31,7 @@ import azkaban.utils.Pair;
  *
  */
 public abstract class CandidateComparator<T> implements Comparator<T> {
-  private static Logger logger = Logger.getLogger(CandidateComparator.class);
+  protected static Logger logger = Logger.getLogger(CandidateComparator.class);
 
   // internal repository of the registered comparators .
   private Map<String,FactorComparator<T>> factorComparatorList =
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
index 154d528..94b8dfa 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateFilter.java
@@ -28,14 +28,14 @@ import org.apache.log4j.Logger;
  *  register filters using the provided register function.
  */
 public abstract class CandidateFilter<T,V>  {
-  private static Logger logger = Logger.getLogger(CandidateFilter.class);
+  protected static Logger logger = Logger.getLogger(CandidateFilter.class);
 
   // internal repository of the registered filters .
   private Map<String,FactorFilter<T,V>> factorFilterList =
       new ConcurrentHashMap<String,FactorFilter<T,V>>();
 
   /** gets the name of the current implementation of the candidate filter.
-   * @returns : name of the filter.
+   * @return : name of the filter.
    * */
   public abstract String getName();
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 0598d95..53cb1bb 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -26,7 +26,7 @@ import org.apache.log4j.Logger;
  *  @param K executor object type.
  *  @param V dispatching object type.
  * */
-public class CandidateSelector<K,V> implements Selector<K, V> {
+public class CandidateSelector<K extends Comparable<K>, V> implements Selector<K, V> {
   private static Logger logger = Logger.getLogger(CandidateComparator.class);
 
   private CandidateFilter<K,V> filter;
@@ -70,12 +70,16 @@ public class CandidateSelector<K,V> implements Selector<K, V> {
 
      logger.info(String.format("candidate count after filtering: %s", filteredList.size()));
      if (filteredList.size() == 0){
-       logger.info("failed to select candidate as the filted candidate list is empty.");
+       logger.info("failed to select candidate as the filtered candidate list is empty.");
        return null;
      }
 
+     if (null == comparator){
+       logger.info("candidate comparator is not specified, default hash code comparator class will be used.");
+     }
+
      // final work - find the best candidate from the filtered list.
-     K executor = Collections.max(filteredList,comparator);
+     K executor = Collections.max(filteredList, comparator);
      logger.info(String.format("candidate selected %s",
          null == executor ? "(null)" : executor.toString()));
      return executor;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
new file mode 100644
index 0000000..eafab67
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorInfo;
+
+
+/**
+ * De-normalized version of the CandidateComparator, which also contains the implementation of the factor comparators.
+ * */
+public class ExecutorComparator extends CandidateComparator<Executor> {
+  private static Map<String, ComparatorCreator> comparatorCreatorRepository = null;
+
+  /**
+   * Gets the name list of all available comparators.
+   * @return the list of the names.
+   * */
+  public static Set<String> getAvailableComparatorNames(){
+    return comparatorCreatorRepository.keySet();
+  }
+
+  // factor comparator names
+  private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
+  private static final String MEMORY_COMPARATOR_NAME = "Memory";
+  private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
+  private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
+
+  /**
+   * static initializer of the class.
+   * We will build the filter repository here.
+   * when a new comparator is added, please do remember to register it here.
+   * */
+  static {
+    comparatorCreatorRepository = new HashMap<String, ComparatorCreator>();
+
+    // register the creator for number of assigned flow comparator.
+    comparatorCreatorRepository.put(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, new ComparatorCreator(){
+      public FactorComparator<Executor> create(int weight) { return getNumberOfAssignedFlowComparator(weight); }});
+
+    // register the creator for memory comparator.
+    comparatorCreatorRepository.put(MEMORY_COMPARATOR_NAME, new ComparatorCreator(){
+      public FactorComparator<Executor> create(int weight) { return getMemoryComparator(weight); }});
+
+    // register the creator for last dispatched time comparator.
+    comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
+      public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+
+    // register the creator for CPU Usage comparator.
+    comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
+      public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
+  }
+
+
+  /**
+   * constructor of the ExecutorComparator.
+   * @param comparatorList   the list of comparator, plus its weight information to be registered,
+   *  the parameter must be a not-empty and valid list object.
+   * */
+  public ExecutorComparator(Map<String,Integer> comparatorList) {
+    if (null == comparatorList|| comparatorList.size() == 0){
+      throw new IllegalArgumentException("failed to initialize executor comparator" +
+                                         "as the passed comparator list is invalid or empty.");
+    }
+
+    // register the comparators, we will now throw here if the weight is invalid, it is handled in the super.
+    for (Entry<String,Integer> entry : comparatorList.entrySet()){
+      if (comparatorCreatorRepository.containsKey(entry.getKey())){
+        this.registerFactorComparator(comparatorCreatorRepository.
+            get(entry.getKey()).
+            create(entry.getValue()));
+      } else {
+        throw new IllegalArgumentException(String.format("failed to initialize executor comparator " +
+                                        "as the comparator implementation for requested factor '%s' doesn't exist.",
+                                        entry.getKey()));
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "ExecutorComparator";
+  }
+
+  private interface ComparatorCreator{
+    FactorComparator<Executor> create(int weight);
+  }
+
+  /**<pre>
+   * helper function that does the object  on two statistics, comparator can leverage this function to provide
+   * shortcuts if   the statistics object is missing from one or both sides of the executors.
+   * </pre>
+   * @param stat1   the first statistics  object to be checked .
+   * @param stat2   the second statistics object to be checked.
+   * @param caller  the name of the calling function, for logging purpose.
+   * @param result  result Integer to pass out the result in case the statistics are not both valid.
+   * @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
+   *         false otherwise.
+   * */
+  private static boolean statisticsObjectCheck(ExecutorInfo statisticsObj1,
+                                               ExecutorInfo statisticsObj2, String caller, Integer result){
+    result = 0 ;
+    // both doesn't expose the info
+    if (null == statisticsObj1 && null == statisticsObj2){
+      logger.info(String.format("%s : neither of the executors exposed statistics info.",
+          caller));
+      return true;
+    }
+
+    //right side doesn't expose the info.
+    if (null == statisticsObj2 ){
+        logger.info(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
+            caller));
+        result = 1;
+        return true;
+    }
+
+    //left side doesn't expose the info.
+    if (null == statisticsObj1 ){
+      logger.info(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
+          caller));
+      result = -1;
+      return true;
+      }
+
+    // both not null
+    return false;
+  }
+
+  /**
+   * function defines the number of assigned flow comparator.
+   * @param weight weight of the comparator.
+   * */
+  private static FactorComparator<Executor> getNumberOfAssignedFlowComparator(int weight){
+    return FactorComparator.create(NUMOFASSIGNEDFLOW_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        ExecutorInfo stat1 = o1.getExecutorInfo();
+        ExecutorInfo stat2 = o2.getExecutorInfo();
+
+        Integer result = 0;
+        if (statisticsObjectCheck(stat1,stat2,NUMOFASSIGNEDFLOW_COMPARATOR_NAME,result)){
+          return result;
+        }
+        return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
+      }});
+  }
+
+  /**
+   * function defines the cpuUsage comparator.
+   * @param weight weight of the comparator.
+   * @return
+   * */
+  private static FactorComparator<Executor> getCpuUsageComparator(int weight){
+    return FactorComparator.create(CPUUSAGE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        ExecutorInfo stat1 = o1.getExecutorInfo();
+        ExecutorInfo stat2 = o2.getExecutorInfo();
+
+        int result = 0;
+        if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
+          return result;
+        }
+
+        // CPU usage , the lesser the value is, the better.
+        return ((Double)stat2.getCpuUsage()).compareTo(stat1.getCpuUsage());
+      }});
+  }
+
+
+  /**
+   * function defines the last dispatched time comparator.
+   * @param weight weight of the comparator.
+   * @return
+   * */
+  private static FactorComparator<Executor> getLstDispatchedTimeComparator(int weight){
+    return FactorComparator.create(LSTDISPATCHED_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        ExecutorInfo stat1 = o1.getExecutorInfo();
+        ExecutorInfo stat2 = o2.getExecutorInfo();
+
+        int result = 0;
+        if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
+          return result;
+        }
+        // Note: an earlier date time indicates higher weight.
+        return ((Long)stat2.getLastDispatchedTime()).compareTo(stat1.getLastDispatchedTime());
+      }});
+  }
+
+
+  /**<pre>
+   * function defines the Memory comparator.
+   * Note: comparator firstly take the absolute value of the remaining memory, if both sides have the same value,
+   *       it go further to check the percent of the remaining memory.
+   * </pre>
+   * @param weight weight of the comparator.
+
+   * @return
+   * */
+  private static FactorComparator<Executor> getMemoryComparator(int weight){
+    return FactorComparator.create(MEMORY_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+       ExecutorInfo stat1 = o1.getExecutorInfo();
+       ExecutorInfo stat2 = o2.getExecutorInfo();
+
+       int result = 0;
+       if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
+         return result;
+       }
+
+       if (stat1.getRemainingMemoryInMB() != stat2.getRemainingMemoryInMB()){
+         return stat1.getRemainingMemoryInMB() > stat2.getRemainingMemoryInMB() ? 1:-1;
+       }
+
+       return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
+      }});
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
new file mode 100644
index 0000000..031650e
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+import azkaban.executor.ExecutorInfo;
+
+/**
+ * De-normalized version of the candidateFilter, which also contains the implementation of the factor filters.
+ * */
+public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
+  private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
+
+  /**
+   * Gets the name list of all available filters.
+   * @return the list of the names.
+   * */
+  public static Set<String> getAvailableFilterNames(){
+    return filterRepository.keySet();
+  }
+
+
+  // factor filter names.
+  private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
+  private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimunFreeMemory";
+  private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
+
+  /**
+   * static initializer of the class.
+   * We will build the filter repository here.
+   * when a new filter is added, please do remember to register it here.
+   * */
+  static {
+    filterRepository = new HashMap<String, FactorFilter<Executor, ExecutableFlow>>();
+    filterRepository.put(STATICREMAININGFLOWSIZE_FILTER_NAME, getStaticRemainingFlowSizeFilter());
+    filterRepository.put(MINIMUMFREEMEMORY_FILTER_NAME, getMinimumReservedMemoryFilter());
+    filterRepository.put(CPUSTATUS_FILTER_NAME, getCpuStatusFilter());
+  }
+
+  /**
+   * constructor of the ExecutorFilter.
+   * @param filterList   the list of filter to be registered, the parameter must be a not-empty and valid list object.
+   * */
+  public ExecutorFilter(List<String> filterList) {
+    // shortcut if the filter list is invalid. A little bit ugly to have to throw in constructor.
+    if (null == filterList || filterList.size() == 0){
+      logger.error("failed to initialize executor filter as the passed filter list is invalid or empty.");
+      throw new IllegalArgumentException("filterList");
+    }
+
+    // register the filters according to the list.
+    for (String filterName : filterList){
+      if (filterRepository.containsKey(filterName)){
+        this.registerFactorFilter(filterRepository.get(filterName));
+      } else {
+        logger.error(String.format("failed to initialize executor filter "+
+                                   "as the filter implementation for requested factor '%s' doesn't exist.",
+                                   filterName));
+        throw new IllegalArgumentException("filterList");
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "ExecutorFilter";
+  }
+
+  /**<pre>
+   * function to register the static remaining flow size filter.
+   * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+   *        Coming for the passed flow.
+   *        Ideally this filter will make sure only the executor hasn't reached the Max allowed # of executing flows.
+   *</pre>
+   * */
+  private static FactorFilter<Executor, ExecutableFlow> getStaticRemainingFlowSizeFilter(){
+    return FactorFilter.create(STATICREMAININGFLOWSIZE_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+        if (null == filteringTarget){
+          logger.info(String.format("%s : filtering out the target as it is null.", STATICREMAININGFLOWSIZE_FILTER_NAME));
+          return false;
+        }
+
+        ExecutorInfo stats = filteringTarget.getExecutorInfo();
+        if (null == stats) {
+          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+              STATICREMAININGFLOWSIZE_FILTER_NAME,
+              filteringTarget.toString()));
+          return false;
+        }
+        return stats.getRemainingFlowCapacity() > 0 ;
+       }
+    });
+  }
+
+  /**
+   * function to register the static Minimum Reserved Memory filter.
+   * NOTE : this is a static filter which means the filter will be filtering based on the system standard which is not
+   *        Coming for the passed flow.
+   *        This filter will filter out any executors that has the remaining  memory below 6G
+   * */
+  private static FactorFilter<Executor, ExecutableFlow> getMinimumReservedMemoryFilter(){
+    return FactorFilter.create(MINIMUMFREEMEMORY_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+      private static final int MINIMUM_FREE_MEMORY = 6 * 1024;
+      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+        if (null == filteringTarget){
+          logger.info(String.format("%s : filtering out the target as it is null.", MINIMUMFREEMEMORY_FILTER_NAME));
+          return false;
+        }
+
+        ExecutorInfo stats = filteringTarget.getExecutorInfo();
+        if (null == stats) {
+          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+              MINIMUMFREEMEMORY_FILTER_NAME,
+              filteringTarget.toString()));
+          return false;
+        }
+        return stats.getRemainingMemoryInMB() > MINIMUM_FREE_MEMORY ;
+       }
+    });
+  }
+
+
+  /**
+   * function to register the static Minimum Reserved Memory filter.
+   * NOTE : <pre> this is a static filter which means the filter will be filtering based on the system standard which
+   *        is not Coming for the passed flow.
+   *        This filter will filter out any executors that the current CPU usage exceed 95%
+   *        </pre>
+   * */
+  private static FactorFilter<Executor, ExecutableFlow> getCpuStatusFilter(){
+    return FactorFilter.create(CPUSTATUS_FILTER_NAME, new FactorFilter.Filter<Executor, ExecutableFlow>() {
+      private static final int MAX_CPU_CURRENT_USAGE = 95;
+      public boolean filterTarget(Executor filteringTarget, ExecutableFlow referencingObject) {
+        if (null == filteringTarget){
+          logger.info(String.format("%s : filtering out the target as it is null.", CPUSTATUS_FILTER_NAME));
+          return false;
+        }
+
+        ExecutorInfo stats = filteringTarget.getExecutorInfo();
+        if (null == stats) {
+          logger.info(String.format("%s : filtering out %s as it's stats is unavailable.",
+              MINIMUMFREEMEMORY_FILTER_NAME,
+              filteringTarget.toString()));
+          return false;
+        }
+        return stats.getCpuUsage() < MAX_CPU_CURRENT_USAGE ;
+       }
+    });
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
new file mode 100644
index 0000000..d7236e8
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorSelector.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.executor.selector;
+
+import java.util.List;
+import java.util.Map;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.Executor;
+
+/**<pre>
+ * Executor selector class implementation.
+ * NOTE: This class is a de-generalized version of the CandidateSelector, which provides a
+ *       clean and convenient constructor to take in filter and comparator name list and build
+ *       the instance from that.
+ *</pre>
+ * */
+public class ExecutorSelector extends CandidateSelector<Executor, ExecutableFlow> {
+
+  /**
+   * Contractor of the class.
+   * @param filterList      name list of the filters to be registered,
+   *                        filter feature will be disabled if a null value is passed.
+   * @param comparatorList  name/weight pair list of the comparators to be registered ,
+   *                        again comparator feature is disabled if a null value is passed.
+   * */
+  public ExecutorSelector(List<String> filterList, Map<String,Integer> comparatorList) {
+    super(null == filterList || filterList.isEmpty() ?         null : new ExecutorFilter(filterList),
+          null == comparatorList || comparatorList.isEmpty() ? null : new ExecutorComparator(comparatorList));
+  }
+}
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
index 1d9d162..6d4d2e0 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -33,7 +33,7 @@ public final class FactorComparator<T>{
    *  method provided below.
    * @param factorName : the factor name .
    * @param weight : the weight of the comparator.
-   * @ comparator : function to be provided by user on how the comparison should be made.
+   * @param comparator : function to be provided by user on how the comparison should be made.
    * */
   private FactorComparator(String factorName, int weight, Comparator<T> comparator){
     this.factorName = factorName;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index 74b9d5d..04b04b7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -65,13 +65,11 @@ public final class FactorFilter<T,V>{
   public interface Filter<T,V>{
 
     /**function to analyze the target item according to the reference object to decide whether the item should be filtered.
-     * @param filteringTarget:   object to be checked.
-     * @param referencingObject: object which contains statistics based on which a decision is made whether
+     * @param filteringTarget   object to be checked.
+     * @param referencingObject object which contains statistics based on which a decision is made whether
      *                      the object being checked need to be filtered or not.
      * @return true if the check passed, false if check failed, which means the item need to be filtered.
      * */
     public boolean filterTarget(T filteringTarget, V referencingObject);
   }
-
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
index 610f75a..605fd28 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/Selector.java
@@ -19,13 +19,15 @@ package azkaban.executor.selector;
 import java.util.List;
 
 
-/** Definition of the selector interface.
+/**<pre>
+ *  Definition of the selector interface.
  *  an implementation of the selector interface provides the functionality
  *  to return a candidate from the candidateList that suits best for the dispatchingObject.
+ * </pre>
  *  @param K : type of the candidate.
  *  @param V : type of the dispatching object.
  */
-public interface Selector <K,V> {
+public interface Selector <K extends Comparable<K>,V> {
 
   /** Function returns the next best suit candidate from the candidateList for the dispatching object.
    *  @param  candidateList : List of the candidates to select from .
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index b5b4509..e54b182 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -20,6 +20,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.log4j.BasicConfigurator;
 import org.junit.After;
@@ -29,10 +32,11 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import azkaban.executor.selector.*;
+import azkaban.utils.JSONUtils;
 
 public class SelectorTest {
   // mock executor object.
-  protected class MockExecutorObject{
+  protected class MockExecutorObject implements Comparable <MockExecutorObject>{
     public String name;
     public int    port;
     public double percentOfRemainingMemory;
@@ -66,6 +70,11 @@ public class SelectorTest {
     {
       return this.name;
     }
+
+    @Override
+    public int compareTo(MockExecutorObject o) {
+      return null == o ? 1 : this.hashCode() - o.hashCode();
+    }
   }
 
   // Mock flow object.
@@ -602,4 +611,128 @@ public class SelectorTest {
     Assert.assertTrue(null == executor);
 
   }
+
+  @Test
+  public void testCreatingExectorfilterObject() throws Exception{
+    List<String> validList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+    try {
+      new ExecutorFilter(validList);
+    }catch (Exception ex){
+      Assert.fail("creating ExecutorFilter with valid list throws exception . ex -" + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreatingExectorfilterObjectWInvalidList() throws Exception{
+    List<String> invalidList = new ArrayList<String>();
+    invalidList.add("notExistingFilter");
+    Exception result = null;
+    try {
+      new ExecutorFilter(invalidList);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObject() throws Exception{
+   Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+   for (String name : ExecutorComparator.getAvailableComparatorNames()){
+     comparatorMap.put(name, 1);
+   }
+   try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      Assert.fail("creating ExecutorComparator with valid list throws exception . ex -" + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObjectWInvalidName() throws Exception{
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    comparatorMap.put("invalidName", 0);
+    Exception result = null;
+    try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExectorComparatorObjectWInvalidWeight() throws Exception{
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    for (String name : ExecutorComparator.getAvailableComparatorNames()){
+      comparatorMap.put(name, -1);
+    }
+    Exception result = null;
+    try {
+      new ExecutorComparator(comparatorMap);
+    }catch (Exception ex){
+      if (ex instanceof IllegalArgumentException)
+      result = ex;
+    }
+    Assert.assertNotNull(result);
+  }
+
+  @Test
+  public void testCreatingExecutorSelectorWithEmptyFilterComparatorList() throws Exception{
+    List<Executor> executorList = new ArrayList<Executor>();
+    executorList.add(new Executor(1, "host1", 80, true));
+    executorList.add(new Executor(2, "host2", 80, true));
+    executorList.add(new Executor(3, "host3", 80, true));
+
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
+
+    ExecutableFlow flow = new ExecutableFlow();
+
+    ExecutorSelector selector = new ExecutorSelector(null , null);
+    Executor executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(2), executor);
+  }
+
+
+  @Test
+  public void testExecutorSelectorE2E() throws Exception{
+    List<String> filterList = new ArrayList<String>(ExecutorFilter.getAvailableFilterNames());
+    Map<String,Integer> comparatorMap = new HashMap<String,Integer>();
+    List<Executor> executorList = new ArrayList<Executor>();
+    executorList.add(new Executor(1, "host1", 80, true));
+    executorList.add(new Executor(2, "host2", 80, true));
+    executorList.add(new Executor(3, "host3", 80, true));
+
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 0));
+    executorList.get(1).setExecutorInfo(new ExecutorInfo(50, 14095, 50, System.currentTimeMillis(), 90,  0));
+    executorList.get(2).setExecutorInfo(new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 90,  0));
+
+    ExecutableFlow flow = new ExecutableFlow();
+
+    for (String name : ExecutorComparator.getAvailableComparatorNames()){
+      comparatorMap.put(name, 1);
+    }
+    ExecutorSelector selector = new ExecutorSelector(filterList,comparatorMap);
+    Executor executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(0), executor);
+
+    // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
+    // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
+    executorList.get(0).setExecutorInfo(new ExecutorInfo(99.9, 4095, 50, System.currentTimeMillis(), 90, 1));
+    executor = selector.getBest(executorList, flow);
+    Assert.assertEquals(executorList.get(2), executor);
+  }
+
+  @Test
+  public void  testExecutorInfoJsonParser() throws Exception{
+    ExecutorInfo exeInfo = new ExecutorInfo(99.9, 14095, 50, System.currentTimeMillis(), 89, 10);
+    String json = JSONUtils.toJSON(exeInfo);
+    ExecutorInfo exeInfo2 = ExecutorInfo.fromJSONString(json);
+    Assert.assertTrue(exeInfo.equals(exeInfo2));
+  }
+
 }
diff --git a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
index 905cf5e..125f1b3 100644
--- a/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
+++ b/azkaban-common/src/test/java/azkaban/utils/cache/CacheTest.java
@@ -84,9 +84,9 @@ public class CacheTest {
 
   @Test
   public void testTimeToLiveExpiry() {
-    CacheManager.setUpdateFrequency(200);
     CacheManager manager = CacheManager.getInstance();
     Cache cache = manager.createCache();
+    CacheManager.setUpdateFrequency(200);
 
     cache.setUpdateFrequencyMs(200);
     cache.setEjectionPolicy(EjectionPolicy.FIFO);
@@ -122,9 +122,9 @@ public class CacheTest {
 
   @Test
   public void testIdleExpireExpiry() {
-    CacheManager.setUpdateFrequency(250);
     CacheManager manager = CacheManager.getInstance();
     Cache cache = manager.createCache();
+    CacheManager.setUpdateFrequency(250);
 
     cache.setUpdateFrequencyMs(250);
     cache.setEjectionPolicy(EjectionPolicy.FIFO);
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 6e012ab..235989d 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -131,6 +131,7 @@ public class AzkabanExecutorServer {
     root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
     root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverstastics");
 
     root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
 
@@ -175,7 +176,7 @@ public class AzkabanExecutorServer {
 
   /**
    * Configure Metric Reporting as per azkaban.properties settings
-   * 
+   *
    * @throws MetricException
    */
   private void configureMetricReports() throws MetricException {
@@ -224,13 +225,13 @@ public class AzkabanExecutorServer {
   /**
    * Load a custom class, which is provided by a configuration
    * CUSTOM_JMX_ATTRIBUTE_PROCESSOR_PROPERTY.
-   * 
+   *
    * This method will try to instantiate an instance of this custom class and
    * with given properties as the argument in the constructor.
-   * 
+   *
    * Basically the custom class must have a constructor that takes an argument
    * with type Properties.
-   * 
+   *
    * @param props
    */
   private void loadCustomJMXAttributeProcessor(Props props) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c8d7f1c..361db00 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -142,6 +143,9 @@ public class FlowRunnerManager implements EventListener,
 
   private Object executionDirDeletionSync = new Object();
 
+  // date time of the the last flow submitted.
+  private long lastFlowSubmittedDate = 0;
+
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, ClassLoader parentClassLoader)
       throws IOException {
@@ -254,6 +258,13 @@ public class FlowRunnerManager implements EventListener,
     return allProjects;
   }
 
+  public long getLastFlowSubmittedTime(){
+    // Note: this is not thread safe and may result in providing dirty data.
+    //       we will provide this data as is for now and will revisit if there
+    //       is a string justification for change.
+    return lastFlowSubmittedDate;
+  }
+
   public Props getGlobalProps() {
     return globalProps;
   }
@@ -507,6 +518,8 @@ public class FlowRunnerManager implements EventListener,
       Future<?> future = executorService.submit(runner);
       // keep track of this future
       submittedFlows.put(future, runner.getExecutionId());
+      // update the last submitted time.
+      this.lastFlowSubmittedDate = System.currentTimeMillis();
     } catch (RejectedExecutionException re) {
       throw new ExecutorManagerException(
           "Azkaban server can't execute any more flows. "
@@ -517,7 +530,7 @@ public class FlowRunnerManager implements EventListener,
 
   /**
    * Configure Azkaban metrics tracking for a new flowRunner instance
-   * 
+   *
    * @param flowRunner
    */
   private void configureFlowLevelMetrics(FlowRunner flowRunner) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
new file mode 100644
index 0000000..5566837
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutorInfo;
+import azkaban.utils.JSONUtils;
+
+public class ServerStatisticsServlet extends HttpServlet  {
+  private static final long serialVersionUID = 1L;
+  private static final int  cacheTimeInMilliseconds = 1000;
+  private static final int  samplesToTakeForMemory = 1;
+  private static final Logger logger = Logger.getLogger(ServerStatisticsServlet.class);
+  private static final String noCacheParamName = "nocache";
+
+  protected static long lastRefreshedTime = 0;
+  protected static ExecutorInfo cachedstats = null;
+
+  /**
+   * Handle all get request to Statistics Servlet {@inheritDoc}
+   *
+   * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
+   *      javax.servlet.http.HttpServletResponse)
+   */
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+
+    boolean noCache = null!= req && Boolean.valueOf(req.getParameter(noCacheParamName));
+
+    if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
+      this.populateStatistics(noCache);
+    }
+
+    JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
+  }
+
+  /**
+   * fill the result set with the percent of the remaining system memory on the server.
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work work on the property "remainingMemory" and "remainingMemoryPercent".
+   *
+   * NOTE:
+   * a double value will be used to present the remaining memory,
+   *         a returning value of '55.6' means 55.6%
+   */
+  protected void fillRemainingMemoryPercent(ExecutorInfo stats){
+    if (new File("/bin/bash").exists()
+        && new File("/bin/cat").exists()
+        && new File("/bin/grep").exists()
+        &&  new File("/proc/meminfo").exists()) {
+    java.lang.ProcessBuilder processBuilder =
+        new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/meminfo | grep -E \"MemTotal|MemFree\"");
+    try {
+      ArrayList<String> output = new ArrayList<String>();
+      Process process = processBuilder.start();
+      process.waitFor();
+      InputStream inputStream = process.getInputStream();
+      try {
+        java.io.BufferedReader reader =
+            new java.io.BufferedReader(new InputStreamReader(inputStream));
+        String line = null;
+        while ((line = reader.readLine()) != null) {
+          output.add(line);
+        }
+      }finally {
+        inputStream.close();
+      }
+
+      long totalMemory = 0;
+      long  freeMemory = 0;
+
+      // process the output from bash call.
+      // we expect the result from the bash call to be something like following -
+      // MemTotal:       65894264 kB
+      // MemFree:        61104076 kB
+      if (output.size() == 2) {
+        for (String result : output){
+          // find the total memory and value the variable.
+          if (result.contains("MemTotal") && result.split("\\s+").length > 2){
+            try {
+              totalMemory = Long.parseLong(result.split("\\s+")[1]);
+              logger.info("Total memory : " + totalMemory);
+            }catch(NumberFormatException e){
+              logger.error("yielding 0 for total memory as output is invalid -" + result);
+            }
+          }
+          // find the free memory and value the variable.
+          if (result.contains("MemFree") && result.split("\\s+").length > 2){
+            try {
+              freeMemory = Long.parseLong(result.split("\\s+")[1]);
+              logger.info("Free memory : " + freeMemory);
+            }catch(NumberFormatException e){
+              logger.error("yielding 0 for total memory as output is invalid -" + result);
+            }
+          }
+        }
+      }else {
+        logger.error("failed to get total/free memory info as the bash call returned invalid result.");
+      }
+
+      // the number got from the proc file is in KBs we want to see the number in MBs so we are deviding it by 1024.
+      stats.setRemainingMemoryInMB(freeMemory/1024);
+      stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double)freeMemory / (double)totalMemory)*100);
+    }
+    catch (Exception ex){
+      logger.error("failed fetch system memory info " +
+                   "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+    }
+  } else {
+      logger.error("failed fetch system memory info, one or more files from the following list are missing -  " +
+                   "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+  }
+  }
+
+  /**
+   * call the data providers to fill the returning data container for statistics data.
+   * This function refreshes the static cached copy of data in case if necessary.
+   * */
+  protected synchronized void populateStatistics(boolean noCache){
+    //check again before starting the work.
+    if (noCache || System.currentTimeMillis() - lastRefreshedTime  > cacheTimeInMilliseconds){
+      final ExecutorInfo stats = new ExecutorInfo();
+
+      List<Thread> workerPool = new ArrayList<Thread>();
+      workerPool.add(new Thread(new Runnable(){ public void run() {
+        fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
+
+      workerPool.add(new Thread(new Runnable(){ public void run() {
+        fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
+
+      workerPool.add(new Thread(new Runnable(){ public void run() {
+        fillCpuUsage(stats); }},"CpuUsage"));
+
+      // start all the working threads.
+      for (Thread thread : workerPool){thread.start();}
+
+      // wait for all the threads to finish their work.
+      // NOTE: the result container itself is not thread safe, we are good as for now no
+      //       working thread will modify the same property, nor have any of them
+      //       need to compute values based on value(s) of other properties.
+      for (Thread thread : workerPool){
+        try {
+          // we gave maxim 5 seconds to let the thread finish work.
+          thread.join(5000);;
+        } catch (InterruptedException e) {
+          logger.error(String.format("failed to collect information for %s as the working thread is interrupted. Ex - ",
+              thread.getName(), e.getMessage()));
+        }}
+
+      cachedstats = stats;
+      lastRefreshedTime =  System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * fill the result set with the remaining flow capacity .
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work on the property "remainingFlowCapacity".
+   */
+  protected void fillRemainingFlowCapacityAndLastDispatchedTime(ExecutorInfo stats){
+
+    AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
+    if (server != null){
+      FlowRunnerManager runnerMgr =  AzkabanExecutorServer.getApp().getFlowRunnerManager();
+      int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
+      stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
+      stats.setNumberOfAssignedFlows(assignedFlows);
+      stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+    }else {
+      logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
+                   " as the AzkabanExecutorServer has yet been initialized.");
+    }
+  }
+
+
+  /**<pre>
+   * fill the result set with the Remaining temp Storage .
+   * Note : As the Top bash call doesn't yield accurate result for the system load,
+   *        the implementation has been changed to load from the "proc/loadavg" which keeps
+   *        the moving average of the system load, we are pulling the average for the recent 1 min.
+   *</pre>
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work on the property "cpuUdage".
+   */
+  protected void fillCpuUsage(ExecutorInfo stats){
+    if (new File("/bin/bash").exists() && new File("/bin/cat").exists() &&  new File("/proc/loadavg").exists()) {
+      java.lang.ProcessBuilder processBuilder =
+          new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/loadavg");
+      try {
+        ArrayList<String> output = new ArrayList<String>();
+        Process process = processBuilder.start();
+        process.waitFor();
+        InputStream inputStream = process.getInputStream();
+        try {
+          java.io.BufferedReader reader =
+              new java.io.BufferedReader(new InputStreamReader(inputStream));
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            output.add(line);
+          }
+        }finally {
+          inputStream.close();
+        }
+
+        // process the output from bash call.
+        if (output.size() > 0) {
+          String[] splitedresult = output.get(0).split("\\s+");
+          double cpuUsage = 0.0;
+
+          try {
+            cpuUsage = Double.parseDouble(splitedresult[0]);
+          }catch(NumberFormatException e){
+            logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
+          }
+          logger.info("System load : " + cpuUsage);
+          stats.setCpuUpsage(cpuUsage);
+        }
+      }
+      catch (Exception ex){
+        logger.error("failed fetch system load info " +
+                     "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
+      }
+    } else {
+        logger.error("failed fetch system load info, one or more files from the following list are missing -  " +
+                     "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+    }
+  }
+}
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
new file mode 100644
index 0000000..55078b5
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -0,0 +1,76 @@
+package azkaban.execapp;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.ExecutorInfo;
+
+public class StatisticsServletTest {
+  private class MockStatisticsServlet extends ServerStatisticsServlet{
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    public  ExecutorInfo getStastics(){
+      return cachedstats;
+    }
+
+    public  long getUpdatedTime(){
+      return lastRefreshedTime;
+    }
+
+    public void callPopulateStatistics(){
+       this.populateStatistics(false);
+    }
+
+    public void callFillCpuUsage(ExecutorInfo stats){
+      this.fillCpuUsage(stats);}
+
+    public void callFillRemainingMemoryPercent(ExecutorInfo stats){
+        this.fillRemainingMemoryPercent(stats);}
+  }
+  private MockStatisticsServlet statServlet = new MockStatisticsServlet();
+
+  @Test
+  public void testFillMemory()  {
+    ExecutorInfo stats = new ExecutorInfo();
+    this.statServlet.callFillRemainingMemoryPercent(stats);
+    // assume any machine that runs this test should
+    // have bash and top available and at least got some remaining memory.
+    Assert.assertTrue(stats.getRemainingMemoryInMB() > 0);
+    Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
+  }
+
+  @Test
+  public void testFillCpu()  {
+    ExecutorInfo stats = new ExecutorInfo();
+    this.statServlet.callFillCpuUsage(stats);
+    Assert.assertTrue(stats.getCpuUsage() > 0);
+  }
+
+  @Test
+  public void testPopulateStatistics()  {
+    this.statServlet.callPopulateStatistics();
+    Assert.assertNotNull(this.statServlet.getStastics());
+    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
+    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
+    Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
+  }
+
+  @Test
+  public void testPopulateStatisticsCache()  {
+    this.statServlet.callPopulateStatistics();
+    final long updatedTime = this.statServlet.getUpdatedTime();
+    while (System.currentTimeMillis() - updatedTime < 1000){
+      this.statServlet.callPopulateStatistics();
+      Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
+    }
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {}
+
+    // make sure cache expires after timeout.
+    this.statServlet.callPopulateStatistics();
+    Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
+  }
+}