azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/Statistics.java b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
index 5f02052..7c80f88 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Statistics.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
@@ -17,12 +17,13 @@
 package azkaban.executor;
 
 import java.util.Date;
+import java.util.Map;
 
   public class Statistics {
     private double remainingMemoryPercent;
     private long   remainingMemory;
     private int    remainingFlowCapacity;
-    private Date   lastDispatched;
+    private Date   lastDispatchedTime;
     private long   remainingStorage;
     private double cpuUsage;
     private int    priority;
@@ -60,11 +61,11 @@ import java.util.Date;
     }
 
     public Date getLastDispatchedTime(){
-      return this.lastDispatched;
+      return this.lastDispatchedTime;
     }
 
     public void setLastDispatchedTime(Date value){
-      this.lastDispatched = value;
+      this.lastDispatchedTime = value;
     }
 
     public long getRemainingStorage() {
@@ -79,6 +80,10 @@ import java.util.Date;
       return this.priority;
     }
 
+    public void setPriority (int value) {
+      this.priority = value;
+    }
+
     public Statistics(){}
 
     public Statistics (double remainingMemoryPercent,
@@ -94,6 +99,71 @@ import java.util.Date;
       this.priority = priority;
       this.remainingMemoryPercent = remainingMemoryPercent;
       this.remainingStorage = remainingStorage;
-      this.lastDispatched = lastDispatched;
+      this.lastDispatchedTime = lastDispatched;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj instanceof Statistics)
+        {
+          boolean result = true;
+          Statistics stat = (Statistics) obj;
+
+          result &=this.remainingMemory == stat.remainingMemory;
+          result &=this.cpuUsage == stat.cpuUsage;
+          result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
+          result &=this.priority == stat.priority;
+          result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
+          result &=this.remainingStorage == stat.remainingStorage;
+          result &= null == this.lastDispatchedTime ? stat.lastDispatchedTime == null :
+                            this.lastDispatchedTime.equals(stat.lastDispatchedTime);
+          return result;
+        }
+        return false;
+    }
+
+    
+    // really ugly to have it home-made here for object-binding as base on the
+    // current code base there is no any better ways to do that.
+    public static Statistics fromJsonObject(Map<String,Object> mapObj){
+      if (null == mapObj) return null ;
+      Statistics stats = new Statistics ();
+
+      final String remainingMemory = "remainingMemory";
+      if (mapObj.containsKey(remainingMemory) && null != mapObj.get(remainingMemory)){
+        stats.setRemainingMemory(Long.parseLong(mapObj.get(remainingMemory).toString()));
+      }
+
+      final String cpuUsage = "cpuUsage";
+      if (mapObj.containsKey(cpuUsage) && null != mapObj.get(cpuUsage)){
+        stats.setCpuUpsage(Double.parseDouble(mapObj.get(cpuUsage).toString()));
+      }
+
+      final String remainingFlowCapacity = "remainingFlowCapacity";
+      if (mapObj.containsKey(remainingFlowCapacity) && null != mapObj.get(remainingFlowCapacity)){
+        stats.setRemainingFlowCapacity(Integer.parseInt(mapObj.get(remainingFlowCapacity).toString()));
+      }
+
+      final String priority = "priority";
+      if (mapObj.containsKey(priority) && null != mapObj.get(priority)){
+        stats.setPriority(Integer.parseInt(mapObj.get(priority).toString()));
+      }
+
+      final String remainingMemoryPercent = "remainingMemoryPercent";
+      if (mapObj.containsKey(remainingMemoryPercent) && null != mapObj.get(remainingMemoryPercent)){
+        stats.setRemainingMemoryPercent(Double.parseDouble(mapObj.get(remainingMemoryPercent).toString()));
+      }
+
+      final String remainingStorage = "remainingStorage";
+      if (mapObj.containsKey(remainingStorage) && null != mapObj.get(remainingStorage)){
+        stats.setRemainingStorage(Long.parseLong(mapObj.get(remainingStorage).toString()));
+      }
+
+      final String lastDispatched = "lastDispatchedTime";
+      if (mapObj.containsKey(lastDispatched) && null != mapObj.get(lastDispatched)){
+        stats.setLastDispatchedTime(new Date(Long.parseLong(mapObj.get(lastDispatched).toString())));
+      }
+      return stats;
     }
 }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
index 56a6880..3e260b9 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
 package azkaban.execapp;
 
 import java.io.File;
@@ -5,7 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -18,7 +36,11 @@ import azkaban.utils.JSONUtils;
 
 public class StatisticsServlet extends HttpServlet  {
   private static final long serialVersionUID = 1L;
-  private static final Logger logger = Logger.getLogger(JMXHttpServlet.class);
+  private static final int  cacheTimeInMilliseconds = 1000;
+  private static final Logger logger = Logger.getLogger(StatisticsServlet.class);
+
+  protected static Date lastRefreshedTime = null;
+  protected static Statistics cachedstats = null;
 
   /**
    * Handle all get request to Statistics Servlet {@inheritDoc}
@@ -29,34 +51,12 @@ public class StatisticsServlet extends HttpServlet  {
   protected void doGet(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {
 
-    final Statistics stats = new Statistics();
-
-    List<Thread> workerPool = new ArrayList<Thread>();
-    workerPool.add(new Thread(new Runnable(){ public void run() {
-      fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
-
-    workerPool.add(new Thread(new Runnable(){ public void run() {
-      fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
-
-    workerPool.add(new Thread(new Runnable(){ public void run() {
-      fillCpuUsage(stats); }},"CpuUsage"));
-
-    // start all the working threads.
-    for (Thread thread : workerPool){thread.start();}
-
-    // wait for all the threads to finish their work.
-    // NOTE: the result container itself is not thread safe, we are good as for now no
-    //       working thread will modify the same property, nor have any of them
-    //       need to compute values based on value(s) of other properties.
-    for (Thread thread : workerPool){
-      try {
-        thread.join();
-      } catch (InterruptedException e) {
-        logger.error(String.format("failed to collect information for %s as the working thread is interrupted.",
-            thread.getName()));
-      }}
+    if (null == lastRefreshedTime ||
+        new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
+      this.populateStatistics();
+    }
 
-    JSONUtils.toJSON(stats, resp.getOutputStream(), true);
+    JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
   }
 
   /**
@@ -68,33 +68,62 @@ public class StatisticsServlet extends HttpServlet  {
    * a double value will be used to present the remaining memory,
    *         a returning value of '55.6' means 55.6%
    */
-  private void fillRemainingMemoryPercent(Statistics stats){
+  protected void fillRemainingMemoryPercent(Statistics stats){
     if (new File("/bin/bash").exists() &&  new File("/usr/bin/free").exists()) {
       java.lang.ProcessBuilder processBuilder =
-          new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/free -m | grep Mem:");
+          new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/free -m -s 0.1 -c 5 | grep Mem:");
       try {
-        String line = null;
+        ArrayList<String> output = new ArrayList<String>();
         Process process = processBuilder.start();
         process.waitFor();
         InputStream inputStream = process.getInputStream();
         try {
           java.io.BufferedReader reader =
               new java.io.BufferedReader(new InputStreamReader(inputStream));
-          // we expect the call returns and only returns one line.
-          line = reader.readLine();
+          String line = null;
+          while ((line = reader.readLine()) != null) {
+            output.add(line);
+          }
         }finally {
           inputStream.close();
         }
-
-        logger.info("result from bash call - " + null == line ? "(null)" : line);
         // process the output from bash call.
-        if (null != line && line.length() > 0) {
-          String[] splitedresult = line.split("\\s+");
-          if (splitedresult.length == 7){
+        if (output.size() > 0) {
+          long totalMemory = 0 ;
+          long freeMemory  = 0 ;
+          int  sampleCount = 0 ;
+
+          // process all the output, we will do 5 samples.
+          for(String line : output){
+            String[] splitedresult = line.split("\\s+");
             // expected return format -
             // "Mem:" | total | used | free | shared | buffers | cached
-            Long totalMemory = Long.parseLong(splitedresult[1]);
-            Long  freeMemory = Long.parseLong(splitedresult[3]);
+            if (splitedresult.length == 7){
+              // create a temp copy of all the readings, if anything goes wrong, we drop the
+              // temp reading and move on.
+              long tmp_totalMemory = 0 ;
+              long tmp_freeMemory  = 0 ;
+              try {
+                tmp_totalMemory = Long.parseLong(splitedresult[1]);
+                tmp_freeMemory = Long.parseLong(splitedresult[3]);
+
+              } catch(NumberFormatException e){
+                logger.error("skipping the unprocessable line from the output -" + line);
+                continue;
+              }
+
+              // add up the result.
+              ++sampleCount;
+              totalMemory += tmp_totalMemory ;
+              freeMemory  += tmp_freeMemory;
+            }
+          }
+
+          // set the value.
+          if (sampleCount > 0){
+            freeMemory  = freeMemory  / sampleCount;
+            totalMemory = totalMemory / sampleCount;
+            logger.info(String.format("total memory - %s , free memory - %s", totalMemory, freeMemory));
             stats.setRemainingMemory(freeMemory);
             stats.setRemainingMemoryPercent(totalMemory == 0? 0 :
               ((double)freeMemory/(double)totalMemory));
@@ -102,10 +131,53 @@ public class StatisticsServlet extends HttpServlet  {
         }
       }
       catch (Exception ex){
-        logger.error("failed fetch system memory info as exception is captured when fetching result from bash call.");
+        logger.error("failed fetch system memory info " +
+                     "as exception is captured when fetching result from bash call.");
       }
     } else {
-        logger.error("failed fetch system memory info as 'bash' or 'free' can't be found on the current system.");
+        logger.error("failed fetch system memory info " +
+                     "as 'bash' or 'free' command can't be found on the current system.");
+    }
+  }
+
+  /**
+   * call the data providers to fill the returning data container for statistics data.
+   * This function refreshes the static cached copy of data in case if necessary.
+   * */
+  protected synchronized void populateStatistics(){
+    //check again before starting the work.
+    if (null == lastRefreshedTime ||
+        new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
+      final Statistics stats = new Statistics();
+
+      List<Thread> workerPool = new ArrayList<Thread>();
+      workerPool.add(new Thread(new Runnable(){ public void run() {
+        fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
+
+      workerPool.add(new Thread(new Runnable(){ public void run() {
+        fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
+
+      workerPool.add(new Thread(new Runnable(){ public void run() {
+        fillCpuUsage(stats); }},"CpuUsage"));
+
+      // start all the working threads.
+      for (Thread thread : workerPool){thread.start();}
+
+      // wait for all the threads to finish their work.
+      // NOTE: the result container itself is not thread safe, we are good as for now no
+      //       working thread will modify the same property, nor have any of them
+      //       need to compute values based on value(s) of other properties.
+      for (Thread thread : workerPool){
+        try {
+          // we gave maxim 5 seconds to let the thread finish work.
+          thread.join(5000);;
+        } catch (InterruptedException e) {
+          logger.error(String.format("failed to collect information for %s as the working thread is interrupted.",
+              thread.getName()));
+        }}
+
+      cachedstats = stats;
+      lastRefreshedTime =  new Date();
     }
   }
 
@@ -114,12 +186,19 @@ public class StatisticsServlet extends HttpServlet  {
    * @param stats reference to the result container which contains all the results, this specific method
    *              will only work on the property "remainingFlowCapacity".
    */
-  private void fillRemainingFlowCapacityAndLastDispatchedTime(Statistics stats){
-    FlowRunnerManager runnerMgr =  AzkabanExecutorServer.getApp().getFlowRunnerManager();
-    stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() -
-                                   runnerMgr.getNumRunningFlows() -
-                                   runnerMgr.getNumQueuedFlows());
-    stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+  protected void fillRemainingFlowCapacityAndLastDispatchedTime(Statistics stats){
+
+    AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
+    if (server != null){
+      FlowRunnerManager runnerMgr =  AzkabanExecutorServer.getApp().getFlowRunnerManager();
+      stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() -
+                                     runnerMgr.getNumRunningFlows() -
+                                     runnerMgr.getNumQueuedFlows());
+      stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+    }else {
+      logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
+                   " as the AzkabanExecutorServer has yet been initialized.");
+    }
   }
 
 
@@ -128,10 +207,10 @@ public class StatisticsServlet extends HttpServlet  {
    * @param stats reference to the result container which contains all the results, this specific method
    *              will only work on the property "cpuUdage".
    */
-  private void fillCpuUsage(Statistics stats){
+  protected void fillCpuUsage(Statistics stats){
     if (new File("/bin/bash").exists() &&  new File("/usr/bin/top").exists()) {
       java.lang.ProcessBuilder processBuilder =
-          new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/top -bn4 | grep \"Cpu(s)\"");
+          new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/top -bn5 -d 0.1 | grep \"Cpu(s)\"");
       try {
         ArrayList<String> output = new ArrayList<String>();
         Process process = processBuilder.start();
@@ -148,7 +227,6 @@ public class StatisticsServlet extends HttpServlet  {
           inputStream.close();
         }
 
-        logger.info("lines of the result from bash call - " + output.size());
         // process the output from bash call.
         if (output.size() > 0) {
           double us = 0 ; // user
@@ -159,9 +237,11 @@ public class StatisticsServlet extends HttpServlet  {
           // process all the output, we will do 5 samples for the cpu and calculate the avarage.
           for(String line : output){
             String[] splitedresult = line.split("\\s+");
+            // expected returning format -
+            // Cpu(s):  1.4%us,  0.1%sy,  0.0%ni, 98.5%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
             if (splitedresult.length == 9){
-              // expected return format -
-              // Cpu(s):  1.4%us,  0.1%sy,  0.0%ni, 98.5%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
+              // create a temp copy of all the readings, if anything goes wrong, we drop the
+              // temp reading and move on.
               double tmp_us = 0 ; // user
               double tmp_sy = 0 ; // system
               double tmp_wi = 0 ; // waiting.
@@ -170,7 +250,7 @@ public class StatisticsServlet extends HttpServlet  {
               tmp_sy = Double.parseDouble(splitedresult[2].split("%")[0]);
               tmp_wi = Double.parseDouble(splitedresult[5].split("%")[0]);
               } catch(NumberFormatException e){
-                logger.error("skipping the line from the output cause it is in unexpected format -" + line);
+                logger.error("skipping the unprocessable line from the output -" + line);
                 continue;
               }
 
@@ -191,12 +271,14 @@ public class StatisticsServlet extends HttpServlet  {
         }
       }
       catch (Exception ex){
-        logger.error("failed fetch system memory info as exception is captured when fetching result from bash call.");
+        logger.error("failed fetch system memory info " +
+                     "as exception is captured when fetching result from bash call.");
       }
     } else {
-        logger.error("failed fetch system memory info as 'bash' or 'free' can't be found on the current system.");
+        logger.error("failed fetch system memory info " +
+                     "as 'bash' or 'top' command can't be found on the current system.");
     }
   }
 
-  // TO-DO - decide if we need to populate the remaining space and priority info.
+  // TO-DO - decide if we need to populate the remaining Storage space and priority info.
 }
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
new file mode 100644
index 0000000..85d38e6
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -0,0 +1,91 @@
+package azkaban.execapp;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.Statistics;
+import azkaban.utils.JSONUtils;
+
+public class StatisticsServletTest {
+  private class MockStatisticsServlet extends StatisticsServlet{
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    public  Statistics getStastics(){
+      return cachedstats;
+    }
+
+    public  Date getUpdatedTime(){
+      return lastRefreshedTime;
+    }
+
+    public void callPopulateStatistics(){
+       this.populateStatistics();
+    }
+
+    public void callFillCpuUsage(Statistics stats){
+      this.fillCpuUsage(stats);}
+
+    public void callFillRemainingMemoryPercent(Statistics stats){
+        this.fillRemainingMemoryPercent(stats);}
+  }
+  private MockStatisticsServlet statServlet = new MockStatisticsServlet();
+
+  @Test
+  public void testFillMemory()  {
+    Statistics stats = new Statistics();
+    this.statServlet.callFillRemainingMemoryPercent(stats);
+    // assume any machine that runs this test should
+    // have bash and top available and at least got some remaining memory.
+    Assert.assertTrue(stats.getRemainingMemory() > 0);
+    Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
+  }
+
+  @Test
+  public void testFillCpu()  {
+    Statistics stats = new Statistics();
+    this.statServlet.callFillCpuUsage(stats);
+    Assert.assertTrue(stats.getCpuUsage() > 0);
+  }
+
+  @Test
+  public void testPopulateStatistics()  {
+    this.statServlet.callPopulateStatistics();
+    Assert.assertNotNull(this.statServlet.getStastics());
+    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemory() > 0);
+    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
+    Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
+  }
+
+  @Test
+  public void testPopulateStatisticsCache()  {
+    this.statServlet.callPopulateStatistics();
+    final Date updatedTime = this.statServlet.getUpdatedTime();
+    while (new Date().getTime() - updatedTime.getTime() < 1000){
+      this.statServlet.callPopulateStatistics();
+      Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
+    }
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {}
+
+    // make sure cache expires after timeout.
+    this.statServlet.callPopulateStatistics();
+    Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
+  }
+
+  @Test
+  public void testStatisticsJsonParser() throws IOException  {
+    Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5);
+    String jSonStr = JSONUtils.toJSON(stat);
+    @SuppressWarnings("unchecked")
+    Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
+    Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+    Assert.assertTrue(stat.equals(stat2));
+    }
+}