killbill-aplcache
Changes
meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg 94(+47 -47)
meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg 18(+0 -18)
meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java 162(+162 -0)
meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java 19(+2 -17)
meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestFileBackedBuffer.java 147(+147 -0)
meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestSamplesReplayer.java 116(+116 -0)
Details
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
index db7c77e..451e0ac 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
@@ -37,17 +37,17 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.billing.util.config.MeterConfig;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
import com.ning.billing.meter.timeline.codec.SampleCoder;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
-import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
import com.ning.billing.meter.timeline.times.TimelineCoder;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.config.MeterConfig;
import com.google.inject.Inject;
@@ -61,7 +61,7 @@ public class TimelineAggregator {
private static final Logger log = LoggerFactory.getLogger(TimelineAggregator.class);
private final IDBI dbi;
- private final DefaultTimelineDao timelineDao;
+ private final TimelineDao timelineDao;
private final TimelineCoder timelineCoder;
private final SampleCoder sampleCoder;
private final MeterConfig config;
@@ -94,7 +94,7 @@ public class TimelineAggregator {
private final List<Long> chunkIdsToInvalidateOrDelete = new ArrayList<Long>();
@Inject
- public TimelineAggregator(final IDBI dbi, final DefaultTimelineDao timelineDao, final TimelineCoder timelineCoder,
+ public TimelineAggregator(final IDBI dbi, final TimelineDao timelineDao, final TimelineCoder timelineCoder,
final SampleCoder sampleCoder, final MeterConfig config, final InternalCallContextFactory internalCallContextFactory) {
this.dbi = dbi;
this.timelineDao = timelineDao;
@@ -295,7 +295,8 @@ public class TimelineAggregator {
public Void withHandle(final Handle handle) throws Exception {
final Query<Map<String, Object>> query = handle.createQuery("getStreamingAggregationCandidates")
.setFetchSize(Integer.MIN_VALUE)
- .bind("aggregationLevel", aggregationLevel);
+ .bind("aggregationLevel", aggregationLevel)
+ .bind("tenantRecordId", createCallContext().getTenantRecordId());
query.setStatementLocator(new StringTemplate3StatementLocator(TimelineAggregatorSqlDao.class));
ResultIterator<TimelineChunk> iterator = null;
try {
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
index 2698a91..3c4531e 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/consumer/CSVConsumer.java
@@ -36,6 +36,10 @@ public class CSVConsumer {
return rangeSampleProcessor.getSampleConsumer().toString();
}
+ public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk) throws IOException {
+ return getSamplesAsCSV(sampleCoder, chunk, null, null);
+ }
+
public static String getSamplesAsCSV(final SampleCoder sampleCoder, final TimelineChunk chunk, @Nullable final DateTime startTime, @Nullable final DateTime endTime) throws IOException {
final CSVOutputProcessor processor = new CSVOutputProcessor(startTime, endTime);
sampleCoder.scan(chunk, processor);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
index 158cfaf..f0231ab 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
@@ -16,11 +16,7 @@
package com.ning.billing.meter.timeline.persistent;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import javax.annotation.Nullable;
@@ -34,19 +30,16 @@ import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableList;
public class CachingTimelineDao implements TimelineDao {
private static final Logger log = LoggerFactory.getLogger(CachingTimelineDao.class);
private final BiMap<Integer, String> sourcesCache;
- private final Map<Integer, Set<Integer>> sourceIdsMetricIdsCache;
private final BiMap<Integer, CategoryRecordIdAndMetric> metricsCache;
private final BiMap<Integer, String> eventCategoriesCache;
@@ -59,17 +52,6 @@ public class CachingTimelineDao implements TimelineDao {
sourcesCache = delegate.getSources(context);
metricsCache = delegate.getMetrics(context);
eventCategoriesCache = delegate.getEventCategories(context);
- sourceIdsMetricIdsCache = new HashMap<Integer, Set<Integer>>();
- for (final SourceRecordIdAndMetricRecordId both : delegate.getMetricIdsForAllSources(context)) {
- final int sourceId = both.getSourceId();
- final int metricId = both.getMetricId();
- Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
- if (metricIds == null) {
- metricIds = new HashSet<Integer>();
- sourceIdsMetricIdsCache.put(sourceId, metricIds);
- }
- metricIds.add(metricId);
- }
}
@Override
@@ -139,32 +121,15 @@ public class CachingTimelineDao implements TimelineDao {
}
@Override
- public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ public synchronized int getOrAddMetric(final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
final CategoryRecordIdAndMetric categoryRecordIdAndMetric = new CategoryRecordIdAndMetric(eventCategoryId, metric);
Integer metricId = metricsCache.inverse().get(categoryRecordIdAndMetric);
if (metricId == null) {
- metricId = delegate.getOrAddMetric(sourceId, eventCategoryId, metric, context);
+ metricId = delegate.getOrAddMetric(eventCategoryId, metric, context);
metricsCache.put(metricId, categoryRecordIdAndMetric);
}
- if (sourceId != null) {
- Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
- if (metricIds == null) {
- metricIds = new HashSet<Integer>();
- sourceIdsMetricIdsCache.put(sourceId, metricIds);
- }
- metricIds.add(metricId);
- }
- return metricId;
- }
- @Override
- public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- return ImmutableList.copyOf(sourceIdsMetricIdsCache.get(sourceId));
- }
-
- @Override
- public Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- return delegate.getMetricIdsForAllSources(context);
+ return metricId;
}
@Override
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
index c607a8f..44658fa 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
@@ -143,7 +143,7 @@ public class DefaultTimelineDao implements TimelineDao {
}
@Override
- public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ public synchronized int getOrAddMetric(final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
delegate.begin();
delegate.addMetric(eventCategoryId, metric, context);
final Integer metricId = delegate.getMetricRecordId(eventCategoryId, metric, context);
@@ -153,16 +153,6 @@ public class DefaultTimelineDao implements TimelineDao {
}
@Override
- public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- return delegate.getMetricRecordIdsBySourceRecordId(sourceId, context);
- }
-
- @Override
- public Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- return delegate.getMetricRecordIdsForAllSources(context);
- }
-
- @Override
public Long insertTimelineChunk(final TimelineChunk timelineChunk, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
delegate.begin();
delegate.insertTimelineChunk(timelineChunk, context);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java
index 590fda1..d7985fb 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/FileBackedBuffer.java
@@ -26,8 +26,10 @@ import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.fasterxml.util.membuf.MemBuffersForBytes;
import com.fasterxml.util.membuf.StreamyBytesMemBuffer;
import com.google.common.annotations.VisibleForTesting;
@@ -40,7 +42,7 @@ public class FileBackedBuffer {
private static final Logger log = LoggerFactory.getLogger(FileBackedBuffer.class);
private static final SmileFactory smileFactory = new SmileFactory();
- private static final ObjectMapper smileObjectMapper = new ObjectMapper(smileFactory);
+ private final ObjectMapper smileObjectMapper;
static {
// Disable all magic for now as we don't write the Smile header (we share the same smileGenerator
@@ -68,6 +70,10 @@ public class FileBackedBuffer {
this.prefix = prefix;
this.deleteFilesOnClose = deleteFilesOnClose;
+ smileObjectMapper = new ObjectMapper(smileFactory);
+ smileObjectMapper.registerModule(new JodaModule());
+ smileObjectMapper.enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
final MemBuffersForBytes bufs = new MemBuffersForBytes(segmentsSize, 1, maxNbSegments);
inputBuffer = bufs.createStreamyBuffer(1, maxNbSegments);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
index ecf156a..486861f 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/Replayer.java
@@ -35,8 +35,10 @@ import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileParser;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
@@ -50,7 +52,8 @@ public class Replayer {
static {
smileFactory.configure(SmileParser.Feature.REQUIRE_HEADER, false);
- smileFactory.setCodec(smileMapper);
+ smileMapper.registerModule(new JodaModule());
+ smileMapper.enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
@VisibleForTesting
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
index cca7e7f..c82183a 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
@@ -28,7 +28,6 @@ import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -64,11 +63,7 @@ public interface TimelineDao {
BiMap<Integer, CategoryRecordIdAndMetric> getMetrics(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
- int getOrAddMetric(Integer sourceId, Integer eventCategoryId, String metric, InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException;
-
- Iterable<Integer> getMetricIdsBySourceId(Integer sourceId, InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
-
- Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
+ int getOrAddMetric(Integer eventCategoryId, String metric, InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException;
// Timelines tables
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
index c376b72..805f068 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
@@ -87,9 +87,6 @@ public interface TimelineSqlDao extends Transactional<TimelineSqlDao> {
@InternalTenantContextBinder final InternalTenantContext context);
@SqlQuery
- Iterable<SourceRecordIdAndMetricRecordId> getMetricRecordIdsForAllSources(@InternalTenantContextBinder final InternalTenantContext context);
-
- @SqlQuery
CategoryRecordIdAndMetric getCategoryRecordIdAndMetric(@Bind("recordId") final Integer metricRecordId,
@InternalTenantContextBinder final InternalTenantContext context);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
index 3735f9d..0d4349e 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineEventHandler.java
@@ -221,7 +221,7 @@ public class TimelineEventHandler {
// Extract and parse samples
final Map<Integer, ScalarSample> scalarSamples = new LinkedHashMap<Integer, ScalarSample>();
- convertSamplesToScalarSamples(sourceId, eventType, samples, scalarSamples, context);
+ convertSamplesToScalarSamples(eventType, samples, scalarSamples, context);
if (scalarSamples.isEmpty()) {
eventsDiscarded.incrementAndGet();
@@ -275,16 +275,17 @@ public class TimelineEventHandler {
public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, @Nullable final DateTime filterStartTime,
@Nullable final DateTime filterEndTime, final InternalTenantContext context) throws IOException, ExecutionException {
- return getInMemoryTimelineChunks(sourceId, ImmutableList.copyOf(timelineDAO.getMetricIdsBySourceId(sourceId, context)), filterStartTime, filterEndTime);
+ return getInMemoryTimelineChunks(sourceId, ImmutableList.copyOf(timelineDAO.getMetrics(context).keySet()), filterStartTime, filterEndTime, context);
}
public Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final Integer metricId, @Nullable final DateTime filterStartTime,
- @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
- return getInMemoryTimelineChunks(sourceId, ImmutableList.<Integer>of(metricId), filterStartTime, filterEndTime);
+ @Nullable final DateTime filterEndTime, final InternalTenantContext context) throws IOException, ExecutionException {
+ return getInMemoryTimelineChunks(sourceId, ImmutableList.<Integer>of(metricId), filterStartTime, filterEndTime, context);
}
public synchronized Collection<? extends TimelineChunk> getInMemoryTimelineChunks(final Integer sourceId, final List<Integer> metricIds,
- @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime) throws IOException, ExecutionException {
+ @Nullable final DateTime filterStartTime, @Nullable final DateTime filterEndTime,
+ final InternalTenantContext context) throws IOException, ExecutionException {
getInMemoryChunksCallCount.incrementAndGet();
// Check first if there is an in-memory accumulator for this host
final SourceAccumulatorsAndUpdateDate sourceAccumulatorsAndDate = accumulators.get(sourceId);
@@ -335,7 +336,7 @@ public class TimelineEventHandler {
}
@VisibleForTesting
- void convertSamplesToScalarSamples(final Integer sourceId, final String eventType, final Map<String, Object> inputSamples,
+ void convertSamplesToScalarSamples(final String eventType, final Map<String, Object> inputSamples,
final Map<Integer, ScalarSample> outputSamples, final InternalCallContext context) {
if (inputSamples == null) {
return;
@@ -343,7 +344,7 @@ public class TimelineEventHandler {
final Integer eventCategoryId = timelineDAO.getOrAddEventCategory(eventType, context);
for (final String attributeName : inputSamples.keySet()) {
- final Integer metricId = timelineDAO.getOrAddMetric(sourceId, eventCategoryId, attributeName, context);
+ final Integer metricId = timelineDAO.getOrAddMetric(eventCategoryId, attributeName, context);
final Object sample = inputSamples.get(attributeName);
outputSamples.put(metricId, ScalarSample.fromObject(sample));
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
index 3288dc3..a229b85 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/TimelineSourceEventAccumulator.java
@@ -140,8 +140,9 @@ public class TimelineSourceEventAccumulator {
}
if (endTime == null) {
endTime = timestamp;
- } else if (!timestamp.isAfter(endTime)) {
- log.warn("Adding samples for host {}, timestamp {} is not after the end time {}; ignored",
+ } else if (timestamp.isBefore(endTime)) {
+ // Note: we allow multiple events at the same time
+ log.warn("Adding samples for host {}, timestamp {} is before the end time {}; ignored",
new Object[]{sourceId, dateFormatter.print(timestamp), dateFormatter.print(endTime)});
return;
}
diff --git a/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg b/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg
index ff64329..bbb566a 100644
--- a/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg
+++ b/meter/src/main/resources/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.sql.stg
@@ -4,64 +4,64 @@ CHECK_TENANT() ::= "tenant_record_id = :tenantRecordId"
AND_CHECK_TENANT() ::= "AND <CHECK_TENANT()>"
getStreamingAggregationCandidates() ::= <<
- select
- chunk_id
- , source_id
- , metric_id
- , start_time
- , end_time
- , in_row_samples
- , blob_samples
- , sample_count
- , aggregation_level
- , not_valid
- , dont_aggregate
- from timeline_chunks
- where source_id != 0 and aggregation_level = :aggregationLevel and not_valid = 0
- <AND_CHECK_TENANT()>
- order by source_id, metric_id, start_time
- >>
+select
+ record_id
+, source_record_id
+, metric_record_id
+, start_time
+, end_time
+, in_row_samples
+, blob_samples
+, sample_count
+, aggregation_level
+, not_valid
+, dont_aggregate
+from timeline_chunks
+where source_record_id != 0 and aggregation_level = :aggregationLevel and not_valid = 0
+<AND_CHECK_TENANT()>
+order by source_record_id, metric_record_id, start_time
+>>
- getAggregationCandidatesForSourceIdAndMetricIds(metricIds) ::= <<
- select
- chunk_id
- , source_id
- , metric_id
- , start_time
- , end_time
- , in_row_samples
- , blob_samples
- , sample_count
- , aggregation_level
- , not_valid
- , dont_aggregate
- from timeline_chunks
- where source_id = :source_id
- and metric_id in (<metricIds>)
- <AND_CHECK_TENANT()>
- ;
+getAggregationCandidatesForSourceIdAndMetricIds(metricIds) ::= <<
+select
+ record_id
+, source_record_id
+, metric_record_id
+, start_time
+, end_time
+, in_row_samples
+, blob_samples
+, sample_count
+, aggregation_level
+, not_valid
+, dont_aggregate
+from timeline_chunks
+where source_record_id = :source_id
+and metric_record_id in (<metricIds>)
+<AND_CHECK_TENANT()>
+;
>>
getLastInsertedId() ::= <<
- select last_insert_id();
+select last_insert_id();
>>
makeTimelineChunkValid() ::= <<
- update timeline_chunks
- set not_valid = 0
- where chunk_id = :chunkId
- <AND_CHECK_TENANT()>
- ;
+update timeline_chunks
+set not_valid = 0
+where record_id = :chunkId
+<AND_CHECK_TENANT()>
+;
>>
makeTimelineChunksInvalid(chunkIds) ::=<<
- update timeline_chunks
- set not_valid = 1
- where chunk_id in (<chunkIds>)
- <AND_CHECK_TENANT()>
- ;
+update timeline_chunks
+set not_valid = 1
+where record_id in (<chunkIds>)
+<AND_CHECK_TENANT()>
+;
>>
deleteTimelineChunks(chunkIds) ::=<<
- delete from timeline_chunks where chunk_id in (<chunkIds>) <AND_CHECK_TENANT()>;
+delete from timeline_chunks where record_id in (<chunkIds>) <AND_CHECK_TENANT()>;
>>
diff --git a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
index e782aa1..3ccc68b 100644
--- a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
+++ b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
@@ -107,24 +107,6 @@ and category_record_id = :categoryRecordId
;
>>
-getMetricRecordIdsBySourceRecordId() ::= <<
-select distinct
- metric_record_id
-from timeline_chunks c
-where source_record_id = :sourceRecordId
-<AND_CHECK_TENANT()>
-;
->>
-
-getMetricRecordIdsForAllSources() ::= <<
-select distinct
- metric_record_id
-, source_record_id
-from timeline_chunks c
-where <CHECK_TENANT()>
-;
->>
-
getCategoryRecordIdAndMetric() ::= <<
select
category_record_id
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java b/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java
new file mode 100644
index 0000000..d3bd906
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.aggregator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.config.ConfigurationObjectFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuiteWithEmbeddedDB;
+import com.ning.billing.meter.timeline.TimelineSourceEventAccumulator;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
+import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class TestTimelineAggregator extends MeterTestSuiteWithEmbeddedDB {
+
+ private static final UUID HOST_UUID = UUID.randomUUID();
+ private static final String HOST_NAME = HOST_UUID.toString();
+ private static final String EVENT_TYPE = "myType";
+ private static final int EVENT_TYPE_ID = 123;
+ private static final String MIN_HEAPUSED_KIND = "min_heapUsed";
+ private static final String MAX_HEAPUSED_KIND = "max_heapUsed";
+ private static final DateTime START_TIME = new DateTime(DateTimeZone.UTC);
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(getDBI(), new ClockMock());
+
+ private TimelineDao timelineDao;
+ private TimelineAggregator aggregator;
+
+ private Integer hostId = null;
+ private Integer minHeapUsedKindId = null;
+ private Integer maxHeapUsedKindId = null;
+
+ @BeforeMethod(groups = "slow")
+ public void setUp() throws Exception {
+ timelineDao = new DefaultTimelineDao(getDBI());
+ final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+ aggregator = new TimelineAggregator(getDBI(), timelineDao, timelineCoder, sampleCoder, config, internalCallContextFactory);
+ }
+
+ @Test(groups = "slow")
+ public void testAggregation() throws Exception {
+ // Create the host
+ hostId = timelineDao.getOrAddSource(HOST_NAME, internalCallContext);
+ Assert.assertNotNull(hostId);
+ Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);
+
+ // Create the sample kinds
+ minHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MIN_HEAPUSED_KIND, internalCallContext);
+ Assert.assertNotNull(minHeapUsedKindId);
+ maxHeapUsedKindId = timelineDao.getOrAddMetric(EVENT_TYPE_ID, MAX_HEAPUSED_KIND, internalCallContext);
+ Assert.assertNotNull(maxHeapUsedKindId);
+ Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);
+
+ // Create two sets of times: T - 125 ... T - 65 ; T - 60 ... T (note the gap!)
+ createAOneHourTimelineChunk(125);
+ createAOneHourTimelineChunk(60);
+
+ // Check the getSamplesByHostIdsAndSampleKindIds DAO method works as expected
+ // You might want to draw timelines on a paper and remember boundaries are inclusive to understand these numbers
+ checkSamplesForATimeline(185, 126, 0);
+ checkSamplesForATimeline(185, 125, 2);
+ checkSamplesForATimeline(64, 61, 0);
+ checkSamplesForATimeline(125, 65, 2);
+ checkSamplesForATimeline(60, 0, 2);
+ checkSamplesForATimeline(125, 0, 4);
+ checkSamplesForATimeline(124, 0, 4);
+ checkSamplesForATimeline(124, 66, 2);
+
+ aggregator.getAndProcessTimelineAggregationCandidates();
+
+ Assert.assertEquals(timelineDao.getSources(internalCallContext).values().size(), 1);
+ Assert.assertEquals(timelineDao.getMetrics(internalCallContext).values().size(), 2);
+
+ // Similar than above, but we have only 2 now
+ checkSamplesForATimeline(185, 126, 0);
+ checkSamplesForATimeline(185, 125, 2);
+ // Note, the gap is filled now
+ checkSamplesForATimeline(64, 61, 2);
+ checkSamplesForATimeline(125, 65, 2);
+ checkSamplesForATimeline(60, 0, 2);
+ checkSamplesForATimeline(125, 0, 2);
+ checkSamplesForATimeline(124, 0, 2);
+ checkSamplesForATimeline(124, 66, 2);
+ }
+
+ private void checkSamplesForATimeline(final Integer startTimeMinutesAgo, final Integer endTimeMinutesAgo, final long expectedChunks) throws InterruptedException {
+ final AtomicLong timelineChunkSeen = new AtomicLong(0);
+
+ timelineDao.getSamplesBySourceIdsAndMetricIds(ImmutableList.<Integer>of(hostId), ImmutableList.<Integer>of(minHeapUsedKindId, maxHeapUsedKindId),
+ START_TIME.minusMinutes(startTimeMinutesAgo), START_TIME.minusMinutes(endTimeMinutesAgo), new TimelineChunkConsumer() {
+
+ @Override
+ public void processTimelineChunk(final TimelineChunk chunk) {
+ Assert.assertEquals((Integer) chunk.getSourceId(), hostId);
+ Assert.assertTrue(chunk.getMetricId() == minHeapUsedKindId || chunk.getMetricId() == maxHeapUsedKindId);
+ timelineChunkSeen.incrementAndGet();
+ }
+ }, internalCallContext);
+
+ Assert.assertEquals(timelineChunkSeen.get(), expectedChunks);
+ }
+
+ private void createAOneHourTimelineChunk(final int startTimeMinutesAgo) throws IOException {
+ final DateTime firstSampleTime = START_TIME.minusMinutes(startTimeMinutesAgo);
+ final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(timelineDao, timelineCoder, sampleCoder, hostId, EVENT_TYPE_ID, firstSampleTime, internalCallContextFactory);
+ // 120 samples per hour
+ for (int i = 0; i < 120; i++) {
+ final DateTime eventDateTime = firstSampleTime.plusSeconds(i * 30);
+ final Map<Integer, ScalarSample> event = createEvent(eventDateTime.getMillis());
+ final SourceSamplesForTimestamp samples = new SourceSamplesForTimestamp(hostId, EVENT_TYPE, eventDateTime, event);
+ accumulator.addSourceSamples(samples);
+ }
+
+ accumulator.extractAndQueueTimelineChunks();
+ }
+
+ private Map<Integer, ScalarSample> createEvent(final long ts) {
+ return ImmutableMap.<Integer, ScalarSample>of(
+ minHeapUsedKindId, new ScalarSample(SampleOpcode.LONG, Long.MIN_VALUE + ts),
+ maxHeapUsedKindId, new ScalarSample(SampleOpcode.LONG, Long.MAX_VALUE - ts)
+ );
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestCSVConsumer.java b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestCSVConsumer.java
new file mode 100644
index 0000000..f8718f4
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/consumer/TestCSVConsumer.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.consumer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+
+public class TestCSVConsumer extends MeterTestSuite {
+
+ private static final int HOST_ID = 1242;
+ private static final int SAMPLE_KIND_ID = 12;
+ private static final int CHUNK_ID = 30;
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ @Test(groups = "fast")
+ public void testToString() throws Exception {
+ final int sampleCount = 3;
+
+ final DateTime startTime = new DateTime("2012-01-16T21:23:58.000Z", DateTimeZone.UTC);
+ final List<DateTime> dateTimes = new ArrayList<DateTime>();
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final DataOutputStream stream = new DataOutputStream(out);
+
+ for (int i = 0; i < sampleCount; i++) {
+ sampleCoder.encodeSample(stream, new ScalarSample<Long>(SampleOpcode.LONG, 12345L + i));
+ dateTimes.add(startTime.plusSeconds(1 + i));
+ }
+
+ final DateTime endTime = dateTimes.get(dateTimes.size() - 1);
+ final byte[] times = timelineCoder.compressDateTimes(dateTimes);
+ final TimelineChunk timelineChunk = new TimelineChunk(CHUNK_ID, HOST_ID, SAMPLE_KIND_ID, startTime, endTime, times, out.toByteArray(), sampleCount);
+
+ // Test CSV filtering
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk), "1326749039,12345,1326749040,12346,1326749041,12347");
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, null, null), "1326749039,12345,1326749040,12346,1326749041,12347");
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime, null), "1326749039,12345,1326749040,12346,1326749041,12347");
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, null, startTime.plusSeconds(sampleCount)), "1326749039,12345,1326749040,12346,1326749041,12347");
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(1), startTime.plusSeconds(sampleCount)), "1326749039,12345,1326749040,12346,1326749041,12347");
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(2), startTime.plusSeconds(sampleCount)), "1326749040,12346,1326749041,12347");
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(3), startTime.plusSeconds(sampleCount)), "1326749041,12347");
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(4), startTime.plusSeconds(sampleCount)), "");
+ // Buggy start date
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime.plusSeconds(10), startTime.plusSeconds(sampleCount)), "");
+ // Buggy end date
+ Assert.assertEquals(CSVConsumer.getSamplesAsCSV(sampleCoder, timelineChunk, startTime, startTime.minusSeconds(1)), "");
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/MockFileBackedBuffer.java b/meter/src/test/java/com/ning/billing/meter/timeline/MockFileBackedBuffer.java
new file mode 100644
index 0000000..ffe6362
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/MockFileBackedBuffer.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+
+public class MockFileBackedBuffer extends FileBackedBuffer {
+
+ private final List<SourceSamplesForTimestamp> hostSamplesForTimestamps = new ArrayList<SourceSamplesForTimestamp>();
+
+ public MockFileBackedBuffer() throws IOException {
+ // Kepp it small - 50 bytes. Will be allocated but not used
+ super(String.valueOf(System.nanoTime()), "test", 50, 1);
+ }
+
+ @Override
+ public boolean append(final SourceSamplesForTimestamp hostSamplesForTimestamp) {
+ hostSamplesForTimestamps.add(hostSamplesForTimestamp);
+ return true;
+ }
+
+ /**
+ * Discard in-memory and on-disk data
+ */
+ @Override
+ public void discard() {
+ hostSamplesForTimestamps.clear();
+ }
+
+ @Override
+ public long getBytesInMemory() {
+ return -1;
+ }
+
+ @Override
+ public long getBytesOnDisk() {
+ return 0;
+ }
+
+ @Override
+ public long getFilesCreated() {
+ return 0;
+ }
+
+ @Override
+ public long getInMemoryAvailableSpace() {
+ return -1;
+ }
+
+ public List<SourceSamplesForTimestamp> getSourceSamplesForTimestamps() {
+ return hostSamplesForTimestamps;
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/MockTimelineDao.java b/meter/src/test/java/com/ning/billing/meter/timeline/MockTimelineDao.java
new file mode 100644
index 0000000..a2cd301
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/MockTimelineDao.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.shutdown.StartTimes;
+import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.callcontext.InternalTenantContext;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
+public final class MockTimelineDao implements TimelineDao {
+
+ private final BiMap<Integer, String> hosts = HashBiMap.create();
+ private final BiMap<Integer, CategoryRecordIdAndMetric> sampleKinds = HashBiMap.create();
+ private final BiMap<Integer, String> eventCategories = HashBiMap.create();
+ private final BiMap<Integer, TimelineChunk> timelineChunks = HashBiMap.create();
+ private final Map<Integer, Map<Integer, List<TimelineChunk>>> samplesPerHostAndSampleKind = new HashMap<Integer, Map<Integer, List<TimelineChunk>>>();
+ private final AtomicReference<StartTimes> lastStartTimes = new AtomicReference<StartTimes>();
+
+ @Override
+ public Integer getSourceId(final String host, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (hosts) {
+ return hosts.inverse().get(host);
+ }
+ }
+
+ @Override
+ public String getSource(final Integer hostId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (hosts) {
+ return hosts.get(hostId);
+ }
+ }
+
+ @Override
+ public BiMap<Integer, String> getSources(final InternalTenantContext context) {
+ return hosts;
+ }
+
+ @Override
+ public int getOrAddSource(final String host, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (hosts) {
+ final Integer hostId = getSourceId(host, context);
+ if (hostId == null) {
+ hosts.put(hosts.size() + 1, host);
+ return hosts.size();
+ } else {
+ return hostId;
+ }
+ }
+ }
+
+ @Override
+ public Integer getEventCategoryId(final String eventCategory, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (eventCategories) {
+ return eventCategories.inverse().get(eventCategory);
+ }
+ }
+
+ @Override
+ public String getEventCategory(final Integer eventCategoryId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (eventCategories) {
+ return eventCategories.get(eventCategoryId);
+ }
+ }
+
+ @Override
+ public int getOrAddEventCategory(final String eventCategory, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (eventCategories) {
+ Integer eventCategoryId = getEventCategoryId(eventCategory, context);
+ if (eventCategoryId == null) {
+ eventCategoryId = eventCategories.size() + 1;
+ eventCategories.put(eventCategoryId, eventCategory);
+ }
+
+ return eventCategoryId;
+ }
+ }
+
+ @Override
+ public BiMap<Integer, String> getEventCategories(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ return eventCategories;
+ }
+
+ @Override
+ public Integer getMetricId(final int eventCategoryId, final String sampleKind, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (sampleKinds) {
+ return sampleKinds.inverse().get(new CategoryRecordIdAndMetric(eventCategoryId, sampleKind));
+ }
+ }
+
+ @Override
+ public CategoryRecordIdAndMetric getCategoryIdAndMetric(final Integer sampleKindId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (sampleKinds) {
+ return sampleKinds.get(sampleKindId);
+ }
+ }
+
+ @Override
+ public BiMap<Integer, CategoryRecordIdAndMetric> getMetrics(final InternalTenantContext context) {
+ synchronized (sampleKinds) {
+ return sampleKinds;
+ }
+ }
+
+ @Override
+ public int getOrAddMetric(final Integer eventCategoryId, final String sampleKind, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ synchronized (sampleKinds) {
+ Integer sampleKindId = getMetricId(eventCategoryId, sampleKind, context);
+ if (sampleKindId == null) {
+ sampleKindId = sampleKinds.size() + 1;
+ sampleKinds.put(sampleKindId, new CategoryRecordIdAndMetric(eventCategoryId, sampleKind));
+ }
+ return sampleKindId;
+ }
+ }
+
+ @Override
+ public Long insertTimelineChunk(final TimelineChunk chunk, final InternalCallContext context) {
+ final Long timelineChunkId;
+ synchronized (timelineChunks) {
+ timelineChunks.put(timelineChunks.size(), chunk);
+ timelineChunkId = (long) timelineChunks.size() - 1;
+ }
+
+ synchronized (samplesPerHostAndSampleKind) {
+ Map<Integer, List<TimelineChunk>> samplesPerSampleKind = samplesPerHostAndSampleKind.get(chunk.getSourceId());
+ if (samplesPerSampleKind == null) {
+ samplesPerSampleKind = new HashMap<Integer, List<TimelineChunk>>();
+ }
+
+ List<TimelineChunk> chunkAndTimes = samplesPerSampleKind.get(chunk.getMetricId());
+ if (chunkAndTimes == null) {
+ chunkAndTimes = new ArrayList<TimelineChunk>();
+ }
+
+ chunkAndTimes.add(chunk);
+ samplesPerSampleKind.put(chunk.getMetricId(), chunkAndTimes);
+
+ samplesPerHostAndSampleKind.put(chunk.getSourceId(), samplesPerSampleKind);
+ }
+
+ return timelineChunkId;
+ }
+
+ @Override
+ public void getSamplesBySourceIdsAndMetricIds(final List<Integer> hostIds, @Nullable final List<Integer> sampleKindIds,
+ final DateTime startTime, final DateTime endTime,
+ final TimelineChunkConsumer chunkConsumer, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ for (final Integer hostId : samplesPerHostAndSampleKind.keySet()) {
+ if (hostIds.indexOf(hostId) == -1) {
+ continue;
+ }
+
+ final Map<Integer, List<TimelineChunk>> samplesPerSampleKind = samplesPerHostAndSampleKind.get(hostId);
+ for (final Integer sampleKindId : samplesPerSampleKind.keySet()) {
+ if (sampleKindIds != null && sampleKindIds.indexOf(sampleKindId) == -1) {
+ continue;
+ }
+
+ for (final TimelineChunk chunk : samplesPerSampleKind.get(sampleKindId)) {
+ if (chunk.getStartTime().isAfter(endTime) || chunk.getEndTime().isBefore(startTime)) {
+ continue;
+ }
+
+ chunkConsumer.processTimelineChunk(chunk);
+ }
+ }
+ }
+ }
+
+ @Override
+ public StartTimes getLastStartTimes(final InternalTenantContext context) {
+ return lastStartTimes.get();
+ }
+
+ @Override
+ public Integer insertLastStartTimes(final StartTimes startTimes, final InternalCallContext context) {
+ lastStartTimes.set(startTimes);
+ return 1;
+ }
+
+ @Override
+ public void deleteLastStartTimes(final InternalCallContext context) {
+ lastStartTimes.set(null);
+ }
+
+ @Override
+ public void test(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ }
+
+ public BiMap<Integer, TimelineChunk> getTimelineChunks() {
+ return timelineChunks;
+ }
+
+ @Override
+ public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList, final InternalCallContext context) {
+ for (final TimelineChunk chunk : timelineChunkList) {
+ insertTimelineChunk(chunk, context);
+ }
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
index ad51041..860ceb5 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
@@ -57,10 +57,10 @@ public class TestDefaultTimelineDao extends MeterTestSuiteWithEmbeddedDB {
// Create the samples
final String sampleOne = UUID.randomUUID().toString();
- final Integer sampleOneId = dao.getOrAddMetric(hostId, eventCategoryId, sampleOne, internalCallContext);
+ final Integer sampleOneId = dao.getOrAddMetric(eventCategoryId, sampleOne, internalCallContext);
Assert.assertNotNull(sampleOneId);
final String sampleTwo = UUID.randomUUID().toString();
- final Integer sampleTwoId = dao.getOrAddMetric(hostId, eventCategoryId, sampleTwo, internalCallContext);
+ final Integer sampleTwoId = dao.getOrAddMetric(eventCategoryId, sampleTwo, internalCallContext);
Assert.assertNotNull(sampleTwoId);
// Basic retrieval tests
@@ -75,26 +75,11 @@ public class TestDefaultTimelineDao extends MeterTestSuiteWithEmbeddedDB {
Assert.assertEquals(dao.getCategoryIdAndMetric(sampleTwoId, internalCallContext).getEventCategoryId(), (int) eventCategoryId);
Assert.assertEquals(dao.getCategoryIdAndMetric(sampleTwoId, internalCallContext).getMetric(), sampleTwo);
- // No samples yet
- Assert.assertEquals(ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext)).size(), 0);
-
dao.insertTimelineChunk(new TimelineChunk(0, hostId, sampleOneId, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
- final ImmutableList<Integer> firstFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
- Assert.assertEquals(firstFetch.size(), 1);
- Assert.assertEquals(firstFetch.get(0), sampleOneId);
-
dao.insertTimelineChunk(new TimelineChunk(0, hostId, sampleTwoId, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
- final ImmutableList<Integer> secondFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
- Assert.assertEquals(secondFetch.size(), 2);
- Assert.assertTrue(secondFetch.contains(sampleOneId));
- Assert.assertTrue(secondFetch.contains(sampleTwoId));
// Random sampleKind for random host
dao.insertTimelineChunk(new TimelineChunk(0, Integer.MAX_VALUE - 100, Integer.MAX_VALUE, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
- final ImmutableList<Integer> thirdFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
- Assert.assertEquals(secondFetch.size(), 2);
- Assert.assertTrue(thirdFetch.contains(sampleOneId));
- Assert.assertTrue(thirdFetch.contains(sampleTwoId));
// Test dashboard query
final AtomicInteger chunksSeen = new AtomicInteger(0);
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestFileBackedBuffer.java b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestFileBackedBuffer.java
new file mode 100644
index 0000000..b41aec0
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestFileBackedBuffer.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.persistent;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.BackgroundDBChunkWriter;
+import com.ning.billing.meter.timeline.MockTimelineDao;
+import com.ning.billing.meter.timeline.TimelineEventHandler;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestFileBackedBuffer extends MeterTestSuite {
+
+ private static final Logger log = LoggerFactory.getLogger(TestFileBackedBuffer.class);
+
+ private static final UUID HOST_UUID = UUID.randomUUID();
+ private static final String KIND_A = "kindA";
+ private static final String KIND_B = "kindB";
+ private static final Map<String, Object> EVENT = ImmutableMap.<String, Object>of(KIND_A, 12, KIND_B, 42);
+ // ~105 bytes per event, 10 1MB buffers -> need at least 100,000 events to spill over
+ private static final int NB_EVENTS = 100000;
+ private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestFileBackedBuffer-" + System.currentTimeMillis());
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+ private final TimelineDao dao = new MockTimelineDao();
+ private TimelineEventHandler timelineEventHandler;
+
+ @BeforeMethod(groups = "fast")
+ public void setUp() throws Exception {
+ Assert.assertTrue(basePath.mkdir());
+ System.setProperty("killbill.usage.timelines.spoolDir", basePath.getAbsolutePath());
+ System.setProperty("killbill.usage.timelines.length", "60s");
+ final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+ timelineEventHandler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(dao, config, internalCallContextFactory),
+ new FileBackedBuffer(config.getSpoolDir(), "TimelineEventHandler", 1024 * 1024, 10));
+
+ dao.getOrAddSource(HOST_UUID.toString(), internalCallContext);
+ }
+
+ @Test(groups = "fast") // Not really fast, but doesn't require a database
+ public void testAppend() throws Exception {
+ log.info("Writing files to " + basePath);
+ final List<File> binFiles = new ArrayList<File>();
+
+ final List<DateTime> timestampsRecorded = new ArrayList<DateTime>();
+ final List<String> categoriesRecorded = new ArrayList<String>();
+
+ // Sanity check before the tests
+ Assert.assertEquals(timelineEventHandler.getBackingBuffer().getFilesCreated(), 0);
+ findBinFiles(binFiles, basePath);
+ Assert.assertEquals(binFiles.size(), 0);
+
+ // Send enough events to spill over to disk
+ final DateTime startTime = new DateTime(DateTimeZone.UTC);
+ for (int i = 0; i < NB_EVENTS; i++) {
+ final String category = UUID.randomUUID().toString();
+ final DateTime eventTimestamp = startTime.plusSeconds(i);
+ timelineEventHandler.record(HOST_UUID.toString(), category, eventTimestamp, EVENT, internalCallContext);
+ timestampsRecorded.add(eventTimestamp);
+ categoriesRecorded.add(category);
+ }
+
+ // Check the files have been created (at least one per accumulator)
+ final long bytesOnDisk = timelineEventHandler.getBackingBuffer().getBytesOnDisk();
+ Assert.assertTrue(timelineEventHandler.getBackingBuffer().getFilesCreated() > 0);
+ binFiles.clear();
+ findBinFiles(binFiles, basePath);
+ Assert.assertTrue(binFiles.size() > 0);
+
+ log.info("Sent {} events and wrote {} bytes on disk ({} bytes/event)", new Object[]{NB_EVENTS, bytesOnDisk, bytesOnDisk / NB_EVENTS});
+
+ // Replay the events. Note that size of timestamp recorded != eventsReplayed as some of the ones sent are still in memory
+ final Replayer replayer = new Replayer(basePath.getAbsolutePath());
+ final List<SourceSamplesForTimestamp> eventsReplayed = replayer.readAll();
+ for (int i = 0; i < eventsReplayed.size(); i++) {
+ // Looks like Jackson maps it back using the JVM timezone
+ Assert.assertEquals(eventsReplayed.get(i).getTimestamp().toDateTime(DateTimeZone.UTC), timestampsRecorded.get(i));
+ Assert.assertEquals(eventsReplayed.get(i).getCategory(), categoriesRecorded.get(i));
+ }
+
+ // Make sure files have been deleted
+ binFiles.clear();
+ findBinFiles(binFiles, basePath);
+ Assert.assertEquals(binFiles.size(), 0);
+ }
+
+ private static void findBinFiles(final Collection<File> files, final File directory) {
+ final File[] found = directory.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(final File pathname) {
+ return pathname.getName().endsWith(".bin");
+ }
+ });
+ if (found != null) {
+ for (final File file : found) {
+ if (file.isDirectory()) {
+ findBinFiles(files, file);
+ } else {
+ files.add(file);
+ }
+ }
+ }
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestSamplesReplayer.java b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestSamplesReplayer.java
new file mode 100644
index 0000000..4da4131
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestSamplesReplayer.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.persistent;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.MockTimelineDao;
+import com.ning.billing.meter.timeline.TimelineSourceEventAccumulator;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+
+import com.google.common.collect.ImmutableMap;
+
+// Lightweight version of TestFileBackedBuffer
+public class TestSamplesReplayer extends MeterTestSuite {
+
+ // Total space: 255 * 3 = 765 bytes
+ private static final int NB_EVENTS = 3;
+ // One will still be in memory after the flush
+ private static final int EVENTS_ON_DISK = NB_EVENTS - 1;
+ private static final int HOST_ID = 1;
+ private static final int EVENT_CATEGORY_ID = 123;
+ private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestSamplesReplayer-" + System.currentTimeMillis());
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+ @BeforeMethod(groups = "fast")
+ public void setUp() throws Exception {
+ Assert.assertTrue(basePath.mkdir());
+ }
+
+ @Test(groups = "fast")
+ public void testIdentityFilter() throws Exception {
+ // Need less than 765 + 1 (metadata) bytes
+ final FileBackedBuffer fileBackedBuffer = new FileBackedBuffer(basePath.toString(), "test", 765, 1);
+
+ // Create the host samples - this will take 255 bytes
+ final Map<Integer, ScalarSample> eventMap = new HashMap<Integer, ScalarSample>();
+ eventMap.putAll(ImmutableMap.<Integer, ScalarSample>of(
+ 1, new ScalarSample(SampleOpcode.BYTE, (byte) 0),
+ 2, new ScalarSample(SampleOpcode.SHORT, (short) 1),
+ 3, new ScalarSample(SampleOpcode.INT, 1000),
+ 4, new ScalarSample(SampleOpcode.LONG, 12345678901L),
+ 5, new ScalarSample(SampleOpcode.DOUBLE, Double.MAX_VALUE)
+ ));
+ eventMap.putAll(ImmutableMap.<Integer, ScalarSample>of(
+ 6, new ScalarSample(SampleOpcode.FLOAT, Float.NEGATIVE_INFINITY),
+ 7, new ScalarSample(SampleOpcode.STRING, "pwet")
+ ));
+ final DateTime firstTime = new DateTime(DateTimeZone.UTC).minusSeconds(NB_EVENTS * 30);
+
+ // Write the samples to disk
+ for (int i = 0; i < NB_EVENTS; i++) {
+ final SourceSamplesForTimestamp samples = new SourceSamplesForTimestamp(HOST_ID, "something", firstTime.plusSeconds(30 * i), eventMap);
+ fileBackedBuffer.append(samples);
+ }
+
+ // Try the replayer
+ final Replayer replayer = new Replayer(new File(basePath.toString()).getAbsolutePath());
+ final List<SourceSamplesForTimestamp> hostSamples = replayer.readAll();
+ Assert.assertEquals(hostSamples.size(), EVENTS_ON_DISK);
+
+ // Try to encode them again
+ final MockTimelineDao dao = new MockTimelineDao();
+ final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(dao, timelineCoder, sampleCoder, HOST_ID,
+ EVENT_CATEGORY_ID, hostSamples.get(0).getTimestamp(), internalCallContextFactory);
+ for (final SourceSamplesForTimestamp samplesFound : hostSamples) {
+ accumulator.addSourceSamples(samplesFound);
+ }
+ Assert.assertTrue(accumulator.checkSampleCounts(EVENTS_ON_DISK));
+
+ // This will check the SampleCode can encode value correctly
+ accumulator.extractAndQueueTimelineChunks();
+ Assert.assertEquals(dao.getTimelineChunks().keySet().size(), 7);
+ for (final TimelineChunk chunk : dao.getTimelineChunks().values()) {
+ Assert.assertEquals(chunk.getSourceId(), HOST_ID);
+ Assert.assertEquals(chunk.getSampleCount(), EVENTS_ON_DISK);
+ }
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TestInMemoryEventHandler.java b/meter/src/test/java/com/ning/billing/meter/timeline/TestInMemoryEventHandler.java
new file mode 100644
index 0000000..2ee5851
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TestInMemoryEventHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.File;
+import java.util.Map;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.persistent.FileBackedBuffer;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestInMemoryEventHandler extends MeterTestSuite {
+
+ private static final UUID HOST_UUID = UUID.randomUUID();
+ private static final String EVENT_TYPE = "eventType";
+ private static final String SAMPLE_KIND_A = "kindA";
+ private static final String SAMPLE_KIND_B = "kindB";
+ private static final Map<String, Object> EVENT = ImmutableMap.<String, Object>of(SAMPLE_KIND_A, 12, SAMPLE_KIND_B, 42);
+ private static final int NB_EVENTS = 5;
+ private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestInMemoryCollectorEventProcessor-" + System.currentTimeMillis());
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+ private final TimelineDao dao = new MockTimelineDao();
+ private TimelineEventHandler timelineEventHandler;
+ private int eventTypeId = 0;
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ Assert.assertTrue(basePath.mkdir());
+ System.setProperty("killbill.usage.timelines.spoolDir", basePath.getAbsolutePath());
+ final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+ timelineEventHandler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(dao, config, internalCallContextFactory),
+ new FileBackedBuffer(config.getSpoolDir(), "TimelineEventHandler", 1024 * 1024, 10));
+
+ dao.getOrAddSource(HOST_UUID.toString(), internalCallContext);
+ eventTypeId = dao.getOrAddEventCategory(EVENT_TYPE, internalCallContext);
+ }
+
+ @Test(groups = "fast")
+ public void testInMemoryFilters() throws Exception {
+ final DateTime startTime = new DateTime(DateTimeZone.UTC);
+ for (int i = 0; i < NB_EVENTS; i++) {
+ timelineEventHandler.record(HOST_UUID.toString(), EVENT_TYPE, startTime, EVENT, internalCallContext);
+ }
+ final DateTime endTime = new DateTime(DateTimeZone.UTC);
+
+ final Integer hostId = dao.getSourceId(HOST_UUID.toString(), internalCallContext);
+ Assert.assertNotNull(hostId);
+ final Integer sampleKindAId = dao.getMetricId(eventTypeId, SAMPLE_KIND_A, internalCallContext);
+ Assert.assertNotNull(sampleKindAId);
+ final Integer sampleKindBId = dao.getMetricId(eventTypeId, SAMPLE_KIND_B, internalCallContext);
+ Assert.assertNotNull(sampleKindBId);
+
+ // One per host and type
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, null, null, internalCallContext).size(), 2);
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime, null, internalCallContext).size(), 2);
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, null, endTime, internalCallContext).size(), 2);
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime, endTime, internalCallContext).size(), 2);
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindAId, startTime, endTime, internalCallContext).size(), 1);
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime, endTime, internalCallContext).size(), 1);
+ // Wider ranges should be supported
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime.minusSeconds(1), endTime, internalCallContext).size(), 1);
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime, endTime.plusSeconds(1), internalCallContext).size(), 1);
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, sampleKindBId, startTime.minusSeconds(1), endTime.plusSeconds(1), internalCallContext).size(), 1);
+ // Buggy kind
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, Integer.MAX_VALUE, startTime, endTime, internalCallContext).size(), 0);
+ // Buggy start date
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime.plusMinutes(1), endTime, internalCallContext).size(), 0);
+ // Buggy end date
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(hostId, startTime, endTime.minusMinutes(1), internalCallContext).size(), 0);
+ // Buggy host
+ Assert.assertEquals(timelineEventHandler.getInMemoryTimelineChunks(Integer.MAX_VALUE, startTime, endTime, internalCallContext).size(), 0);
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineEventHandler.java b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineEventHandler.java
new file mode 100644
index 0000000..89d5512
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineEventHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.persistent.TimelineDao;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.MeterConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestTimelineEventHandler extends MeterTestSuite {
+
+ private static final File basePath = new File(System.getProperty("java.io.tmpdir"), "TestTimelineEventHandler-" + System.currentTimeMillis());
+ private static final String EVENT_TYPE = "eventType";
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+ private final TimelineDao dao = new MockTimelineDao();
+
+ @Test(groups = "fast")
+ public void testDownsizingValues() throws Exception {
+ Assert.assertTrue(basePath.mkdir());
+ System.setProperty("killbill.usage.timelines.spoolDir", basePath.getAbsolutePath());
+ final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+ final int eventTypeId = dao.getOrAddEventCategory(EVENT_TYPE, internalCallContext);
+ final int int2shortId = dao.getOrAddMetric(eventTypeId, "int2short", internalCallContext);
+ final int long2intId = dao.getOrAddMetric(eventTypeId, "long2int", internalCallContext);
+ final int long2shortId = dao.getOrAddMetric(eventTypeId, "long2short", internalCallContext);
+ final int int2intId = dao.getOrAddMetric(eventTypeId, "int2int", internalCallContext);
+ final int long2longId = dao.getOrAddMetric(eventTypeId, "long2long", internalCallContext);
+ final int hostId = 1;
+ final TimelineEventHandler handler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder,
+ new BackgroundDBChunkWriter(dao, config, internalCallContextFactory), new MockFileBackedBuffer());
+
+ // Test downsizing of values
+ final Map<String, Object> event = ImmutableMap.<String, Object>of(
+ "int2short", new Integer(1),
+ "long2int", new Long(Integer.MAX_VALUE),
+ "long2short", new Long(2),
+ "int2int", Integer.MAX_VALUE,
+ "long2long", Long.MAX_VALUE);
+ final Map<Integer, ScalarSample> output = convertEventToSamples(handler, event, EVENT_TYPE);
+
+ Assert.assertEquals(output.get(int2shortId).getSampleValue(), (short) 1);
+ Assert.assertEquals(output.get(int2shortId).getSampleValue().getClass(), Short.class);
+ Assert.assertEquals(output.get(long2intId).getSampleValue(), Integer.MAX_VALUE);
+ Assert.assertEquals(output.get(long2intId).getSampleValue().getClass(), Integer.class);
+ Assert.assertEquals(output.get(long2shortId).getSampleValue(), (short) 2);
+ Assert.assertEquals(output.get(long2shortId).getSampleValue().getClass(), Short.class);
+ Assert.assertEquals(output.get(int2intId).getSampleValue(), Integer.MAX_VALUE);
+ Assert.assertEquals(output.get(int2intId).getSampleValue().getClass(), Integer.class);
+ Assert.assertEquals(output.get(long2longId).getSampleValue(), Long.MAX_VALUE);
+ Assert.assertEquals(output.get(long2longId).getSampleValue().getClass(), Long.class);
+ }
+
+ private Map<Integer, ScalarSample> convertEventToSamples(final TimelineEventHandler handler, final Map<String, Object> event, final String eventType) {
+ final Map<Integer, ScalarSample> output = new HashMap<Integer, ScalarSample>();
+ handler.convertSamplesToScalarSamples(eventType, event, output, internalCallContext);
+ return output;
+ }
+
+ private void processOneEvent(final TimelineEventHandler handler, final int hostId, final String eventType, final String sampleKind, final DateTime timestamp) throws Exception {
+ final Map<String, Object> rawEvent = ImmutableMap.<String, Object>of(sampleKind, new Integer(1));
+ final Map<Integer, ScalarSample> convertedEvent = convertEventToSamples(handler, rawEvent, eventType);
+ handler.processSamples(new SourceSamplesForTimestamp(hostId, eventType, timestamp, convertedEvent), internalCallContext);
+ }
+
+ private void sleep(final int millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Test(groups = "fast")
+ public void testPurgeAccumulators() throws Exception {
+ System.setProperty("arecibo.collector.timelines.spoolDir", basePath.getAbsolutePath());
+ final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+ final TimelineEventHandler handler = new TimelineEventHandler(config, dao, timelineCoder, sampleCoder, new BackgroundDBChunkWriter(dao, config, internalCallContextFactory), new MockFileBackedBuffer());
+ Assert.assertEquals(handler.getAccumulators().size(), 0);
+ processOneEvent(handler, 1, "eventType1", "sampleKind1", new DateTime());
+ sleep(20);
+ final DateTime purgeBeforeTime = new DateTime();
+ sleep(20);
+ processOneEvent(handler, 1, "eventType2", "sampleKind2", new DateTime());
+ Assert.assertEquals(handler.getAccumulators().size(), 2);
+ handler.purgeOldSourcesAndAccumulators(purgeBeforeTime);
+ final Collection<TimelineSourceEventAccumulator> accumulators = handler.getAccumulators();
+ Assert.assertEquals(accumulators.size(), 1);
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineSourceEventAccumulator.java b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineSourceEventAccumulator.java
new file mode 100644
index 0000000..55dd0b8
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TestTimelineSourceEventAccumulator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.DBI;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuite;
+import com.ning.billing.meter.timeline.codec.DefaultSampleCoder;
+import com.ning.billing.meter.timeline.codec.SampleCoder;
+import com.ning.billing.meter.timeline.samples.SampleOpcode;
+import com.ning.billing.meter.timeline.samples.ScalarSample;
+import com.ning.billing.meter.timeline.sources.SourceSamplesForTimestamp;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+
+public class TestTimelineSourceEventAccumulator extends MeterTestSuite {
+
+ private static final int HOST_ID = 1;
+ private static final int EVENT_CATEGORY_ID = 123;
+
+ private static final MockTimelineDao dao = new MockTimelineDao();
+ private static final TimelineCoder timelineCoder = new DefaultTimelineCoder();
+ private static final SampleCoder sampleCoder = new DefaultSampleCoder();
+
+ private final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(DBI.class), new ClockMock());
+
+ @Test(groups = "fast")
+ public void testSimpleAggregate() throws IOException {
+ final DateTime startTime = new DateTime(DateTimeZone.UTC);
+ final TimelineSourceEventAccumulator accumulator = new TimelineSourceEventAccumulator(dao, timelineCoder, sampleCoder, HOST_ID,
+ EVENT_CATEGORY_ID, startTime, internalCallContextFactory);
+
+ // Send a first type of data
+ final int sampleCount = 5;
+ final int sampleKindId = 1;
+ sendData(accumulator, startTime, sampleCount, sampleKindId);
+ Assert.assertEquals(accumulator.getStartTime(), startTime);
+ Assert.assertEquals(accumulator.getEndTime(), startTime.plusSeconds(sampleCount - 1));
+ Assert.assertEquals(accumulator.getSourceId(), HOST_ID);
+ Assert.assertEquals(accumulator.getTimelines().size(), 1);
+ Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getSampleCount(), sampleCount);
+ Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getMetricId(), sampleKindId);
+
+ // Send now a second type
+ final DateTime secondStartTime = startTime.plusSeconds(sampleCount + 1);
+ final int secondSampleCount = 15;
+ final int secondSampleKindId = 2;
+ sendData(accumulator, secondStartTime, secondSampleCount, secondSampleKindId);
+ // We keep the start time of the accumulator
+ Assert.assertEquals(accumulator.getStartTime(), startTime);
+ Assert.assertEquals(accumulator.getEndTime(), secondStartTime.plusSeconds(secondSampleCount - 1));
+ Assert.assertEquals(accumulator.getSourceId(), HOST_ID);
+ Assert.assertEquals(accumulator.getTimelines().size(), 2);
+ // We advance all timelines in parallel
+ Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getSampleCount(), sampleCount + secondSampleCount);
+ Assert.assertEquals(accumulator.getTimelines().get(sampleKindId).getMetricId(), sampleKindId);
+ Assert.assertEquals(accumulator.getTimelines().get(secondSampleKindId).getSampleCount(), sampleCount + secondSampleCount);
+ Assert.assertEquals(accumulator.getTimelines().get(secondSampleKindId).getMetricId(), secondSampleKindId);
+ }
+
+ private void sendData(final TimelineSourceEventAccumulator accumulator, final DateTime startTime, final int sampleCount, final int sampleKindId) {
+ final Map<Integer, ScalarSample> samples = new HashMap<Integer, ScalarSample>();
+
+ for (int i = 0; i < sampleCount; i++) {
+ samples.put(sampleKindId, new ScalarSample<Long>(SampleOpcode.LONG, i + 1242L));
+ final SourceSamplesForTimestamp hostSamplesForTimestamp = new SourceSamplesForTimestamp(HOST_ID, "JVM", startTime.plusSeconds(i), samples);
+ accumulator.addSourceSamples(hostSamplesForTimestamp);
+ }
+ }
+}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/TimelineLoadGenerator.java b/meter/src/test/java/com/ning/billing/meter/timeline/TimelineLoadGenerator.java
new file mode 100644
index 0000000..ab34e70
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/TimelineLoadGenerator.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.persistent.CachingTimelineDao;
+import com.ning.billing.meter.timeline.persistent.DefaultTimelineDao;
+import com.ning.billing.meter.timeline.times.DefaultTimelineCoder;
+import com.ning.billing.meter.timeline.times.TimelineCoder;
+import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.ClockMock;
+
+import com.google.common.collect.BiMap;
+
+/**
+ * This class simulates the database load due to insertions and deletions of
+ * TimelineChunks rows, as required by sample processing and
+ * aggregation. Each is single-threaded.
+ */
+public class TimelineLoadGenerator {
+
+ private static final Logger log = LoggerFactory.getLogger(TimelineLoadGenerator.class);
+ private static final int EVENT_CATEGORY_COUNT = Integer.parseInt(System.getProperty("com.ning.billing.timeline.eventCategoryCount", "250"));
+ private static final int HOST_ID_COUNT = Integer.parseInt(System.getProperty("com.ning.billing.timeline.hostIdCount", "2000"));
+ private static final int AVERAGE_SAMPLE_KINDS_PER_CATEGORY = Integer.parseInt(System.getProperty("com.ning.billing.timeline.averageSampleKindsPerCategory", "20"));
+ private static final int AVERAGE_CATEGORIES_PER_HOST = Integer.parseInt(System.getProperty("com.ning.billing.timeline.averageSampleKindsPerCategory", "25"));
+ private static final int SAMPLE_KIND_COUNT = EVENT_CATEGORY_COUNT * AVERAGE_SAMPLE_KINDS_PER_CATEGORY;
+ private static final int CREATE_BATCH_SIZE = Integer.parseInt(System.getProperty("com.ning.billing.timeline.createBatchSize", "1000"));
+ // Mandatory properties
+ private static final String DBI_URL = System.getProperty("com.ning.billing.timeline.db.url");
+ private static final String DBI_USER = System.getProperty("com.ning.billing.timeline.db.user");
+ private static final String DBI_PASSWORD = System.getProperty("com.ning.billing.timeline.db.password");
+
+ private static final Random rand = new Random(System.currentTimeMillis());
+
+ private final List<Integer> hostIds;
+ private final BiMap<Integer, String> hosts;
+ private final BiMap<Integer, String> eventCategories;
+ private final List<Integer> eventCategoryIds;
+ private final BiMap<Integer, CategoryRecordIdAndMetric> sampleKindsBiMap;
+ private final Map<Integer, List<Integer>> categorySampleKindIds;
+ private final Map<Integer, List<Integer>> categoriesForHostId;
+
+ private final DefaultTimelineDao defaultTimelineDAO;
+ private final CachingTimelineDao timelineDAO;
+ private final DBI dbi;
+ private final TimelineCoder timelineCoder;
+
+ private final AtomicInteger timelineChunkIdCounter = new AtomicInteger(0);
+
+ private final Clock clock = new ClockMock();
+ private final InternalCallContext internalCallContext = new InternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID, 1687L, UUID.randomUUID(),
+ UUID.randomUUID().toString(), CallOrigin.TEST,
+ UserType.TEST, "Testing", "This is a test",
+ clock.getUTCNow(), clock.getUTCNow());
+
+ public TimelineLoadGenerator() {
+ this.timelineCoder = new DefaultTimelineCoder();
+
+ this.dbi = new DBI(DBI_URL, DBI_USER, DBI_PASSWORD);
+ this.defaultTimelineDAO = new DefaultTimelineDao(dbi);
+ this.timelineDAO = new CachingTimelineDao(defaultTimelineDAO);
+ log.info("DBI initialized");
+
+ // Make some hosts
+ final List<String> hostNames = new ArrayList<String>(HOST_ID_COUNT);
+ for (int i = 0; i < HOST_ID_COUNT; i++) {
+ final String hostName = String.format("host-%d", i + 1);
+ hostNames.add(hostName);
+ defaultTimelineDAO.getOrAddSource(hostName, internalCallContext);
+ }
+ hosts = timelineDAO.getSources(internalCallContext);
+ hostIds = new ArrayList<Integer>(hosts.keySet());
+ Collections.sort(hostIds);
+ log.info("%d hosts created", hostIds.size());
+
+ // Make some event categories
+ final List<String> categoryNames = new ArrayList<String>(EVENT_CATEGORY_COUNT);
+ for (int i = 0; i < EVENT_CATEGORY_COUNT; i++) {
+ final String category = String.format("category-%d", i);
+ categoryNames.add(category);
+ defaultTimelineDAO.getOrAddEventCategory(category, internalCallContext);
+ }
+ eventCategories = timelineDAO.getEventCategories(internalCallContext);
+ eventCategoryIds = new ArrayList<Integer>(eventCategories.keySet());
+ Collections.sort(eventCategoryIds);
+ log.info("%d event categories created", eventCategoryIds.size());
+
+ // Make some sample kinds. For now, give each category the same number of sample kinds
+ final List<CategoryRecordIdAndMetric> categoriesAndSampleKinds = new ArrayList<CategoryRecordIdAndMetric>();
+ for (final int eventCategoryId : eventCategoryIds) {
+ for (int i = 0; i < AVERAGE_SAMPLE_KINDS_PER_CATEGORY; i++) {
+ final String sampleKind = String.format("%s-sample-kind-%d", eventCategories.get(eventCategoryId), i + 1);
+ categoriesAndSampleKinds.add(new CategoryRecordIdAndMetric(eventCategoryId, sampleKind));
+ defaultTimelineDAO.getOrAddMetric(eventCategoryId, sampleKind, internalCallContext);
+ }
+ }
+ // Make a fast map from categoryId to a list of sampleKindIds in that category
+ sampleKindsBiMap = timelineDAO.getMetrics(internalCallContext);
+ categorySampleKindIds = new HashMap<Integer, List<Integer>>();
+ int sampleKindIdCounter = 0;
+ for (final Map.Entry<Integer, CategoryRecordIdAndMetric> entry : sampleKindsBiMap.entrySet()) {
+ final int categoryId = entry.getValue().getEventCategoryId();
+ List<Integer> sampleKindIds = categorySampleKindIds.get(categoryId);
+ if (sampleKindIds == null) {
+ sampleKindIds = new ArrayList<Integer>();
+ categorySampleKindIds.put(categoryId, sampleKindIds);
+ }
+ final int sampleKindId = entry.getKey();
+ sampleKindIds.add(sampleKindId);
+ sampleKindIdCounter++;
+ }
+ log.info("%d sampleKindIds created", sampleKindIdCounter);
+ // Assign categories to hosts
+ categoriesForHostId = new HashMap<Integer, List<Integer>>();
+ int categoryCounter = 0;
+ for (final int hostId : hostIds) {
+ final List<Integer> categories = new ArrayList<Integer>();
+ categoriesForHostId.put(hostId, categories);
+ for (int i = 0; i < AVERAGE_CATEGORIES_PER_HOST; i++) {
+ final int categoryId = eventCategoryIds.get(categoryCounter);
+ categories.add(categoryId);
+ categoryCounter = (categoryCounter + 1) % EVENT_CATEGORY_COUNT;
+ }
+ }
+ log.info("Finished creating hosts, categories and sample kinds");
+ }
+
+ private void addChunkAndMaybeSave(final List<TimelineChunk> timelineChunkList, final TimelineChunk timelineChunk) {
+ timelineChunkList.add(timelineChunk);
+ if (timelineChunkList.size() >= CREATE_BATCH_SIZE) {
+ defaultTimelineDAO.bulkInsertTimelineChunks(timelineChunkList, internalCallContext);
+ timelineChunkList.clear();
+ log.info("Inserted %d TimelineChunk rows", timelineChunkIdCounter.get());
+ }
+ }
+
+ /**
+ * This method simulates adding a ton of timelines, in more-or-less the way they would be added in real life.
+ */
+ private void insertManyTimelines() throws Exception {
+ final List<TimelineChunk> timelineChunkList = new ArrayList<TimelineChunk>();
+ DateTime startTime = new DateTime().minusDays(1);
+ DateTime endTime = startTime.plusHours(1);
+ final int sampleCount = 120; // 1 hours worth
+ for (int i = 0; i < 12; i++) {
+ for (final int hostId : hostIds) {
+ for (final int categoryId : categoriesForHostId.get(hostId)) {
+ final List<DateTime> dateTimes = new ArrayList<DateTime>(sampleCount);
+ for (int sc = 0; sc < sampleCount; sc++) {
+ dateTimes.add(startTime.plusSeconds(sc * 30));
+ }
+ final byte[] timeBytes = timelineCoder.compressDateTimes(dateTimes);
+ for (final int sampleKindId : categorySampleKindIds.get(categoryId)) {
+ final TimelineChunk timelineChunk = makeTimelineChunk(hostId, sampleKindId, startTime, endTime, timeBytes, sampleCount);
+ addChunkAndMaybeSave(timelineChunkList, timelineChunk);
+
+ }
+ }
+ }
+ if (timelineChunkList.size() > 0) {
+ defaultTimelineDAO.bulkInsertTimelineChunks(timelineChunkList, internalCallContext);
+ }
+ log.info("After hour %d, inserted %d TimelineChunk rows", i, timelineChunkIdCounter.get());
+ startTime = endTime;
+ endTime = endTime.plusHours(1);
+ }
+ }
+
+ private TimelineChunk makeTimelineChunk(final int hostId, final int sampleKindId, final DateTime startTime, final DateTime endTime, final byte[] timeBytes, final int sampleCount) {
+ final byte[] samples = new byte[3 + rand.nextInt(sampleCount) * 2];
+ return new TimelineChunk(timelineChunkIdCounter.incrementAndGet(), hostId, sampleKindId, startTime, endTime, timeBytes, samples, sampleCount);
+ }
+
+ public static void main(final String[] args) throws Exception {
+ final TimelineLoadGenerator loadGenerator = new TimelineLoadGenerator();
+ loadGenerator.insertManyTimelines();
+ }
+}