killbill-aplcache
Changes
usage/src/test/java/com/ning/billing/usage/timeline/categories/TestCategoryAndMetrics.java 67(+67 -0)
usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkAccumulator.java 161(+161 -0)
Details
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/categories/TestCategoryAndMetrics.java b/usage/src/test/java/com/ning/billing/usage/timeline/categories/TestCategoryAndMetrics.java
new file mode 100644
index 0000000..1dc1a56
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/categories/TestCategoryAndMetrics.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.categories;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.categories.CategoryAndMetrics;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class TestCategoryAndMetrics {
+
+ @Test(groups = "fast")
+ public void testMapping() throws Exception {
+ final CategoryAndMetrics kinds = new CategoryAndMetrics("JVM");
+ kinds.addMetric("GC");
+ kinds.addMetric("CPU");
+
+ final ObjectMapper mapper = new ObjectMapper();
+ final String json = mapper.writeValueAsString(kinds);
+ Assert.assertEquals("{\"eventCategory\":\"JVM\",\"sampleKinds\":[\"GC\",\"CPU\"]}", json);
+
+ final CategoryAndMetrics kindsFromJson = mapper.readValue(json, CategoryAndMetrics.class);
+ Assert.assertEquals(kindsFromJson, kinds);
+ }
+
+ @Test(groups = "fast")
+ public void testComparison() throws Exception {
+ final CategoryAndMetrics aKinds = new CategoryAndMetrics("JVM");
+ aKinds.addMetric("GC");
+ aKinds.addMetric("CPU");
+ Assert.assertEquals(aKinds.compareTo(aKinds), 0);
+
+ final CategoryAndMetrics bKinds = new CategoryAndMetrics("JVM");
+ bKinds.addMetric("GC");
+ bKinds.addMetric("CPU");
+ Assert.assertEquals(aKinds.compareTo(bKinds), 0);
+ Assert.assertEquals(bKinds.compareTo(aKinds), 0);
+
+ final CategoryAndMetrics cKinds = new CategoryAndMetrics("JVM");
+ cKinds.addMetric("GC");
+ cKinds.addMetric("CPU");
+ cKinds.addMetric("Something else");
+ Assert.assertTrue(aKinds.compareTo(cKinds) < 0);
+ Assert.assertTrue(cKinds.compareTo(aKinds) > 0);
+
+ final CategoryAndMetrics dKinds = new CategoryAndMetrics("ZVM");
+ dKinds.addMetric("GC");
+ Assert.assertTrue(aKinds.compareTo(dKinds) < 0);
+ Assert.assertTrue(dKinds.compareTo(aKinds) > 0);
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCoder.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCoder.java
new file mode 100644
index 0000000..837aef4
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCoder.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.DateTimeUtils;
+import com.ning.billing.usage.timeline.Hex;
+import com.ning.billing.usage.timeline.samples.RepeatSample;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCursor;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCursor;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestSampleCoder {
+
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC);
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ @Test(groups = "fast")
+ public void testScan() throws Exception {
+ final DateTime startTime = new DateTime(DateTimeZone.UTC);
+ final DateTime endTime = startTime.plusSeconds(5);
+ final List<DateTime> dateTimes = ImmutableList.<DateTime>of(startTime.plusSeconds(1), startTime.plusSeconds(2), startTime.plusSeconds(3), startTime.plusSeconds(4));
+ final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+
+
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
+ final ScalarSample<Short> sample = new ScalarSample<Short>(SampleOpcode.SHORT, (short) 4);
+ sampleCoder.encodeSample(dataOutputStream, sample);
+ sampleCoder.encodeSample(dataOutputStream, new RepeatSample<Short>(3, sample));
+ dataOutputStream.close();
+
+ sampleCoder.scan(outputStream.toByteArray(), compressedTimes, dateTimes.size(), new TimeRangeSampleProcessor(startTime, endTime) {
+ @Override
+ public void processOneSample(final DateTime time, final SampleOpcode opcode, final Object value) {
+ Assert.assertTrue(time.isAfter(startTime));
+ Assert.assertTrue(time.isBefore(endTime));
+ Assert.assertEquals(Short.valueOf(value.toString()), sample.getSampleValue());
+ }
+ });
+ }
+
+ @Test(groups = "fast")
+ public void testTimeRangeSampleProcessor() throws Exception {
+ final DateTime startTime = new DateTime(dateFormatter.parseDateTime("2012-03-23T17:35:11.000Z"));
+ final DateTime endTime = new DateTime(dateFormatter.parseDateTime("2012-03-23T17:35:17.000Z"));
+ final int sampleCount = 2;
+
+ final List<DateTime> dateTimes = ImmutableList.<DateTime>of(startTime, endTime);
+ final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+ final TimelineCursor cursor = new DefaultTimelineCursor(compressedTimes, sampleCount);
+ Assert.assertEquals(cursor.getNextTime(), startTime);
+ Assert.assertEquals(cursor.getNextTime(), endTime);
+
+ // 2 x the value 12: REPEAT_BYTE, SHORT, 2, SHORT, 12 (2 bytes)
+ final byte[] samples = new byte[]{(byte) 0xff, 2, 2, 0, 12};
+
+ final AtomicInteger samplesCount = new AtomicInteger(0);
+ sampleCoder.scan(samples, compressedTimes, sampleCount, new TimeRangeSampleProcessor(startTime, endTime) {
+ @Override
+ public void processOneSample(final DateTime time, final SampleOpcode opcode, final Object value) {
+ if (samplesCount.get() == 0) {
+ Assert.assertEquals(DateTimeUtils.unixSeconds(time), DateTimeUtils.unixSeconds(startTime));
+ } else {
+ Assert.assertEquals(DateTimeUtils.unixSeconds(time), DateTimeUtils.unixSeconds(endTime));
+ }
+ samplesCount.incrementAndGet();
+ }
+ });
+ Assert.assertEquals(samplesCount.get(), sampleCount);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(groups = "fast")
+ public void testCombineSampleBytes() throws Exception {
+ final ScalarSample[] samplesToChoose = new ScalarSample[]{new ScalarSample(SampleOpcode.DOUBLE, 2.0),
+ new ScalarSample(SampleOpcode.DOUBLE, 1.0),
+ new ScalarSample(SampleOpcode.INT_ZERO, 0)};
+ final int[] repetitions = new int[]{1, 2, 3, 4, 5, 240, 250, 300};
+ final Random rand = new Random(0);
+ int count = 0;
+ final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(0, 0, sampleCoder);
+ final List<ScalarSample> samples = new ArrayList<ScalarSample>();
+ for (int i = 0; i < 20; i++) {
+ final ScalarSample sample = samplesToChoose[rand.nextInt(samplesToChoose.length)];
+ final int repetition = repetitions[rand.nextInt(repetitions.length)];
+ for (int r = 0; r < repetition; r++) {
+ samples.add(sample);
+ accum.addSample(sample);
+ count++;
+ }
+ }
+ final byte[] sampleBytes = sampleCoder.compressSamples(samples);
+ final byte[] accumBytes = accum.getEncodedSamples().getEncodedBytes();
+ Assert.assertEquals(accumBytes, sampleBytes);
+ final List<ScalarSample> restoredSamples = sampleCoder.decompressSamples(sampleBytes);
+ Assert.assertEquals(restoredSamples.size(), samples.size());
+ for (int i = 0; i < count; i++) {
+ Assert.assertEquals(restoredSamples.get(i), samples.get(i));
+ }
+ for (int fragmentLength = 2; fragmentLength < count / 2; fragmentLength++) {
+ final List<byte[]> fragments = new ArrayList<byte[]>();
+ final int fragmentCount = (int) Math.ceil((double) count / (double) fragmentLength);
+ for (int fragCounter = 0; fragCounter < fragmentCount; fragCounter++) {
+ final int fragIndex = fragCounter * fragmentLength;
+ final List<ScalarSample> fragment = samples.subList(fragIndex, Math.min(count, fragIndex + fragmentLength));
+ fragments.add(sampleCoder.compressSamples(fragment));
+ }
+ final byte[] combined = sampleCoder.combineSampleBytes(fragments);
+ final List<ScalarSample> restored = sampleCoder.decompressSamples(combined);
+ Assert.assertEquals(restored.size(), samples.size());
+ for (int i = 0; i < count; i++) {
+ Assert.assertEquals(restored.get(i), samples.get(i));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(groups = "fast")
+ public void testCombineMoreThan65KSamples() throws Exception {
+ final int count = 0;
+ final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(0, 0, sampleCoder);
+ final List<ScalarSample> samples = new ArrayList<ScalarSample>();
+ final ScalarSample sample1 = new ScalarSample(SampleOpcode.BYTE, (byte) 1);
+ final ScalarSample sample2 = new ScalarSample(SampleOpcode.BYTE, (byte) 2);
+ for (int i = 0; i < 20; i++) {
+ samples.add(sample1);
+ accum.addSample(sample1);
+ }
+ for (int i = 0; i < 0xFFFF + 100; i++) {
+ samples.add(sample2);
+ accum.addSample(sample2);
+ }
+ final byte[] sampleBytes = sampleCoder.compressSamples(samples);
+ final String hex = new String(Hex.encodeHex(sampleBytes));
+ // Here are the compressed samples: ff140101feffff0102ff640102
+ // Translation:
+ // [ff 14 01 01] means repeat 20 times BYTE value 1
+ // [fe ff ff 01 02] means repeat 65525 times BYTE value 2
+ // [ff 64 01 02] means repeat 100 times BYTE value 2
+ Assert.assertEquals(sampleBytes, Hex.decodeHex("ff140101feffff0102ff640102".toCharArray()));
+ final List<ScalarSample> restoredSamples = sampleCoder.decompressSamples(sampleBytes);
+ Assert.assertEquals(restoredSamples.size(), samples.size());
+ for (int i = 0; i < count; i++) {
+ Assert.assertEquals(restoredSamples.get(i), samples.get(i));
+ }
+ }
+
+ /*
+ * I saw an error in combineSampleBytes:
+ * java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Short
+ * These were the inputs:
+ * [11, 44, 74, -1, 2, 15, 11, 40, 68, -1, 2, 15]
+ * meaning half-float-for-double; repeat 2 times double zero; half-float-for-double; repeat 2 time double zero
+ * [11, 44, 68, -1, 3, 15, 11, 40, 68]
+ * meaning meaning half-float-for-double; repeat 3 times double zero; half-float-for-double
+ * [-1, 3, 15, 11, 40, 68, -1, 2, 15, 11, 40, 68]
+ * meaning repeat 3 times double-zero; half-float-for-double; repeat 2 times double zero; half-float-for-double
+ * [-1, 2, 11, 40, 68, -1, 3, 15, 11, 40, 68, 15]
+ * meaning repeat 2 times half-float-for-double; repeat 3 times double-zero; half-float-for-double; double zero
+ */
+ @SuppressWarnings("unchecked")
+ @Test(groups = "fast")
+ public void testCombineError() throws Exception {
+ final byte[] b1 = new byte[]{11, 44, 74, -1, 2, 15, 11, 40, 68, -1, 2, 15};
+ final byte[] b2 = new byte[]{11, 44, 68, -1, 3, 15, 11, 40, 68};
+ final byte[] b3 = new byte[]{-1, 3, 15, 11, 40, 68, -1, 2, 15, 11, 40, 68};
+ final byte[] b4 = new byte[]{-1, 2, 11, 40, 68, -1, 3, 15, 11, 40, 68, 15};
+ final List<byte[]> parts = new ArrayList<byte[]>();
+ parts.add(b1);
+ parts.add(b2);
+ parts.add(b3);
+ parts.add(b4);
+ final byte[] combinedBytes = sampleCoder.combineSampleBytes(parts);
+ final List<ScalarSample> samples = sampleCoder.decompressSamples(combinedBytes);
+ Assert.assertEquals(samples.size(), 25);
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCompression.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCompression.java
new file mode 100644
index 0000000..cae69b0
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestSampleCompression.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+public class TestSampleCompression {
+
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ @Test(groups = "fast")
+ public void testBasicDoubleCompression() throws Exception {
+
+ checkDoubleCodedResult(0.0, SampleOpcode.DOUBLE_ZERO, 1);
+ checkDoubleCodedResult(1.0, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+ checkDoubleCodedResult(1.005, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+ checkDoubleCodedResult(127.2, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+ checkDoubleCodedResult(-128.2, SampleOpcode.BYTE_FOR_DOUBLE, 2);
+
+ checkDoubleCodedResult(65503.0, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+ checkDoubleCodedResult(-65503.0, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+ checkDoubleCodedResult(6.1e-5, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+ checkDoubleCodedResult(-6.1e-5, SampleOpcode.HALF_FLOAT_FOR_DOUBLE, 3);
+
+ checkDoubleCodedResult(200.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+ checkDoubleCodedResult(32767.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+ checkDoubleCodedResult(-200.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+ checkDoubleCodedResult(-32768.0, SampleOpcode.SHORT_FOR_DOUBLE, 3);
+
+ checkDoubleCodedResult((double) Float.MAX_VALUE, SampleOpcode.FLOAT_FOR_DOUBLE, 5);
+ checkDoubleCodedResult((double) Float.MIN_VALUE, SampleOpcode.FLOAT_FOR_DOUBLE, 5);
+
+ checkDoubleCodedResult(((double) Float.MAX_VALUE) * 10.0, SampleOpcode.DOUBLE, 9);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkDoubleCodedResult(final double value, final SampleOpcode expectedOpcode, final int expectedSize) {
+ final ScalarSample codedSample = sampleCoder.compressSample(new ScalarSample(SampleOpcode.DOUBLE, value));
+ Assert.assertEquals(codedSample.getOpcode(), expectedOpcode);
+ final double error = value == 0.0 ? 0.0 : Math.abs((value - codedSample.getDoubleValue()) / value);
+ Assert.assertTrue(error <= sampleCoder.getMaxFractionError());
+ final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(123, 456, sampleCoder);
+ accum.addSample(codedSample);
+ final DateTime now = new DateTime();
+ final List<DateTime> dateTimes = new ArrayList<DateTime>();
+ dateTimes.add(now);
+ final byte[] timeBytes = timelineCoder.compressDateTimes(dateTimes);
+ final byte[] encodedSampleBytes = accum.extractTimelineChunkAndReset(now, now, timeBytes).getTimeBytesAndSampleBytes().getSampleBytes();
+ Assert.assertEquals(encodedSampleBytes.length, expectedSize);
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkAccumulator.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkAccumulator.java
new file mode 100644
index 0000000..1c63f60
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkAccumulator.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCursor;
+
+public class TestTimelineChunkAccumulator {
+
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ @SuppressWarnings("unchecked")
+ @Test(groups = "fast")
+ public void testBasicAccumulator() throws Exception {
+ final int hostId = 123;
+ final int sampleKindId = 456;
+ final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(hostId, sampleKindId, sampleCoder);
+ final List<DateTime> dateTimes = new ArrayList<DateTime>();
+ final DateTime startTime = new DateTime();
+ final DateTime endTime = startTime.plus(1000);
+
+ accum.addSample(new ScalarSample(SampleOpcode.INT, 25));
+ int timesCounter = 0;
+ dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+ for (int i = 0; i < 5; i++) {
+ accum.addSample(new ScalarSample(SampleOpcode.INT, 10));
+ dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+ }
+ accum.addSample(new ScalarSample(SampleOpcode.DOUBLE, 100.0));
+ dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+ accum.addSample(new ScalarSample(SampleOpcode.DOUBLE, 100.0));
+ dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+
+ accum.addSample(new ScalarSample(SampleOpcode.STRING, "Hiya!"));
+ dateTimes.add(startTime.plusSeconds(30 * timesCounter++));
+
+ final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+ final TimelineChunk chunk = accum.extractTimelineChunkAndReset(startTime, endTime, compressedTimes);
+ Assert.assertEquals(chunk.getSampleCount(), 9);
+ // Now play them back
+ sampleCoder.scan(chunk.getTimeBytesAndSampleBytes().getSampleBytes(), compressedTimes, dateTimes.size(), new SampleProcessor() {
+ private int sampleNumber = 0;
+
+ @Override
+ public void processSamples(final TimelineCursor timeCursor, final int sampleCount, final SampleOpcode opcode, final Object value) {
+ if (sampleNumber == 0) {
+ Assert.assertEquals(opcode, SampleOpcode.INT);
+ Assert.assertEquals(value, 25);
+ } else if (sampleNumber >= 1 && sampleNumber < 6) {
+ Assert.assertEquals(opcode, SampleOpcode.INT);
+ Assert.assertEquals(value, 10);
+ } else if (sampleNumber >= 6 && sampleNumber < 8) {
+ Assert.assertEquals(opcode, SampleOpcode.DOUBLE);
+ Assert.assertEquals(value, 100.0);
+ } else if (sampleNumber == 8) {
+ Assert.assertEquals(opcode, SampleOpcode.STRING);
+ Assert.assertEquals(value, "Hiya!");
+ } else {
+ Assert.assertTrue(false);
+ }
+ sampleNumber += sampleCount;
+ }
+ });
+ final TimelineChunkDecoded chunkDecoded = new TimelineChunkDecoded(chunk, sampleCoder);
+ System.out.printf("%s\n", chunkDecoded.toString());
+ }
+
+
+ @Test(groups = "fast")
+ public void testByteRepeater() throws Exception {
+ final int hostId = 123;
+ final int sampleKindId = 456;
+ final DateTime startTime = new DateTime();
+ final List<DateTime> dateTimes = new ArrayList<DateTime>();
+ final int byteRepeaterCount = 255;
+ final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(hostId, sampleKindId, sampleCoder);
+ for (int i = 0; i < byteRepeaterCount; i++) {
+ dateTimes.add(startTime.plusSeconds(i * 5));
+ accum.addSample(sampleCoder.compressSample(new ScalarSample<Double>(SampleOpcode.DOUBLE, 2.0)));
+ }
+ final DateTime endTime = startTime.plusSeconds(5 * byteRepeaterCount);
+ final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+ final TimelineChunk chunk = accum.extractTimelineChunkAndReset(startTime, endTime, compressedTimes);
+ final byte[] samples = chunk.getTimeBytesAndSampleBytes().getSampleBytes();
+ // Should be 0xFF 0xFF 0x12 0x02
+ Assert.assertEquals(samples.length, 4);
+ Assert.assertEquals(((int) samples[0]) & 0xff, SampleOpcode.REPEAT_BYTE.getOpcodeIndex());
+ Assert.assertEquals(((int) samples[1]) & 0xff, byteRepeaterCount);
+ Assert.assertEquals(((int) samples[2]) & 0xff, SampleOpcode.BYTE_FOR_DOUBLE.getOpcodeIndex());
+ Assert.assertEquals(((int) samples[3]) & 0xff, 0x02);
+ Assert.assertEquals(chunk.getSampleCount(), byteRepeaterCount);
+ sampleCoder.scan(chunk.getTimeBytesAndSampleBytes().getSampleBytes(), compressedTimes, dateTimes.size(), new SampleProcessor() {
+
+ @Override
+ public void processSamples(final TimelineCursor timeCursor, final int sampleCount, final SampleOpcode opcode, final Object value) {
+ Assert.assertEquals(sampleCount, byteRepeaterCount);
+ Assert.assertEquals(value, 2.0);
+ }
+ });
+ }
+
+ @Test(groups = "fast")
+ public void testShortRepeater() throws Exception {
+ final int hostId = 123;
+ final int sampleKindId = 456;
+ final DateTime startTime = new DateTime();
+ final List<DateTime> dateTimes = new ArrayList<DateTime>();
+ final int shortRepeaterCount = 256;
+ final TimelineChunkAccumulator accum = new TimelineChunkAccumulator(hostId, sampleKindId, sampleCoder);
+ for (int i = 0; i < shortRepeaterCount; i++) {
+ dateTimes.add(startTime.plusSeconds(i * 5));
+ accum.addSample(sampleCoder.compressSample(new ScalarSample<Double>(SampleOpcode.DOUBLE, 2.0)));
+ }
+ final DateTime endTime = startTime.plusSeconds(5 * shortRepeaterCount);
+ final byte[] compressedTimes = timelineCoder.compressDateTimes(dateTimes);
+ final TimelineChunk chunk = accum.extractTimelineChunkAndReset(startTime, endTime, compressedTimes);
+ final byte[] samples = chunk.getTimeBytesAndSampleBytes().getSampleBytes();
+ Assert.assertEquals(samples.length, 5);
+ Assert.assertEquals(((int) samples[0]) & 0xff, SampleOpcode.REPEAT_SHORT.getOpcodeIndex());
+ final int count = ((samples[1] & 0xff) << 8) | (samples[2] & 0xff);
+ Assert.assertEquals(count, shortRepeaterCount);
+ Assert.assertEquals(((int) samples[3]) & 0xff, SampleOpcode.BYTE_FOR_DOUBLE.getOpcodeIndex());
+ Assert.assertEquals(((int) samples[4]) & 0xff, 0x02);
+ Assert.assertEquals(chunk.getSampleCount(), shortRepeaterCount);
+
+ sampleCoder.scan(chunk.getTimeBytesAndSampleBytes().getSampleBytes(), compressedTimes, dateTimes.size(), new SampleProcessor() {
+
+ @Override
+ public void processSamples(TimelineCursor timeCursor, int sampleCount, SampleOpcode opcode, Object value) {
+ Assert.assertEquals(sampleCount, shortRepeaterCount);
+ Assert.assertEquals(value, 2.0);
+ }
+ });
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkToJson.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkToJson.java
new file mode 100644
index 0000000..afcc647
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestTimelineChunkToJson.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunksViews.Compact;
+import com.ning.billing.usage.timeline.chunks.TimelineChunksViews.Loose;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+public class TestTimelineChunkToJson {
+
+ private static final ObjectMapper mapper = new ObjectMapper().configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ private static final long CHUNK_ID = 1242L;
+ private static final int HOST_ID = 1422;
+ private static final int SAMPLE_KIND_ID = 1224;
+ private static final int SAMPLE_COUNT = 2142;
+ private static final DateTime END_TIME = new DateTime(DateTimeZone.UTC);
+ private static final DateTime START_TIME = END_TIME.minusMinutes(SAMPLE_COUNT);
+
+ private byte[] samples;
+ private TimelineChunk chunk;
+
+ @BeforeMethod(groups = "fast")
+ public void setUp() throws Exception {
+ final List<DateTime> dateTimes = new ArrayList<DateTime>();
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final DataOutputStream output = new DataOutputStream(out);
+ for (int i = 0; i < SAMPLE_COUNT; i++) {
+ sampleCoder.encodeSample(output, new ScalarSample<Long>(SampleOpcode.LONG, 10L));
+ dateTimes.add(START_TIME.plusMinutes(i));
+ }
+ output.flush();
+ output.close();
+ samples = out.toByteArray();
+
+ final DateTime endTime = dateTimes.get(dateTimes.size() - 1);
+ final byte[] timeBytes = timelineCoder.compressDateTimes(dateTimes);
+ chunk = new TimelineChunk(CHUNK_ID, HOST_ID, SAMPLE_KIND_ID, START_TIME, endTime, timeBytes, samples, SAMPLE_COUNT);
+ }
+
+ @Test(groups = "fast")
+ public void testTimelineChunkCompactMapping() throws Exception {
+ final String chunkToString = mapper.writerWithView(Compact.class).writeValueAsString(chunk);
+ final Map chunkFromString = mapper.readValue(chunkToString, Map.class);
+ Assert.assertEquals(chunkFromString.keySet().size(), 11);
+ Assert.assertEquals(chunkFromString.get("hostId"), HOST_ID);
+ Assert.assertEquals(chunkFromString.get("sampleKindId"), SAMPLE_KIND_ID);
+ Assert.assertEquals(new TextNode((String) chunkFromString.get("samples")).binaryValue(), samples);
+ Assert.assertEquals(chunkFromString.get("sampleCount"), SAMPLE_COUNT);
+ Assert.assertEquals(chunkFromString.get("startTime"), START_TIME.getMillis());
+ Assert.assertEquals(chunkFromString.get("aggregationLevel"), 0);
+ Assert.assertEquals(chunkFromString.get("notValid"), false);
+ Assert.assertEquals(chunkFromString.get("dontAggregate"), false);
+ }
+
+ @Test(groups = "fast")
+ public void testTimelineChunkLooseMapping() throws Exception {
+ final String chunkToString = mapper.writerWithView(Loose.class).writeValueAsString(chunk);
+ final Map chunkFromString = mapper.readValue(chunkToString, Map.class);
+ Assert.assertEquals(chunkFromString.keySet().size(), 4);
+ Assert.assertEquals(chunkFromString.get("hostId"), HOST_ID);
+ Assert.assertEquals(chunkFromString.get("sampleKindId"), SAMPLE_KIND_ID);
+ Assert.assertNotNull(chunkFromString.get("samplesAsCSV"));
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/filter/TestDecimatingFilter.java b/usage/src/test/java/com/ning/billing/usage/timeline/filter/TestDecimatingFilter.java
new file mode 100644
index 0000000..2e5404f
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/filter/TestDecimatingFilter.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.skife.config.TimeSpan;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.consumer.SampleConsumer;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+
+public class TestDecimatingFilter {
+
+ @Test(groups = "fast")
+ public void testBasicFilterOperations() throws Exception {
+ final List<Double> outputs = new ArrayList<Double>();
+ final long millisStart = System.currentTimeMillis() - 2000 * 100;
+
+ final DecimatingSampleFilter filter = new DecimatingSampleFilter(new DateTime(millisStart), new DateTime(millisStart + 2000 * 100), 25, 100, new TimeSpan("2s"), DecimationMode.PEAK_PICK,
+ new SampleConsumer() {
+
+ @Override
+ public void consumeSample(final int sampleNumber, final SampleOpcode opcode, final Object value, final DateTime time) {
+ outputs.add((double) ((Double) value));
+ }
+ });
+ for (int i = 0; i < 100; i++) {
+ // Make the value go up for 4 samples; then down for 4 samples, between 10.0 and 40.0
+ final int index = (i % 8) + 1;
+ double value = 0;
+ if (index <= 4) {
+ value = 10.0 * index;
+ } else {
+ value = (8 - (index - 1)) * 10;
+ }
+ //System.out.printf("For i %d, index %d, adding value %f\n", i, index, value);
+ filter.processOneSample(new DateTime(millisStart + 2000 * i), SampleOpcode.DOUBLE, value);
+ }
+ int index = 0;
+ for (final Double value : outputs) {
+ //System.out.printf("index %d, value %f\n", index++, (double)((Double)value));
+ if ((index & 1) == 0) {
+ Assert.assertEquals(value, 40.0);
+ } else {
+ Assert.assertEquals(value, 10.0);
+ }
+ index++;
+ }
+ }
+
+ /**
+ * This test has sample count of 21, and output count of 6, so there are 5.8 samples per output point
+ *
+ * @throws Exception
+ */
+ @Test(groups = "fast")
+ public void testFilterWithNonAlignedSampleCounts() throws Exception {
+ final List<Double> outputs = new ArrayList<Double>();
+ final long millisStart = System.currentTimeMillis() - 2000 * 21;
+
+ final DecimatingSampleFilter filter = new DecimatingSampleFilter(new DateTime(millisStart), new DateTime(millisStart + 2000 * 21), 6, 21, new TimeSpan("2s"), DecimationMode.PEAK_PICK,
+ new SampleConsumer() {
+
+ @Override
+ public void consumeSample(final int sampleNumber, final SampleOpcode opcode, final Object value, final DateTime time) {
+ outputs.add((double) ((Double) value));
+ }
+ });
+ for (int i = 0; i < 21; i++) {
+ // Make the value go up for 6 samples; then down for 6 samples, between 10.0 and 60.0
+ final int index = (i % 6) + 1;
+ double value = 0;
+ if (index <= 3) {
+ value = 10.0 * index;
+ } else {
+ value = (6 - (index - 1)) * 10;
+ }
+ //System.out.printf("For i %d, index %d, adding value %f\n", i, index, value);
+ filter.processOneSample(new DateTime(millisStart + 2000 * i), SampleOpcode.DOUBLE, value);
+ }
+ Assert.assertEquals(outputs.size(), 5);
+ final double[] expectedValues = new double[]{30.0, 20.0, 30.0, 30.0, 10.0};
+ for (int i = 0; i < 5; i++) {
+ final double value = outputs.get(i);
+ final double expectedValue = expectedValues[i];
+ //System.out.printf("index %d, value returned %f, value expected %f\n", i, value, expectedValue);
+ Assert.assertEquals(value, expectedValue);
+ }
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/metrics/TestSamplesForMetricAndSource.java b/usage/src/test/java/com/ning/billing/usage/timeline/metrics/TestSamplesForMetricAndSource.java
new file mode 100644
index 0000000..2665934
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/metrics/TestSamplesForMetricAndSource.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.metrics;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class TestSamplesForMetricAndSource {
+
+ @Test(groups = "fast")
+ public void testMapping() throws Exception {
+ final SamplesForMetricAndSource samples = new SamplesForMetricAndSource("host.foo.com", "JVM", "GC", "1,2,2,0");
+
+ final ObjectMapper mapper = new ObjectMapper();
+ final String json = mapper.writeValueAsString(samples);
+ Assert.assertEquals("{\"hostName\":\"host.foo.com\",\"eventCategory\":\"JVM\",\"sampleKind\":\"GC\",\"samples\":\"1,2,2,0\"}", json);
+
+ final SamplesForMetricAndSource samplesFromJson = mapper.readValue(json, SamplesForMetricAndSource.class);
+ Assert.assertEquals(samplesFromJson, samples);
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/persistent/TestReplayer.java b/usage/src/test/java/com/ning/billing/usage/timeline/persistent/TestReplayer.java
new file mode 100644
index 0000000..5989f20
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/persistent/TestReplayer.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class TestReplayer {
+
+ private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestReplayer-" + System.currentTimeMillis());
+
+ private static final class MockReplayer extends Replayer {
+
+ private final List<File> expectedFiles;
+ private int seen = 0;
+
+ public MockReplayer(final String path, final List<File> expectedFiles) {
+ super(path);
+ this.expectedFiles = expectedFiles;
+ }
+
+ @Override
+ public void read(final File file, final Function<SourceSamplesForTimestamp, Void> fn) throws IOException {
+ Assert.assertEquals(file, expectedFiles.get(seen));
+ seen++;
+ }
+
+ public int getSeen() {
+ return seen;
+ }
+ }
+
+ private final StreamyBytesPersistentOutputStream outputStream = new StreamyBytesPersistentOutputStream(basePath.toString(), "pweet", null, true);
+
+ @Test(groups = "fast")
+ public void testStringOrdering() throws Exception {
+ final File file1 = new File("aaa.bbb.12345.bin");
+ final File file2 = new File("aaa.bbb.12346.bin");
+ final File file3 = new File("aaa.bbb.02345.bin");
+
+ final List<File> sortedCopy = Replayer.FILE_ORDERING.sortedCopy(ImmutableList.<File>of(file2, file1, file3));
+ Assert.assertEquals(sortedCopy.get(0), file3);
+ Assert.assertEquals(sortedCopy.get(1), file1);
+ Assert.assertEquals(sortedCopy.get(2), file2);
+ }
+
+ @Test(groups = "slow")
+ public void testOrdering() throws Exception {
+ Assert.assertTrue(basePath.mkdir());
+
+ final List<String> filePathsCreated = new ArrayList<String>();
+ final List<File> filesCreated = new ArrayList<File>();
+ final int expected = 50;
+
+ for (int i = 0; i < expected; i++) {
+ filePathsCreated.add(outputStream.getFileName());
+ Thread.sleep(17);
+ }
+
+ // Create the files in the opposite ordering to make sure we can re-read them in order
+ for (int i = expected - 1; i >= 0; i--) {
+ final File file = new File(filePathsCreated.get(i));
+ Assert.assertTrue(file.createNewFile());
+ filesCreated.add(file);
+ }
+
+ final MockReplayer replayer = new MockReplayer(basePath.toString(), Lists.reverse(filesCreated));
+ replayer.readAll();
+
+ Assert.assertEquals(replayer.getSeen(), expected);
+ }
+}