SamplingAspect.java

205 lines | 8.869 kB Blame History Raw Download
package br.ufrgs.inf.prosoft.tigris.sampling;

import br.ufrgs.inf.prosoft.tigris.configuration.annotation.TigrisConfiguration;
import br.ufrgs.inf.prosoft.tigris.exceptions.ConfigurationException;
import br.ufrgs.inf.prosoft.tigris.utils.ConfigurationUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.System.nanoTime;

@Aspect
public class SamplingAspect implements Runnable {

    @Pointcut(
            //any execution except the own framework
            "(" +
                    "(" +
                        "execution(* org.dacapo.h2.TPCCSubmitter.runTransaction(..)) || " + //h2
                        "execution(* org.dacapo.lusearch.QueryProcessor.doPagingSearch(..)) || " + //lusearch
                        "execution(* org.dacapo.xalan.XalanWorker.transform(..)) || " + //xalan

                        //tradebeans:
                        "(" +
                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doHome(..)) || " +
                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doPortfolio(..)) || " +
                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doQuote(..)) || " +
                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doBuy(..)) || " +
                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doUpdate(..)) || " +
                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doRegister(..)) || " +
                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doSell(..))" +
                        ") || " +
                        //cassandra:
                        "(" +
                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionRead(..)) || " +
                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionUpdate(..)) || " +
                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionInsert(..)) || " +
                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionScan(..)) || " +
                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionReadModifyWrite(..))" +
                        ")" +
                    ") " +

//            "(execution(* *(..)) && !within(br.ufrgs.inf.prosoft..*) " +
                    //avoid calls from repository while serializing objects, it is necessary if a hash could not be used
                    //"&& !cflow(call(* br.ufrgs.inf.prosoft.tigris.monitoring.storage..*(..))) " +
                    //conditional to enable and disable at runtime
                    "&& if())"
    )
    public static boolean anyCall() {
        return enabled;
    }

    //tigris config
    private final TigrisConfiguration tigrisConfiguration;
    private static Sampling sampling;

    public SamplingAspect() throws ClassNotFoundException {
        Class<?> configClass = ConfigurationUtils.getAvailableConfigurationClass(TigrisConfiguration.class);
        if (configClass == null) {
            //workaround for cassandra
            configClass = Class.forName("site.ycsb.ApplicationInitializer");
        }

        if (configClass == null) {
            logger.info("Tigris tracing disabled, there is no annotations.");
            throw new ConfigurationException("Tigris tracing disabled, there is no annotations.");
        }

        tigrisConfiguration = configClass.getAnnotation(TigrisConfiguration.class);

        logger.info("@TigrisConfiguration found.");

        sampling = new Sampling(tigrisConfiguration.samplingPercentage(), tigrisConfiguration.cycleTimeInMilliseconds(), tigrisConfiguration.adaptiveSamplingRate());

//        adaptiveSamplingExecutor.scheduleWithFixedDelay(
//                this::run,
//                5, 1, TimeUnit.SECONDS);
    }

    static Logger logger = LoggerFactory.getLogger(SamplingAspect.class);

    public static boolean enabled = false;

    public static boolean samplingEnabled = true;

//    private final ScheduledExecutorService adaptiveSamplingExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreads(
//            "adaptive-sampling",
//            "readiness evaluation, pba trigger and sampling adaptation"
//    ));

    @Around("anyCall()")
    public Object aroundMethods(ProceedingJoinPoint joinPoint) throws Throwable {
//        String signature = joinPoint.getSignature().toString();
//        System.err.println(signature);

        long startTime = nanoTime();
        Object result = joinPoint.proceed();
        long endTime = nanoTime();
        long totalTime = endTime - startTime;

        String signature = joinPoint.getSignature().toString() + joinPoint.getArgs()[0].toString(); //TODO this is to distinguish traces in H2 or lusearch / also run with Xalan / cassandra / tradebeans?
        Granularity granularity = new Granularity(GranularityType.METHOD, signature);

        if (samplingEnabled) {
//            String signature = joinPoint.getSignature().toString() + joinPoint.getArgs()[0].toString(); //TODO this is to distinguish traces in H2 or lusearch / also run with Xalan / cassandra / tradebeans?
//            Granularity granularity = new Granularity(GranularityType.METHOD, signature);
            if (sampling.isPerformanceBaselineEnabled()) {
                monitoring.addPerformanceBaselineItem(granularity, totalTime);
                return result;
            }

            boolean decision = sampling.samplingDecision(granularity, totalTime);

            if (decision)
                sampling.addSampledItem(granularity, nanoTime() - startTime);
        }

        monitoring.addMonitoringItem(granularity, totalTime);
        return result;
    }


    public static boolean adaptSamplingRateInverselyProportionalOps = false;
    public static void addOperationsPerSecondAndAdapt (int operationsPerSecond) {

        if (adaptSamplingRateInverselyProportionalOps) {
            currentSamplingRate = sampling.adaptSamplingRateInverselyProportional(operationsPerSecond);
            return;
        }

        if (SamplingAspect.enabled && sampling.isAdaptiveSamplingRate()) {
            currentSamplingRate = sampling.getSamplingRate();
            currentNormalBehavior = monitoring.baselinesPerSecondToNormal.getPercentile(50);
            currentNormalMonitoring = monitoring.monitoringPerSecondToNormal.getPercentile(50);

            // response times are used only to increase/decrease the sampling rate
            // if keep the response times under the error margin (compared to the baseline times) -> increase
            // otherwise decrease it

            //adapt the sampling rate based on heuristic every new second
            sampling.adaptSamplingRate(monitoring,
                    operationsPerSecond,
                    currentSamplingRate);

            if (!sampling.isPerformanceBaselineEnabled() && sampling.isReady()) {
                logger.info("Sample is ready, releasing for analysis and resetting...");
                cycle = sampling.endMonitoringCycle();
            }
        }
    }

    public static PerformanceBaselineDataSet monitoring = new PerformanceBaselineDataSet();


    static double currentSamplingRate;
    static double currentNormalMonitoring;
    static double currentNormalBehavior;
    static MonitoringCycle cycle = new MonitoringCycle();

    @Override
    public void run() {
//        if (adaptSamplingRateInverselyProportionalOps) {
//            currentSamplingRate = sampling.adaptSamplingRateInverselyProportional(currentOps);
//            return;
//        }
//
//        if (adaptSamplingRateInverselyProportionalUsers) {
//            currentSamplingRate = sampling.adaptSamplingRateInverselyProportional(currentUsers);
//            return;
//        }
//
//        if (SamplingAspect.enabled && sampling.isAdaptiveSamplingRate()) {
//            sampling.managePerformanceBaseline();
//            currentSamplingRate = sampling.getSamplingRate();
//
//            if (!sampling.isPerformanceBaselineEnabled() && sampling.isReady()) {
//                logger.info("Sample is ready, releasing for analysis and resetting...");
//                cycle = sampling.endMonitoringCycle();
//            }
//        }
    }

    public static double getCurrentSamplingRate() {
        return currentSamplingRate;
    }

    public static double getCurrentNormalMonitoring() {
        return currentNormalMonitoring;
    }

    public static double getCurrentNormalBehavior() {
        return currentNormalBehavior;
    }

    public static MonitoringCycle getCycle() {
        return cycle;
    }

    public static MonitoringCycle endMonitoringCycle() {
        return sampling.endMonitoringCycle();
    }
}