Details
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/BackgroundDBChunkWriter.java b/usage/src/main/java/com/ning/billing/usage/timeline/BackgroundDBChunkWriter.java
new file mode 100644
index 0000000..a7ab352
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/BackgroundDBChunkWriter.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.config.UsageConfig;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.persistent.TimelineDao;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * This class runs a thread that batch-writes TimelineChunks to the db.
+ * It is thread-safe, and only holds up threads that want to queue
+ * TimelineChunks for the time it takes to copy the ArrayList of PendingChunkMaps.
+ * <p/>
+ * The background writing thread is scheduled every few seconds, as controlled by
+ * config.getBackgroundWriteCheckInterval(). It writes the current inventory of
+ * chunks if there are at least config.getBackgroundWriteBatchSize()
+ * TimelineChunks to be written, or if the time since the last write exceeds
+ * config.getBackgroundWriteMaxDelay().
+ */
+@Singleton
+public class BackgroundDBChunkWriter {
+
+ private static final Logger log = LoggerFactory.getLogger(BackgroundDBChunkWriter.class);
+
+ private final TimelineDao timelineDAO;
+ private final UsageConfig config;
+ private final boolean performForegroundWrites;
+
+ private final AtomicInteger pendingChunkCount = new AtomicInteger();
+ private final AtomicBoolean shuttingDown = new AtomicBoolean();
+ private List<PendingChunkMap> pendingChunks = new ArrayList<PendingChunkMap>();
+ private DateTime lastWriteTime = new DateTime();
+    private final AtomicBoolean doingWritesNow = new AtomicBoolean();
+ private final ScheduledExecutorService backgroundWriteThread = Executors.newSingleThreadScheduledExecutor();
+
+ private final AtomicLong maybePerformBackgroundWritesCount = new AtomicLong();
+ private final AtomicLong backgroundWritesCount = new AtomicLong();
+ private final AtomicLong pendingChunkMapsAdded = new AtomicLong();
+ private final AtomicLong pendingChunksAdded = new AtomicLong();
+ private final AtomicLong pendingChunkMapsWritten = new AtomicLong();
+ private final AtomicLong pendingChunksWritten = new AtomicLong();
+ private final AtomicLong pendingChunkMapsMarkedConsumed = new AtomicLong();
+ private final AtomicLong foregroundChunkMapsWritten = new AtomicLong();
+ private final AtomicLong foregroundChunksWritten = new AtomicLong();
+
+ @Inject
+ public BackgroundDBChunkWriter(final TimelineDao timelineDAO, final UsageConfig config) {
+ this(timelineDAO, config, config.getPerformForegroundWrites());
+ }
+
+ public BackgroundDBChunkWriter(final TimelineDao timelineDAO, @Nullable final UsageConfig config, final boolean performForegroundWrites) {
+ this.timelineDAO = timelineDAO;
+ this.config = config;
+ this.performForegroundWrites = performForegroundWrites;
+ }
+
+ public synchronized void addPendingChunkMap(final PendingChunkMap chunkMap) {
+ if (shuttingDown.get()) {
+ log.error("In addPendingChunkMap(), but finishBackgroundWritingAndExit is true!");
+ } else {
+ if (performForegroundWrites) {
+ foregroundChunkMapsWritten.incrementAndGet();
+ final List<TimelineChunk> chunksToWrite = new ArrayList<TimelineChunk>(chunkMap.getChunkMap().values());
+ foregroundChunksWritten.addAndGet(chunksToWrite.size());
+ timelineDAO.bulkInsertTimelineChunks(chunksToWrite);
+ chunkMap.getAccumulator().markPendingChunkMapConsumed(chunkMap.getPendingChunkMapId());
+ } else {
+ pendingChunkMapsAdded.incrementAndGet();
+ final int chunkCount = chunkMap.getChunkCount();
+ pendingChunksAdded.addAndGet(chunkCount);
+ pendingChunks.add(chunkMap);
+ pendingChunkCount.addAndGet(chunkCount);
+ }
+ }
+ }
+
+ private void performBackgroundWrites() {
+ backgroundWritesCount.incrementAndGet();
+ List<PendingChunkMap> chunkMapsToWrite = null;
+ synchronized (this) {
+ chunkMapsToWrite = pendingChunks;
+ pendingChunks = new ArrayList<PendingChunkMap>();
+ pendingChunkCount.set(0);
+ }
+ final List<TimelineChunk> chunks = new ArrayList<TimelineChunk>();
+ for (final PendingChunkMap map : chunkMapsToWrite) {
+ pendingChunkMapsWritten.incrementAndGet();
+ pendingChunksWritten.addAndGet(map.getChunkMap().size());
+ chunks.addAll(map.getChunkMap().values());
+ }
+ timelineDAO.bulkInsertTimelineChunks(chunks);
+ for (final PendingChunkMap map : chunkMapsToWrite) {
+ pendingChunkMapsMarkedConsumed.incrementAndGet();
+ map.getAccumulator().markPendingChunkMapConsumed(map.getPendingChunkMapId());
+ }
+ }
+
+ private void maybePerformBackgroundWrites() {
+ // If already running background writes, just return
+ maybePerformBackgroundWritesCount.incrementAndGet();
+ if (!doingWritesNow.compareAndSet(false, true)) {
+ return;
+ } else {
+ try {
+ if (shuttingDown.get()) {
+ performBackgroundWrites();
+ }
+ final int pendingCount = pendingChunkCount.get();
+ if (pendingCount > 0) {
+ if (pendingCount >= config.getBackgroundWriteBatchSize() ||
+                        new DateTime().isAfter(lastWriteTime.plusMillis((int) config.getBackgroundWriteMaxDelay().getMillis()))) {
+ performBackgroundWrites();
+ lastWriteTime = new DateTime();
+ }
+ }
+ } finally {
+ doingWritesNow.set(false);
+ }
+ }
+ }
+
+ public synchronized boolean getShutdownFinished() {
+        return !doingWritesNow.get() && pendingChunks.isEmpty();
+ }
+
+ public void initiateShutdown() {
+ shuttingDown.set(true);
+ }
+
+ public void runBackgroundWriteThread() {
+ if (!performForegroundWrites) {
+ backgroundWriteThread.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ maybePerformBackgroundWrites();
+ }
+ },
+ config.getBackgroundWriteCheckInterval().getMillis(),
+ config.getBackgroundWriteCheckInterval().getMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void stopBackgroundWriteThread() {
+ if (!performForegroundWrites) {
+ backgroundWriteThread.shutdown();
+ }
+ }
+
+ public long getMaybePerformBackgroundWritesCount() {
+ return maybePerformBackgroundWritesCount.get();
+ }
+
+ public long getBackgroundWritesCount() {
+ return backgroundWritesCount.get();
+ }
+
+ public long getPendingChunkMapsAdded() {
+ return pendingChunkMapsAdded.get();
+ }
+
+ public long getPendingChunksAdded() {
+ return pendingChunksAdded.get();
+ }
+
+ public long getPendingChunkMapsWritten() {
+ return pendingChunkMapsWritten.get();
+ }
+
+ public long getPendingChunksWritten() {
+ return pendingChunksWritten.get();
+ }
+
+ public long getPendingChunkMapsMarkedConsumed() {
+ return pendingChunkMapsMarkedConsumed.get();
+ }
+
+ public long getForegroundChunkMapsWritten() {
+ return foregroundChunkMapsWritten.get();
+ }
+
+ public long getForegroundChunksWritten() {
+ return foregroundChunksWritten.get();
+ }
+}
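
A minimal sketch of the writer's life cycle in background mode. It assumes a TimelineDao and UsageConfig are available (their implementations are not part of this diff); the PendingChunkMap would normally be built by a TimelineSourceEventAccumulator:

    import java.util.concurrent.TimeUnit;

    import com.ning.billing.config.UsageConfig;
    import com.ning.billing.usage.timeline.BackgroundDBChunkWriter;
    import com.ning.billing.usage.timeline.PendingChunkMap;
    import com.ning.billing.usage.timeline.persistent.TimelineDao;

    public class ChunkWriterSketch {

        // timelineDao and config are assumed to come from the module's bindings;
        // chunkMap would normally be built by a TimelineSourceEventAccumulator.
        public static void drive(final TimelineDao timelineDao, final UsageConfig config,
                                 final PendingChunkMap chunkMap) throws InterruptedException {
            final BackgroundDBChunkWriter writer = new BackgroundDBChunkWriter(timelineDao, config);
            writer.runBackgroundWriteThread();   // schedules maybePerformBackgroundWrites()

            // Queue chunks; the scheduled thread batches them to the db once the
            // batch size or the max-delay threshold is reached.
            writer.addPendingChunkMap(chunkMap);

            // Shutdown handshake, mirroring TimelineEventHandler.performShutdown():
            writer.initiateShutdown();
            while (!writer.getShutdownFinished()) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            writer.stopBackgroundWriteThread();
        }
    }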
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/PendingChunkMap.java b/usage/src/main/java/com/ning/billing/usage/timeline/PendingChunkMap.java
new file mode 100644
index 0000000..5337e77
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/PendingChunkMap.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.util.Map;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+
+public class PendingChunkMap {
+
+ private final TimelineSourceEventAccumulator accumulator;
+ private final long pendingChunkMapId;
+ private final Map<Integer, TimelineChunk> chunkMap;
+
+ public PendingChunkMap(final TimelineSourceEventAccumulator accumulator, final long pendingChunkMapId, final Map<Integer, TimelineChunk> chunkMap) {
+ this.accumulator = accumulator;
+ this.pendingChunkMapId = pendingChunkMapId;
+ this.chunkMap = chunkMap;
+ }
+
+ public TimelineSourceEventAccumulator getAccumulator() {
+ return accumulator;
+ }
+
+ public long getPendingChunkMapId() {
+ return pendingChunkMapId;
+ }
+
+ public Map<Integer, TimelineChunk> getChunkMap() {
+ return chunkMap;
+ }
+
+ public int getChunkCount() {
+ return chunkMap.size();
+ }
+}
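
The id-echo handshake this value class carries, sketched with the accumulator and chunk types defined elsewhere in this diff:

    import java.util.Map;

    import com.ning.billing.usage.timeline.PendingChunkMap;
    import com.ning.billing.usage.timeline.TimelineSourceEventAccumulator;
    import com.ning.billing.usage.timeline.chunks.TimelineChunk;

    public class PendingChunkMapSketch {

        // The accumulator tags each extracted map with a monotonically increasing id
        // (see extractAndQueueTimelineChunks() later in this diff) ...
        public static PendingChunkMap queue(final TimelineSourceEventAccumulator accumulator,
                                            final long mapId,
                                            final Map<Integer, TimelineChunk> chunksByMetricId) {
            return new PendingChunkMap(accumulator, mapId, chunksByMetricId);
        }

        // ... and the writer echoes that id back after the bulk insert succeeds, so
        // the accumulator can drop the map from its pending list.
        public static void consume(final PendingChunkMap map) {
            map.getAccumulator().markPendingChunkMapConsumed(map.getPendingChunkMapId());
        }
    }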
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/TimelineEventHandler.java b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineEventHandler.java
new file mode 100644
index 0000000..c7e3903
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineEventHandler.java
@@ -0,0 +1,574 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.config.UsageConfig;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.codec.SampleCoder;
+import com.ning.billing.usage.timeline.codec.TimelineChunkAccumulator;
+import com.ning.billing.usage.timeline.persistent.FileBackedBuffer;
+import com.ning.billing.usage.timeline.persistent.Replayer;
+import com.ning.billing.usage.timeline.persistent.TimelineDao;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.shutdown.ShutdownSaveMode;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+
+public class TimelineEventHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(TimelineEventHandler.class);
+ private final ScheduledExecutorService purgeThread = Executors.newSingleThreadScheduledExecutor();
+ private static final Comparator<TimelineChunk> CHUNK_COMPARATOR = new Comparator<TimelineChunk>() {
+
+ @Override
+ public int compare(final TimelineChunk o1, final TimelineChunk o2) {
+            final int hostDiff = o1.getSourceId() - o2.getSourceId();
+ if (hostDiff < 0) {
+ return -1;
+ } else if (hostDiff > 0) {
+ return 1;
+ } else {
+ final int metricIdDiff = o1.getMetricId() - o2.getMetricId();
+ if (metricIdDiff < 0) {
+ return -1;
+ } else if (metricIdDiff > 0) {
+ return 1;
+ } else {
+ final long startTimeDiff = o1.getStartTime().getMillis() - o2.getStartTime().getMillis();
+ if (startTimeDiff < 0) {
+ return -1;
+ } else if (startTimeDiff > 0) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ }
+ }
+ };
+
+ // A TimelineSourceEventAccumulator records attributes for a specific host and event type.
+ // This cache maps sourceId -> categoryId -> accumulator
+ //
+ // TODO: There are still timing windows in the use of accumulators. Enumerate them and
+ // either fix them or prove they are benign
+ private final Map<Integer, SourceAccumulatorsAndUpdateDate> accumulators = new ConcurrentHashMap<Integer, SourceAccumulatorsAndUpdateDate>();
+
+ private final UsageConfig config;
+ private final TimelineDao timelineDAO;
+ private final TimelineCoder timelineCoder;
+ private final SampleCoder sampleCoder;
+ private final BackgroundDBChunkWriter backgroundWriter;
+ private final FileBackedBuffer backingBuffer;
+
+ private final ShutdownSaveMode shutdownSaveMode;
+ private final AtomicBoolean shuttingDown = new AtomicBoolean();
+ private final AtomicBoolean replaying = new AtomicBoolean();
+
+ private final AtomicLong eventsDiscarded = new AtomicLong(0L);
+ private final AtomicLong eventsReceivedAfterShuttingDown = new AtomicLong();
+ private final AtomicLong handledEventCount = new AtomicLong();
+ private final AtomicLong addedSourceEventAccumulatorMapCount = new AtomicLong();
+ private final AtomicLong addedSourceEventAccumulatorCount = new AtomicLong();
+ private final AtomicLong getInMemoryChunksCallCount = new AtomicLong();
+ private final AtomicLong accumulatorDeepCopyCount = new AtomicLong();
+ private final AtomicLong inMemoryChunksReturnedCount = new AtomicLong();
+ private final AtomicLong replayCount = new AtomicLong();
+ private final AtomicLong replaySamplesFoundCount = new AtomicLong();
+ private final AtomicLong replaySamplesOutsideTimeRangeCount = new AtomicLong();
+ private final AtomicLong replaySamplesProcessedCount = new AtomicLong();
+ private final AtomicLong forceCommitCallCount = new AtomicLong();
+ private final AtomicLong purgedAccumsBecauseSourceNotUpdated = new AtomicLong();
+ private final AtomicLong purgedAccumsBecauseCategoryNotUpdated = new AtomicLong();
+
+ @Inject
+ public TimelineEventHandler(final UsageConfig config, final TimelineDao timelineDAO, final TimelineCoder timelineCoder, final SampleCoder sampleCoder, final BackgroundDBChunkWriter backgroundWriter, final FileBackedBuffer fileBackedBuffer) {
+ this.config = config;
+ this.timelineDAO = timelineDAO;
+ this.timelineCoder = timelineCoder;
+ this.sampleCoder = sampleCoder;
+ this.backgroundWriter = backgroundWriter;
+ this.backingBuffer = fileBackedBuffer;
+ this.shutdownSaveMode = ShutdownSaveMode.fromString(config.getShutdownSaveMode());
+ }
+
+ private void saveAccumulators() {
+ for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> entry : accumulators.entrySet()) {
+ final int sourceId = entry.getKey();
+ final Map<Integer, TimelineSourceEventAccumulator> hostAccumulators = entry.getValue().getCategoryAccumulators();
+ for (final Map.Entry<Integer, TimelineSourceEventAccumulator> accumulatorEntry : hostAccumulators.entrySet()) {
+ final int categoryId = accumulatorEntry.getKey();
+ final TimelineSourceEventAccumulator accumulator = accumulatorEntry.getValue();
+ log.debug("Saving Timeline for sourceId [{}] and categoryId [{}]", sourceId, categoryId);
+ accumulator.extractAndQueueTimelineChunks();
+ }
+ }
+ }
+
+ private void saveStartTimes(final StartTimes startTimes) {
+ for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> entry : accumulators.entrySet()) {
+ final int sourceId = entry.getKey();
+ final Map<Integer, TimelineSourceEventAccumulator> hostAccumulators = entry.getValue().getCategoryAccumulators();
+ for (final Map.Entry<Integer, TimelineSourceEventAccumulator> accumulatorEntry : hostAccumulators.entrySet()) {
+ final int categoryId = accumulatorEntry.getKey();
+ final TimelineSourceEventAccumulator accumulator = accumulatorEntry.getValue();
+ log.debug("Saving Timeline start time for sourceId [{}] and category [{}]", sourceId, categoryId);
+ startTimes.addTime(sourceId, categoryId, accumulator.getStartTime());
+ }
+ }
+ }
+
+ public synchronized void purgeOldSourcesAndAccumulators(final DateTime purgeIfBeforeDate) {
+ final List<Integer> oldSourceIds = new ArrayList<Integer>();
+ for (final Map.Entry<Integer, SourceAccumulatorsAndUpdateDate> entry : accumulators.entrySet()) {
+ final int sourceId = entry.getKey();
+ final SourceAccumulatorsAndUpdateDate accumulatorsAndDate = entry.getValue();
+ final DateTime lastUpdatedDate = accumulatorsAndDate.getLastUpdateDate();
+ if (lastUpdatedDate.isBefore(purgeIfBeforeDate)) {
+ oldSourceIds.add(sourceId);
+ purgedAccumsBecauseSourceNotUpdated.incrementAndGet();
+ for (final TimelineSourceEventAccumulator categoryAccumulator : accumulatorsAndDate.getCategoryAccumulators().values()) {
+ categoryAccumulator.extractAndQueueTimelineChunks();
+ }
+ } else {
+ final List<Integer> categoryIdsToPurge = new ArrayList<Integer>();
+ final Map<Integer, TimelineSourceEventAccumulator> categoryMap = accumulatorsAndDate.getCategoryAccumulators();
+ for (final Map.Entry<Integer, TimelineSourceEventAccumulator> eventEntry : categoryMap.entrySet()) {
+ final int categoryId = eventEntry.getKey();
+ final TimelineSourceEventAccumulator categoryAccumulator = eventEntry.getValue();
+ final DateTime latestTime = categoryAccumulator.getLatestSampleAddTime();
+ if (latestTime != null && latestTime.isBefore(purgeIfBeforeDate)) {
+ purgedAccumsBecauseCategoryNotUpdated.incrementAndGet();
+ categoryAccumulator.extractAndQueueTimelineChunks();
+ categoryIdsToPurge.add(categoryId);
+ }
+ }
+ for (final int categoryId : categoryIdsToPurge) {
+ categoryMap.remove(categoryId);
+ }
+ }
+ }
+ for (final int sourceIdToPurge : oldSourceIds) {
+ accumulators.remove(sourceIdToPurge);
+ }
+ }
+
+ /**
+     * Main entry point to the timeline subsystem. Records a series of samples for a given source at a given timestamp.
+ *
+ * @param sourceId id of the source
+ * @param eventType event category
+ * @param eventTimestamp event timestamp
+ * @param samples samples to record
+ */
+ public void record(final Integer sourceId, final String eventType, final DateTime eventTimestamp, final Map<String, Object> samples) {
+ if (shuttingDown.get()) {
+ eventsReceivedAfterShuttingDown.incrementAndGet();
+ return;
+ }
+ try {
+ handledEventCount.incrementAndGet();
+
+ // Extract and parse samples
+ final Map<Integer, ScalarSample> scalarSamples = new LinkedHashMap<Integer, ScalarSample>();
+ convertSamplesToScalarSamples(sourceId, eventType, samples, scalarSamples);
+
+ if (scalarSamples.isEmpty()) {
+ eventsDiscarded.incrementAndGet();
+ return;
+ }
+
+ final SourceSamplesForTimestamp sourceSamples = new SourceSamplesForTimestamp(sourceId, eventType, eventTimestamp, scalarSamples);
+ if (!replaying.get()) {
+ // Start by saving locally the samples
+ backingBuffer.append(sourceSamples);
+ }
+ // Then add them to the in-memory accumulator
+ processSamples(sourceSamples);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public TimelineSourceEventAccumulator getOrAddSourceEventAccumulator(final int sourceId, final int categoryId, final DateTime firstSampleTime) {
+ return this.getOrAddSourceEventAccumulator(sourceId, categoryId, firstSampleTime, (int) config.getTimelineLength().getMillis());
+ }
+
+ public synchronized TimelineSourceEventAccumulator getOrAddSourceEventAccumulator(final int sourceId, final int categoryId, final DateTime firstSampleTime, final int timelineLengthMillis) {
+ SourceAccumulatorsAndUpdateDate sourceAccumulatorsAndUpdateDate = accumulators.get(sourceId);
+ if (sourceAccumulatorsAndUpdateDate == null) {
+ addedSourceEventAccumulatorMapCount.incrementAndGet();
+ sourceAccumulatorsAndUpdateDate = new SourceAccumulatorsAndUpdateDate(new HashMap<Integer, TimelineSourceEventAccumulator>(), new DateTime());
+ accumulators.put(sourceId, sourceAccumulatorsAndUpdateDate);
+ }
+ sourceAccumulatorsAndUpdateDate.markUpdated();
+ final Map<Integer, TimelineSourceEventAccumulator> hostCategoryAccumulators = sourceAccumulatorsAndUpdateDate.getCategoryAccumulators();
+ TimelineSourceEventAccumulator accumulator = hostCategoryAccumulators.get(categoryId);
+ if (accumulator == null) {
+ addedSourceEventAccumulatorCount.incrementAndGet();
+ accumulator = new TimelineSourceEventAccumulator(timelineDAO, timelineCoder, sampleCoder, backgroundWriter, sourceId, categoryId, firstSampleTime, timelineLengthMillis);
+ hostCategoryAccumulators.put(categoryId, accumulator);
+ log.debug("Created new Timeline for sourceId [{}] and category [{}]", sourceId, categoryId);
+ }
+ return accumulator;
+ }
+
+ @VisibleForTesting
+ public void processSamples(final SourceSamplesForTimestamp hostSamples) throws ExecutionException, IOException {
+ final int sourceId = hostSamples.getSourceId();
+ final String category = hostSamples.getCategory();
+ final int categoryId = timelineDAO.getEventCategoryId(category);
+ final DateTime timestamp = hostSamples.getTimestamp();
+ final TimelineSourceEventAccumulator accumulator = getOrAddSourceEventAccumulator(sourceId, categoryId, timestamp);
+ accumulator.addSourceSamples(hostSamples);
+ }
+
+ public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+ return getInMemoryTimelineChunks(sourceId, ImmutableList.copyOf(timelineDAO.getMetricIdsBySourceId(sourceId)), filterStartTime, filterEndTime);
+ }
+
+ public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final Integer metricId, @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+ return getInMemoryTimelineChunks(sourceId, ImmutableList.<Integer>of(metricId), filterStartTime, filterEndTime);
+ }
+
+ public synchronized Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final List<Integer> metricIds, @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+ getInMemoryChunksCallCount.incrementAndGet();
+ // Check first if there is an in-memory accumulator for this host
+ final SourceAccumulatorsAndUpdateDate sourceAccumulatorsAndDate = accumulators.get(sourceId);
+ if (sourceAccumulatorsAndDate == null) {
+ return ImmutableList.of();
+ }
+
+ // Now, filter each accumulator for this host
+ final List<TimelineChunk> samplesBySourceName = new ArrayList<TimelineChunk>();
+ for (final TimelineSourceEventAccumulator accumulator : sourceAccumulatorsAndDate.getCategoryAccumulators().values()) {
+ for (final TimelineChunk chunk : accumulator.getPendingTimelineChunks()) {
+ if ((filterStartTime != null && chunk.getEndTime().isBefore(filterStartTime)) ||
+ (filterEndTime != null && chunk.getStartTime().isAfter(filterEndTime)) ||
+ !metricIds.contains(chunk.getMetricId())) {
+ continue;
+ } else {
+ samplesBySourceName.add(chunk);
+ }
+ }
+ final List<DateTime> accumulatorTimes = accumulator.getTimes();
+ if (accumulatorTimes.size() == 0) {
+ continue;
+ }
+ final DateTime accumulatorStartTime = accumulator.getStartTime();
+ final DateTime accumulatorEndTime = accumulator.getEndTime();
+
+ // Check if the time filters apply
+ if ((filterStartTime != null && accumulatorEndTime.isBefore(filterStartTime)) || (filterEndTime != null && accumulatorStartTime.isAfter(filterEndTime))) {
+ // Ignore this accumulator
+ continue;
+ }
+
+ // This accumulator is in the right time range, now return only the sample kinds specified
+ final byte[] timeBytes = timelineCoder.compressDateTimes(accumulatorTimes);
+ for (final TimelineChunkAccumulator chunkAccumulator : accumulator.getTimelines().values()) {
+ if (metricIds.contains(chunkAccumulator.getMetricId())) {
+ // Extract the timeline for this chunk by copying it and reading encoded bytes
+ accumulatorDeepCopyCount.incrementAndGet();
+ final TimelineChunkAccumulator chunkAccumulatorCopy = chunkAccumulator.deepCopy();
+ final TimelineChunk timelineChunk = chunkAccumulatorCopy.extractTimelineChunkAndReset(accumulatorStartTime, accumulatorEndTime, timeBytes);
+ samplesBySourceName.add(timelineChunk);
+ }
+ }
+ }
+ inMemoryChunksReturnedCount.addAndGet(samplesBySourceName.size());
+ Collections.sort(samplesBySourceName, CHUNK_COMPARATOR);
+ return samplesBySourceName;
+ }
+
+ @VisibleForTesting
+ void convertSamplesToScalarSamples(final Integer sourceId, final String eventType, final Map<String, Object> inputSamples, final Map<Integer, ScalarSample> outputSamples) {
+ if (inputSamples == null) {
+ return;
+ }
+ final Integer eventCategoryId = timelineDAO.getOrAddEventCategory(eventType);
+
+ for (final String attributeName : inputSamples.keySet()) {
+ final Integer metricId = timelineDAO.getOrAddMetric(sourceId, eventCategoryId, attributeName);
+ final Object sample = inputSamples.get(attributeName);
+
+ outputSamples.put(metricId, ScalarSample.fromObject(sample));
+ }
+ }
+
+ public void replay(final String spoolDir) {
+ replayCount.incrementAndGet();
+ log.info("Starting replay of files in {}", spoolDir);
+ final Replayer replayer = new Replayer(spoolDir);
+ StartTimes lastStartTimes = null;
+ if (shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES) {
+ lastStartTimes = timelineDAO.getLastStartTimes();
+ if (lastStartTimes == null) {
+ log.info("Did not find startTimes");
+ } else {
+ log.info("Retrieved startTimes from the db");
+ }
+ }
+ final StartTimes startTimes = lastStartTimes;
+ final DateTime minStartTime = lastStartTimes == null ? null : startTimes.getMinStartTime();
+ final long found = replaySamplesFoundCount.get();
+ final long outsideTimeRange = replaySamplesOutsideTimeRangeCount.get();
+ final long processed = replaySamplesProcessedCount.get();
+
+ try {
+            // Read all files in the spool directory and delete them after processing,
+            // if startTimes is null.
+ replaying.set(true);
+ final int filesSkipped = replayer.readAll(startTimes == null, minStartTime, new Function<SourceSamplesForTimestamp, Void>() {
+ @Override
+ public Void apply(@Nullable final SourceSamplesForTimestamp hostSamples) {
+ if (hostSamples != null) {
+ replaySamplesFoundCount.incrementAndGet();
+ boolean useSamples = true;
+ try {
+ final int sourceId = hostSamples.getSourceId();
+ final String category = hostSamples.getCategory();
+ final int categoryId = timelineDAO.getEventCategoryId(category);
+ // If startTimes is non-null and the samples come from before the first time for
+ // the given host and event category, ignore the samples
+ if (startTimes != null) {
+ final DateTime timestamp = hostSamples.getTimestamp();
+ final DateTime categoryStartTime = startTimes.getStartTimeForSourceIdAndCategoryId(sourceId, categoryId);
+ if (timestamp == null ||
+ timestamp.isBefore(startTimes.getMinStartTime()) ||
+ (categoryStartTime != null && timestamp.isBefore(categoryStartTime))) {
+ replaySamplesOutsideTimeRangeCount.incrementAndGet();
+ useSamples = false;
+ }
+ }
+ if (useSamples) {
+ replaySamplesProcessedCount.incrementAndGet();
+ processSamples(hostSamples);
+ }
+ } catch (Exception e) {
+ log.warn("Got exception replaying sample, data potentially lost! {}", hostSamples.toString());
+ }
+ }
+
+ return null;
+ }
+ });
+ if (shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES) {
+ timelineDAO.deleteLastStartTimes();
+ log.info("Deleted old startTimes");
+ }
+ log.info(String.format("Replay completed; %d files skipped, samples read %d, samples outside time range %d, samples used %d",
+ filesSkipped, replaySamplesFoundCount.get() - found, replaySamplesOutsideTimeRangeCount.get() - outsideTimeRange, replaySamplesProcessedCount.get() - processed));
+ } catch (RuntimeException e) {
+ // Catch the exception to make the collector start properly
+ log.error("Ignoring error when replaying the data", e);
+ } finally {
+ replaying.set(false);
+ }
+ }
+
+ public void forceCommit() {
+ forceCommitCallCount.incrementAndGet();
+ saveAccumulators();
+ backingBuffer.discard();
+ log.info("Timelines committed");
+ }
+
+ public void commitAndShutdown() {
+ shuttingDown.set(true);
+ final boolean doingFastShutdown = shutdownSaveMode == ShutdownSaveMode.SAVE_START_TIMES;
+ if (doingFastShutdown) {
+ final StartTimes startTimes = new StartTimes();
+ saveStartTimes(startTimes);
+ timelineDAO.insertLastStartTimes(startTimes);
+ log.info("During shutdown, saved timeline start times in the db");
+ } else {
+ saveAccumulators();
+ log.info("During shutdown, saved timeline accumulators");
+ }
+ performShutdown();
+ backingBuffer.discard();
+ }
+
+ private void performShutdown() {
+ backgroundWriter.initiateShutdown();
+ while (!backgroundWriter.getShutdownFinished()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ purgeThread.shutdown();
+ }
+
+ private synchronized void purgeFilesAndAccumulators() {
+ this.purgeFilesAndAccumulators(new DateTime().minus(config.getTimelineLength().getMillis()), new DateTime().minus(2 * config.getTimelineLength().getMillis()));
+ }
+
+ // TODO: We have a bad interaction between startTimes and purging: If the system is down
+ // for two hours, we may not want it to purge everything. Figure out what to do about this.
+ private synchronized void purgeFilesAndAccumulators(final DateTime purgeAccumulatorsIfBefore, final DateTime purgeFilesIfBefore) {
+ purgeOldSourcesAndAccumulators(purgeAccumulatorsIfBefore);
+ final Replayer replayer = new Replayer(config.getSpoolDir());
+ replayer.purgeOldFiles(purgeFilesIfBefore);
+ }
+
+ public void startHandlerThreads() {
+ purgeThread.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ purgeFilesAndAccumulators();
+ }
+ },
+ config.getTimelineLength().getMillis(),
+ config.getTimelineLength().getMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ // We use the lastUpdateDate to purge sources and their accumulators from the map
+ private static class SourceAccumulatorsAndUpdateDate {
+
+ private final Map<Integer, TimelineSourceEventAccumulator> categoryAccumulators;
+ private DateTime lastUpdateDate;
+
+ public SourceAccumulatorsAndUpdateDate(final Map<Integer, TimelineSourceEventAccumulator> categoryAccumulators, final DateTime lastUpdateDate) {
+ this.categoryAccumulators = categoryAccumulators;
+ this.lastUpdateDate = lastUpdateDate;
+ }
+
+ public Map<Integer, TimelineSourceEventAccumulator> getCategoryAccumulators() {
+ return categoryAccumulators;
+ }
+
+ public DateTime getLastUpdateDate() {
+ return lastUpdateDate;
+ }
+
+ public void markUpdated() {
+ lastUpdateDate = new DateTime();
+ }
+ }
+
+ @VisibleForTesting
+ public Collection<TimelineSourceEventAccumulator> getAccumulators() {
+ final List<TimelineSourceEventAccumulator> inMemoryAccumulator = new ArrayList<TimelineSourceEventAccumulator>();
+ for (final SourceAccumulatorsAndUpdateDate sourceEventAccumulatorMap : accumulators.values()) {
+ inMemoryAccumulator.addAll(sourceEventAccumulatorMap.getCategoryAccumulators().values());
+ }
+
+ return inMemoryAccumulator;
+ }
+
+ @VisibleForTesting
+ public FileBackedBuffer getBackingBuffer() {
+ return backingBuffer;
+ }
+
+ public long getEventsDiscarded() {
+ return eventsDiscarded.get();
+ }
+
+ public long getSourceEventAccumulatorCount() {
+ return accumulators.size();
+ }
+
+ public long getEventsReceivedAfterShuttingDown() {
+ return eventsReceivedAfterShuttingDown.get();
+ }
+
+ public long getHandledEventCount() {
+ return handledEventCount.get();
+ }
+
+ public long getAddedSourceEventAccumulatorMapCount() {
+ return addedSourceEventAccumulatorMapCount.get();
+ }
+
+ public long getAddedSourceEventAccumulatorCount() {
+ return addedSourceEventAccumulatorCount.get();
+ }
+
+ public long getGetInMemoryChunksCallCount() {
+ return getInMemoryChunksCallCount.get();
+ }
+
+ public long getAccumulatorDeepCopyCount() {
+ return accumulatorDeepCopyCount.get();
+ }
+
+ public long getInMemoryChunksReturnedCount() {
+ return inMemoryChunksReturnedCount.get();
+ }
+
+ public long getReplayCount() {
+ return replayCount.get();
+ }
+
+ public long getReplaySamplesFoundCount() {
+ return replaySamplesFoundCount.get();
+ }
+
+ public long getReplaySamplesOutsideTimeRangeCount() {
+ return replaySamplesOutsideTimeRangeCount.get();
+ }
+
+ public long getReplaySamplesProcessedCount() {
+ return replaySamplesProcessedCount.get();
+ }
+
+ public long getForceCommitCallCount() {
+ return forceCommitCallCount.get();
+ }
+
+ public long getPurgedAccumsBecauseSourceNotUpdated() {
+ return purgedAccumsBecauseSourceNotUpdated.get();
+ }
+
+ public long getPurgedAccumsBecauseCategoryNotUpdated() {
+ return purgedAccumsBecauseCategoryNotUpdated.get();
+ }
+}
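
A sketch of the handler's public life cycle, assuming an injected instance and a source id already known to the dao; the spool directory, event type and sample payload below are illustrative only:

    import java.util.Collection;
    import java.util.Map;

    import org.joda.time.DateTime;

    import com.google.common.collect.ImmutableMap;
    import com.ning.billing.usage.timeline.TimelineEventHandler;
    import com.ning.billing.usage.timeline.chunks.TimelineChunk;

    public class EventHandlerSketch {

        public static void drive(final TimelineEventHandler handler, final int sourceId) throws Exception {
            // On startup: replay samples spooled before the last shutdown, then
            // schedule the purge thread.
            handler.replay("/var/tmp/usage-spool");   // illustrative path
            handler.startHandlerThreads();

            // Record one event; keys are attribute names, values are raw samples.
            final Map<String, Object> samples = ImmutableMap.<String, Object>of("heapUsed", 1024L);
            handler.record(sourceId, "JVMMemory", new DateTime(), samples);

            // Chunks not yet flushed to the db stay queryable in memory
            // (null filters mean no time-range restriction).
            final Collection<? extends TimelineChunk> pending =
                    handler.getInMemoryTimelineChunks(sourceId, null, null);

            // On shutdown: spill accumulators or save start times, depending on
            // ShutdownSaveMode, then drain the background writer.
            handler.commitAndShutdown();
        }
    }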
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/TimelineSourceEventAccumulator.java b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineSourceEventAccumulator.java
new file mode 100644
index 0000000..d352dd9
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/TimelineSourceEventAccumulator.java
@@ -0,0 +1,314 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.codec.SampleCoder;
+import com.ning.billing.usage.timeline.codec.TimelineChunkAccumulator;
+import com.ning.billing.usage.timeline.persistent.TimelineDao;
+import com.ning.billing.usage.timeline.samples.NullSample;
+import com.ning.billing.usage.timeline.samples.RepeatSample;
+import com.ning.billing.usage.timeline.samples.ScalarSample;
+import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.usage.timeline.times.TimelineCoder;
+
+/**
+ * This class represents a collection of timeline chunks, one for each
+ * metric belonging to one event category, each over a specific time period,
+ * from a single source. This class is used to accumulate samples
+ * to be written to the database; a separate streaming class with
+ * much less overhead is used to "play back" the samples read from
+ * the db in response to queries.
+ * <p/>
+ * All subordinate timelines contain the same number of samples.
+ * <p/>
+ * When enough samples have accumulated, typically one hour's worth,
+ * in-memory samples are made into TimelineChunks, one chunk for each metricId
+ * maintained by the accumulator.
+ * <p/>
+ * These new chunks are organized as PendingChunkMaps, kept in a local list and also
+ * handed off to the BackgroundDBChunkWriter, to be written to the db by a background
+ * process. At some point in the future, that background process will call
+ * markPendingChunkMapConsumed(), passing the id of a PendingChunkMap. This causes the
+ * PendingChunkMap to be removed from the local list maintained by the TimelineSourceEventAccumulator.
+ * <p/>
+ * Queries that cause the TimelineSourceEventAccumulator instance to return in-memory
+ * chunks also return any chunks in PendingChunkMaps in the local list of pending chunks.
+ */
+public class TimelineSourceEventAccumulator {
+
+ private static final Logger log = LoggerFactory.getLogger(TimelineSourceEventAccumulator.class);
+ private static final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
+ private static final NullSample nullSample = new NullSample();
+ private static final boolean checkEveryAccess = Boolean.parseBoolean(System.getProperty("killbill.usage.checkEveryAccess"));
+ private static final Random rand = new Random(0);
+
+ private final Map<Integer, SampleSequenceNumber> metricIdCounters = new HashMap<Integer, SampleSequenceNumber>();
+ private final List<PendingChunkMap> pendingChunkMaps = new ArrayList<PendingChunkMap>();
+ private long pendingChunkMapIdCounter = 1;
+
+ private final BackgroundDBChunkWriter backgroundWriter;
+ private final TimelineCoder timelineCoder;
+ private final SampleCoder sampleCoder;
+ private final Integer timelineLengthMillis;
+ private final int sourceId;
+ private final int eventCategoryId;
+ // This is the time when we want to end the chunk. Setting the value randomly
+ // when the TimelineSourceEventAccumulator is created provides a mechanism to
+    // distribute the db writes.
+ private DateTime chunkEndTime = null;
+ private DateTime startTime = null;
+ private DateTime endTime = null;
+ private DateTime latestSampleAddTime;
+ private long sampleSequenceNumber = 0;
+ private int sampleCount = 0;
+
+ /**
+ * Maps the sample kind id to the accumulator for that sample kind
+ */
+ private final Map<Integer, TimelineChunkAccumulator> timelines = new ConcurrentHashMap<Integer, TimelineChunkAccumulator>();
+
+ /**
+ * Holds the sampling times of the samples
+ */
+ private final List<DateTime> times = new ArrayList<DateTime>();
+
+ public TimelineSourceEventAccumulator(final TimelineDao dao, final TimelineCoder timelineCoder, final SampleCoder sampleCoder,
+ final BackgroundDBChunkWriter backgroundWriter, final int sourceId, final int eventCategoryId,
+ final DateTime firstSampleTime, final Integer timelineLengthMillis) {
+ this.timelineLengthMillis = timelineLengthMillis;
+ this.backgroundWriter = backgroundWriter;
+ this.timelineCoder = timelineCoder;
+ this.sampleCoder = sampleCoder;
+ this.sourceId = sourceId;
+ this.eventCategoryId = eventCategoryId;
+ // Set the end-of-chunk time by tossing a random number, to evenly distribute the db writeback load.
+ this.chunkEndTime = timelineLengthMillis != null ? firstSampleTime.plusMillis(rand.nextInt(timelineLengthMillis)) : null;
+ }
+
+ /*
+     * This constructor is used for testing; the foreground writer persists chunks
+     * as soon as they are queued, but because the chunkEndTime is far in the future,
+     * the accumulator never initiates chunk extraction on its own.
+ */
+ public TimelineSourceEventAccumulator(final TimelineDao timelineDAO, final TimelineCoder timelineCoder, final SampleCoder sampleCoder,
+ final Integer sourceId, final int eventTypeId, final DateTime firstSampleTime) {
+ this(timelineDAO, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(timelineDAO, null, true), sourceId, eventTypeId, firstSampleTime, Integer.MAX_VALUE);
+ }
+
+ @SuppressWarnings("unchecked")
+ // TODO - we can probably do better than synchronize the whole method
+ public synchronized void addSourceSamples(final SourceSamplesForTimestamp samples) {
+ final DateTime timestamp = samples.getTimestamp();
+
+ if (chunkEndTime != null && chunkEndTime.isBefore(timestamp)) {
+ extractAndQueueTimelineChunks();
+ startTime = timestamp;
+ chunkEndTime = timestamp.plusMillis(timelineLengthMillis);
+ }
+
+ if (startTime == null) {
+ startTime = timestamp;
+ }
+ if (endTime == null) {
+ endTime = timestamp;
+ } else if (!timestamp.isAfter(endTime)) {
+ log.warn("Adding samples for host {}, timestamp {} is not after the end time {}; ignored",
+ new Object[]{sourceId, dateFormatter.print(timestamp), dateFormatter.print(endTime)});
+ return;
+ }
+ sampleSequenceNumber++;
+ latestSampleAddTime = new DateTime();
+ for (final Map.Entry<Integer, ScalarSample> entry : samples.getSamples().entrySet()) {
+ final Integer metricId = entry.getKey();
+ final SampleSequenceNumber counter = metricIdCounters.get(metricId);
+ if (counter != null) {
+ counter.setSequenceNumber(sampleSequenceNumber);
+ } else {
+ metricIdCounters.put(metricId, new SampleSequenceNumber(sampleSequenceNumber));
+ }
+ final ScalarSample sample = entry.getValue();
+ TimelineChunkAccumulator timeline = timelines.get(metricId);
+ if (timeline == null) {
+ timeline = new TimelineChunkAccumulator(sourceId, metricId, sampleCoder);
+ if (sampleCount > 0) {
+ addPlaceholders(timeline, sampleCount);
+ }
+ timelines.put(metricId, timeline);
+ }
+ final ScalarSample compressedSample = sampleCoder.compressSample(sample);
+ timeline.addSample(compressedSample);
+ }
+ for (final Map.Entry<Integer, SampleSequenceNumber> entry : metricIdCounters.entrySet()) {
+ final SampleSequenceNumber counter = entry.getValue();
+ if (counter.getSequenceNumber() < sampleSequenceNumber) {
+ counter.setSequenceNumber(sampleSequenceNumber);
+ final int metricId = entry.getKey();
+ final TimelineChunkAccumulator timeline = timelines.get(metricId);
+ timeline.addSample(nullSample);
+ }
+ }
+ // Now we can update the state
+ endTime = timestamp;
+ sampleCount++;
+ times.add(timestamp);
+
+ if (checkEveryAccess) {
+ checkSampleCounts(sampleCount);
+ }
+ }
+
+ private void addPlaceholders(final TimelineChunkAccumulator timeline, int countToAdd) {
+ final int maxRepeatSamples = RepeatSample.MAX_SHORT_REPEAT_COUNT;
+ while (countToAdd >= maxRepeatSamples) {
+ timeline.addPlaceholder((byte) maxRepeatSamples);
+ countToAdd -= maxRepeatSamples;
+ }
+ if (countToAdd > 0) {
+ timeline.addPlaceholder((byte) countToAdd);
+ }
+ }
+
+ /**
+ * This method queues a map of TimelineChunks extracted from the TimelineChunkAccumulators
+     * to be written to the db. When in-memory chunks are requested, any queued chunk
+     * will be included in the list.
+ */
+ public synchronized void extractAndQueueTimelineChunks() {
+ if (times.size() > 0) {
+ final Map<Integer, TimelineChunk> chunkMap = new HashMap<Integer, TimelineChunk>();
+ final byte[] timeBytes = timelineCoder.compressDateTimes(times);
+ for (final Map.Entry<Integer, TimelineChunkAccumulator> entry : timelines.entrySet()) {
+ final int metricId = entry.getKey();
+ final TimelineChunkAccumulator accumulator = entry.getValue();
+ final TimelineChunk chunk = accumulator.extractTimelineChunkAndReset(startTime, endTime, timeBytes);
+ chunkMap.put(metricId, chunk);
+ }
+ times.clear();
+ sampleCount = 0;
+ final long counter = pendingChunkMapIdCounter++;
+ final PendingChunkMap newChunkMap = new PendingChunkMap(this, counter, chunkMap);
+ pendingChunkMaps.add(newChunkMap);
+ backgroundWriter.addPendingChunkMap(newChunkMap);
+ }
+ }
+
+ public synchronized void markPendingChunkMapConsumed(final long pendingChunkMapId) {
+ final PendingChunkMap pendingChunkMap = pendingChunkMaps.size() > 0 ? pendingChunkMaps.get(0) : null;
+ if (pendingChunkMap == null) {
+ log.error("In TimelineSourceEventAccumulator.markPendingChunkMapConsumed(), could not find the map for {}", pendingChunkMapId);
+ } else if (pendingChunkMapId != pendingChunkMap.getPendingChunkMapId()) {
+ log.error("In TimelineSourceEventAccumulator.markPendingChunkMapConsumed(), the next map has id {}, but we're consuming id {}",
+ pendingChunkMap.getPendingChunkMapId(), pendingChunkMapId);
+ } else {
+ pendingChunkMaps.remove(0);
+ }
+ }
+
+ public synchronized List<TimelineChunk> getPendingTimelineChunks() {
+ final List<TimelineChunk> timelineChunks = new ArrayList<TimelineChunk>();
+ for (final PendingChunkMap pendingChunkMap : pendingChunkMaps) {
+ timelineChunks.addAll(pendingChunkMap.getChunkMap().values());
+ }
+ return timelineChunks;
+ }
+
+ /**
+ * Make sure all timelines have the sample count passed in; otherwise log
+ * discrepancies and return false
+ *
+ * @param assertedCount The sample count that all timelines are supposed to have
+ * @return true if all timelines have the right count; false otherwise
+ */
+ public boolean checkSampleCounts(final int assertedCount) {
+ boolean success = true;
+ if (assertedCount != sampleCount) {
+ log.error("For host {}, start time {}, the SourceTimeLines sampleCount {} is not equal to the assertedCount {}",
+ new Object[]{sourceId, dateFormatter.print(startTime), sampleCount, assertedCount});
+ success = false;
+ }
+ for (final Map.Entry<Integer, TimelineChunkAccumulator> entry : timelines.entrySet()) {
+ final int metricId = entry.getKey();
+ final TimelineChunkAccumulator timeline = entry.getValue();
+ final int lineSampleCount = timeline.getSampleCount();
+ if (lineSampleCount != assertedCount) {
+ log.error("For host {}, start time {}, sample kind id {}, the sampleCount {} is not equal to the assertedCount {}",
+ new Object[]{sourceId, dateFormatter.print(startTime), metricId, lineSampleCount, assertedCount});
+ success = false;
+ }
+ }
+ return success;
+ }
+
+ public int getSourceId() {
+ return sourceId;
+ }
+
+ public int getEventCategoryId() {
+ return eventCategoryId;
+ }
+
+ public DateTime getStartTime() {
+ return startTime;
+ }
+
+ public DateTime getEndTime() {
+ return endTime;
+ }
+
+ public Map<Integer, TimelineChunkAccumulator> getTimelines() {
+ return timelines;
+ }
+
+ public List<DateTime> getTimes() {
+ return times;
+ }
+
+ public DateTime getLatestSampleAddTime() {
+ return latestSampleAddTime;
+ }
+
+ private static class SampleSequenceNumber {
+
+ private long sequenceNumber;
+
+ public SampleSequenceNumber(final long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public void setSequenceNumber(final long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+ }
+}
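
Finally, a sketch of the accumulator in isolation, using the test constructor (foreground writes, no time-based chunking). The dao, coders and ids are assumed to exist; the ScalarSample constructor and SampleOpcode.LONG are assumptions about the samples package, which is not part of this diff:

    import java.util.Map;

    import org.joda.time.DateTime;

    import com.google.common.collect.ImmutableMap;
    import com.ning.billing.usage.timeline.TimelineSourceEventAccumulator;
    import com.ning.billing.usage.timeline.codec.SampleCoder;
    import com.ning.billing.usage.timeline.persistent.TimelineDao;
    import com.ning.billing.usage.timeline.samples.SampleOpcode;
    import com.ning.billing.usage.timeline.samples.ScalarSample;
    import com.ning.billing.usage.timeline.sources.SourceSamplesForTimestamp;
    import com.ning.billing.usage.timeline.times.TimelineCoder;

    public class AccumulatorSketch {

        public static void drive(final TimelineDao dao, final TimelineCoder timelineCoder,
                                 final SampleCoder sampleCoder, final int sourceId,
                                 final int categoryId, final int metricId) {
            final DateTime now = new DateTime();
            final TimelineSourceEventAccumulator accumulator =
                    new TimelineSourceEventAccumulator(dao, timelineCoder, sampleCoder,
                                                       sourceId, categoryId, now);

            // SampleOpcode.LONG and this ScalarSample constructor are assumed, not
            // shown in this diff; "JVMMemory" is an illustrative event category.
            final Map<Integer, ScalarSample> samples =
                    ImmutableMap.<Integer, ScalarSample>of(metricId, new ScalarSample(SampleOpcode.LONG, 42L));
            accumulator.addSourceSamples(new SourceSamplesForTimestamp(sourceId, "JVMMemory", now, samples));

            // With the test constructor the BackgroundDBChunkWriter runs in foreground
            // mode, so this persists the extracted chunks immediately.
            accumulator.extractAndQueueTimelineChunks();
        }
    }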