Details
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/codec/DefaultSampleCoder.java b/usage/src/main/java/com/ning/billing/usage/timeline/codec/DefaultSampleCoder.java
new file mode 100644
index 0000000..43a0763
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/codec/DefaultSampleCoder.java
@@ -0,0 +1,526 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.samples.HalfFloat;
+import com.ning.billing.usage.timeline.samples.RepeatSample;
+import com.ning.billing.usage.timeline.samples.SampleBase;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.times.DefaultTimelineCursor;
+import com.ning.billing.usage.timeline.times.TimelineCursor;
+
+/**
+ * Instances of this class encode sample streams. In addition, this class
+ * contains a collection of static methods providing lower-level encoding plumbing
+ */
+@SuppressWarnings("unchecked")
+public class DefaultSampleCoder implements SampleCoder {
+
+ private static final Logger log = LoggerFactory.getLogger(DefaultSampleCoder.class);
+ private static final BigInteger BIGINTEGER_ZERO_VALUE = new BigInteger("0");
+ private static final ScalarSample<Void> DOUBLE_ZERO_SAMPLE = new ScalarSample<Void>(SampleOpcode.DOUBLE_ZERO, null);
+ private static final ScalarSample<Void> INT_ZERO_SAMPLE = new ScalarSample<Void>(SampleOpcode.INT_ZERO, null);
+
+ // TODO: Figure out if 1/200 is an acceptable level of inaccuracy
+ // For the HalfFloat, which has a 10-bit mantissa, this means that it could differ
+ // in the last 3 bits of the mantissa and still be treated as matching.
+ public static final double MAX_FRACTION_ERROR = 1.0 / 200.0;
+ public static final double HALF_MAX_FRACTION_ERROR = MAX_FRACTION_ERROR / 2.0;
+
+ private static final double MIN_BYTE_DOUBLE_VALUE = ((double) Byte.MIN_VALUE) * (1.0 + HALF_MAX_FRACTION_ERROR);
+ private static final double MAX_BYTE_DOUBLE_VALUE = ((double) Byte.MAX_VALUE) * (1.0 + HALF_MAX_FRACTION_ERROR);
+
+ private static final double MIN_SHORT_DOUBLE_VALUE = ((double) Short.MIN_VALUE) * (1.0 + HALF_MAX_FRACTION_ERROR);
+ private static final double MAX_SHORT_DOUBLE_VALUE = ((double) Short.MAX_VALUE) * (1.0 + HALF_MAX_FRACTION_ERROR);
+
+ @SuppressWarnings("unused")
+ private static final double INVERSE_MAX_FRACTION_ERROR = 1.0 / MAX_FRACTION_ERROR;
+
+ @Override
+ public byte[] compressSamples(final List<ScalarSample> samples) {
+ final SampleAccumulator accumulator = new SampleAccumulator(this);
+ accumulator.addSampleList(samples);
+ return accumulator.getEncodedSamples().getEncodedBytes();
+ }
+
+ @Override
+ public List<ScalarSample> decompressSamples(final byte[] sampleBytes) throws IOException {
+ final List<ScalarSample> returnedSamples = new ArrayList<ScalarSample>();
+ final ByteArrayInputStream byteStream = new ByteArrayInputStream(sampleBytes);
+ final DataInputStream inputStream = new DataInputStream(byteStream);
+ while (true) {
+ final int opcodeByte;
+ opcodeByte = inputStream.read();
+ if (opcodeByte == -1) {
+ break; // At "eof"
+ }
+ final SampleOpcode opcode = SampleOpcode.getOpcodeFromIndex(opcodeByte);
+ switch (opcode) {
+ case REPEAT_BYTE:
+ case REPEAT_SHORT:
+ final int repeatCount = opcode == SampleOpcode.REPEAT_BYTE ? inputStream.readUnsignedByte() : inputStream.readUnsignedShort();
+ final SampleOpcode repeatedOpcode = SampleOpcode.getOpcodeFromIndex(inputStream.read());
+ final Object value = decodeScalarValue(inputStream, repeatedOpcode);
+ for (int i = 0; i < repeatCount; i++) {
+ returnedSamples.add(new ScalarSample(repeatedOpcode, value));
+ }
+ break;
+ default:
+ returnedSamples.add(new ScalarSample(opcode, decodeScalarValue(inputStream, opcode)));
+ break;
+ }
+ }
+ return returnedSamples;
+ }
+
+
+ /**
+ * This method writes the binary encoding of the sample to the outputStream. This encoding
+ * is the form saved in the db and scanned when read from the db.
+ *
+ * @param outputStream the stream to which bytes should be written
+ * @param sample the sample to be written
+ */
+ @Override
+ public void encodeSample(final DataOutputStream outputStream, final SampleBase sample) {
+ final SampleOpcode opcode = sample.getOpcode();
+ try {
+ // First put out the opcode value
+ switch (opcode) {
+ case REPEAT_BYTE:
+ case REPEAT_SHORT:
+ final RepeatSample r = (RepeatSample) sample;
+ final ScalarSample repeatee = r.getSampleRepeated();
+ outputStream.write(opcode.getOpcodeIndex());
+ if (opcode == SampleOpcode.REPEAT_BYTE) {
+ outputStream.write(r.getRepeatCount());
+ } else {
+ outputStream.writeShort(r.getRepeatCount());
+ }
+ encodeScalarValue(outputStream, repeatee.getOpcode(), repeatee.getSampleValue());
+ case NULL:
+ break;
+ default:
+ if (sample instanceof ScalarSample) {
+ encodeScalarValue(outputStream, opcode, ((ScalarSample) sample).getSampleValue());
+ } else {
+ log.error("In encodeSample, opcode {} is not ScalarSample; instead {}", opcode.name(), sample.getClass().getName());
+ }
+ }
+ } catch (IOException e) {
+ log.error(String.format("In encodeSample, IOException encoding opcode %s and value %s", opcode.name(), String.valueOf(sample)), e);
+ }
+ }
+
+ /**
+ * Output the scalar value into the output stream
+ *
+ * @param outputStream the stream to which bytes should be written
+ * @param value the sample value, interpreted according to the opcode
+ */
+ @Override
+ public void encodeScalarValue(final DataOutputStream outputStream, final SampleOpcode opcode, final Object value) {
+ try {
+ outputStream.write(opcode.getOpcodeIndex());
+ switch (opcode) {
+ case NULL:
+ case DOUBLE_ZERO:
+ case INT_ZERO:
+ break;
+ case BYTE:
+ case BYTE_FOR_DOUBLE:
+ outputStream.writeByte((Byte) value);
+ break;
+ case SHORT:
+ case SHORT_FOR_DOUBLE:
+ case HALF_FLOAT_FOR_DOUBLE:
+ outputStream.writeShort((Short) value);
+ break;
+ case INT:
+ outputStream.writeInt((Integer) value);
+ break;
+ case LONG:
+ outputStream.writeLong((Long) value);
+ break;
+ case FLOAT:
+ case FLOAT_FOR_DOUBLE:
+ outputStream.writeFloat((Float) value);
+ break;
+ case DOUBLE:
+ outputStream.writeDouble((Double) value);
+ break;
+ case STRING:
+ final String s = (String) value;
+ final byte[] bytes = s.getBytes("UTF-8");
+ outputStream.writeShort(s.length());
+ outputStream.write(bytes, 0, bytes.length);
+ break;
+ case BIGINT:
+ final String bs = value.toString();
+ // Only support bigints whose length can be encoded as a short
+ if (bs.length() > Short.MAX_VALUE) {
+ throw new IllegalStateException(String.format("In DefaultSampleCoder.encodeScalarValue(), the string length of the BigInteger is %d; too large to be represented in a Short", bs.length()));
+ }
+ final byte[] bbytes = bs.getBytes("UTF-8");
+ outputStream.writeShort(bs.length());
+ outputStream.write(bbytes, 0, bbytes.length);
+ break;
+ default:
+ final String err = String.format("In encodeScalarSample, opcode %s is unrecognized", opcode.name());
+ log.error(err);
+ throw new IllegalArgumentException(err);
+ }
+ } catch (IOException e) {
+ log.error(String.format("In encodeScalarValue, IOException encoding opcode %s and value %s", opcode.name(), String.valueOf(value)), e);
+ }
+ }
+
+ /**
+ * This routine returns a ScalarSample that may have a smaller representation than the
+ * ScalarSample argument. In particular, if tries hard to choose the most compact
+ * representation of double-precision values.
+ *
+ * @param sample A ScalarSample to be compressed
+ * @return Either the same ScalarSample is that input, for for some cases of opcode DOUBLE,
+ * a more compact ScalarSample which when processed returns a double value.
+ */
+ @Override
+ public ScalarSample compressSample(final ScalarSample sample) {
+ switch (sample.getOpcode()) {
+ case INT:
+ final int intValue = (Integer) sample.getSampleValue();
+ if (intValue == 0) {
+ return INT_ZERO_SAMPLE;
+ } else if (intValue >= Byte.MIN_VALUE && intValue <= Byte.MAX_VALUE) {
+ return new ScalarSample(SampleOpcode.BYTE, (byte) intValue);
+ } else if (intValue >= Short.MIN_VALUE && intValue <= Short.MAX_VALUE) {
+ return new ScalarSample(SampleOpcode.SHORT, (short) intValue);
+ } else {
+ return sample;
+ }
+ case LONG:
+ final long longValue = (Long) sample.getSampleValue();
+ if (longValue == 0) {
+ return INT_ZERO_SAMPLE;
+ } else if (longValue >= Byte.MIN_VALUE && longValue <= Byte.MAX_VALUE) {
+ return new ScalarSample(SampleOpcode.BYTE, (byte) longValue);
+ } else if (longValue >= Short.MIN_VALUE && longValue <= Short.MAX_VALUE) {
+ return new ScalarSample(SampleOpcode.SHORT, (short) longValue);
+ } else if (longValue >= Integer.MIN_VALUE && longValue <= Integer.MAX_VALUE) {
+ return new ScalarSample(SampleOpcode.INT, (int) longValue);
+ } else {
+ return sample;
+ }
+ case BIGINT:
+ final BigInteger bigValue = (BigInteger) sample.getSampleValue();
+ if (bigValue.compareTo(BIGINTEGER_ZERO_VALUE) == 0) {
+ return INT_ZERO_SAMPLE;
+ }
+ final int digits = 1 + bigValue.bitCount();
+ if (digits <= 8) {
+ return new ScalarSample(SampleOpcode.BYTE, (byte) bigValue.intValue());
+ } else if (digits <= 16) {
+ return new ScalarSample(SampleOpcode.SHORT, (short) bigValue.intValue());
+ } else if (digits <= 32) {
+ return new ScalarSample(SampleOpcode.INT, bigValue.intValue());
+ } else if (digits <= 64) {
+ return new ScalarSample(SampleOpcode.LONG, bigValue.longValue());
+ } else {
+ return sample;
+ }
+ case FLOAT:
+ return encodeFloatOrDoubleSample(sample, (double) ((Float) sample.getSampleValue()));
+ case DOUBLE:
+ return encodeFloatOrDoubleSample(sample, (Double) sample.getSampleValue());
+ default:
+ return sample;
+ }
+ }
+
+ private ScalarSample encodeFloatOrDoubleSample(final ScalarSample sample, final double value) {
+ // We prefer representations in the following order: byte, HalfFloat, short, float and int
+ // The criterion for using each representation is the fractional error
+ if (value == 0.0) {
+ return DOUBLE_ZERO_SAMPLE;
+ }
+ final boolean integral = value >= MIN_SHORT_DOUBLE_VALUE && value <= MAX_SHORT_DOUBLE_VALUE && (Math.abs((value - (double) ((int) value)) / value) <= MAX_FRACTION_ERROR);
+ if (integral && value >= MIN_BYTE_DOUBLE_VALUE && value <= MAX_BYTE_DOUBLE_VALUE) {
+ return new ScalarSample<Byte>(SampleOpcode.BYTE_FOR_DOUBLE, (byte) value);
+ } else if (integral && value >= MIN_SHORT_DOUBLE_VALUE && value <= MAX_SHORT_DOUBLE_VALUE) {
+ return new ScalarSample<Short>(SampleOpcode.SHORT_FOR_DOUBLE, (short) value);
+ } else {
+ final int halfFloatValue = HalfFloat.fromFloat((float) value);
+ if ((Math.abs(value - HalfFloat.toFloat(halfFloatValue)) / value) <= MAX_FRACTION_ERROR) {
+ return new ScalarSample<Short>(SampleOpcode.HALF_FLOAT_FOR_DOUBLE, (short) halfFloatValue);
+ } else if (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE) {
+ return new ScalarSample<Float>(SampleOpcode.FLOAT_FOR_DOUBLE, (float) value);
+ } else {
+ return sample;
+ }
+ }
+ }
+
+ @Override
+ public Object decodeScalarValue(final DataInputStream inputStream, final SampleOpcode opcode) throws IOException {
+ switch (opcode) {
+ case NULL:
+ return null;
+ case DOUBLE_ZERO:
+ return 0.0;
+ case INT_ZERO:
+ return 0;
+ case BYTE:
+ return inputStream.readByte();
+ case SHORT:
+ return inputStream.readShort();
+ case INT:
+ return inputStream.readInt();
+ case LONG:
+ return inputStream.readLong();
+ case FLOAT:
+ return inputStream.readFloat();
+ case DOUBLE:
+ return inputStream.readDouble();
+ case STRING:
+ final short s = inputStream.readShort();
+ final byte[] bytes = new byte[s];
+ final int byteCount = inputStream.read(bytes, 0, s);
+ if (byteCount != s) {
+ log.error("Reading string came up short");
+ }
+ return new String(bytes, "UTF-8");
+ case BIGINT:
+ final short bs = inputStream.readShort();
+ final byte[] bbytes = new byte[bs];
+ final int bbyteCount = inputStream.read(bbytes, 0, bs);
+ if (bbyteCount != bs) {
+ log.error("Reading bigint came up short");
+ }
+ return new BigInteger(new String(bbytes, "UTF-8"), 10);
+ case BYTE_FOR_DOUBLE:
+ return (double) inputStream.readByte();
+ case SHORT_FOR_DOUBLE:
+ return (double) inputStream.readShort();
+ case FLOAT_FOR_DOUBLE:
+ final float floatForDouble = inputStream.readFloat();
+ return (double) floatForDouble;
+ case HALF_FLOAT_FOR_DOUBLE:
+ final float f = HalfFloat.toFloat(inputStream.readShort());
+ return (double) f;
+ default:
+ final String err = String.format("In decodeScalarSample, opcode %s unrecognized", opcode.name());
+ log.error(err);
+ throw new IllegalArgumentException(err);
+ }
+ }
+
+ /*
+ * This differs from decodeScalarValue because this delivers exactly the
+ * type in the byte stream. Specifically, it does not convert the arg
+ * of *_FOR_DOUBLE int a Double()
+ */
+ private Object decodeOpcodeArg(final DataInputStream inputStream, final SampleOpcode opcode) throws IOException {
+ switch (opcode) {
+ case NULL:
+ return null;
+ case DOUBLE_ZERO:
+ return 0.0;
+ case INT_ZERO:
+ return 0;
+ case BYTE:
+ return inputStream.readByte();
+ case SHORT:
+ return inputStream.readShort();
+ case INT:
+ return inputStream.readInt();
+ case LONG:
+ return inputStream.readLong();
+ case FLOAT:
+ return inputStream.readFloat();
+ case DOUBLE:
+ return inputStream.readDouble();
+ case STRING:
+ final short s = inputStream.readShort();
+ final byte[] bytes = new byte[s];
+ final int byteCount = inputStream.read(bytes, 0, s);
+ if (byteCount != s) {
+ log.error("Reading string came up short");
+ }
+ return new String(bytes, "UTF-8");
+ case BIGINT:
+ final short bs = inputStream.readShort();
+ final byte[] bbytes = new byte[bs];
+ final int bbyteCount = inputStream.read(bbytes, 0, bs);
+ if (bbyteCount != bs) {
+ log.error("Reading bigint came up short");
+ }
+ return new BigInteger(new String(bbytes, "UTF-8"), 10);
+ case BYTE_FOR_DOUBLE:
+ return inputStream.readByte();
+ case SHORT_FOR_DOUBLE:
+ return inputStream.readShort();
+ case FLOAT_FOR_DOUBLE:
+ return inputStream.readFloat();
+ case HALF_FLOAT_FOR_DOUBLE:
+ return inputStream.readShort();
+ default:
+ final String err = String.format("In decodeOpcodeArg(), opcode %s unrecognized", opcode.name());
+ log.error(err);
+ throw new IllegalArgumentException(err);
+ }
+ }
+
+ @Override
+ public double getMaxFractionError() {
+ return MAX_FRACTION_ERROR;
+ }
+
+ @Override
+ public byte[] combineSampleBytes(final List<byte[]> sampleBytesList) {
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final DataOutputStream dataStream = new DataOutputStream(outputStream);
+ try {
+ SampleBase lastSample = null;
+ for (final byte[] samples : sampleBytesList) {
+ final ByteArrayInputStream byteStream = new ByteArrayInputStream(samples);
+ final DataInputStream byteDataStream = new DataInputStream(byteStream);
+ while (true) {
+ final int opcodeByte = byteDataStream.read();
+ if (opcodeByte == -1) {
+ break;
+ }
+ final SampleOpcode opcode = SampleOpcode.getOpcodeFromIndex(opcodeByte);
+ switch (opcode) {
+ case REPEAT_BYTE:
+ case REPEAT_SHORT:
+ final int newRepeatCount = opcode == SampleOpcode.REPEAT_BYTE ? byteDataStream.read() : byteDataStream.readUnsignedShort();
+ final SampleOpcode newRepeatedOpcode = SampleOpcode.getOpcodeFromIndex(byteDataStream.read());
+ final Object newValue = decodeOpcodeArg(byteDataStream, newRepeatedOpcode);
+ final ScalarSample newRepeatedSample = new ScalarSample(newRepeatedOpcode, newValue);
+ if (lastSample == null) {
+ lastSample = new RepeatSample(newRepeatCount, new ScalarSample(newRepeatedOpcode, newValue));
+ } else if (lastSample instanceof RepeatSample) {
+ final RepeatSample repeatSample = (RepeatSample) lastSample;
+ final ScalarSample repeatedScalarSample = repeatSample.getSampleRepeated();
+ if (repeatedScalarSample.getOpcode() == newRepeatedOpcode &&
+ (newRepeatedOpcode.getNoArgs() ||
+ (ScalarSample.sameSampleValues(repeatedScalarSample.getSampleValue(), newValue) &&
+ repeatSample.getRepeatCount() + newRepeatCount < RepeatSample.MAX_SHORT_REPEAT_COUNT))) {
+ // We can just increment the count in the repeat instance
+ repeatSample.incrementRepeatCount(newRepeatCount);
+ } else {
+ encodeSample(dataStream, lastSample);
+ lastSample = new RepeatSample(newRepeatCount, newRepeatedSample);
+ }
+ } else if (lastSample.equals(newRepeatedSample)) {
+ lastSample = new RepeatSample(newRepeatCount + 1, newRepeatedSample);
+ } else {
+ encodeSample(dataStream, lastSample);
+ lastSample = new RepeatSample(newRepeatCount, newRepeatedSample);
+ }
+ break;
+ default:
+ final ScalarSample newSample = new ScalarSample(opcode, decodeOpcodeArg(byteDataStream, opcode));
+ if (lastSample == null) {
+ lastSample = newSample;
+ } else if (lastSample instanceof RepeatSample) {
+ final RepeatSample repeatSample = (RepeatSample) lastSample;
+ final ScalarSample repeatedScalarSample = repeatSample.getSampleRepeated();
+ if (newSample.equals(repeatedScalarSample)) {
+ repeatSample.incrementRepeatCount();
+ } else {
+ encodeSample(dataStream, lastSample);
+ lastSample = newSample;
+ }
+ } else if (lastSample.equals(newSample)) {
+ lastSample = new RepeatSample(2, newSample);
+ } else {
+ encodeSample(dataStream, lastSample);
+ lastSample = newSample;
+ }
+ }
+ }
+ }
+ if (lastSample != null) {
+ encodeSample(dataStream, lastSample);
+ }
+ dataStream.flush();
+ return outputStream.toByteArray();
+ } catch (Exception e) {
+ log.error("In combineSampleBytes(), exception combining sample byte arrays", e);
+ return new byte[0];
+ }
+ }
+
+ /**
+ * This invokes the processor on the values in the timeline bytes.
+ *
+ * @param chunk the timeline chuck to scan
+ * @param processor the callback to which values value counts are passed to be processed.
+ * @throws java.io.IOException
+ */
+ @Override
+ public void scan(final TimelineChunk chunk, final SampleProcessor processor) throws IOException {
+ //System.out.printf("Decoded: %s\n", new String(Hex.encodeHex(bytes)));
+ scan(chunk.getTimeBytesAndSampleBytes().getSampleBytes(), chunk.getTimeBytesAndSampleBytes().getTimeBytes(), chunk.getSampleCount(), processor);
+ }
+
+ @Override
+ public void scan(final byte[] samples, final byte[] times, final int sampleCount, final SampleProcessor processor) throws IOException {
+ final ByteArrayInputStream byteStream = new ByteArrayInputStream(samples);
+ final DataInputStream inputStream = new DataInputStream(byteStream);
+ final TimelineCursor timeCursor = new DefaultTimelineCursor(times, sampleCount);
+ int sampleNumber = 0;
+ while (true) {
+ final int opcodeByte;
+ opcodeByte = inputStream.read();
+ if (opcodeByte == -1) {
+ return; // At "eof"
+ }
+ final SampleOpcode opcode = SampleOpcode.getOpcodeFromIndex(opcodeByte);
+ switch (opcode) {
+ case REPEAT_BYTE:
+ case REPEAT_SHORT:
+ final int repeatCount = opcode == SampleOpcode.REPEAT_BYTE ? inputStream.readUnsignedByte() : inputStream.readUnsignedShort();
+ final SampleOpcode repeatedOpcode = SampleOpcode.getOpcodeFromIndex(inputStream.read());
+ final Object value = decodeScalarValue(inputStream, repeatedOpcode);
+ final SampleOpcode replacementOpcode = repeatedOpcode.getReplacement();
+ processor.processSamples(timeCursor, repeatCount, replacementOpcode, value);
+ sampleNumber += repeatCount;
+ timeCursor.skipToSampleNumber(sampleNumber);
+ break;
+ default:
+ processor.processSamples(timeCursor, 1, opcode.getReplacement(), decodeScalarValue(inputStream, opcode));
+ break;
+ }
+ }
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/codec/EncodedBytesAndSampleCount.java b/usage/src/main/java/com/ning/billing/usage/timeline/codec/EncodedBytesAndSampleCount.java
new file mode 100644
index 0000000..62f0a3a
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/codec/EncodedBytesAndSampleCount.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.util.Arrays;
+
+public class EncodedBytesAndSampleCount {
+
+ private final byte[] encodedBytes;
+ private final int sampleCount;
+
+ public EncodedBytesAndSampleCount(final byte[] encodedBytes, final int sampleCount) {
+ this.encodedBytes = encodedBytes;
+ this.sampleCount = sampleCount;
+ }
+
+ public byte[] getEncodedBytes() {
+ return encodedBytes;
+ }
+
+ public int getSampleCount() {
+ return sampleCount;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("EncodedBytesAndSampleCount");
+ sb.append("{encodedBytes=").append(encodedBytes == null ? "null" : "");
+ for (int i = 0; encodedBytes != null && i < encodedBytes.length; ++i) {
+ sb.append(i == 0 ? "" : ", ").append(encodedBytes[i]);
+ }
+ sb.append(", sampleCount=").append(sampleCount);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final EncodedBytesAndSampleCount that = (EncodedBytesAndSampleCount) o;
+
+ if (sampleCount != that.sampleCount) {
+ return false;
+ }
+ if (!Arrays.equals(encodedBytes, that.encodedBytes)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = encodedBytes != null ? Arrays.hashCode(encodedBytes) : 0;
+ result = 31 * result + sampleCount;
+ return result;
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleAccumulator.java b/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleAccumulator.java
new file mode 100644
index 0000000..50a1621
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleAccumulator.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.samples.NullSample;
+import com.ning.billing.usage.timeline.samples.RepeatSample;
+import com.ning.billing.usage.timeline.samples.SampleBase;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+
+/**
+ * Accumulator of samples. Samples are compressed using a SampleCoder.
+ */
+public class SampleAccumulator {
+
+ private static final Logger log = LoggerFactory.getLogger(SampleAccumulator.class);
+ private static final int DEFAULT_CHUNK_BYTE_ARRAY_SIZE = 100;
+
+ private ByteArrayOutputStream byteStream;
+ private DataOutputStream outputStream;
+ private int sampleCount;
+ private SampleBase lastSample;
+ protected final SampleCoder sampleCoder;
+
+ public SampleAccumulator(final SampleCoder sampleCoder) {
+ this.sampleCoder = sampleCoder;
+ reset();
+ }
+
+ public SampleAccumulator(final byte[] bytes, final SampleBase lastSample, final int sampleCount, final SampleCoder sampleCoder) throws IOException {
+ reset();
+ this.byteStream.write(bytes);
+ this.lastSample = lastSample;
+ this.sampleCount = sampleCount;
+ this.sampleCoder = sampleCoder;
+ }
+
+ public void addSampleList(final List<ScalarSample> samples) {
+ for (final ScalarSample sample : samples) {
+ addSample(sample);
+ }
+ }
+
+ public synchronized void addSample(final ScalarSample sample) {
+ if (lastSample == null) {
+ lastSample = sample;
+ } else {
+ final SampleOpcode lastOpcode = lastSample.getOpcode();
+ final SampleOpcode sampleOpcode = sample.getOpcode();
+ if (lastSample instanceof RepeatSample) {
+ final RepeatSample repeatSample = (RepeatSample) lastSample;
+ final ScalarSample sampleRepeated = repeatSample.getSampleRepeated();
+ if (sampleRepeated.getOpcode() == sampleOpcode &&
+ (sampleOpcode.getNoArgs() || ScalarSample.sameSampleValues(sampleRepeated.getSampleValue(), sample.getSampleValue())) &&
+ repeatSample.getRepeatCount() < RepeatSample.MAX_SHORT_REPEAT_COUNT) {
+ // We can just increment the count in the repeat instance
+ repeatSample.incrementRepeatCount();
+ } else {
+ // A non-matching repeat - just add it
+ addLastSample();
+ lastSample = sample;
+ }
+ } else {
+ final ScalarSample lastScalarSample = (ScalarSample) lastSample;
+ if (sampleOpcode == lastOpcode &&
+ (sampleOpcode.getNoArgs() || ScalarSample.sameSampleValues(sample.getSampleValue(), lastScalarSample.getSampleValue()))) {
+ // Replace lastSample with repeat group
+ lastSample = new RepeatSample(2, lastScalarSample);
+ } else {
+ addLastSample();
+ lastSample = sample;
+ }
+ }
+ }
+ // In all cases, we got 1 more sample
+ sampleCount++;
+ }
+
+ public int getSampleCount() {
+ return sampleCount;
+ }
+
+ protected ByteArrayOutputStream getByteStream() {
+ return byteStream;
+ }
+
+ protected SampleBase getLastSample() {
+ return lastSample;
+ }
+
+ /**
+ * The log scanner can safely call this method, and know that the byte
+ * array will always end in a complete sample
+ *
+ * @return an instance containing the bytes and the counts of samples
+ */
+ public synchronized EncodedBytesAndSampleCount getEncodedSamples() {
+ if (lastSample != null) {
+ sampleCoder.encodeSample(outputStream, lastSample);
+ lastSample = null;
+ }
+ try {
+ outputStream.flush();
+ return new EncodedBytesAndSampleCount(byteStream.toByteArray(), sampleCount);
+ } catch (IOException e) {
+ log.error("In getEncodedSamples, IOException flushing outputStream", e);
+ // Do no harm - - this at least won't corrupt the encoding
+ return new EncodedBytesAndSampleCount(new byte[0], 0);
+ }
+ }
+
+ private synchronized void addLastSample() {
+ if (lastSample != null) {
+ sampleCoder.encodeSample(outputStream, lastSample);
+ lastSample = null;
+ }
+ }
+
+ public synchronized void reset() {
+ byteStream = new ByteArrayOutputStream(DEFAULT_CHUNK_BYTE_ARRAY_SIZE);
+ outputStream = new DataOutputStream(byteStream);
+ lastSample = null;
+ sampleCount = 0;
+ }
+
+ public synchronized void addPlaceholder(final int repeatCount) {
+ if (repeatCount > 0) {
+ addLastSample();
+ lastSample = new RepeatSample<Void>(repeatCount, new NullSample());
+ sampleCount += repeatCount;
+ }
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleCoder.java b/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleCoder.java
new file mode 100644
index 0000000..b218f8e
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleCoder.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.samples.SampleBase;
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+
+/**
+ * Samples compressor and decompressor
+ */
+public interface SampleCoder {
+
+ public byte[] compressSamples(final List<ScalarSample> samples);
+
+ public List<ScalarSample> decompressSamples(final byte[] sampleBytes) throws IOException;
+
+ /**
+ * This method writes the binary encoding of the sample to the outputStream. This encoding
+ * is the form saved in the db and scanned when read from the db.
+ *
+ * @param outputStream the stream to which bytes should be written
+ * @param sample the sample to be written
+ */
+ public void encodeSample(final DataOutputStream outputStream, final SampleBase sample);
+
+ /**
+ * Output the scalar value into the output stream
+ *
+ * @param outputStream the stream to which bytes should be written
+ * @param value the sample value, interpreted according to the opcode
+ */
+ public void encodeScalarValue(final DataOutputStream outputStream, final SampleOpcode opcode, final Object value);
+
+ /**
+ * This routine returns a ScalarSample that may have a smaller representation than the
+ * ScalarSample argument. In particular, if tries hard to choose the most compact
+ * representation of double-precision values.
+ *
+ * @param sample A ScalarSample to be compressed
+ * @return Either the same ScalarSample is that input, for for some cases of opcode DOUBLE,
+ * a more compact ScalarSample which when processed returns a double value.
+ */
+ public ScalarSample compressSample(final ScalarSample sample);
+
+ public Object decodeScalarValue(final DataInputStream inputStream, final SampleOpcode opcode) throws IOException;
+
+ public double getMaxFractionError();
+
+ public byte[] combineSampleBytes(final List<byte[]> sampleBytesList);
+
+ public void scan(final TimelineChunk chunk, final SampleProcessor processor) throws IOException;
+
+ public void scan(final byte[] samples, final byte[] times, final int sampleCount, final SampleProcessor processor) throws IOException;
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleProcessor.java b/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleProcessor.java
new file mode 100644
index 0000000..077a107
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/codec/SampleProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.times.TimelineCursor;
+
+public interface SampleProcessor {
+
+ /**
+ * Process sampleCount sequential samples with identical values. sampleCount will usually be 1,
+ * but may be larger than 1. Implementors may just loop processing identical values, but some
+ * implementations may optimize adding a bunch of repeated values
+ *
+ * @param timeCursor a TimeCursor object from which times can be found.
+ * @param sampleCount the count of sequential, identical values
+ * @param opcode the opcode of the sample value, which may not be a REPEAT opcode
+ * @param value the value of this kind of sample over the count of samples
+ */
+ public void processSamples(final TimelineCursor timeCursor,
+ final int sampleCount,
+ final SampleOpcode opcode,
+ final Object value);
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/codec/TimeRangeSampleProcessor.java b/usage/src/main/java/com/ning/billing/usage/timeline/codec/TimeRangeSampleProcessor.java
new file mode 100644
index 0000000..f0ffc32
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/codec/TimeRangeSampleProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import org.joda.time.DateTime;
+
+import com.ning.billing.usage.timeline.samples.SampleOpcode;
+import com.ning.billing.usage.timeline.times.TimelineCursor;
+
+public abstract class TimeRangeSampleProcessor implements SampleProcessor {
+
+ private final DateTime startTime; // Inclusive
+ private final DateTime endTime; // Inclusive
+
+ public TimeRangeSampleProcessor(final DateTime startTime, final DateTime endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ /**
+ * Process sampleCount sequential samples with identical values. sampleCount will usually be 1,
+ * but may be larger than 1. Implementors may just loop processing identical values, but some
+ * implementations may optimize adding a bunch of repeated values
+ *
+ * @param timeCursor a TimeCursor instance, which supplies successive int UNIX times
+ * @param sampleCount the count of sequential, identical values
+ * @param opcode the opcode of the sample value, which may not be a REPEAT opcode
+ * @param value the value of this kind of sample over the sampleCount samples
+ */
+ @Override
+ public void processSamples(final TimelineCursor timeCursor, final int sampleCount, final SampleOpcode opcode, final Object value) {
+ for (int i = 0; i < sampleCount; i++) {
+ // Check if the sample is in the right time range
+ final DateTime sampleTime = timeCursor.getNextTime();
+ if ((startTime == null || !sampleTime.isBefore(startTime)) && ((endTime == null || !sampleTime.isAfter(endTime)))) {
+ processOneSample(sampleTime, opcode, value);
+ }
+ }
+ }
+
+ public abstract void processOneSample(final DateTime time, final SampleOpcode opcode, final Object value);
+
+ public DateTime getStartTime() {
+ return startTime;
+ }
+
+ public DateTime getEndTime() {
+ return endTime;
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/codec/TimesAndSamplesCoder.java b/usage/src/main/java/com/ning/billing/usage/timeline/codec/TimesAndSamplesCoder.java
new file mode 100644
index 0000000..bca1eda
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/codec/TimesAndSamplesCoder.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.codec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import com.ning.billing.usage.timeline.Hex;
+import com.ning.billing.usage.timeline.chunks.TimeBytesAndSampleBytes;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+
+public class TimesAndSamplesCoder {
+
+ public static int getSizeOfTimeBytes(final byte[] timesAndSamples) {
+ final DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(timesAndSamples));
+ try {
+ return inputStream.readInt();
+ } catch (IOException e) {
+ throw new IllegalStateException(String.format("Exception reading timeByteCount in TimelineChunkMapper.map() for timesAndSamples %s",
+ new String(Hex.encodeHex(timesAndSamples))), e);
+ }
+ }
+
+ public static int getEncodedLength(final TimelineChunk chunk) {
+ return 4 + chunk.getTimeBytesAndSampleBytes().getTimeBytes().length +
+ chunk.getTimeBytesAndSampleBytes().getSampleBytes().length;
+ }
+
+ public static byte[] getTimeBytes(final byte[] timesAndSamples) {
+ final int timeByteCount = getSizeOfTimeBytes(timesAndSamples);
+ return Arrays.copyOfRange(timesAndSamples, 4, 4 + timeByteCount);
+ }
+
+ public static byte[] getSampleBytes(final byte[] timesAndSamples) {
+ final int timeByteCount = getSizeOfTimeBytes(timesAndSamples);
+ return Arrays.copyOfRange(timesAndSamples, 4 + timeByteCount, timesAndSamples.length);
+ }
+
+ public static TimeBytesAndSampleBytes getTimesBytesAndSampleBytes(final byte[] timesAndSamples) {
+ final int timeByteCount = getSizeOfTimeBytes(timesAndSamples);
+ final byte[] timeBytes = Arrays.copyOfRange(timesAndSamples, 4, 4 + timeByteCount);
+ final byte[] sampleBytes = Arrays.copyOfRange(timesAndSamples, 4 + timeByteCount, timesAndSamples.length);
+ return new TimeBytesAndSampleBytes(timeBytes, sampleBytes);
+ }
+
+ public static byte[] combineTimesAndSamples(final byte[] times, final byte[] samples) {
+ final int totalSamplesSize = 4 + times.length + samples.length;
+ final ByteArrayOutputStream baStream = new ByteArrayOutputStream(totalSamplesSize);
+ final DataOutputStream outputStream = new DataOutputStream(baStream);
+ try {
+ outputStream.writeInt(times.length);
+ outputStream.write(times);
+ outputStream.write(samples);
+ outputStream.flush();
+ return baStream.toByteArray();
+ } catch (IOException e) {
+ throw new IllegalStateException(String.format("Exception reading timeByteCount in TimelineChunkMapper.map() for times %s, samples %s",
+ new String(Hex.encodeHex(times)), new String(Hex.encodeHex(samples))), e);
+ }
+ }
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestEncodedBytesAndSampleCount.java b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestEncodedBytesAndSampleCount.java
new file mode 100644
index 0000000..7567b6a
--- /dev/null
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/codec/TestEncodedBytesAndSampleCount.java
@@ -0,0 +1,32 @@
+package com.ning.billing.usage.timeline.codec;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestEncodedBytesAndSampleCount {
+
+ @Test(groups = "fast")
+ public void testGetters() throws Exception {
+ final byte[] encodedBytes = {0xA, 0xB};
+ final int sampleCount = 20;
+ final EncodedBytesAndSampleCount encodedBytesAndSampleCount = new EncodedBytesAndSampleCount(encodedBytes, sampleCount);
+
+ Assert.assertEquals(encodedBytesAndSampleCount.getEncodedBytes(), encodedBytes);
+ Assert.assertEquals(encodedBytesAndSampleCount.getSampleCount(), sampleCount);
+ }
+
+ @Test(groups = "fast")
+ public void testEquals() throws Exception {
+ final byte[] encodedBytes = {0xA, 0xB};
+ final int sampleCount = 20;
+
+ final EncodedBytesAndSampleCount encodedBytesAndSampleCount = new EncodedBytesAndSampleCount(encodedBytes, sampleCount);
+ Assert.assertEquals(encodedBytesAndSampleCount, encodedBytesAndSampleCount);
+
+ final EncodedBytesAndSampleCount sameEncodedBytesAndSampleCount = new EncodedBytesAndSampleCount(encodedBytes, sampleCount);
+ Assert.assertEquals(sameEncodedBytesAndSampleCount, encodedBytesAndSampleCount);
+
+ final EncodedBytesAndSampleCount otherEncodedBytesAndSampleCount = new EncodedBytesAndSampleCount(encodedBytes, sampleCount + 1);
+ Assert.assertEquals(otherEncodedBytesAndSampleCount, encodedBytesAndSampleCount);
+ }
+}