Details
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
index 451e0ac..f39a547 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregator.java
@@ -185,15 +185,17 @@ public class TimelineAggregator {
}
private void performWrites() {
+ final InternalCallContext context = createCallContext();
+
// This is the atomic operation: bulk insert the new aggregated TimelineChunk objects, and delete
// or invalidate the ones that were aggregated. This should be very fast.
final long startWriteTime = System.currentTimeMillis();
aggregatorSqlDao.begin();
- timelineDao.bulkInsertTimelineChunks(chunksToWrite, createCallContext());
+ timelineDao.bulkInsertTimelineChunks(chunksToWrite, context);
if (config.getDeleteAggregatedChunks()) {
- aggregatorSqlDao.deleteTimelineChunks(chunkIdsToInvalidateOrDelete);
+ aggregatorSqlDao.deleteTimelineChunks(chunkIdsToInvalidateOrDelete, context);
} else {
- aggregatorSqlDao.makeTimelineChunksInvalid(chunkIdsToInvalidateOrDelete);
+ aggregatorSqlDao.makeTimelineChunksInvalid(chunkIdsToInvalidateOrDelete, context);
}
aggregatorSqlDao.commit();
msWritingDb.addAndGet(System.currentTimeMillis() - startWriteTime);
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.java
index ab6298a..d239d0a 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/aggregator/TimelineAggregatorSqlDao.java
@@ -22,21 +22,27 @@ import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
-import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
import org.skife.jdbi.v2.unstable.BindIn;
-@ExternalizedSqlViaStringTemplate3()
+import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.callcontext.InternalTenantContextBinder;
+
+@UseStringTemplate3StatementLocator()
public interface TimelineAggregatorSqlDao extends Transactional<TimelineAggregatorSqlDao> {
@SqlQuery
int getLastInsertedId();
@SqlUpdate
- void makeTimelineChunkValid(@Bind("chunkId") final long chunkId);
+ void makeTimelineChunkValid(@Bind("chunkId") final long chunkId,
+ @InternalTenantContextBinder final InternalCallContext context);
@SqlUpdate
- void makeTimelineChunksInvalid(@BindIn("chunkIds") final List<Long> chunkIds);
+ void makeTimelineChunksInvalid(@BindIn("chunkIds") final List<Long> chunkIds,
+ @InternalTenantContextBinder final InternalCallContext context);
@SqlUpdate
- void deleteTimelineChunks(@BindIn("chunkIds") final List<Long> chunkIds);
+ void deleteTimelineChunks(@BindIn("chunkIds") final List<Long> chunkIds,
+ @InternalTenantContextBinder final InternalCallContext context);
}
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java b/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java
index d3bd906..bb202dc 100644
--- a/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/aggregator/TestTimelineAggregator.java
@@ -18,6 +18,7 @@ package com.ning.billing.meter.timeline.aggregator;
import java.io.IOException;
import java.util.Map;
+import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@@ -72,7 +73,9 @@ public class TestTimelineAggregator extends MeterTestSuiteWithEmbeddedDB {
@BeforeMethod(groups = "slow")
public void setUp() throws Exception {
timelineDao = new DefaultTimelineDao(getDBI());
- final MeterConfig config = new ConfigurationObjectFactory(System.getProperties()).build(MeterConfig.class);
+ final Properties properties = System.getProperties();
+ properties.put("killbill.usage.timelines.chunksToAggregate", "2,2");
+ final MeterConfig config = new ConfigurationObjectFactory(properties).build(MeterConfig.class);
aggregator = new TimelineAggregator(getDBI(), timelineDao, timelineCoder, sampleCoder, config, internalCallContextFactory);
}