diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
index eea392d..c74576b 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.codec.TimelineChunkAccumulator;
import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
import com.ning.billing.meter.timeline.persistent.Replayer;
import com.ning.billing.meter.timeline.persistent.TimelineDao;
@@ -296,39 +295,13 @@ public class TimelineEventHandler {
// Now, filter each accumulator for this host
final List<TimelineChunk> samplesBySourceName = new ArrayList<TimelineChunk>();
for (final TimelineSourceEventAccumulator accumulator : sourceAccumulatorsAndDate.getCategoryAccumulators().values()) {
- for (final TimelineChunk chunk : accumulator.getPendingTimelineChunks()) {
- if ((filterStartTime != null && chunk.getEndTime().isBefore(filterStartTime)) ||
- (filterEndTime != null && chunk.getStartTime().isAfter(filterEndTime)) ||
- !metricIds.contains(chunk.getMetricId())) {
- continue;
- } else {
- samplesBySourceName.add(chunk);
- }
- }
- final List<DateTime> accumulatorTimes = accumulator.getTimes();
- if (accumulatorTimes.size() == 0) {
- continue;
- }
- final DateTime accumulatorStartTime = accumulator.getStartTime();
- final DateTime accumulatorEndTime = accumulator.getEndTime();
-
// Check if the time filters apply
- if ((filterStartTime != null && accumulatorEndTime.isBefore(filterStartTime)) || (filterEndTime != null && accumulatorStartTime.isAfter(filterEndTime))) {
- // Ignore this accumulator
+ if ((filterStartTime != null && accumulator.getEndTime().isBefore(filterStartTime)) || (filterEndTime != null && accumulator.getStartTime().isAfter(filterEndTime))) {
+ // Nope - ignore this accumulator
continue;
}
- // This accumulator is in the right time range, now return only the sample kinds specified
- final byte[] timeBytes = timelineCoder.compressDateTimes(accumulatorTimes);
- for (final TimelineChunkAccumulator chunkAccumulator : accumulator.getTimelines().values()) {
- if (metricIds.contains(chunkAccumulator.getMetricId())) {
- // Extract the timeline for this chunk by copying it and reading encoded bytes
- accumulatorDeepCopyCount.incrementAndGet();
- final TimelineChunkAccumulator chunkAccumulatorCopy = chunkAccumulator.deepCopy();
- final TimelineChunk timelineChunk = chunkAccumulatorCopy.extractTimelineChunkAndReset(accumulatorStartTime, accumulatorEndTime, timeBytes);
- samplesBySourceName.add(timelineChunk);
- }
- }
+ samplesBySourceName.addAll(accumulator.getInMemoryTimelineChunks(metricIds));
}
inMemoryChunksReturnedCount.addAndGet(samplesBySourceName.size());
Collections.sort(samplesBySourceName, CHUNK_COMPARATOR);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
index 9912b37..ab5ae35 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
@@ -16,7 +16,9 @@
package com.ning.billing.meter.timeline;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -235,11 +237,32 @@ public class TimelineSourceEventAccumulator {
}
}
- public synchronized List<TimelineChunk> getPendingTimelineChunks() {
+ public synchronized Collection<TimelineChunk> getInMemoryTimelineChunks(final List<Integer> metricIds) throws IOException {
final List<TimelineChunk> timelineChunks = new ArrayList<TimelineChunk>();
+
+ // Get all the older chunks from the staging area of the BackgroundDBChunkWriter
for (final PendingChunkMap pendingChunkMap : pendingChunkMaps) {
- timelineChunks.addAll(pendingChunkMap.getChunkMap().values());
+ for (final Integer metricId : metricIds) {
+ final TimelineChunk timelineChunkForMetricId = pendingChunkMap.getChunkMap().get(metricId);
+ if (timelineChunkForMetricId != null) {
+ timelineChunks.add(timelineChunkForMetricId);
+ }
+ }
}
+
+ // Get the data in this accumulator, not yet in the staging area
+ // This is very similar to extractAndQueueTimelineChunks() above, but without changing the global state
+ final byte[] timeBytes = timelineCoder.compressDateTimes(times);
+ for (final Integer metricId : metricIds) {
+ final TimelineChunkAccumulator chunkAccumulator = timelines.get(metricId);
+ if (chunkAccumulator != null) {
+ // Extract the timeline for this chunk by copying it and reading encoded bytes
+ final TimelineChunkAccumulator chunkAccumulatorCopy = chunkAccumulator.deepCopy();
+ final TimelineChunk timelineChunk = chunkAccumulatorCopy.extractTimelineChunkAndReset(startTime, endTime, timeBytes);
+ timelineChunks.add(timelineChunk);
+ }
+ }
+
return timelineChunks;
}