Details
diff --git a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
index 2820c08..03de432 100644
--- a/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
+++ b/azkaban-common/src/main/java/azkaban/executor/ExecutorInfo.java
@@ -35,7 +35,7 @@ import org.codehaus.jackson.map.ObjectMapper;
public class ExecutorInfo implements java.io.Serializable{
private static final long serialVersionUID = 3009746603773371263L;
private double remainingMemoryPercent;
- private long remainingMemory;
+ private long remainingMemoryInMB;
private int remainingFlowCapacity;
private int numberOfAssignedFlows;
private long lastDispatchedTime;
@@ -57,12 +57,12 @@ import org.codehaus.jackson.map.ObjectMapper;
this.remainingMemoryPercent = value;
}
- public long getRemainingMemory(){
- return this.remainingMemory;
+ public long getRemainingMemoryInMB(){
+ return this.remainingMemoryInMB;
}
- public void setRemainingMemory(long value){
- this.remainingMemory = value;
+ public void setRemainingMemoryInMB(long value){
+ this.remainingMemoryInMB = value;
}
public int getRemainingFlowCapacity(){
@@ -97,7 +97,7 @@ import org.codehaus.jackson.map.ObjectMapper;
long lastDispatched,
double cpuUsage,
int numberOfAssignedFlows){
- this.remainingMemory = remainingMemory;
+ this.remainingMemoryInMB = remainingMemory;
this.cpuUsage = cpuUsage;
this.remainingFlowCapacity = remainingFlowCapacity;
this.remainingMemoryPercent = remainingMemoryPercent;
@@ -113,7 +113,7 @@ import org.codehaus.jackson.map.ObjectMapper;
boolean result = true;
ExecutorInfo stat = (ExecutorInfo) obj;
- result &=this.remainingMemory == stat.remainingMemory;
+ result &=this.remainingMemoryInMB == stat.remainingMemoryInMB;
result &=this.cpuUsage == stat.cpuUsage;
result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
index c6d096e..eafab67 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorComparator.java
@@ -236,8 +236,8 @@ public class ExecutorComparator extends CandidateComparator<Executor> {
return result;
}
- if (stat1.getRemainingMemory() != stat2.getRemainingMemory()){
- return stat1.getRemainingMemory() > stat2.getRemainingMemory() ? 1:-1;
+ if (stat1.getRemainingMemoryInMB() != stat2.getRemainingMemoryInMB()){
+ return stat1.getRemainingMemoryInMB() > stat2.getRemainingMemoryInMB() ? 1:-1;
}
return Double.compare(stat1.getRemainingMemoryPercent(), stat2.getRemainingMemoryPercent());
diff --git a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
index 93b4a91..031650e 100644
--- a/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
+++ b/azkaban-common/src/main/java/azkaban/executor/selector/ExecutorFilter.java
@@ -135,7 +135,7 @@ public final class ExecutorFilter extends CandidateFilter<Executor, ExecutableFl
filteringTarget.toString()));
return false;
}
- return stats.getRemainingMemory() > MINIMUM_FREE_MEMORY ;
+ return stats.getRemainingMemoryInMB() > MINIMUM_FREE_MEMORY ;
}
});
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
index a0b0b8d..5566837 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/ServerStatisticsServlet.java
@@ -71,74 +71,72 @@ public class ServerStatisticsServlet extends HttpServlet {
* a returning value of '55.6' means 55.6%
*/
protected void fillRemainingMemoryPercent(ExecutorInfo stats){
- if (new File("/bin/bash").exists() && new File("/usr/bin/free").exists()) {
- java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/free -m -s 0.1 -c %s | grep Mem:",
- samplesToTakeForMemory));
+ if (new File("/bin/bash").exists()
+ && new File("/bin/cat").exists()
+ && new File("/bin/grep").exists()
+ && new File("/proc/meminfo").exists()) {
+ java.lang.ProcessBuilder processBuilder =
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/bin/cat /proc/meminfo | grep -E \"MemTotal|MemFree\"");
+ try {
+ ArrayList<String> output = new ArrayList<String>();
+ Process process = processBuilder.start();
+ process.waitFor();
+ InputStream inputStream = process.getInputStream();
try {
- List<String> output = new ArrayList<String>();
- Process process = processBuilder.start();
- process.waitFor();
- InputStream inputStream = process.getInputStream();
- try {
- java.io.BufferedReader reader =
- new java.io.BufferedReader(new InputStreamReader(inputStream));
- String line = null;
- while ((line = reader.readLine()) != null) {
- output.add(line);
- }
- }finally {
- inputStream.close();
+ java.io.BufferedReader reader =
+ new java.io.BufferedReader(new InputStreamReader(inputStream));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ output.add(line);
}
- // process the output from bash call.
- if (output.size() > 0) {
- long totalMemory = 0 ;
- long freeMemory = 0 ;
- int sampleCount = 0 ;
- for(String line : output){
- String[] splitedresult = line.split("\\s+");
- // expected return format -
- // "Mem:" | total | used | free | shared | buffers | cached
- if (splitedresult.length == 7){
- // create a temp copy of all the readings, if anything goes wrong, we drop the
- // temp reading and move on.
- long tmp_totalMemory = 0 ;
- long tmp_freeMemory = 0 ;
- try {
- tmp_totalMemory = Long.parseLong(splitedresult[1]);
- tmp_freeMemory = Long.parseLong(splitedresult[3]);
-
- } catch(NumberFormatException e){
- logger.error("skipping the unprocessable line from the output -" + line);
- continue;
- }
+ }finally {
+ inputStream.close();
+ }
- // add up the result.
- ++sampleCount;
- totalMemory += tmp_totalMemory ;
- freeMemory += tmp_freeMemory;
+ long totalMemory = 0;
+ long freeMemory = 0;
+
+ // process the output from bash call.
+ // we expect the result from the bash call to be something like following -
+ // MemTotal: 65894264 kB
+ // MemFree: 61104076 kB
+ if (output.size() == 2) {
+ for (String result : output){
+ // find the total memory and value the variable.
+ if (result.contains("MemTotal") && result.split("\\s+").length > 2){
+ try {
+ totalMemory = Long.parseLong(result.split("\\s+")[1]);
+ logger.info("Total memory : " + totalMemory);
+ }catch(NumberFormatException e){
+ logger.error("yielding 0 for total memory as output is invalid -" + result);
}
}
-
- // set the value.
- if (sampleCount > 0){
- freeMemory = freeMemory / sampleCount;
- totalMemory = totalMemory / sampleCount;
- logger.info(String.format("total memory - %s , free memory - %s", totalMemory, freeMemory));
- stats.setRemainingMemory(freeMemory);
- stats.setRemainingMemoryPercent(totalMemory == 0? 0 :
- ((double)freeMemory/(double)totalMemory) * 100);
+ // find the free memory and value the variable.
+ if (result.contains("MemFree") && result.split("\\s+").length > 2){
+ try {
+ freeMemory = Long.parseLong(result.split("\\s+")[1]);
+ logger.info("Free memory : " + freeMemory);
+ }catch(NumberFormatException e){
+ logger.error("yielding 0 for total memory as output is invalid -" + result);
+ }
}
}
+ }else {
+ logger.error("failed to get total/free memory info as the bash call returned invalid result.");
}
- catch (Exception ex){
- logger.error("failed fetch system memory info " +
- "as exception is captured when fetching result from bash call. ex -" + ex.getMessage());
- }
- } else {
- logger.error("failed fetch system memory info " +
- "as 'bash' or 'free' command can't be found on the current system.");
+
+ // the number got from the proc file is in KBs we want to see the number in MBs so we are deviding it by 1024.
+ stats.setRemainingMemoryInMB(freeMemory/1024);
+ stats.setRemainingMemoryPercent(totalMemory == 0 ? 0 : ((double)freeMemory / (double)totalMemory)*100);
+ }
+ catch (Exception ex){
+ logger.error("failed fetch system memory info " +
+ "as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
}
+ } else {
+ logger.error("failed fetch system memory info, one or more files from the following list are missing - " +
+ "'/bin/bash'," + "'/bin/cat'," +"'/proc/loadavg'");
+ }
}
/**
@@ -237,10 +235,11 @@ public class ServerStatisticsServlet extends HttpServlet {
double cpuUsage = 0.0;
try {
- cpuUsage = Double.parseDouble(splitedresult[1].split("%")[0]);
+ cpuUsage = Double.parseDouble(splitedresult[0]);
}catch(NumberFormatException e){
logger.error("yielding 0.0 for CPU usage as output is invalid -" + output.get(0));
}
+ logger.info("System load : " + cpuUsage);
stats.setCpuUpsage(cpuUsage);
}
}
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
index b754158..55078b5 100644
--- a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -36,7 +36,7 @@ public class StatisticsServletTest {
this.statServlet.callFillRemainingMemoryPercent(stats);
// assume any machine that runs this test should
// have bash and top available and at least got some remaining memory.
- Assert.assertTrue(stats.getRemainingMemory() > 0);
+ Assert.assertTrue(stats.getRemainingMemoryInMB() > 0);
Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
}
@@ -51,7 +51,7 @@ public class StatisticsServletTest {
public void testPopulateStatistics() {
this.statServlet.callPopulateStatistics();
Assert.assertNotNull(this.statServlet.getStastics());
- Assert.assertTrue(this.statServlet.getStastics().getRemainingMemory() > 0);
+ Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryInMB() > 0);
Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
}