killbill-aplcache

meter: fix DAO layer. Add first passing test (TestDefaultTimelineDao). Signed-off-by:

11/30/2012 9:23:09 PM

Details

meter/pom.xml 4(+4 -0)

diff --git a/meter/pom.xml b/meter/pom.xml
index e25a51e..51a7f94 100644
--- a/meter/pom.xml
+++ b/meter/pom.xml
@@ -25,6 +25,10 @@
             <artifactId>jdbi</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.jolbox</groupId>
+            <artifactId>bonecp</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
         </dependency>
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricBinder.java b/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricBinder.java
index 1aca5ce..75f757e 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricBinder.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricBinder.java
@@ -37,10 +37,10 @@ public @interface CategoryIdAndMetricBinder {
     public static class CategoryIdAndMetricBinderFactory implements BinderFactory {
 
         public Binder build(final Annotation annotation) {
-            return new Binder<CategoryIdAndMetricBinder, CategoryIdAndMetric>() {
-                public void bind(final SQLStatement query, final CategoryIdAndMetricBinder binder, final CategoryIdAndMetric categoryAndKind) {
-                    query.bind("eventCategoryId", categoryAndKind.getEventCategoryId())
-                         .bind("metric", categoryAndKind.getMetric());
+            return new Binder<CategoryIdAndMetricBinder, CategoryRecordIdAndMetric>() {
+                public void bind(final SQLStatement query, final CategoryIdAndMetricBinder binder, final CategoryRecordIdAndMetric categoryRecordAndKind) {
+                    query.bind("eventCategoryId", categoryRecordAndKind.getEventCategoryId())
+                         .bind("metric", categoryRecordAndKind.getMetric());
                 }
             };
         }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricMapper.java b/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricMapper.java
index b6589ad..3436ba8 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricMapper.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/categories/CategoryIdAndMetricMapper.java
@@ -22,12 +22,12 @@ import java.sql.SQLException;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
-public class CategoryIdAndMetricMapper implements ResultSetMapper<CategoryIdAndMetric> {
+public class CategoryIdAndMetricMapper implements ResultSetMapper<CategoryRecordIdAndMetric> {
 
     @Override
-    public CategoryIdAndMetric map(final int index, final ResultSet rs, final StatementContext ctx) throws SQLException {
-        final int eventCategoryId = rs.getInt("event_category_id");
-        final String metric = rs.getString("sample_kind");
-        return new CategoryIdAndMetric(eventCategoryId, metric);
+    public CategoryRecordIdAndMetric map(final int index, final ResultSet rs, final StatementContext ctx) throws SQLException {
+        final int categoryRecordId = rs.getInt("category_record_id");
+        final String metric = rs.getString("metric");
+        return new CategoryRecordIdAndMetric(categoryRecordId, metric);
     }
 }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkBinder.java b/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkBinder.java
index 0a7678e..408e2e3 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkBinder.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkBinder.java
@@ -49,8 +49,8 @@ public @interface TimelineChunkBinder {
         public Binder build(final Annotation annotation) {
             return new Binder<TimelineChunkBinder, TimelineChunk>() {
                 public void bind(final SQLStatement query, final TimelineChunkBinder binder, final TimelineChunk timelineChunk) {
-                    query.bind("sourceId", timelineChunk.getSourceId())
-                         .bind("metricId", timelineChunk.getMetricId())
+                    query.bind("sourceRecordId", timelineChunk.getSourceId())
+                         .bind("metricRecordId", timelineChunk.getMetricId())
                          .bind("sampleCount", timelineChunk.getSampleCount())
                          .bind("startTime", DateTimeUtils.unixSeconds(timelineChunk.getStartTime()))
                          .bind("endTime", DateTimeUtils.unixSeconds(timelineChunk.getEndTime()))
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkMapper.java b/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkMapper.java
index dfa45d5..15430f7 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkMapper.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/chunks/TimelineChunkMapper.java
@@ -34,9 +34,9 @@ public class TimelineChunkMapper implements ResultSetMapper<TimelineChunk> {
 
     @Override
     public TimelineChunk map(final int index, final ResultSet rs, final StatementContext ctx) throws SQLException {
-        final int chunkId = rs.getInt("chunk_id");
-        final int sourceId = rs.getInt("source_id");
-        final int metricId = rs.getInt("sample_kind_id");
+        final int chunkId = rs.getInt("record_id");
+        final int sourceId = rs.getInt("source_record_id");
+        final int metricId = rs.getInt("metric_record_id");
         final int sampleCount = rs.getInt("sample_count");
         final DateTime startTime = DateTimeUtils.dateTimeFromUnixSeconds(rs.getInt("start_time"));
         final DateTime endTime = DateTimeUtils.dateTimeFromUnixSeconds(rs.getInt("end_time"));
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
index fee7ff4..158cfaf 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/CachingTimelineDao.java
@@ -30,11 +30,11 @@ import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.meter.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceIdAndMetricId;
+import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 
@@ -47,7 +47,7 @@ public class CachingTimelineDao implements TimelineDao {
 
     private final BiMap<Integer, String> sourcesCache;
     private final Map<Integer, Set<Integer>> sourceIdsMetricIdsCache;
-    private final BiMap<Integer, CategoryIdAndMetric> metricsCache;
+    private final BiMap<Integer, CategoryRecordIdAndMetric> metricsCache;
     private final BiMap<Integer, String> eventCategoriesCache;
 
     private final TimelineDao delegate;
@@ -60,7 +60,7 @@ public class CachingTimelineDao implements TimelineDao {
         metricsCache = delegate.getMetrics(context);
         eventCategoriesCache = delegate.getEventCategories(context);
         sourceIdsMetricIdsCache = new HashMap<Integer, Set<Integer>>();
-        for (final SourceIdAndMetricId both : delegate.getMetricIdsForAllSources(context)) {
+        for (final SourceRecordIdAndMetricRecordId both : delegate.getMetricIdsForAllSources(context)) {
             final int sourceId = both.getSourceId();
             final int metricId = both.getMetricId();
             Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
@@ -125,26 +125,26 @@ public class CachingTimelineDao implements TimelineDao {
 
     @Override
     public Integer getMetricId(final int eventCategoryId, final String metric, final InternalTenantContext context) throws UnableToObtainConnectionException {
-        return metricsCache.inverse().get(new CategoryIdAndMetric(eventCategoryId, metric));
+        return metricsCache.inverse().get(new CategoryRecordIdAndMetric(eventCategoryId, metric));
     }
 
     @Override
-    public CategoryIdAndMetric getCategoryIdAndMetric(final Integer metricId, final InternalTenantContext context) throws UnableToObtainConnectionException {
+    public CategoryRecordIdAndMetric getCategoryIdAndMetric(final Integer metricId, final InternalTenantContext context) throws UnableToObtainConnectionException {
         return metricsCache.get(metricId);
     }
 
     @Override
-    public BiMap<Integer, CategoryIdAndMetric> getMetrics(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+    public BiMap<Integer, CategoryRecordIdAndMetric> getMetrics(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         return delegate.getMetrics(context);
     }
 
     @Override
     public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        final CategoryIdAndMetric categoryIdAndMetric = new CategoryIdAndMetric(eventCategoryId, metric);
-        Integer metricId = metricsCache.inverse().get(categoryIdAndMetric);
+        final CategoryRecordIdAndMetric categoryRecordIdAndMetric = new CategoryRecordIdAndMetric(eventCategoryId, metric);
+        Integer metricId = metricsCache.inverse().get(categoryRecordIdAndMetric);
         if (metricId == null) {
             metricId = delegate.getOrAddMetric(sourceId, eventCategoryId, metric, context);
-            metricsCache.put(metricId, categoryIdAndMetric);
+            metricsCache.put(metricId, categoryRecordIdAndMetric);
         }
         if (sourceId != null) {
             Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
@@ -163,7 +163,7 @@ public class CachingTimelineDao implements TimelineDao {
     }
 
     @Override
-    public Iterable<SourceIdAndMetricId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+    public Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         return delegate.getMetricIdsForAllSources(context);
     }
 
@@ -195,21 +195,6 @@ public class CachingTimelineDao implements TimelineDao {
     }
 
     @Override
-    public void bulkInsertEventCategories(final List<String> categoryNames, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.bulkInsertEventCategories(categoryNames, context);
-    }
-
-    @Override
-    public void bulkInsertSources(final List<String> sources, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.bulkInsertSources(sources, context);
-    }
-
-    @Override
-    public void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds, final InternalCallContext context) {
-        delegate.bulkInsertMetrics(categoryAndKinds, context);
-    }
-
-    @Override
     public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList, final InternalCallContext context) {
         delegate.bulkInsertTimelineChunks(timelineChunkList, context);
     }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
index 3b0c337..c607a8f 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
@@ -33,12 +33,12 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.meter.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceIdAndMetricId;
+import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.meter.timeline.util.DateTimeUtils;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -66,19 +66,19 @@ public class DefaultTimelineDao implements TimelineDao {
 
     @Override
     public String getSource(final Integer sourceId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getSource(sourceId, context);
+        return delegate.getSourceName(sourceId, context);
     }
 
     @Override
     public Integer getSourceId(final String source, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getSourceId(source, context);
+        return delegate.getSourceRecordId(source, context);
     }
 
     @Override
     public BiMap<Integer, String> getSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         final HashBiMap<Integer, String> accumulator = HashBiMap.create();
         for (final Map<String, Object> metric : delegate.getSources(context)) {
-            accumulator.put(Integer.valueOf(metric.get("source_id").toString()), metric.get("source_name").toString());
+            accumulator.put(Integer.valueOf(metric.get("record_id").toString()), metric.get("source").toString());
         }
         return accumulator;
     }
@@ -87,7 +87,7 @@ public class DefaultTimelineDao implements TimelineDao {
     public synchronized int getOrAddSource(final String source, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         delegate.begin();
         delegate.addSource(source, context);
-        final Integer sourceId = delegate.getSourceId(source, context);
+        final Integer sourceId = delegate.getSourceRecordId(source, context);
         delegate.commit();
 
         return sourceId;
@@ -95,19 +95,19 @@ public class DefaultTimelineDao implements TimelineDao {
 
     @Override
     public Integer getEventCategoryId(final String eventCategory, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getEventCategoryId(eventCategory, context);
+        return delegate.getCategoryRecordId(eventCategory, context);
     }
 
     @Override
     public String getEventCategory(final Integer eventCategoryId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getEventCategory(eventCategoryId, context);
+        return delegate.getCategory(eventCategoryId, context);
     }
 
     @Override
     public BiMap<Integer, String> getEventCategories(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         final HashBiMap<Integer, String> accumulator = HashBiMap.create();
-        for (final Map<String, Object> eventCategory : delegate.getEventCategories(context)) {
-            accumulator.put(Integer.valueOf(eventCategory.get("event_category_id").toString()), eventCategory.get("event_category").toString());
+        for (final Map<String, Object> eventCategory : delegate.getCategories(context)) {
+            accumulator.put(Integer.valueOf(eventCategory.get("record_id").toString()), eventCategory.get("category").toString());
         }
         return accumulator;
     }
@@ -115,8 +115,8 @@ public class DefaultTimelineDao implements TimelineDao {
     @Override
     public synchronized int getOrAddEventCategory(final String eventCategory, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         delegate.begin();
-        delegate.addEventCategory(eventCategory, context);
-        final Integer eventCategoryId = delegate.getEventCategoryId(eventCategory, context);
+        delegate.addCategory(eventCategory, context);
+        final Integer eventCategoryId = delegate.getCategoryRecordId(eventCategory, context);
         delegate.commit();
 
         return eventCategoryId;
@@ -124,20 +124,20 @@ public class DefaultTimelineDao implements TimelineDao {
 
     @Override
     public Integer getMetricId(final int eventCategoryId, final String metric, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getMetricId(eventCategoryId, metric, context);
+        return delegate.getMetricRecordId(eventCategoryId, metric, context);
     }
 
     @Override
-    public CategoryIdAndMetric getCategoryIdAndMetric(final Integer metricId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getEventCategoryIdAndMetric(metricId, context);
+    public CategoryRecordIdAndMetric getCategoryIdAndMetric(final Integer metricId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getCategoryRecordIdAndMetric(metricId, context);
     }
 
     @Override
-    public BiMap<Integer, CategoryIdAndMetric> getMetrics(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        final HashBiMap<Integer, CategoryIdAndMetric> accumulator = HashBiMap.create();
+    public BiMap<Integer, CategoryRecordIdAndMetric> getMetrics(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        final HashBiMap<Integer, CategoryRecordIdAndMetric> accumulator = HashBiMap.create();
         for (final Map<String, Object> metricInfo : delegate.getMetrics(context)) {
-            accumulator.put(Integer.valueOf(metricInfo.get("sample_kind_id").toString()),
-                            new CategoryIdAndMetric((Integer) metricInfo.get("event_category_id"), metricInfo.get("sample_kind").toString()));
+            accumulator.put(Integer.valueOf(metricInfo.get("record_id").toString()),
+                            new CategoryRecordIdAndMetric((Integer) metricInfo.get("category_record_id"), metricInfo.get("metric").toString()));
         }
         return accumulator;
     }
@@ -146,7 +146,7 @@ public class DefaultTimelineDao implements TimelineDao {
     public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         delegate.begin();
         delegate.addMetric(eventCategoryId, metric, context);
-        final Integer metricId = delegate.getMetricId(eventCategoryId, metric, context);
+        final Integer metricId = delegate.getMetricRecordId(eventCategoryId, metric, context);
         delegate.commit();
 
         return metricId;
@@ -154,19 +154,19 @@ public class DefaultTimelineDao implements TimelineDao {
 
     @Override
     public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getMetricIdsBySourceId(sourceId, context);
+        return delegate.getMetricRecordIdsBySourceRecordId(sourceId, context);
     }
 
     @Override
-    public Iterable<SourceIdAndMetricId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        return delegate.getMetricIdsForAllSources(context);
+    public Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getMetricRecordIdsForAllSources(context);
     }
 
     @Override
     public Long insertTimelineChunk(final TimelineChunk timelineChunk, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         delegate.begin();
         delegate.insertTimelineChunk(timelineChunk, context);
-        final long timelineChunkId = delegate.getLastInsertedId(context);
+        final long timelineChunkId = delegate.getLastInsertedRecordId(context);
         delegate.commit();
         return timelineChunkId;
     }
@@ -186,9 +186,10 @@ public class DefaultTimelineDao implements TimelineDao {
                 ResultIterator<TimelineChunk> iterator = null;
                 try {
                     final Query<Map<String, Object>> query = handle
-                            .createQuery("getSamplesBySourceIdsAndMetricIds")
+                            .createQuery("getSamplesBySourceRecordIdsAndMetricRecordIds")
                             .bind("startTime", DateTimeUtils.unixSeconds(startTime))
                             .bind("endTime", DateTimeUtils.unixSeconds(endTime))
+                            .bind("tenantRecordId", context.getTenantRecordId())
                             .define("sourceIds", JOINER.join(sourceIdList));
 
                     if (metricIdList != null && !metricIdList.isEmpty()) {
@@ -237,21 +238,6 @@ public class DefaultTimelineDao implements TimelineDao {
     }
 
     @Override
-    public void bulkInsertSources(final List<String> sources, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.bulkInsertSources(sources.iterator(), context);
-    }
-
-    @Override
-    public void bulkInsertEventCategories(final List<String> categoryNames, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.bulkInsertEventCategories(categoryNames.iterator(), context);
-    }
-
-    @Override
-    public void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds, final InternalCallContext context) {
-        delegate.bulkInsertMetrics(categoryAndKinds.iterator(), context);
-    }
-
-    @Override
     public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList, final InternalCallContext context) {
         delegate.bulkInsertTimelineChunks(timelineChunkList.iterator(), context);
     }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
index 96ded08..cca7e7f 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineDao.java
@@ -24,11 +24,11 @@ import org.joda.time.DateTime;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
 
-import com.ning.billing.meter.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
-import com.ning.billing.meter.timeline.sources.SourceIdAndMetricId;
+import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 
@@ -60,15 +60,15 @@ public interface TimelineDao {
 
     Integer getMetricId(int eventCategory, String metric, InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
-    CategoryIdAndMetric getCategoryIdAndMetric(Integer metricId, InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
+    CategoryRecordIdAndMetric getCategoryIdAndMetric(Integer metricId, InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
-    BiMap<Integer, CategoryIdAndMetric> getMetrics(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
+    BiMap<Integer, CategoryRecordIdAndMetric> getMetrics(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
     int getOrAddMetric(Integer sourceId, Integer eventCategoryId, String metric, InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
     Iterable<Integer> getMetricIdsBySourceId(Integer sourceId, InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
-    Iterable<SourceIdAndMetricId> getMetricIdsForAllSources(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
+    Iterable<SourceRecordIdAndMetricRecordId> getMetricIdsForAllSources(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
 
     // Timelines tables
 
@@ -87,12 +87,6 @@ public interface TimelineDao {
 
     void deleteLastStartTimes(InternalCallContext context);
 
-    void bulkInsertSources(List<String> sources, InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException;
-
-    void bulkInsertEventCategories(List<String> categoryNames, InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException;
-
-    void bulkInsertMetrics(List<CategoryIdAndMetric> categoryAndKinds, InternalCallContext context);
-
     void bulkInsertTimelineChunks(List<TimelineChunk> timelineChunkList, InternalCallContext context);
 
     void test(InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException;
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
index c302e20..c376b72 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.java
@@ -26,107 +26,87 @@ import org.skife.jdbi.v2.sqlobject.SqlBatch;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
 import org.skife.jdbi.v2.sqlobject.SqlUpdate;
 import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize;
-import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
 import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
 import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
 
-import com.ning.billing.meter.timeline.categories.CategoryIdAndMetric;
-import com.ning.billing.meter.timeline.categories.CategoryIdAndMetricBinder;
 import com.ning.billing.meter.timeline.categories.CategoryIdAndMetricMapper;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
 import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.chunks.TimelineChunkBinder;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
 import com.ning.billing.meter.timeline.shutdown.StartTimesBinder;
 import com.ning.billing.meter.timeline.shutdown.StartTimesMapper;
-import com.ning.billing.meter.timeline.sources.SourceIdAndMetricId;
 import com.ning.billing.meter.timeline.sources.SourceIdAndMetricIdMapper;
+import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.InternalTenantContextBinder;
 
 @UseStringTemplate3StatementLocator()
-@RegisterMapper({CategoryIdAndMetricMapper.class, StartTimesMapper.class, SourceIdAndMetricIdMapper.class})
-public interface TimelineSqlDao extends Transactional<TimelineSqlDao>, Transmogrifier {
+@RegisterMapper({CategoryIdAndMetricMapper.class, StartTimesMapper.class, SourceIdAndMetricIdMapper.class, DefaultMapper.class})
+public interface TimelineSqlDao extends Transactional<TimelineSqlDao> {
 
     @SqlQuery
-    Integer getSourceId(@Bind("sourceName") final String source,
-                        @InternalTenantContextBinder final InternalTenantContext context);
+    Integer getSourceRecordId(@Bind("source") final String source,
+                              @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    String getSource(@Bind("sourceId") final Integer sourceId,
-                     @InternalTenantContextBinder final InternalTenantContext context);
+    String getSourceName(@Bind("recordId") final Integer sourceRecordId,
+                         @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    @Mapper(DefaultMapper.class)
     List<Map<String, Object>> getSources(@InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlUpdate
-    void addSource(@Bind("sourceName") final String source,
+    void addSource(@Bind("source") final String source,
                    @InternalTenantContextBinder final InternalCallContext context);
 
-    @SqlBatch
-    @BatchChunkSize(1000)
-    void bulkInsertSources(@Bind("sourceName") Iterator<String> sourcesIterator,
-                           @InternalTenantContextBinder final InternalCallContext context);
+    @SqlQuery
+    Integer getCategoryRecordId(@Bind("category") final String category,
+                                @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    Integer getEventCategoryId(@Bind("eventCategory") final String eventCategory,
-                               @InternalTenantContextBinder final InternalTenantContext context);
+    String getCategory(@Bind("recordId") final Integer categoryRecordId,
+                       @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    String getEventCategory(@Bind("eventCategoryId") final Integer eventCategoryId,
-                            @InternalTenantContextBinder final InternalTenantContext context);
+    List<Map<String, Object>> getCategories(@InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlUpdate
-    void addEventCategory(@Bind("eventCategory") final String eventCategory,
-                          @InternalTenantContextBinder final InternalCallContext context);
-
-    @SqlBatch
-    @BatchChunkSize(1000)
-    void bulkInsertEventCategories(@Bind("eventCategory") Iterator<String> categoryNames,
-                                   @InternalTenantContextBinder final InternalCallContext context);
+    void addCategory(@Bind("category") final String category,
+                     @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlQuery
-    Iterable<Integer> getMetricIdsBySourceId(@Bind("sourceId") final Integer sourceId,
-                                             @InternalTenantContextBinder final InternalTenantContext context);
+    Integer getMetricRecordId(@Bind("categoryRecordId") final int categoryRecordId,
+                              @Bind("metric") final String metric,
+                              @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    Iterable<SourceIdAndMetricId> getMetricIdsForAllSources(@InternalTenantContextBinder final InternalTenantContext context);
+    Iterable<Integer> getMetricRecordIdsBySourceRecordId(@Bind("sourceRecordId") final Integer sourceRecordId,
+                                                         @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    Integer getMetricId(@Bind("eventCategoryId") final int eventCategoryId,
-                        @Bind("metric") final String metric,
-                        @InternalTenantContextBinder final InternalTenantContext context);
+    Iterable<SourceRecordIdAndMetricRecordId> getMetricRecordIdsForAllSources(@InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    CategoryIdAndMetric getEventCategoryIdAndMetric(@Bind("metricId") final Integer metricId,
-                                                    @InternalTenantContextBinder final InternalTenantContext context);
-
-    @SqlUpdate
-    void addMetric(@Bind("eventCategoryId") final int eventCategoryId,
-                   @Bind("metric") final String metric,
-                   @InternalTenantContextBinder final InternalCallContext context);
-
-    @SqlBatch
-    @BatchChunkSize(1000)
-    void bulkInsertMetrics(@CategoryIdAndMetricBinder Iterator<CategoryIdAndMetric> categoriesAndMetrics,
-                           @InternalTenantContextBinder final InternalCallContext context);
+    CategoryRecordIdAndMetric getCategoryRecordIdAndMetric(@Bind("recordId") final Integer metricRecordId,
+                                                           @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    @Mapper(DefaultMapper.class)
-    List<Map<String, Object>> getEventCategories(@InternalTenantContextBinder final InternalTenantContext context);
+    String getMetric(@Bind("recordId") final Integer metricRecordId,
+                     @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
-    @Mapper(DefaultMapper.class)
     List<Map<String, Object>> getMetrics(@InternalTenantContextBinder final InternalTenantContext context);
 
-    @SqlQuery
-    int getLastInsertedId(@InternalTenantContextBinder final InternalTenantContext context);
+    @SqlUpdate
+    void addMetric(@Bind("categoryRecordId") final int categoryRecordId,
+                   @Bind("metric") final String metric,
+                   @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlQuery
-    long getHighestTimelineChunkId(@InternalTenantContextBinder final InternalTenantContext context);
+    int getLastInsertedRecordId(@InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlUpdate
     void insertTimelineChunk(@TimelineChunkBinder final TimelineChunk timelineChunk,
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/sources/SourceIdAndMetricIdMapper.java b/meter/src/main/java/com/ning/billing/meter/timeline/sources/SourceIdAndMetricIdMapper.java
index 78586ac..92f5146 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/sources/SourceIdAndMetricIdMapper.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/sources/SourceIdAndMetricIdMapper.java
@@ -22,10 +22,10 @@ import java.sql.SQLException;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
-public class SourceIdAndMetricIdMapper implements ResultSetMapper<SourceIdAndMetricId> {
+public class SourceIdAndMetricIdMapper implements ResultSetMapper<SourceRecordIdAndMetricRecordId> {
 
     @Override
-    public SourceIdAndMetricId map(final int index, final ResultSet rs, final StatementContext ctx) throws SQLException {
-        return new SourceIdAndMetricId(rs.getInt("source_id"), rs.getInt("sample_kind_id"));
+    public SourceRecordIdAndMetricRecordId map(final int index, final ResultSet rs, final StatementContext ctx) throws SQLException {
+        return new SourceRecordIdAndMetricRecordId(rs.getInt("source_record_id"), rs.getInt("metric_record_id"));
     }
 }
diff --git a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
index fbd2a44..e782aa1 100644
--- a/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
+++ b/meter/src/main/resources/com/ning/billing/meter/timeline/persistent/TimelineSqlDao.sql.stg
@@ -3,195 +3,269 @@ group TimelineSqlDao;
 CHECK_TENANT() ::= "tenant_record_id = :tenantRecordId"
 AND_CHECK_TENANT() ::= "AND <CHECK_TENANT()>"
 
-getSource() ::= <<
-  select
-    source_name
-  from sources
-  where source_id = :sourceId
-  <AND_CHECK_TENANT()>
-  ;
+getSourceRecordId() ::= <<
+select
+  record_id
+from sources
+where source = :source
+<AND_CHECK_TENANT()>
+;
+>>
+
+getSourceName() ::= <<
+select
+  source
+from sources
+where record_id = :recordId
+<AND_CHECK_TENANT()>
+;
 >>
 
 getSources() ::= <<
-  select
-    source_id
-  , source_name
-  from sources
-  where <CHECK_TENANT()>
-  ;
+select
+  record_id
+, source
+from sources
+where <CHECK_TENANT()>
+;
 >>
 
 addSource() ::= <<
-  insert ignore into sources (source_name, created_dt, account_record_id, tenant_record_id)
-  values (:sourceName, unix_timestamp(), :accountRecordId, :tenantRecordId);
+insert into sources (
+  source
+, created_date
+, created_by
+, updated_date
+, updated_by
+, account_record_id
+, tenant_record_id
+) values (
+  :source
+, :createdDate
+, :userName
+, :updatedDate
+, :userName
+, :accountRecordId
+, :tenantRecordId
+);
 >>
 
-getEventCategories() ::= <<
-  select event_category_id, event_category
-  from event_categories
-  where <CHECK_TENANT()>
-  order by event_category_id asc
-  ;
->>
 
-getEventCategoryId() ::= <<
-  select
-    event_category_id
-  from event_categories
-  where event_category = :eventCategory
-  <AND_CHECK_TENANT()>
-  ;
+getCategoryRecordId() ::= <<
+select
+  record_id
+from categories
+where category = :category
+<AND_CHECK_TENANT()>
+;
 >>
 
-getEventCategory() ::= <<
-  select
-    event_category
-  from event_categories
-  where event_category_id = :eventCategoryId
-  <AND_CHECK_TENANT()>
-  ;
+getCategory() ::= <<
+select
+  category
+from categories
+where record_id = :recordId
+<AND_CHECK_TENANT()>
+;
 >>
 
-addEventCategory() ::= <<
-  insert ignore into event_categories (event_category, account_record_id, tenant_record_id)
-  values (:eventCategory, :accountRecordId, :tenantRecordId);
+getCategories() ::= <<
+select
+  record_id
+, category
+from categories
+where <CHECK_TENANT()>
+;
 >>
 
-getMetricId() ::= <<
-  select
-    metric_id
-  from metrics
-  where unitType = :unitType
-    and event_category_id = :eventCategoryId
-  <AND_CHECK_TENANT()>
-  ;
+addCategory() ::= <<
+insert into categories (
+  category
+, created_date
+, created_by
+, updated_date
+, updated_by
+, tenant_record_id
+) values (
+  :category
+, :createdDate
+, :userName
+, :updatedDate
+, :userName
+, :tenantRecordId
+);
 >>
 
-getEventCategoryIdAndMetric() ::= <<
-  select
-    event_category_id
-  , unitType
-  from metrics
-  where metric_id = :metricId
-  <AND_CHECK_TENANT()>
-  ;
->>
 
-getMetric() ::= <<
-  select
-    unitType
-  from metrics
-  where metric_id = :metricId
-  <AND_CHECK_TENANT()>
-  ;
+getMetricRecordId() ::= <<
+select
+  record_id
+from metrics
+where metric = :metric
+and category_record_id = :categoryRecordId
+<AND_CHECK_TENANT()>
+;
 >>
 
-addMetric() ::= <<
-  insert ignore into metrics (event_category_id, unitType, account_record_id, tenant_record_id)
-  values (:eventCategoryId, :unitType, :accountRecordId, :tenantRecordId);
+getMetricRecordIdsBySourceRecordId() ::= <<
+select distinct
+  metric_record_id
+from timeline_chunks c
+where source_record_id = :sourceRecordId
+<AND_CHECK_TENANT()>
+;
 >>
 
-getMetricIdsBySourceId() ::= <<
-  select distinct metric_id
-  from timeline_chunks c
-  where source_id = :sourceId
-  <AND_CHECK_TENANT()>
-  ;
+getMetricRecordIdsForAllSources() ::= <<
+select distinct
+  metric_record_id
+, source_record_id
+from timeline_chunks c
+where <CHECK_TENANT()>
+;
 >>
 
-getMetricIdsForAllSources() ::= <<
-  select distinct metric_id, source_id
-  from timeline_chunks c
-  where <CHECK_TENANT()>
-  ;
+getCategoryRecordIdAndMetric() ::= <<
+select
+  category_record_id
+, metric
+from metrics
+where record_id = :recordId
+<AND_CHECK_TENANT()>
+;
 >>
 
-getMetrics() ::= <<
-  select
-    metric_id
-  , event_category_id
-  , unitType
-  from metrics
-  <AND_CHECK_TENANT()>
-  ;
->>
-
-getLastInsertedId() ::= <<
-  select last_insert_id();
+getMetric() ::= <<
+select
+  metric
+from metrics
+where record_id = :recordId
+<AND_CHECK_TENANT()>
+;
 >>
 
-insertTimelineChunk() ::= <<
-  insert into timeline_chunks (record_id, source_id, metric_id, sample_count, start_time, end_time, in_row_samples, blob_samples, aggregation_level, not_valid, dont_aggregate, account_record_id, tenant_record_id)
-  values (:chunkId, :sourceId, :metricId, :sampleCount, :startTime, :endTime, :inRowSamples, :blobSamples, :aggregationLevel, :notValid, :dontAggregate, :accountRecordId, :tenantRecordId);
->>
-
-getSamplesBySourceIdsAndMetricIds(sourceIds, metricIds) ::= <<
-  select
-    source_id
-  , metric_id
-  , record_id
-  , sample_count
-  , in_row_samples
-  , blob_samples
-  , start_time
-  , end_time
-  , aggregation_level
-  , not_valid
-  , dont_aggregate
-  from timeline_chunks
-  where end_time >= :startTime
-  and start_time \<= :endTime
-  and source_id in (<sourceIds>)
-  <if(metricIds)>
-    and metric_id in (<metricIds>)
-  <endif>
-  and not_valid = 0
-  <AND_CHECK_TENANT()>
-  order by source_id, metric_id, start_time asc
-  ;
+getMetrics() ::= <<
+select
+  record_id
+, category_record_id
+, metric
+from metrics
+where <CHECK_TENANT()>
+;
 >>
 
-insertLastStartTimes() ::= <<
-  insert into last_start_times (time_inserted, start_times, account_record_id, tenant_record_id)
-                        values (:timeInserted, :startTimes, :accountRecordId, :tenantRecordId)
+addMetric() ::= <<
+insert into metrics (
+  category_record_id
+, metric
+, created_date
+, created_by
+, updated_date
+, updated_by
+, tenant_record_id
+) values (
+  :categoryRecordId
+, :metric
+, :createdDate
+, :userName
+, :updatedDate
+, :userName
+, :tenantRecordId
+);
 >>
 
-getLastStartTimes() ::= <<
-  select time_inserted, start_times
-  from last_start_times
-  where <CHECK_TENANT()>
-  order by time_inserted desc
-  limit 1
->>
 
-deleteLastStartTimes() ::= <<
-  delete from last_start_times where <CHECK_TENANT()>
+getLastInsertedRecordId() ::= <<
+select last_insert_id();
 >>
 
-bulkInsertSources() ::= <<
-  insert into sources (source_name, created_dt, account_record_id, tenant_record_id)
-  values (:sourceName, unix_timestamp(), :accountRecordId, :tenantRecordId);
+insertTimelineChunk() ::= <<
+insert into timeline_chunks (
+  record_id
+, source_record_id
+, metric_record_id
+, sample_count
+, start_time
+, end_time
+, in_row_samples
+, blob_samples
+, aggregation_level
+, not_valid
+, dont_aggregate
+, account_record_id
+, tenant_record_id
+) values (
+  :chunkId
+, :sourceRecordId
+, :metricRecordId
+, :sampleCount
+, :startTime
+, :endTime
+, :inRowSamples
+, :blobSamples
+, :aggregationLevel
+, :notValid
+, :dontAggregate
+, :accountRecordId
+, :tenantRecordId
+);
 >>
+bulkInsertTimelineChunks() ::= "<insertTimelineChunk()>"
 
-bulkInsertEventCategories() ::= <<
-  insert into event_categories (event_category, account_record_id, tenant_record_id)
-  values (:eventCategory, :accountRecordId, :tenantRecordId);
+getSamplesBySourceRecordIdsAndMetricRecordIds(sourceIds, metricIds) ::= <<
+select
+  record_id
+, metric_record_id
+, source_record_id
+, sample_count
+, in_row_samples
+, blob_samples
+, start_time
+, end_time
+, aggregation_level
+, not_valid
+, dont_aggregate
+from timeline_chunks
+where end_time >= :startTime
+and start_time \<= :endTime
+and source_record_id in (<sourceIds>)
+<if(metricIds)>
+  and metric_record_id in (<metricIds>)
+<endif>
+and not_valid = 0
+<AND_CHECK_TENANT()>
+order by source_record_id, metric_record_id, start_time asc
+;
 >>
 
-bulkInsertMetrics() ::= <<
-  insert into metrics (event_category_id, unitType, account_record_id, tenant_record_id)
-  values (:eventCategoryId, :unitType, :accountRecordId, :tenantRecordId);
+insertLastStartTimes() ::= <<
+insert into last_start_times (
+  time_inserted
+, start_times
+, account_record_id
+, tenant_record_id
+) values (
+  :timeInserted
+, :startTimes
+, :accountRecordId
+, :tenantRecordId
+);
 >>
 
-bulkInsertTimelineChunks() ::= <<
-  insert into timeline_chunks (record_id, source_id, metric_id, sample_count, start_time, end_time, not_valid, dont_aggregate, aggregation_level, in_row_samples, blob_samples, account_record_id, tenant_record_id)
-  values (:chunkId, :sourceId, :metricId, :sampleCount, :startTime, :endTime, :dontAggregate, :notValid, :aggregationLevel, :inRowSamples, :blobSamples, :accountRecordId, :tenantRecordId);
+getLastStartTimes() ::= <<
+select
+  time_inserted
+, start_times
+from last_start_times
+where <CHECK_TENANT()>
+order by time_inserted desc
+limit 1
 >>
 
-getHighestTimelineChunkId() ::= <<
-  select record_id from timeline_chunks where <CHECK_TENANT()> order by record_id desc limit 1;
+deleteLastStartTimes() ::= <<
+delete from last_start_times where <CHECK_TENANT()>;
 >>
 
 test() ::= <<
-  select 1 where <CHECK_TENANT()>;
+select 1 from timeline_chunks where <CHECK_TENANT()>;
 >>
diff --git a/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
new file mode 100644
index 0000000..ad51041
--- /dev/null
+++ b/meter/src/test/java/com/ning/billing/meter/timeline/persistent/TestDefaultTimelineDao.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.meter.timeline.persistent;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.meter.MeterTestSuiteWithEmbeddedDB;
+import com.ning.billing.meter.timeline.categories.CategoryRecordIdAndMetric;
+import com.ning.billing.meter.timeline.chunks.TimelineChunk;
+import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableList;
+
+public class TestDefaultTimelineDao extends MeterTestSuiteWithEmbeddedDB {
+
+    private static final TimelineChunkConsumer FAIL_CONSUMER = new TimelineChunkConsumer() {
+        @Override
+        public void processTimelineChunk(final TimelineChunk chunk) {
+            Assert.fail("Shouldn't find any sample");
+        }
+    };
+
+    @Test(groups = "slow")
+    public void testGetSampleKindsByHostName() throws Exception {
+        final TimelineDao dao = new DefaultTimelineDao(getDBI());
+        final DateTime startTime = new DateTime(DateTimeZone.UTC);
+        final DateTime endTime = startTime.plusSeconds(2);
+
+        // Create the host
+        final String hostName = UUID.randomUUID().toString();
+        final Integer hostId = dao.getOrAddSource(hostName, internalCallContext);
+        Assert.assertNotNull(hostId);
+
+        // Create an event category id (needed for the join in the dashboard query)
+        final Integer eventCategoryId = 123;
+
+        // Create the samples
+        final String sampleOne = UUID.randomUUID().toString();
+        final Integer sampleOneId = dao.getOrAddMetric(hostId, eventCategoryId, sampleOne, internalCallContext);
+        Assert.assertNotNull(sampleOneId);
+        final String sampleTwo = UUID.randomUUID().toString();
+        final Integer sampleTwoId = dao.getOrAddMetric(hostId, eventCategoryId, sampleTwo, internalCallContext);
+        Assert.assertNotNull(sampleTwoId);
+
+        // Basic retrieval tests
+        final BiMap<Integer, CategoryRecordIdAndMetric> sampleKinds = dao.getMetrics(internalCallContext);
+        Assert.assertEquals(sampleKinds.size(), 2);
+        Assert.assertEquals(sampleKinds.get(sampleOneId).getEventCategoryId(), (int) eventCategoryId);
+        Assert.assertEquals(sampleKinds.get(sampleOneId).getMetric(), sampleOne);
+        Assert.assertEquals(sampleKinds.get(sampleTwoId).getEventCategoryId(), (int) eventCategoryId);
+        Assert.assertEquals(sampleKinds.get(sampleTwoId).getMetric(), sampleTwo);
+        Assert.assertEquals(dao.getCategoryIdAndMetric(sampleOneId, internalCallContext).getEventCategoryId(), (int) eventCategoryId);
+        Assert.assertEquals(dao.getCategoryIdAndMetric(sampleOneId, internalCallContext).getMetric(), sampleOne);
+        Assert.assertEquals(dao.getCategoryIdAndMetric(sampleTwoId, internalCallContext).getEventCategoryId(), (int) eventCategoryId);
+        Assert.assertEquals(dao.getCategoryIdAndMetric(sampleTwoId, internalCallContext).getMetric(), sampleTwo);
+
+        // No samples yet
+        Assert.assertEquals(ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext)).size(), 0);
+
+        dao.insertTimelineChunk(new TimelineChunk(0, hostId, sampleOneId, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
+        final ImmutableList<Integer> firstFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
+        Assert.assertEquals(firstFetch.size(), 1);
+        Assert.assertEquals(firstFetch.get(0), sampleOneId);
+
+        dao.insertTimelineChunk(new TimelineChunk(0, hostId, sampleTwoId, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
+        final ImmutableList<Integer> secondFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
+        Assert.assertEquals(secondFetch.size(), 2);
+        Assert.assertTrue(secondFetch.contains(sampleOneId));
+        Assert.assertTrue(secondFetch.contains(sampleTwoId));
+
+        // Random sampleKind for random host
+        dao.insertTimelineChunk(new TimelineChunk(0, Integer.MAX_VALUE - 100, Integer.MAX_VALUE, startTime, endTime, new byte[0], new byte[0], 0), internalCallContext);
+        final ImmutableList<Integer> thirdFetch = ImmutableList.<Integer>copyOf(dao.getMetricIdsBySourceId(hostId, internalCallContext));
+        Assert.assertEquals(thirdFetch.size(), 2);
+        Assert.assertTrue(thirdFetch.contains(sampleOneId));
+        Assert.assertTrue(thirdFetch.contains(sampleTwoId));
+
+        // Test dashboard query
+        final AtomicInteger chunksSeen = new AtomicInteger(0);
+        dao.getSamplesBySourceIdsAndMetricIds(ImmutableList.<Integer>of(hostId), ImmutableList.<Integer>of(sampleOneId, sampleTwoId), startTime, startTime.plusSeconds(2), new TimelineChunkConsumer() {
+            @Override
+            public void processTimelineChunk(final TimelineChunk chunk) {
+                chunksSeen.incrementAndGet();
+                Assert.assertEquals((Integer) chunk.getSourceId(), hostId);
+                Assert.assertTrue(chunk.getMetricId() == sampleOneId || chunk.getMetricId() == sampleTwoId);
+            }
+        }, internalCallContext);
+        Assert.assertEquals(chunksSeen.get(), 2);
+
+        // Dummy queries
+        dao.getSamplesBySourceIdsAndMetricIds(ImmutableList.<Integer>of(Integer.MAX_VALUE), null, startTime, startTime.plusDays(1), FAIL_CONSUMER, internalCallContext);
+        dao.getSamplesBySourceIdsAndMetricIds(ImmutableList.<Integer>of(hostId), ImmutableList.<Integer>of(Integer.MAX_VALUE), startTime, startTime.plusDays(1), FAIL_CONSUMER, internalCallContext);
+        dao.getSamplesBySourceIdsAndMetricIds(ImmutableList.<Integer>of(hostId), ImmutableList.<Integer>of(sampleOneId, sampleTwoId), startTime.plusDays(1), startTime.plusDays(2), FAIL_CONSUMER, internalCallContext);
+    }
+}