killbill-aplcache

meter: import various tests from Arecibo Signed-off-by:

11/30/2012 11:42:26 PM

Changes

Details

diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
index db7c77e..451e0ac 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
@@ -37,17 +37,17 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.util.config.MeterConfig;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
 import com.ning.billing.meter.timeline.codec.SampleCoder;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
-import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
 import com.ning.billing.meter.timeline.times.TimelineCoder;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.config.MeterConfig;
 
 import com.google.inject.Inject;
 
@@ -61,7 +61,7 @@ public class TimelineAggregator {
     private static final Logger log = LoggerFactory.getLogger(TimelineAggregator.class);
 
     private final IDBI dbi;
-    private final DefaultTimelineDao timelineDao;
+    private final TimelineDao timelineDao;
     private final TimelineCoder timelineCoder;
     private final SampleCoder sampleCoder;
     private final MeterConfig config;
@@ -94,7 +94,7 @@ public class TimelineAggregator {
     private final List<Long> chunkIdsToInvalidateOrDelete = new ArrayList<Long>();
 
     @Inject
-    public TimelineAggregator(final IDBI dbi, final DefaultTimelineDao timelineDao, final TimelineCoder timelineCoder,
+    public TimelineAggregator(final IDBI dbi, final TimelineDao timelineDao, final TimelineCoder timelineCoder,
                               final SampleCoder sampleCoder, final MeterConfig config, final InternalCallContextFactory internalCallContextFactory) {
         this.dbi = dbi;
         this.timelineDao = timelineDao;
@@ -295,7 +295,8 @@ public class TimelineAggregator {
                 public Void withHandle(final Handle handle) throws Exception {
                     final Query<Map<String, Object>> query = handle.createQuery("getStreamingAggregationCandidates")
                                                                    .setFetchSize(Integer.MIN_VALUE)
-                                                                   .bind("aggregationLevel", aggregationLevel);
+                                                                   .bind("aggregationLevel", aggregationLevel)
+                                                                   .bind("tenantRecordId", createCallContext().getTenantRecordId());
                     query.setStatementLocator(new StringTemplate3StatementLocator(TimelineAggregatorSqlDao.class));
                     ResultIterator<TimelineChunk> iterator = null;
                     try {
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
index 2698a91..3c4531e 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
@@ -36,6 +36,10 @@ public class CSVConsumer {
         return rangeSampleProcessor.getSampleConsumer().toString();
     }
 
+    public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk) throws IOException {
+        return getSamplesAsCSV(sampleCoder, chunk, null, null);
+    }
+
     public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
         final CSVOutputProcessor processor = new CSVOutputProcessor(startTime, endTime);
         sampleCoder.scan(chunk, processor);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
index 158cfaf..f0231ab 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
@@ -16,11 +16,7 @@
 
 package com.ning.billing.meter.timeline.persistent;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import javax.annotation.Nullable;
 
@@ -34,19 +30,16 @@ import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 
 import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableList;
 
 public class CachingTimelineDao implements TimelineDao {
 
     private static final Logger log = LoggerFactory.getLogger(CachingTimelineDao.class);
 
     private final BiMap<Integer, String> sourcesCache;
-    private final Map<Integer, Set<Integer>> sourceIdsMetricIdsCache;
     private final BiMap<Integer, CategoryRecordIdAndMetric> metricsCache;
     private final BiMap<Integer, String> eventCategoriesCache;
 
@@ -59,17 +52,6 @@ public class CachingTimelineDao implements TimelineDao {
         sourcesCache = delegate.getSources(context);
         metricsCache = delegate.getMetrics(context);
         eventCategoriesCache = delegate.getEventCategories(context);
-        sourceIdsMetricIdsCache = new HashMap<Integer, Set<Integer>>();
-        for (final SourceRecordIdAndMetricRecordId both : delegate.getMetricIdsForAllSources(context)) {
-            final int sourceId = both.getSourceId();
-            final int metricId = both.getMetricId();
-            Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
-            if (metricIds == null) {
-                metricIds = new HashSet<Integer>();
-                sourceIdsMetricIdsCache.put(sourceId, metricIds);
-            }
-            metricIds.add(metricId);
-        }
     }
 
     @Override
@@ -139,32 +121,15 @@ public class CachingTimelineDao implements TimelineDao {
     }
 
     @Override
-    public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+    public synchronized int getOrAddMetric(final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         final CategoryRecordIdAndMetric categoryRecordIdAndMetric = new CategoryRecordIdAndMetric(eventCategoryId, metric);
         Integer metricId = metricsCache.inverse().get(categoryRecordIdAndMetric);
         if (metricId == null) {
-            metricId = delegate.getOrAddMetric(sourceId, eventCategoryId, metric, context);
+            metricId = delegate.getOrAddMetric(eventCategoryId, metric, context);
             metricsCache.put(metricId, categoryRecordIdAndMetric);
         }
-        if (sourceId != null) {
-            Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
-            if (metricIds == null) {
-                metricIds = new HashSet<Integer>();
-                sourceIdsMetricIdsCache.put(sourceId, metricIds);
-            }
-            metricIds.add(metricId);
-        }
-        return metricId;
-    }
 
-    @Override
-    public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return ImmutableList.copyOf(sourceIdsMetricIdsCache.get(sourceId));
-    }
-
-    @Override
-    public Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getMetricIdsForAllSources(context);
+        return metricId;
     }
 
     @Override
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
index c607a8f..44658fa 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
@@ -143,7 +143,7 @@ public class DefaultTimelineDao implements TimelineDao {
     }
 
     @Override
-    public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+    public synchronized int getOrAddMetric(final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         delegate.begin();
         delegate.addMetric(eventCategoryId, metric, context);
         final Integer metricId = delegate.getMetricRecordId(eventCategoryId, metric, context);
@@ -153,16 +153,6 @@ public class DefaultTimelineDao implements TimelineDao {
     }
 
     @Override
-    public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getMetricRecordIdsBySourceRecordId(sourceId, context);
-    }
-
-    @Override
-    public Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getMetricRecordIdsForAllSources(context);
-    }
-
-    @Override
     public Long insertTimelineChunk(final TimelineChunk timelineChunk, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         delegate.begin();
         delegate.insertTimelineChunk(timelineChunk, context);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java
index 590fda1..d7985fb 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java
@@ -26,8 +26,10 @@ import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
 
 import com.fasterxml.jackson.core.JsonEncoding;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
 import com.fasterxml.util.membuf.MemBuffersForBytes;
 import com.fasterxml.util.membuf.StreamyBytesMemBuffer;
 import com.google.common.annotations.VisibleForTesting;
@@ -40,7 +42,7 @@ public class FileBackedBuffer {
     private static final Logger log = LoggerFactory.getLogger(FileBackedBuffer.class);
 
     private static final SmileFactory smileFactory = new SmileFactory();
-    private static final ObjectMapper smileObjectMapper = new ObjectMapper(smileFactory);
+    private final ObjectMapper smileObjectMapper;
 
     static {
         // Disable all magic for now as we don't write the Smile header (we share the same smileGenerator
@@ -68,6 +70,10 @@ public class FileBackedBuffer {
         this.prefix = prefix;
         this.deleteFilesOnClose = deleteFilesOnClose;
 
+        smileObjectMapper = new ObjectMapper(smileFactory);
+        smileObjectMapper.registerModule(new JodaModule());
+        smileObjectMapper.enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
         final MemBuffersForBytes bufs = new MemBuffersForBytes(segmentsSize, 1, maxNbSegments);
         inputBuffer = bufs.createStreamyBuffer(1, maxNbSegments);
 
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
index ecf156a..486861f 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
@@ -35,8 +35,10 @@ import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.fasterxml.jackson.dataformat.smile.SmileParser;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -50,7 +52,8 @@ public class Replayer {
 
     static {
         smileFactory.configure(SmileParser.Feature.REQUIRE_HEADER, false);
-        smileFactory.setCodec(smileMapper);
+        smileMapper.registerModule(new JodaModule());
+        smileMapper.enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
     }
 
     @VisibleForTesting
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
index cca7e7f..c82183a 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
@@ -28,7 +28,6 @@ import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 
@@ -64,11 +63,7 @@ public interface TimelineDao {
 
     BiMap<Integer, CategoryRecordIdAndMetric> getMetrics(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
-    int getOrAddMetric(Integer sourceId, Integer eventCategoryId, String metric, InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException;
-
-    Iterable<Integer> getMetricIdsBySourceId(Integer sourceId, InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
-
-    Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
+    int getOrAddMetric(Integer eventCategoryId, String metric, InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
     // Timelines tables
 
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
index c376b72..805f068 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
@@ -87,9 +87,6 @@ public interface TimelineSqlDao extends Transactional<TimelineSqlDao> {
                                                          @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    Iterable<SourceRecordIdAndMetricRecordId> getMetricRecordIdsForAllSources(@InternalTenantContextBinder final InternalTenantContext context);
-
-    @SqlQuery
     CategoryRecordIdAndMetric getCategoryRecordIdAndMetric(@Bind("recordId") final Integer metricRecordId,
                                                            @InternalTenantContextBinder final InternalTenantContext context);
 
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
index 3735f9d..0d4349e 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
@@ -221,7 +221,7 @@ public class TimelineEventHandler {
 
             // Extract and parse samples
             final Map<Integer, ScalarSample> scalarSamples = new LinkedHashMap<Integer, ScalarSample>();
-            convertSamplesToScalarSamples(sourceId, eventType, samples, scalarSamples, context);
+            convertSamplesToScalarSamples(eventType, samples, scalarSamples, context);
 
             if (scalarSamples.isEmpty()) {
                 eventsDiscarded.incrementAndGet();
@@ -275,16 +275,17 @@ public class TimelineEventHandler {
 
     public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, @Nullable final DateTime filterStartTime,
                                                                          @Nullable final DateTime filterEndTime, final InternalTenantContext context) throws IOException, ExecutionException {
-        return getInMemoryTimelineChunks(sourceId, ImmutableList.copyOf(timelineDAO.getMetricIdsBySourceId(sourceId, context)), filterStartTime, filterEndTime);
+        return getInMemoryTimelineChunks(sourceId, ImmutableList.copyOf(timelineDAO.getMetrics(context).keySet()), filterStartTime, filterEndTime, context);
     }
 
     public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final Integer metricId, @Nullable final DateTime filterStartTime,
-                                                                         @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
-        return getInMemoryTimelineChunks(sourceId, ImmutableList.<Integer>of(metricId), filterStartTime, filterEndTime);
+                                                                         @Nullable final DateTime filterEndTime, final InternalTenantContext context) throws IOException, ExecutionException {
+        return getInMemoryTimelineChunks(sourceId, ImmutableList.<Integer>of(metricId), filterStartTime, filterEndTime, context);
     }
 
     public synchronized Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final List<Integer> metricIds,
-                                                                                      @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+                                                                                      @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime,
+                                                                                      final InternalTenantContext context) throws IOException, ExecutionException {
         getInMemoryChunksCallCount.incrementAndGet();
         // Check first if there is an in-memory accumulator for this host
         final SourceAccumulatorsAndUpdateDate sourceAccumulatorsAndDate = accumulators.get(sourceId);
@@ -335,7 +336,7 @@ public class TimelineEventHandler {
     }
 
     @VisibleForTesting
-    void convertSamplesToScalarSamples(final Integer sourceId, final String eventType, final Map<String, Object> inputSamples,
+    void convertSamplesToScalarSamples(final String eventType, final Map<String, Object> inputSamples,
                                        final Map<Integer, ScalarSample> outputSamples, final InternalCallContext context) {
         if (inputSamples == null) {
             return;
@@ -343,7 +344,7 @@ public class TimelineEventHandler {
         final Integer eventCategoryId = timelineDAO.getOrAddEventCategory(eventType, context);
 
         for (final String attributeName : inputSamples.keySet()) {
-            final Integer metricId = timelineDAO.getOrAddMetric(sourceId, eventCategoryId, attributeName, context);
+            final Integer metricId = timelineDAO.getOrAddMetric(eventCategoryId, attributeName, context);
             final Object sample = inputSamples.get(attributeName);
 
             outputSamples.put(metricId, ScalarSample.fromObject(sample));
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
index 3288dc3..a229b85 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
@@ -140,8 +140,9 @@ public class TimelineSourceEventAccumulator {
         }
         if (endTime == null) {
             endTime = timestamp;
-        } else if (!timestamp.isAfter(endTime)) {
-            log.warn("Adding samples for host {}, timestamp {} is not after the end time {}; ignored",
+        } else if (timestamp.isBefore(endTime)) {
+            // Note: we allow multiple events at the same time
+            log.warn("Adding samples for host {}, timestamp {} is before the end time {}; ignored",
                      new Object[]{sourceId, dateFormatter.print(timestamp), dateFormatter.print(endTime)});
             return;
         }
diff --git a/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg b/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg
index ff64329..bbb566a 100644
--- a/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg
+++ b/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg
@@ -4,64 +4,64 @@ CHECK_TENANT() ::= "tenant_record_id = :tenantRecordId"
 AND_CHECK_TENANT() ::= "AND <CHECK_TENANT()>"
 
 getStreamingAggregationCandidates() ::= <<
-  select
-    chunk_id
-  , source_id
-  , metric_id
-  , start_time
-  , end_time
-  , in_row_samples
-  , blob_samples
-  , sample_count
-  , aggregation_level
-  , not_valid
-  , dont_aggregate
-  from timeline_chunks
-  where source_id != 0 and aggregation_level = :aggregationLevel and not_valid = 0
-  <AND_CHECK_TENANT()>
-  order by source_id, metric_id, start_time
- >>
+select
+  record_id
+, source_record_id
+, metric_record_id
+, start_time
+, end_time
+, in_row_samples
+, blob_samples
+, sample_count
+, aggregation_level
+, not_valid
+, dont_aggregate
+from timeline_chunks
+where source_record_id != 0 and aggregation_level = :aggregationLevel and not_valid = 0
+<AND_CHECK_TENANT()>
+order by source_record_id, metric_record_id, start_time
+>>
 
- getAggregationCandidatesForSourceIdAndMetricIds(metricIds) ::= <<
-  select
-    chunk_id
-  , source_id
-  , metric_id
-  , start_time
-  , end_time
-  , in_row_samples
-  , blob_samples
-  , sample_count
-  , aggregation_level
-  , not_valid
-  , dont_aggregate
-  from timeline_chunks
-  where source_id = :source_id
-  and metric_id in (<metricIds>)
-  <AND_CHECK_TENANT()>
-  ;
+getAggregationCandidatesForSourceIdAndMetricIds(metricIds) ::= <<
+select
+  record_id
+, source_record_id
+, metric_record_id
+, start_time
+, end_time
+, in_row_samples
+, blob_samples
+, sample_count
+, aggregation_level
+, not_valid
+, dont_aggregate
+from timeline_chunks
+where source_record_id = :source_id
+and metric_record_id in (<metricIds>)
+<AND_CHECK_TENANT()>
+;
 >>
 
 getLastInsertedId() ::= <<
-  select last_insert_id();
+select last_insert_id();
 >>
 
 makeTimelineChunkValid() ::= <<
-  update timeline_chunks
-  set not_valid = 0
-  where chunk_id = :chunkId
-  <AND_CHECK_TENANT()>
-  ;
+update timeline_chunks
+set not_valid = 0
+where record_id = :chunkId
+<AND_CHECK_TENANT()>
+;
 >>
 
 makeTimelineChunksInvalid(chunkIds) ::=<<
-  update timeline_chunks
-  set not_valid = 1
-  where chunk_id in (<chunkIds>)
-  <AND_CHECK_TENANT()>
-  ;
+update timeline_chunks
+set not_valid = 1
+where record_id in (<chunkIds>)
+<AND_CHECK_TENANT()>
+;
 >>
 
 deleteTimelineChunks(chunkIds) ::=<<
-  delete from timeline_chunks where chunk_id in (<chunkIds>) <AND_CHECK_TENANT()>;
+delete from timeline_chunks where record_id in (<chunkIds>) <AND_CHECK_TENANT()>;
 >>
diff --git a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
index e782aa1..3ccc68b 100644
--- a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
+++ b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
@@ -107,24 +107,6 @@ and category_record_id = :categoryRecordId
 ;
 >>
 
-getMetricRecordIdsBySourceRecordId() ::= <<
-select distinct
-  metric_record_id
-from timeline_chunks c
-where source_record_id = :sourceRecordId
-<AND_CHECK_TENANT()>
-;
->>
-
-getMetricRecordIdsForAllSources() ::= <<
-select distinct
-  metric_record_id
-, source_record_id
-from timeline_chunks c
-where <CHECK_TENANT()>
-;
->>
-
 getCategoryRecordIdAndMetric() ::= <<
 select
   category_record_id
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java b/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java
new file mode 100644
index 0000000..d3bd906
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.aggregator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.config.ConfigurationObjectFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuiteWithEmbeddedDB;
+import com.ning.billing.meter.timeline.TimelineSourceEventAccumulator;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
+import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class TestTimelineAggregator extends MeterTestSuiteWithEmbeddedDB {
+
+    private static final UUID HOST_UUID = UUID.randomUUID();
+    private static final String HOST_NAME = HOST_UUID.toString();
+    private static final String EVENT_TYPE = "myType";
+    private static final int EVENT_TYPE_ID = 123;
+    private static final String MIN_HEAPUSED_KIND = "min_heapUsed";
+    private static final String MAX_HEAPUSED_KIND = "max_heapUsed";
+    private static final DateTime START_TIME = new DateTime(DateTimeZone.UTC);
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(getDBI(), new ClockMock());
+
+    private TimelineDao timelineDao;
+    private TimelineAggregator aggregator;
+
+    private Integer hostId = null;
+    private Integer minHeapUsedKindId = null;
+    private Integer maxHeapUsedKindId = null;
+
+    @BeforeMethod(groups = "slow")
+    public void setUp() throws Exception {
+        timelineDao = new DefaultTimelineDao(getDBI());
+        final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+        aggregator = new TimelineAggregator(getDBI(), timelineDao, timelineCoder, sampleCoder, config, internalCallContextFactory);
+    }
+
+    @Test(groups = "slow")
+    public void testAggregation() throws Exception {
+        // Create the host
+        hostId = timelineDao.getOrAddSource(HOST_NAME, internalCallContext);
+        Assert.assertNotNull(hostId);
+        Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);
+
+        // Create the sample kinds
+        minHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MIN_HEAPUSED_KIND, internalCallContext);
+        Assert.assertNotNull(minHeapUsedKindId);
+        maxHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MAX_HEAPUSED_KIND, internalCallContext);
+        Assert.assertNotNull(maxHeapUsedKindId);
+        Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);
+
+        // Create two sets of times: T - 125 ... T - 65 ; T - 60 ... T (note the gap!)
+        createAOneHourTimelineChunk(125);
+        createAOneHourTimelineChunk(60);
+
+        // Check the getSamplesByHostIdsAndSampleKindIds DAO method works as expected
+        // You might want to draw timelines on a paper and remember boundaries are inclusive to understand these numbers
+        checkSamplesForATimeline(185, 126, 0);
+        checkSamplesForATimeline(185, 125, 2);
+        checkSamplesForATimeline(64, 61, 0);
+        checkSamplesForATimeline(125, 65, 2);
+        checkSamplesForATimeline(60, 0, 2);
+        checkSamplesForATimeline(125, 0, 4);
+        checkSamplesForATimeline(124, 0, 4);
+        checkSamplesForATimeline(124, 66, 2);
+
+        aggregator.getAndProcessTimelineAggregationCandidates();
+
+        Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);
+        Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);
+
+        // Similar than above, but we have only 2 now
+        checkSamplesForATimeline(185, 126, 0);
+        checkSamplesForATimeline(185, 125, 2);
+        // Note, the gap is filled now
+        checkSamplesForATimeline(64, 61, 2);
+        checkSamplesForATimeline(125, 65, 2);
+        checkSamplesForATimeline(60, 0, 2);
+        checkSamplesForATimeline(125, 0, 2);
+        checkSamplesForATimeline(124, 0, 2);
+        checkSamplesForATimeline(124, 66, 2);
+    }
+
+    private void checkSamplesForATimeline(final Integer startTimeMinutesAgo, final Integer endTimeMinutesAgo, final long expectedChunks) throws InterruptedException {
+        final AtomicLong timelineChunkSeen = new AtomicLong(0);
+
+        timelineDao.getSamplesBySourceIdsAndMetricIds(ImmutableList.<Integer>of(hostId), ImmutableList.<Integer>of(minHeapUsedKindId, maxHeapUsedKindId),
+                                                      START_TIME.minusMinutes(startTimeMinutesAgo), START_TIME.minusMinutes(endTimeMinutesAgo), new TimelineChunkConsumer() {
+
+            @Override
+            public void processTimelineChunk(final TimelineChunk chunk) {
+                Assert.assertEquals((Integer) chunk.getSourceId(), hostId);
+                Assert.assertTrue(chunk.getMetricId() == minHeapUsedKindId || chunk.getMetricId() == maxHeapUsedKindId);
+                timelineChunkSeen.incrementAndGet();
+            }
+        }, internalCallContext);
+
+        Assert.assertEquals(timelineChunkSeen.get(), expectedChunks);
+    }
+
+    private void createAOneHourTimelineChunk(final int startTimeMinutesAgo) throws IOException {
+        final DateTime firstSampleTime = START_TIME.minusMinutes(startTimeMinutesAgo);
+        final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(timelineDao, timelineCoder, sampleCoder, hostId, EVENT_TYPE_ID, firstSampleTime, internalCallContextFactory);
+        // 120 samples per hour
+        for (int i = 0; i < 120; i++) {
+            final DateTime eventDateTime = firstSampleTime.plusSeconds(i * 30);
+            final Map<Integer, ScalarSample> event = createEvent(eventDateTime.getMillis());
+            final SourceSamplesForTimestamp samples = new SourceSamplesForTimestamp(hostId, EVENT_TYPE, eventDateTime, event);
+            accumulator.addSourceSamples(samples);
+        }
+
+        accumulator.extractAndQueueTimelineChunks();
+    }
+
+    private Map<Integer, ScalarSample> createEvent(final long ts) {
+        return ImmutableMap.<Integer, ScalarSample>of(
+                minHeapUsedKindId, new ScalarSample(SampleOpcode.LONG, Long.MIN_VALUE + ts),
+                maxHeapUsedKindId, new ScalarSample(SampleOpcode.LONG, Long.MAX_VALUE - ts)
+                                                     );
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestCSVConsumer.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestCSVConsumer.java
new file mode 100644
index 0000000..f8718f4
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestCSVConsumer.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.consumer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+
+public class TestCSVConsumer extends MeterTestSuite {
+
+    private static final int HOST_ID = 1242;
+    private static final int SAMPLE_KIND_ID = 12;
+    private static final int CHUNK_ID = 30;
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    @Test(groups = "fast")
+    public void testToString() throws Exception {
+        final int sampleCount = 3;
+
+        final DateTime startTime = new DateTime("2012-01-16T21:23:58.000Z", DateTimeZone.UTC);
+        final List<DateTime> dateTimes = new ArrayList<DateTime>();
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final DataOutputStream stream = new DataOutputStream(out);
+
+        for (int i = 0; i < sampleCount; i++) {
+            sampleCoder.encodeSample(stream, new ScalarSample<Long>(SampleOpcode.LONG, 12345L + i));
+            dateTimes.add(startTime.plusSeconds(1 + i));
+        }
+
+        final DateTime endTime = dateTimes.get(dateTimes.size() - 1);
+        final byte[] times = timelineCoder.compressDateTimes(dateTimes);
+        final TimelineChunk timelineChunk = new TimelineChunk(CHUNK_ID, HOST_ID, SAMPLE_KIND_ID, startTime, endTime, times, out.toByteArray(), sampleCount);
+
+        // Test CSV filtering
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk), "1326749039,12345,1326749040,12346,1326749041,12347");
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, null, null), "1326749039,12345,1326749040,12346,1326749041,12347");
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime, null), "1326749039,12345,1326749040,12346,1326749041,12347");
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, null, startTime.plusSeconds(sampleCount)), "1326749039,12345,1326749040,12346,1326749041,12347");
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(1), startTime.plusSeconds(sampleCount)), "1326749039,12345,1326749040,12346,1326749041,12347");
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(2), startTime.plusSeconds(sampleCount)), "1326749040,12346,1326749041,12347");
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(3), startTime.plusSeconds(sampleCount)), "1326749041,12347");
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(4), startTime.plusSeconds(sampleCount)), "");
+        // Buggy start date
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(10), startTime.plusSeconds(sampleCount)), "");
+        // Buggy end date
+        Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime, startTime.minusSeconds(1)), "");
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/MockFileBackedBuffer.java b/meter/src/test/java/com/ning/billing/meter/timeline/MockFileBackedBuffer.java
new file mode 100644
index 0000000..ffe6362
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/MockFileBackedBuffer.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+
+public class MockFileBackedBuffer extends FileBackedBuffer {
+
+    private final List<SourceSamplesForTimestamp> hostSamplesForTimestamps = new ArrayList<SourceSamplesForTimestamp>();
+
+    public MockFileBackedBuffer() throws IOException {
+        // Kepp it small - 50 bytes. Will be allocated but not used
+        super(String.valueOf(System.nanoTime()), "test", 50, 1);
+    }
+
+    @Override
+    public boolean append(final SourceSamplesForTimestamp hostSamplesForTimestamp) {
+        hostSamplesForTimestamps.add(hostSamplesForTimestamp);
+        return true;
+    }
+
+    /**
+     * Discard in-memory and on-disk data
+     */
+    @Override
+    public void discard() {
+        hostSamplesForTimestamps.clear();
+    }
+
+    @Override
+    public long getBytesInMemory() {
+        return -1;
+    }
+
+    @Override
+    public long getBytesOnDisk() {
+        return 0;
+    }
+
+    @Override
+    public long getFilesCreated() {
+        return 0;
+    }
+
+    @Override
+    public long getInMemoryAvailableSpace() {
+        return -1;
+    }
+
+    public List<SourceSamplesForTimestamp> getSourceSamplesForTimestamps() {
+        return hostSamplesForTimestamps;
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/MockTimelineDao.java b/meter/src/test/java/com/ning/billing/meter/timeline/MockTimelineDao.java
new file mode 100644
index 0000000..a2cd301
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/MockTimelineDao.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.shutdown.StartTimes;
+import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
+public final class MockTimelineDao implements TimelineDao {
+
+    private final BiMap<Integer, String> hosts = HashBiMap.create();
+    private final BiMap<Integer, CategoryRecordIdAndMetric> sampleKinds = HashBiMap.create();
+    private final BiMap<Integer, String> eventCategories = HashBiMap.create();
+    private final BiMap<Integer, TimelineChunk> timelineChunks = HashBiMap.create();
+    private final Map<Integer, Map<Integer, List<TimelineChunk>>> samplesPerHostAndSampleKind = new HashMap<Integer, Map<Integer, List<TimelineChunk>>>();
+    private final AtomicReference<StartTimes> lastStartTimes = new AtomicReference<StartTimes>();
+
+    @Override
+    public Integer getSourceId(final String host, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (hosts) {
+            return hosts.inverse().get(host);
+        }
+    }
+
+    @Override
+    public String getSource(final Integer hostId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (hosts) {
+            return hosts.get(hostId);
+        }
+    }
+
+    @Override
+    public BiMap<Integer, String> getSources(final InternalTenantContext context) {
+        return hosts;
+    }
+
+    @Override
+    public int getOrAddSource(final String host, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (hosts) {
+            final Integer hostId = getSourceId(host, context);
+            if (hostId == null) {
+                hosts.put(hosts.size() + 1, host);
+                return hosts.size();
+            } else {
+                return hostId;
+            }
+        }
+    }
+
+    @Override
+    public Integer getEventCategoryId(final String eventCategory, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (eventCategories) {
+            return eventCategories.inverse().get(eventCategory);
+        }
+    }
+
+    @Override
+    public String getEventCategory(final Integer eventCategoryId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (eventCategories) {
+            return eventCategories.get(eventCategoryId);
+        }
+    }
+
+    @Override
+    public int getOrAddEventCategory(final String eventCategory, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (eventCategories) {
+            Integer eventCategoryId = getEventCategoryId(eventCategory, context);
+            if (eventCategoryId == null) {
+                eventCategoryId = eventCategories.size() + 1;
+                eventCategories.put(eventCategoryId, eventCategory);
+            }
+
+            return eventCategoryId;
+        }
+    }
+
+    @Override
+    public BiMap<Integer, String> getEventCategories(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        return eventCategories;
+    }
+
+    @Override
+    public Integer getMetricId(final int eventCategoryId, final String sampleKind, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (sampleKinds) {
+            return sampleKinds.inverse().get(new CategoryRecordIdAndMetric(eventCategoryId, sampleKind));
+        }
+    }
+
+    @Override
+    public CategoryRecordIdAndMetric getCategoryIdAndMetric(final Integer sampleKindId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (sampleKinds) {
+            return sampleKinds.get(sampleKindId);
+        }
+    }
+
+    @Override
+    public BiMap<Integer, CategoryRecordIdAndMetric> getMetrics(final InternalTenantContext context) {
+        synchronized (sampleKinds) {
+            return sampleKinds;
+        }
+    }
+
+    @Override
+    public int getOrAddMetric(final Integer eventCategoryId, final String sampleKind, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        synchronized (sampleKinds) {
+            Integer sampleKindId = getMetricId(eventCategoryId, sampleKind, context);
+            if (sampleKindId == null) {
+                sampleKindId = sampleKinds.size() + 1;
+                sampleKinds.put(sampleKindId, new CategoryRecordIdAndMetric(eventCategoryId, sampleKind));
+            }
+            return sampleKindId;
+        }
+    }
+
+    @Override
+    public Long insertTimelineChunk(final TimelineChunk chunk, final InternalCallContext context) {
+        final Long timelineChunkId;
+        synchronized (timelineChunks) {
+            timelineChunks.put(timelineChunks.size(), chunk);
+            timelineChunkId = (long) timelineChunks.size() - 1;
+        }
+
+        synchronized (samplesPerHostAndSampleKind) {
+            Map<Integer, List<TimelineChunk>> samplesPerSampleKind = samplesPerHostAndSampleKind.get(chunk.getSourceId());
+            if (samplesPerSampleKind == null) {
+                samplesPerSampleKind = new HashMap<Integer, List<TimelineChunk>>();
+            }
+
+            List<TimelineChunk> chunkAndTimes = samplesPerSampleKind.get(chunk.getMetricId());
+            if (chunkAndTimes == null) {
+                chunkAndTimes = new ArrayList<TimelineChunk>();
+            }
+
+            chunkAndTimes.add(chunk);
+            samplesPerSampleKind.put(chunk.getMetricId(), chunkAndTimes);
+
+            samplesPerHostAndSampleKind.put(chunk.getSourceId(), samplesPerSampleKind);
+        }
+
+        return timelineChunkId;
+    }
+
+    @Override
+    public void getSamplesBySourceIdsAndMetricIds(final List<Integer> hostIds, @Nullable final List<Integer> sampleKindIds,
+                                                  final DateTime startTime, final DateTime endTime,
+                                                  final TimelineChunkConsumer chunkConsumer, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        for (final Integer hostId : samplesPerHostAndSampleKind.keySet()) {
+            if (hostIds.indexOf(hostId) == -1) {
+                continue;
+            }
+
+            final Map<Integer, List<TimelineChunk>> samplesPerSampleKind = samplesPerHostAndSampleKind.get(hostId);
+            for (final Integer sampleKindId : samplesPerSampleKind.keySet()) {
+                if (sampleKindIds != null && sampleKindIds.indexOf(sampleKindId) == -1) {
+                    continue;
+                }
+
+                for (final TimelineChunk chunk : samplesPerSampleKind.get(sampleKindId)) {
+                    if (chunk.getStartTime().isAfter(endTime) || chunk.getEndTime().isBefore(startTime)) {
+                        continue;
+                    }
+
+                    chunkConsumer.processTimelineChunk(chunk);
+                }
+            }
+        }
+    }
+
+    @Override
+    public StartTimes getLastStartTimes(final InternalTenantContext context) {
+        return lastStartTimes.get();
+    }
+
+    @Override
+    public Integer insertLastStartTimes(final StartTimes startTimes, final InternalCallContext context) {
+        lastStartTimes.set(startTimes);
+        return 1;
+    }
+
+    @Override
+    public void deleteLastStartTimes(final InternalCallContext context) {
+        lastStartTimes.set(null);
+    }
+
+    @Override
+    public void test(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+    }
+
+    public BiMap<Integer, TimelineChunk> getTimelineChunks() {
+        return timelineChunks;
+    }
+
+    @Override
+    public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList, final InternalCallContext context) {
+        for (final TimelineChunk chunk : timelineChunkList) {
+            insertTimelineChunk(chunk, context);
+        }
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
index ad51041..860ceb5 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
@@ -57,10 +57,10 @@ public class TestDefaultTimelineDao extends MeterTestSuiteWithEmbeddedDB {
 
         // Create the samples
         final String sampleOne = UUID.randomUUID().toString();
-        final Integer sampleOneId = dao.getOrAddMetric(hostId, eventCategoryId, sampleOne, internalCallContext);
+        final Integer sampleOneId = dao.getOrAddMetric(eventCategoryId, sampleOne, internalCallContext);
         Assert.assertNotNull(sampleOneId);
         final String sampleTwo = UUID.randomUUID().toString();
-        final Integer sampleTwoId = dao.getOrAddMetric(hostId, eventCategoryId, sampleTwo, internalCallContext);
+        final Integer sampleTwoId = dao.getOrAddMetric(eventCategoryId, sampleTwo, internalCallContext);
         Assert.assertNotNull(sampleTwoId);
 
         // Basic retrieval tests
@@ -75,26 +75,11 @@ public class TestDefaultTimelineDao extends MeterTestSuiteWithEmbeddedDB {
         Assert.assertEquals(dao.getCategoryIdAndMetric(sampleTwoId, internalCallContext).getEventCategoryId(), (int) eventCategoryId);
         Assert.assertEquals(dao.getCategoryIdAndMetric(sampleTwoId, internalCallContext).getMetric(), sampleTwo);
 
-        // No samples yet
-        Assert.assertEquals(ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext)).size(), 0);
-
         dao.insertTimelineChunk(new TimelineChunk(0, hostId, sampleOneId, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
-        final ImmutableList<Integer> firstFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
-        Assert.assertEquals(firstFetch.size(), 1);
-        Assert.assertEquals(firstFetch.get(0), sampleOneId);
-
         dao.insertTimelineChunk(new TimelineChunk(0, hostId, sampleTwoId, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
-        final ImmutableList<Integer> secondFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
-        Assert.assertEquals(secondFetch.size(), 2);
-        Assert.assertTrue(secondFetch.contains(sampleOneId));
-        Assert.assertTrue(secondFetch.contains(sampleTwoId));
 
         // Random sampleKind for random host
         dao.insertTimelineChunk(new TimelineChunk(0, Integer.MAX_VALUE - 100, Integer.MAX_VALUE, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
-        final ImmutableList<Integer> thirdFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
-        Assert.assertEquals(secondFetch.size(), 2);
-        Assert.assertTrue(thirdFetch.contains(sampleOneId));
-        Assert.assertTrue(thirdFetch.contains(sampleTwoId));
 
         // Test dashboard query
         final AtomicInteger chunksSeen = new AtomicInteger(0);
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestFileBackedBuffer.java b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestFileBackedBuffer.java
new file mode 100644
index 0000000..b41aec0
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestFileBackedBuffer.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.persistent;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.BackgroundDBChunkWriter;
+import com.ning.billing.meter.timeline.MockTimelineDao;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestFileBackedBuffer extends MeterTestSuite {
+
+    private static final Logger log = LoggerFactory.getLogger(TestFileBackedBuffer.class);
+
+    private static final UUID HOST_UUID = UUID.randomUUID();
+    private static final String KIND_A = "kindA";
+    private static final String KIND_B = "kindB";
+    private static final Map<String, Object> EVENT = ImmutableMap.<String, Object>of(KIND_A, 12, KIND_B, 42);
+    // ~105 bytes per event, 10 1MB buffers -> need at least 100,000 events to spill over
+    private static final int NB_EVENTS = 100000;
+    private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestFileBackedBuffer-" + System.currentTimeMillis());
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+    private final TimelineDao dao = new MockTimelineDao();
+    private TimelineEventHandler timelineEventHandler;
+
+    @BeforeMethod(groups = "fast")
+    public void setUp() throws Exception {
+        Assert.assertTrue(basePath.mkdir());
+        System.setProperty("killbill.usage.timelines.spoolDir", basePath.getAbsolutePath());
+        System.setProperty("killbill.usage.timelines.length", "60s");
+        final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+        timelineEventHandler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(dao, config, internalCallContextFactory),
+                                                        new FileBackedBuffer(config.getSpoolDir(), "TimelineEventHandler", 1024 * 1024, 10));
+
+        dao.getOrAddSource(HOST_UUID.toString(), internalCallContext);
+    }
+
+    @Test(groups = "fast") // Not really fast, but doesn't require a database
+    public void testAppend() throws Exception {
+        log.info("Writing files to " + basePath);
+        final List<File> binFiles = new ArrayList<File>();
+
+        final List<DateTime> timestampsRecorded = new ArrayList<DateTime>();
+        final List<String> categoriesRecorded = new ArrayList<String>();
+
+        // Sanity check before the tests
+        Assert.assertEquals(timelineEventHandler.getBackingBuffer().getFilesCreated(), 0);
+        findBinFiles(binFiles, basePath);
+        Assert.assertEquals(binFiles.size(), 0);
+
+        // Send enough events to spill over to disk
+        final DateTime startTime = new DateTime(DateTimeZone.UTC);
+        for (int i = 0; i < NB_EVENTS; i++) {
+            final String category = UUID.randomUUID().toString();
+            final DateTime eventTimestamp = startTime.plusSeconds(i);
+            timelineEventHandler.record(HOST_UUID.toString(), category, eventTimestamp, EVENT, internalCallContext);
+            timestampsRecorded.add(eventTimestamp);
+            categoriesRecorded.add(category);
+        }
+
+        // Check the files have been created (at least one per accumulator)
+        final long bytesOnDisk = timelineEventHandler.getBackingBuffer().getBytesOnDisk();
+        Assert.assertTrue(timelineEventHandler.getBackingBuffer().getFilesCreated() > 0);
+        binFiles.clear();
+        findBinFiles(binFiles, basePath);
+        Assert.assertTrue(binFiles.size() > 0);
+
+        log.info("Sent {} events and wrote {} bytes on disk ({} bytes/event)", new Object[]{NB_EVENTS, bytesOnDisk, bytesOnDisk / NB_EVENTS});
+
+        // Replay the events. Note that size of timestamp recorded != eventsReplayed as some of the ones sent are still in memory
+        final Replayer replayer = new Replayer(basePath.getAbsolutePath());
+        final List<SourceSamplesForTimestamp> eventsReplayed = replayer.readAll();
+        for (int i = 0; i < eventsReplayed.size(); i++) {
+            // Looks like Jackson maps it back using the JVM timezone
+            Assert.assertEquals(eventsReplayed.get(i).getTimestamp().toDateTime(DateTimeZone.UTC), timestampsRecorded.get(i));
+            Assert.assertEquals(eventsReplayed.get(i).getCategory(), categoriesRecorded.get(i));
+        }
+
+        // Make sure files have been deleted
+        binFiles.clear();
+        findBinFiles(binFiles, basePath);
+        Assert.assertEquals(binFiles.size(), 0);
+    }
+
+    private static void findBinFiles(final Collection<File> files, final File directory) {
+        final File[] found = directory.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(final File pathname) {
+                return pathname.getName().endsWith(".bin");
+            }
+        });
+        if (found != null) {
+            for (final File file : found) {
+                if (file.isDirectory()) {
+                    findBinFiles(files, file);
+                } else {
+                    files.add(file);
+                }
+            }
+        }
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestSamplesReplayer.java b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestSamplesReplayer.java
new file mode 100644
index 0000000..4da4131
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestSamplesReplayer.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.persistent;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.MockTimelineDao;
+import com.ning.billing.meter.timeline.TimelineSourceEventAccumulator;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+
+import com.google.common.collect.ImmutableMap;
+
+// Lightweight version of TestFileBackedBuffer
+public class TestSamplesReplayer extends MeterTestSuite {
+
+    // Total space: 255 * 3 = 765 bytes
+    private static final int NB_EVENTS = 3;
+    // One will still be in memory after the flush
+    private static final int EVENTS_ON_DISK = NB_EVENTS - 1;
+    private static final int HOST_ID = 1;
+    private static final int EVENT_CATEGORY_ID = 123;
+    private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestSamplesReplayer-" + System.currentTimeMillis());
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+    @BeforeMethod(groups = "fast")
+    public void setUp() throws Exception {
+        Assert.assertTrue(basePath.mkdir());
+    }
+
+    @Test(groups = "fast")
+    public void testIdentityFilter() throws Exception {
+        // Need less than 765 + 1 (metadata) bytes
+        final FileBackedBuffer fileBackedBuffer = new FileBackedBuffer(basePath.toString(), "test", 765, 1);
+
+        // Create the host samples - this will take 255 bytes
+        final Map<Integer, ScalarSample> eventMap = new HashMap<Integer, ScalarSample>();
+        eventMap.putAll(ImmutableMap.<Integer, ScalarSample>of(
+                1, new ScalarSample(SampleOpcode.BYTE, (byte) 0),
+                2, new ScalarSample(SampleOpcode.SHORT, (short) 1),
+                3, new ScalarSample(SampleOpcode.INT, 1000),
+                4, new ScalarSample(SampleOpcode.LONG, 12345678901L),
+                5, new ScalarSample(SampleOpcode.DOUBLE, Double.MAX_VALUE)
+                                                              ));
+        eventMap.putAll(ImmutableMap.<Integer, ScalarSample>of(
+                6, new ScalarSample(SampleOpcode.FLOAT, Float.NEGATIVE_INFINITY),
+                7, new ScalarSample(SampleOpcode.STRING, "pwet")
+                                                              ));
+        final DateTime firstTime = new DateTime(DateTimeZone.UTC).minusSeconds(NB_EVENTS * 30);
+
+        // Write the samples to disk
+        for (int i = 0; i < NB_EVENTS; i++) {
+            final SourceSamplesForTimestamp samples = new SourceSamplesForTimestamp(HOST_ID, "something", firstTime.plusSeconds(30 * i), eventMap);
+            fileBackedBuffer.append(samples);
+        }
+
+        // Try the replayer
+        final Replayer replayer = new Replayer(new File(basePath.toString()).getAbsolutePath());
+        final List<SourceSamplesForTimestamp> hostSamples = replayer.readAll();
+        Assert.assertEquals(hostSamples.size(), EVENTS_ON_DISK);
+
+        // Try to encode them again
+        final MockTimelineDao dao = new MockTimelineDao();
+        final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(dao, timelineCoder, sampleCoder, HOST_ID,
+                                                                                              EVENT_CATEGORY_ID, hostSamples.get(0).getTimestamp(), internalCallContextFactory);
+        for (final SourceSamplesForTimestamp samplesFound : hostSamples) {
+            accumulator.addSourceSamples(samplesFound);
+        }
+        Assert.assertTrue(accumulator.checkSampleCounts(EVENTS_ON_DISK));
+
+        // This will check the SampleCode can encode value correctly
+        accumulator.extractAndQueueTimelineChunks();
+        Assert.assertEquals(dao.getTimelineChunks().keySet().size(), 7);
+        for (final TimelineChunk chunk : dao.getTimelineChunks().values()) {
+            Assert.assertEquals(chunk.getSourceId(), HOST_ID);
+            Assert.assertEquals(chunk.getSampleCount(), EVENTS_ON_DISK);
+        }
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TestInMemoryEventHandler.java b/meter/src/test/java/com/ning/billing/meter/timeline/TestInMemoryEventHandler.java
new file mode 100644
index 0000000..2ee5851
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TestInMemoryEventHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.File;
+import java.util.Map;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestInMemoryEventHandler extends MeterTestSuite {
+
+    private static final UUID HOST_UUID = UUID.randomUUID();
+    private static final String EVENT_TYPE = "eventType";
+    private static final String SAMPLE_KIND_A = "kindA";
+    private static final String SAMPLE_KIND_B = "kindB";
+    private static final Map<String, Object> EVENT = ImmutableMap.<String, Object>of(SAMPLE_KIND_A, 12, SAMPLE_KIND_B, 42);
+    private static final int NB_EVENTS = 5;
+    private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestInMemoryCollectorEventProcessor-" + System.currentTimeMillis());
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+    private final TimelineDao dao = new MockTimelineDao();
+    private TimelineEventHandler timelineEventHandler;
+    private int eventTypeId = 0;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        Assert.assertTrue(basePath.mkdir());
+        System.setProperty("killbill.usage.timelines.spoolDir", basePath.getAbsolutePath());
+        final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+        timelineEventHandler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(dao, config, internalCallContextFactory),
+                                                        new FileBackedBuffer(config.getSpoolDir(), "TimelineEventHandler", 1024 * 1024, 10));
+
+        dao.getOrAddSource(HOST_UUID.toString(), internalCallContext);
+        eventTypeId = dao.getOrAddEventCategory(EVENT_TYPE, internalCallContext);
+    }
+
+    @Test(groups = "fast")
+    public void testInMemoryFilters() throws Exception {
+        final DateTime startTime = new DateTime(DateTimeZone.UTC);
+        for (int i = 0; i < NB_EVENTS; i++) {
+            timelineEventHandler.record(HOST_UUID.toString(), EVENT_TYPE, startTime, EVENT, internalCallContext);
+        }
+        final DateTime endTime = new DateTime(DateTimeZone.UTC);
+
+        final Integer hostId = dao.getSourceId(HOST_UUID.toString(), internalCallContext);
+        Assert.assertNotNull(hostId);
+        final Integer sampleKindAId = dao.getMetricId(eventTypeId, SAMPLE_KIND_A, internalCallContext);
+        Assert.assertNotNull(sampleKindAId);
+        final Integer sampleKindBId = dao.getMetricId(eventTypeId, SAMPLE_KIND_B, internalCallContext);
+        Assert.assertNotNull(sampleKindBId);
+
+        // One per host and type
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, null, null, internalCallContext).size(), 2);
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime, null, internalCallContext).size(), 2);
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, null, endTime, internalCallContext).size(), 2);
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime, endTime, internalCallContext).size(), 2);
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindAId, startTime, endTime, internalCallContext).size(), 1);
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime, endTime, internalCallContext).size(), 1);
+        // Wider ranges should be supported
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime.minusSeconds(1), endTime, internalCallContext).size(), 1);
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime, endTime.plusSeconds(1), internalCallContext).size(), 1);
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime.minusSeconds(1), endTime.plusSeconds(1), internalCallContext).size(), 1);
+        // Buggy kind
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, Integer.MAX_VALUE, startTime, endTime, internalCallContext).size(), 0);
+        // Buggy start date
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime.plusMinutes(1), endTime, internalCallContext).size(), 0);
+        // Buggy end date
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime, endTime.minusMinutes(1), internalCallContext).size(), 0);
+        // Buggy host
+        Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(Integer.MAX_VALUE, startTime, endTime, internalCallContext).size(), 0);
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineEventHandler.java b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineEventHandler.java
new file mode 100644
index 0000000..89d5512
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineEventHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestTimelineEventHandler extends MeterTestSuite {
+
+    private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestTimelineEventHandler-" + System.currentTimeMillis());
+    private static final String EVENT_TYPE = "eventType";
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+    private final TimelineDao dao = new MockTimelineDao();
+
+    @Test(groups = "fast")
+    public void testDownsizingValues() throws Exception {
+        Assert.assertTrue(basePath.mkdir());
+        System.setProperty("killbill.usage.timelines.spoolDir", basePath.getAbsolutePath());
+        final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+        final int eventTypeId = dao.getOrAddEventCategory(EVENT_TYPE, internalCallContext);
+        final int int2shortId = dao.getOrAddMetric(eventTypeId, "int2short", internalCallContext);
+        final int long2intId = dao.getOrAddMetric(eventTypeId, "long2int", internalCallContext);
+        final int long2shortId = dao.getOrAddMetric(eventTypeId, "long2short", internalCallContext);
+        final int int2intId = dao.getOrAddMetric(eventTypeId, "int2int", internalCallContext);
+        final int long2longId = dao.getOrAddMetric(eventTypeId, "long2long", internalCallContext);
+        final int hostId = 1;
+        final TimelineEventHandler handler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder,
+                                                                      new BackgroundDBChunkWriter(dao, config, internalCallContextFactory), new MockFileBackedBuffer());
+
+        // Test downsizing of values
+        final Map<String, Object> event = ImmutableMap.<String, Object>of(
+                "int2short", new Integer(1),
+                "long2int", new Long(Integer.MAX_VALUE),
+                "long2short", new Long(2),
+                "int2int", Integer.MAX_VALUE,
+                "long2long", Long.MAX_VALUE);
+        final Map<Integer, ScalarSample> output = convertEventToSamples(handler, event, EVENT_TYPE);
+
+        Assert.assertEquals(output.get(int2shortId).getSampleValue(), (short) 1);
+        Assert.assertEquals(output.get(int2shortId).getSampleValue().getClass(), Short.class);
+        Assert.assertEquals(output.get(long2intId).getSampleValue(), Integer.MAX_VALUE);
+        Assert.assertEquals(output.get(long2intId).getSampleValue().getClass(), Integer.class);
+        Assert.assertEquals(output.get(long2shortId).getSampleValue(), (short) 2);
+        Assert.assertEquals(output.get(long2shortId).getSampleValue().getClass(), Short.class);
+        Assert.assertEquals(output.get(int2intId).getSampleValue(), Integer.MAX_VALUE);
+        Assert.assertEquals(output.get(int2intId).getSampleValue().getClass(), Integer.class);
+        Assert.assertEquals(output.get(long2longId).getSampleValue(), Long.MAX_VALUE);
+        Assert.assertEquals(output.get(long2longId).getSampleValue().getClass(), Long.class);
+    }
+
+    private Map<Integer, ScalarSample> convertEventToSamples(final TimelineEventHandler handler, final Map<String, Object> event, final String eventType) {
+        final Map<Integer, ScalarSample> output = new HashMap<Integer, ScalarSample>();
+        handler.convertSamplesToScalarSamples(eventType, event, output, internalCallContext);
+        return output;
+    }
+
+    private void processOneEvent(final TimelineEventHandler handler, final int hostId, final String eventType, final String sampleKind, final DateTime timestamp) throws Exception {
+        final Map<String, Object> rawEvent = ImmutableMap.<String, Object>of(sampleKind, new Integer(1));
+        final Map<Integer, ScalarSample> convertedEvent = convertEventToSamples(handler, rawEvent, eventType);
+        handler.processSamples(new SourceSamplesForTimestamp(hostId, eventType, timestamp, convertedEvent), internalCallContext);
+    }
+
+    private void sleep(final int millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Test(groups = "fast")
+    public void testPurgeAccumulators() throws Exception {
+        System.setProperty("arecibo.collector.timelines.spoolDir", basePath.getAbsolutePath());
+        final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+        final TimelineEventHandler handler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(dao, config, internalCallContextFactory), new MockFileBackedBuffer());
+        Assert.assertEquals(handler.getAccumulators().size(), 0);
+        processOneEvent(handler, 1, "eventType1", "sampleKind1", new DateTime());
+        sleep(20);
+        final DateTime purgeBeforeTime = new DateTime();
+        sleep(20);
+        processOneEvent(handler, 1, "eventType2", "sampleKind2", new DateTime());
+        Assert.assertEquals(handler.getAccumulators().size(), 2);
+        handler.purgeOldSourcesAndAccumulators(purgeBeforeTime);
+        final Collection<TimelineSourceEventAccumulator> accumulators = handler.getAccumulators();
+        Assert.assertEquals(accumulators.size(), 1);
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineSourceEventAccumulator.java b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineSourceEventAccumulator.java
new file mode 100644
index 0000000..55dd0b8
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineSourceEventAccumulator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+
+public class TestTimelineSourceEventAccumulator extends MeterTestSuite {
+
+    private static final int HOST_ID = 1;
+    private static final int EVENT_CATEGORY_ID = 123;
+
+    private static final MockTimelineDao dao = new MockTimelineDao();
+    private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+    private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+    private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+    @Test(groups = "fast")
+    public void testSimpleAggregate() throws IOException {
+        final DateTime startTime = new DateTime(DateTimeZone.UTC);
+        final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(dao, timelineCoder, sampleCoder, HOST_ID,
+                                                                                              EVENT_CATEGORY_ID, startTime, internalCallContextFactory);
+
+        // Send a first type of data
+        final int sampleCount = 5;
+        final int sampleKindId = 1;
+        sendData(accumulator, startTime, sampleCount, sampleKindId);
+        Assert.assertEquals(accumulator.getStartTime(), startTime);
+        Assert.assertEquals(accumulator.getEndTime(), startTime.plusSeconds(sampleCount - 1));
+        Assert.assertEquals(accumulator.getSourceId(), HOST_ID);
+        Assert.assertEquals(accumulator.getTimelines().size(), 1);
+        Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getSampleCount(), sampleCount);
+        Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getMetricId(), sampleKindId);
+
+        // Send now a second type
+        final DateTime secondStartTime = startTime.plusSeconds(sampleCount + 1);
+        final int secondSampleCount = 15;
+        final int secondSampleKindId = 2;
+        sendData(accumulator, secondStartTime, secondSampleCount, secondSampleKindId);
+        // We keep the start time of the accumulator
+        Assert.assertEquals(accumulator.getStartTime(), startTime);
+        Assert.assertEquals(accumulator.getEndTime(), secondStartTime.plusSeconds(secondSampleCount - 1));
+        Assert.assertEquals(accumulator.getSourceId(), HOST_ID);
+        Assert.assertEquals(accumulator.getTimelines().size(), 2);
+        // We advance all timelines in parallel
+        Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getSampleCount(), sampleCount + secondSampleCount);
+        Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getMetricId(), sampleKindId);
+        Assert.assertEquals(accumulator.getTimelines().get(secondSampleKindId).getSampleCount(), sampleCount + secondSampleCount);
+        Assert.assertEquals(accumulator.getTimelines().get(secondSampleKindId).getMetricId(), secondSampleKindId);
+    }
+
+    private void sendData(final TimelineSourceEventAccumulator accumulator, final DateTime startTime, final int sampleCount, final int sampleKindId) {
+        final Map<Integer, ScalarSample> samples = new HashMap<Integer, ScalarSample>();
+
+        for (int i = 0; i < sampleCount; i++) {
+            samples.put(sampleKindId, new ScalarSample<Long>(SampleOpcode.LONG, i + 1242L));
+            final SourceSamplesForTimestamp hostSamplesForTimestamp = new SourceSamplesForTimestamp(HOST_ID, "JVM", startTime.plusSeconds(i), samples);
+            accumulator.addSourceSamples(hostSamplesForTimestamp);
+        }
+    }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TimelineLoadGenerator.java b/meter/src/test/java/com/ning/billing/meter/timeline/TimelineLoadGenerator.java
new file mode 100644
index 0000000..ab34e70
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TimelineLoadGenerator.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.persistent.CachingTimelineDao;
+import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.ClockMock;
+
+import com.google.common.collect.BiMap;
+
+/**
+ * This class simulates the database load due to insertions and deletions of
+ * TimelineChunks rows, as required by sample processing and
+ * aggregation.  Each is single-threaded.
+ */
+public class TimelineLoadGenerator {
+
+    private static final Logger log = LoggerFactory.getLogger(TimelineLoadGenerator.class);
+    private static final int EVENT_CATEGORY_COUNT = Integer.parseInt(System.getProperty("com.ning.billing.timeline.eventCategoryCount", "250"));
+    private static final int HOST_ID_COUNT = Integer.parseInt(System.getProperty("com.ning.billing.timeline.hostIdCount", "2000"));
+    private static final int AVERAGE_SAMPLE_KINDS_PER_CATEGORY = Integer.parseInt(System.getProperty("com.ning.billing.timeline.averageSampleKindsPerCategory", "20"));
+    private static final int AVERAGE_CATEGORIES_PER_HOST = Integer.parseInt(System.getProperty("com.ning.billing.timeline.averageSampleKindsPerCategory", "25"));
+    private static final int SAMPLE_KIND_COUNT = EVENT_CATEGORY_COUNT * AVERAGE_SAMPLE_KINDS_PER_CATEGORY;
+    private static final int CREATE_BATCH_SIZE = Integer.parseInt(System.getProperty("com.ning.billing.timeline.createBatchSize", "1000"));
+    // Mandatory properties
+    private static final String DBI_URL = System.getProperty("com.ning.billing.timeline.db.url");
+    private static final String DBI_USER = System.getProperty("com.ning.billing.timeline.db.user");
+    private static final String DBI_PASSWORD = System.getProperty("com.ning.billing.timeline.db.password");
+
+    private static final Random rand = new Random(System.currentTimeMillis());
+
+    private final List<Integer> hostIds;
+    private final BiMap<Integer, String> hosts;
+    private final BiMap<Integer, String> eventCategories;
+    private final List<Integer> eventCategoryIds;
+    private final BiMap<Integer, CategoryRecordIdAndMetric> sampleKindsBiMap;
+    private final Map<Integer, List<Integer>> categorySampleKindIds;
+    private final Map<Integer, List<Integer>> categoriesForHostId;
+
+    private final DefaultTimelineDao defaultTimelineDAO;
+    private final CachingTimelineDao timelineDAO;
+    private final DBI dbi;
+    private final TimelineCoder timelineCoder;
+
+    private final AtomicInteger timelineChunkIdCounter = new AtomicInteger(0);
+
+    private final Clock clock = new ClockMock();
+    private final InternalCallContext internalCallContext = new InternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID, 1687L, UUID.randomUUID(),
+                                                                                    UUID.randomUUID().toString(), CallOrigin.TEST,
+                                                                                    UserType.TEST, "Testing", "This is a test",
+                                                                                    clock.getUTCNow(), clock.getUTCNow());
+
+    public TimelineLoadGenerator() {
+        this.timelineCoder = new DefaultTimelineCoder();
+
+        this.dbi = new DBI(DBI_URL, DBI_USER, DBI_PASSWORD);
+        this.defaultTimelineDAO = new DefaultTimelineDao(dbi);
+        this.timelineDAO = new CachingTimelineDao(defaultTimelineDAO);
+        log.info("DBI initialized");
+
+        // Make some hosts
+        final List<String> hostNames = new ArrayList<String>(HOST_ID_COUNT);
+        for (int i = 0; i < HOST_ID_COUNT; i++) {
+            final String hostName = String.format("host-%d", i + 1);
+            hostNames.add(hostName);
+            defaultTimelineDAO.getOrAddSource(hostName, internalCallContext);
+        }
+        hosts = timelineDAO.getSources(internalCallContext);
+        hostIds = new ArrayList<Integer>(hosts.keySet());
+        Collections.sort(hostIds);
+        log.info("%d hosts created", hostIds.size());
+
+        // Make some event categories
+        final List<String> categoryNames = new ArrayList<String>(EVENT_CATEGORY_COUNT);
+        for (int i = 0; i < EVENT_CATEGORY_COUNT; i++) {
+            final String category = String.format("category-%d", i);
+            categoryNames.add(category);
+            defaultTimelineDAO.getOrAddEventCategory(category, internalCallContext);
+        }
+        eventCategories = timelineDAO.getEventCategories(internalCallContext);
+        eventCategoryIds = new ArrayList<Integer>(eventCategories.keySet());
+        Collections.sort(eventCategoryIds);
+        log.info("%d event categories created", eventCategoryIds.size());
+
+        // Make some sample kinds.  For now, give each category the same number of sample kinds
+        final List<CategoryRecordIdAndMetric> categoriesAndSampleKinds = new ArrayList<CategoryRecordIdAndMetric>();
+        for (final int eventCategoryId : eventCategoryIds) {
+            for (int i = 0; i < AVERAGE_SAMPLE_KINDS_PER_CATEGORY; i++) {
+                final String sampleKind = String.format("%s-sample-kind-%d", eventCategories.get(eventCategoryId), i + 1);
+                categoriesAndSampleKinds.add(new CategoryRecordIdAndMetric(eventCategoryId, sampleKind));
+                defaultTimelineDAO.getOrAddMetric(eventCategoryId, sampleKind, internalCallContext);
+            }
+        }
+        // Make a fast map from categoryId to a list of sampleKindIds in that category
+        sampleKindsBiMap = timelineDAO.getMetrics(internalCallContext);
+        categorySampleKindIds = new HashMap<Integer, List<Integer>>();
+        int sampleKindIdCounter = 0;
+        for (final Map.Entry<Integer, CategoryRecordIdAndMetric> entry : sampleKindsBiMap.entrySet()) {
+            final int categoryId = entry.getValue().getEventCategoryId();
+            List<Integer> sampleKindIds = categorySampleKindIds.get(categoryId);
+            if (sampleKindIds == null) {
+                sampleKindIds = new ArrayList<Integer>();
+                categorySampleKindIds.put(categoryId, sampleKindIds);
+            }
+            final int sampleKindId = entry.getKey();
+            sampleKindIds.add(sampleKindId);
+            sampleKindIdCounter++;
+        }
+        log.info("%d sampleKindIds created", sampleKindIdCounter);
+        // Assign categories to hosts
+        categoriesForHostId = new HashMap<Integer, List<Integer>>();
+        int categoryCounter = 0;
+        for (final int hostId : hostIds) {
+            final List<Integer> categories = new ArrayList<Integer>();
+            categoriesForHostId.put(hostId, categories);
+            for (int i = 0; i < AVERAGE_CATEGORIES_PER_HOST; i++) {
+                final int categoryId = eventCategoryIds.get(categoryCounter);
+                categories.add(categoryId);
+                categoryCounter = (categoryCounter + 1) % EVENT_CATEGORY_COUNT;
+            }
+        }
+        log.info("Finished creating hosts, categories and sample kinds");
+    }
+
+    private void addChunkAndMaybeSave(final List<TimelineChunk> timelineChunkList, final TimelineChunk timelineChunk) {
+        timelineChunkList.add(timelineChunk);
+        if (timelineChunkList.size() >= CREATE_BATCH_SIZE) {
+            defaultTimelineDAO.bulkInsertTimelineChunks(timelineChunkList, internalCallContext);
+            timelineChunkList.clear();
+            log.info("Inserted %d TimelineChunk rows", timelineChunkIdCounter.get());
+        }
+    }
+
+    /**
+     * This method simulates adding a ton of timelines, in more-or-less the way they would be added in real life.
+     */
+    private void insertManyTimelines() throws Exception {
+        final List<TimelineChunk> timelineChunkList = new ArrayList<TimelineChunk>();
+        DateTime startTime = new DateTime().minusDays(1);
+        DateTime endTime = startTime.plusHours(1);
+        final int sampleCount = 120;  // 1 hours worth
+        for (int i = 0; i < 12; i++) {
+            for (final int hostId : hostIds) {
+                for (final int categoryId : categoriesForHostId.get(hostId)) {
+                    final List<DateTime> dateTimes = new ArrayList<DateTime>(sampleCount);
+                    for (int sc = 0; sc < sampleCount; sc++) {
+                        dateTimes.add(startTime.plusSeconds(sc * 30));
+                    }
+                    final byte[] timeBytes = timelineCoder.compressDateTimes(dateTimes);
+                    for (final int sampleKindId : categorySampleKindIds.get(categoryId)) {
+                        final TimelineChunk timelineChunk = makeTimelineChunk(hostId, sampleKindId, startTime, endTime, timeBytes, sampleCount);
+                        addChunkAndMaybeSave(timelineChunkList, timelineChunk);
+
+                    }
+                }
+            }
+            if (timelineChunkList.size() > 0) {
+                defaultTimelineDAO.bulkInsertTimelineChunks(timelineChunkList, internalCallContext);
+            }
+            log.info("After hour %d, inserted %d TimelineChunk rows", i, timelineChunkIdCounter.get());
+            startTime = endTime;
+            endTime = endTime.plusHours(1);
+        }
+    }
+
+    private TimelineChunk makeTimelineChunk(final int hostId, final int sampleKindId, final DateTime startTime, final DateTime endTime, final byte[] timeBytes, final int sampleCount) {
+        final byte[] samples = new byte[3 + rand.nextInt(sampleCount) * 2];
+        return new TimelineChunk(timelineChunkIdCounter.incrementAndGet(), hostId, sampleKindId, startTime, endTime, timeBytes, samples, sampleCount);
+    }
+
+    public static void main(final String[] args) throws Exception {
+        final TimelineLoadGenerator loadGenerator = new TimelineLoadGenerator();
+        loadGenerator.insertManyTimelines();
+    }
+}