// Sampling.java
// (repository viewer header: 360 lines | 14.396 kB)

package br.ufrgs.inf.prosoft.tigris.sampling;

import com.google.common.util.concurrent.AtomicDouble;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.math3.distribution.BinomialDistribution;
import org.apache.commons.math3.ml.neuralnet.sofm.util.ExponentialDecayFunction;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.commons.math3.stat.inference.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The sampling control and decision.
 */
public class Sampling {

    // constructor parameters - the two final fields are set once in the constructor
    // when false, isReady() always reports false
    private final boolean adaptiveSamplingRate;
    // length of a monitoring cycle; drives the decay function built in startMonitoringCycle()
    private final long cycleLengthInMilliseconds;
    // single-trial binomial distribution used for the per-item sampling decision;
    // replaced whenever the sampling rate changes
    private BinomialDistribution binomialDistSampling;
    private double samplingRate; // current rate, as a fraction from 0 to 1
    private double initialSamplingRate; // rate given at construction (fraction, 0 to 1); upper bound for samplingRate

    // control vars
    // while true, samplingDecision() rejects every item
    private boolean performanceBaselineEnabled = false;

    // state reset at the start of every monitoring cycle
    // (the decay function is recreated; the data sets below are cleared)
    private long startTime;
    private ExponentialDecayFunction decayingPrecision;
    private FrequencyDataSet population = new FrequencyDataSet(), sample = new FrequencyDataSet();
    // NOTE(review): not referenced by any active code in this class (only by commented-out code)
    private PerformanceBaselineDataSet performanceBaselineDataSet = new PerformanceBaselineDataSet();
    // execution-time statistics of the sampled items, grouped by granularity
    private Map<Granularity, DescriptiveStatistics> sampledDataSet = new ConcurrentHashMap<>();

    Logger logger = LoggerFactory.getLogger(Sampling.class);

    /**
     * z confidence value, ex: 1.96 for 95%
     * p proportion of the population, 0.5 is default
     * e margin of error, ex: 0.05 for 5%
     */
    private double z = 1.96, p = 0.5, e = 0.05;

    /**
     * Creates the sampling controller and immediately opens the first monitoring cycle.
     *
     * @param initialSamplingRate       starting sampling rate as a fraction between 0 and 1; it is
     *                                  also the upper bound applied when the rate is adapted later
     * @param cycleLengthInMilliseconds length of a monitoring cycle, used to build the decay function
     * @param adaptiveSamplingRate      whether the adaptive checks are enabled
     */
    public Sampling(double initialSamplingRate, long cycleLengthInMilliseconds, boolean adaptiveSamplingRate) {
        // configuration that never changes after construction
        this.cycleLengthInMilliseconds = cycleLengthInMilliseconds;
        this.adaptiveSamplingRate = adaptiveSamplingRate;

        // the current rate starts out equal to the initial one
        this.initialSamplingRate = initialSamplingRate;
        this.samplingRate = initialSamplingRate;
        this.binomialDistSampling = new BinomialDistribution(1, initialSamplingRate);

        startMonitoringCycle();
    }

    private Object binomialDistSamplingLock = new Object();

    /**
     * Rebuilds the single-trial binomial distribution so that subsequent sampling
     * decisions use the current {@code samplingRate}.
     */
    private void resetSamplingDistribution() {
        final Object lock = this.binomialDistSamplingLock;
        synchronized (lock) {
            final BinomialDistribution refreshed = new BinomialDistribution(1, samplingRate);
            this.binomialDistSampling = refreshed;
        }
    }

    /**
     * Draws one value from the current single-trial binomial distribution.
     *
     * @return {@code true} when the draw is 1, i.e. the item should be sampled
     */
    public boolean simpleSamplingDecision() {
        final int outcome;
        synchronized (this.binomialDistSamplingLock) {
            outcome = binomialDistSampling.sample();
        }
        return outcome == 1;
    }

    /**
     * Registers an observed item in the population and decides whether it should be sampled.
     *
     * @param granularity   granularity the item belongs to
     * @param executionTime execution time recorded for the item
     * @return {@code false} while the performance baseline is enabled; otherwise the outcome of
     *         {@link #simpleSamplingDecision()}
     */
    public boolean samplingDecision(Granularity granularity, long executionTime) {
        // the first item of an empty population restarts the cycle clock
        if (population.getTotalItems() == 0) {
            startTime = System.currentTimeMillis();
        }
        population.addItem(granularity, executionTime);

        // nothing is sampled while the performance baseline is enabled
        if (performanceBaselineEnabled) {
            return false;
        }

        final boolean pickedByRate = simpleSamplingDecision();
        if (!adaptiveSamplingRate || !pickedByRate) {
            return pickedByRate;
        }

        // NOTE(review): this point is only reached when the rate-based decision is already true,
        // so the proportion comparison below cannot change the result. Confirm whether the
        // comparison was meant to veto or to force sampling of under-represented granularities.
        final boolean underRepresented =
                population.getProportion(granularity) >= sample.getProportion(granularity);
        return underRepresented || pickedByRate;
    }

    /**
     * Tells whether the current sample passes all readiness checks for this cycle.
     * The checks are relaxed over time through the decaying confidence factor.
     *
     * @return {@code true} only when adaptive sampling is enabled and the sample passes the
     *         minimum-size check, the proportion check and the t-test check
     */
    public boolean isReady() {
        final double confidenceFactor = decayingConfidenceFactor(getMonitoringCycleTime());

        // all three checks are evaluated up-front, regardless of the adaptive flag
        final long sampledItems = sample.getTotalItems();
        final long requiredItems = getMinimumSampleSize(z - (z * confidenceFactor));
        final boolean bigEnough = sampledItems > requiredItems;
        final boolean proportionsMatch = isSameProportion(confidenceFactor);
        final boolean meanAccepted = tTestEvaluation(confidenceFactor);

        if (!adaptiveSamplingRate) {
            return false;
        }
        // (a margin-of-error check based on getSampleSizeErrorMargin is currently disabled)
        return bigEnough && proportionsMatch && meanAccepted;
    }

    private Object samplingRateLock = new Object();

    private boolean reducedInPreviousBaseline = false;

    /**
     * Adapts the sampling rate from the observed throughput and toggles the performance baseline.
     * <p>
     * While the baseline is enabled it is closed right away; the rate is reduced by the reported
     * factor (in percent) when the baseline check fails. Otherwise the rate is increased by the
     * reported factor when the monitoring check passes; when it fails, the rate is lowered by 0.01
     * if the previous baseline already caused a reduction, and a new baseline is enabled once more
     * than 3 seconds have passed since the last baseline window start. The resulting rate is kept
     * within [0.01, initial rate] and the sampling distribution is rebuilt.
     *
     * @param monitoring                 data set queried for the baseline/monitoring checks and
     *                                   updated with the current throughput
     * @param currentOperationsPerSecond throughput value recorded into {@code monitoring}
     * @param currentSamplingRate        only used as the "old" value in the final log message
     */
    public void adaptSamplingRate(PerformanceBaselineDataSet monitoring,
                                  int currentOperationsPerSecond,
                                  double currentSamplingRate) {
        synchronized (samplingRateLock) {
            final AtomicDouble factor = new AtomicDouble();

            if (!performanceBaselineEnabled) {
                // regular monitoring window
                final boolean unaffected = monitoring.isMonitoringUnderAveragePlusStd(factor);
                if (unaffected) {
                    final double percent = factor.get();
                    logger.info("Monitoring: no impact -> increase sampling rate by {}", percent);
                    samplingRate += (samplingRate * percent) / 100;
                    reducedInPreviousBaseline = false;
                } else {
                    if (reducedInPreviousBaseline) {
                        logger.info("App still struggling, reducing sampling rate by 1%.");
                        samplingRate -= 0.01;
                    }

                    final long secondsSinceBaseline =
                            Duration.between(baselineWindowStart, Instant.now()).getSeconds();
                    if (secondsSinceBaseline > 3) {
                        baselineWindowStart = Instant.now();
                        logger.info("Enabling performance baseline, because monitoring seems degraded.");
                        performanceBaselineEnabled = true;
                    }
                }
                monitoring.trackMonitoringPerSecond(currentOperationsPerSecond);
            } else {
                // baseline window: it is closed immediately (the time-window check is a pending TODO)
                performanceBaselineEnabled = false;

                final boolean withinNormal = monitoring.isBaselineUnderAveragePlusStd(factor);
                if (withinNormal) {
                    // application is not struggling: the rate is left untouched
                    logger.info("Baseline: app not struggling -> keeping sampling rate");
                    reducedInPreviousBaseline = false;
                } else {
                    // application is struggling: cut the rate by the reported percentage
                    final double percent = factor.get();
                    logger.info("Baseline: app struggling -> reduce sampling rate by {}", percent);
                    samplingRate -= (samplingRate * percent) / 100;
                    reducedInPreviousBaseline = true;
                }
                monitoring.trackBaselinePerSecond(currentOperationsPerSecond);
            }

            // keep the rate within [0.01, initialSamplingRate]
            if (samplingRate > initialSamplingRate) {
                samplingRate = initialSamplingRate;
            }
            if (samplingRate < 0.01) {
                samplingRate = 0.01;
            }

            this.resetSamplingDistribution();
            logger.info("New sampling rate: {} -> {}", currentSamplingRate, samplingRate);
        }
    }

//    public void addPerformanceBaselineItem(Granularity granularity, long executionTime) {
//        this.performanceBaselineDataSet.addItem(granularity, executionTime);
//    }

    /**
     * Records an item that was selected for sampling: it is added to the sample frequency
     * data set and its execution time is appended to the per-granularity statistics.
     * <p>
     * The statistics holder is created with {@code computeIfAbsent}, so the lookup and the
     * first insertion for a granularity happen as a single map operation and no throwaway
     * {@link DescriptiveStatistics} is allocated when an entry already exists.
     * NOTE(review): {@code DescriptiveStatistics} itself is documented as not thread-safe;
     * confirm whether concurrent calls for the same granularity are possible.
     *
     * @param granularity   granularity the item belongs to
     * @param executionTime execution time recorded for the item
     */
    public void addSampledItem(Granularity granularity, long executionTime) {
        sample.addItem(granularity, executionTime);

        sampledDataSet
                .computeIfAbsent(granularity, key -> new DescriptiveStatistics())
                .addValue(executionTime);
    }

    /**
     * @return milliseconds elapsed since the start time of the current monitoring cycle
     */
    public long getMonitoringCycleTime() {
        final long now = System.currentTimeMillis();
        return now - startTime;
    }

    /**
     * @return whether the performance baseline is currently enabled; while it is,
     *         {@link #samplingDecision(Granularity, long)} rejects every item
     */
    public boolean isPerformanceBaselineEnabled() {
        return performanceBaselineEnabled;
    }

    /**
     * Evaluates the cycle's decay function at the given elapsed time and truncates the
     * result to four decimal places (rounding toward negative infinity).
     * <p>
     * Uses {@link RoundingMode#FLOOR} instead of the deprecated integer constant
     * {@code BigDecimal.ROUND_FLOOR}; the rounding behavior is the same.
     *
     * @param timeInMilliseconds elapsed time within the monitoring cycle
     * @return the decayed factor, floored to 4 decimals
     */
    public double decayingConfidenceFactor(long timeInMilliseconds) {
        final double decayed;
        // only the read of the (replaceable) decay function needs the lock
        synchronized (decayingPrecisionLock) {
            decayed = decayingPrecision.value(timeInMilliseconds);
        }
        return new BigDecimal(decayed)
                .setScale(4, RoundingMode.FLOOR)
                .doubleValue();
    }

    /**
     * One-sample t-test check of the sample against the population mean, with a significance
     * level that grows from 0 towards 0.5 as the decaying confidence factor drops from 1 to 0.
     * <p>
     * The check is skipped (reported as passed) when the sample has fewer than two values or
     * zero variance, when sample and population hold the same number of items, or when the
     * significance level has reached 0.5 (factor decayed to 0).
     * <p>
     * A significance level of 0 or less (factor of 1 or more, e.g. when the cycle time is 0 ms)
     * is outside the (0, 0.5] range accepted by {@code TestUtils.tTest} and would raise an
     * {@code OutOfRangeException}; it is reported as "not passed", which is the limit of the
     * test outcome as the level approaches 0.
     * <p>
     * NOTE(review): Commons Math documents {@code TestUtils.tTest(mu, stats, alpha)} as returning
     * {@code true} when the null hypothesis (sample mean equals {@code mu}) can be REJECTED.
     * This method returns that value directly as "check passed"; confirm that this is intended.
     *
     * @param decayingConfidenceFactor factor as produced by {@code decayingConfidenceFactor}
     * @return whether the check is considered passed
     */
    private boolean tTestEvaluation(double decayingConfidenceFactor) {
        final SummaryStatistics sampleStats = sample.getAsDescriptiveStatistics();
        if (sampleStats.getN() < 2) {
            return true;
        }
        if (sampleStats.getVariance() == 0) {
            return true;
        }

        // for some reason, t-test returns false when the sets are exactly the same...
        if (sample.getTotalItems() == population.getTotalItems()) {
            return true;
        }

        final double significanceLevel = 0.5 - (0.5 * decayingConfidenceFactor);
        if (significanceLevel >= 0.5) {
            // maximum cycle time reached
            return true;
        }
        if (significanceLevel <= 0) {
            // alpha must lie in (0, 0.5]; avoid the OutOfRangeException
            return false;
        }

        // one-sample t-test against the population mean at the decayed significance level
        final double popMean = population.getAsDescriptiveStatistics().getMean();
        return TestUtils.tTest(popMean, sampleStats, significanceLevel);
    }

    //sample proportion is the same as population
    /**
     * Checks, for every granularity of the population, that the sample proportion is either
     * above the population proportion or within a tolerance band around it. The tolerance is
     * {@code popProportion * (1 - decayingConfidenceFactor)}.
     *
     * @param decayingConfidenceFactor factor as produced by {@code decayingConfidenceFactor}
     * @return {@code true} when the factor is 0 or all granularities satisfy the check
     */
    private boolean isSameProportion(double decayingConfidenceFactor) {
        final boolean cycleExhausted = decayingConfidenceFactor == 0.0; // maximum cycle time reached
        if (cycleExhausted) {
            return true;
        }

        return population.getGranularities().stream().allMatch(granularity -> {
            final double expected = population.getProportion(granularity);
            final double observed = sample.getProportion(granularity);

            // over-represented granularities are always accepted
            if (observed > expected) {
                return true;
            }

            final double tolerance = expected - (expected * decayingConfidenceFactor);
            final boolean withinUpper = observed <= expected + tolerance;
            final boolean withinLower = observed >= expected - tolerance;
            return withinUpper && withinLower;
        });
    }

    /**
     * Minimum sample size for a population of {@code n} items using the default z value.
     *
     * @param n population size
     * @return the minimum sample size
     */
    private long getMinimumSampleSize(long n) {
        final double defaultPrecision = z;
        return getMinimumSampleSize(n, defaultPrecision);
    }

    /**
     * Minimum sample size for the current population at the given precision.
     *
     * @param precision z value to use
     * @return the minimum sample size
     */
    private long getMinimumSampleSize(double precision) {
        final long populationSize = population.getTotalItems();
        return getMinimumSampleSize(populationSize, precision);
    }

    /**
     * Minimum sample size for a finite population:
     * {@code n0 = precision^2 * p * (1 - p) / e^2}, corrected as
     * {@code n0 / (1 + (n0 - 1) / n)}.
     * <p>
     * The correction is computed in floating point. The previous integer division
     * {@code (n0 - 1) / n} truncated the correction term (to 0 whenever {@code n > n0 - 1}),
     * so the finite-population correction was lost or badly distorted.
     *
     * @param n         population size
     * @param precision z value to use
     * @return the corrected sample size, truncated to a whole number; 0 when {@code n <= 1}
     */
    private long getMinimumSampleSize(long n, double precision) {
        if (n <= 1) {
            return 0;
        }
        final double infinitePopulationSize = (Math.pow(precision, 2) * p * (1 - p)) / Math.pow(e, 2);
        final double corrected = infinitePopulationSize / (1 + ((infinitePopulationSize - 1) / n));
        return (long) corrected;
    }

    /**
     * Margin of error of the current sample for a finite population:
     * {@code sqrt(precision^2 * p * (1 - p) / sampleSize) * sqrt((N - sampleSize) / (N - 1))}.
     * <p>
     * The finite-population ratio is computed in floating point; the previous integer
     * division truncated it to a whole number, which collapsed the correction.
     * With an empty sample the first term divides by zero and the result is not finite.
     *
     * @param precision z value to use
     * @return the margin of error; 0 when the population has at most one item
     */
    private double getSampleSizeErrorMargin(double precision) {
        final double populationSize = population.getTotalItems();
        if (populationSize <= 1) {
            return 0;
        }
        final double sampleSize = sample.getTotalItems();

        final double uncorrectedMargin = Math.sqrt((Math.pow(precision, 2) * p * (1 - p)) / sampleSize);
        final double finitePopulationCorrection =
                Math.sqrt((populationSize - sampleSize) / (populationSize - 1));
        return uncorrectedMargin * finitePopulationCorrection;
    }

    private Object decayingPrecisionLock = new Object();

    /**
     * Opens a new monitoring cycle: installs a fresh decay function (from 1 down to 0.1 at the
     * configured cycle length), clears the sample, the population and the sampled statistics,
     * and restarts the cycle clock.
     */
    public void startMonitoringCycle() {
        final ExponentialDecayFunction freshDecay =
                new ExponentialDecayFunction(1, 0.1, cycleLengthInMilliseconds);
        synchronized (decayingPrecisionLock) {
            decayingPrecision = freshDecay;
        }

        // drop everything collected during the previous cycle
        sample.clear();
        population.clear();
        sampledDataSet.clear();

        startTime = System.currentTimeMillis();
        logger.info("Monitoring is reset...");
    }

    /**
     * Closes the current monitoring cycle and immediately starts a new one.
     * <p>
     * NOTE(review): the returned cycle is built from the live sample/population objects, which
     * are cleared by {@link #startMonitoringCycle()} right afterwards; confirm that
     * {@code MonitoringCycle} copies what it needs in its constructor.
     *
     * @return a description of the cycle that just finished
     */
    public MonitoringCycle endMonitoringCycle() {
        final FrequencyDataSet sampled = getSample();
        final FrequencyDataSet observed = getPopulation();
        final long elapsedMillis = getMonitoringCycleTime();

        final MonitoringCycle finished = new MonitoringCycle(sampled, observed, elapsedMillis);
        logger.info("Adaptive Sampling Monitoring Cycle Finished: {}", finished);

        startMonitoringCycle();
        return finished;
    }

    private int baselineWindow;
    private Instant baselineWindowStart = Instant.now();

//    public void managePerformanceBaseline(PerformanceBaselineDataSet performanceMonitoring) {
//        if (performanceBaselineEnabled) { //is it already enabled?
//            if (Duration.between(baselineWindowStart, Instant.now()).getSeconds() > baselineWindow) {
////                logger.info("Collected performance baseline of {} traces", this.performanceBaselineDataSet.getTotalItems());
//                performanceBaselineEnabled = false;
//                this.performanceBaselineDataSet.clear();
//            }
//            return;
//        }
//
//        double chance = new BinomialDistribution(1, 0.1d).sample();
//        if (chance == 1) {
//            baselineWindow = 2;//RandomUtils.nextInt(2, 4);
//            baselineWindowStart = Instant.now();
//
//            logger.info("Enabling performance baseline that needs {} seconds of traces.", baselineWindow);
//            performanceBaselineEnabled = true;
//        }
//
//
//
////        double chance = new BinomialDistribution(1, 0.1d).sample();
////        if (chance == 1) {
////            baselineWindow = 2;//RandomUtils.nextInt(2, 4);
////            baselineWindowStart = Instant.now();
////
////            logger.info("Enabling performance baseline that needs {} seconds of traces.", baselineWindow);
////            performanceBaselineEnabled = true;
////        }
//    }

    /**
     * @return the live frequency data set of sampled items (not a copy); it is cleared
     *         when a new monitoring cycle starts
     */
    public FrequencyDataSet getSample() {
        return sample;
    }

    /**
     * @return the live frequency data set of all observed items (not a copy); it is cleared
     *         when a new monitoring cycle starts
     */
    public FrequencyDataSet getPopulation() {
        return population;
    }

    /**
     * @return the internal map of per-granularity execution-time statistics of sampled items;
     *         the map itself is returned (no copy is made) and it is cleared when a new
     *         monitoring cycle starts
     */
    public Map<Granularity, DescriptiveStatistics> getSampledTraces() {
        return sampledDataSet;
    }

    /**
     * @return the current sampling rate as a fraction (0 to 1)
     */
    public double getSamplingRate() {
        return samplingRate;
    }

    /**
     * @return the adaptive flag given at construction time
     */
    public boolean isAdaptiveSamplingRate() {
        return adaptiveSamplingRate;
    }

    // running min/max of every non-zero throughput value passed to
    // adaptSamplingRateInverselyProportional
    SummaryStatistics inverselyStats = new SummaryStatistics();

    /**
     * Sets the sampling rate inversely proportional to the load:
     * {@code initialRate * (max - ops) / (max - min)}, where min and max are taken over all
     * throughput values seen so far. The result is kept within [0.01, initial rate] and the
     * sampling distribution is rebuilt.
     * <p>
     * When all values seen so far are equal (always the case on the first call) the ratio
     * would be 0/0 = NaN. NaN passes both bound checks and would end up as the probability of
     * the sampling distribution, so in that case the current rate is kept unchanged.
     * <p>
     * NOTE(review): {@code samplingRate} is written here without holding
     * {@code samplingRateLock}, unlike in {@code adaptSamplingRate}; confirm that both
     * methods are never used concurrently.
     *
     * @param operationsPerSecond current throughput; 0 leaves the rate untouched and is not recorded
     * @return the sampling rate in effect after the call
     */
    public double adaptSamplingRateInverselyProportional(int operationsPerSecond) {
        if (operationsPerSecond == 0) {
            return samplingRate;
        }

        inverselyStats.addValue(operationsPerSecond);
        final double maxSeen = inverselyStats.getMax();
        final double range = maxSeen - inverselyStats.getMin();
        if (!(range > 0)) {
            // no spread observed yet: the proportional formula is undefined (0/0)
            return samplingRate;
        }

        final double previousRate = samplingRate;
        double newSamplingRate = initialSamplingRate * ((maxSeen - operationsPerSecond) / range);

        // keep the rate within [0.01, initialSamplingRate]
        if (newSamplingRate > initialSamplingRate) {
            newSamplingRate = initialSamplingRate;
        }
        if (newSamplingRate < 0.01) {
            newSamplingRate = 0.01;
        }

        logger.info("New sampling rate: {} -> {}", previousRate, newSamplingRate);
        samplingRate = newSamplingRate;

        this.resetSamplingDistribution();
        return getSamplingRate();
    }
}