// TigrisCoordinator.java
package br.ufrgs.inf.prosoft.tigris.monitoring.aspects;

import br.ufrgs.inf.prosoft.tigris.configuration.annotation.ComponentScan;
import br.ufrgs.inf.prosoft.tigris.configuration.annotation.TigrisConfiguration;
import br.ufrgs.inf.prosoft.tigris.configuration.annotation.TigrisCriteria;
import br.ufrgs.inf.prosoft.tigris.exceptions.ConfigurationException;
import br.ufrgs.inf.prosoft.tigris.metrics.DataFiltering;
import br.ufrgs.inf.prosoft.tigris.metrics.LightweightMetrics;
import br.ufrgs.inf.prosoft.tigris.metrics.StaticMetrics;
import br.ufrgs.inf.prosoft.tigris.monitoring.metadata.Key;
import br.ufrgs.inf.prosoft.tigris.monitoring.metadata.LogTrace;
import br.ufrgs.inf.prosoft.tigris.monitoring.metadata.MethodInfo;
import br.ufrgs.inf.prosoft.tigris.monitoring.storage.Repository;
import br.ufrgs.inf.prosoft.tigris.monitoring.storage.RepositoryFactory;
import br.ufrgs.inf.prosoft.tigris.monitoring.util.threads.NamedThreads;
import br.ufrgs.inf.prosoft.tigris.sampling.Granularity;
import br.ufrgs.inf.prosoft.tigris.sampling.Sampling;
import br.ufrgs.inf.prosoft.tigris.utils.ConfigurationUtils;
import ca.uqac.lif.bullwinkle.BnfParser;
import ca.uqac.lif.bullwinkle.ParseNode;
import com.google.common.io.Resources;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static java.lang.System.currentTimeMillis;

/**
 * Coordinates Tigris monitoring for intercepted method executions.
 *
 * <p>On construction it reads the {@link TigrisConfiguration}, {@link TigrisCriteria} and optional
 * {@link ComponentScan} annotations from the configuration class resolved by
 * {@link ConfigurationUtils}, sets up the {@link Sampling} and {@link Repository}, and schedules two
 * background tasks: the sampling adaptation and this coordinator's own {@link #run()}.
 *
 * <p>Monitoring starts in a "coarse" phase in which {@link #process(ProceedingJoinPoint)} only
 * collects {@link LightweightMetrics} per method signature. The first execution of {@link #run()}
 * computes {@link #allowedFineGrained} from those metrics and ends the coarse phase.
 *
 * <p>{@link #process(ProceedingJoinPoint)} runs on the caller's thread while {@link #run()} runs on
 * the {@code lightweight-analyzer} executor, so the flags shared between them are {@code volatile}.
 */
public class TigrisCoordinator implements Runnable {
    /** Never matches any input; used as the deny list when no {@code @ComponentScan} is declared. */
    private static final Pattern DENY_NOTHING = Pattern.compile("(?!)");

    // Written by turnoff() and read through isEnabled()/run(), potentially on different threads.
    private static volatile boolean enabled = true;
    // Cleared by run() on the analyzer thread while process() reads it on caller threads.
    private volatile boolean coarseMonitoringEnabled = true;

    Logger logger = LoggerFactory.getLogger(TigrisCoordinator.class);

    //traceable configuration
    private final Pattern allowedPattern;
    private final Pattern deniedPattern;

    //tigris config
    private final TigrisConfiguration tigrisConfiguration;
    private final TigrisCriteria tigrisCriteria;
    private final Sampling sampling;
    private final Repository repository;
    private Class<?> customMonitoringClass;
    private CustomMonitoring customMonitoring;
    private Map<String, LightweightMetrics> metrics;
    private StaticMetrics staticMetrics;

    private final BnfParser parser = new BnfParser(Resources.getResource("tigrisdsl.bnf").openStream());

    private final ScheduledExecutorService samplingAdaptationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreads(
            "sampling-adaptation",
            "adapting sampling rate and sample readiness"
    ));

    private final ScheduledExecutorService lightweightAnalyzerExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreads(
            "lightweight-analyzer",
            "computing lightweight metrics and setting allowed methods"
    ));

    /**
     * Builds the coordinator from the annotations found on the available configuration class.
     *
     * @throws IOException                       if the {@code tigrisdsl.bnf} grammar resource cannot be opened
     * @throws BnfParser.InvalidGrammarException if the grammar resource is not a valid BNF grammar
     * @throws ConfigurationException            if no class annotated with {@link TigrisConfiguration}
     *                                           is available; tracing is turned off before throwing
     * @throws RuntimeException                  wrapping a {@link BnfParser.ParseException} when the
     *                                           criteria expression cannot be parsed
     */
    public TigrisCoordinator() throws IOException, BnfParser.InvalidGrammarException {
        Class<?> configClass = ConfigurationUtils.getAvailableConfigurationClass(TigrisConfiguration.class);
        if (configClass == null) {
            turnoff();
            logger.info("Tigris tracing disabled, there is no annotations.");
            throw new ConfigurationException("Tigris tracing disabled, there is no annotations.");
        }

        tigrisConfiguration = configClass.getAnnotation(TigrisConfiguration.class);

        //getting criteria
        // NOTE(review): getAnnotation returns null when @TigrisCriteria is absent, which would fail
        // below at tigrisCriteria.criteria() — confirm the annotation is mandatory.
        tigrisCriteria = configClass.getAnnotation(TigrisCriteria.class);

        //getting allowed packages from @ComponentScan
        ComponentScan componentScanConfig = configClass.getAnnotation(ComponentScan.class);
        if (componentScanConfig == null) {
            //if not specified, it assumes the same package where @TigrisConfiguration was declared
            allowedPattern = Pattern.compile(packageNameOf(configClass) + ".*");
            deniedPattern = DENY_NOTHING;
            logger.info("ComponentScan for TigrisConfiguration not found, assuming the same package where @TigrisConfiguration were declared.");
        } else {
            allowedPattern = Pattern.compile(componentScanConfig.allowed());
            deniedPattern = Pattern.compile(componentScanConfig.denied());
        }
        logger.info("@TigrisConfiguration will trace and cache methods into {} and deny {} package.", allowedPattern.pattern(), deniedPattern.pattern());

        sampling = new Sampling(tigrisConfiguration.samplingPercentage(), tigrisConfiguration.cycleTimeInMilliseconds(), tigrisConfiguration.adaptiveSamplingRate());
        //TODO when to run it?
        samplingAdaptationExecutor.scheduleWithFixedDelay(
                sampling, 120000, 450000, TimeUnit.MILLISECONDS);

        repository = RepositoryFactory.getRepository(null, tigrisConfiguration.logRepository());

        if (tigrisConfiguration.clearMonitoringDataOnStart()) {
            repository.removeAll();
            logger.debug("Repository starting cleaned.");
        }

        if (coarseMonitoringEnabled) {
            metrics = new ConcurrentHashMap<>();
            staticMetrics = new StaticMetrics(tigrisConfiguration.staticMetricFile());
            lightweightAnalyzerExecutor.scheduleWithFixedDelay(
                    this::run,
                    120000, 450000, TimeUnit.MILLISECONDS);
        }

        try {
            ParseNode node = parser.parse(tigrisCriteria.criteria());

            //recursive mapping between DSL and values
            // Only evaluated when the JVM runs with assertions enabled (-ea).
            assert node.getChildren() != null;

        } catch (BnfParser.ParseException ex) {
            logger.error("Could not parse the Tigris criteria expression.", ex);
            throw new RuntimeException(ex);
        }

        try {
            customMonitoringClass = ConfigurationUtils.getAvailableCustomMonitoringClass();
            if (customMonitoringClass != null) {
                customMonitoring = (CustomMonitoring) customMonitoringClass.newInstance();
                customMonitoring.initialize(repository);
            }
        } catch (ConfigurationException e) {
            logger.info("AdaptiveCaching not found, disabling...");
        } catch (IllegalAccessException | InstantiationException e) {
            // Custom monitoring is optional: report the failure and continue without it.
            logger.error("Could not instantiate custom monitoring class {}", customMonitoringClass, e);
        }
    }

    /**
     * Tells whether the intercepted method should be handled by Tigris.
     *
     * @param joinPoint the intercepted execution; its signature must be a {@link MethodSignature}
     * @return {@code true} when Tigris is enabled, the method name check passes, and the package of
     *         the declaring class fully matches the allowed pattern and does not fully match the
     *         denied pattern
     */
    public boolean isAllowed(ProceedingJoinPoint joinPoint) {
        if (!isEnabled()) {
            return false;
        }
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        // NOTE(review): getName() is the method name, so comparing it with a package prefix looks
        // like it was meant to inspect the declaring type instead — confirm before changing.
        if (signature.getName().contains("br.ufrgs.inf.prosoft")) {
            return false;
        }
        String packageName = packageNameOf(signature.getMethod().getDeclaringClass());
        return allowedPattern.matcher(packageName).matches()
                && !deniedPattern.matcher(packageName).matches();
    }

    /**
     * Executes the intercepted method and records monitoring data around it.
     *
     * <p>Steps, in order: consult the optional custom monitoring (which may supply the result
     * without executing the method), proceed with the join point, update lightweight metrics while
     * the coarse phase is active, feed the sampling baseline, and store a {@link LogTrace} when the
     * sampling decision and the fine-grained selection allow it.
     *
     * @param joinPoint the intercepted execution
     * @return the value returned by the intercepted method, or the non-null value supplied by the
     *         custom monitoring
     * @throws Throwable whatever the intercepted method throws
     */
    public Object process(ProceedingJoinPoint joinPoint) throws Throwable {
        long monitoringStartTime = currentTimeMillis();
        String signature = joinPoint.getSignature().toString();
        // One snapshot per call so a concurrent run() cannot change the phase mid-way.
        boolean coarse = coarseMonitoringEnabled;
        boolean detailedTrace = !coarse || allowedFineGrained.contains(signature);
        logger.debug("Evaluating {}. Allowed events {}", signature, allowedFineGrained);

        //generate a hash of the method that will be used as: key to cache and compare if the method is allowed or not
        Key key = new Key(joinPoint);

        boolean customProcess = customMonitoring != null &&
                customMonitoring.beforeExecuting(key);
        if (customProcess) {
            Object returnResult = customMonitoring.returnResult(key);
            if (returnResult != null) {
                return returnResult;
            }
        }

        Object[] joinPointArgs = null;
        //method calls can change the args, so it is better to get it at the beginning
        if (detailedTrace) {
            joinPointArgs = joinPoint.getArgs();
        }

        long startTime = currentTimeMillis();
        Object result = joinPoint.proceed();
        long endTime = currentTimeMillis();

        if (coarse) {
            recordLightweightMetrics(signature, startTime, endTime - startTime, result);
        }

        Granularity granularity = new Granularity(tigrisCriteria.granularity(), signature);

        if (tigrisConfiguration.adaptiveSamplingRate() && sampling.isPerformanceBaselineEnabled()) {
            sampling.addPerformanceBaselineItem(granularity, endTime - startTime);
        }

        //trace only allowed by lightweight metrics
        boolean shouldSample = sampling.samplingDecision(granularity);
        // NOTE(review): this requires the coarse phase to be active, so no trace is stored once
        // run() has ended it — confirm that is intended.
        if (coarse
                && detailedTrace
                && shouldSample) {
            logger.debug("New trace: {}", signature);

            //we do trace null returns - maybe the method can sometimes return null... so there is not verification here
            LogTrace logTrace = new LogTrace();
            logTrace.setStartTime(startTime);
            logTrace.setEndTime(endTime);

            MethodInfo methodInfo = new MethodInfo(signature, joinPointArgs, result, key);
            logTrace.setMethodInfo(methodInfo);

            try {
                repository.save(logTrace);
                logger.debug("New trace entry: {}", logTrace);
                sampling.addSampledItem(granularity, currentTimeMillis() - monitoringStartTime);
            } catch (Exception e) {
                // Best effort: a failed trace must not break the monitored call.
                logger.debug("Couldn't trace {} due to: {}", signature, e.getMessage(), e);
            }
        }

        if (detailedTrace && customProcess && result != null) {
            customMonitoring.afterExecuting(key, result);
        }

        return result;
    }

    /**
     * Adds one execution to the lightweight metric of {@code signature}, creating the metric
     * atomically on first use so concurrent callers never overwrite each other's entry.
     */
    private void recordLightweightMetrics(String signature, long startTime, long duration, Object result) {
        LightweightMetrics metric = metrics.computeIfAbsent(signature, name -> {
            LightweightMetrics created = new LightweightMetrics(name);
            created.setStaticMetric(staticMetrics.getMetrics(created.getName()));
            return created;
        });
        metric.addTime(startTime, duration);
        metric.addReturnSize(result);
        metric.incOccurrence();
    }

    /** Returns the package name of {@code type}, or an empty string when it has no package object. */
    private static String packageNameOf(Class<?> type) {
        Package pkg = type.getPackage();
        return pkg == null ? "" : pkg.getName();
    }

    /**
     * Disables Tigris. The flag is static, so this affects every coordinator instance.
     */
    public void turnoff() {
        enabled = false;
    }

    /**
     * @return {@code false} once {@link #turnoff()} has been called, {@code true} otherwise
     */
    public static boolean isEnabled() {
        return enabled;
    }

    /**
     * @return the repository used to store traces
     */
    public Repository getRepository() {
        return repository;
    }

    /**
     * Signatures selected for fine-grained tracing. Empty until {@link #run()} replaces it with the
     * result of {@link DataFiltering#calculate}; volatile because it is reassigned on the analyzer
     * thread and read by {@link #process(ProceedingJoinPoint)}.
     */
    public static volatile Set<String> allowedFineGrained = new ConcurrentSkipListSet<>();

    /**
     * Periodic analysis task. On its first execution it selects the fine-grained methods from the
     * collected lightweight metrics, clears the metrics and the repository, and ends the coarse
     * phase; on every execution it logs the sampling status while Tigris is enabled.
     */
    @Override
    public void run() {
        if (coarseMonitoringEnabled) {
            logger.info("Analyzing {} lightweight methods for relevance...", metrics.size());

            //cleaning selected methods and recalculating relevance
            allowedFineGrained = DataFiltering.calculate(staticMetrics, metrics);
            metrics.clear();
            repository.removeAll();

            logger.info("Selected methods for fine-grained ({}): {}", allowedFineGrained.size(), allowedFineGrained);

            //analyze once
            coarseMonitoringEnabled = false;
        }
        if (enabled) {
            logger.info("Adaptive Sampling Status - Sample traces ({}): {}", sampling.getSample().getTotalItems(),
                    sampling.getSample().getTraceFrequency());
            logger.info("Adaptive Sampling Status - Population traces ({}): {}", sampling.getPopulation().getTotalItems(),
                    sampling.getPopulation().getTraceFrequency());
        }
    }
}