// SamplingAspect.java
package br.ufrgs.inf.prosoft.tigris.sampling;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;

import static java.lang.System.nanoTime;

/**
 * AspectJ aspect that wraps a fixed list of benchmark entry points (DaCapo h2, lusearch,
 * xalan, tradebeans and the YCSB core workload used for cassandra), measures the elapsed
 * time and the change in used heap of every intercepted execution, and forwards those
 * measurements to the {@code Sampling} and {@code PerformanceBaselineDataSet} collaborators.
 *
 * <p>All state lives in static fields. The constructor overwrites part of that state from
 * JVM system properties, so the static methods of this class dereference {@code sampling}
 * and require that an instance has been constructed first.
 *
 * <p>NOTE(review): none of the static state is synchronized here; whether the collaborators
 * are safe for concurrent use cannot be determined from this file.
 */
@Aspect
public class SamplingAspect {

    static Logger logger = LoggerFactory.getLogger(SamplingAspect.class);

    /** Value returned by {@link #anyCall()}, i.e. the {@code if()} condition of the pointcut. */
    public static boolean enabled = true;

    /**
     * When {@code false}, {@link #aroundMethods} skips every sampling step and only adds a
     * monitoring item. Overwritten by the constructor from the {@code sampling_enabled} property.
     */
    public static boolean samplingEnabled = true;

    /**
     * When {@code true}, {@link #addOperationsPerSecondAndAdapt} delegates to
     * {@code Sampling.adaptSamplingRateInverselyProportional} and does nothing else.
     * Overwritten by the constructor from the {@code sampling_inversely} property.
     */
    public static boolean adaptSamplingRateInverselyProportionalOps = false;

    /**
     * Per-second record of the average of the per-entry means found in
     * {@code sampling.sampledMemoryDataSet} at the moment a cycle was closed.
     */
    public static Map<Integer, Double> mseMap = new HashMap<>();

    /** Seconds at which the adaptive branch of {@link #addOperationsPerSecondAndAdapt} closed a cycle. */
    public static Set<Integer> cycleMarkers = new HashSet<>();

    public static PerformanceBaselineDataSet monitoring = new PerformanceBaselineDataSet();

    static double currentSamplingRate;
    static MonitoringCycle cycle = new MonitoringCycle();

    /** Created by the constructor; {@code null} until the aspect has been instantiated. */
    private static Sampling sampling;

    /**
     * Pointcut selecting the executions to intercept. The trailing {@code if()} makes the
     * return value of this method part of the pointcut condition.
     *
     * @return the current value of {@link #enabled}
     */
    @Pointcut(
            // the benchmark entry points listed below, guarded by if()
            "(" +
                    "(" +
                    "execution(* org.dacapo.h2.TPCCSubmitter.runTransaction(..)) || " + //h2
                    "execution(* org.dacapo.lusearch.QueryProcessor.doPagingSearch(..)) || " + //lusearch
                    "execution(* org.dacapo.xalan.XalanWorker.transform(..)) || " + //xalan

                    //tradebeans:
                    "(" +
                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doHome(..)) || " +
                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doPortfolio(..)) || " +
                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doQuote(..)) || " +
                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doBuy(..)) || " +
                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doUpdate(..)) || " +
                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doRegister(..)) || " +
                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doSell(..))" +
                    ") || " +
                    //cassandra:
                    "(" +
                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionRead(..)) || " +
                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionUpdate(..)) || " +
                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionInsert(..)) || " +
                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionScan(..)) || " +
                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionReadModifyWrite(..))" +
                    ")" +
                    ") " +

                    "&& if())"
    )
    public static boolean anyCall() {
        return enabled;
    }

    /**
     * Configures the static state of the aspect from the system properties
     * {@code sampling_rate}, {@code sampling_cycle_time}, {@code sampling_adaptive},
     * {@code sampling_enabled} and {@code sampling_inversely}.
     *
     * <p>The boolean properties are read with {@link Boolean#parseBoolean}, so an absent
     * property yields {@code false} (this overrides the {@code true} field default of
     * {@link #samplingEnabled}). The two flags are assigned before the numeric properties
     * are parsed.
     *
     * @throws NullPointerException  if {@code sampling_rate} is not set
     * @throws NumberFormatException if {@code sampling_rate} or {@code sampling_cycle_time}
     *                               cannot be parsed (including an unset cycle time)
     */
    public SamplingAspect() {
        final String rateProperty = System.getProperty("sampling_rate");
        final String cycleTimeProperty = System.getProperty("sampling_cycle_time");
        final String adaptiveProperty = System.getProperty("sampling_adaptive");
        final String enabledProperty = System.getProperty("sampling_enabled");
        final String inverselyProperty = System.getProperty("sampling_inversely");

        logger.info("Loaded properties. sampling_rate: {}, sampling_cycle_time: {}, sampling_adaptive: {}, sampling_enabled: {}, sampling_inversely: {}",
                rateProperty, cycleTimeProperty, adaptiveProperty, enabledProperty, inverselyProperty);

        samplingEnabled = Boolean.parseBoolean(enabledProperty);
        adaptSamplingRateInverselyProportionalOps = Boolean.parseBoolean(inverselyProperty);

        sampling = new Sampling(Double.parseDouble(rateProperty), Long.parseLong(cycleTimeProperty),
                Boolean.parseBoolean(adaptiveProperty));
    }

    /**
     * Around advice: runs the intercepted method, then records its elapsed time and the
     * difference in used heap measured immediately before and after the call.
     *
     * <p>If the intercepted method throws, the exception propagates and nothing is recorded.
     * While the performance baseline is enabled only a baseline item is added; otherwise the
     * sampling decision is taken and, in every case, a monitoring item is added.
     *
     * @param joinPoint the intercepted execution
     * @return whatever the intercepted method returned
     * @throws Throwable anything thrown by the intercepted method
     */
    @Around("anyCall()")
    public Object aroundMethods(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = nanoTime();
        long beforeUsedMem = usedMemory();
        Object result = joinPoint.proceed();
        long endTime = nanoTime();
        long afterUsedMem = usedMemory();
        long totalTime = endTime - startTime;
        long actualMemUsed = afterUsedMem - beforeUsedMem;

        Granularity granularity = new Granularity(GranularityType.METHOD, granularityKey(joinPoint));

        if (samplingEnabled) {
            if (sampling.isPerformanceBaselineEnabled()) {
                monitoring.addPerformanceBaselineItem(granularity, totalTime);
                return result;
            }

            if (sampling.samplingDecision(granularity, totalTime)) {
                // NOTE(review): the sampled duration is taken with a fresh clock reading, so it
                // also includes the bookkeeping above, unlike totalTime -- confirm this is intended.
                sampling.addSampledItem(granularity, nanoTime() - startTime, actualMemUsed);
            }
        }

        monitoring.addMonitoringItem(granularity, totalTime);
        return result;
    }

    /**
     * Closes a cycle at second {@code sec} when the sampling rate is not adaptive, sampling
     * is enabled and {@code sec} is one of the supplied markers: the mean of the sampled
     * memory data is recorded in {@link #mseMap} and the sampled memory data set is cleared.
     * When no sampled data exists the data set is still cleared but no entry is recorded.
     *
     * @param sec     the current second
     * @param markers seconds at which a cycle must be closed; {@code null} means none
     */
    public static void computeCycle(int sec, Set<Integer> markers) {
        if (sampling.isAdaptiveSamplingRate() || markers == null || !markers.contains(sec)) {
            return;
        }
        if (!samplingEnabled) { // only uniform, full or inversely proportional sampling closes cycles here
            return;
        }

        final OptionalDouble mean = meanSampledMemory();
        sampling.sampledMemoryDataSet.clear();
        recordMean(sec, mean);
    }

    /**
     * Per-second hook: first runs {@link #computeCycle}, then updates
     * {@link #currentSamplingRate} and lets {@code Sampling} adapt the rate.
     *
     * <p>With {@link #adaptSamplingRateInverselyProportionalOps} set, the rate returned by
     * {@code adaptSamplingRateInverselyProportional} is stored and nothing else happens.
     * Otherwise, if the sampling rate is adaptive, the rate is adapted and, once the sample
     * is ready outside the baseline phase, the mean is recorded, {@code sec} is added to
     * {@link #cycleMarkers} and the monitoring cycle is ended.
     *
     * @param operationsPerSecond operations observed during the last second
     * @param sec                 the current second
     * @param markers             forwarded to {@link #computeCycle}
     */
    public static void addOperationsPerSecondAndAdapt(int operationsPerSecond, int sec, Set<Integer> markers) {
        computeCycle(sec, markers);

        if (adaptSamplingRateInverselyProportionalOps) {
            currentSamplingRate = sampling.adaptSamplingRateInverselyProportional(operationsPerSecond);
            return;
        }

        if (!sampling.isAdaptiveSamplingRate()) {
            return;
        }

        currentSamplingRate = sampling.getSamplingRate();

        // response times are used only to increase/decrease the sampling rate:
        // if they stay under the error margin (compared to the baseline times) -> increase,
        // otherwise decrease it

        // adapt the sampling rate based on the heuristic every new second
        sampling.adaptSamplingRate(monitoring,
                operationsPerSecond,
                currentSamplingRate);

        if (!sampling.isPerformanceBaselineEnabled() && sampling.isReady()) {
            logger.info("Sample is ready, releasing for analysis and resetting...");
            // An empty data set must not prevent the cycle from being marked and ended.
            recordMean(sec, meanSampledMemory());
            cycleMarkers.add(sec);
            cycle = sampling.endMonitoringCycle();
        }
    }

    /** @return the sampling rate last stored by {@link #addOperationsPerSecondAndAdapt} */
    public static double getCurrentSamplingRate() {
        return currentSamplingRate;
    }

    /** @return the monitoring cycle last stored by {@link #addOperationsPerSecondAndAdapt} */
    public static MonitoringCycle getCycle() {
        return cycle;
    }

    /**
     * Ends the current monitoring cycle on the underlying {@code Sampling} instance.
     * Unlike {@link #addOperationsPerSecondAndAdapt}, this does not update {@link #getCycle()}.
     *
     * @return the cycle returned by {@code Sampling.endMonitoringCycle()}
     */
    public static MonitoringCycle endMonitoringCycle() {
        return sampling.endMonitoringCycle();
    }

    /** Heap currently in use as reported by the runtime: total minus free memory. */
    private static long usedMemory() {
        final Runtime runtime = Runtime.getRuntime();
        return runtime.totalMemory() - runtime.freeMemory();
    }

    /**
     * Builds the granularity key: the join point signature followed by the string form of
     * the first argument. A join point without arguments yields the bare signature and a
     * null first argument is rendered as {@code "null"}, so that a monitoring problem can
     * never replace the result of an already completed call with an exception.
     */
    private static String granularityKey(ProceedingJoinPoint joinPoint) {
        final String signature = joinPoint.getSignature().toString();
        final Object[] args = joinPoint.getArgs();
        if (args == null || args.length == 0) {
            return signature;
        }
        return signature + args[0];
    }

    /** Average of the per-entry means of the sampled memory data; empty when there are no entries. */
    private static OptionalDouble meanSampledMemory() {
        return sampling.sampledMemoryDataSet.values().stream()
                .mapToDouble(DescriptiveStatistics::getMean)
                .average();
    }

    /** Stores {@code mean} under {@code sec} in {@link #mseMap}; an absent mean is skipped. */
    private static void recordMean(int sec, OptionalDouble mean) {
        if (mean.isPresent()) {
            mseMap.put(sec, mean.getAsDouble());
        } else {
            logger.debug("No sampled memory data at second {}; no mean recorded", sec);
        }
    }
}