// Sampling.java
package br.ufrgs.inf.prosoft.tigris.sampling;

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.math3.distribution.BinomialDistribution;
import org.apache.commons.math3.ml.neuralnet.sofm.util.ExponentialDecayFunction;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.inference.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Adaptive sampling decision engine.
 *
 * <p>Tracks the population of observed traces and the subset that was sampled,
 * adapts the sampling rate based on measured monitoring overhead (Apdex), and
 * periodically collects an unsampled performance baseline to compare against.
 * Statistical readiness of the sample is judged by margin of error, minimum
 * sample size, per-granularity proportions, and a one-sample t-test, each
 * relaxed over the monitoring cycle by an exponentially decaying factor.
 *
 * <p>NOTE(review): {@code sampledDataSet} is a {@link ConcurrentHashMap}, but
 * {@link DescriptiveStatistics} itself is not thread-safe — confirm whether
 * concurrent callers are expected.
 */
public class Sampling implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(Sampling.class);

    /** When true, {@link #run()} performs the full adaptation procedure. */
    private final boolean adaptiveSamplingRate;
    private boolean samplingEnabled = true;
    private boolean performanceBaselineEnabled = false;
    /** Sampling probability, kept within [0, 1]. */
    private double samplingRate;
    private FrequencyDataSet population = new FrequencyDataSet(), sample = new FrequencyDataSet();
    private PerformanceBaselineDataSet performanceBaselineDataSet = new PerformanceBaselineDataSet();
    private Map<Granularity, DescriptiveStatistics> sampledDataSet = new ConcurrentHashMap<>();
    /** Only the four most recent baselines are retained. */
    private Queue<PerformanceBaselineDataSet> lastFourPerformanceBaselineDataSets = new CircularFifoQueue<>(4);
    private ExponentialDecayFunction decayingPrecision;
    private long startTime;
    /** Lazily computed target size for a performance-baseline collection run. */
    private Long minimumSampleSize;

    /**
     * z confidence value, ex: 1.96 for 95%
     * p proportion of the population, 0.5 is default
     * e margin of error, ex: 0.05 for 5%
     */
    private double z = 1.96, p = 0.5, e = 0.05;

    /**
     * @param initialSamplingRate       starting sampling probability (0 to 1)
     * @param cycleLengthInMilliseconds length of one monitoring cycle; the
     *                                  precision factor decays from 1 toward
     *                                  ~0 over this span
     * @param adaptiveSamplingRate      whether to adapt the rate at runtime
     */
    public Sampling(double initialSamplingRate, long cycleLengthInMilliseconds, boolean adaptiveSamplingRate) {
        samplingRate = clampRate(initialSamplingRate);
        decayingPrecision = new ExponentialDecayFunction(1, 0.00001, cycleLengthInMilliseconds);
        this.adaptiveSamplingRate = adaptiveSamplingRate;
        startMonitoringCycle();
    }

    /**
     * Bernoulli trial with probability {@link #samplingRate}.
     *
     * @return true with probability equal to the current sampling rate
     */
    public boolean simpleSamplingDecision() {
        return new BinomialDistribution(1, samplingRate).sample() == 1; // sampling rate evaluation
    }

    /**
     * Records the trace in the population and decides whether to sample it.
     * While a performance baseline is being collected nothing is sampled.
     *
     * @param granularity the granularity of the observed trace
     * @return true if the trace should be sampled (it is then also recorded in the sample)
     */
    public boolean samplingDecision(Granularity granularity) {
        population.addItem(granularity);

        if (performanceBaselineEnabled) {
            // baseline collection must not be perturbed by sampling
            return false;
        }

        boolean simpleSamplingDecision = simpleSamplingDecision();
        boolean decision;
        if (adaptiveSamplingRate) {
            decision = samplingEnabled
                    && simpleSamplingDecision // sampling rate evaluation
                    && population.getProportion(granularity) >= sample.getProportion(granularity); // sample has not enough items of that granularity compared to the population
        } else {
            decision = simpleSamplingDecision;
        }

        if (decision) {
            sample.addItem(granularity);
        }

        return decision;
    }

    /**
     * Sets the sampling probability, clamped to [0, 1] so later
     * {@link BinomialDistribution} construction cannot fail.
     *
     * @param samplingRate the desired probability
     */
    public void setSamplingRate(double samplingRate) {
        this.samplingRate = clampRate(samplingRate);
    }

    /**
     * @return true when the sample is statistically representative of the
     * population: small enough error margin, minimum size reached, matching
     * per-granularity proportions, and a t-test that does not reject equality
     * of means. All thresholds are relaxed by the decaying confidence factor.
     */
    public boolean isReady() {
        double decayingConfidenceFactor = decayingConfidenceFactor(getMonitoringCycleTime());
        return
                // margin of error is lower than threshold
                getSampleSizeErrorMargin(z * decayingConfidenceFactor) < e
                // the sample has the min sample size based on the population
                && sample.getTotalItems() > getMinimumSampleSize(z * decayingConfidenceFactor)
                // proportion test
                && isSameProportion(decayingConfidenceFactor)
                // t-test
                && tTestEvaluation(decayingConfidenceFactor);
    }

    private double decayingConfidenceFactor(long timeInMilliseconds) {
        return decayingPrecision.value(timeInMilliseconds);
    }

    /**
     * One-sample t-test of H0: sample mean equals the population mean.
     *
     * <p>Commons-Math {@code TestUtils.tTest(mu, stats, alpha)} returns
     * {@code true} when H0 is <em>rejected</em>, i.e. when the means differ
     * significantly — so a representative sample is the negation. (The
     * original returned the raw result, which accepted exactly the samples
     * whose mean provably differs from the population.)
     */
    private boolean tTestEvaluation(double decayingConfidenceFactor) {
        return !TestUtils.tTest(population.getAsDescriptiveStatistics().getMean(),
                sample.getAsDescriptiveStatistics(),
                0.05 * decayingConfidenceFactor);
    }

    /**
     * @return true when, for every granularity, the sample proportion lies
     * within a tolerance band around the population proportion; the band
     * widens as the confidence factor decays toward 0.
     */
    public boolean isSameProportion(double decayingConfidenceFactor) {
        return population.getGranularities().stream().allMatch(
                granularity -> {
                    double popProportion = population.getProportion(granularity);
                    double samProportion = sample.getProportion(granularity);
                    // tolerance grows from 0 (factor == 1) to popProportion (factor -> 0)
                    double error = popProportion - (popProportion * decayingConfidenceFactor);

                    return samProportion <= popProportion + error &&
                            samProportion >= popProportion - error;
                });
    }

    /**
     * @return the minimum sample size for the population
     */
    public long getMinimumSampleSize() {
        return getMinimumSampleSize(population.getTotalItems());
    }

    public long getMinimumSampleSize(long n) {
        return getMinimumSampleSize(n, z);
    }

    public long getMinimumSampleSize(double precision) {
        return getMinimumSampleSize(population.getTotalItems(), precision);
    }

    /**
     * Minimum sample size for a finite population of size {@code n} at the
     * given z-precision: n_inf / (1 + (n_inf - 1) / n).
     *
     * <p>Computed entirely in double arithmetic: the original used long
     * division, so {@code (n_inf - 1) / n} truncated to 0 for any population
     * larger than n_inf, disabling the finite-population correction.
     *
     * @param n         population size
     * @param precision z confidence value (e.g. 1.96 for 95%)
     * @return the minimum number of sampled items (0 for an empty population)
     */
    public long getMinimumSampleSize(long n, double precision) {
        if (n <= 0) {
            return 0;
        }
        double nInf = (Math.pow(precision, 2) * p * (1 - p)) / Math.pow(e, 2);
        return (long) (nInf / (1 + ((nInf - 1) / n)));
    }

    /**
     * Current margin of error of the sample, with finite-population
     * correction.
     *
     * <p>The correction is computed in double arithmetic — the original long
     * division truncated the fraction to 0 or 1. When there is no sample yet
     * (or population &lt;= 1) the margin is reported as infinite so that
     * {@link #isReady()} stays false instead of dividing by zero.
     *
     * @param precision z confidence value
     * @return the error margin, or {@code Double.POSITIVE_INFINITY} if undefined
     */
    public double getSampleSizeErrorMargin(double precision) {
        long sampleItems = sample.getTotalItems();
        long populationItems = population.getTotalItems();
        if (sampleItems <= 0 || populationItems <= 1) {
            return Double.POSITIVE_INFINITY;
        }
        double eNInf = Math.sqrt((Math.pow(precision, 2) * p * (1 - p)) / sampleItems);
        return eNInf * Math.sqrt((double) (populationItems - sampleItems) / (populationItems - 1));
    }

    /** @return elapsed milliseconds since the current monitoring cycle started */
    public long getMonitoringCycleTime() {
        return (System.currentTimeMillis() - startTime);
    }

    /**
     * Logs the cycle results, clears the per-cycle sampled statistics, and
     * starts a new monitoring cycle. Note: {@code population}/{@code sample}
     * are not reset here — only the timer and the sampled statistics are.
     */
    public void endMonitoringCycle() {
        this.sampledDataSet = new ConcurrentHashMap<>();
        logger.info("Adaptive Sampling Monitoring Cycle Finished - Sample traces ({}): {}", getSample().getTotalItems(),
                getSample().getTraceFrequency());
        logger.info("Adaptive Sampling Monitoring Cycle Finished - Population traces ({}): {}", getPopulation().getTotalItems(),
                getPopulation().getTraceFrequency());
        startMonitoringCycle();
    }

    private void startMonitoringCycle() {
        startTime = System.currentTimeMillis();
    }

    /** @return true with 10% probability, triggering baseline collection */
    public boolean shouldCollectPerformanceBaseline() {
        return new BinomialDistribution(1, 0.1).sample() == 1;
    }

    /**
     * Adjusts the sampling rate based on the monitoring overhead (Apdex
     * impact): no impact increases the rate by 1%; impact above 10% lowers the
     * rate by the impact. The rate is clamped to [0, 1] — the original could
     * drift outside that range and make {@link BinomialDistribution}
     * construction throw.
     */
    public void adaptSamplingRate() {
        Apdex apdex = this.performanceBaselineDataSet.getApdexResults(this.sampledDataSet);
        double impact = 1 - ((apdex.getSatisfied() + 0.5 * apdex.getTolerated()) / apdex.getN());
        if (impact == 0) {
            logger.info("No monitoring impact detected, increasing the sampling rate...");
            //if no impact, increase by 1%
            samplingRate = clampRate(samplingRate + 0.01);
        }
        if (impact > 0.1) {
            logger.info("Monitoring impact detected, decreasing the sampling rate by {}...", impact);
            //reduce by the amount of overhead
            samplingRate = clampRate(samplingRate - impact);
        }

        // 0 < impact <= 0.1: tolerated, the rate stays the same
    }

    /** Clamps a probability into [0, 1]; BinomialDistribution rejects values outside it. */
    private static double clampRate(double rate) {
        return Math.min(1.0, Math.max(0.0, rate));
    }

    @Override
    public void run() {
        //TODO this is supposed to run from time to time based on triggers? Or every new trace collected?

        //this method deals with sampling rate adaptation procedures
        if (!adaptiveSamplingRate) {
            return;
        }

        if (isReady()) {
            logger.info("Sample is ready, releasing for analysis and resetting...");
            endMonitoringCycle();
            return;
        }
        if (shouldCollectPerformanceBaseline()) {
            logger.info("Enabling performance baseline...");
            enablePerformanceBaseline();
            return;
        }
        adaptSamplingRate();
    }

    private void enablePerformanceBaseline() {
        performanceBaselineEnabled = true;
    }

    public boolean isPerformanceBaselineEnabled() {
        return performanceBaselineEnabled;
    }

    /**
     * Records an unsampled (baseline) trace. Once enough baseline traces have
     * been collected — the minimum sample size for the current population —
     * sampling is re-enabled, the finished baseline is archived in the
     * last-four queue, and a fresh baseline data set is started.
     *
     * @param granularity   the granularity of the trace
     * @param executionTime the measured execution time
     */
    public void addPerformanceBaselineItem(Granularity granularity, long executionTime) {
        if (minimumSampleSize == null) {
            // snapshot the target once per baseline run
            minimumSampleSize = getMinimumSampleSize(this.population.getTotalItems());
        }

        logger.info("Collecting performance baseline for the next {} traces...",
                minimumSampleSize - this.performanceBaselineDataSet.getTotalItems());
        this.performanceBaselineDataSet.addItem(granularity, executionTime);

        if (this.performanceBaselineDataSet.getTotalItems() > minimumSampleSize) {
            //got enough traces for PB
            logger.info("Finished to collect the performance baseline, enabling sampling again...");
            samplingEnabled = true;
            minimumSampleSize = null;
            this.performanceBaselineEnabled = false;
            lastFourPerformanceBaselineDataSets.add(this.performanceBaselineDataSet);
            this.performanceBaselineDataSet = new PerformanceBaselineDataSet();
        }
    }

    /**
     * Records the execution time of a sampled trace and triggers the
     * adaptation procedure.
     *
     * <p>Uses {@code computeIfAbsent} for an atomic get-or-create: the
     * original containsKey/get/put sequence could lose updates under
     * concurrent access to the {@link ConcurrentHashMap}.
     *
     * @param granularity   the granularity of the trace
     * @param executionTime the measured execution time
     */
    public void addSampledItem(Granularity granularity, long executionTime) {
        sampledDataSet.computeIfAbsent(granularity, g -> new DescriptiveStatistics())
                .addValue(executionTime);

        //TODO run it every new trace collected?
        run();
    }

    public FrequencyDataSet getSample() {
        return sample;
    }

    public FrequencyDataSet getPopulation() {
        return population;
    }
}