azkaban-developers

Details

diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
index 2820c08..03de432 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -35,7 +35,7 @@ import org.codehaus.jackson.map.ObjectMapper;
   public class ExecutorInfo implements java.io.Serializable{
     private static final long serialVersionUID = 3009746603773371263L;
     private double remainingMemoryPercent;
-    private long   remainingMemory;
+    private long   remainingMemoryInMB;
     private int    remainingFlowCapacity;
     private int    numberOfAssignedFlows;
     private long   lastDispatchedTime;
@@ -57,12 +57,12 @@ import org.codehaus.jackson.map.ObjectMapper;
       this.remainingMemoryPercent = value;
     }
 
-    public long getRemainingMemory(){
-      return this.remainingMemory;
+    public long getRemainingMemoryInMB(){
+      return this.remainingMemoryInMB;
     }
 
-    public void setRemainingMemory(long value){
-      this.remainingMemory = value;
+    public void setRemainingMemoryInMB(long value){
+      this.remainingMemoryInMB = value;
     }
 
     public int getRemainingFlowCapacity(){
@@ -97,7 +97,7 @@ import org.codehaus.jackson.map.ObjectMapper;
         long lastDispatched,
         double cpuUsage,
         int numberOfAssignedFlows){
-      this.remainingMemory = remainingMemory;
+      this.remainingMemoryInMB = remainingMemory;
       this.cpuUsage = cpuUsage;
       this.remainingFlowCapacity = remainingFlowCapacity;
       this.remainingMemoryPercent = remainingMemoryPercent;
@@ -113,7 +113,7 @@ import org.codehaus.jackson.map.ObjectMapper;
           boolean result = true;
           ExecutorInfo stat = (ExecutorInfo) obj;
 
-          result &=this.remainingMemory == stat.remainingMemory;
+          result &=this.remainingMemoryInMB == stat.remainingMemoryInMB;
           result &=this.cpuUsage == stat.cpuUsage;
           result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
           result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index c6d096e..eafab67 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -236,8 +236,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
          return result;
        }
 
-       if (stat1.getRemainingMemory() != stat2.getRemainingMemory()){
-         return stat1.getRemainingMemory() > stat2.getRemainingMemory() ? 1:-1;
+       if (stat1.getRemainingMemoryInMB() != stat2.getRemainingMemoryInMB()){
+         return stat1.getRemainingMemoryInMB() > stat2.getRemainingMemoryInMB() ? 1:-1;
        }
 
        return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 93b4a91..031650e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -135,7 +135,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
               filteringTarget.toString()));
           return false;
         }
-        return stats.getRemainingMemory() > MINIMUM_FREE_MEMORY ;
+        return stats.getRemainingMemoryInMB() > MINIMUM_FREE_MEMORY ;
        }
     });
   }
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
index a0b0b8d..5566837 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -71,74 +71,72 @@ public class ServerStatisticsServlet extends HttpServlet  {
    *         a returning value of '55.6' means 55.6%
    */
   protected void fillRemainingMemoryPercent(ExecutorInfo stats){
-    if (new File("/bin/bash").exists() &&  new File("/usr/bin/free").exists()) {
-      java.lang.ProcessBuilder processBuilder =
-          new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/free -m -s 0.1 -c %s | grep Mem:",
-              samplesToTakeForMemory));
+    if (new File("/bin/bash").exists()
+        && new File("/bin/cat").exists()
+        && new File("/bin/grep").exists()
+        &&  new File("/proc/meminfo").exists()) {
+    java.lang.ProcessBuilder processBuilder =
+        new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/meminfo | grep -E \"MemTotal|MemFree\"");
+    try {
+      ArrayList<String> output = new ArrayList<String>();
+      Process process = processBuilder.start();
+      process.waitFor();
+      InputStream inputStream = process.getInputStream();
       try {
-        List<String> output = new ArrayList<String>();
-        Process process = processBuilder.start();
-        process.waitFor();
-        InputStream inputStream = process.getInputStream();
-        try {
-          java.io.BufferedReader reader =
-              new java.io.BufferedReader(new InputStreamReader(inputStream));
-          String line = null;
-          while ((line = reader.readLine()) != null) {
-            output.add(line);
-          }
-        }finally {
-          inputStream.close();
+        java.io.BufferedReader reader =
+            new java.io.BufferedReader(new InputStreamReader(inputStream));
+        String line = null;
+        while ((line = reader.readLine()) != null) {
+          output.add(line);
         }
-        // process the output from bash call.
-        if (output.size() > 0) {
-          long totalMemory = 0 ;
-          long freeMemory  = 0 ;
-          int  sampleCount = 0 ;
-          for(String line : output){
-            String[] splitedresult = line.split("\\s+");
-            // expected return format -
-            // "Mem:" | total | used | free | shared | buffers | cached
-            if (splitedresult.length == 7){
-              // create a temp copy of all the readings, if anything goes wrong, we drop the
-              // temp reading and move on.
-              long tmp_totalMemory = 0 ;
-              long tmp_freeMemory  = 0 ;
-              try {
-                tmp_totalMemory = Long.parseLong(splitedresult[1]);
-                tmp_freeMemory = Long.parseLong(splitedresult[3]);
-
-              } catch(NumberFormatException e){
-                logger.error("skipping the unprocessable line from the output -" + line);
-                continue;
-              }
+      }finally {
+        inputStream.close();
+      }
 
-              // add up the result.
-              ++sampleCount;
-              totalMemory += tmp_totalMemory ;
-              freeMemory  += tmp_freeMemory;
+      long totalMemory = 0;
+      long  freeMemory = 0;
+
+      // process the output from bash call.
+      // we expect the result from the bash call to be something like following -
+      // MemTotal:       65894264 kB
+      // MemFree:        61104076 kB
+      if (output.size() == 2) {
+        for (String result : output){
+          // find the total memory and value the variable.
+          if (result.contains("MemTotal") && result.split("\\s+").length > 2){
+            try {
+              totalMemory = Long.parseLong(result.split("\\s+")[1]);
+              logger.info("Total memory : " + totalMemory);
+            }catch(NumberFormatException e){
+              logger.error("yielding 0 for total memory as output is invalid -" + result);
             }
           }
-
-          // set the value.
-          if (sampleCount > 0){
-            freeMemory  = freeMemory  / sampleCount;
-            totalMemory = totalMemory / sampleCount;
-            logger.info(String.format("total memory - %s , free memory - %s", totalMemory, freeMemory));
-            stats.setRemainingMemory(freeMemory);
-            stats.setRemainingMemoryPercent(totalMemory == 0? 0 :
-              ((double)freeMemory/(double)totalMemory) * 100);
+          // find the free memory and value the variable.
+          if (result.contains("MemFree") && result.split("\\s+").length > 2){
+            try {
+              freeMemory = Long.parseLong(result.split("\\s+")[1]);
+              logger.info("Free memory : " + freeMemory);
+            }catch(NumberFormatException e){
+              logger.error("yielding 0 for total memory as output is invalid -" + result);
+            }
           }
         }
+      }else {
+        logger.error("failed to get total/free memory info as the bash call returned invalid result.");
       }
-      catch (Exception ex){
-        logger.error("failed fetch system memory info " +
-                     "as exception is captured when fetching result from bash call. ex -" + ex.getMessage());
-      }
-    } else {
-        logger.error("failed fetch system memory info " +
-                     "as 'bash' or 'free' command can't be found on the current system.");
+
+      // the number got from the proc file is in KBs we want to see the number in MBs so we are deviding it by 1024.
+      stats.setRemainingMemoryInMB(freeMemory/1024);
+      stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double)freeMemory / (double)totalMemory)*100);
+    }
+    catch (Exception ex){
+      logger.error("failed fetch system memory info " +
+                   "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
     }
+  } else {
+      logger.error("failed fetch system memory info, one or more files from the following list are missing -  " +
+                   "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+  }
   }
 
   /**
@@ -237,10 +235,11 @@ public class ServerStatisticsServlet extends HttpServlet  {
           double cpuUsage = 0.0;
 
           try {
-            cpuUsage = Double.parseDouble(splitedresult[1].split("%")[0]);
+            cpuUsage = Double.parseDouble(splitedresult[0]);
           }catch(NumberFormatException e){
             logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
           }
+          logger.info("System load : " + cpuUsage);
           stats.setCpuUpsage(cpuUsage);
         }
       }
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index b754158..55078b5 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -36,7 +36,7 @@ public class StatisticsServletTest {
     this.statServlet.callFillRemainingMemoryPercent(stats);
     // assume any machine that runs this test should
     // have bash and top available and at least got some remaining memory.
-    Assert.assertTrue(stats.getRemainingMemory() > 0);
+    Assert.assertTrue(stats.getRemainingMemoryInMB() > 0);
     Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
   }
 
@@ -51,7 +51,7 @@ public class StatisticsServletTest {
   public void testPopulateStatistics()  {
     this.statServlet.callPopulateStatistics();
     Assert.assertNotNull(this.statServlet.getStastics());
-    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemory() > 0);
+    Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
     Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
     Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
   }