killbill-aplcache

usage: add various tests Signed-off-by: Pierre-Alexandre

7/28/2012 6:54:40 PM

Details

diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/categories/TestCategoryAndMetrics.java b/usage/src/test/java/com/ning/billing/usage/timeline/categories/TestCategoryAndMetrics.java
new file mode 100644
index 0000000..1dc1a56
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/categories/TestCategoryAndMetrics.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.categories;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.categories.CategoryAndMetrics;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class TestCategoryAndMetrics {
+
+    @Test(groups = "fast")
+    public void testMapping() throws Exception {
+        final CategoryAndMetrics kinds = new CategoryAndMetrics("JVM");
+        kinds.addMetric("GC");
+        kinds.addMetric("CPU");
+
+        final ObjectMapper mapper = new ObjectMapper();
+        final String json = mapper.writeValueAsString(kinds);
+        Assert.assertEquals("{\"eventCategory\":\"JVM\",\"sampleKinds\":[\"GC\",\"CPU\"]}", json);
+
+        final CategoryAndMetrics kindsFromJson = mapper.readValue(json, CategoryAndMetrics.class);
+        Assert.assertEquals(kindsFromJson, kinds);
+    }
+
+    @Test(groups = "fast")
+    public void testComparison() throws Exception {
+        final CategoryAndMetrics aKinds = new CategoryAndMetrics("JVM");
+        aKinds.addMetric("GC");
+        aKinds.addMetric("CPU");
+        Assert.assertEquals(aKinds.compareTo(aKinds), 0);
+
+        final CategoryAndMetrics bKinds = new CategoryAndMetrics("JVM");
+        bKinds.addMetric("GC");
+        bKinds.addMetric("CPU");
+        Assert.assertEquals(aKinds.compareTo(bKinds), 0);
+        Assert.assertEquals(bKinds.compareTo(aKinds), 0);
+
+        final CategoryAndMetrics cKinds = new CategoryAndMetrics("JVM");
+        cKinds.addMetric("GC");
+        cKinds.addMetric("CPU");
+        cKinds.addMetric("Something else");
+        Assert.assertTrue(aKinds.compareTo(cKinds) < 0);
+        Assert.assertTrue(cKinds.compareTo(aKinds) > 0);
+
+        final CategoryAndMetrics dKinds = new CategoryAndMetrics("ZVM");
+        dKinds.addMetric("GC");
+        Assert.assertTrue(aKinds.compareTo(dKinds) < 0);
+        Assert.assertTrue(dKinds.compareTo(aKinds) > 0);
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCoder.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCoder.java
new file mode 100644
index 0000000..837aef4
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCoder.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.DateTimeUtils;
+import com.ning.billing.usage.timeline.Hex;
+import com.ning.billing.usage.timeline.samples.RepeatSample;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCursor;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCursor;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestSampleCoder {
+
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC);
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    @Test(groups = "fast")
+    public void testScan() throws Exception {
+        final DateTime startTime = new DateTime(DateTimeZone.UTC);
+        final DateTime endTime = startTime.plusSeconds(5);
+        final List<DateTime> dateTimes = ImmutableList.<DateTime>of(startTime.plusSeconds(1), startTime.plusSeconds(2), startTime.plusSeconds(3), startTime.plusSeconds(4));
+        final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+
+
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
+        final ScalarSample<Short> sample = new ScalarSample<Short>(SampleOpcode.SHORT, (short) 4);
+        sampleCoder.encodeSample(dataOutputStream, sample);
+        sampleCoder.encodeSample(dataOutputStream, new RepeatSample<Short>(3, sample));
+        dataOutputStream.close();
+
+        sampleCoder.scan(outputStream.toByteArray(), compressedTimes, dateTimes.size(), new TimeRangeSampleProcessor(startTime, endTime) {
+            @Override
+            public void processOneSample(final DateTime time, final SampleOpcode opcode, final Object value) {
+                Assert.assertTrue(time.isAfter(startTime));
+                Assert.assertTrue(time.isBefore(endTime));
+                Assert.assertEquals(Short.valueOf(value.toString()), sample.getSampleValue());
+            }
+        });
+    }
+
+    @Test(groups = "fast")
+    public void testTimeRangeSampleProcessor() throws Exception {
+        final DateTime startTime = new DateTime(dateFormatter.parseDateTime("2012-03-23T17:35:11.000Z"));
+        final DateTime endTime = new DateTime(dateFormatter.parseDateTime("2012-03-23T17:35:17.000Z"));
+        final int sampleCount = 2;
+
+        final List<DateTime> dateTimes = ImmutableList.<DateTime>of(startTime, endTime);
+        final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+        final TimelineCursor cursor = new DefaultTimelineCursor(compressedTimes, sampleCount);
+        Assert.assertEquals(cursor.getNextTime(), startTime);
+        Assert.assertEquals(cursor.getNextTime(), endTime);
+
+        // 2 x the value 12: REPEAT_BYTE, SHORT, 2, SHORT, 12 (2 bytes)
+        final byte[] samples = new byte[]{(byte) 0xff, 2, 2, 0, 12};
+
+        final AtomicInteger samplesCount = new AtomicInteger(0);
+        sampleCoder.scan(samples, compressedTimes, sampleCount, new TimeRangeSampleProcessor(startTime, endTime) {
+            @Override
+            public void processOneSample(final DateTime time, final SampleOpcode opcode, final Object value) {
+                if (samplesCount.get() == 0) {
+                    Assert.assertEquals(DateTimeUtils.unixSeconds(time), DateTimeUtils.unixSeconds(startTime));
+                } else {
+                    Assert.assertEquals(DateTimeUtils.unixSeconds(time), DateTimeUtils.unixSeconds(endTime));
+                }
+                samplesCount.incrementAndGet();
+            }
+        });
+        Assert.assertEquals(samplesCount.get(), sampleCount);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(groups = "fast")
+    public void testCombineSampleBytes() throws Exception {
+        final ScalarSample[] samplesToChoose = new ScalarSample[]{new ScalarSample(SampleOpcode.DOUBLE, 2.0),
+                                                                  new ScalarSample(SampleOpcode.DOUBLE, 1.0),
+                                                                  new ScalarSample(SampleOpcode.INT_ZERO, 0)};
+        final int[] repetitions = new int[]{1, 2, 3, 4, 5, 240, 250, 300};
+        final Random rand = new Random(0);
+        int count = 0;
+        final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(0, 0, sampleCoder);
+        final List<ScalarSample> samples = new ArrayList<ScalarSample>();
+        for (int i = 0; i < 20; i++) {
+            final ScalarSample sample = samplesToChoose[rand.nextInt(samplesToChoose.length)];
+            final int repetition = repetitions[rand.nextInt(repetitions.length)];
+            for (int r = 0; r < repetition; r++) {
+                samples.add(sample);
+                accum.addSample(sample);
+                count++;
+            }
+        }
+        final byte[] sampleBytes = sampleCoder.compressSamples(samples);
+        final byte[] accumBytes = accum.getEncodedSamples().getEncodedBytes();
+        Assert.assertEquals(accumBytes, sampleBytes);
+        final List<ScalarSample> restoredSamples = sampleCoder.decompressSamples(sampleBytes);
+        Assert.assertEquals(restoredSamples.size(), samples.size());
+        for (int i = 0; i < count; i++) {
+            Assert.assertEquals(restoredSamples.get(i), samples.get(i));
+        }
+        for (int fragmentLength = 2; fragmentLength < count / 2; fragmentLength++) {
+            final List<byte[]> fragments = new ArrayList<byte[]>();
+            final int fragmentCount = (int) Math.ceil((double) count / (double) fragmentLength);
+            for (int fragCounter = 0; fragCounter < fragmentCount; fragCounter++) {
+                final int fragIndex = fragCounter * fragmentLength;
+                final List<ScalarSample> fragment = samples.subList(fragIndex, Math.min(count, fragIndex + fragmentLength));
+                fragments.add(sampleCoder.compressSamples(fragment));
+            }
+            final byte[] combined = sampleCoder.combineSampleBytes(fragments);
+            final List<ScalarSample> restored = sampleCoder.decompressSamples(combined);
+            Assert.assertEquals(restored.size(), samples.size());
+            for (int i = 0; i < count; i++) {
+                Assert.assertEquals(restored.get(i), samples.get(i));
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(groups = "fast")
+    public void testCombineMoreThan65KSamples() throws Exception {
+        final int count = 0;
+        final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(0, 0, sampleCoder);
+        final List<ScalarSample> samples = new ArrayList<ScalarSample>();
+        final ScalarSample sample1 = new ScalarSample(SampleOpcode.BYTE, (byte) 1);
+        final ScalarSample sample2 = new ScalarSample(SampleOpcode.BYTE, (byte) 2);
+        for (int i = 0; i < 20; i++) {
+            samples.add(sample1);
+            accum.addSample(sample1);
+        }
+        for (int i = 0; i < 0xFFFF + 100; i++) {
+            samples.add(sample2);
+            accum.addSample(sample2);
+        }
+        final byte[] sampleBytes = sampleCoder.compressSamples(samples);
+        final String hex = new String(Hex.encodeHex(sampleBytes));
+        // Here are the compressed samples: ff140101feffff0102ff640102
+        // Translation:
+        // [ff 14 01 01] means repeat 20 times BYTE value 1
+        // [fe ff ff 01 02] means repeat 65525 times BYTE value 2
+        // [ff 64 01 02] means repeat 100 times BYTE value 2
+        Assert.assertEquals(sampleBytes, Hex.decodeHex("ff140101feffff0102ff640102".toCharArray()));
+        final List<ScalarSample> restoredSamples = sampleCoder.decompressSamples(sampleBytes);
+        Assert.assertEquals(restoredSamples.size(), samples.size());
+        for (int i = 0; i < count; i++) {
+            Assert.assertEquals(restoredSamples.get(i), samples.get(i));
+        }
+    }
+
+    /*
+     * I saw an error in combineSampleBytes:
+     * java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Short
+     * These were the inputs:
+     * [11, 44, 74, -1, 2, 15, 11, 40, 68, -1, 2, 15]
+     * meaning half-float-for-double; repeat 2 times double zero; half-float-for-double; repeat 2 time double zero
+     * [11, 44, 68, -1, 3, 15, 11, 40, 68]
+     * meaning meaning half-float-for-double; repeat 3 times double zero; half-float-for-double
+     * [-1, 3, 15, 11, 40, 68, -1, 2, 15, 11, 40, 68]
+     * meaning repeat 3 times double-zero; half-float-for-double; repeat 2 times double zero; half-float-for-double
+     * [-1, 2, 11, 40, 68, -1, 3, 15, 11, 40, 68, 15]
+     * meaning repeat 2 times half-float-for-double; repeat 3 times double-zero; half-float-for-double; double zero
+     */
+    @SuppressWarnings("unchecked")
+    @Test(groups = "fast")
+    public void testCombineError() throws Exception {
+        final byte[] b1 = new byte[]{11, 44, 74, -1, 2, 15, 11, 40, 68, -1, 2, 15};
+        final byte[] b2 = new byte[]{11, 44, 68, -1, 3, 15, 11, 40, 68};
+        final byte[] b3 = new byte[]{-1, 3, 15, 11, 40, 68, -1, 2, 15, 11, 40, 68};
+        final byte[] b4 = new byte[]{-1, 2, 11, 40, 68, -1, 3, 15, 11, 40, 68, 15};
+        final List<byte[]> parts = new ArrayList<byte[]>();
+        parts.add(b1);
+        parts.add(b2);
+        parts.add(b3);
+        parts.add(b4);
+        final byte[] combinedBytes = sampleCoder.combineSampleBytes(parts);
+        final List<ScalarSample> samples = sampleCoder.decompressSamples(combinedBytes);
+        Assert.assertEquals(samples.size(), 25);
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCompression.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCompression.java
new file mode 100644
index 0000000..cae69b0
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCompression.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+public class TestSampleCompression {
+
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    @Test(groups = "fast")
+    public void testBasicDoubleCompression() throws Exception {
+
+        checkDoubleCodedResult(0.0, SampleOpcode.DOUBLE_ZERO, 1);
+        checkDoubleCodedResult(1.0, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+        checkDoubleCodedResult(1.005, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+        checkDoubleCodedResult(127.2, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+        checkDoubleCodedResult(-128.2, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+
+        checkDoubleCodedResult(65503.0, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+        checkDoubleCodedResult(-65503.0, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+        checkDoubleCodedResult(6.1e-5, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+        checkDoubleCodedResult(-6.1e-5, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+
+        checkDoubleCodedResult(200.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+        checkDoubleCodedResult(32767.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+        checkDoubleCodedResult(-200.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+        checkDoubleCodedResult(-32768.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+
+        checkDoubleCodedResult((double) Float.MAX_VALUE, SampleOpcode.FLOAT_FOR_DOUBLE, 5);
+        checkDoubleCodedResult((double) Float.MIN_VALUE, SampleOpcode.FLOAT_FOR_DOUBLE, 5);
+
+        checkDoubleCodedResult(((double) Float.MAX_VALUE) * 10.0, SampleOpcode.DOUBLE, 9);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkDoubleCodedResult(final double value, final SampleOpcode expectedOpcode, final int expectedSize) {
+        final ScalarSample codedSample = sampleCoder.compressSample(new ScalarSample(SampleOpcode.DOUBLE, value));
+        Assert.assertEquals(codedSample.getOpcode(), expectedOpcode);
+        final double error = value == 0.0 ? 0.0 : Math.abs((value - codedSample.getDoubleValue()) / value);
+        Assert.assertTrue(error <= sampleCoder.getMaxFractionError());
+        final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(123, 456, sampleCoder);
+        accum.addSample(codedSample);
+        final DateTime now = new DateTime();
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        dateTimes.add(now);
+        final byte[] timeBytes = timelineCoder.compressDateTimes(dateTimes);
+        final byte[] encodedSampleBytes = accum.extractTimelineChunkAndReset(now, now, timeBytes).getTimeBytesAndSampleBytes().getSampleBytes();
+        Assert.assertEquals(encodedSampleBytes.length, expectedSize);
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkAccumulator.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkAccumulator.java
new file mode 100644
index 0000000..1c63f60
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkAccumulator.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCursor;
+
+public class TestTimelineChunkAccumulator {
+
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    @SuppressWarnings("unchecked")
+    @Test(groups = "fast")
+    public void testBasicAccumulator() throws Exception {
+        final int hostId = 123;
+        final int sampleKindId = 456;
+        final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(hostId, sampleKindId, sampleCoder);
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        final DateTime startTime = new DateTime();
+        final DateTime endTime = startTime.plus(1000);
+
+        accum.addSample(new ScalarSample(SampleOpcode.INT, 25));
+        int timesCounter = 0;
+        dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+        for (int i = 0; i < 5; i++) {
+            accum.addSample(new ScalarSample(SampleOpcode.INT, 10));
+            dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+        }
+        accum.addSample(new ScalarSample(SampleOpcode.DOUBLE, 100.0));
+        dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+        accum.addSample(new ScalarSample(SampleOpcode.DOUBLE, 100.0));
+        dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+
+        accum.addSample(new ScalarSample(SampleOpcode.STRING, "Hiya!"));
+        dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+
+        final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+        final TimelineChunk chunk = accum.extractTimelineChunkAndReset(startTime, endTime, compressedTimes);
+        Assert.assertEquals(chunk.getSampleCount(), 9);
+        // Now play them back
+        sampleCoder.scan(chunk.getTimeBytesAndSampleBytes().getSampleBytes(), compressedTimes, dateTimes.size(), new SampleProcessor() {
+            private int sampleNumber = 0;
+
+            @Override
+            public void processSamples(final TimelineCursor timeCursor, final int sampleCount, final SampleOpcode opcode, final Object value) {
+                if (sampleNumber == 0) {
+                    Assert.assertEquals(opcode, SampleOpcode.INT);
+                    Assert.assertEquals(value, 25);
+                } else if (sampleNumber >= 1 && sampleNumber < 6) {
+                    Assert.assertEquals(opcode, SampleOpcode.INT);
+                    Assert.assertEquals(value, 10);
+                } else if (sampleNumber >= 6 && sampleNumber < 8) {
+                    Assert.assertEquals(opcode, SampleOpcode.DOUBLE);
+                    Assert.assertEquals(value, 100.0);
+                } else if (sampleNumber == 8) {
+                    Assert.assertEquals(opcode, SampleOpcode.STRING);
+                    Assert.assertEquals(value, "Hiya!");
+                } else {
+                    Assert.assertTrue(false);
+                }
+                sampleNumber += sampleCount;
+            }
+        });
+        final TimelineChunkDecoded chunkDecoded = new TimelineChunkDecoded(chunk, sampleCoder);
+        System.out.printf("%s\n", chunkDecoded.toString());
+    }
+
+
+    @Test(groups = "fast")
+    public void testByteRepeater() throws Exception {
+        final int hostId = 123;
+        final int sampleKindId = 456;
+        final DateTime startTime = new DateTime();
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        final int byteRepeaterCount = 255;
+        final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(hostId, sampleKindId, sampleCoder);
+        for (int i = 0; i < byteRepeaterCount; i++) {
+            dateTimes.add(startTime.plusSeconds(i * 5));
+            accum.addSample(sampleCoder.compressSample(new ScalarSample<Double>(SampleOpcode.DOUBLE, 2.0)));
+        }
+        final DateTime endTime = startTime.plusSeconds(5 * byteRepeaterCount);
+        final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+        final TimelineChunk chunk = accum.extractTimelineChunkAndReset(startTime, endTime, compressedTimes);
+        final byte[] samples = chunk.getTimeBytesAndSampleBytes().getSampleBytes();
+        // Should be 0xFF 0xFF 0x12 0x02
+        Assert.assertEquals(samples.length, 4);
+        Assert.assertEquals(((int) samples[0]) & 0xff, SampleOpcode.REPEAT_BYTE.getOpcodeIndex());
+        Assert.assertEquals(((int) samples[1]) & 0xff, byteRepeaterCount);
+        Assert.assertEquals(((int) samples[2]) & 0xff, SampleOpcode.BYTE_FOR_DOUBLE.getOpcodeIndex());
+        Assert.assertEquals(((int) samples[3]) & 0xff, 0x02);
+        Assert.assertEquals(chunk.getSampleCount(), byteRepeaterCount);
+        sampleCoder.scan(chunk.getTimeBytesAndSampleBytes().getSampleBytes(), compressedTimes, dateTimes.size(), new SampleProcessor() {
+
+            @Override
+            public void processSamples(final TimelineCursor timeCursor, final int sampleCount, final SampleOpcode opcode, final Object value) {
+                Assert.assertEquals(sampleCount, byteRepeaterCount);
+                Assert.assertEquals(value, 2.0);
+            }
+        });
+    }
+
+    @Test(groups = "fast")
+    public void testShortRepeater() throws Exception {
+        final int hostId = 123;
+        final int sampleKindId = 456;
+        final DateTime startTime = new DateTime();
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        final int shortRepeaterCount = 256;
+        final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(hostId, sampleKindId, sampleCoder);
+        for (int i = 0; i < shortRepeaterCount; i++) {
+            dateTimes.add(startTime.plusSeconds(i * 5));
+            accum.addSample(sampleCoder.compressSample(new ScalarSample<Double>(SampleOpcode.DOUBLE, 2.0)));
+        }
+        final DateTime endTime = startTime.plusSeconds(5 * shortRepeaterCount);
+        final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+        final TimelineChunk chunk = accum.extractTimelineChunkAndReset(startTime, endTime, compressedTimes);
+        final byte[] samples = chunk.getTimeBytesAndSampleBytes().getSampleBytes();
+        Assert.assertEquals(samples.length, 5);
+        Assert.assertEquals(((int) samples[0]) & 0xff, SampleOpcode.REPEAT_SHORT.getOpcodeIndex());
+        final int count = ((samples[1] & 0xff) << 8) | (samples[2] & 0xff);
+        Assert.assertEquals(count, shortRepeaterCount);
+        Assert.assertEquals(((int) samples[3]) & 0xff, SampleOpcode.BYTE_FOR_DOUBLE.getOpcodeIndex());
+        Assert.assertEquals(((int) samples[4]) & 0xff, 0x02);
+        Assert.assertEquals(chunk.getSampleCount(), shortRepeaterCount);
+
+        sampleCoder.scan(chunk.getTimeBytesAndSampleBytes().getSampleBytes(), compressedTimes, dateTimes.size(), new SampleProcessor() {
+
+            @Override
+            public void processSamples(TimelineCursor timeCursor, int sampleCount, SampleOpcode opcode, Object value) {
+                Assert.assertEquals(sampleCount, shortRepeaterCount);
+                Assert.assertEquals(value, 2.0);
+            }
+        });
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkToJson.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkToJson.java
new file mode 100644
index 0000000..afcc647
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkToJson.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunksViews.Compact;
+import com.ning.billing.usage.timeline.chunks.TimelineChunksViews.Loose;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+public class TestTimelineChunkToJson {
+
+    private static final ObjectMapper mapper = new ObjectMapper().configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    private static final long CHUNK_ID = 1242L;
+    private static final int HOST_ID = 1422;
+    private static final int SAMPLE_KIND_ID = 1224;
+    private static final int SAMPLE_COUNT = 2142;
+    private static final DateTime END_TIME = new DateTime(DateTimeZone.UTC);
+    private static final DateTime START_TIME = END_TIME.minusMinutes(SAMPLE_COUNT);
+
+    private byte[] samples;
+    private TimelineChunk chunk;
+
+    @BeforeMethod(groups = "fast")
+    public void setUp() throws Exception {
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final DataOutputStream output = new DataOutputStream(out);
+        for (int i = 0; i < SAMPLE_COUNT; i++) {
+            sampleCoder.encodeSample(output, new ScalarSample<Long>(SampleOpcode.LONG, 10L));
+            dateTimes.add(START_TIME.plusMinutes(i));
+        }
+        output.flush();
+        output.close();
+        samples = out.toByteArray();
+
+        final DateTime endTime = dateTimes.get(dateTimes.size() - 1);
+        final byte[] timeBytes = timelineCoder.compressDateTimes(dateTimes);
+        chunk = new TimelineChunk(CHUNK_ID, HOST_ID, SAMPLE_KIND_ID, START_TIME, endTime, timeBytes, samples, SAMPLE_COUNT);
+    }
+
+    @Test(groups = "fast")
+    public void testTimelineChunkCompactMapping() throws Exception {
+        final String chunkToString = mapper.writerWithView(Compact.class).writeValueAsString(chunk);
+        final Map chunkFromString = mapper.readValue(chunkToString, Map.class);
+        Assert.assertEquals(chunkFromString.keySet().size(), 11);
+        Assert.assertEquals(chunkFromString.get("hostId"), HOST_ID);
+        Assert.assertEquals(chunkFromString.get("sampleKindId"), SAMPLE_KIND_ID);
+        Assert.assertEquals(new TextNode((String) chunkFromString.get("samples")).binaryValue(), samples);
+        Assert.assertEquals(chunkFromString.get("sampleCount"), SAMPLE_COUNT);
+        Assert.assertEquals(chunkFromString.get("startTime"), START_TIME.getMillis());
+        Assert.assertEquals(chunkFromString.get("aggregationLevel"), 0);
+        Assert.assertEquals(chunkFromString.get("notValid"), false);
+        Assert.assertEquals(chunkFromString.get("dontAggregate"), false);
+    }
+
+    @Test(groups = "fast")
+    public void testTimelineChunkLooseMapping() throws Exception {
+        final String chunkToString = mapper.writerWithView(Loose.class).writeValueAsString(chunk);
+        final Map chunkFromString = mapper.readValue(chunkToString, Map.class);
+        Assert.assertEquals(chunkFromString.keySet().size(), 4);
+        Assert.assertEquals(chunkFromString.get("hostId"), HOST_ID);
+        Assert.assertEquals(chunkFromString.get("sampleKindId"), SAMPLE_KIND_ID);
+        Assert.assertNotNull(chunkFromString.get("samplesAsCSV"));
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/filter/TestDecimatingFilter.java b/usage/src/test/java/com/ning/billing/usage/timeline/filter/TestDecimatingFilter.java
new file mode 100644
index 0000000..2e5404f
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/filter/TestDecimatingFilter.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.consumer.SampleConsumer;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+
+public class TestDecimatingFilter {
+
+    @Test(groups = "fast")
+    public void testBasicFilterOperations() throws Exception {
+        final List<Double> outputs = new ArrayList<Double>();
+        final long millisStart = System.currentTimeMillis() - 2000 * 100;
+
+        final DecimatingSampleFilter filter = new DecimatingSampleFilter(new DateTime(millisStart), new DateTime(millisStart + 2000 * 100), 25, 100, new TimeSpan("2s"), DecimationMode.PEAK_PICK,
+                                                                         new SampleConsumer() {
+
+                                                                             @Override
+                                                                             public void consumeSample(final int sampleNumber, final SampleOpcode opcode, final Object value, final DateTime time) {
+                                                                                 outputs.add((double) ((Double) value));
+                                                                             }
+                                                                         });
+        for (int i = 0; i < 100; i++) {
+            // Make the value go up for 4 samples; then down for 4 samples, between 10.0 and 40.0
+            final int index = (i % 8) + 1;
+            double value = 0;
+            if (index <= 4) {
+                value = 10.0 * index;
+            } else {
+                value = (8 - (index - 1)) * 10;
+            }
+            //System.out.printf("For i %d, index %d, adding value %f\n", i, index, value);
+            filter.processOneSample(new DateTime(millisStart + 2000 * i), SampleOpcode.DOUBLE, value);
+        }
+        int index = 0;
+        for (final Double value : outputs) {
+            //System.out.printf("index %d, value %f\n", index++, (double)((Double)value));
+            if ((index & 1) == 0) {
+                Assert.assertEquals(value, 40.0);
+            } else {
+                Assert.assertEquals(value, 10.0);
+            }
+            index++;
+        }
+    }
+
+    /**
+     * This test has sample count of 21, and output count of 6, so there are 5.8 samples per output point
+     *
+     * @throws Exception
+     */
+    @Test(groups = "fast")
+    public void testFilterWithNonAlignedSampleCounts() throws Exception {
+        final List<Double> outputs = new ArrayList<Double>();
+        final long millisStart = System.currentTimeMillis() - 2000 * 21;
+
+        final DecimatingSampleFilter filter = new DecimatingSampleFilter(new DateTime(millisStart), new DateTime(millisStart + 2000 * 21), 6, 21, new TimeSpan("2s"), DecimationMode.PEAK_PICK,
+                                                                         new SampleConsumer() {
+
+                                                                             @Override
+                                                                             public void consumeSample(final int sampleNumber, final SampleOpcode opcode, final Object value, final DateTime time) {
+                                                                                 outputs.add((double) ((Double) value));
+                                                                             }
+                                                                         });
+        for (int i = 0; i < 21; i++) {
+            // Make the value go up for 6 samples; then down for 6 samples, between 10.0 and 60.0
+            final int index = (i % 6) + 1;
+            double value = 0;
+            if (index <= 3) {
+                value = 10.0 * index;
+            } else {
+                value = (6 - (index - 1)) * 10;
+            }
+            //System.out.printf("For i %d, index %d, adding value %f\n", i, index, value);
+            filter.processOneSample(new DateTime(millisStart + 2000 * i), SampleOpcode.DOUBLE, value);
+        }
+        Assert.assertEquals(outputs.size(), 5);
+        final double[] expectedValues = new double[]{30.0, 20.0, 30.0, 30.0, 10.0};
+        for (int i = 0; i < 5; i++) {
+            final double value = outputs.get(i);
+            final double expectedValue = expectedValues[i];
+            //System.out.printf("index %d, value returned %f, value expected %f\n", i, value, expectedValue);
+            Assert.assertEquals(value, expectedValue);
+        }
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/metrics/TestSamplesForMetricAndSource.java b/usage/src/test/java/com/ning/billing/usage/timeline/metrics/TestSamplesForMetricAndSource.java
new file mode 100644
index 0000000..2665934
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/metrics/TestSamplesForMetricAndSource.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.metrics;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class TestSamplesForMetricAndSource {
+
+    @Test(groups = "fast")
+    public void testMapping() throws Exception {
+        final SamplesForMetricAndSource samples = new SamplesForMetricAndSource("host.foo.com", "JVM", "GC", "1,2,2,0");
+
+        final ObjectMapper mapper = new ObjectMapper();
+        final String json = mapper.writeValueAsString(samples);
+        Assert.assertEquals("{\"hostName\":\"host.foo.com\",\"eventCategory\":\"JVM\",\"sampleKind\":\"GC\",\"samples\":\"1,2,2,0\"}", json);
+
+        final SamplesForMetricAndSource samplesFromJson = mapper.readValue(json, SamplesForMetricAndSource.class);
+        Assert.assertEquals(samplesFromJson, samples);
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/persistent/TestReplayer.java b/usage/src/test/java/com/ning/billing/usage/timeline/persistent/TestReplayer.java
new file mode 100644
index 0000000..5989f20
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/persistent/TestReplayer.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class TestReplayer {
+
+    private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestReplayer-" + System.currentTimeMillis());
+
+    private static final class MockReplayer extends Replayer {
+
+        private final List<File> expectedFiles;
+        private int seen = 0;
+
+        public MockReplayer(final String path, final List<File> expectedFiles) {
+            super(path);
+            this.expectedFiles = expectedFiles;
+        }
+
+        @Override
+        public void read(final File file, final Function<SourceSamplesForTimestamp, Void> fn) throws IOException {
+            Assert.assertEquals(file, expectedFiles.get(seen));
+            seen++;
+        }
+
+        public int getSeen() {
+            return seen;
+        }
+    }
+
+    private final StreamyBytesPersistentOutputStream outputStream = new StreamyBytesPersistentOutputStream(basePath.toString(), "pweet", null, true);
+
+    @Test(groups = "fast")
+    public void testStringOrdering() throws Exception {
+        final File file1 = new File("aaa.bbb.12345.bin");
+        final File file2 = new File("aaa.bbb.12346.bin");
+        final File file3 = new File("aaa.bbb.02345.bin");
+
+        final List<File> sortedCopy = Replayer.FILE_ORDERING.sortedCopy(ImmutableList.<File>of(file2, file1, file3));
+        Assert.assertEquals(sortedCopy.get(0), file3);
+        Assert.assertEquals(sortedCopy.get(1), file1);
+        Assert.assertEquals(sortedCopy.get(2), file2);
+    }
+
+    @Test(groups = "slow")
+    public void testOrdering() throws Exception {
+        Assert.assertTrue(basePath.mkdir());
+
+        final List<String> filePathsCreated = new ArrayList<String>();
+        final List<File> filesCreated = new ArrayList<File>();
+        final int expected = 50;
+
+        for (int i = 0; i < expected; i++) {
+            filePathsCreated.add(outputStream.getFileName());
+            Thread.sleep(17);
+        }
+
+        // Create the files in the opposite ordering to make sure we can re-read them in order
+        for (int i = expected - 1; i >= 0; i--) {
+            final File file = new File(filePathsCreated.get(i));
+            Assert.assertTrue(file.createNewFile());
+            filesCreated.add(file);
+        }
+
+        final MockReplayer replayer = new MockReplayer(basePath.toString(), Lists.reverse(filesCreated));
+        replayer.readAll();
+
+        Assert.assertEquals(replayer.getSeen(), expected);
+    }
+}