killbill-aplcache
usage: initial import of TimelineCursor Cursor for a timeline. Signed-off-by: …
7/28/2012 5:28:05 PM
Changes
Details
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/times/DefaultTimelineCursor.java b/usage/src/main/java/com/ning/billing/usage/timeline/times/DefaultTimelineCursor.java
new file mode 100644
index 0000000..0b6cefa
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/times/DefaultTimelineCursor.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.times;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.DateTimeUtils;
+
+public class DefaultTimelineCursor implements TimelineCursor {
+
+ private static final Logger log = LoggerFactory.getLogger(DefaultTimelineCursor.class);
+
+ private final DataInputStream timelineDataStream;
+ private int sampleCount;
+ private int sampleNumber;
+ private int byteCursor;
+ private int lastValue;
+ private int delta;
+ private int repeatCount;
+
+ public DefaultTimelineCursor(final byte[] times, final int sampleCount) {
+ this.timelineDataStream = new DataInputStream(new ByteArrayInputStream(times));
+ this.sampleCount = sampleCount;
+ this.sampleNumber = 0;
+ this.byteCursor = 0;
+ this.lastValue = 0;
+ this.delta = 0;
+ this.repeatCount = 0;
+ }
+
+ private int getNextTimeInternal() {
+ try {
+ if (repeatCount > 0) {
+ repeatCount--;
+ lastValue += delta;
+ } else {
+ final int nextOpcode = timelineDataStream.read();
+ byteCursor++;
+ if (nextOpcode == -1) {
+ return nextOpcode;
+ }
+ if (nextOpcode == TimelineOpcode.FULL_TIME.getOpcodeIndex()) {
+ lastValue = timelineDataStream.readInt();
+ byteCursor += 4;
+ } else if (nextOpcode == TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex()) {
+ repeatCount = timelineDataStream.readUnsignedByte() - 1;
+ delta = timelineDataStream.read();
+ byteCursor += 2;
+ lastValue += delta;
+ } else if (nextOpcode == TimelineOpcode.REPEATED_DELTA_TIME_SHORT.getOpcodeIndex()) {
+ repeatCount = timelineDataStream.readUnsignedShort() - 1;
+ delta = timelineDataStream.read();
+ byteCursor += 3;
+ lastValue += delta;
+ } else if (nextOpcode <= TimelineOpcode.MAX_DELTA_TIME) {
+ lastValue += nextOpcode;
+ } else {
+ throw new IllegalStateException(String.format("In TimeIterator.getNextTime(), unknown opcode %x at offset %d", nextOpcode, byteCursor));
+ }
+ }
+ sampleNumber++;
+ if (sampleNumber > sampleCount) {
+ log.error("In TimeIterator.getNextTime(), after update, sampleNumber %d > sampleCount %d", sampleNumber, sampleCount);
+ }
+ return lastValue;
+ } catch (IOException e) {
+ log.error("IOException in TimeIterator.getNextTime()", e);
+ return -1;
+ }
+ }
+
+ @Override
+ public void skipToSampleNumber(final int finalSampleNumber) {
+ if (finalSampleNumber > sampleCount) {
+ log.error("In TimeIterator.skipToSampleNumber(), finalSampleCount {} > sampleCount {}", finalSampleNumber, sampleCount);
+ }
+ while (sampleNumber < finalSampleNumber) {
+ try {
+ if (repeatCount > 0) {
+ final int countToSkipInRepeat = Math.min(finalSampleNumber - sampleNumber, repeatCount);
+ sampleNumber += countToSkipInRepeat;
+ repeatCount -= countToSkipInRepeat;
+ lastValue += countToSkipInRepeat * delta;
+ } else {
+ final int nextOpcode = timelineDataStream.read();
+ if (nextOpcode == -1) {
+ return;
+ }
+ byteCursor++;
+ if (nextOpcode == TimelineOpcode.FULL_TIME.getOpcodeIndex()) {
+ lastValue = timelineDataStream.readInt();
+ byteCursor += 4;
+ sampleNumber++;
+ } else if (nextOpcode == TimelineOpcode.REPEATED_DELTA_TIME_BYTE.getOpcodeIndex()) {
+ repeatCount = timelineDataStream.readUnsignedByte() - 1;
+ delta = timelineDataStream.read();
+ byteCursor += 2;
+ lastValue += delta;
+ sampleNumber++;
+ } else if (nextOpcode == TimelineOpcode.REPEATED_DELTA_TIME_SHORT.getOpcodeIndex()) {
+ repeatCount = timelineDataStream.readUnsignedShort() - 1;
+ delta = timelineDataStream.read();
+ byteCursor += 3;
+ lastValue += delta;
+ sampleNumber++;
+ } else if (nextOpcode <= TimelineOpcode.MAX_DELTA_TIME) {
+ lastValue += nextOpcode;
+ sampleNumber++;
+ } else {
+ throw new IllegalStateException(String.format("In TimeIterator.skipToSampleNumber(), unknown opcode %x at offset %d", nextOpcode, byteCursor));
+ }
+ }
+ } catch (IOException e) {
+ log.error("IOException in TimeIterator.getNextTime()", e);
+ }
+ }
+ }
+
+ @Override
+ public DateTime getNextTime() {
+ final int nextTime = getNextTimeInternal();
+ if (nextTime == -1) {
+ throw new IllegalStateException(String.format("In DecodedSampleOutputProcessor.getNextTime(), got -1 from timeCursor.getNextTimeInternal()"));
+ } else {
+ return DateTimeUtils.dateTimeFromUnixSeconds(nextTime);
+ }
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineCursor.java b/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineCursor.java
new file mode 100644
index 0000000..474526a
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/times/TimelineCursor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.times;
+
+import org.joda.time.DateTime;
+
+public interface TimelineCursor {
+
+ /**
+ * Skip to the given sample number within the timeline, where 0 is the first sample number
+ *
+ * @param finalSampleNumber the sample number to skip to
+ */
+ public void skipToSampleNumber(final int finalSampleNumber);
+
+ /**
+ * Return the DateTime for the next sample
+ *
+ * @return the DateTime for the next sample. If we've run out of samples, return null
+ */
+ public DateTime getNextTime();
+}
diff --git a/usage/src/test/java/com/ning/billing/usage/timeline/times/TestDefaultTimelineCoder.java b/usage/src/test/java/com/ning/billing/usage/timeline/times/TestDefaultTimelineCoder.java
index 44def78..d742bd8 100644
--- a/usage/src/test/java/com/ning/billing/usage/timeline/times/TestDefaultTimelineCoder.java
+++ b/usage/src/test/java/com/ning/billing/usage/timeline/times/TestDefaultTimelineCoder.java
@@ -261,7 +261,7 @@ public class TestDefaultTimelineCoder {
// Total samples: 6
final int sampleCount = 7;
final byte[] times = Hex.decodeHex("FF4F91D5BCFE021E00FE021EFF790B4422".toCharArray());
- final TimelineCursorImpl cursor = new TimelineCursorImpl(times, sampleCount);
+ final DefaultTimelineCursor cursor = new DefaultTimelineCursor(times, sampleCount);
for (int i = 0; i < sampleCount; i++) {
final DateTime nextTime = cursor.getNextTime();
Assert.assertNotNull(nextTime);
@@ -287,7 +287,7 @@ public class TestDefaultTimelineCoder {
// Total samples: 6
final int sampleCount = 7;
final byte[] times = Hex.decodeHex("FF4F91D5BCFE021E00FE021EFF790B4422".toCharArray());
- final TimelineCursorImpl cursor = new TimelineCursorImpl(times, sampleCount);
+ final DefaultTimelineCursor cursor = new DefaultTimelineCursor(times, sampleCount);
for (int i = 0; i < sampleCount; i++) {
final DateTime nextTime = cursor.getNextTime();
Assert.assertNotNull(nextTime);
@@ -309,7 +309,7 @@ public class TestDefaultTimelineCoder {
//final byte[] times = Hex.decodeHex("ff4f90f67afd03ce1e1ffe1a1e1d01fe771e1d01fd01df1e1d1ffe761e1d01fe771e1d01fe571e".toCharArray());
final byte[] times = Hex.decodeHex("00000018FF4F8FE521FD023D1E1FFEF01E1D01FE771E1D01FD03E21EFE07980F".toCharArray());
Assert.assertEquals(times.length, 32);
- final TimelineCursorImpl cursor = new TimelineCursorImpl(times, sampleCount);
+ final DefaultTimelineCursor cursor = new DefaultTimelineCursor(times, sampleCount);
for (int i = 0; i < sampleCount; i++) {
final DateTime nextTime = cursor.getNextTime();
Assert.assertNotNull(nextTime);
@@ -380,8 +380,8 @@ public class TestDefaultTimelineCoder {
}
final byte[] newCombined = timelineCoder.combineTimelines(timeParts, null);
final int newCombinedLength = timelineCoder.countTimeBytesSamples(newCombined);
- final TimelineCursorImpl concatedCursor = new TimelineCursorImpl(concatedTimes, sampleCount);
- final TimelineCursorImpl combinedCursor = new TimelineCursorImpl(newCombined, sampleCount);
+ final DefaultTimelineCursor concatedCursor = new DefaultTimelineCursor(concatedTimes, sampleCount);
+ final DefaultTimelineCursor combinedCursor = new DefaultTimelineCursor(newCombined, sampleCount);
for (int i = 0; i < sampleCount; i++) {
final DateTime concatedTime = concatedCursor.getNextTime();
final DateTime combinedTime = combinedCursor.getNextTime();