killbill-aplcache

usage: initial import of TimelineCoder Compression/Decompression

7/28/2012 5:23:52 PM

Details

diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/Hex.java b/usage/src/main/java/com/ning/billing/usage/timeline/Hex.java
new file mode 100644
index 0000000..611b0c5
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/Hex.java
@@ -0,0 +1,113 @@
+package com.ning.billing.usage.timeline;
+
+/**
+ * Hex utilities from commons-codec
+ */
+public class Hex {
+
+    /**
+     * Used to build output as Hex
+     */
+    private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
+    /**
+     * Used to build output as Hex
+     */
+    private static final char[] DIGITS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
+    private Hex() {
+    }
+
+    /**
+     * Converts an array of bytes into an array of characters representing the hexadecimal values of each byte in order.
+     * The returned array will be double the length of the passed array, as it takes two characters to represent any
+     * given byte.
+     *
+     * @param data a byte[] to convert to Hex characters
+     * @return A char[] containing hexadecimal characters
+     */
+    public static char[] encodeHex(final byte[] data) {
+        return encodeHex(data, true);
+    }
+
+    /**
+     * Converts an array of bytes into an array of characters representing the hexadecimal values of each byte in order.
+     * The returned array will be double the length of the passed array, as it takes two characters to represent any
+     * given byte.
+     *
+     * @param data        a byte[] to convert to Hex characters
+     * @param toLowerCase <code>true</code> converts to lowercase, <code>false</code> to uppercase
+     * @return A char[] containing hexadecimal characters
+     * @since 1.4
+     */
+    public static char[] encodeHex(final byte[] data, final boolean toLowerCase) {
+        return encodeHex(data, toLowerCase ? DIGITS_LOWER : DIGITS_UPPER);
+    }
+
+    /**
+     * Converts an array of bytes into an array of characters representing the hexadecimal values of each byte in order.
+     * The returned array will be double the length of the passed array, as it takes two characters to represent any
+     * given byte.
+     *
+     * @param data     a byte[] to convert to Hex characters
+     * @param toDigits the output alphabet
+     * @return A char[] containing hexadecimal characters
+     * @since 1.4
+     */
+    public static char[] encodeHex(final byte[] data, final char[] toDigits) {
+        final int l = data.length;
+        final char[] out = new char[l << 1];
+        // two characters form the hex value.
+        for (int i = 0, j = 0; i < l; i++) {
+            out[j++] = toDigits[(0xF0 & data[i]) >>> 4];
+            out[j++] = toDigits[0x0F & data[i]];
+        }
+
+        return out;
+    }
+
+    /**
+     * Converts an array of characters representing hexadecimal values into an array of bytes of those same values. The
+     * returned array will be half the length of the passed array, as it takes two characters to represent any given
+     * byte. An exception is thrown if the passed char array has an odd number of elements.
+     *
+     * @param data An array of characters containing hexadecimal digits
+     * @return A byte array containing binary data decoded from the supplied char array.
+     * @throws IllegalArgumentException Thrown if an odd number or illegal of characters is supplied
+     */
+    public static byte[] decodeHex(final char[] data) throws IllegalArgumentException {
+        final int len = data.length;
+        if ((len & 0x01) != 0) {
+            throw new IllegalArgumentException("Odd number of characters.");
+        }
+
+        final byte[] out = new byte[len >> 1];
+
+        // two characters form the hex value.
+        for (int i = 0, j = 0; j < len; i++) {
+            int f = toDigit(data[j], j) << 4;
+            j++;
+            f = f | toDigit(data[j], j);
+            j++;
+            out[i] = (byte) (f & 0xFF);
+        }
+
+        return out;
+    }
+
+    /**
+     * Converts a hexadecimal character to an integer.
+     *
+     * @param ch    A character to convert to an integer digit
+     * @param index The index of the character in the source
+     * @return An integer
+     * @throws IllegalArgumentException Thrown if ch is an illegal hex character
+     */
+    private static int toDigit(final char ch, final int index) throws IllegalArgumentException {
+        final int digit = Character.digit(ch, 16);
+        if (digit == -1) {
+            throw new IllegalArgumentException("Illegal hexadecimal character " + ch + " at index " + index);
+        }
+        return digit;
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/times/DefaultTimelineCoder.java b/usage/src/main/java/com/ning/billing/usage/timeline/times/DefaultTimelineCoder.java
new file mode 100644
index 0000000..07a3c05
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/times/DefaultTimelineCoder.java
@@ -0,0 +1,350 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.times;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.DateTimeUtils;
+import com.ning.billing.usage.timeline.Hex;
+
+public class DefaultTimelineCoder implements TimelineCoder {
+
+    public static final Logger log = LoggerFactory.getLogger(TimelineCoder.class);
+    public static final int MAX_SHORT_REPEAT_COUNT = 0xFFFF;
+    public static final int MAX_BYTE_REPEAT_COUNT = 0xFF;
+
+    /**
+     * Convert the array of unix times to a compressed timeline, and return the byte array
+     * representing that compressed timeline
+     *
+     * @param times an int array giving the unix times to be compressed
+     * @return the compressed timeline
+     */
+    @Override
+    public byte[] compressDateTimes(final List<DateTime> times) {
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final DataOutputStream dataStream = new DataOutputStream(outputStream);
+        try {
+            int lastTime = 0;
+            int lastDelta = 0;
+            int repeatCount = 0;
+            for (final DateTime time : times) {
+                final int newTime = DateTimeUtils.unixSeconds(time);
+                if (lastTime == 0) {
+                    lastTime = newTime;
+                    writeTime(0, lastTime, dataStream);
+                    continue;
+                } else if (newTime < lastTime) {
+                    log.warn("In TimelineCoder.compressTimes(), newTime {} is < lastTime {}; ignored", newTime, lastTime);
+                    continue;
+                }
+                final int delta = newTime - lastTime;
+                final boolean deltaWorks = delta <= TimelineOpcode.MAX_DELTA_TIME;
+                final boolean sameDelta = repeatCount > 0 && delta == lastDelta;
+                if (deltaWorks) {
+                    if (sameDelta) {
+                        repeatCount++;
+                        if (repeatCount == MAX_SHORT_REPEAT_COUNT) {
+                            writeRepeatedDelta(delta, repeatCount, dataStream);
+                            repeatCount = 0;
+                        }
+                    } else {
+                        if (repeatCount > 0) {
+                            writeRepeatedDelta(lastDelta, repeatCount, dataStream);
+                        }
+                        repeatCount = 1;
+                    }
+                    lastDelta = delta;
+                } else {
+                    if (repeatCount > 0) {
+                        writeRepeatedDelta(lastDelta, repeatCount, dataStream);
+                    }
+                    writeTime(0, newTime, dataStream);
+                    repeatCount = 0;
+                    lastDelta = 0;
+                }
+                lastTime = newTime;
+            }
+            if (repeatCount > 0) {
+                writeRepeatedDelta(lastDelta, repeatCount, dataStream);
+            }
+            dataStream.flush();
+            return outputStream.toByteArray();
+        } catch (IOException e) {
+            log.error("Exception compressing times list of length {}", times.size(), e);
+            return null;
+        }
+    }
+
+    @Override
+    public byte[] combineTimelines(final List<byte[]> timesList, final Integer sampleCount) {
+        final byte[] timeBytes = combineTimelines(timesList);
+        final int combinedSampleCount = countTimeBytesSamples(timeBytes);
+        if (sampleCount != null && sampleCount != combinedSampleCount) {
+            final StringBuilder builder = new StringBuilder();
+            builder.append("In compressTimelineTimes(), combined sample count is ")
+                   .append(combinedSampleCount)
+                   .append(", but sample count is ")
+                   .append(sampleCount)
+                   .append(", combined TimeBytes ")
+                   .append(Hex.encodeHex(timeBytes))
+                   .append(", ")
+                   .append(timesList.size())
+                   .append(" chunks");
+            for (final byte[] bytes : timesList) {
+                builder.append(", ")
+                       .append(Hex.encodeHex(bytes));
+            }
+            log.error(builder.toString());
+        }
+        return timeBytes;
+    }
+
+    private byte[] combineTimelines(final List<byte[]> timesList) {
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final DataOutputStream dataStream = new DataOutputStream(outputStream);
+        try {
+            int lastTime = 0;
+            int lastDelta = 0;
+            int repeatCount = 0;
+            int chunkCounter = 0;
+            for (byte[] times : timesList) {
+                final ByteArrayInputStream byteStream = new ByteArrayInputStream(times);
+                final DataInputStream byteDataStream = new DataInputStream(byteStream);
+                int byteCursor = 0;
+                while (true) {
+                    // Part 1: Get the opcode, and come up with newTime, newCount and newDelta
+                    final int opcode = byteDataStream.read();
+                    if (opcode == -1) {
+                        break;
+                    }
+                    byteCursor++;
+                    int newTime = 0;
+                    int newCount = 0;
+                    int newDelta = 0;
+                    boolean useNewDelta = false;
+                    boolean nonDeltaTime = false;
+                    if (opcode == TimelineOpcode.FULL_TIME.getOpcodeIndex()) {
+                        newTime = byteDataStream.readInt();
+                        if (newTime < lastTime) {
+                            log.warn("In TimelineCoder.combineTimeLines(), the fulltime read is %d, but the lastTime is %d; setting newTime to lastTime",
+                                     newTime, lastTime);
+                            newTime = lastTime;
+                        }
+                        byteCursor += 4;
+                        if (lastTime == 0) {
+                            writeTime(0, newTime, dataStream);
+                            lastTime = newTime;
+                            lastDelta = 0;
+                            repeatCount = 0;
+                            continue;
+                        } else if (newTime - lastTime <= TimelineOpcode.MAX_DELTA_TIME) {
+                            newDelta = newTime - lastTime;
+                            useNewDelta = true;
+                            newCount = 1;
+                        } else {
+                            nonDeltaTime = true;
+                        }
+                    } else if (opcode <= TimelineOpcode.MAX_DELTA_TIME) {
+                        newTime = lastTime + opcode;
+                        newDelta = opcode;
+                        useNewDelta = true;
+                        newCount = 1;
+                    } else if (opcode == TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex()) {
+                        newCount = byteDataStream.read();
+                        newDelta = byteDataStream.read();
+                        useNewDelta = true;
+                        byteCursor += 2;
+                        if (lastTime != 0) {
+                            newTime = lastTime + newDelta * newCount;
+                        } else {
+                            throw new IllegalStateException(String.format("In TimelineCoder.combineTimelines, lastTime is 0 byte opcode = %d, byteCursor %d, chunkCounter %d, chunk %s",
+                                                                          opcode, byteCursor, chunkCounter, new String(Hex.encodeHex(times))));
+                        }
+                    } else if (opcode == TimelineOpcode.REPEATED_DELTA_TIME_SHORT.getOpcodeIndex()) {
+                        newCount = byteDataStream.readUnsignedShort();
+                        newDelta = byteDataStream.read();
+                        useNewDelta = true;
+                        byteCursor += 3;
+                        if (lastTime != 0) {
+                            newTime = lastTime + newDelta * newCount;
+                        }
+                    } else {
+                        throw new IllegalStateException(String.format("In TimelineCoder.combineTimelines, Unrecognized byte opcode = %d, byteCursor %d, chunkCounter %d, chunk %s",
+                                                                      opcode, byteCursor, chunkCounter, new String(Hex.encodeHex(times))));
+                    }
+                    // Part 2: Combine existing state represented in lastTime, lastDelta and repeatCount with newTime, newCount and newDelta
+                    if (lastTime == 0) {
+                        log.error("In combineTimelines(), lastTime is 0; byteCursor {}, chunkCounter {}, times {}", new Object[]{byteCursor, chunkCounter, new String(Hex.encodeHex(times))});
+                    } else if (repeatCount > 0) {
+                        if (lastDelta == newDelta && newCount > 0) {
+                            repeatCount += newCount;
+                            lastTime = newTime;
+                        } else {
+                            writeRepeatedDelta(lastDelta, repeatCount, dataStream);
+                            if (useNewDelta) {
+                                lastDelta = newDelta;
+                                repeatCount = newCount;
+                                lastTime = newTime;
+                            } else {
+                                writeTime(lastTime, newTime, dataStream);
+                                lastTime = newTime;
+                                lastDelta = 0;
+                                repeatCount = 0;
+                            }
+                        }
+                    } else if (nonDeltaTime) {
+                        writeTime(lastTime, newTime, dataStream);
+                        lastTime = newTime;
+                        lastDelta = 0;
+                        repeatCount = 0;
+                    } else if (lastDelta == 0) {
+                        lastTime = newTime;
+                        repeatCount = newCount;
+                        lastDelta = newDelta;
+                    }
+                }
+                chunkCounter++;
+            }
+
+            if (repeatCount > 0) {
+                writeRepeatedDelta(lastDelta, repeatCount, dataStream);
+            }
+            dataStream.flush();
+
+            return outputStream.toByteArray();
+        } catch (Exception e) {
+            log.error("In combineTimesLines(), exception combining timelines", e);
+            return new byte[0];
+        }
+    }
+
+    @Override
+    public List<DateTime> decompressDateTimes(final byte[] compressedTimes) {
+        final List<DateTime> dateTimeList = new ArrayList<DateTime>(compressedTimes.length * 4);
+        final ByteArrayInputStream byteStream = new ByteArrayInputStream(compressedTimes);
+        final DataInputStream byteDataStream = new DataInputStream(byteStream);
+        int opcode;
+        int lastTime = 0;
+        try {
+            while (true) {
+                opcode = byteDataStream.read();
+                if (opcode == -1) {
+                    break;
+                }
+
+                if (opcode == TimelineOpcode.FULL_TIME.getOpcodeIndex()) {
+                    lastTime = byteDataStream.readInt();
+                    dateTimeList.add(DateTimeUtils.dateTimeFromUnixSeconds(lastTime));
+                } else if (opcode == TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex()) {
+                    final int repeatCount = byteDataStream.readUnsignedByte();
+                    final int delta = byteDataStream.readUnsignedByte();
+                    for (int i = 0; i < repeatCount; i++) {
+                        lastTime = lastTime + delta;
+                        dateTimeList.add(DateTimeUtils.dateTimeFromUnixSeconds(lastTime));
+                    }
+                } else if (opcode == TimelineOpcode.REPEATED_DELTA_TIME_SHORT.getOpcodeIndex()) {
+                    final int repeatCount = byteDataStream.readUnsignedShort();
+                    final int delta = byteDataStream.readUnsignedByte();
+                    for (int i = 0; i < repeatCount; i++) {
+                        lastTime = lastTime + delta;
+                        dateTimeList.add(DateTimeUtils.dateTimeFromUnixSeconds(lastTime));
+                    }
+                } else {
+                    // The opcode is itself a singleton delta
+                    lastTime = lastTime + opcode;
+                    dateTimeList.add(DateTimeUtils.dateTimeFromUnixSeconds(lastTime));
+                }
+            }
+        } catch (IOException e) {
+            log.error("In decompressTimes(), exception decompressing", e);
+        }
+
+        return dateTimeList;
+    }
+
+    @Override
+    public int countTimeBytesSamples(final byte[] timeBytes) {
+        int count = 0;
+        try {
+            final ByteArrayInputStream byteStream = new ByteArrayInputStream(timeBytes);
+            final DataInputStream byteDataStream = new DataInputStream(byteStream);
+            int opcode;
+            while ((opcode = byteDataStream.read()) != -1) {
+                if (opcode == TimelineOpcode.FULL_TIME.getOpcodeIndex()) {
+                    byteDataStream.readInt();
+                    count++;
+                } else if (opcode <= TimelineOpcode.MAX_DELTA_TIME) {
+                    count++;
+                } else if (opcode == TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex()) {
+                    count += byteDataStream.read();
+                    //noinspection ResultOfMethodCallIgnored
+                    byteDataStream.read();
+                } else if (opcode == TimelineOpcode.REPEATED_DELTA_TIME_SHORT.getOpcodeIndex()) {
+                    count += byteDataStream.readUnsignedShort();
+                    //noinspection ResultOfMethodCallIgnored
+                    byteDataStream.read();
+                } else {
+                    throw new IllegalStateException(String.format("In TimelineCoder.countTimeBytesSamples(), unrecognized opcode %d", opcode));
+                }
+            }
+            return count;
+        } catch (IOException e) {
+            log.error("IOException while counting timeline samples", e);
+            return count;
+        }
+    }
+
+    private void writeRepeatedDelta(final int delta, final int repeatCount, final DataOutputStream dataStream) throws IOException {
+        if (repeatCount > 1) {
+            if (repeatCount > MAX_BYTE_REPEAT_COUNT) {
+                dataStream.writeByte(TimelineOpcode.REPEATED_DELTA_TIME_SHORT.getOpcodeIndex());
+                dataStream.writeShort(repeatCount);
+            } else if (repeatCount == 2) {
+                dataStream.writeByte(delta);
+            } else {
+                dataStream.writeByte(TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex());
+                dataStream.writeByte(repeatCount);
+            }
+        }
+        dataStream.writeByte(delta);
+    }
+
+    private void writeTime(final int lastTime, final int newTime, final DataOutputStream dataStream) throws IOException {
+        if (newTime > lastTime) {
+            final int delta = (newTime - lastTime);
+            if (delta <= TimelineOpcode.MAX_DELTA_TIME) {
+                dataStream.writeByte(delta);
+            } else {
+                dataStream.writeByte(TimelineOpcode.FULL_TIME.getOpcodeIndex());
+                dataStream.writeInt(newTime);
+            }
+        } else if (newTime == lastTime) {
+            dataStream.writeByte(0);
+        }
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineCoder.java b/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineCoder.java
new file mode 100644
index 0000000..f010417
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineCoder.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.times;
+
+import java.util.List;
+
+import org.joda.time.DateTime;
+
+public interface TimelineCoder {
+
+    /**
+     * Compress the list of DateTimes, producing the bytes of a timeline
+     *
+     * @param dateTimes a list of DateTimes to compress
+     * @return the bytes of the resulting timeline
+     */
+    public byte[] compressDateTimes(final List<DateTime> dateTimes);
+
+    /**
+     * Decompress the timeline bytes argument, returning a list of DateTimes
+     * Currently only used by tests.
+     *
+     * @param compressedTimes the timeline bytes
+     * @return a list of DateTimes representing the timeline times
+     */
+    public List<DateTime> decompressDateTimes(final byte[] compressedTimes);
+
+    /**
+     * Recode and combine the list of timeline byte objects, returning a single timeline.
+     * If the sampleCount is non-null and is not equal to the sum of the sample counts
+     * of the list of timelines, throw an error
+     *
+     * @param timesList   a list of timeline byte arrays
+     * @param sampleCount the expected count of samples for all timeline byte arrays
+     * @return the combined timeline
+     */
+    public byte[] combineTimelines(final List<byte[]> timesList, final Integer sampleCount);
+
+    /**
+     * Return a count of the time samples in the timeline provided
+     *
+     * @param timeBytes the bytes of a timeline
+     * @return the count of samples represented in the timeline
+     */
+    public int countTimeBytesSamples(final byte[] timeBytes);
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineOpcode.java b/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineOpcode.java
new file mode 100644
index 0000000..73b3420
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineOpcode.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.times;
+
+/**
+ * Opcodes are 1-byte entities.  Any "opcode" whose value is 240 or less is treated
+ * as a time delta to be added to the previous time value.
+ */
+public enum TimelineOpcode {
+    FULL_TIME(0xFF),                 // Followed by 4 bytes of int value
+    REPEATED_DELTA_TIME_BYTE(0xFE),  // Followed by a byte repeat count byte, 1-255, and then by a 1-byte delta whose value is 1-240
+    REPEATED_DELTA_TIME_SHORT(0xFD); // Followed by a repeat count short, 1-65535, and then by a 1-byte delta whose value is 1-240
+
+    public static final int MAX_DELTA_TIME = 0xF0;      // 240: Leaves room for 16 other opcodes, of which 3 are used
+
+    private final int opcodeIndex;
+
+    private TimelineOpcode(final int opcodeIndex) {
+        this.opcodeIndex = opcodeIndex;
+    }
+
+    public int getOpcodeIndex() {
+        return opcodeIndex;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("TimelineOpcode");
+        sb.append("{opcodeIndex=").append(opcodeIndex);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/times/TestDefaultTimelineCoder.java b/usage/src/test/java/com/ning/billing/usage/timeline/times/TestDefaultTimelineCoder.java
new file mode 100644
index 0000000..44def78
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/times/TestDefaultTimelineCoder.java
@@ -0,0 +1,392 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.times;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.usage.timeline.DateTimeUtils;
+import com.ning.billing.usage.timeline.Hex;
+
+public class TestDefaultTimelineCoder {
+
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+
+    @Test(groups = "fast")
+    public void testBasicEncodeDecode() throws Exception {
+        final DateTime firstTime = DateTimeUtils.dateTimeFromUnixSeconds(1000000);
+        final List<DateTime> unencodedTimes = makeSomeTimes(firstTime);
+
+        final byte[] compressedTimes = timelineCoder.compressDateTimes(unencodedTimes);
+        //System.out.printf("Compressed times: %s\n", new String(Hex.encodeHex(compressedTimes)));
+        final List<DateTime> decompressedTimes = timelineCoder.decompressDateTimes(compressedTimes);
+        Assert.assertEquals(decompressedTimes.size(), unencodedTimes.size());
+        for (int i = 0; i < unencodedTimes.size(); i++) {
+            Assert.assertEquals(decompressedTimes.get(i), unencodedTimes.get(i));
+        }
+    }
+
+    private List<DateTime> makeSomeTimes(final DateTime firstTime) {
+        final List<DateTime> times = new ArrayList<DateTime>();
+        Collections.addAll(times, firstTime, firstTime.plusSeconds(5), firstTime.plusSeconds(5), firstTime.plusSeconds(5),
+                           firstTime.plusSeconds(1000), firstTime.plusSeconds(1000), firstTime.plusSeconds(2030), firstTime.plusSeconds(2060));
+        return times;
+    }
+
+    @Test(groups = "fast")
+    public void testRepeats() throws Exception {
+        final DateTime firstTime = DateTimeUtils.dateTimeFromUnixSeconds(1293846);
+        final List<DateTime> unencodedTimes = makeSomeTimes(firstTime);
+
+        final byte[] compressedTimes = timelineCoder.compressDateTimes(unencodedTimes);
+        final List<DateTime> decompressedTimes = timelineCoder.decompressDateTimes(compressedTimes);
+        Assert.assertEquals(decompressedTimes.size(), unencodedTimes.size());
+        for (int i = 0; i < unencodedTimes.size(); i++) {
+            Assert.assertEquals(decompressedTimes.get(i), unencodedTimes.get(i));
+        }
+    }
+
+    @Test(groups = "fast")
+    public void testCombiningTimelinesByteRepeats() throws Exception {
+        final int firstTime = 1293846;
+
+        final List<DateTime> unencodedTimes1 = new ArrayList<DateTime>();
+        final List<DateTime> unencodedTimes2 = new ArrayList<DateTime>();
+        final int sampleCount = 10;
+        for (int i = 0; i < sampleCount; i++) {
+            unencodedTimes1.add(DateTimeUtils.dateTimeFromUnixSeconds(firstTime + i * 100));
+            unencodedTimes2.add(DateTimeUtils.dateTimeFromUnixSeconds(firstTime + sampleCount * 100 + i * 100));
+        }
+        final byte[] compressedTimes1 = timelineCoder.compressDateTimes(unencodedTimes1);
+        final byte[] compressedTimes2 = timelineCoder.compressDateTimes(unencodedTimes2);
+        Assert.assertEquals(compressedTimes1.length, 8);
+        Assert.assertEquals(compressedTimes1[0] & 0xff, TimelineOpcode.FULL_TIME.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes1[5] & 0xff, TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes1[6] & 0xff, 9);
+        Assert.assertEquals(compressedTimes1[7] & 0xff, 100);
+        Assert.assertEquals(compressedTimes2.length, 8);
+        Assert.assertEquals(compressedTimes2[0] & 0xff, TimelineOpcode.FULL_TIME.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes2[5] & 0xff, TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes2[6] & 0xff, 9);
+        Assert.assertEquals(compressedTimes2[7] & 0xff, 100);
+        final List<byte[]> timesList = new ArrayList<byte[]>();
+        timesList.add(compressedTimes1);
+        timesList.add(compressedTimes2);
+        final byte[] combinedTimes = timelineCoder.combineTimelines(timesList, null);
+        Assert.assertEquals(combinedTimes.length, 8);
+        Assert.assertEquals(combinedTimes[0] & 0xff, TimelineOpcode.FULL_TIME.getOpcodeIndex());
+        Assert.assertEquals(combinedTimes[5] & 0xff, TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex());
+        Assert.assertEquals(combinedTimes[6] & 0xff, 19);
+        Assert.assertEquals(combinedTimes[7] & 0xff, 100);
+        // Check for 19, not 20, since the first full time took one
+        Assert.assertEquals(combinedTimes[6], 19);
+        Assert.assertEquals(timelineCoder.countTimeBytesSamples(combinedTimes), 20);
+    }
+
+    @Test(groups = "fast")
+    public void testCombiningTimelinesShortRepeats() throws Exception {
+        final int sampleCount = 240;
+        final int firstTime = 1293846;
+        final List<DateTime> unencodedTimes1 = new ArrayList<DateTime>();
+        final List<DateTime> unencodedTimes2 = new ArrayList<DateTime>();
+        for (int i = 0; i < sampleCount; i++) {
+            unencodedTimes1.add(DateTimeUtils.dateTimeFromUnixSeconds(firstTime + i * 100));
+            unencodedTimes2.add(DateTimeUtils.dateTimeFromUnixSeconds(firstTime + sampleCount * 100 + i * 100));
+        }
+        final byte[] compressedTimes1 = timelineCoder.compressDateTimes(unencodedTimes1);
+        final byte[] compressedTimes2 = timelineCoder.compressDateTimes(unencodedTimes2);
+        Assert.assertEquals(compressedTimes1.length, 8);
+        Assert.assertEquals(compressedTimes1[0] & 0xff, TimelineOpcode.FULL_TIME.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes1[5] & 0xff, TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes1[6] & 0xff, sampleCount - 1);
+        Assert.assertEquals(compressedTimes1[7] & 0xff, 100);
+        Assert.assertEquals(compressedTimes2.length, 8);
+        Assert.assertEquals(compressedTimes2[0] & 0xff, TimelineOpcode.FULL_TIME.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes2[5] & 0xff, TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex());
+        Assert.assertEquals(compressedTimes2[6] & 0xff, sampleCount - 1);
+        Assert.assertEquals(compressedTimes2[7] & 0xff, 100);
+        final List<byte[]> timesList = new ArrayList<byte[]>();
+        timesList.add(compressedTimes1);
+        timesList.add(compressedTimes2);
+        final byte[] combinedTimes = timelineCoder.combineTimelines(timesList, null);
+        Assert.assertEquals(combinedTimes.length, 9);
+        Assert.assertEquals(combinedTimes[0] & 0xff, TimelineOpcode.FULL_TIME.getOpcodeIndex());
+        Assert.assertEquals(combinedTimes[5] & 0xff, TimelineOpcode.REPEATED_DELTA_TIME_SHORT.getOpcodeIndex());
+        Assert.assertEquals(combinedTimes[6] & 0xff, 1);
+        Assert.assertEquals(combinedTimes[7] & 0xff, sampleCount * 2 - 1 - 256);
+        Assert.assertEquals(combinedTimes[8], 100);
+    }
+
+    @Test(groups = "fast")
+    public void testCombiningShortFragments() throws Exception {
+        final byte[] fragment0 = new byte[]{(byte) -1, (byte) 0, (byte) 15, (byte) 66, (byte) 84, (byte) 20};
+        final byte[] fragment1 = new byte[]{(byte) -1, (byte) 0, (byte) 15, (byte) 66, (byte) -122, (byte) 30};
+        final byte[] fragment2 = new byte[]{(byte) -1, (byte) 0, (byte) 15, (byte) 66, (byte) -62, (byte) 30};
+        final byte[] fragment3 = new byte[]{(byte) -1, (byte) 0, (byte) 15, (byte) 66, (byte) -2, (byte) 30};
+        final byte[][] fragmentArray = new byte[][]{fragment0, fragment1, fragment2, fragment3};
+        final byte[] combined = timelineCoder.combineTimelines(Arrays.asList(fragmentArray), null);
+        final List<DateTime> restoredTimes = timelineCoder.decompressDateTimes(combined);
+        final List<List<DateTime>> fragmentIntTimes = new ArrayList<List<DateTime>>();
+        final List<DateTime> allFragmentTimes = new ArrayList<DateTime>();
+        int totalLength = 0;
+        for (final byte[] aFragmentArray : fragmentArray) {
+            final List<DateTime> fragmentTimes = timelineCoder.decompressDateTimes(aFragmentArray);
+            fragmentIntTimes.add(fragmentTimes);
+            totalLength += fragmentTimes.size();
+            for (final DateTime time : fragmentTimes) {
+                allFragmentTimes.add(time);
+            }
+        }
+        Assert.assertEquals(restoredTimes.size(), totalLength);
+        for (int i = 0; i < totalLength; i++) {
+            Assert.assertEquals(restoredTimes.get(i), allFragmentTimes.get(i));
+        }
+    }
+
+    @Test(groups = "fast")
+    public void testCombiningTimelinesRandomRepeats() throws Exception {
+        final int[] increments = new int[]{30, 45, 10, 30, 20};
+        final int[] repetitions = new int[]{1, 2, 3, 4, 5, 240, 250, 300};
+        final int firstTimeInt = 1000000;
+        final DateTime startTime = DateTimeUtils.dateTimeFromUnixSeconds(firstTimeInt);
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        final Random rand = new Random(0);
+        DateTime nextTime = startTime;
+        int count = 0;
+        for (int i = 0; i < 20; i++) {
+            final int increment = increments[rand.nextInt(increments.length)];
+            final int repetition = repetitions[rand.nextInt(repetitions.length)];
+            for (int r = 0; i < repetition; i++) {
+                nextTime = nextTime.plusSeconds(increment);
+                dateTimes.add(nextTime);
+                count++;
+            }
+        }
+        final byte[] allCompressedTime = timelineCoder.compressDateTimes(dateTimes);
+        final List<DateTime> restoredTimes = timelineCoder.decompressDateTimes(allCompressedTime);
+        Assert.assertEquals(restoredTimes.size(), dateTimes.size());
+        for (int i = 0; i < count; i++) {
+            Assert.assertEquals(restoredTimes.get(i), dateTimes.get(i));
+        }
+        for (int fragmentLength = 2; fragmentLength < count / 2; fragmentLength++) {
+            final List<byte[]> fragments = new ArrayList<byte[]>();
+            final int fragmentCount = (int) Math.ceil((double) count / (double) fragmentLength);
+            for (int fragCounter = 0; fragCounter < fragmentCount; fragCounter++) {
+                final int fragIndex = fragCounter * fragmentLength;
+                final List<DateTime> fragment = dateTimes.subList(fragIndex, Math.min(count, fragIndex + fragmentLength));
+                fragments.add(timelineCoder.compressDateTimes(fragment));
+            }
+            final byte[] combined = timelineCoder.combineTimelines(fragments, null);
+            final List<DateTime> restoredDateTimes = timelineCoder.decompressDateTimes(combined);
+            //Assert.assertEquals(intTimes.length, count);
+            for (int i = 0; i < count; i++) {
+                Assert.assertEquals(restoredDateTimes.get(i), dateTimes.get(i));
+            }
+        }
+    }
+
+    @Test(groups = "fast")
+    public void test65KRepeats() throws Exception {
+        final int count = 0;
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        DateTime time = DateTimeUtils.dateTimeFromUnixSeconds(1000000);
+        for (int i = 0; i < 20; i++) {
+            time = time.plusSeconds(200);
+            dateTimes.add(time);
+        }
+        for (int i = 0; i < 0xFFFF + 100; i++) {
+            time = time.plusSeconds(100);
+            dateTimes.add(time);
+        }
+        final byte[] timeBytes = timelineCoder.compressDateTimes(dateTimes);
+        final String hex = new String(Hex.encodeHex(timeBytes));
+        // Here are the compressed samples: ff000f4308fe13c8fdffff64fe6464
+        // Translation:
+        // [ff 00 0f 43 08] means absolution time 1000000
+        // [fe 13 c8] means repeat 19 times delta 200 seconds
+        // [fd ff ff 64] means repeat 65525 times delta 100 seconds
+        // [fe 64 64] means repeat 100 times delta 100 seconds
+        Assert.assertEquals(timeBytes, Hex.decodeHex("ff000f4308fe13c8fdffff64fe6464".toCharArray()));
+        final List<DateTime> restoredSamples = timelineCoder.decompressDateTimes(timeBytes);
+        Assert.assertEquals(restoredSamples.size(), dateTimes.size());
+        for (int i = 0; i < count; i++) {
+            Assert.assertEquals(restoredSamples.get(i), DateTimeUtils.unixSeconds(dateTimes.get(i)));
+        }
+    }
+
+    @Test(groups = "fast")
+    public void testCombiningTimesError() throws Exception {
+        final byte[] times1 = Hex.decodeHex("ff10000001fe0310ff1000011bfe0310".toCharArray());
+        final byte[] times2 = Hex.decodeHex("ff10000160".toCharArray());
+        final List<byte[]> timesList = new ArrayList<byte[]>();
+        timesList.add(times1);
+        timesList.add(times2);
+        final byte[] combinedTimes = timelineCoder.combineTimelines(timesList, null);
+        final String hexCombinedTimes = new String(Hex.encodeHex(combinedTimes));
+        //System.out.printf("Combined times: %s\n", hexCombinedTimes);
+        Assert.assertEquals(hexCombinedTimes, "ff10000001fe0310eafe031015");
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCursorWithZeroDeltaWithNext() throws Exception {
+        // This caused a TimeCursor problem
+        // FF 4F 91 D5 BC FE 02 1E 00 FE 02 1E FF 79 0B 44 22
+        // FF 4F 91 D5 BC FE 02 1E 00 FE 02 1E
+        // FF 4F 91 D5 BC          Absolute time
+        // FE 02 1E                Repeated delta time: count 2, delta: 30
+        // 00                      Delta 0.  Why did this happen?
+        // FE 02 1E                Repeated delta time: count 2, delta: 30
+        // FF 79 0B 44 22          Absolute time
+        // Total samples: 6
+        final int sampleCount = 7;
+        final byte[] times = Hex.decodeHex("FF4F91D5BCFE021E00FE021EFF790B4422".toCharArray());
+        final TimelineCursorImpl cursor = new TimelineCursorImpl(times, sampleCount);
+        for (int i = 0; i < sampleCount; i++) {
+            final DateTime nextTime = cursor.getNextTime();
+            Assert.assertNotNull(nextTime);
+        }
+        try {
+            final DateTime lastTime = cursor.getNextTime();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(true);
+        }
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCursorWithZeroDeltaWithSampleSkip() throws Exception {
+        // This caused a TimeCursor problem
+        // FF 4F 91 D5 BC FE 02 1E 00 FE 02 1E FF 79 0B 44 22
+        // FF 4F 91 D5 BC FE 02 1E 00 FE 02 1E
+        // FF 4F 91 D5 BC          Absolute time
+        // FE 02 1E                Repeated delta time: count 2, delta: 30
+        // 00                      Delta 0.  Why did this happen?
+        // FE 02 1E                Repeated delta time: count 2, delta: 30
+        // FF 79 0B 44 22          Absolute time
+        // Total samples: 6
+        final int sampleCount = 7;
+        final byte[] times = Hex.decodeHex("FF4F91D5BCFE021E00FE021EFF790B4422".toCharArray());
+        final TimelineCursorImpl cursor = new TimelineCursorImpl(times, sampleCount);
+        for (int i = 0; i < sampleCount; i++) {
+            final DateTime nextTime = cursor.getNextTime();
+            Assert.assertNotNull(nextTime);
+            cursor.skipToSampleNumber(i + 1);
+        }
+        try {
+            final DateTime lastTime = cursor.getNextTime();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(true);
+        }
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCursorThatShowedError() throws Exception {
+        // 39 bytes are: ff4f90f67afd03ce1e1ffe1a1e1d01fe771e1d01fd01df1e1d1ffe761e1d01fe771e1d01fe571e
+        // 1944 samples; error at 1934
+        final int sampleCount = 1944;
+        //final byte[] times = Hex.decodeHex("ff4f90f67afd03ce1e1ffe1a1e1d01fe771e1d01fd01df1e1d1ffe761e1d01fe771e1d01fe571e".toCharArray());
+        final byte[] times = Hex.decodeHex("00000018FF4F8FE521FD023D1E1FFEF01E1D01FE771E1D01FD03E21EFE07980F".toCharArray());
+        Assert.assertEquals(times.length, 32);
+        final TimelineCursorImpl cursor = new TimelineCursorImpl(times, sampleCount);
+        for (int i = 0; i < sampleCount; i++) {
+            final DateTime nextTime = cursor.getNextTime();
+            Assert.assertNotNull(nextTime);
+            cursor.skipToSampleNumber(i + 1);
+        }
+
+        try {
+            final DateTime lastTime = cursor.getNextTime();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(true);
+        }
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCombineTimesError1() throws Exception {
+        checkCombinedTimelines("ff4f91fb14fe631e00fe151e", "ff4f920942");
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCombineTimesError2() throws Exception {
+        checkCombinedTimelines("ff4f922618fe111e78fe111efe02005a", "ff4f923428");
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCombineTimesError3() throws Exception {
+        checkCombinedTimelines("ff4f9224ecfe091e", "ff4f922618fe071e78fe111e78fe111e78fe111e78fe111efe02005afe121e78fe031e",
+                               "ff4f923428fe0d1e7dfe111e78fe111e78fe111e78fe0b1e00fe061e78fe111e", "ff4f92427cfe111e78fe111e78fe111e78fe111e82fe041e1d01fe0c1e78fe0e1e");
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCombineTimesError4() throws Exception {
+        checkCombinedTimelines("ff4f95ba83fe021e", "ff4f95d595", "ff4f95e297fe021e");
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCombineTimesError5() throws Exception {
+        checkCombinedTimelines("ff00000100", "ff00000200");
+    }
+
+    @Test(groups = "fast")
+    public void testTimeCombineTimesError6() throws Exception {
+        checkCombinedTimelines("ff4f95ac73fe471e00fe301e", "ff4f95ba83fe471e00fe311e", "ff4f95d595", "ff4f95e297fe091e");
+        checkCombinedTimelines("ff4f95ac7afe461e1d01fe301e", "ff4f95ba8afe471e00fe041e1ffe2b1e", "ff4f95d59d", "ff4f95e281fe0a1e");
+        checkCombinedTimelines("ff4f95aca4fe461e00fe311e", "ff4f95bab4fe461e00fe261e1f1dfe0a1e", "ff4f95d5a8", "ff4f95e28cfe091e");
+        checkCombinedTimelines("ff4f95ac88fe471e00fe311e", "ff4f95bab6fe461e00fe321e", "ff4f95d5aa", "ff4f95e28efe091eff4f95e4e6fe0a1e");
+        checkCombinedTimelines("ff4f95e394ff4f95e4fcfe0e1e5afe341e00fe221e", "ff4f95f12cfe551e00fe221e", "ff4f95ff3cfe551e00fe231e", "ff4f960d6afe541e00fe231e");
+        checkCombinedTimelines("ff4f95e396ff4f95e4fefe0e1e5afe341e00fe271e", "ff4f95f1c4fe501e00fe281e", "ff4f95fff2fe4f1e00fe281e", "ff4f960e02fe4f1e00fe291e");
+    }
+
+    private void checkCombinedTimelines(final String... timelines) throws Exception {
+        final List<byte[]> timeParts = new ArrayList<byte[]>();
+        for (final String timeline : timelines) {
+            timeParts.add(Hex.decodeHex(timeline.toCharArray()));
+        }
+        int sampleCount = 0;
+        int byteCount = 0;
+        for (final byte[] timePart : timeParts) {
+            byteCount += timePart.length;
+            sampleCount += timelineCoder.countTimeBytesSamples(timePart);
+        }
+        final byte[] concatedTimes = new byte[byteCount];
+        int offset = 0;
+        for (final byte[] timePart : timeParts) {
+            final int length = timePart.length;
+            System.arraycopy(timePart, 0, concatedTimes, offset, length);
+            offset += length;
+        }
+        final byte[] newCombined = timelineCoder.combineTimelines(timeParts, null);
+        final int newCombinedLength = timelineCoder.countTimeBytesSamples(newCombined);
+        final TimelineCursorImpl concatedCursor = new TimelineCursorImpl(concatedTimes, sampleCount);
+        final TimelineCursorImpl combinedCursor = new TimelineCursorImpl(newCombined, sampleCount);
+        for (int i = 0; i < sampleCount; i++) {
+            final DateTime concatedTime = concatedCursor.getNextTime();
+            final DateTime combinedTime = combinedCursor.getNextTime();
+            Assert.assertEquals(combinedTime, concatedTime);
+        }
+        Assert.assertEquals(newCombinedLength, sampleCount);
+    }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/times/TestTimelineOpcode.java b/usage/src/test/java/com/ning/billing/usage/timeline/times/TestTimelineOpcode.java
new file mode 100644
index 0000000..afa0037
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/times/TestTimelineOpcode.java
@@ -0,0 +1,14 @@
+package com.ning.billing.usage.timeline.times;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestTimelineOpcode {
+
+    @Test(groups = "fast")
+    public void testMaxDeltaTime() throws Exception {
+        for (final TimelineOpcode opcode : TimelineOpcode.values()) {
+            Assert.assertTrue(opcode.getOpcodeIndex() >= TimelineOpcode.MAX_DELTA_TIME);
+        }
+    }
+}