azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Executor.java b/azkaban-common/src/main/java/azkaban/executor/Executor.java
index 4153270..b2c3766 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Executor.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Executor.java
@@ -17,9 +17,6 @@
 package azkaban.executor;
 
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
 import azkaban.utils.Utils;
 
 /**
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
index 55c0864..7a25891 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/CandidateSelector.java
@@ -75,7 +75,7 @@ public class CandidateSelector<K,V> implements Selector<K, V> {
      }
 
      if (null == comparator){
-       logger.info("candidate comparator is not specified, default comparator from 'Collections' class will be used");
+       logger.info("candidate comparator is not specified, default comparator from 'Collections' class will be used.");
      }
 
      // final work - find the best candidate from the filtered list.
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index f0c46cd..6fc8f22 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -43,6 +43,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   private static final String REMAININGTMPSIZE_COMPARATOR_NAME = "RemainingTmpSize";
   private static final String PRIORITY_COMPARATOR_NAME = "Priority";
   private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
+  private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
 
   /**
    * static initializer of the class.
@@ -71,6 +72,10 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
     // register the creator for last dispatched time comparator.
     comparatorCreatorRepository.put(LSTDISPATCHED_COMPARATOR_NAME, new ComparatorCreator(){
       @Override public FactorComparator<Executor> create(int weight) { return getLstDispatchedTimeComparator(weight); }});
+
+    // register the creator for CPU Usage comparator.
+    comparatorCreatorRepository.put(CPUUSAGE_COMPARATOR_NAME, new ComparatorCreator(){
+      @Override public FactorComparator<Executor> create(int weight) { return getCpuUsageComparator(weight); }});
   }
 
 
@@ -109,35 +114,35 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
   }
 
   /**
-   * helper function that does the object  on two stats, comparator can leverage this function to provide
-   * shortcuts if   the stat object is missing from one or both sides of the executors.
-   * @param stat1   the first stat object to be checked .
-   * @param stat2   the second stat object to be checked.
+   * helper function that does the object  on two statistics, comparator can leverage this function to provide
+   * shortcuts if   the statistics object is missing from one or both sides of the executors.
+   * @param stat1   the first statistics  object to be checked .
+   * @param stat2   the second statistics object to be checked.
    * @param caller  the name of the calling function, for logging purpose.
-   * @param result  result Integer to pass out the result in case the stats are not both valid.
-   * @return true if the passed stats are NOT both valid, a shortcut can be made (caller can consume the result),
+   * @param result  result Integer to pass out the result in case the statistics are not both valid.
+   * @return true if the passed statistics are NOT both valid, a shortcut can be made (caller can consume the result),
    *         false otherwise.
    * */
-  private static boolean statsObjectCheck(Statistics stat1, Statistics stat2, String caller, Integer result){
+  private static boolean statisticsObjectCheck(Statistics statisticsObj1, Statistics statisticsObj2, String caller, Integer result){
     result = 0 ;
     // both doesn't expose the info
-    if (null == stat1 && null == stat2){
-      logger.info(String.format("%s : neither of the executors exposed stats info.",
+    if (null == statisticsObj1 && null == statisticsObj2){
+      logger.info(String.format("%s : neither of the executors exposed statistics info.",
           caller));
       return true;
     }
 
     //right side doesn't expose the info.
-    if (null == stat2 ){
-        logger.info(String.format("%s : choosing left side and the right side executor doesn't expose stat info",
+    if (null == statisticsObj2 ){
+        logger.info(String.format("%s : choosing left side and the right side executor doesn't expose statistics info",
             caller));
         result = 1;
         return true;
     }
 
     //left side doesn't expose the info.
-    if (null == stat1 ){
-      logger.info(String.format("%s : choosing right side and the left side executor doesn't expose stat info",
+    if (null == statisticsObj1 ){
+      logger.info(String.format("%s : choosing right side and the left side executor doesn't expose statistics info",
           caller));
       result = -1;
       return true;
@@ -160,7 +165,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
         Statistics stat2 = o2.getExecutorStats();
 
         Integer result = 0;
-        if (statsObjectCheck(stat1,stat2,REMAININGFLOWSIZE_COMPARATOR_NAME,result)){
+        if (statisticsObjectCheck(stat1,stat2,REMAININGFLOWSIZE_COMPARATOR_NAME,result)){
           return result;
         }
         return ((Integer)stat1.getRemainingFlowCapacity()).compareTo(stat2.getRemainingFlowCapacity());
@@ -180,7 +185,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
         Statistics stat2 = o2.getExecutorStats();
 
         Integer result = 0;
-        if (statsObjectCheck(stat1,stat2,REMAININGTMPSIZE_COMPARATOR_NAME,result)){
+        if (statisticsObjectCheck(stat1,stat2,REMAININGTMPSIZE_COMPARATOR_NAME,result)){
           return result;
         }
         return ((Long)stat1.getRemainingStorage()).compareTo(stat2.getRemainingStorage());
@@ -201,13 +206,34 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
         Statistics stat2 = o2.getExecutorStats();
 
         Integer result = 0;
-        if (statsObjectCheck(stat1,stat2,PRIORITY_COMPARATOR_NAME,result)){
+        if (statisticsObjectCheck(stat1,stat2,PRIORITY_COMPARATOR_NAME,result)){
           return result;
         }
         return ((Integer)stat1.getPriority()).compareTo(stat2.getPriority());
       }});
   }
 
+  /**
+   * function defines the cpuUsage comparator.
+   * @param weight weight of the comparator.
+   * @return
+   * */
+  private static FactorComparator<Executor> getCpuUsageComparator(int weight){
+    return FactorComparator.create(CPUUSAGE_COMPARATOR_NAME, weight, new Comparator<Executor>(){
+
+      @Override
+      public int compare(Executor o1, Executor o2) {
+        Statistics stat1 = o1.getExecutorStats();
+        Statistics stat2 = o2.getExecutorStats();
+
+        Integer result = 0;
+        if (statisticsObjectCheck(stat1,stat2,CPUUSAGE_COMPARATOR_NAME,result)){
+          return result;
+        }
+        return ((Double)stat1.getCpuUsage()).compareTo(stat2.getCpuUsage());
+      }});
+  }
+
 
   /**
    * function defines the last dispatched time comparator.
@@ -223,7 +249,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
         Statistics stat2 = o2.getExecutorStats();
 
         Integer result = 0;
-        if (statsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
+        if (statisticsObjectCheck(stat1,stat2,LSTDISPATCHED_COMPARATOR_NAME,result)){
           return result;
         }
 
@@ -240,7 +266,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
         }
 
         if (null == stat1.getLastDispatchedTime()){
-          logger.info(String.format("%s : choosing rigth side as left doesn't contain last dispatched time info.",
+          logger.info(String.format("%s : choosing right side as left doesn't contain last dispatched time info.",
               LSTDISPATCHED_COMPARATOR_NAME));
           return -1;
         }
@@ -267,7 +293,7 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
        Statistics stat2 = o2.getExecutorStats();
 
        Integer result = 0;
-       if (statsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
+       if (statisticsObjectCheck(stat1,stat2,MEMORY_COMPARATOR_NAME,result)){
          return result;
        }
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 0d61aba..f7e2c34 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -25,6 +25,7 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.Executor;
 import azkaban.executor.Statistics;
 
+
 public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFlow> {
   private static Map<String, FactorFilter<Executor, ExecutableFlow>> filterRepository = null;
 
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
index 1d9d162..2dd76c7 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorComparator.java
@@ -39,6 +39,7 @@ public final class FactorComparator<T>{
     this.factorName = factorName;
     this.weight = weight;
     this.comparator = comparator;
+    logger.info("comparator created for " + this.factorName);
   }
 
   /** static function to generate an instance of the class.
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
index 74b9d5d..b277d45 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/FactorFilter.java
@@ -36,6 +36,7 @@ public final class FactorFilter<T,V>{
   private FactorFilter(String factorName, Filter<T,V> filter){
     this.factorName = factorName;
     this.filter = filter;
+    logger.info("filter created for " + this.factorName);
   }
 
   /** static function to generate an instance of the class.
@@ -72,6 +73,4 @@ public final class FactorFilter<T,V>{
      * */
     public boolean filterTarget(T filteringTarget, V referencingObject);
   }
-
-
 }
diff --git a/azkaban-common/src/main/java/azkaban/executor/Statistics.java b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
index 85d31f7..5f02052 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Statistics.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
@@ -24,39 +24,72 @@ import java.util.Date;
     private int    remainingFlowCapacity;
     private Date   lastDispatched;
     private long   remainingStorage;
+    private double cpuUsage;
     private int    priority;
 
+    public double getCpuUsage() {
+      return this.cpuUsage;
+    }
+
+    public void setCpuUpsage(double value){
+      this.cpuUsage = value;
+    }
+
     public double getRemainingMemoryPercent() {
       return this.remainingMemoryPercent;
     }
 
+    public void setRemainingMemoryPercent(double value){
+      this.remainingMemoryPercent = value;
+    }
+
     public long getRemainingMemory(){
       return this.remainingMemory;
     }
 
+    public void setRemainingMemory(long value){
+      this.remainingMemory = value;
+    }
+
     public int getRemainingFlowCapacity(){
       return this.remainingFlowCapacity;
     }
 
+    public void setRemainingFlowCapacity(int value){
+      this.remainingFlowCapacity = value;
+    }
+
     public Date getLastDispatchedTime(){
       return this.lastDispatched;
     }
 
+    public void setLastDispatchedTime(Date value){
+      this.lastDispatched = value;
+    }
+
     public long getRemainingStorage() {
       return this.remainingStorage;
     }
 
+    public void setRemainingStorage(long value){
+      this.remainingStorage = value;
+    }
+
     public int getPriority () {
       return this.priority;
     }
 
+    public Statistics(){}
+
     public Statistics (double remainingMemoryPercent,
         long remainingMemory,
         int remainingFlowCapacity,
         Date lastDispatched,
         long remainingStorage,
+        double cpuUsage,
         int  priority ){
       this.remainingMemory = remainingMemory;
+      this.cpuUsage = cpuUsage;
       this.remainingFlowCapacity = remainingFlowCapacity;
       this.priority = priority;
       this.remainingMemoryPercent = remainingMemoryPercent;
diff --git a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
index c193397..4177b70 100644
--- a/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
+++ b/azkaban-common/src/test/java/azkaban/executor/SelectorTest.java
@@ -29,7 +29,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import azkaban.executor.selector.*;
@@ -683,9 +682,9 @@ public class SelectorTest {
     executorList.add(new Executor(2, "host2", 80, true));
     executorList.add(new Executor(3, "host3", 80, true));
 
-    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 4095, 0));
-    executorList.get(1).setExecutorStats(new Statistics(50, 4095, 50, new Date(), 4095, 0));
-    executorList.get(2).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048, 0));
+    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 4095,99, 0));
+    executorList.get(1).setExecutorStats(new Statistics(50, 4095, 50, new Date(), 4095,99, 0));
+    executorList.get(2).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,99, 0));
 
     ExecutableFlow flow = new ExecutableFlow();
 
@@ -695,10 +694,10 @@ public class SelectorTest {
     ExecutorSelector selector = new ExecutorSelector(filterList,comparatorMap);
     Executor executor = selector.getBest(executorList, flow);
     Assert.assertEquals(executorList.get(0), executor);
-    
+
     // simulate that once the flow is assigned, executor1's remaining TMP storage dropped to 2048
     // now we do the getBest again executor3 is expected to be selected as it has a earlier last dispatched time.
-    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048, 0));
+    executorList.get(0).setExecutorStats(new Statistics(99.9, 4095, 50, new Date(), 2048,99, 0));
     executor = selector.getBest(executorList, flow);
     Assert.assertEquals(executorList.get(2), executor);
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
index 6e012ab..1060e83 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/AzkabanExecutorServer.java
@@ -131,6 +131,7 @@ public class AzkabanExecutorServer {
     root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
     root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
     root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
+    root.addServlet(new ServletHolder(new StatisticsServlet()), "/stastics");
 
     root.setAttribute(ServerConstants.AZKABAN_SERVLET_CONTEXT_KEY, this);
 
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
index c8d7f1c..10371aa 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/FlowRunnerManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -142,6 +143,9 @@ public class FlowRunnerManager implements EventListener,
 
   private Object executionDirDeletionSync = new Object();
 
+  // date time of the the last flow submitted.
+  private Date lastFlowSubmitted = null;
+
   public FlowRunnerManager(Props props, ExecutorLoader executorLoader,
       ProjectLoader projectLoader, ClassLoader parentClassLoader)
       throws IOException {
@@ -254,6 +258,13 @@ public class FlowRunnerManager implements EventListener,
     return allProjects;
   }
 
+  public Date getLastFlowSubmittedTime(){
+    // Note: this is not thread safe and may result in providing dirty data.
+    //       we will provide this data as is for now and will revisit if there
+    //       is a string justification for change.
+    return lastFlowSubmitted;
+  }
+
   public Props getGlobalProps() {
     return globalProps;
   }
@@ -507,6 +518,8 @@ public class FlowRunnerManager implements EventListener,
       Future<?> future = executorService.submit(runner);
       // keep track of this future
       submittedFlows.put(future, runner.getExecutionId());
+      // update the last submitted time.
+      this.lastFlowSubmitted = new Date();
     } catch (RejectedExecutionException re) {
       throw new ExecutorManagerException(
           "Azkaban server can't execute any more flows. "
@@ -517,7 +530,7 @@ public class FlowRunnerManager implements EventListener,
 
   /**
    * Configure Azkaban metrics tracking for a new flowRunner instance
-   * 
+   *
    * @param flowRunner
    */
   private void configureFlowLevelMetrics(FlowRunner flowRunner) {
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
new file mode 100644
index 0000000..56a6880
--- /dev/null
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
@@ -0,0 +1,202 @@
+package azkaban.execapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.Statistics;
+import azkaban.utils.JSONUtils;
+
+public class StatisticsServlet extends HttpServlet  {
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = Logger.getLogger(JMXHttpServlet.class);
+
+  /**
+   * Handle all get request to Statistics Servlet {@inheritDoc}
+   *
+   * @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
+   *      javax.servlet.http.HttpServletResponse)
+   */
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+
+    final Statistics stats = new Statistics();
+
+    List<Thread> workerPool = new ArrayList<Thread>();
+    workerPool.add(new Thread(new Runnable(){ public void run() {
+      fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
+
+    workerPool.add(new Thread(new Runnable(){ public void run() {
+      fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
+
+    workerPool.add(new Thread(new Runnable(){ public void run() {
+      fillCpuUsage(stats); }},"CpuUsage"));
+
+    // start all the working threads.
+    for (Thread thread : workerPool){thread.start();}
+
+    // wait for all the threads to finish their work.
+    // NOTE: the result container itself is not thread safe, we are good as for now no
+    //       working thread will modify the same property, nor have any of them
+    //       need to compute values based on value(s) of other properties.
+    for (Thread thread : workerPool){
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        logger.error(String.format("failed to collect information for %s as the working thread is interrupted.",
+            thread.getName()));
+      }}
+
+    JSONUtils.toJSON(stats, resp.getOutputStream(), true);
+  }
+
+  /**
+   * fill the result set with the percent of the remaining system memory on the server.
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work work on the property "remainingMemory" and "remainingMemoryPercent".
+   *
+   * NOTE:
+   * a double value will be used to present the remaining memory,
+   *         a returning value of '55.6' means 55.6%
+   */
+  private void fillRemainingMemoryPercent(Statistics stats){
+    if (new File("/bin/bash").exists() &&  new File("/usr/bin/free").exists()) {
+      java.lang.ProcessBuilder processBuilder =
+          new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/free -m | grep Mem:");
+      try {
+        String line = null;
+        Process process = processBuilder.start();
+        process.waitFor();
+        InputStream inputStream = process.getInputStream();
+        try {
+          java.io.BufferedReader reader =
+              new java.io.BufferedReader(new InputStreamReader(inputStream));
+          // we expect the call returns and only returns one line.
+          line = reader.readLine();
+        }finally {
+          inputStream.close();
+        }
+
+        logger.info("result from bash call - " + null == line ? "(null)" : line);
+        // process the output from bash call.
+        if (null != line && line.length() > 0) {
+          String[] splitedresult = line.split("\\s+");
+          if (splitedresult.length == 7){
+            // expected return format -
+            // "Mem:" | total | used | free | shared | buffers | cached
+            Long totalMemory = Long.parseLong(splitedresult[1]);
+            Long  freeMemory = Long.parseLong(splitedresult[3]);
+            stats.setRemainingMemory(freeMemory);
+            stats.setRemainingMemoryPercent(totalMemory == 0? 0 :
+              ((double)freeMemory/(double)totalMemory));
+          }
+        }
+      }
+      catch (Exception ex){
+        logger.error("failed fetch system memory info as exception is captured when fetching result from bash call.");
+      }
+    } else {
+        logger.error("failed fetch system memory info as 'bash' or 'free' can't be found on the current system.");
+    }
+  }
+
+  /**
+   * fill the result set with the remaining flow capacity .
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work on the property "remainingFlowCapacity".
+   */
+  private void fillRemainingFlowCapacityAndLastDispatchedTime(Statistics stats){
+    FlowRunnerManager runnerMgr =  AzkabanExecutorServer.getApp().getFlowRunnerManager();
+    stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() -
+                                   runnerMgr.getNumRunningFlows() -
+                                   runnerMgr.getNumQueuedFlows());
+    stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+  }
+
+
+  /**
+   * fill the result set with the Remaining temp Storage .
+   * @param stats reference to the result container which contains all the results, this specific method
+   *              will only work on the property "cpuUdage".
+   */
+  private void fillCpuUsage(Statistics stats){
+    if (new File("/bin/bash").exists() &&  new File("/usr/bin/top").exists()) {
+      java.lang.ProcessBuilder processBuilder =
+          new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/top -bn4 | grep \"Cpu(s)\"");
+      try {
+        ArrayList<String> output = new ArrayList<String>();
+        Process process = processBuilder.start();
+        process.waitFor();
+        InputStream inputStream = process.getInputStream();
+        try {
+          java.io.BufferedReader reader =
+              new java.io.BufferedReader(new InputStreamReader(inputStream));
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            output.add(line);
+          }
+        }finally {
+          inputStream.close();
+        }
+
+        logger.info("lines of the result from bash call - " + output.size());
+        // process the output from bash call.
+        if (output.size() > 0) {
+          double us = 0 ; // user
+          double sy = 0 ; // system
+          double wi = 0 ; // waiting.
+          int   sampleCount = 0;
+
+          // process all the output, we will do 5 samples for the cpu and calculate the avarage.
+          for(String line : output){
+            String[] splitedresult = line.split("\\s+");
+            if (splitedresult.length == 9){
+              // expected return format -
+              // Cpu(s):  1.4%us,  0.1%sy,  0.0%ni, 98.5%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
+              double tmp_us = 0 ; // user
+              double tmp_sy = 0 ; // system
+              double tmp_wi = 0 ; // waiting.
+              try {
+              tmp_us = Double.parseDouble(splitedresult[1].split("%")[0]);
+              tmp_sy = Double.parseDouble(splitedresult[2].split("%")[0]);
+              tmp_wi = Double.parseDouble(splitedresult[5].split("%")[0]);
+              } catch(NumberFormatException e){
+                logger.error("skipping the line from the output cause it is in unexpected format -" + line);
+                continue;
+              }
+
+              // add up the result.
+              ++sampleCount;
+              us += tmp_us;
+              sy += tmp_sy;
+              wi += tmp_wi;
+            }
+          }
+
+          // set the value.
+          if (sampleCount > 0){
+            double finalResult = (us + sy + wi)/sampleCount;
+            logger.info("Cpu usage result  - " + finalResult );
+            stats.setCpuUpsage(finalResult);
+          }
+        }
+      }
+      catch (Exception ex){
+        logger.error("failed fetch system memory info as exception is captured when fetching result from bash call.");
+      }
+    } else {
+        logger.error("failed fetch system memory info as 'bash' or 'free' can't be found on the current system.");
+    }
+  }
+
+  // TO-DO - decide if we need to populate the remaining space and priority info.
+}