package br.ufrgs.inf.prosoft.tigris.sampling;

import com.google.common.util.concurrent.AtomicDouble;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.math3.distribution.BinomialDistribution;
import org.apache.commons.math3.ml.neuralnet.sofm.util.ExponentialDecayFunction;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.commons.math3.stat.inference.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The sampling control and decision.
*/
public class Sampling {

    // --- constructor parameters - defined once ---

    // Whether sampling decisions/rate adapt over time (vs. a fixed rate).
    private final boolean adaptiveSamplingRate;
    // Length of one monitoring cycle; also the horizon of the decay function below.
    private final long cycleLengthInMilliseconds;
    // Bernoulli trial (n = 1, p = samplingRate) backing each coin-flip decision.
    private BinomialDistribution binomialDistSampling;
    private double samplingRate; // in percentage, 0 to 1
    private double initialSamplingRate; // in percentage, 0 to 1

    // --- control vars ---

    // While true, samplingDecision() refuses to sample so that a performance
    // baseline (behaviour without sampling overhead) can be collected.
    private boolean performanceBaselineEnabled = false;

    // --- recreated every new monitoring cycle ---

    // Wall-clock start of the current monitoring cycle (ms since epoch).
    private long startTime;
    // Decays from 1 towards 0.1 over one cycle; relaxes the readiness tests over time.
    private ExponentialDecayFunction decayingPrecision;
    // Frequencies of all observed operations (population) vs. sampled ones (sample).
    private FrequencyDataSet population = new FrequencyDataSet(), sample = new FrequencyDataSet();
    private PerformanceBaselineDataSet performanceBaselineDataSet = new PerformanceBaselineDataSet();
    // Execution-time statistics of sampled operations, keyed by granularity.
    private Map<Granularity, DescriptiveStatistics> sampledDataSet = new ConcurrentHashMap<>();

    Logger logger = LoggerFactory.getLogger(Sampling.class);

    /**
     * z confidence value, ex: 1.96 for 95%
     * p proportion of the population, 0.5 is default
     * e margin of error, ex: 0.05 for 5%
     */
    private double z = 1.96, p = 0.5, e = 0.05;

    /**
     * Creates the sampling controller and starts the first monitoring cycle.
     *
     * @param initialSamplingRate       starting (and maximum) sampling rate, 0 to 1
     * @param cycleLengthInMilliseconds duration of one monitoring cycle
     * @param adaptiveSamplingRate      enables adaptive behaviour in samplingDecision()/isReady()
     */
    public Sampling(double initialSamplingRate, long cycleLengthInMilliseconds, boolean adaptiveSamplingRate) {
        this.initialSamplingRate = initialSamplingRate;
        this.samplingRate = initialSamplingRate;
        this.adaptiveSamplingRate = adaptiveSamplingRate;
        this.cycleLengthInMilliseconds = cycleLengthInMilliseconds;
        this.binomialDistSampling = new BinomialDistribution(1, samplingRate);
        startMonitoringCycle();
    }
    // Guards binomialDistSampling: resets and draws must not interleave.
    private Object binomialDistSamplingLock = new Object();

    // Rebuilds the Bernoulli sampler after samplingRate has changed.
    private void resetSamplingDistribution() {
        synchronized (this.binomialDistSamplingLock) {
            this.binomialDistSampling = new BinomialDistribution(1, samplingRate);
        }
    }
public boolean simpleSamplingDecision() {
synchronized (this.binomialDistSamplingLock) {
return binomialDistSampling.sample() == 1; // sampling rate evaluation
}
}
    /**
     * Records the observed operation in the population and decides whether it
     * should be sampled.
     *
     * @param granularity   the granularity of the observed operation
     * @param executionTime its execution time
     * @return true when the operation should be sampled
     */
    public boolean samplingDecision(Granularity granularity, long executionTime) {
        // The first observation of a cycle also anchors the cycle start time.
        if (population.getTotalItems() == 0)
            startTime = System.currentTimeMillis();
        population.addItem(granularity, executionTime);
        if (performanceBaselineEnabled) {
            // Baseline collection in progress: never sample, so the app's
            // un-instrumented behaviour can be measured.
            return false;
        }
        boolean simpleSamplingDecision = simpleSamplingDecision();
        // NOTE(review): this branch only fires when simpleSamplingDecision is already
        // true, and the fall-through returns that same value -- so the adaptive
        // proportion check currently has no observable effect. Possibly the intent
        // was to force sampling of under-represented granularities even on a
        // negative coin flip; confirm before changing.
        if (adaptiveSamplingRate
                && simpleSamplingDecision
                && population.getProportion(granularity) >= sample.getProportion(granularity)
        ) // sample has not enough items of that granularity compared to the population)
        {
            return true;
        }
        return simpleSamplingDecision;
    }
public boolean isReady() {
double decayingConfidenceFactor = decayingConfidenceFactor(getMonitoringCycleTime());
boolean hasMinimumSize = sample.getTotalItems() > getMinimumSampleSize(z - (z * decayingConfidenceFactor));
boolean hasSameProportion = isSameProportion(decayingConfidenceFactor);
boolean hasComparedMean = tTestEvaluation(decayingConfidenceFactor);
return adaptiveSamplingRate
// margin of error is lower than threshold
// && getSampleSizeErrorMargin(z * decayingConfidenceFactor) < e
// the sample has the min sample size based on the population
&& hasMinimumSize
// proportion test
&& hasSameProportion
// t-test
&& hasComparedMean;
}
    // Guards samplingRate mutation and the baseline state machine below.
    private Object samplingRateLock = new Object();
    // Remembers that the previous baseline evaluation already lowered the rate, so a
    // still-struggling app gets a further small decrease instead of a new baseline.
    private boolean reducedInPreviousBaseline = false;

    /**
     * Adapts the sampling rate from throughput comparisons between monitored and
     * baseline (unmonitored) periods, and drives the baseline state machine.
     * The resulting rate is clamped to [0.01, initialSamplingRate].
     *
     * @param monitoring                 throughput history for baseline/monitoring periods
     * @param currentOperationsPerSecond throughput observed in the last second
     * @param currentSamplingRate        rate before adaptation (used for logging only)
     */
    public void adaptSamplingRate(PerformanceBaselineDataSet monitoring,
                                  int currentOperationsPerSecond,
                                  double currentSamplingRate) {
        synchronized (samplingRateLock) {
            // Out-parameter filled by the isXxxUnderAveragePlusStd checks with the
            // percentage by which the rate should change.
            AtomicDouble factor = new AtomicDouble();
            if (performanceBaselineEnabled) {
                //TODO: every second, saves the req/s and "closes" the baseline
                // save and open a new one
                // if (Duration.between(baselineWindowStart, Instant.now()).getSeconds() > baselineWindow) {
                performanceBaselineEnabled = false;
                // }
                //is baseline behavior under the "normal" baseline behavior?
                if (monitoring.isBaselineUnderAveragePlusStd(factor)) {
                    // app not struggling -> keeping the sampling rate
                    logger.info("Baseline: app not struggling -> keeping sampling rate");
                    reducedInPreviousBaseline = false;
                } else {
                    // app struggling -> reduce the sampling rate by factor%
                    logger.info("Baseline: app struggling -> reduce sampling rate by {}", factor.get());
                    samplingRate -= (samplingRate * factor.get()) / 100;
                    reducedInPreviousBaseline = true;
                }
                monitoring.trackBaselinePerSecond(currentOperationsPerSecond);
            } else {
                //is monitoring behavior under the "normal" behavior?
                if (monitoring.isMonitoringUnderAveragePlusStd(factor)) {
                    // monitoring shows no visible impact -> increase the rate by factor%
                    logger.info("Monitoring: no impact -> increase sampling rate by {}", factor.get());
                    samplingRate += (samplingRate * factor.get()) / 100;
                    reducedInPreviousBaseline = false;
                } else {
                    if (reducedInPreviousBaseline) {
                        // the previous baseline already reduced the rate and the app
                        // still looks degraded: shave one more percentage point
                        logger.info("App still struggling, reducing sampling rate by 1%.");
                        samplingRate -= 0.01;
                    }
                    // open a new baseline at most once every 3 seconds
                    if (Duration.between(baselineWindowStart, Instant.now()).getSeconds() > 3) {
                        baselineWindowStart = Instant.now();
                        logger.info("Enabling performance baseline, because monitoring seems degraded.");
                        performanceBaselineEnabled = true;
                    }
                }
                monitoring.trackMonitoringPerSecond(currentOperationsPerSecond);
            }
            // clamp: never above the configured initial rate, never below 1%
            if (samplingRate > initialSamplingRate)
                samplingRate = initialSamplingRate;
            if (samplingRate < 0.01)
                samplingRate = 0.01;
            this.resetSamplingDistribution();
            logger.info("New sampling rate: {} -> {}", currentSamplingRate, samplingRate);
        }
    }
// public void addPerformanceBaselineItem(Granularity granularity, long executionTime) {
// this.performanceBaselineDataSet.addItem(granularity, executionTime);
// }
public void addSampledItem(Granularity granularity, long executionTime) {
sample.addItem(granularity, executionTime);
DescriptiveStatistics statistics = sampledDataSet.getOrDefault(granularity, new DescriptiveStatistics());
statistics.addValue(executionTime);
sampledDataSet.put(granularity, statistics);
}
public long getMonitoringCycleTime() {
return (System.currentTimeMillis() - startTime);
}
    // True while a performance baseline (no sampling at all) is being collected.
    public boolean isPerformanceBaselineEnabled() {
        return performanceBaselineEnabled;
    }
public double decayingConfidenceFactor(long timeInMilliseconds) {
synchronized (decayingPrecisionLock) {
return new BigDecimal(decayingPrecision.value(timeInMilliseconds))
.setScale(4, BigDecimal.ROUND_FLOOR).doubleValue();
}
}
    /**
     * One-sample t-test comparing the sample mean against the population mean at a
     * significance level that relaxes as the cycle ages.
     *
     * @return true when the test passes or is not applicable
     */
    private boolean tTestEvaluation(double decayingConfidenceFactor) {
        SummaryStatistics sampleAsDescriptiveStatistics = sample.getAsDescriptiveStatistics();
        // the t-test needs at least two values and a non-zero variance
        if (sampleAsDescriptiveStatistics.getN() < 2) return true;
        if (sampleAsDescriptiveStatistics.getVariance() == 0) return true;
        SummaryStatistics populationAsDescriptiveStatistics = population.getAsDescriptiveStatistics();
        double popMean = populationAsDescriptiveStatistics.getMean();
        //for some reason, t-test returns false when the sets are exactly the same...
        if (sample.getTotalItems() == population.getTotalItems())
            return true;
        // significance rises from near 0 towards 0.5 as the decay factor falls
        double significanceLevel = 0.5 - (0.5 * decayingConfidenceFactor);
        if (significanceLevel == 0.5) //maximum cycle time reached
            return true;
        else {
            //To test the (one-sample t-test - compare with the population mean)
            // hypothesis sample mean = mu at the 95% level
            // NOTE(review): commons-math TestUtils.tTest(mu, stats, alpha) returns
            // true when the null hypothesis (sample mean == mu) is REJECTED at
            // significance alpha -- verify this is the intended readiness criterion.
            return TestUtils.tTest(popMean,
                    sampleAsDescriptiveStatistics,
                    0.5 - (0.5 * decayingConfidenceFactor));
        }
    }
//sample proportion is the same as population
private boolean isSameProportion(double decayingConfidenceFactor) {
return population.getGranularities().stream().allMatch(
granularity -> {
double popProportion = population.getProportion(granularity);
double samProportion = sample.getProportion(granularity);
double error = popProportion - (popProportion * decayingConfidenceFactor);
return samProportion <= popProportion + error &&
samProportion >= popProportion - error;
});
}
private long getMinimumSampleSize(long n) {
return getMinimumSampleSize(n, z);
}
private long getMinimumSampleSize(double precision) {
return getMinimumSampleSize(population.getTotalItems(), precision);
}
private long getMinimumSampleSize(long n, double precision) {
if (n <= 1) return 0;
long n_inf = (long) ((Math.pow(precision, 2) * p * (1 - p)) / Math.pow(e, 2));
return n_inf / (1 + ((n_inf - 1) / n));
}
private double getSampleSizeErrorMargin(double precision) {
if (population.getTotalItems() <= 1) return 0;
double e_n_inf = Math.sqrt((Math.pow(precision, 2) * p * (1 - p)) / sample.getTotalItems());
return e_n_inf * Math.sqrt((population.getTotalItems() - sample.getTotalItems()) / (population.getTotalItems() - 1));
}
    // Guards decayingPrecision, which is replaced at every cycle start.
    private Object decayingPrecisionLock = new Object();

    /**
     * Resets all per-cycle state and starts a new monitoring cycle: a fresh decay
     * function (1 -> 0.1 over cycleLengthInMilliseconds), cleared data sets and a
     * new start timestamp.
     */
    public void startMonitoringCycle() {
        synchronized (decayingPrecisionLock) {
            this.decayingPrecision = new ExponentialDecayFunction(1, 0.1, cycleLengthInMilliseconds);
        }
        this.sample.clear();
        this.population.clear();
        this.sampledDataSet.clear();
        this.startTime = System.currentTimeMillis();
        logger.info("Monitoring is reset...");
    }
public MonitoringCycle endMonitoringCycle() {
MonitoringCycle monitoringCycle = new MonitoringCycle(getSample(), getPopulation(), getMonitoringCycleTime());
logger.info("Adaptive Sampling Monitoring Cycle Finished: {}", monitoringCycle);
startMonitoringCycle();
return monitoringCycle;
}
    // Baseline window length in seconds; only referenced by the commented-out
    // managePerformanceBaseline below -- the active code path in adaptSamplingRate
    // uses a hard-coded 3-second window instead.
    private int baselineWindow;
    // Start of the most recent performance-baseline window.
    private Instant baselineWindowStart = Instant.now();
// public void managePerformanceBaseline(PerformanceBaselineDataSet performanceMonitoring) {
// if (performanceBaselineEnabled) { //is it already enabled?
// if (Duration.between(baselineWindowStart, Instant.now()).getSeconds() > baselineWindow) {
//// logger.info("Collected performance baseline of {} traces", this.performanceBaselineDataSet.getTotalItems());
// performanceBaselineEnabled = false;
// this.performanceBaselineDataSet.clear();
// }
// return;
// }
//
// double chance = new BinomialDistribution(1, 0.1d).sample();
// if (chance == 1) {
// baselineWindow = 2;//RandomUtils.nextInt(2, 4);
// baselineWindowStart = Instant.now();
//
// logger.info("Enabling performance baseline that needs {} seconds of traces.", baselineWindow);
// performanceBaselineEnabled = true;
// }
//
//
//
//// double chance = new BinomialDistribution(1, 0.1d).sample();
//// if (chance == 1) {
//// baselineWindow = 2;//RandomUtils.nextInt(2, 4);
//// baselineWindowStart = Instant.now();
////
//// logger.info("Enabling performance baseline that needs {} seconds of traces.", baselineWindow);
//// performanceBaselineEnabled = true;
//// }
// }
    // NOTE(review): the following getters expose live internal state; callers can
    // mutate the returned data sets directly.
    public FrequencyDataSet getSample() {
        return sample;
    }

    public FrequencyDataSet getPopulation() {
        return population;
    }

    public Map<Granularity, DescriptiveStatistics> getSampledTraces() {
        return sampledDataSet;
    }

    // Current sampling rate as a fraction, clamped elsewhere to [0.01, initialSamplingRate].
    public double getSamplingRate() {
        return samplingRate;
    }

    public boolean isAdaptiveSamplingRate() {
        return adaptiveSamplingRate;
    }
SummaryStatistics inverselyStats = new SummaryStatistics();
public double adaptSamplingRateInverselyProportional(int operationsPerSecond) {
if (operationsPerSecond == 0)
return samplingRate;
inverselyStats.addValue(operationsPerSecond);
double newSamplingRate =
initialSamplingRate
* ((operationsPerSecond - inverselyStats.getMin()) / (inverselyStats.getMax() - inverselyStats.getMin()));
logger.info("New sampling rate: {} -> {}", samplingRate, newSamplingRate);
samplingRate = newSamplingRate;
if (samplingRate > initialSamplingRate)
samplingRate = initialSamplingRate;
if (samplingRate < 0.01)
samplingRate = 0.01;
return getSamplingRate();
}
}