killbill-aplcache

usage: initial import of DecimatingSampleFilter Signed-off-by:

7/28/2012 6:16:30 PM

Details

diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/filter/DecimatingSampleFilter.java b/usage/src/main/java/com/ning/billing/usage/timeline/filter/DecimatingSampleFilter.java
new file mode 100644
index 0000000..ccf26a2
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/filter/DecimatingSampleFilter.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.filter;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+
+import com.ning.billing.usage.timeline.codec.TimeRangeSampleProcessor;
+import com.ning.billing.usage.timeline.consumer.SampleConsumer;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+
+/**
+ * This SampleProcessor interpolates a stream of sample values to such that the
+ * number of outputs sent to the SampleConsumer is outputCount, which is less
+ * than sampleCount. It works by keeping a history of scanned samples
+ * representing at least one output sample, and makes a choice of what value to
+ * output from those scanned samples:
+ * <p/>
+ * The rules for sample generation are:
+ * <ul>
+ * <li>No averaging - - the sample returned is _always_ one of the scanned
+ * samples</li>
+ * <li>The output sample is always either the largest or the smallest of the
+ * scanned sample values</li>
+ * <li>Whether it is the largest or smallest depends on the "trend" of the
+ * samples:
+ * <ul>
+ * <li>If they are generally high-to-low then we output the low value.</li>
+ * <li>If they are generally low-to-high then we output the high value.</li>
+ * </ul>
+ * </ul>
+ * <p/>
+ * The rationale for these rules is the most interesting information is the
+ * peaks and valleys of measurements, and averaging is bad because it destroys
+ * peaks and valleys. A consequence of these rules is that quantities that
+ * bounce around a lot will generate graphs that are a solid band between the
+ * min and max values. But that's really an accurate reflection of the state. To
+ * get more information, you have to look at shorter time intervals. The class
+ * tries hard to make good choices amount
+ * <p/>
+ * Of course this sort of crude averaging isn't perfect, but at least it doesn't
+ * destroy peaks and valleys.
+ * <p/>
+ * TODO: Figure out if the time passed to SampleConsumer should be the time
+ * of the sample or the midpoint of the times between first and last sample.
+ */
+public class DecimatingSampleFilter extends TimeRangeSampleProcessor {
+
+    private final int outputCount;
+    private final SampleConsumer sampleConsumer;
+    private final TimeSpan pollingInterval;
+    private final DecimationMode decimationMode;
+    private double samplesPerOutput;
+    private double outputsPerSample;
+    private int ceilSamplesPerOutput;
+    private SampleState[] filterHistory;
+    private boolean initialized = false;
+
+    private double runningSum = 0.0;
+    private int sampleNumber = 0;
+
+    /**
+     * Build a DecimatingSampleFilter on which you call processSamples()
+     *
+     * @param startTime       The start time we're considering values, or null, meaning all time
+     * @param endTime         The end time we're considering values, or null, meaning all time
+     * @param outputCount     The number of samples to generate
+     * @param sampleCount     The number of samples to be scanned.  sampleCount must be >= outputCount
+     * @param pollingInterval The polling interval, used to compute sample counts assuming no gaps
+     * @param decimationMode  The decimation mode determines how samples will be combined to crate an output point.
+     * @param sampleConsumer  The implementor of the SampleConsumer interface
+     */
+    public DecimatingSampleFilter(final DateTime startTime, final DateTime endTime, final int outputCount, final int sampleCount,
+                                  final TimeSpan pollingInterval, final DecimationMode decimationMode, final SampleConsumer sampleConsumer) {
+        super(startTime, endTime);
+        if (outputCount <= 0 || sampleCount <= 0 || outputCount > sampleCount) {
+            throw new IllegalArgumentException(String.format("In DecimatingSampleFilter, outputCount is %d but sampleCount is %d", outputCount, sampleCount));
+        }
+        this.outputCount = outputCount;
+        this.pollingInterval = pollingInterval;
+        this.decimationMode = decimationMode;
+        this.sampleConsumer = sampleConsumer;
+        initializeFilterHistory(sampleCount);
+    }
+
+    /**
+     * This form of the constructor delays initialization til we get the first sample
+     *
+     * @param startTime       The start time we're considering values, or null, meaning all time
+     * @param endTime         The end time we're considering values, or null, meaning all time
+     * @param outputCount     The number of samples to generate
+     * @param pollingInterval The polling interval, used to compute sample counts assuming no gaps
+     * @param decimationMode  The decimation mode determines how samples will be combined to crate an output point.
+     * @param sampleConsumer  The implementor of the SampleConsumer interface
+     */
+    public DecimatingSampleFilter(final DateTime startTime, final DateTime endTime, final int outputCount, final TimeSpan pollingInterval,
+                                  final DecimationMode decimationMode, final SampleConsumer sampleConsumer) {
+        super(startTime, endTime);
+        this.outputCount = outputCount;
+        this.pollingInterval = pollingInterval;
+        this.decimationMode = decimationMode;
+        this.sampleConsumer = sampleConsumer;
+    }
+
+    private void initializeFilterHistory(final int sampleCount) {
+        if (outputCount <= 0 || sampleCount <= 0 || outputCount > sampleCount) {
+            throw new IllegalArgumentException(String.format("In DecimatingSampleFilter.initialize(), outputCount is %d but sampleCount is %d", outputCount, sampleCount));
+        }
+        this.samplesPerOutput = (double) sampleCount / (double) outputCount;
+        this.outputsPerSample = 1.0 / this.samplesPerOutput;
+        ceilSamplesPerOutput = (int) Math.ceil(samplesPerOutput);
+        filterHistory = new SampleState[ceilSamplesPerOutput];
+        initialized = true;
+    }
+
+    @Override
+    public void processOneSample(final DateTime time, final SampleOpcode opcode, final Object value) {
+        if (!initialized) {
+            // Estimate the sampleCount, assuming that there are no gaps
+            final long adjustedEndMillis = Math.min(getEndTime().getMillis(), System.currentTimeMillis());
+            final long millisTilEnd = adjustedEndMillis - time.getMillis();
+            final int sampleCount = Math.max(outputCount, (int) (millisTilEnd / pollingInterval.getMillis()));
+            initializeFilterHistory(sampleCount);
+        }
+        sampleNumber++;
+        final SampleState sampleState = new SampleState(opcode, value, ScalarSample.getDoubleValue(opcode, value), time);
+        final int historyIndex = sampleNumber % filterHistory.length;
+        filterHistory[historyIndex] = sampleState;
+        runningSum += outputsPerSample;
+        if (runningSum >= 1.0) {
+            runningSum -= 1.0;
+            if (opcode == SampleOpcode.STRING) {
+                // We don't have interpolation, so just output
+                // this one
+                sampleConsumer.consumeSample(sampleNumber, opcode, value, time);
+            } else {
+                // Time to output a sample - compare the sum of the first samples with the
+                // sum of the last samples making up the output, choosing the lowest value if
+                // if the first samples are larger, and the highest value if the last samples
+                // are larger
+                final int samplesInAverage = ceilSamplesPerOutput > 5 ? ceilSamplesPerOutput * 2 / 3 : Math.max(1, ceilSamplesPerOutput - 1);
+                final int samplesLeftOut = ceilSamplesPerOutput - samplesInAverage;
+                double max = Double.MIN_VALUE;
+                int maxIndex = 0;
+                int minIndex = 0;
+                double min = Double.MAX_VALUE;
+                double sum = 0.0;
+                double firstSum = 0.0;
+                double lastSum = 0.0;
+                for (int i = 0; i < ceilSamplesPerOutput; i++) {
+                    final int index = (sampleNumber + ceilSamplesPerOutput - i) % ceilSamplesPerOutput;
+                    final SampleState sample = filterHistory[index];
+                    if (sample != null) {
+                        final double doubleValue = sample.getDoubleValue();
+                        sum += doubleValue;
+                        if (doubleValue > max) {
+                            max = doubleValue;
+                            maxIndex = index;
+                        }
+                        if (doubleValue < min) {
+                            min = doubleValue;
+                            minIndex = index;
+                        }
+                        if (i < samplesInAverage) {
+                            lastSum += doubleValue;
+                        }
+                        if (i >= samplesLeftOut) {
+                            firstSum += doubleValue;
+                        }
+                    }
+                }
+                final SampleState firstSample = filterHistory[(sampleNumber + ceilSamplesPerOutput - (ceilSamplesPerOutput - 1)) % ceilSamplesPerOutput];
+                final SampleState lastSample = filterHistory[sampleNumber % ceilSamplesPerOutput];
+                final DateTime centerTime = firstSample != null ? new DateTime((firstSample.getTime().getMillis() + lastSample.getTime().getMillis()) >> 1) : lastSample.getTime();
+                switch (decimationMode) {
+                    case PEAK_PICK:
+                        if (firstSum > lastSum) {
+                            // The sample window is generally down with time - - pick the minimum
+                            final SampleState minSample = filterHistory[minIndex];
+                            sampleConsumer.consumeSample(sampleNumber, minSample.getSampleOpcode(), minSample.getValue(), centerTime);
+                        } else {
+                            // The sample window is generally up with time - - pick the maximum
+                            final SampleState maxSample = filterHistory[maxIndex];
+                            sampleConsumer.consumeSample(sampleNumber, maxSample.getSampleOpcode(), maxSample.getValue(), centerTime);
+                        }
+                        break;
+                    case AVERAGE:
+                        final double average = sum / ceilSamplesPerOutput;
+                        sampleConsumer.consumeSample(minIndex, SampleOpcode.DOUBLE, average, centerTime);
+                        break;
+                    default:
+                        throw new IllegalStateException(String.format("The decimation filter mode %s is not recognized", decimationMode));
+                }
+            }
+        }
+    }
+
+    public SampleConsumer getSampleConsumer() {
+        return sampleConsumer;
+    }
+
+    private static class SampleState {
+
+        private final SampleOpcode sampleOpcode;
+        private final Object value;
+        private final double doubleValue;
+        private final DateTime time;
+
+        public SampleState(final SampleOpcode sampleOpcode, final Object value, final double doubleValue, final DateTime time) {
+            this.sampleOpcode = sampleOpcode;
+            this.value = value;
+            this.doubleValue = doubleValue;
+            this.time = time;
+        }
+
+        public SampleOpcode getSampleOpcode() {
+            return sampleOpcode;
+        }
+
+        public Object getValue() {
+            return value;
+        }
+
+        public double getDoubleValue() {
+            return doubleValue;
+        }
+
+        public DateTime getTime() {
+            return time;
+        }
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/filter/DecimationMode.java b/usage/src/main/java/com/ning/billing/usage/timeline/filter/DecimationMode.java
new file mode 100644
index 0000000..0523bd7
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/filter/DecimationMode.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.filter;
+
+public enum DecimationMode {
+    PEAK_PICK,
+    AVERAGE;
+
+    public static DecimationMode fromString(final String modeString) {
+        for (final DecimationMode decimationMode : DecimationMode.values()) {
+            if (decimationMode.name().equalsIgnoreCase(modeString)) {
+                return decimationMode;
+            }
+        }
+        return null;
+    }
+}