killbill-aplcache

usage: initial import of TimelineEventHandler This is the

7/29/2012 12:29:37 AM

Details

diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/BackgroundDBChunkWriter.java b/usage/src/main/java/com/ning/billing/usage/timeline/BackgroundDBChunkWriter.java
new file mode 100644
index 0000000..a7ab352
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/BackgroundDBChunkWriter.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.config.UsageConfig;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.persistent.TimelineDao;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * This class runs a thread that batch-writes TimelineChunks to the db.
+ * This class is thread-safe, and only holds up threads that want to queue
+ * TimelineChunks for the time it takes to copy the ArrayList of PendingChunkMaps.
+ * <p/>
+ * The background writing thread is scheduled every few seconds, as controlled by
+ * config.getBackgroundWriteCheckInterval().  It writes the current inventory of
+ * chunks if there are at least config.getBackgroundWriteBatchSize()
+ * TimelineChunks to be written, or if the time since the last write exceeds
+ * config.getBackgroundWriteMaxDelay().
+ */
+@Singleton
+public class BackgroundDBChunkWriter {
+
+    private static final Logger log = LoggerFactory.getLogger(BackgroundDBChunkWriter.class);
+
+    private final TimelineDao timelineDAO;
+    private final UsageConfig config;
+    private final boolean performForegroundWrites;
+
+    private final AtomicInteger pendingChunkCount = new AtomicInteger();
+    private final AtomicBoolean shuttingDown = new AtomicBoolean();
+    private List<PendingChunkMap> pendingChunks = new ArrayList<PendingChunkMap>();
+    private DateTime lastWriteTime = new DateTime();
+    private AtomicBoolean doingWritesNow = new AtomicBoolean();
+    private final ScheduledExecutorService backgroundWriteThread = Executors.newSingleThreadScheduledExecutor();
+
+    private final AtomicLong maybePerformBackgroundWritesCount = new AtomicLong();
+    private final AtomicLong backgroundWritesCount = new AtomicLong();
+    private final AtomicLong pendingChunkMapsAdded = new AtomicLong();
+    private final AtomicLong pendingChunksAdded = new AtomicLong();
+    private final AtomicLong pendingChunkMapsWritten = new AtomicLong();
+    private final AtomicLong pendingChunksWritten = new AtomicLong();
+    private final AtomicLong pendingChunkMapsMarkedConsumed = new AtomicLong();
+    private final AtomicLong foregroundChunkMapsWritten = new AtomicLong();
+    private final AtomicLong foregroundChunksWritten = new AtomicLong();
+
+    @Inject
+    public BackgroundDBChunkWriter(final TimelineDao timelineDAO, final UsageConfig config) {
+        this(timelineDAO, config, config.getPerformForegroundWrites());
+    }
+
+    public BackgroundDBChunkWriter(final TimelineDao timelineDAO, @Nullable final UsageConfig config, final boolean performForegroundWrites) {
+        this.timelineDAO = timelineDAO;
+        this.config = config;
+        this.performForegroundWrites = performForegroundWrites;
+    }
+
+    public synchronized void addPendingChunkMap(final PendingChunkMap chunkMap) {
+        if (shuttingDown.get()) {
+            log.error("In addPendingChunkMap(), but finishBackgroundWritingAndExit is true!");
+        } else {
+            if (performForegroundWrites) {
+                foregroundChunkMapsWritten.incrementAndGet();
+                final List<TimelineChunk> chunksToWrite = new ArrayList<TimelineChunk>(chunkMap.getChunkMap().values());
+                foregroundChunksWritten.addAndGet(chunksToWrite.size());
+                timelineDAO.bulkInsertTimelineChunks(chunksToWrite);
+                chunkMap.getAccumulator().markPendingChunkMapConsumed(chunkMap.getPendingChunkMapId());
+            } else {
+                pendingChunkMapsAdded.incrementAndGet();
+                final int chunkCount = chunkMap.getChunkCount();
+                pendingChunksAdded.addAndGet(chunkCount);
+                pendingChunks.add(chunkMap);
+                pendingChunkCount.addAndGet(chunkCount);
+            }
+        }
+    }
+
+    private void performBackgroundWrites() {
+        backgroundWritesCount.incrementAndGet();
+        List<PendingChunkMap> chunkMapsToWrite = null;
+        synchronized (this) {
+            chunkMapsToWrite = pendingChunks;
+            pendingChunks = new ArrayList<PendingChunkMap>();
+            pendingChunkCount.set(0);
+        }
+        final List<TimelineChunk> chunks = new ArrayList<TimelineChunk>();
+        for (final PendingChunkMap map : chunkMapsToWrite) {
+            pendingChunkMapsWritten.incrementAndGet();
+            pendingChunksWritten.addAndGet(map.getChunkMap().size());
+            chunks.addAll(map.getChunkMap().values());
+        }
+        timelineDAO.bulkInsertTimelineChunks(chunks);
+        for (final PendingChunkMap map : chunkMapsToWrite) {
+            pendingChunkMapsMarkedConsumed.incrementAndGet();
+            map.getAccumulator().markPendingChunkMapConsumed(map.getPendingChunkMapId());
+        }
+    }
+
+    private void maybePerformBackgroundWrites() {
+        // If already running background writes, just return
+        maybePerformBackgroundWritesCount.incrementAndGet();
+        if (!doingWritesNow.compareAndSet(false, true)) {
+            return;
+        } else {
+            try {
+                if (shuttingDown.get()) {
+                    performBackgroundWrites();
+                }
+                final int pendingCount = pendingChunkCount.get();
+                if (pendingCount > 0) {
+                    if (pendingCount >= config.getBackgroundWriteBatchSize() ||
+                            new DateTime().isBefore(lastWriteTime.plusMillis((int) config.getBackgroundWriteMaxDelay().getMillis()))) {
+                        performBackgroundWrites();
+                        lastWriteTime = new DateTime();
+                    }
+                }
+            } finally {
+                doingWritesNow.set(false);
+            }
+        }
+    }
+
+    public synchronized boolean getShutdownFinished() {
+        return !doingWritesNow.get() && pendingChunks.size() == 0;
+    }
+
+    public void initiateShutdown() {
+        shuttingDown.set(true);
+    }
+
+    public void runBackgroundWriteThread() {
+        if (!performForegroundWrites) {
+            backgroundWriteThread.scheduleWithFixedDelay(new Runnable() {
+                @Override
+                public void run() {
+                    maybePerformBackgroundWrites();
+                }
+            },
+                                                         config.getBackgroundWriteCheckInterval().getMillis(),
+                                                         config.getBackgroundWriteCheckInterval().getMillis(),
+                                                         TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public void stopBackgroundWriteThread() {
+        if (!performForegroundWrites) {
+            backgroundWriteThread.shutdown();
+        }
+    }
+
+    public long getMaybePerformBackgroundWritesCount() {
+        return maybePerformBackgroundWritesCount.get();
+    }
+
+    public long getBackgroundWritesCount() {
+        return backgroundWritesCount.get();
+    }
+
+    public long getPendingChunkMapsAdded() {
+        return pendingChunkMapsAdded.get();
+    }
+
+    public long getPendingChunksAdded() {
+        return pendingChunksAdded.get();
+    }
+
+    public long getPendingChunkMapsWritten() {
+        return pendingChunkMapsWritten.get();
+    }
+
+    public long getPendingChunksWritten() {
+        return pendingChunksWritten.get();
+    }
+
+    public long getPendingChunkMapsMarkedConsumed() {
+        return pendingChunkMapsMarkedConsumed.get();
+    }
+
+    public long getForegroundChunkMapsWritten() {
+        return foregroundChunkMapsWritten.get();
+    }
+
+    public long getForegroundChunksWritten() {
+        return foregroundChunksWritten.get();
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/PendingChunkMap.java b/usage/src/main/java/com/ning/billing/usage/timeline/PendingChunkMap.java
new file mode 100644
index 0000000..5337e77
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/PendingChunkMap.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.util.Map;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+
+public class PendingChunkMap {
+
+    private final TimelineSourceEventAccumulator accumulator;
+    private final long pendingChunkMapId;
+    private final Map<Integer, TimelineChunk> chunkMap;
+
+    public PendingChunkMap(final TimelineSourceEventAccumulator accumulator, final long pendingChunkMapId, final Map<Integer, TimelineChunk> chunkMap) {
+        this.accumulator = accumulator;
+        this.pendingChunkMapId = pendingChunkMapId;
+        this.chunkMap = chunkMap;
+    }
+
+    public TimelineSourceEventAccumulator getAccumulator() {
+        return accumulator;
+    }
+
+    public long getPendingChunkMapId() {
+        return pendingChunkMapId;
+    }
+
+    public Map<Integer, TimelineChunk> getChunkMap() {
+        return chunkMap;
+    }
+
+    public int getChunkCount() {
+        return chunkMap.size();
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/TimelineEventHandler.java b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineEventHandler.java
new file mode 100644
index 0000000..c7e3903
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineEventHandler.java
@@ -0,0 +1,574 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.config.UsageConfig;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.codec.SampleCoder;
+import com.ning.billing.usage.timeline.codec.TimelineChunkAccumulator;
+import com.ning.billing.usage.timeline.persistent.FileBackedBuffer;
+import com.ning.billing.usage.timeline.persistent.Replayer;
+import com.ning.billing.usage.timeline.persistent.TimelineDao;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.shutdown.ShutdownSaveMode;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+
+public class TimelineEventHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(TimelineEventHandler.class);
+    private final ScheduledExecutorService purgeThread = Executors.newSingleThreadScheduledExecutor();
+    private static final Comparator<TimelineChunk> CHUNK_COMPARATOR = new Comparator<TimelineChunk>() {
+
+        @Override
+        public int compare(final TimelineChunk o1, final TimelineChunk o2) {
+            final int hostDiff = o1.getSourceId() - o1.getSourceId();
+            if (hostDiff < 0) {
+                return -1;
+            } else if (hostDiff > 0) {
+                return 1;
+            } else {
+                final int metricIdDiff = o1.getMetricId() - o2.getMetricId();
+                if (metricIdDiff < 0) {
+                    return -1;
+                } else if (metricIdDiff > 0) {
+                    return 1;
+                } else {
+                    final long startTimeDiff = o1.getStartTime().getMillis() - o2.getStartTime().getMillis();
+                    if (startTimeDiff < 0) {
+                        return -1;
+                    } else if (startTimeDiff > 0) {
+                        return 1;
+                    } else {
+                        return 0;
+                    }
+                }
+            }
+        }
+    };
+
+    // A TimelineSourceEventAccumulator records attributes for a specific host and event type.
+    // This cache maps sourceId -> categoryId -> accumulator
+    //
+    // TODO: There are still timing windows in the use of accumulators.  Enumerate them and
+    // either fix them or prove they are benign
+    private final Map<Integer, SourceAccumulatorsAndUpdateDate> accumulators = new ConcurrentHashMap<Integer, SourceAccumulatorsAndUpdateDate>();
+
+    private final UsageConfig config;
+    private final TimelineDao timelineDAO;
+    private final TimelineCoder timelineCoder;
+    private final SampleCoder sampleCoder;
+    private final BackgroundDBChunkWriter backgroundWriter;
+    private final FileBackedBuffer backingBuffer;
+
+    private final ShutdownSaveMode shutdownSaveMode;
+    private final AtomicBoolean shuttingDown = new AtomicBoolean();
+    private final AtomicBoolean replaying = new AtomicBoolean();
+
+    private final AtomicLong eventsDiscarded = new AtomicLong(0L);
+    private final AtomicLong eventsReceivedAfterShuttingDown = new AtomicLong();
+    private final AtomicLong handledEventCount = new AtomicLong();
+    private final AtomicLong addedSourceEventAccumulatorMapCount = new AtomicLong();
+    private final AtomicLong addedSourceEventAccumulatorCount = new AtomicLong();
+    private final AtomicLong getInMemoryChunksCallCount = new AtomicLong();
+    private final AtomicLong accumulatorDeepCopyCount = new AtomicLong();
+    private final AtomicLong inMemoryChunksReturnedCount = new AtomicLong();
+    private final AtomicLong replayCount = new AtomicLong();
+    private final AtomicLong replaySamplesFoundCount = new AtomicLong();
+    private final AtomicLong replaySamplesOutsideTimeRangeCount = new AtomicLong();
+    private final AtomicLong replaySamplesProcessedCount = new AtomicLong();
+    private final AtomicLong forceCommitCallCount = new AtomicLong();
+    private final AtomicLong purgedAccumsBecauseSourceNotUpdated = new AtomicLong();
+    private final AtomicLong purgedAccumsBecauseCategoryNotUpdated = new AtomicLong();
+
+    @Inject
+    public TimelineEventHandler(final UsageConfig config, final TimelineDao timelineDAO, final TimelineCoder timelineCoder, final SampleCoder sampleCoder, final BackgroundDBChunkWriter backgroundWriter, final FileBackedBuffer fileBackedBuffer) {
+        this.config = config;
+        this.timelineDAO = timelineDAO;
+        this.timelineCoder = timelineCoder;
+        this.sampleCoder = sampleCoder;
+        this.backgroundWriter = backgroundWriter;
+        this.backingBuffer = fileBackedBuffer;
+        this.shutdownSaveMode = ShutdownSaveMode.fromString(config.getShutdownSaveMode());
+    }
+
+    private void saveAccumulators() {
+        for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> entry : accumulators.entrySet()) {
+            final int sourceId = entry.getKey();
+            final Map<Integer, TimelineSourceEventAccumulator> hostAccumulators = entry.getValue().getCategoryAccumulators();
+            for (final Map.Entry<Integer, TimelineSourceEventAccumulator> accumulatorEntry : hostAccumulators.entrySet()) {
+                final int categoryId = accumulatorEntry.getKey();
+                final TimelineSourceEventAccumulator accumulator = accumulatorEntry.getValue();
+                log.debug("Saving Timeline for sourceId [{}] and categoryId [{}]", sourceId, categoryId);
+                accumulator.extractAndQueueTimelineChunks();
+            }
+        }
+    }
+
+    private void saveStartTimes(final StartTimes startTimes) {
+        for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> entry : accumulators.entrySet()) {
+            final int sourceId = entry.getKey();
+            final Map<Integer, TimelineSourceEventAccumulator> hostAccumulators = entry.getValue().getCategoryAccumulators();
+            for (final Map.Entry<Integer, TimelineSourceEventAccumulator> accumulatorEntry : hostAccumulators.entrySet()) {
+                final int categoryId = accumulatorEntry.getKey();
+                final TimelineSourceEventAccumulator accumulator = accumulatorEntry.getValue();
+                log.debug("Saving Timeline start time for sourceId [{}] and category [{}]", sourceId, categoryId);
+                startTimes.addTime(sourceId, categoryId, accumulator.getStartTime());
+            }
+        }
+    }
+
+    public synchronized void purgeOldSourcesAndAccumulators(final DateTime purgeIfBeforeDate) {
+        final List<Integer> oldSourceIds = new ArrayList<Integer>();
+        for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> entry : accumulators.entrySet()) {
+            final int sourceId = entry.getKey();
+            final SourceAccumulatorsAndUpdateDate accumulatorsAndDate = entry.getValue();
+            final DateTime lastUpdatedDate = accumulatorsAndDate.getLastUpdateDate();
+            if (lastUpdatedDate.isBefore(purgeIfBeforeDate)) {
+                oldSourceIds.add(sourceId);
+                purgedAccumsBecauseSourceNotUpdated.incrementAndGet();
+                for (final TimelineSourceEventAccumulator categoryAccumulator : accumulatorsAndDate.getCategoryAccumulators().values()) {
+                    categoryAccumulator.extractAndQueueTimelineChunks();
+                }
+            } else {
+                final List<Integer> categoryIdsToPurge = new ArrayList<Integer>();
+                final Map<Integer, TimelineSourceEventAccumulator> categoryMap = accumulatorsAndDate.getCategoryAccumulators();
+                for (final Map.Entry<Integer, TimelineSourceEventAccumulator> eventEntry : categoryMap.entrySet()) {
+                    final int categoryId = eventEntry.getKey();
+                    final TimelineSourceEventAccumulator categoryAccumulator = eventEntry.getValue();
+                    final DateTime latestTime = categoryAccumulator.getLatestSampleAddTime();
+                    if (latestTime != null && latestTime.isBefore(purgeIfBeforeDate)) {
+                        purgedAccumsBecauseCategoryNotUpdated.incrementAndGet();
+                        categoryAccumulator.extractAndQueueTimelineChunks();
+                        categoryIdsToPurge.add(categoryId);
+                    }
+                }
+                for (final int categoryId : categoryIdsToPurge) {
+                    categoryMap.remove(categoryId);
+                }
+            }
+        }
+        for (final int sourceIdToPurge : oldSourceIds) {
+            accumulators.remove(sourceIdToPurge);
+        }
+    }
+
+    /**
+     * Main entry point to the timeline subsystem. Record a series of sample for a given source, at a given timestamp.
+     *
+     * @param sourceId       id of the source
+     * @param eventType      event category
+     * @param eventTimestamp event timestamp
+     * @param samples        samples to record
+     */
+    public void record(final Integer sourceId, final String eventType, final DateTime eventTimestamp, final Map<String, Object> samples) {
+        if (shuttingDown.get()) {
+            eventsReceivedAfterShuttingDown.incrementAndGet();
+            return;
+        }
+        try {
+            handledEventCount.incrementAndGet();
+
+            // Extract and parse samples
+            final Map<Integer, ScalarSample> scalarSamples = new LinkedHashMap<Integer, ScalarSample>();
+            convertSamplesToScalarSamples(sourceId, eventType, samples, scalarSamples);
+
+            if (scalarSamples.isEmpty()) {
+                eventsDiscarded.incrementAndGet();
+                return;
+            }
+
+            final SourceSamplesForTimestamp sourceSamples = new SourceSamplesForTimestamp(sourceId, eventType, eventTimestamp, scalarSamples);
+            if (!replaying.get()) {
+                // Start by saving locally the samples
+                backingBuffer.append(sourceSamples);
+            }
+            // Then add them to the in-memory accumulator
+            processSamples(sourceSamples);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public TimelineSourceEventAccumulator getOrAddSourceEventAccumulator(final int sourceId, final int categoryId, final DateTime firstSampleTime) {
+        return this.getOrAddSourceEventAccumulator(sourceId, categoryId, firstSampleTime, (int) config.getTimelineLength().getMillis());
+    }
+
+    public synchronized TimelineSourceEventAccumulator getOrAddSourceEventAccumulator(final int sourceId, final int categoryId, final DateTime firstSampleTime, final int timelineLengthMillis) {
+        SourceAccumulatorsAndUpdateDate sourceAccumulatorsAndUpdateDate = accumulators.get(sourceId);
+        if (sourceAccumulatorsAndUpdateDate == null) {
+            addedSourceEventAccumulatorMapCount.incrementAndGet();
+            sourceAccumulatorsAndUpdateDate = new SourceAccumulatorsAndUpdateDate(new HashMap<Integer, TimelineSourceEventAccumulator>(), new DateTime());
+            accumulators.put(sourceId, sourceAccumulatorsAndUpdateDate);
+        }
+        sourceAccumulatorsAndUpdateDate.markUpdated();
+        final Map<Integer, TimelineSourceEventAccumulator> hostCategoryAccumulators = sourceAccumulatorsAndUpdateDate.getCategoryAccumulators();
+        TimelineSourceEventAccumulator accumulator = hostCategoryAccumulators.get(categoryId);
+        if (accumulator == null) {
+            addedSourceEventAccumulatorCount.incrementAndGet();
+            accumulator = new TimelineSourceEventAccumulator(timelineDAO, timelineCoder, sampleCoder, backgroundWriter, sourceId, categoryId, firstSampleTime, timelineLengthMillis);
+            hostCategoryAccumulators.put(categoryId, accumulator);
+            log.debug("Created new Timeline for sourceId [{}] and category [{}]", sourceId, categoryId);
+        }
+        return accumulator;
+    }
+
+    @VisibleForTesting
+    public void processSamples(final SourceSamplesForTimestamp hostSamples) throws ExecutionException, IOException {
+        final int sourceId = hostSamples.getSourceId();
+        final String category = hostSamples.getCategory();
+        final int categoryId = timelineDAO.getEventCategoryId(category);
+        final DateTime timestamp = hostSamples.getTimestamp();
+        final TimelineSourceEventAccumulator accumulator = getOrAddSourceEventAccumulator(sourceId, categoryId, timestamp);
+        accumulator.addSourceSamples(hostSamples);
+    }
+
+    public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+        return getInMemoryTimelineChunks(sourceId, ImmutableList.copyOf(timelineDAO.getMetricIdsBySourceId(sourceId)), filterStartTime, filterEndTime);
+    }
+
+    public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final Integer metricId, @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+        return getInMemoryTimelineChunks(sourceId, ImmutableList.<Integer>of(metricId), filterStartTime, filterEndTime);
+    }
+
+    public synchronized Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final List<Integer> metricIds, @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+        getInMemoryChunksCallCount.incrementAndGet();
+        // Check first if there is an in-memory accumulator for this host
+        final SourceAccumulatorsAndUpdateDate sourceAccumulatorsAndDate = accumulators.get(sourceId);
+        if (sourceAccumulatorsAndDate == null) {
+            return ImmutableList.of();
+        }
+
+        // Now, filter each accumulator for this host
+        final List<TimelineChunk> samplesBySourceName = new ArrayList<TimelineChunk>();
+        for (final TimelineSourceEventAccumulator accumulator : sourceAccumulatorsAndDate.getCategoryAccumulators().values()) {
+            for (final TimelineChunk chunk : accumulator.getPendingTimelineChunks()) {
+                if ((filterStartTime != null && chunk.getEndTime().isBefore(filterStartTime)) ||
+                        (filterEndTime != null && chunk.getStartTime().isAfter(filterEndTime)) ||
+                        !metricIds.contains(chunk.getMetricId())) {
+                    continue;
+                } else {
+                    samplesBySourceName.add(chunk);
+                }
+            }
+            final List<DateTime> accumulatorTimes = accumulator.getTimes();
+            if (accumulatorTimes.size() == 0) {
+                continue;
+            }
+            final DateTime accumulatorStartTime = accumulator.getStartTime();
+            final DateTime accumulatorEndTime = accumulator.getEndTime();
+
+            // Check if the time filters apply
+            if ((filterStartTime != null && accumulatorEndTime.isBefore(filterStartTime)) || (filterEndTime != null && accumulatorStartTime.isAfter(filterEndTime))) {
+                // Ignore this accumulator
+                continue;
+            }
+
+            // This accumulator is in the right time range, now return only the sample kinds specified
+            final byte[] timeBytes = timelineCoder.compressDateTimes(accumulatorTimes);
+            for (final TimelineChunkAccumulator chunkAccumulator : accumulator.getTimelines().values()) {
+                if (metricIds.contains(chunkAccumulator.getMetricId())) {
+                    // Extract the timeline for this chunk by copying it and reading encoded bytes
+                    accumulatorDeepCopyCount.incrementAndGet();
+                    final TimelineChunkAccumulator chunkAccumulatorCopy = chunkAccumulator.deepCopy();
+                    final TimelineChunk timelineChunk = chunkAccumulatorCopy.extractTimelineChunkAndReset(accumulatorStartTime, accumulatorEndTime, timeBytes);
+                    samplesBySourceName.add(timelineChunk);
+                }
+            }
+        }
+        inMemoryChunksReturnedCount.addAndGet(samplesBySourceName.size());
+        Collections.sort(samplesBySourceName, CHUNK_COMPARATOR);
+        return samplesBySourceName;
+    }
+
+    @VisibleForTesting
+    void convertSamplesToScalarSamples(final Integer sourceId, final String eventType, final Map<String, Object> inputSamples, final Map<Integer, ScalarSample> outputSamples) {
+        if (inputSamples == null) {
+            return;
+        }
+        final Integer eventCategoryId = timelineDAO.getOrAddEventCategory(eventType);
+
+        for (final String attributeName : inputSamples.keySet()) {
+            final Integer metricId = timelineDAO.getOrAddMetric(sourceId, eventCategoryId, attributeName);
+            final Object sample = inputSamples.get(attributeName);
+
+            outputSamples.put(metricId, ScalarSample.fromObject(sample));
+        }
+    }
+
+    public void replay(final String spoolDir) {
+        replayCount.incrementAndGet();
+        log.info("Starting replay of files in {}", spoolDir);
+        final Replayer replayer = new Replayer(spoolDir);
+        StartTimes lastStartTimes = null;
+        if (shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES) {
+            lastStartTimes = timelineDAO.getLastStartTimes();
+            if (lastStartTimes == null) {
+                log.info("Did not find startTimes");
+            } else {
+                log.info("Retrieved startTimes from the db");
+            }
+        }
+        final StartTimes startTimes = lastStartTimes;
+        final DateTime minStartTime = lastStartTimes == null ? null : startTimes.getMinStartTime();
+        final long found = replaySamplesFoundCount.get();
+        final long outsideTimeRange = replaySamplesOutsideTimeRangeCount.get();
+        final long processed = replaySamplesProcessedCount.get();
+
+        try {
+            // Read all files in the spool directory and delete them after process, if
+            // startTimes  is null.
+            replaying.set(true);
+            final int filesSkipped = replayer.readAll(startTimes == null, minStartTime, new Function<SourceSamplesForTimestamp, Void>() {
+                @Override
+                public Void apply(@Nullable final SourceSamplesForTimestamp hostSamples) {
+                    if (hostSamples != null) {
+                        replaySamplesFoundCount.incrementAndGet();
+                        boolean useSamples = true;
+                        try {
+                            final int sourceId = hostSamples.getSourceId();
+                            final String category = hostSamples.getCategory();
+                            final int categoryId = timelineDAO.getEventCategoryId(category);
+                            // If startTimes is non-null and the samples come from before the first time for
+                            // the given host and event category, ignore the samples
+                            if (startTimes != null) {
+                                final DateTime timestamp = hostSamples.getTimestamp();
+                                final DateTime categoryStartTime = startTimes.getStartTimeForSourceIdAndCategoryId(sourceId, categoryId);
+                                if (timestamp == null ||
+                                        timestamp.isBefore(startTimes.getMinStartTime()) ||
+                                        (categoryStartTime != null && timestamp.isBefore(categoryStartTime))) {
+                                    replaySamplesOutsideTimeRangeCount.incrementAndGet();
+                                    useSamples = false;
+                                }
+                            }
+                            if (useSamples) {
+                                replaySamplesProcessedCount.incrementAndGet();
+                                processSamples(hostSamples);
+                            }
+                        } catch (Exception e) {
+                            log.warn("Got exception replaying sample, data potentially lost! {}", hostSamples.toString());
+                        }
+                    }
+
+                    return null;
+                }
+            });
+            if (shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES) {
+                timelineDAO.deleteLastStartTimes();
+                log.info("Deleted old startTimes");
+            }
+            log.info(String.format("Replay completed; %d files skipped, samples read %d, samples outside time range %d, samples used %d",
+                                   filesSkipped, replaySamplesFoundCount.get() - found, replaySamplesOutsideTimeRangeCount.get() - outsideTimeRange, replaySamplesProcessedCount.get() - processed));
+        } catch (RuntimeException e) {
+            // Catch the exception to make the collector start properly
+            log.error("Ignoring error when replaying the data", e);
+        } finally {
+            replaying.set(false);
+        }
+    }
+
+    public void forceCommit() {
+        forceCommitCallCount.incrementAndGet();
+        saveAccumulators();
+        backingBuffer.discard();
+        log.info("Timelines committed");
+    }
+
+    public void commitAndShutdown() {
+        shuttingDown.set(true);
+        final boolean doingFastShutdown = shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES;
+        if (doingFastShutdown) {
+            final StartTimes startTimes = new StartTimes();
+            saveStartTimes(startTimes);
+            timelineDAO.insertLastStartTimes(startTimes);
+            log.info("During shutdown, saved timeline start times in the db");
+        } else {
+            saveAccumulators();
+            log.info("During shutdown, saved timeline accumulators");
+        }
+        performShutdown();
+        backingBuffer.discard();
+    }
+
+    private void performShutdown() {
+        backgroundWriter.initiateShutdown();
+        while (!backgroundWriter.getShutdownFinished()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        purgeThread.shutdown();
+    }
+
+    private synchronized void purgeFilesAndAccumulators() {
+        this.purgeFilesAndAccumulators(new DateTime().minus(config.getTimelineLength().getMillis()), new DateTime().minus(2 * config.getTimelineLength().getMillis()));
+    }
+
+    // TODO: We have a bad interaction between startTimes and purging: If the system is down
+    // for two hours, we may not want it to purge everything.  Figure out what to do about this.
+    private synchronized void purgeFilesAndAccumulators(final DateTime purgeAccumulatorsIfBefore, final DateTime purgeFilesIfBefore) {
+        purgeOldSourcesAndAccumulators(purgeAccumulatorsIfBefore);
+        final Replayer replayer = new Replayer(config.getSpoolDir());
+        replayer.purgeOldFiles(purgeFilesIfBefore);
+    }
+
+    public void startHandlerThreads() {
+        purgeThread.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                purgeFilesAndAccumulators();
+            }
+        },
+                                           config.getTimelineLength().getMillis(),
+                                           config.getTimelineLength().getMillis(),
+                                           TimeUnit.MILLISECONDS);
+    }
+
+    // We use the lastUpdateDate to purge sources and their accumulators from the map
+    private static class SourceAccumulatorsAndUpdateDate {
+
+        private final Map<Integer, TimelineSourceEventAccumulator> categoryAccumulators;
+        private DateTime lastUpdateDate;
+
+        public SourceAccumulatorsAndUpdateDate(final Map<Integer, TimelineSourceEventAccumulator> categoryAccumulators, final DateTime lastUpdateDate) {
+            this.categoryAccumulators = categoryAccumulators;
+            this.lastUpdateDate = lastUpdateDate;
+        }
+
+        public Map<Integer, TimelineSourceEventAccumulator> getCategoryAccumulators() {
+            return categoryAccumulators;
+        }
+
+        public DateTime getLastUpdateDate() {
+            return lastUpdateDate;
+        }
+
+        public void markUpdated() {
+            lastUpdateDate = new DateTime();
+        }
+    }
+
+    @VisibleForTesting
+    public Collection<TimelineSourceEventAccumulator> getAccumulators() {
+        final List<TimelineSourceEventAccumulator> inMemoryAccumulator = new ArrayList<TimelineSourceEventAccumulator>();
+        for (final SourceAccumulatorsAndUpdateDate sourceEventAccumulatorMap : accumulators.values()) {
+            inMemoryAccumulator.addAll(sourceEventAccumulatorMap.getCategoryAccumulators().values());
+        }
+
+        return inMemoryAccumulator;
+    }
+
+    @VisibleForTesting
+    public FileBackedBuffer getBackingBuffer() {
+        return backingBuffer;
+    }
+
+    public long getEventsDiscarded() {
+        return eventsDiscarded.get();
+    }
+
+    public long getSourceEventAccumulatorCount() {
+        return accumulators.size();
+    }
+
+    public long getEventsReceivedAfterShuttingDown() {
+        return eventsReceivedAfterShuttingDown.get();
+    }
+
+    public long getHandledEventCount() {
+        return handledEventCount.get();
+    }
+
+    public long getAddedSourceEventAccumulatorMapCount() {
+        return addedSourceEventAccumulatorMapCount.get();
+    }
+
+    public long getAddedSourceEventAccumulatorCount() {
+        return addedSourceEventAccumulatorCount.get();
+    }
+
+    public long getGetInMemoryChunksCallCount() {
+        return getInMemoryChunksCallCount.get();
+    }
+
+    public long getAccumulatorDeepCopyCount() {
+        return accumulatorDeepCopyCount.get();
+    }
+
+    public long getInMemoryChunksReturnedCount() {
+        return inMemoryChunksReturnedCount.get();
+    }
+
+    public long getReplayCount() {
+        return replayCount.get();
+    }
+
+    public long getReplaySamplesFoundCount() {
+        return replaySamplesFoundCount.get();
+    }
+
+    public long getReplaySamplesOutsideTimeRangeCount() {
+        return replaySamplesOutsideTimeRangeCount.get();
+    }
+
+    public long getReplaySamplesProcessedCount() {
+        return replaySamplesProcessedCount.get();
+    }
+
+    public long getForceCommitCallCount() {
+        return forceCommitCallCount.get();
+    }
+
+    public long getPurgedAccumsBecauseSourceNotUpdated() {
+        return purgedAccumsBecauseSourceNotUpdated.get();
+    }
+
+    public long getPurgedAccumsBecauseCategoryNotUpdated() {
+        return purgedAccumsBecauseCategoryNotUpdated.get();
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/TimelineSourceEventAccumulator.java b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineSourceEventAccumulator.java
new file mode 100644
index 0000000..d352dd9
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineSourceEventAccumulator.java
@@ -0,0 +1,314 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.codec.SampleCoder;
+import com.ning.billing.usage.timeline.codec.TimelineChunkAccumulator;
+import com.ning.billing.usage.timeline.persistent.TimelineDao;
+import com.ning.billing.usage.timeline.samples.NullSample;
+import com.ning.billing.usage.timeline.samples.RepeatSample;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+/**
+ * This class represents a collection of timeline chunks, one for each
+ * metric belonging to one event category, each over a specific time period,
+ * from a single source.  This class is used to accumulate samples
+ * to be written to the database; a separate streaming class with
+ * much less overhead is used to "play back" the samples read from
+ * the db in response to queries.
+ * <p/>
+ * All subordinate timelines contain the same number of samples.
+ * <p/>
+ * When enough samples have accumulated, typically one hour's worth,
+ * in-memory samples are made into TimelineChunks, one chunk for each metricId
+ * maintained by the accumulator.
+ * <p/>
+ * These new chunks are organized as PendingChunkMaps, kept in a local list and also
+ * handed off to a PendingChunkMapConsumer to written to the db by a background process.  At some
+ * in the future, that background process will call markPendingChunkMapConsumed(),
+ * passing the id of a PendingChunkMap.  This causes the PendingChunkMap
+ * to be removed from the local list maintained by the TimelineSourceEventAccumulator.
+ * <p/>
+ * Queries that cause the TimelineSourceEventAccumulator instance to return memory
+ * chunks also return any chunks in PendingChunkMaps in the local list of pending chunks.
+ */
+public class TimelineSourceEventAccumulator {
+
+    private static final Logger log = LoggerFactory.getLogger(TimelineSourceEventAccumulator.class);
+    private static final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
+    private static final NullSample nullSample = new NullSample();
+    private static final boolean checkEveryAccess = Boolean.parseBoolean(System.getProperty("killbill.usage.checkEveryAccess"));
+    private static final Random rand = new Random(0);
+
+    private final Map<Integer, SampleSequenceNumber> metricIdCounters = new HashMap<Integer, SampleSequenceNumber>();
+    private final List<PendingChunkMap> pendingChunkMaps = new ArrayList<PendingChunkMap>();
+    private long pendingChunkMapIdCounter = 1;
+
+    private final BackgroundDBChunkWriter backgroundWriter;
+    private final TimelineCoder timelineCoder;
+    private final SampleCoder sampleCoder;
+    private final Integer timelineLengthMillis;
+    private final int sourceId;
+    private final int eventCategoryId;
+    // This is the time when we want to end the chunk.  Setting the value randomly
+    // when the TimelineSourceEventAccumulator  is created provides a mechanism to
+    // distribute the db writes
+    private DateTime chunkEndTime = null;
+    private DateTime startTime = null;
+    private DateTime endTime = null;
+    private DateTime latestSampleAddTime;
+    private long sampleSequenceNumber = 0;
+    private int sampleCount = 0;
+
+    /**
+     * Maps the sample kind id to the accumulator for that sample kind
+     */
+    private final Map<Integer, TimelineChunkAccumulator> timelines = new ConcurrentHashMap<Integer, TimelineChunkAccumulator>();
+
+    /**
+     * Holds the sampling times of the samples
+     */
+    private final List<DateTime> times = new ArrayList<DateTime>();
+
+    public TimelineSourceEventAccumulator(final TimelineDao dao, final TimelineCoder timelineCoder, final SampleCoder sampleCoder,
+                                          final BackgroundDBChunkWriter backgroundWriter, final int sourceId, final int eventCategoryId,
+                                          final DateTime firstSampleTime, final Integer timelineLengthMillis) {
+        this.timelineLengthMillis = timelineLengthMillis;
+        this.backgroundWriter = backgroundWriter;
+        this.timelineCoder = timelineCoder;
+        this.sampleCoder = sampleCoder;
+        this.sourceId = sourceId;
+        this.eventCategoryId = eventCategoryId;
+        // Set the end-of-chunk time by tossing a random number, to evenly distribute the db writeback load.
+        this.chunkEndTime = timelineLengthMillis != null ? firstSampleTime.plusMillis(rand.nextInt(timelineLengthMillis)) : null;
+    }
+
+    /*
+     * This constructor is used for testing; it writes chunks as soon as they are
+     * created, but because the chunkEndTime is way in the future, doesn't initiate
+     * chunk writes.
+     */
+    public TimelineSourceEventAccumulator(final TimelineDao timelineDAO, final TimelineCoder timelineCoder, final SampleCoder sampleCoder,
+                                          final Integer sourceId, final int eventTypeId, final DateTime firstSampleTime) {
+        this(timelineDAO, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(timelineDAO, null, true), sourceId, eventTypeId, firstSampleTime, Integer.MAX_VALUE);
+    }
+
+    @SuppressWarnings("unchecked")
+    // TODO - we can probably do better than synchronize the whole method
+    public synchronized void addSourceSamples(final SourceSamplesForTimestamp samples) {
+        final DateTime timestamp = samples.getTimestamp();
+
+        if (chunkEndTime != null && chunkEndTime.isBefore(timestamp)) {
+            extractAndQueueTimelineChunks();
+            startTime = timestamp;
+            chunkEndTime = timestamp.plusMillis(timelineLengthMillis);
+        }
+
+        if (startTime == null) {
+            startTime = timestamp;
+        }
+        if (endTime == null) {
+            endTime = timestamp;
+        } else if (!timestamp.isAfter(endTime)) {
+            log.warn("Adding samples for host {}, timestamp {} is not after the end time {}; ignored",
+                     new Object[]{sourceId, dateFormatter.print(timestamp), dateFormatter.print(endTime)});
+            return;
+        }
+        sampleSequenceNumber++;
+        latestSampleAddTime = new DateTime();
+        for (final Map.Entry<Integer, ScalarSample> entry : samples.getSamples().entrySet()) {
+            final Integer metricId = entry.getKey();
+            final SampleSequenceNumber counter = metricIdCounters.get(metricId);
+            if (counter != null) {
+                counter.setSequenceNumber(sampleSequenceNumber);
+            } else {
+                metricIdCounters.put(metricId, new SampleSequenceNumber(sampleSequenceNumber));
+            }
+            final ScalarSample sample = entry.getValue();
+            TimelineChunkAccumulator timeline = timelines.get(metricId);
+            if (timeline == null) {
+                timeline = new TimelineChunkAccumulator(sourceId, metricId, sampleCoder);
+                if (sampleCount > 0) {
+                    addPlaceholders(timeline, sampleCount);
+                }
+                timelines.put(metricId, timeline);
+            }
+            final ScalarSample compressedSample = sampleCoder.compressSample(sample);
+            timeline.addSample(compressedSample);
+        }
+        for (final Map.Entry<Integer, SampleSequenceNumber> entry : metricIdCounters.entrySet()) {
+            final SampleSequenceNumber counter = entry.getValue();
+            if (counter.getSequenceNumber() < sampleSequenceNumber) {
+                counter.setSequenceNumber(sampleSequenceNumber);
+                final int metricId = entry.getKey();
+                final TimelineChunkAccumulator timeline = timelines.get(metricId);
+                timeline.addSample(nullSample);
+            }
+        }
+        // Now we can update the state
+        endTime = timestamp;
+        sampleCount++;
+        times.add(timestamp);
+
+        if (checkEveryAccess) {
+            checkSampleCounts(sampleCount);
+        }
+    }
+
+    private void addPlaceholders(final TimelineChunkAccumulator timeline, int countToAdd) {
+        final int maxRepeatSamples = RepeatSample.MAX_SHORT_REPEAT_COUNT;
+        while (countToAdd >= maxRepeatSamples) {
+            timeline.addPlaceholder((byte) maxRepeatSamples);
+            countToAdd -= maxRepeatSamples;
+        }
+        if (countToAdd > 0) {
+            timeline.addPlaceholder((byte) countToAdd);
+        }
+    }
+
+    /**
+     * This method queues a map of TimelineChunks extracted from the TimelineChunkAccumulators
+     * to be written to the db.  When memory chunks are requested, any queued chunk will be included
+     * in the list.
+     */
+    public synchronized void extractAndQueueTimelineChunks() {
+        if (times.size() > 0) {
+            final Map<Integer, TimelineChunk> chunkMap = new HashMap<Integer, TimelineChunk>();
+            final byte[] timeBytes = timelineCoder.compressDateTimes(times);
+            for (final Map.Entry<Integer, TimelineChunkAccumulator> entry : timelines.entrySet()) {
+                final int metricId = entry.getKey();
+                final TimelineChunkAccumulator accumulator = entry.getValue();
+                final TimelineChunk chunk = accumulator.extractTimelineChunkAndReset(startTime, endTime, timeBytes);
+                chunkMap.put(metricId, chunk);
+            }
+            times.clear();
+            sampleCount = 0;
+            final long counter = pendingChunkMapIdCounter++;
+            final PendingChunkMap newChunkMap = new PendingChunkMap(this, counter, chunkMap);
+            pendingChunkMaps.add(newChunkMap);
+            backgroundWriter.addPendingChunkMap(newChunkMap);
+        }
+    }
+
+    public synchronized void markPendingChunkMapConsumed(final long pendingChunkMapId) {
+        final PendingChunkMap pendingChunkMap = pendingChunkMaps.size() > 0 ? pendingChunkMaps.get(0) : null;
+        if (pendingChunkMap == null) {
+            log.error("In TimelineSourceEventAccumulator.markPendingChunkMapConsumed(), could not find the map for {}", pendingChunkMapId);
+        } else if (pendingChunkMapId != pendingChunkMap.getPendingChunkMapId()) {
+            log.error("In TimelineSourceEventAccumulator.markPendingChunkMapConsumed(), the next map has id {}, but we're consuming id {}",
+                      pendingChunkMap.getPendingChunkMapId(), pendingChunkMapId);
+        } else {
+            pendingChunkMaps.remove(0);
+        }
+    }
+
+    public synchronized List<TimelineChunk> getPendingTimelineChunks() {
+        final List<TimelineChunk> timelineChunks = new ArrayList<TimelineChunk>();
+        for (final PendingChunkMap pendingChunkMap : pendingChunkMaps) {
+            timelineChunks.addAll(pendingChunkMap.getChunkMap().values());
+        }
+        return timelineChunks;
+    }
+
+    /**
+     * Make sure all timelines have the sample count passed in; otherwise log
+     * discrepancies and return false
+     *
+     * @param assertedCount The sample count that all timelines are supposed to have
+     * @return true if all timelines have the right count; false otherwise
+     */
+    public boolean checkSampleCounts(final int assertedCount) {
+        boolean success = true;
+        if (assertedCount != sampleCount) {
+            log.error("For host {}, start time {}, the SourceTimeLines sampleCount {} is not equal to the assertedCount {}",
+                      new Object[]{sourceId, dateFormatter.print(startTime), sampleCount, assertedCount});
+            success = false;
+        }
+        for (final Map.Entry<Integer, TimelineChunkAccumulator> entry : timelines.entrySet()) {
+            final int metricId = entry.getKey();
+            final TimelineChunkAccumulator timeline = entry.getValue();
+            final int lineSampleCount = timeline.getSampleCount();
+            if (lineSampleCount != assertedCount) {
+                log.error("For host {}, start time {}, sample kind id {}, the sampleCount {} is not equal to the assertedCount {}",
+                          new Object[]{sourceId, dateFormatter.print(startTime), metricId, lineSampleCount, assertedCount});
+                success = false;
+            }
+        }
+        return success;
+    }
+
+    public int getSourceId() {
+        return sourceId;
+    }
+
+    public int getEventCategoryId() {
+        return eventCategoryId;
+    }
+
+    public DateTime getStartTime() {
+        return startTime;
+    }
+
+    public DateTime getEndTime() {
+        return endTime;
+    }
+
+    public Map<Integer, TimelineChunkAccumulator> getTimelines() {
+        return timelines;
+    }
+
+    public List<DateTime> getTimes() {
+        return times;
+    }
+
+    public DateTime getLatestSampleAddTime() {
+        return latestSampleAddTime;
+    }
+
+    private static class SampleSequenceNumber {
+
+        private long sequenceNumber;
+
+        public SampleSequenceNumber(final long sequenceNumber) {
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        public long getSequenceNumber() {
+            return sequenceNumber;
+        }
+
+        public void setSequenceNumber(final long sequenceNumber) {
+            this.sequenceNumber = sequenceNumber;
+        }
+    }
+}