diff --git a/azkaban-common/src/main/java/azkaban/executor/Statistics.java b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
index 5f02052..7c80f88 100644
--- a/azkaban-common/src/main/java/azkaban/executor/Statistics.java
+++ b/azkaban-common/src/main/java/azkaban/executor/Statistics.java
@@ -17,12 +17,13 @@
package azkaban.executor;
import java.util.Date;
+import java.util.Map;
public class Statistics {
private double remainingMemoryPercent;
private long remainingMemory;
private int remainingFlowCapacity;
- private Date lastDispatched;
+ private Date lastDispatchedTime;
private long remainingStorage;
private double cpuUsage;
private int priority;
@@ -60,11 +61,11 @@ import java.util.Date;
}
public Date getLastDispatchedTime(){
- return this.lastDispatched;
+ return this.lastDispatchedTime;
}
public void setLastDispatchedTime(Date value){
- this.lastDispatched = value;
+ this.lastDispatchedTime = value;
}
public long getRemainingStorage() {
@@ -79,6 +80,10 @@ import java.util.Date;
return this.priority;
}
+ public void setPriority (int value) {
+ this.priority = value;
+ }
+
public Statistics(){}
public Statistics (double remainingMemoryPercent,
@@ -94,6 +99,71 @@ import java.util.Date;
this.priority = priority;
this.remainingMemoryPercent = remainingMemoryPercent;
this.remainingStorage = remainingStorage;
- this.lastDispatched = lastDispatched;
+ this.lastDispatchedTime = lastDispatched;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof Statistics)
+ {
+ boolean result = true;
+ Statistics stat = (Statistics) obj;
+
+ result &=this.remainingMemory == stat.remainingMemory;
+ result &=this.cpuUsage == stat.cpuUsage;
+ result &=this.remainingFlowCapacity == stat.remainingFlowCapacity;
+ result &=this.priority == stat.priority;
+ result &=this.remainingMemoryPercent == stat.remainingMemoryPercent;
+ result &=this.remainingStorage == stat.remainingStorage;
+ result &= null == this.lastDispatchedTime ? stat.lastDispatchedTime == null :
+ this.lastDispatchedTime.equals(stat.lastDispatchedTime);
+ return result;
+ }
+ return false;
+ }
+
+
+ // really ugly to have it home-made here for object-binding as base on the
+ // current code base there is no any better ways to do that.
+ public static Statistics fromJsonObject(Map<String,Object> mapObj){
+ if (null == mapObj) return null ;
+ Statistics stats = new Statistics ();
+
+ final String remainingMemory = "remainingMemory";
+ if (mapObj.containsKey(remainingMemory) && null != mapObj.get(remainingMemory)){
+ stats.setRemainingMemory(Long.parseLong(mapObj.get(remainingMemory).toString()));
+ }
+
+ final String cpuUsage = "cpuUsage";
+ if (mapObj.containsKey(cpuUsage) && null != mapObj.get(cpuUsage)){
+ stats.setCpuUpsage(Double.parseDouble(mapObj.get(cpuUsage).toString()));
+ }
+
+ final String remainingFlowCapacity = "remainingFlowCapacity";
+ if (mapObj.containsKey(remainingFlowCapacity) && null != mapObj.get(remainingFlowCapacity)){
+ stats.setRemainingFlowCapacity(Integer.parseInt(mapObj.get(remainingFlowCapacity).toString()));
+ }
+
+ final String priority = "priority";
+ if (mapObj.containsKey(priority) && null != mapObj.get(priority)){
+ stats.setPriority(Integer.parseInt(mapObj.get(priority).toString()));
+ }
+
+ final String remainingMemoryPercent = "remainingMemoryPercent";
+ if (mapObj.containsKey(remainingMemoryPercent) && null != mapObj.get(remainingMemoryPercent)){
+ stats.setRemainingMemoryPercent(Double.parseDouble(mapObj.get(remainingMemoryPercent).toString()));
+ }
+
+ final String remainingStorage = "remainingStorage";
+ if (mapObj.containsKey(remainingStorage) && null != mapObj.get(remainingStorage)){
+ stats.setRemainingStorage(Long.parseLong(mapObj.get(remainingStorage).toString()));
+ }
+
+ final String lastDispatched = "lastDispatchedTime";
+ if (mapObj.containsKey(lastDispatched) && null != mapObj.get(lastDispatched)){
+ stats.setLastDispatchedTime(new Date(Long.parseLong(mapObj.get(lastDispatched).toString())));
+ }
+ return stats;
}
}
diff --git a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
index 56a6880..3e260b9 100644
--- a/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
+++ b/azkaban-execserver/src/main/java/azkaban/execapp/StatisticsServlet.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package azkaban.execapp;
import java.io.File;
@@ -5,7 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
+
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -18,7 +36,11 @@ import azkaban.utils.JSONUtils;
public class StatisticsServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- private static final Logger logger = Logger.getLogger(JMXHttpServlet.class);
+ private static final int cacheTimeInMilliseconds = 1000;
+ private static final Logger logger = Logger.getLogger(StatisticsServlet.class);
+
+ protected static Date lastRefreshedTime = null;
+ protected static Statistics cachedstats = null;
/**
* Handle all get request to Statistics Servlet {@inheritDoc}
@@ -29,34 +51,12 @@ public class StatisticsServlet extends HttpServlet {
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- final Statistics stats = new Statistics();
-
- List<Thread> workerPool = new ArrayList<Thread>();
- workerPool.add(new Thread(new Runnable(){ public void run() {
- fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
-
- workerPool.add(new Thread(new Runnable(){ public void run() {
- fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
-
- workerPool.add(new Thread(new Runnable(){ public void run() {
- fillCpuUsage(stats); }},"CpuUsage"));
-
- // start all the working threads.
- for (Thread thread : workerPool){thread.start();}
-
- // wait for all the threads to finish their work.
- // NOTE: the result container itself is not thread safe, we are good as for now no
- // working thread will modify the same property, nor have any of them
- // need to compute values based on value(s) of other properties.
- for (Thread thread : workerPool){
- try {
- thread.join();
- } catch (InterruptedException e) {
- logger.error(String.format("failed to collect information for %s as the working thread is interrupted.",
- thread.getName()));
- }}
+ if (null == lastRefreshedTime ||
+ new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
+ this.populateStatistics();
+ }
- JSONUtils.toJSON(stats, resp.getOutputStream(), true);
+ JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
}
/**
@@ -68,33 +68,62 @@ public class StatisticsServlet extends HttpServlet {
* a double value will be used to present the remaining memory,
* a returning value of '55.6' means 55.6%
*/
- private void fillRemainingMemoryPercent(Statistics stats){
+ protected void fillRemainingMemoryPercent(Statistics stats){
if (new File("/bin/bash").exists() && new File("/usr/bin/free").exists()) {
java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/free -m | grep Mem:");
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/free -m -s 0.1 -c 5 | grep Mem:");
try {
- String line = null;
+ ArrayList<String> output = new ArrayList<String>();
Process process = processBuilder.start();
process.waitFor();
InputStream inputStream = process.getInputStream();
try {
java.io.BufferedReader reader =
new java.io.BufferedReader(new InputStreamReader(inputStream));
- // we expect the call returns and only returns one line.
- line = reader.readLine();
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ output.add(line);
+ }
}finally {
inputStream.close();
}
-
- logger.info("result from bash call - " + null == line ? "(null)" : line);
// process the output from bash call.
- if (null != line && line.length() > 0) {
- String[] splitedresult = line.split("\\s+");
- if (splitedresult.length == 7){
+ if (output.size() > 0) {
+ long totalMemory = 0 ;
+ long freeMemory = 0 ;
+ int sampleCount = 0 ;
+
+ // process all the output, we will do 5 samples.
+ for(String line : output){
+ String[] splitedresult = line.split("\\s+");
// expected return format -
// "Mem:" | total | used | free | shared | buffers | cached
- Long totalMemory = Long.parseLong(splitedresult[1]);
- Long freeMemory = Long.parseLong(splitedresult[3]);
+ if (splitedresult.length == 7){
+ // create a temp copy of all the readings, if anything goes wrong, we drop the
+ // temp reading and move on.
+ long tmp_totalMemory = 0 ;
+ long tmp_freeMemory = 0 ;
+ try {
+ tmp_totalMemory = Long.parseLong(splitedresult[1]);
+ tmp_freeMemory = Long.parseLong(splitedresult[3]);
+
+ } catch(NumberFormatException e){
+ logger.error("skipping the unprocessable line from the output -" + line);
+ continue;
+ }
+
+ // add up the result.
+ ++sampleCount;
+ totalMemory += tmp_totalMemory ;
+ freeMemory += tmp_freeMemory;
+ }
+ }
+
+ // set the value.
+ if (sampleCount > 0){
+ freeMemory = freeMemory / sampleCount;
+ totalMemory = totalMemory / sampleCount;
+ logger.info(String.format("total memory - %s , free memory - %s", totalMemory, freeMemory));
stats.setRemainingMemory(freeMemory);
stats.setRemainingMemoryPercent(totalMemory == 0? 0 :
((double)freeMemory/(double)totalMemory));
@@ -102,10 +131,53 @@ public class StatisticsServlet extends HttpServlet {
}
}
catch (Exception ex){
- logger.error("failed fetch system memory info as exception is captured when fetching result from bash call.");
+ logger.error("failed fetch system memory info " +
+ "as exception is captured when fetching result from bash call.");
}
} else {
- logger.error("failed fetch system memory info as 'bash' or 'free' can't be found on the current system.");
+ logger.error("failed fetch system memory info " +
+ "as 'bash' or 'free' command can't be found on the current system.");
+ }
+ }
+
+ /**
+ * call the data providers to fill the returning data container for statistics data.
+ * This function refreshes the static cached copy of data in case if necessary.
+ * */
+ protected synchronized void populateStatistics(){
+ //check again before starting the work.
+ if (null == lastRefreshedTime ||
+ new Date().getTime() - lastRefreshedTime.getTime() > cacheTimeInMilliseconds){
+ final Statistics stats = new Statistics();
+
+ List<Thread> workerPool = new ArrayList<Thread>();
+ workerPool.add(new Thread(new Runnable(){ public void run() {
+ fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
+
+ workerPool.add(new Thread(new Runnable(){ public void run() {
+ fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
+
+ workerPool.add(new Thread(new Runnable(){ public void run() {
+ fillCpuUsage(stats); }},"CpuUsage"));
+
+ // start all the working threads.
+ for (Thread thread : workerPool){thread.start();}
+
+ // wait for all the threads to finish their work.
+ // NOTE: the result container itself is not thread safe, we are good as for now no
+ // working thread will modify the same property, nor have any of them
+ // need to compute values based on value(s) of other properties.
+ for (Thread thread : workerPool){
+ try {
+ // we gave maxim 5 seconds to let the thread finish work.
+ thread.join(5000);;
+ } catch (InterruptedException e) {
+ logger.error(String.format("failed to collect information for %s as the working thread is interrupted.",
+ thread.getName()));
+ }}
+
+ cachedstats = stats;
+ lastRefreshedTime = new Date();
}
}
@@ -114,12 +186,19 @@ public class StatisticsServlet extends HttpServlet {
* @param stats reference to the result container which contains all the results, this specific method
* will only work on the property "remainingFlowCapacity".
*/
- private void fillRemainingFlowCapacityAndLastDispatchedTime(Statistics stats){
- FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
- stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() -
- runnerMgr.getNumRunningFlows() -
- runnerMgr.getNumQueuedFlows());
- stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+ protected void fillRemainingFlowCapacityAndLastDispatchedTime(Statistics stats){
+
+ AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
+ if (server != null){
+ FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
+ stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() -
+ runnerMgr.getNumRunningFlows() -
+ runnerMgr.getNumQueuedFlows());
+ stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
+ }else {
+ logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
+ " as the AzkabanExecutorServer has yet been initialized.");
+ }
}
@@ -128,10 +207,10 @@ public class StatisticsServlet extends HttpServlet {
* @param stats reference to the result container which contains all the results, this specific method
* will only work on the property "cpuUdage".
*/
- private void fillCpuUsage(Statistics stats){
+ protected void fillCpuUsage(Statistics stats){
if (new File("/bin/bash").exists() && new File("/usr/bin/top").exists()) {
java.lang.ProcessBuilder processBuilder =
- new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/top -bn4 | grep \"Cpu(s)\"");
+ new java.lang.ProcessBuilder("/bin/bash", "-c", "/usr/bin/top -bn5 -d 0.1 | grep \"Cpu(s)\"");
try {
ArrayList<String> output = new ArrayList<String>();
Process process = processBuilder.start();
@@ -148,7 +227,6 @@ public class StatisticsServlet extends HttpServlet {
inputStream.close();
}
- logger.info("lines of the result from bash call - " + output.size());
// process the output from bash call.
if (output.size() > 0) {
double us = 0 ; // user
@@ -159,9 +237,11 @@ public class StatisticsServlet extends HttpServlet {
// process all the output, we will do 5 samples for the cpu and calculate the avarage.
for(String line : output){
String[] splitedresult = line.split("\\s+");
+ // expected returning format -
+ // Cpu(s): 1.4%us, 0.1%sy, 0.0%ni, 98.5%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
if (splitedresult.length == 9){
- // expected return format -
- // Cpu(s): 1.4%us, 0.1%sy, 0.0%ni, 98.5%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
+ // create a temp copy of all the readings, if anything goes wrong, we drop the
+ // temp reading and move on.
double tmp_us = 0 ; // user
double tmp_sy = 0 ; // system
double tmp_wi = 0 ; // waiting.
@@ -170,7 +250,7 @@ public class StatisticsServlet extends HttpServlet {
tmp_sy = Double.parseDouble(splitedresult[2].split("%")[0]);
tmp_wi = Double.parseDouble(splitedresult[5].split("%")[0]);
} catch(NumberFormatException e){
- logger.error("skipping the line from the output cause it is in unexpected format -" + line);
+ logger.error("skipping the unprocessable line from the output -" + line);
continue;
}
@@ -191,12 +271,14 @@ public class StatisticsServlet extends HttpServlet {
}
}
catch (Exception ex){
- logger.error("failed fetch system memory info as exception is captured when fetching result from bash call.");
+ logger.error("failed fetch system memory info " +
+ "as exception is captured when fetching result from bash call.");
}
} else {
- logger.error("failed fetch system memory info as 'bash' or 'free' can't be found on the current system.");
+ logger.error("failed fetch system memory info " +
+ "as 'bash' or 'top' command can't be found on the current system.");
}
}
- // TO-DO - decide if we need to populate the remaining space and priority info.
+ // TO-DO - decide if we need to populate the remaining Storage space and priority info.
}
diff --git a/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
new file mode 100644
index 0000000..85d38e6
--- /dev/null
+++ b/azkaban-execserver/src/test/java/azkaban/execapp/StatisticsServletTest.java
@@ -0,0 +1,91 @@
+package azkaban.execapp;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import azkaban.executor.Statistics;
+import azkaban.utils.JSONUtils;
+
+public class StatisticsServletTest {
+ private class MockStatisticsServlet extends StatisticsServlet{
+ /** */
+ private static final long serialVersionUID = 1L;
+
+ public Statistics getStastics(){
+ return cachedstats;
+ }
+
+ public Date getUpdatedTime(){
+ return lastRefreshedTime;
+ }
+
+ public void callPopulateStatistics(){
+ this.populateStatistics();
+ }
+
+ public void callFillCpuUsage(Statistics stats){
+ this.fillCpuUsage(stats);}
+
+ public void callFillRemainingMemoryPercent(Statistics stats){
+ this.fillRemainingMemoryPercent(stats);}
+ }
+ private MockStatisticsServlet statServlet = new MockStatisticsServlet();
+
+ @Test
+ public void testFillMemory() {
+ Statistics stats = new Statistics();
+ this.statServlet.callFillRemainingMemoryPercent(stats);
+ // assume any machine that runs this test should
+ // have bash and top available and at least got some remaining memory.
+ Assert.assertTrue(stats.getRemainingMemory() > 0);
+ Assert.assertTrue(stats.getRemainingMemoryPercent() > 0);
+ }
+
+ @Test
+ public void testFillCpu() {
+ Statistics stats = new Statistics();
+ this.statServlet.callFillCpuUsage(stats);
+ Assert.assertTrue(stats.getCpuUsage() > 0);
+ }
+
+ @Test
+ public void testPopulateStatistics() {
+ this.statServlet.callPopulateStatistics();
+ Assert.assertNotNull(this.statServlet.getStastics());
+ Assert.assertTrue(this.statServlet.getStastics().getRemainingMemory() > 0);
+ Assert.assertTrue(this.statServlet.getStastics().getRemainingMemoryPercent() > 0);
+ Assert.assertTrue(this.statServlet.getStastics().getCpuUsage() > 0);
+ }
+
+ @Test
+ public void testPopulateStatisticsCache() {
+ this.statServlet.callPopulateStatistics();
+ final Date updatedTime = this.statServlet.getUpdatedTime();
+ while (new Date().getTime() - updatedTime.getTime() < 1000){
+ this.statServlet.callPopulateStatistics();
+ Assert.assertEquals(updatedTime, this.statServlet.getUpdatedTime());
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+
+ // make sure cache expires after timeout.
+ this.statServlet.callPopulateStatistics();
+ Assert.assertNotEquals(updatedTime, this.statServlet.getUpdatedTime());
+ }
+
+ @Test
+ public void testStatisticsJsonParser() throws IOException {
+ Statistics stat = new Statistics(0.1,1,2,new Date(),3,4,5);
+ String jSonStr = JSONUtils.toJSON(stat);
+ @SuppressWarnings("unchecked")
+ Map<String,Object> jSonObj = (Map<String,Object>)JSONUtils.parseJSONFromString(jSonStr);
+ Statistics stat2 = Statistics.fromJsonObject(jSonObj);
+ Assert.assertTrue(stat.equals(stat2));
+ }
+}