adaptive-monitoring-framework

fixing behavior

8/25/2021 10:31:49 AM

Details

diff --git a/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/monitoring/aspects/TigrisCoordinator.java b/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/monitoring/aspects/TigrisCoordinator.java
index 4e070d3..00e1355 100644
--- a/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/monitoring/aspects/TigrisCoordinator.java
+++ b/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/monitoring/aspects/TigrisCoordinator.java
@@ -205,7 +205,7 @@ public class TigrisCoordinator implements Runnable {
             try {
                 repository.save(logTrace);
                 logger.debug("New trace entry: " + logTrace);
-                sampling.addSampledItem(granularity, currentTimeMillis() - monitoringStartTime);
+                sampling.addSampledItem(granularity, currentTimeMillis() - monitoringStartTime, 0);
             } catch (Exception e) {
                 logger.debug("Couldn't trace " + signature + " due to: " + e.getMessage());
             }
diff --git a/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/Sampling.java b/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/Sampling.java
index a42fa22..1919118 100644
--- a/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/Sampling.java
+++ b/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/Sampling.java
@@ -1,7 +1,6 @@
 package br.ufrgs.inf.prosoft.tigris.sampling;
 
 import com.google.common.util.concurrent.AtomicDouble;
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.math3.distribution.BinomialDistribution;
 import org.apache.commons.math3.ml.neuralnet.sofm.util.ExponentialDecayFunction;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
@@ -35,7 +34,7 @@ public class Sampling {
     private long startTime;
     private ExponentialDecayFunction decayingPrecision;
     private FrequencyDataSet population = new FrequencyDataSet(), sample = new FrequencyDataSet();
-    private PerformanceBaselineDataSet performanceBaselineDataSet = new PerformanceBaselineDataSet();
+    Map<Granularity, DescriptiveStatistics> sampledMemoryDataSet = new ConcurrentHashMap<>();
     private Map<Granularity, DescriptiveStatistics> sampledDataSet = new ConcurrentHashMap<>();
 
     Logger logger = LoggerFactory.getLogger(Sampling.class);
@@ -177,12 +176,18 @@ public class Sampling {
 //        this.performanceBaselineDataSet.addItem(granularity, executionTime);
 //    }
 
-    public void addSampledItem(Granularity granularity, long executionTime) {
+    public void addSampledItem(Granularity granularity, long executionTime, long actualMemUsed) {
         sample.addItem(granularity, executionTime);
 
         DescriptiveStatistics statistics = sampledDataSet.getOrDefault(granularity, new DescriptiveStatistics());
         statistics.addValue(executionTime);
         sampledDataSet.put(granularity, statistics);
+
+        if (actualMemUsed > 0) {
+            DescriptiveStatistics memoryStatistics = sampledMemoryDataSet.getOrDefault(granularity, new DescriptiveStatistics());
+            memoryStatistics.addValue(actualMemUsed);
+            sampledMemoryDataSet.put(granularity, memoryStatistics);
+        }
     }
 
     public long getMonitoringCycleTime() {
@@ -194,6 +199,7 @@ public class Sampling {
     }
 
     public double decayingConfidenceFactor(long timeInMilliseconds) {
+        if (timeInMilliseconds == 0) return 0.999999999999;
         synchronized (decayingPrecisionLock) {
             return new BigDecimal(decayingPrecision.value(timeInMilliseconds))
                     .setScale(4, BigDecimal.ROUND_FLOOR).doubleValue();
@@ -270,6 +276,7 @@ public class Sampling {
         this.sample.clear();
         this.population.clear();
         this.sampledDataSet.clear();
+        this.sampledMemoryDataSet.clear();
         this.startTime = System.currentTimeMillis();
         logger.info("Monitoring is reset...");
     }
@@ -281,40 +288,8 @@ public class Sampling {
         return monitoringCycle;
     }
 
-    private int baselineWindow;
     private Instant baselineWindowStart = Instant.now();
 
-//    public void managePerformanceBaseline(PerformanceBaselineDataSet performanceMonitoring) {
-//        if (performanceBaselineEnabled) { //is it already enabled?
-//            if (Duration.between(baselineWindowStart, Instant.now()).getSeconds() > baselineWindow) {
-////                logger.info("Collected performance baseline of {} traces", this.performanceBaselineDataSet.getTotalItems());
-//                performanceBaselineEnabled = false;
-//                this.performanceBaselineDataSet.clear();
-//            }
-//            return;
-//        }
-//
-//        double chance = new BinomialDistribution(1, 0.1d).sample();
-//        if (chance == 1) {
-//            baselineWindow = 2;//RandomUtils.nextInt(2, 4);
-//            baselineWindowStart = Instant.now();
-//
-//            logger.info("Enabling performance baseline that needs {} seconds of traces.", baselineWindow);
-//            performanceBaselineEnabled = true;
-//        }
-//
-//
-//
-////        double chance = new BinomialDistribution(1, 0.1d).sample();
-////        if (chance == 1) {
-////            baselineWindow = 2;//RandomUtils.nextInt(2, 4);
-////            baselineWindowStart = Instant.now();
-////
-////            logger.info("Enabling performance baseline that needs {} seconds of traces.", baselineWindow);
-////            performanceBaselineEnabled = true;
-////        }
-//    }
-
     public FrequencyDataSet getSample() {
         return sample;
     }
@@ -327,6 +302,10 @@ public class Sampling {
         return sampledDataSet;
     }
 
+    public Map<Granularity, DescriptiveStatistics> getSampledMemoryTraces() {
+        return sampledMemoryDataSet;
+    }
+
     public double getSamplingRate() {
         return samplingRate;
     }
diff --git a/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/SamplingAspect.java b/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/SamplingAspect.java
index ee10160..1e1ccfc 100644
--- a/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/SamplingAspect.java
+++ b/tigris/src/main/java/br/ufrgs/inf/prosoft/tigris/sampling/SamplingAspect.java
@@ -1,8 +1,6 @@
 package br.ufrgs.inf.prosoft.tigris.sampling;
 
-import br.ufrgs.inf.prosoft.tigris.configuration.annotation.TigrisConfiguration;
-import br.ufrgs.inf.prosoft.tigris.exceptions.ConfigurationException;
-import br.ufrgs.inf.prosoft.tigris.utils.ConfigurationUtils;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
@@ -20,71 +18,59 @@ import java.util.Set;
 import static java.lang.System.nanoTime;
 
 @Aspect
-public class SamplingAspect implements Runnable {
+public class SamplingAspect {
 
     @Pointcut(
             //any execution except the own framework
             "(" +
                     "(" +
-                        "execution(* org.dacapo.h2.TPCCSubmitter.runTransaction(..)) || " + //h2
-                        "execution(* org.dacapo.lusearch.QueryProcessor.doPagingSearch(..)) || " + //lusearch
-                        "execution(* org.dacapo.xalan.XalanWorker.transform(..)) || " + //xalan
-
-                        //tradebeans:
-                        "(" +
-                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doHome(..)) || " +
-                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doPortfolio(..)) || " +
-                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doQuote(..)) || " +
-                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doBuy(..)) || " +
-                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doUpdate(..)) || " +
-                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doRegister(..)) || " +
-                        "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doSell(..))" +
-                        ") || " +
-                        //cassandra:
-                        "(" +
-                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionRead(..)) || " +
-                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionUpdate(..)) || " +
-                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionInsert(..)) || " +
-                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionScan(..)) || " +
-                            "execution(* site.ycsb.workloads.CoreWorkload.doTransactionReadModifyWrite(..))" +
-                        ")" +
+                    "execution(* org.dacapo.h2.TPCCSubmitter.runTransaction(..)) || " + //h2
+                    "execution(* org.dacapo.lusearch.QueryProcessor.doPagingSearch(..)) || " + //lusearch
+                    "execution(* org.dacapo.xalan.XalanWorker.transform(..)) || " + //xalan
+
+                    //tradebeans:
+                    "(" +
+                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doHome(..)) || " +
+                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doPortfolio(..)) || " +
+                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doQuote(..)) || " +
+                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doBuy(..)) || " +
+                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doUpdate(..)) || " +
+                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doRegister(..)) || " +
+                    "execution(* org.apache.geronimo.daytrader.javaee6.dacapo.DaCapoTrader.doSell(..))" +
+                    ") || " +
+                    //cassandra:
+                    "(" +
+                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionRead(..)) || " +
+                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionUpdate(..)) || " +
+                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionInsert(..)) || " +
+                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionScan(..)) || " +
+                    "execution(* site.ycsb.workloads.CoreWorkload.doTransactionReadModifyWrite(..))" +
+                    ")" +
                     ") " +
 
-//            "(execution(* *(..)) && !within(br.ufrgs.inf.prosoft..*) " +
-                    //avoid calls from repository while serializing objects, it is necessary if a hash could not be used
-                    //"&& !cflow(call(* br.ufrgs.inf.prosoft.tigris.monitoring.storage..*(..))) " +
-                    //conditional to enable and disable at runtime
                     "&& if())"
     )
     public static boolean anyCall() {
         return enabled;
     }
 
-    //tigris config
-    private final TigrisConfiguration tigrisConfiguration;
     private static Sampling sampling;
 
-    public SamplingAspect() throws ClassNotFoundException {
-        Class<?> configClass = ConfigurationUtils.getAvailableConfigurationClass(TigrisConfiguration.class);
-        if (configClass == null) {
-            //workaround for cassandra
-            configClass = Class.forName("site.ycsb.ApplicationInitializer");
-        }
+    public SamplingAspect() {
+        final String sampling_rate = System.getProperty("sampling_rate");
+        final String sampling_cycle_time = System.getProperty("sampling_cycle_time");
+        final String sampling_adaptive = System.getProperty("sampling_adaptive");
+        final String sampling_enabled = System.getProperty("sampling_enabled");
+        final String sampling_inversely = System.getProperty("sampling_inversely");
 
-        if (configClass == null) {
-            logger.info("Tigris tracing disabled, there is no annotations.");
-            throw new ConfigurationException("Tigris tracing disabled, there is no annotations.");
-        }
+        logger.info("Loaded properties. sampling_rate: {}, sampling_cycle_time: {}, sampling_adaptive: {}, sampling_enabled: {}, sampling_inversely: {}",
+                sampling_rate, sampling_cycle_time, sampling_adaptive, sampling_enabled, sampling_inversely);
 
-        tigrisConfiguration = configClass.getAnnotation(TigrisConfiguration.class);
+        samplingEnabled = Boolean.parseBoolean(sampling_enabled);
+        adaptSamplingRateInverselyProportionalOps = Boolean.parseBoolean(sampling_inversely);
 
-        logger.info("@TigrisConfiguration found.");
-
-        sampling = new Sampling(tigrisConfiguration.samplingPercentage(), tigrisConfiguration.cycleTimeInMilliseconds(), tigrisConfiguration.adaptiveSamplingRate());
-
-//        adaptiveSamplingExecutor.scheduleWithFixedDelay(
-//                this::run,
-//                5, 1, TimeUnit.SECONDS);
+        sampling = new Sampling(Double.parseDouble(sampling_rate), Long.parseLong(sampling_cycle_time),
+                Boolean.parseBoolean(sampling_adaptive));
     }
 
     static Logger logger = LoggerFactory.getLogger(SamplingAspect.class);
@@ -93,19 +79,17 @@ public class SamplingAspect implements Runnable {
 
     public static boolean samplingEnabled = true;
 
-//    private final ScheduledExecutorService adaptiveSamplingExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreads(
-//            "adaptive-sampling",
-//            "readiness evaluation, pba trigger and sampling adaptation"
-//    ));
-
     @Around("anyCall()")
     public Object aroundMethods(ProceedingJoinPoint joinPoint) throws Throwable {
         long startTime = nanoTime();
+        long beforeUsedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
         Object result = joinPoint.proceed();
         long endTime = nanoTime();
+        long afterUsedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
         long totalTime = endTime - startTime;
+        long actualMemUsed = afterUsedMem - beforeUsedMem;
 
-        String signature = joinPoint.getSignature().toString() + joinPoint.getArgs()[0].toString(); //TODO this is to distinguish traces in H2 or lusearch / also run with Xalan / cassandra / tradebeans?
+        String signature = joinPoint.getSignature().toString() + joinPoint.getArgs()[0].toString();
         Granularity granularity = new Granularity(GranularityType.METHOD, signature);
 
         if (samplingEnabled) {
@@ -117,7 +101,7 @@ public class SamplingAspect implements Runnable {
             boolean decision = sampling.samplingDecision(granularity, totalTime);
 
             if (decision)
-                sampling.addSampledItem(granularity, nanoTime() - startTime);
+                sampling.addSampledItem(granularity, nanoTime() - startTime, actualMemUsed);
         }
 
         monitoring.addMonitoringItem(granularity, totalTime);
@@ -126,21 +110,19 @@ public class SamplingAspect implements Runnable {
 
     public static boolean adaptSamplingRateInverselyProportionalOps = false;
 
-    public static void computeCycle (int sec, Set<Integer> markers) {
+    public static void computeCycle(int sec, Set<Integer> markers) {
         if (!sampling.isAdaptiveSamplingRate() && markers != null && markers.contains(sec)) {
-            if (samplingEnabled) { //uniform or inversely proportional
-                final MonitoringCycle monitoringCycle = sampling.endMonitoringCycle();
-                mseMap.put(sec, monitoringCycle.getAverageProcTimesSample());
-            } else { //no monitoring
-                OptionalDouble mean = monitoring.currentMonitoring.values().stream().mapToDouble(SummaryStatistics::getMean)
+            if (samplingEnabled) { //uniform, full or inversely proportional
+                final OptionalDouble mean = sampling.sampledMemoryDataSet.values().stream().mapToDouble(DescriptiveStatistics::getMean)
                         .average();
-                monitoring.currentMonitoring.clear();
+                sampling.sampledMemoryDataSet.clear();
+
                 mseMap.put(sec, mean.getAsDouble());
             }
         }
     }
 
-    public static void addOperationsPerSecondAndAdapt (int operationsPerSecond, int sec, Set<Integer> markers) {
+    public static void addOperationsPerSecondAndAdapt(int operationsPerSecond, int sec, Set<Integer> markers) {
         computeCycle(sec, markers);
 
         if (adaptSamplingRateInverselyProportionalOps) {
@@ -150,8 +132,6 @@ public class SamplingAspect implements Runnable {
 
         if (sampling.isAdaptiveSamplingRate()) {
             currentSamplingRate = sampling.getSamplingRate();
-            currentNormalBehavior = monitoring.baselinesPerSecondToNormal.getPercentile(50);
-            currentNormalMonitoring = monitoring.monitoringPerSecondToNormal.getPercentile(50);
 
             // response times are used only to increase/decrease the sampling rate
             // if keep the response times under the error margin (compared to the baseline times) -> increase
@@ -164,7 +144,9 @@ public class SamplingAspect implements Runnable {
 
             if (!sampling.isPerformanceBaselineEnabled() && sampling.isReady()) {
                 logger.info("Sample is ready, releasing for analysis and resetting...");
-                mseMap.put(sec, sampling.getSample().getMeanExecutionTime());
+                final OptionalDouble mean = sampling.sampledMemoryDataSet.values().stream().mapToDouble(DescriptiveStatistics::getMean)
+                        .average();
+                mseMap.put(sec, mean.getAsDouble());
                 cycleMarkers.add(sec);
                 cycle = sampling.endMonitoringCycle();
             }
@@ -177,45 +159,12 @@ public class SamplingAspect implements Runnable {
     public static PerformanceBaselineDataSet monitoring = new PerformanceBaselineDataSet();
 
     static double currentSamplingRate;
-    static double currentNormalMonitoring;
-    static double currentNormalBehavior;
     static MonitoringCycle cycle = new MonitoringCycle();
 
-    @Override
-    public void run() {
-//        if (adaptSamplingRateInverselyProportionalOps) {
-//            currentSamplingRate = sampling.adaptSamplingRateInverselyProportional(currentOps);
-//            return;
-//        }
-//
-//        if (adaptSamplingRateInverselyProportionalUsers) {
-//            currentSamplingRate = sampling.adaptSamplingRateInverselyProportional(currentUsers);
-//            return;
-//        }
-//
-//        if (SamplingAspect.enabled && sampling.isAdaptiveSamplingRate()) {
-//            sampling.managePerformanceBaseline();
-//            currentSamplingRate = sampling.getSamplingRate();
-//
-//            if (!sampling.isPerformanceBaselineEnabled() && sampling.isReady()) {
-//                logger.info("Sample is ready, releasing for analysis and resetting...");
-//                cycle = sampling.endMonitoringCycle();
-//            }
-//        }
-    }
-
     public static double getCurrentSamplingRate() {
         return currentSamplingRate;
     }
 
-    public static double getCurrentNormalMonitoring() {
-        return currentNormalMonitoring;
-    }
-
-    public static double getCurrentNormalBehavior() {
-        return currentNormalBehavior;
-    }
-
     public static MonitoringCycle getCycle() {
         return cycle;
     }
diff --git a/tigris/src/test/java/br/ufrgs/inf/prosoft/tigris/SamplingTest.java b/tigris/src/test/java/br/ufrgs/inf/prosoft/tigris/SamplingTest.java
index 0f9fb3a..c808797 100644
--- a/tigris/src/test/java/br/ufrgs/inf/prosoft/tigris/SamplingTest.java
+++ b/tigris/src/test/java/br/ufrgs/inf/prosoft/tigris/SamplingTest.java
@@ -3,9 +3,7 @@ package br.ufrgs.inf.prosoft.tigris;
 import br.ufrgs.inf.prosoft.tigris.sampling.Granularity;
 import br.ufrgs.inf.prosoft.tigris.sampling.GranularityType;
 import br.ufrgs.inf.prosoft.tigris.sampling.Sampling;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.math3.ml.neuralnet.sofm.util.ExponentialDecayFunction;
-import org.apache.commons.math3.stat.inference.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,7 +30,7 @@ public class SamplingTest {
         Granularity granularity = new Granularity(GranularityType.METHOD, "function");
         for (int i = 0; i < 2000; i++) {
             Assert.assertTrue(sampling.samplingDecision(granularity, 0));
-            sampling.addSampledItem(granularity, 0);
+            sampling.addSampledItem(granularity, 0, 0);
         }
         Assert.assertEquals(2000, sampling.getSample().getTotalItems());
         Assert.assertEquals(2000, sampling.getPopulation().getTotalItems());
@@ -44,7 +42,7 @@ public class SamplingTest {
         Granularity granularity = new Granularity(GranularityType.METHOD, "function");
         for (int i = 0; i < 2000; i++) {
             Assert.assertTrue(sampling.samplingDecision(granularity, 0));
-            sampling.addSampledItem(granularity, 0);
+            sampling.addSampledItem(granularity, 0, 0);
         }
         Assert.assertTrue(sampling.isReady());
     }
@@ -55,17 +53,17 @@ public class SamplingTest {
         for (int i = 0; i < 1000; i++) {
             Granularity granularity = new Granularity(GranularityType.METHOD, "function");
             Assert.assertTrue(sampling.samplingDecision(granularity, i));
-            sampling.addSampledItem(granularity, 0);
+            sampling.addSampledItem(granularity, 0, 0);
         }
         for (int i = 0; i < 1000; i++) {
             Granularity granularity = new Granularity(GranularityType.METHOD, "function2");
             Assert.assertTrue(sampling.samplingDecision(granularity, i));
-            sampling.addSampledItem(granularity, 0);
+            sampling.addSampledItem(granularity, 0, 0);
         }
         for (int i = 0; i < 1000; i++) {
             Granularity granularity = new Granularity(GranularityType.METHOD, "function3");
             Assert.assertTrue(sampling.samplingDecision(granularity, i));
-            sampling.addSampledItem(granularity, 0);
+            sampling.addSampledItem(granularity, 0, 0);
         }
         Assert.assertTrue(sampling.isReady());
     }