/*
* Copyright 2015 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.execapp;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
import azkaban.executor.ServerStatistics;
import azkaban.utils.JSONUtils;
public class ServerStatisticsServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final int cacheTimeInMilliseconds = 1000;
private static final int samplesToTakeForMemory = 1;
private static final int samplesToTakeForCpuUsage = 1;
private static final Logger logger = Logger.getLogger(ServerStatisticsServlet.class);
private static final String noCacheParamName = "nocache";
protected static long lastRefreshedTime = 0;
protected static ServerStatistics cachedstats = null;
/**
* Handle all get request to Statistics Servlet {@inheritDoc}
*
* @see javax.servlet.http.HttpServlet#doGet(javax.servlet.http.HttpServletRequest,
* javax.servlet.http.HttpServletResponse)
*/
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
boolean noCache = null!= req && Boolean.valueOf(req.getParameter(noCacheParamName));
if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
this.populateStatistics(noCache);
}
JSONUtils.toJSON(cachedstats, resp.getOutputStream(), true);
}
/**
* fill the result set with the percent of the remaining system memory on the server.
* @param stats reference to the result container which contains all the results, this specific method
* will only work work on the property "remainingMemory" and "remainingMemoryPercent".
*
* NOTE:
* a double value will be used to present the remaining memory,
* a returning value of '55.6' means 55.6%
*/
protected void fillRemainingMemoryPercent(ServerStatistics stats){
if (new File("/bin/bash").exists() && new File("/usr/bin/free").exists()) {
java.lang.ProcessBuilder processBuilder =
new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/free -m -s 0.1 -c %s | grep Mem:",
samplesToTakeForMemory));
try {
List<String> output = new ArrayList<String>();
Process process = processBuilder.start();
process.waitFor();
InputStream inputStream = process.getInputStream();
try {
java.io.BufferedReader reader =
new java.io.BufferedReader(new InputStreamReader(inputStream));
String line = null;
while ((line = reader.readLine()) != null) {
output.add(line);
}
}finally {
inputStream.close();
}
// process the output from bash call.
if (output.size() > 0) {
long totalMemory = 0 ;
long freeMemory = 0 ;
int sampleCount = 0 ;
for(String line : output){
String[] splitedresult = line.split("\\s+");
// expected return format -
// "Mem:" | total | used | free | shared | buffers | cached
if (splitedresult.length == 7){
// create a temp copy of all the readings, if anything goes wrong, we drop the
// temp reading and move on.
long tmp_totalMemory = 0 ;
long tmp_freeMemory = 0 ;
try {
tmp_totalMemory = Long.parseLong(splitedresult[1]);
tmp_freeMemory = Long.parseLong(splitedresult[3]);
} catch(NumberFormatException e){
logger.error("skipping the unprocessable line from the output -" + line);
continue;
}
// add up the result.
++sampleCount;
totalMemory += tmp_totalMemory ;
freeMemory += tmp_freeMemory;
}
}
// set the value.
if (sampleCount > 0){
freeMemory = freeMemory / sampleCount;
totalMemory = totalMemory / sampleCount;
logger.info(String.format("total memory - %s , free memory - %s", totalMemory, freeMemory));
stats.setRemainingMemory(freeMemory);
stats.setRemainingMemoryPercent(totalMemory == 0? 0 :
((double)freeMemory/(double)totalMemory) * 100);
}
}
}
catch (Exception ex){
logger.error("failed fetch system memory info " +
"as exception is captured when fetching result from bash call. ex -" + ex.getMessage());
}
} else {
logger.error("failed fetch system memory info " +
"as 'bash' or 'free' command can't be found on the current system.");
}
}
/**
* call the data providers to fill the returning data container for statistics data.
* This function refreshes the static cached copy of data in case if necessary.
* */
protected synchronized void populateStatistics(boolean noCache){
//check again before starting the work.
if (noCache || System.currentTimeMillis() - lastRefreshedTime > cacheTimeInMilliseconds){
final ServerStatistics stats = new ServerStatistics();
List<Thread> workerPool = new ArrayList<Thread>();
workerPool.add(new Thread(new Runnable(){ public void run() {
fillRemainingMemoryPercent(stats); }},"RemainingMemoryPercent"));
workerPool.add(new Thread(new Runnable(){ public void run() {
fillRemainingFlowCapacityAndLastDispatchedTime(stats); }},"RemainingFlowCapacityAndLastDispatchedTime"));
workerPool.add(new Thread(new Runnable(){ public void run() {
fillCpuUsage(stats); }},"CpuUsage"));
// start all the working threads.
for (Thread thread : workerPool){thread.start();}
// wait for all the threads to finish their work.
// NOTE: the result container itself is not thread safe, we are good as for now no
// working thread will modify the same property, nor have any of them
// need to compute values based on value(s) of other properties.
for (Thread thread : workerPool){
try {
// we gave maxim 5 seconds to let the thread finish work.
thread.join(5000);;
} catch (InterruptedException e) {
logger.error(String.format("failed to collect information for %s as the working thread is interrupted. Ex - ",
thread.getName(), e.getMessage()));
}}
cachedstats = stats;
lastRefreshedTime = System.currentTimeMillis();
}
}
/**
* fill the result set with the remaining flow capacity .
* @param stats reference to the result container which contains all the results, this specific method
* will only work on the property "remainingFlowCapacity".
*/
protected void fillRemainingFlowCapacityAndLastDispatchedTime(ServerStatistics stats){
AzkabanExecutorServer server = AzkabanExecutorServer.getApp();
if (server != null){
FlowRunnerManager runnerMgr = AzkabanExecutorServer.getApp().getFlowRunnerManager();
int assignedFlows = runnerMgr.getNumRunningFlows() + runnerMgr.getNumQueuedFlows();
stats.setRemainingFlowCapacity(runnerMgr.getMaxNumRunningFlows() - assignedFlows);
stats.setNumberOfAssignedFlows(assignedFlows);
stats.setLastDispatchedTime(runnerMgr.getLastFlowSubmittedTime());
}else {
logger.error("failed to get data for remaining flow capacity or LastDispatchedTime" +
" as the AzkabanExecutorServer has yet been initialized.");
}
}
/**
* fill the result set with the Remaining temp Storage .
* @param stats reference to the result container which contains all the results, this specific method
* will only work on the property "cpuUdage".
*/
protected void fillCpuUsage(ServerStatistics stats){
if (new File("/bin/bash").exists() && new File("/usr/bin/top").exists()) {
java.lang.ProcessBuilder processBuilder =
new java.lang.ProcessBuilder("/bin/bash", "-c", String.format("/usr/bin/top -bn%s -d 0.1 | grep \"Cpu(s)\"",
samplesToTakeForCpuUsage));
try {
ArrayList<String> output = new ArrayList<String>();
Process process = processBuilder.start();
process.waitFor();
InputStream inputStream = process.getInputStream();
try {
java.io.BufferedReader reader =
new java.io.BufferedReader(new InputStreamReader(inputStream));
String line = null;
while ((line = reader.readLine()) != null) {
output.add(line);
}
}finally {
inputStream.close();
}
// process the output from bash call.
if (output.size() > 0) {
double us = 0 ; // user
double sy = 0 ; // system
double wi = 0 ; // waiting.
int sampleCount = 0;
for(String line : output){
String[] splitedresult = line.split("\\s+");
// expected returning format -
// Cpu(s): 1.4%us, 0.1%sy, 0.0%ni, 98.5%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
if (splitedresult.length == 9){
// create a temp copy of all the readings, if anything goes wrong, we drop the
// temp reading and move on.
double tmp_us = 0 ; // user
double tmp_sy = 0 ; // system
double tmp_wi = 0 ; // waiting.
try {
tmp_us = Double.parseDouble(splitedresult[1].split("%")[0]);
tmp_sy = Double.parseDouble(splitedresult[2].split("%")[0]);
tmp_wi = Double.parseDouble(splitedresult[5].split("%")[0]);
} catch(NumberFormatException e){
logger.error("skipping the unprocessable line from the output -" + line);
continue;
}
// add up the result.
++sampleCount;
us += tmp_us;
sy += tmp_sy;
wi += tmp_wi;
}
}
// set the value.
if (sampleCount > 0){
double finalResult = (us + sy + wi)/sampleCount;
logger.info("Cpu usage result - " + finalResult );
stats.setCpuUpsage(finalResult);
}
}
}
catch (Exception ex){
logger.error("failed fetch system memory info " +
"as exception is captured when fetching result from bash call. Ex -" + ex.getMessage());
}
} else {
logger.error("failed fetch system memory info " +
"as 'bash' or 'top' command can't be found on the current system.");
}
}
}