killbill-aplcache

usage: add Dao and SqlDao layer Signed-off-by: Pierre-Alexandre

7/28/2012 6:25:23 PM

Details

diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/CachingTimelineDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/CachingTimelineDao.java
new file mode 100644
index 0000000..2ce5673
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/CachingTimelineDao.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkConsumer;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableList;
+
+public class CachingTimelineDao implements TimelineDao {
+
+    private static final Logger log = LoggerFactory.getLogger(CachingTimelineDao.class);
+
+    private final BiMap<Integer, String> sourcesCache;
+    private final Map<Integer, Set<Integer>> sourceIdsMetricIdsCache;
+    private final BiMap<Integer, CategoryIdAndMetric> metricsCache;
+    private final BiMap<Integer, String> eventCategoriesCache;
+
+    private final TimelineDao delegate;
+
+    public CachingTimelineDao(final TimelineDao delegate) {
+        this.delegate = delegate;
+        sourcesCache = delegate.getSources();
+        metricsCache = delegate.getMetrics();
+        eventCategoriesCache = delegate.getEventCategories();
+        sourceIdsMetricIdsCache = new HashMap<Integer, Set<Integer>>();
+        for (final SourceIdAndMetricId both : delegate.getMetricIdsForAllSources()) {
+            final int sourceId = both.getSourceId();
+            final int metricId = both.getMetricId();
+            Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
+            if (metricIds == null) {
+                metricIds = new HashSet<Integer>();
+                sourceIdsMetricIdsCache.put(sourceId, metricIds);
+            }
+            metricIds.add(metricId);
+        }
+    }
+
+    @Override
+    public Integer getSourceId(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+        return sourcesCache.inverse().get(source);
+    }
+
+    @Override
+    public String getSource(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+        return sourcesCache.get(sourceId);
+    }
+
+    @Override
+    public BiMap<Integer, String> getSources() throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getSources();
+    }
+
+    @Override
+    public synchronized int getOrAddSource(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+        Integer sourceId = sourcesCache.inverse().get(source);
+        if (sourceId == null) {
+            sourceId = delegate.getOrAddSource(source);
+            sourcesCache.put(sourceId, source);
+        }
+
+        return sourceId;
+    }
+
+    @Override
+    public Integer getEventCategoryId(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+        return eventCategoriesCache.inverse().get(eventCategory);
+    }
+
+    @Override
+    public String getEventCategory(final Integer eventCategoryId) throws UnableToObtainConnectionException {
+        return eventCategoriesCache.get(eventCategoryId);
+    }
+
+    @Override
+    public BiMap<Integer, String> getEventCategories() throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getEventCategories();
+    }
+
+    @Override
+    public int getOrAddEventCategory(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+        Integer eventCategoryId = eventCategoriesCache.inverse().get(eventCategory);
+        if (eventCategoryId == null) {
+            eventCategoryId = delegate.getOrAddEventCategory(eventCategory);
+            eventCategoriesCache.put(eventCategoryId, eventCategory);
+        }
+        return eventCategoryId;
+    }
+
+    @Override
+    public Integer getMetricId(final int eventCategoryId, final String metric) throws UnableToObtainConnectionException {
+        return metricsCache.inverse().get(new CategoryIdAndMetric(eventCategoryId, metric));
+    }
+
+    @Override
+    public CategoryIdAndMetric getCategoryIdAndMetric(final Integer metricId) throws UnableToObtainConnectionException {
+        return metricsCache.get(metricId);
+    }
+
+    @Override
+    public BiMap<Integer, CategoryIdAndMetric> getMetrics() throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getMetrics();
+    }
+
+    @Override
+    public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric) throws UnableToObtainConnectionException, CallbackFailedException {
+        final CategoryIdAndMetric categoryIdAndMetric = new CategoryIdAndMetric(eventCategoryId, metric);
+        Integer metricId = metricsCache.inverse().get(categoryIdAndMetric);
+        if (metricId == null) {
+            metricId = delegate.getOrAddMetric(sourceId, eventCategoryId, metric);
+            metricsCache.put(metricId, categoryIdAndMetric);
+        }
+        if (sourceId != null) {
+            Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
+            if (metricIds == null) {
+                metricIds = new HashSet<Integer>();
+                sourceIdsMetricIdsCache.put(sourceId, metricIds);
+            }
+            metricIds.add(metricId);
+        }
+        return metricId;
+    }
+
+    @Override
+    public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+        return ImmutableList.copyOf(sourceIdsMetricIdsCache.get(sourceId));
+    }
+
+    @Override
+    public Iterable<SourceIdAndMetricId> getMetricIdsForAllSources() throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getMetricIdsForAllSources();
+    }
+
+
+    @Override
+    public Long insertTimelineChunk(final TimelineChunk timelineChunk) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.insertTimelineChunk(timelineChunk);
+    }
+
+    @Override
+    public void getSamplesBySourceIdsAndMetricIds(final List<Integer> sourceIds, @Nullable final List<Integer> metricIds,
+                                                  final DateTime startTime, final DateTime endTime, final TimelineChunkConsumer chunkConsumer) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.getSamplesBySourceIdsAndMetricIds(sourceIds, metricIds, startTime, endTime, chunkConsumer);
+    }
+
+    @Override
+    public Integer insertLastStartTimes(final StartTimes startTimes) {
+        return delegate.insertLastStartTimes(startTimes);
+    }
+
+    @Override
+    public StartTimes getLastStartTimes() {
+        return delegate.getLastStartTimes();
+    }
+
+    @Override
+    public void deleteLastStartTimes() {
+        delegate.deleteLastStartTimes();
+    }
+
+    @Override
+    public void bulkInsertEventCategories(final List<String> categoryNames) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.bulkInsertEventCategories(categoryNames);
+    }
+
+    @Override
+    public void bulkInsertSources(final List<String> sources) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.bulkInsertSources(sources);
+    }
+
+    @Override
+    public void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds) {
+        delegate.bulkInsertMetrics(categoryAndKinds);
+    }
+
+    @Override
+    public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList) {
+        delegate.bulkInsertTimelineChunks(timelineChunkList);
+    }
+
+    @Override
+    public void test() throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.test();
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/DefaultTimelineDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/DefaultTimelineDao.java
new file mode 100644
index 0000000..77f232d
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/DefaultTimelineDao.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.StringTemplate3StatementLocator;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.DateTimeUtils;
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkConsumer;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkMapper;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.inject.Inject;
+
+public class DefaultTimelineDao implements TimelineDao {
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultTimelineDao.class);
+    private static final Joiner JOINER = Joiner.on(",");
+
+    private final IDBI dbi;
+    private final TimelineChunkMapper timelineChunkMapper;
+    private final TimelineSqlDao delegate;
+
+    @Inject
+    public DefaultTimelineDao(final IDBI dbi) {
+        this.dbi = dbi;
+        this.timelineChunkMapper = new TimelineChunkMapper();
+        this.delegate = dbi.onDemand(TimelineSqlDao.class);
+    }
+
+    @Override
+    public String getSource(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getSource(sourceId);
+    }
+
+    @Override
+    public Integer getSourceId(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getSourceId(source);
+    }
+
+    @Override
+    public BiMap<Integer, String> getSources() throws UnableToObtainConnectionException, CallbackFailedException {
+        final HashBiMap<Integer, String> accumulator = HashBiMap.create();
+        for (final Map<String, Object> metric : delegate.getSources()) {
+            accumulator.put(Integer.valueOf(metric.get("source_id").toString()), metric.get("source_name").toString());
+        }
+        return accumulator;
+    }
+
+    @Override
+    public synchronized int getOrAddSource(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.begin();
+        delegate.addSource(source);
+        final Integer sourceId = delegate.getSourceId(source);
+        delegate.commit();
+
+        return sourceId;
+    }
+
+    @Override
+    public Integer getEventCategoryId(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getEventCategoryId(eventCategory);
+    }
+
+    @Override
+    public String getEventCategory(final Integer eventCategoryId) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getEventCategory(eventCategoryId);
+    }
+
+    @Override
+    public BiMap<Integer, String> getEventCategories() throws UnableToObtainConnectionException, CallbackFailedException {
+        final HashBiMap<Integer, String> accumulator = HashBiMap.create();
+        for (final Map<String, Object> eventCategory : delegate.getEventCategories()) {
+            accumulator.put(Integer.valueOf(eventCategory.get("event_category_id").toString()), eventCategory.get("event_category").toString());
+        }
+        return accumulator;
+    }
+
+    @Override
+    public synchronized int getOrAddEventCategory(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.begin();
+        delegate.addEventCategory(eventCategory);
+        final Integer eventCategoryId = delegate.getEventCategoryId(eventCategory);
+        delegate.commit();
+
+        return eventCategoryId;
+    }
+
+    @Override
+    public Integer getMetricId(final int eventCategoryId, final String metric) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getMetricId(eventCategoryId, metric);
+    }
+
+    @Override
+    public CategoryIdAndMetric getCategoryIdAndMetric(final Integer metricId) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getEventCategoryIdAndMetric(metricId);
+    }
+
+    @Override
+    public BiMap<Integer, CategoryIdAndMetric> getMetrics() throws UnableToObtainConnectionException, CallbackFailedException {
+        final HashBiMap<Integer, CategoryIdAndMetric> accumulator = HashBiMap.create();
+        for (final Map<String, Object> metricInfo : delegate.getMetrics()) {
+            accumulator.put(Integer.valueOf(metricInfo.get("sample_kind_id").toString()),
+                            new CategoryIdAndMetric((Integer) metricInfo.get("event_category_id"), metricInfo.get("sample_kind").toString()));
+        }
+        return accumulator;
+    }
+
+    @Override
+    public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.begin();
+        delegate.addMetric(eventCategoryId, metric);
+        final Integer metricId = delegate.getMetricId(eventCategoryId, metric);
+        delegate.commit();
+
+        return metricId;
+    }
+
+    @Override
+    public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getMetricIdsBySourceId(sourceId);
+    }
+
+    @Override
+    public Iterable<SourceIdAndMetricId> getMetricIdsForAllSources() throws UnableToObtainConnectionException, CallbackFailedException {
+        return delegate.getMetricIdsForAllSources();
+    }
+
+    @Override
+    public Long insertTimelineChunk(final TimelineChunk timelineChunk) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.begin();
+        delegate.insertTimelineChunk(timelineChunk);
+        final long timelineChunkId = delegate.getLastInsertedId();
+        delegate.commit();
+        return timelineChunkId;
+    }
+
+    @Override
+    public void getSamplesBySourceIdsAndMetricIds(final List<Integer> sourceIdList,
+                                                  @Nullable final List<Integer> metricIdList,
+                                                  final DateTime startTime,
+                                                  final DateTime endTime,
+                                                  final TimelineChunkConsumer chunkConsumer) {
+        dbi.withHandle(new HandleCallback<Void>() {
+            @Override
+            public Void withHandle(final Handle handle) throws Exception {
+                handle.setStatementLocator(new StringTemplate3StatementLocator(TimelineSqlDao.class));
+
+                ResultIterator<TimelineChunk> iterator = null;
+                try {
+                    final Query<Map<String, Object>> query = handle
+                            .createQuery("getSamplesBySourceIdsAndMetricIds")
+                            .bind("startTime", DateTimeUtils.unixSeconds(startTime))
+                            .bind("endTime", DateTimeUtils.unixSeconds(endTime))
+                            .define("sourceIds", JOINER.join(sourceIdList));
+
+                    if (metricIdList != null && !metricIdList.isEmpty()) {
+                        query.define("metricIds", JOINER.join(metricIdList));
+                    }
+
+                    iterator = query
+                            .map(timelineChunkMapper)
+                            .iterator();
+
+                    while (iterator.hasNext()) {
+                        chunkConsumer.processTimelineChunk(iterator.next());
+                    }
+                    return null;
+                } finally {
+                    if (iterator != null) {
+                        try {
+                            iterator.close();
+                        } catch (Exception e) {
+                            log.error("Exception closing TimelineChunkAndTimes iterator for sourceIds {} and metricIds {}", sourceIdList, metricIdList);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public Integer insertLastStartTimes(final StartTimes startTimes) {
+        return delegate.insertLastStartTimes(startTimes);
+    }
+
+    @Override
+    public StartTimes getLastStartTimes() {
+        return delegate.getLastStartTimes();
+    }
+
+    @Override
+    public void deleteLastStartTimes() {
+        delegate.deleteLastStartTimes();
+    }
+
+    @Override
+    public void test() throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.test();
+    }
+
+    @Override
+    public void bulkInsertSources(final List<String> sources) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.bulkInsertSources(sources.iterator());
+    }
+
+    @Override
+    public void bulkInsertEventCategories(final List<String> categoryNames) throws UnableToObtainConnectionException, CallbackFailedException {
+        delegate.bulkInsertEventCategories(categoryNames.iterator());
+    }
+
+    @Override
+    public void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds) {
+        delegate.bulkInsertMetrics(categoryAndKinds.iterator());
+    }
+
+    @Override
+    public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList) {
+        delegate.bulkInsertTimelineChunks(timelineChunkList.iterator());
+    }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineDao.java
new file mode 100644
index 0000000..5137154
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineDao.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkConsumer;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+
+import com.google.common.collect.BiMap;
+
+public interface TimelineDao {
+
+    // Sources table
+
+    Integer getSourceId(String source) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    String getSource(Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    BiMap<Integer, String> getSources() throws UnableToObtainConnectionException, CallbackFailedException;
+
+    int getOrAddSource(String source) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    // Event categories table
+
+    Integer getEventCategoryId(String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    String getEventCategory(Integer eventCategoryId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    BiMap<Integer, String> getEventCategories() throws UnableToObtainConnectionException, CallbackFailedException;
+
+    int getOrAddEventCategory(String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    // Sample kinds table
+
+    Integer getMetricId(int eventCategory, String metric) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    CategoryIdAndMetric getCategoryIdAndMetric(Integer metricId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    BiMap<Integer, CategoryIdAndMetric> getMetrics() throws UnableToObtainConnectionException, CallbackFailedException;
+
+    int getOrAddMetric(Integer sourceId, Integer eventCategoryId, String metric) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    Iterable<Integer> getMetricIdsBySourceId(Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    Iterable<SourceIdAndMetricId> getMetricIdsForAllSources() throws UnableToObtainConnectionException, CallbackFailedException;
+
+    // Timelines tables
+
+    Long insertTimelineChunk(TimelineChunk timelineChunk) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    void getSamplesBySourceIdsAndMetricIds(List<Integer> sourceIds,
+                                           @Nullable List<Integer> metricIds,
+                                           DateTime startTime,
+                                           DateTime endTime,
+                                           TimelineChunkConsumer chunkConsumer) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    Integer insertLastStartTimes(StartTimes startTimes);
+
+    StartTimes getLastStartTimes();
+
+    void deleteLastStartTimes();
+
+    void bulkInsertSources(final List<String> sources) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    void bulkInsertEventCategories(final List<String> categoryNames) throws UnableToObtainConnectionException, CallbackFailedException;
+
+    void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds);
+
+    void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList);
+
+    void test() throws UnableToObtainConnectionException, CallbackFailedException;
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineSqlDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineSqlDao.java
new file mode 100644
index 0000000..395ee3a
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineSqlDao.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.skife.jdbi.v2.DefaultMapper;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.SqlBatch;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetricBinder;
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetricMapper;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkBinder;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.shutdown.StartTimesBinder;
+import com.ning.billing.usage.timeline.shutdown.StartTimesMapper;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricIdMapper;
+
+@ExternalizedSqlViaStringTemplate3()
+@RegisterMapper({CategoryIdAndMetricMapper.class, StartTimesMapper.class, SourceIdAndMetricIdMapper.class})
+public interface TimelineSqlDao extends Transactional<TimelineSqlDao> {
+
+    @SqlQuery
+    Integer getSourceId(@Bind("sourceName") final String source);
+
+    @SqlQuery
+    String getSource(@Bind("sourceId") final Integer sourceId);
+
+    @SqlQuery
+    @Mapper(DefaultMapper.class)
+    List<Map<String, Object>> getSources();
+
+    @SqlUpdate
+    void addSource(@Bind("sourceName") final String source);
+
+    @SqlBatch
+    @BatchChunkSize(1000)
+    void bulkInsertSources(@Bind("sourceName") Iterator<String> sourcesIterator);
+
+    @SqlQuery
+    Integer getEventCategoryId(@Bind("eventCategory") final String eventCategory);
+
+    @SqlQuery
+    String getEventCategory(@Bind("eventCategoryId") final Integer eventCategoryId);
+
+    @SqlUpdate
+    void addEventCategory(@Bind("eventCategory") final String eventCategory);
+
+    @SqlBatch
+    @BatchChunkSize(1000)
+    void bulkInsertEventCategories(@Bind("eventCategory") Iterator<String> cateogoryNames);
+
+    @SqlQuery
+    Iterable<Integer> getMetricIdsBySourceId(@Bind("sourceId") final Integer sourceId);
+
+    @SqlQuery
+    Iterable<SourceIdAndMetricId> getMetricIdsForAllSources();
+
+    @SqlQuery
+    Integer getMetricId(@Bind("eventCategoryId") final int eventCategoryId, @Bind("metric") final String metric);
+
+    @SqlQuery
+    CategoryIdAndMetric getEventCategoryIdAndMetric(@Bind("metricId") final Integer metricId);
+
+    @SqlUpdate
+    void addMetric(@Bind("eventCategoryId") final int eventCategoryId, @Bind("metric") final String metric);
+
+    @SqlBatch
+    @BatchChunkSize(1000)
+    void bulkInsertMetrics(@CategoryIdAndMetricBinder Iterator<CategoryIdAndMetric> categoriesAndMetrics);
+
+    @SqlQuery
+    @Mapper(DefaultMapper.class)
+    List<Map<String, Object>> getEventCategories();
+
+    @SqlQuery
+    @Mapper(DefaultMapper.class)
+    List<Map<String, Object>> getMetrics();
+
+    @SqlQuery
+    int getLastInsertedId();
+
+    @SqlQuery
+    long getHighestTimelineChunkId();
+
+    @SqlUpdate
+    void insertTimelineChunk(@TimelineChunkBinder final TimelineChunk timelineChunk);
+
+    @SqlBatch
+    @BatchChunkSize(1000)
+    void bulkInsertTimelineChunks(@TimelineChunkBinder Iterator<TimelineChunk> chunkIterator);
+
+    @SqlUpdate
+    Integer insertLastStartTimes(@StartTimesBinder final StartTimes startTimes);
+
+    @SqlQuery
+    StartTimes getLastStartTimes();
+
+    @SqlUpdate
+    void deleteLastStartTimes();
+
+    @SqlUpdate
+    void test();
+}