killbill-memoizeit

meter: fix getInMemoryTimelineChunks in TimelineEventHandler We

12/7/2012 6:50:54 PM

Details

diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
index eea392d..c74576b 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.codec.SampleCoder;
-import com.ning.billing.meter.timeline.codec.TimelineChunkAccumulator;
 import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
 import com.ning.billing.meter.timeline.persistent.Replayer;
 import com.ning.billing.meter.timeline.persistent.TimelineDao;
@@ -296,39 +295,13 @@ public class TimelineEventHandler {
         // Now, filter each accumulator for this host
         final List<TimelineChunk> samplesBySourceName = new ArrayList<TimelineChunk>();
         for (final TimelineSourceEventAccumulator accumulator : sourceAccumulatorsAndDate.getCategoryAccumulators().values()) {
-            for (final TimelineChunk chunk : accumulator.getPendingTimelineChunks()) {
-                if ((filterStartTime != null && chunk.getEndTime().isBefore(filterStartTime)) ||
-                    (filterEndTime != null && chunk.getStartTime().isAfter(filterEndTime)) ||
-                    !metricIds.contains(chunk.getMetricId())) {
-                    continue;
-                } else {
-                    samplesBySourceName.add(chunk);
-                }
-            }
-            final List<DateTime> accumulatorTimes = accumulator.getTimes();
-            if (accumulatorTimes.size() == 0) {
-                continue;
-            }
-            final DateTime accumulatorStartTime = accumulator.getStartTime();
-            final DateTime accumulatorEndTime = accumulator.getEndTime();
-
             // Check if the time filters apply
-            if ((filterStartTime != null && accumulatorEndTime.isBefore(filterStartTime)) || (filterEndTime != null && accumulatorStartTime.isAfter(filterEndTime))) {
-                // Ignore this accumulator
+            if ((filterStartTime != null && accumulator.getEndTime().isBefore(filterStartTime)) || (filterEndTime != null && accumulator.getStartTime().isAfter(filterEndTime))) {
+                // Nope - ignore this accumulator
                 continue;
             }
 
-            // This accumulator is in the right time range, now return only the sample kinds specified
-            final byte[] timeBytes = timelineCoder.compressDateTimes(accumulatorTimes);
-            for (final TimelineChunkAccumulator chunkAccumulator : accumulator.getTimelines().values()) {
-                if (metricIds.contains(chunkAccumulator.getMetricId())) {
-                    // Extract the timeline for this chunk by copying it and reading encoded bytes
-                    accumulatorDeepCopyCount.incrementAndGet();
-                    final TimelineChunkAccumulator chunkAccumulatorCopy = chunkAccumulator.deepCopy();
-                    final TimelineChunk timelineChunk = chunkAccumulatorCopy.extractTimelineChunkAndReset(accumulatorStartTime, accumulatorEndTime, timeBytes);
-                    samplesBySourceName.add(timelineChunk);
-                }
-            }
+            samplesBySourceName.addAll(accumulator.getInMemoryTimelineChunks(metricIds));
         }
         inMemoryChunksReturnedCount.addAndGet(samplesBySourceName.size());
         Collections.sort(samplesBySourceName, CHUNK_COMPARATOR);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
index 9912b37..ab5ae35 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
@@ -16,7 +16,9 @@
 
 package com.ning.billing.meter.timeline;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -235,11 +237,32 @@ public class TimelineSourceEventAccumulator {
         }
     }
 
-    public synchronized List<TimelineChunk> getPendingTimelineChunks() {
+    public synchronized Collection<TimelineChunk> getInMemoryTimelineChunks(final List<Integer> metricIds) throws IOException {
         final List<TimelineChunk> timelineChunks = new ArrayList<TimelineChunk>();
+
+        // Get all the older chunks from the staging area of the BackgroundDBChunkWriter
         for (final PendingChunkMap pendingChunkMap : pendingChunkMaps) {
-            timelineChunks.addAll(pendingChunkMap.getChunkMap().values());
+            for (final Integer metricId : metricIds) {
+                final TimelineChunk timelineChunkForMetricId = pendingChunkMap.getChunkMap().get(metricId);
+                if (timelineChunkForMetricId != null) {
+                    timelineChunks.add(timelineChunkForMetricId);
+                }
+            }
         }
+
+        // Get the data in this accumulator, not yet in the staging area
+        // This is very similar to extractAndQueueTimelineChunks() above, but without changing the global state
+        final byte[] timeBytes = timelineCoder.compressDateTimes(times);
+        for (final Integer metricId : metricIds) {
+            final TimelineChunkAccumulator chunkAccumulator = timelines.get(metricId);
+            if (chunkAccumulator != null) {
+                // Extract the timeline for this chunk by copying it and reading encoded bytes
+                final TimelineChunkAccumulator chunkAccumulatorCopy = chunkAccumulator.deepCopy();
+                final TimelineChunk timelineChunk = chunkAccumulatorCopy.extractTimelineChunkAndReset(startTime, endTime, timeBytes);
+                timelineChunks.add(timelineChunk);
+            }
+        }
+
         return timelineChunks;
     }