Details
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/CachingTimelineDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/CachingTimelineDao.java
new file mode 100644
index 0000000..2ce5673
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/CachingTimelineDao.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkConsumer;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableList;
+
+public class CachingTimelineDao implements TimelineDao {
+
+ private static final Logger log = LoggerFactory.getLogger(CachingTimelineDao.class);
+
+ private final BiMap<Integer, String> sourcesCache;
+ private final Map<Integer, Set<Integer>> sourceIdsMetricIdsCache;
+ private final BiMap<Integer, CategoryIdAndMetric> metricsCache;
+ private final BiMap<Integer, String> eventCategoriesCache;
+
+ private final TimelineDao delegate;
+
+ public CachingTimelineDao(final TimelineDao delegate) {
+ this.delegate = delegate;
+ sourcesCache = delegate.getSources();
+ metricsCache = delegate.getMetrics();
+ eventCategoriesCache = delegate.getEventCategories();
+ sourceIdsMetricIdsCache = new HashMap<Integer, Set<Integer>>();
+ for (final SourceIdAndMetricId both : delegate.getMetricIdsForAllSources()) {
+ final int sourceId = both.getSourceId();
+ final int metricId = both.getMetricId();
+ Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
+ if (metricIds == null) {
+ metricIds = new HashSet<Integer>();
+ sourceIdsMetricIdsCache.put(sourceId, metricIds);
+ }
+ metricIds.add(metricId);
+ }
+ }
+
+ @Override
+ public Integer getSourceId(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+ return sourcesCache.inverse().get(source);
+ }
+
+ @Override
+ public String getSource(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+ return sourcesCache.get(sourceId);
+ }
+
+ @Override
+ public BiMap<Integer, String> getSources() throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getSources();
+ }
+
+ @Override
+ public synchronized int getOrAddSource(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+ Integer sourceId = sourcesCache.inverse().get(source);
+ if (sourceId == null) {
+ sourceId = delegate.getOrAddSource(source);
+ sourcesCache.put(sourceId, source);
+ }
+
+ return sourceId;
+ }
+
+ @Override
+ public Integer getEventCategoryId(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+ return eventCategoriesCache.inverse().get(eventCategory);
+ }
+
+ @Override
+ public String getEventCategory(final Integer eventCategoryId) throws UnableToObtainConnectionException {
+ return eventCategoriesCache.get(eventCategoryId);
+ }
+
+ @Override
+ public BiMap<Integer, String> getEventCategories() throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getEventCategories();
+ }
+
+ @Override
+ public int getOrAddEventCategory(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+ Integer eventCategoryId = eventCategoriesCache.inverse().get(eventCategory);
+ if (eventCategoryId == null) {
+ eventCategoryId = delegate.getOrAddEventCategory(eventCategory);
+ eventCategoriesCache.put(eventCategoryId, eventCategory);
+ }
+ return eventCategoryId;
+ }
+
+ @Override
+ public Integer getMetricId(final int eventCategoryId, final String metric) throws UnableToObtainConnectionException {
+ return metricsCache.inverse().get(new CategoryIdAndMetric(eventCategoryId, metric));
+ }
+
+ @Override
+ public CategoryIdAndMetric getCategoryIdAndMetric(final Integer metricId) throws UnableToObtainConnectionException {
+ return metricsCache.get(metricId);
+ }
+
+ @Override
+ public BiMap<Integer, CategoryIdAndMetric> getMetrics() throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getMetrics();
+ }
+
+ @Override
+ public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric) throws UnableToObtainConnectionException, CallbackFailedException {
+ final CategoryIdAndMetric categoryIdAndMetric = new CategoryIdAndMetric(eventCategoryId, metric);
+ Integer metricId = metricsCache.inverse().get(categoryIdAndMetric);
+ if (metricId == null) {
+ metricId = delegate.getOrAddMetric(sourceId, eventCategoryId, metric);
+ metricsCache.put(metricId, categoryIdAndMetric);
+ }
+ if (sourceId != null) {
+ Set<Integer> metricIds = sourceIdsMetricIdsCache.get(sourceId);
+ if (metricIds == null) {
+ metricIds = new HashSet<Integer>();
+ sourceIdsMetricIdsCache.put(sourceId, metricIds);
+ }
+ metricIds.add(metricId);
+ }
+ return metricId;
+ }
+
+ @Override
+ public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+ return ImmutableList.copyOf(sourceIdsMetricIdsCache.get(sourceId));
+ }
+
+ @Override
+ public Iterable<SourceIdAndMetricId> getMetricIdsForAllSources() throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getMetricIdsForAllSources();
+ }
+
+
+ @Override
+ public Long insertTimelineChunk(final TimelineChunk timelineChunk) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.insertTimelineChunk(timelineChunk);
+ }
+
+ @Override
+ public void getSamplesBySourceIdsAndMetricIds(final List<Integer> sourceIds, @Nullable final List<Integer> metricIds,
+ final DateTime startTime, final DateTime endTime, final TimelineChunkConsumer chunkConsumer) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.getSamplesBySourceIdsAndMetricIds(sourceIds, metricIds, startTime, endTime, chunkConsumer);
+ }
+
+ @Override
+ public Integer insertLastStartTimes(final StartTimes startTimes) {
+ return delegate.insertLastStartTimes(startTimes);
+ }
+
+ @Override
+ public StartTimes getLastStartTimes() {
+ return delegate.getLastStartTimes();
+ }
+
+ @Override
+ public void deleteLastStartTimes() {
+ delegate.deleteLastStartTimes();
+ }
+
+ @Override
+ public void bulkInsertEventCategories(final List<String> categoryNames) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.bulkInsertEventCategories(categoryNames);
+ }
+
+ @Override
+ public void bulkInsertSources(final List<String> sources) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.bulkInsertSources(sources);
+ }
+
+ @Override
+ public void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds) {
+ delegate.bulkInsertMetrics(categoryAndKinds);
+ }
+
+ @Override
+ public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList) {
+ delegate.bulkInsertTimelineChunks(timelineChunkList);
+ }
+
+ @Override
+ public void test() throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.test();
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/DefaultTimelineDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/DefaultTimelineDao.java
new file mode 100644
index 0000000..77f232d
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/DefaultTimelineDao.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.StringTemplate3StatementLocator;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.usage.timeline.DateTimeUtils;
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkConsumer;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkMapper;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.inject.Inject;
+
+public class DefaultTimelineDao implements TimelineDao {
+
+ private static final Logger log = LoggerFactory.getLogger(DefaultTimelineDao.class);
+ private static final Joiner JOINER = Joiner.on(",");
+
+ private final IDBI dbi;
+ private final TimelineChunkMapper timelineChunkMapper;
+ private final TimelineSqlDao delegate;
+
+ @Inject
+ public DefaultTimelineDao(final IDBI dbi) {
+ this.dbi = dbi;
+ this.timelineChunkMapper = new TimelineChunkMapper();
+ this.delegate = dbi.onDemand(TimelineSqlDao.class);
+ }
+
+ @Override
+ public String getSource(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getSource(sourceId);
+ }
+
+ @Override
+ public Integer getSourceId(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getSourceId(source);
+ }
+
+ @Override
+ public BiMap<Integer, String> getSources() throws UnableToObtainConnectionException, CallbackFailedException {
+ final HashBiMap<Integer, String> accumulator = HashBiMap.create();
+ for (final Map<String, Object> metric : delegate.getSources()) {
+ accumulator.put(Integer.valueOf(metric.get("source_id").toString()), metric.get("source_name").toString());
+ }
+ return accumulator;
+ }
+
+ @Override
+ public synchronized int getOrAddSource(final String source) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.begin();
+ delegate.addSource(source);
+ final Integer sourceId = delegate.getSourceId(source);
+ delegate.commit();
+
+ return sourceId;
+ }
+
+ @Override
+ public Integer getEventCategoryId(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getEventCategoryId(eventCategory);
+ }
+
+ @Override
+ public String getEventCategory(final Integer eventCategoryId) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getEventCategory(eventCategoryId);
+ }
+
+ @Override
+ public BiMap<Integer, String> getEventCategories() throws UnableToObtainConnectionException, CallbackFailedException {
+ final HashBiMap<Integer, String> accumulator = HashBiMap.create();
+ for (final Map<String, Object> eventCategory : delegate.getEventCategories()) {
+ accumulator.put(Integer.valueOf(eventCategory.get("event_category_id").toString()), eventCategory.get("event_category").toString());
+ }
+ return accumulator;
+ }
+
+ @Override
+ public synchronized int getOrAddEventCategory(final String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.begin();
+ delegate.addEventCategory(eventCategory);
+ final Integer eventCategoryId = delegate.getEventCategoryId(eventCategory);
+ delegate.commit();
+
+ return eventCategoryId;
+ }
+
+ @Override
+ public Integer getMetricId(final int eventCategoryId, final String metric) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getMetricId(eventCategoryId, metric);
+ }
+
+ @Override
+ public CategoryIdAndMetric getCategoryIdAndMetric(final Integer metricId) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getEventCategoryIdAndMetric(metricId);
+ }
+
+ @Override
+ public BiMap<Integer, CategoryIdAndMetric> getMetrics() throws UnableToObtainConnectionException, CallbackFailedException {
+ final HashBiMap<Integer, CategoryIdAndMetric> accumulator = HashBiMap.create();
+ for (final Map<String, Object> metricInfo : delegate.getMetrics()) {
+ accumulator.put(Integer.valueOf(metricInfo.get("sample_kind_id").toString()),
+ new CategoryIdAndMetric((Integer) metricInfo.get("event_category_id"), metricInfo.get("sample_kind").toString()));
+ }
+ return accumulator;
+ }
+
+ @Override
+ public synchronized int getOrAddMetric(final Integer sourceId, final Integer eventCategoryId, final String metric) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.begin();
+ delegate.addMetric(eventCategoryId, metric);
+ final Integer metricId = delegate.getMetricId(eventCategoryId, metric);
+ delegate.commit();
+
+ return metricId;
+ }
+
+ @Override
+ public Iterable<Integer> getMetricIdsBySourceId(final Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getMetricIdsBySourceId(sourceId);
+ }
+
+ @Override
+ public Iterable<SourceIdAndMetricId> getMetricIdsForAllSources() throws UnableToObtainConnectionException, CallbackFailedException {
+ return delegate.getMetricIdsForAllSources();
+ }
+
+ @Override
+ public Long insertTimelineChunk(final TimelineChunk timelineChunk) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.begin();
+ delegate.insertTimelineChunk(timelineChunk);
+ final long timelineChunkId = delegate.getLastInsertedId();
+ delegate.commit();
+ return timelineChunkId;
+ }
+
+ @Override
+ public void getSamplesBySourceIdsAndMetricIds(final List<Integer> sourceIdList,
+ @Nullable final List<Integer> metricIdList,
+ final DateTime startTime,
+ final DateTime endTime,
+ final TimelineChunkConsumer chunkConsumer) {
+ dbi.withHandle(new HandleCallback<Void>() {
+ @Override
+ public Void withHandle(final Handle handle) throws Exception {
+ handle.setStatementLocator(new StringTemplate3StatementLocator(TimelineSqlDao.class));
+
+ ResultIterator<TimelineChunk> iterator = null;
+ try {
+ final Query<Map<String, Object>> query = handle
+ .createQuery("getSamplesBySourceIdsAndMetricIds")
+ .bind("startTime", DateTimeUtils.unixSeconds(startTime))
+ .bind("endTime", DateTimeUtils.unixSeconds(endTime))
+ .define("sourceIds", JOINER.join(sourceIdList));
+
+ if (metricIdList != null && !metricIdList.isEmpty()) {
+ query.define("metricIds", JOINER.join(metricIdList));
+ }
+
+ iterator = query
+ .map(timelineChunkMapper)
+ .iterator();
+
+ while (iterator.hasNext()) {
+ chunkConsumer.processTimelineChunk(iterator.next());
+ }
+ return null;
+ } finally {
+ if (iterator != null) {
+ try {
+ iterator.close();
+ } catch (Exception e) {
+ log.error("Exception closing TimelineChunkAndTimes iterator for sourceIds {} and metricIds {}", sourceIdList, metricIdList);
+ }
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public Integer insertLastStartTimes(final StartTimes startTimes) {
+ return delegate.insertLastStartTimes(startTimes);
+ }
+
+ @Override
+ public StartTimes getLastStartTimes() {
+ return delegate.getLastStartTimes();
+ }
+
+ @Override
+ public void deleteLastStartTimes() {
+ delegate.deleteLastStartTimes();
+ }
+
+ @Override
+ public void test() throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.test();
+ }
+
+ @Override
+ public void bulkInsertSources(final List<String> sources) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.bulkInsertSources(sources.iterator());
+ }
+
+ @Override
+ public void bulkInsertEventCategories(final List<String> categoryNames) throws UnableToObtainConnectionException, CallbackFailedException {
+ delegate.bulkInsertEventCategories(categoryNames.iterator());
+ }
+
+ @Override
+ public void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds) {
+ delegate.bulkInsertMetrics(categoryAndKinds.iterator());
+ }
+
+ @Override
+ public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList) {
+ delegate.bulkInsertTimelineChunks(timelineChunkList.iterator());
+ }
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineDao.java
new file mode 100644
index 0000000..5137154
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineDao.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkConsumer;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+
+import com.google.common.collect.BiMap;
+
+public interface TimelineDao {
+
+ // Sources table
+
+ Integer getSourceId(String source) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ String getSource(Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ BiMap<Integer, String> getSources() throws UnableToObtainConnectionException, CallbackFailedException;
+
+ int getOrAddSource(String source) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ // Event categories table
+
+ Integer getEventCategoryId(String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ String getEventCategory(Integer eventCategoryId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ BiMap<Integer, String> getEventCategories() throws UnableToObtainConnectionException, CallbackFailedException;
+
+ int getOrAddEventCategory(String eventCategory) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ // Sample kinds table
+
+ Integer getMetricId(int eventCategory, String metric) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ CategoryIdAndMetric getCategoryIdAndMetric(Integer metricId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ BiMap<Integer, CategoryIdAndMetric> getMetrics() throws UnableToObtainConnectionException, CallbackFailedException;
+
+ int getOrAddMetric(Integer sourceId, Integer eventCategoryId, String metric) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ Iterable<Integer> getMetricIdsBySourceId(Integer sourceId) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ Iterable<SourceIdAndMetricId> getMetricIdsForAllSources() throws UnableToObtainConnectionException, CallbackFailedException;
+
+ // Timelines tables
+
+ Long insertTimelineChunk(TimelineChunk timelineChunk) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ void getSamplesBySourceIdsAndMetricIds(List<Integer> sourceIds,
+ @Nullable List<Integer> metricIds,
+ DateTime startTime,
+ DateTime endTime,
+ TimelineChunkConsumer chunkConsumer) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ Integer insertLastStartTimes(StartTimes startTimes);
+
+ StartTimes getLastStartTimes();
+
+ void deleteLastStartTimes();
+
+ void bulkInsertSources(final List<String> sources) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ void bulkInsertEventCategories(final List<String> categoryNames) throws UnableToObtainConnectionException, CallbackFailedException;
+
+ void bulkInsertMetrics(final List<CategoryIdAndMetric> categoryAndKinds);
+
+ void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList);
+
+ void test() throws UnableToObtainConnectionException, CallbackFailedException;
+}
diff --git a/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineSqlDao.java b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineSqlDao.java
new file mode 100644
index 0000000..395ee3a
--- /dev/null
+++ b/usage/src/main/java/com/ning/billing/usage/timeline/persistent/TimelineSqlDao.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.usage.timeline.persistent;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.skife.jdbi.v2.DefaultMapper;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.SqlBatch;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetric;
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetricBinder;
+import com.ning.billing.usage.timeline.categories.CategoryIdAndMetricMapper;
+import com.ning.billing.usage.timeline.chunks.TimelineChunk;
+import com.ning.billing.usage.timeline.chunks.TimelineChunkBinder;
+import com.ning.billing.usage.timeline.shutdown.StartTimes;
+import com.ning.billing.usage.timeline.shutdown.StartTimesBinder;
+import com.ning.billing.usage.timeline.shutdown.StartTimesMapper;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricId;
+import com.ning.billing.usage.timeline.sources.SourceIdAndMetricIdMapper;
+
+@ExternalizedSqlViaStringTemplate3()
+@RegisterMapper({CategoryIdAndMetricMapper.class, StartTimesMapper.class, SourceIdAndMetricIdMapper.class})
+public interface TimelineSqlDao extends Transactional<TimelineSqlDao> {
+
+ @SqlQuery
+ Integer getSourceId(@Bind("sourceName") final String source);
+
+ @SqlQuery
+ String getSource(@Bind("sourceId") final Integer sourceId);
+
+ @SqlQuery
+ @Mapper(DefaultMapper.class)
+ List<Map<String, Object>> getSources();
+
+ @SqlUpdate
+ void addSource(@Bind("sourceName") final String source);
+
+ @SqlBatch
+ @BatchChunkSize(1000)
+ void bulkInsertSources(@Bind("sourceName") Iterator<String> sourcesIterator);
+
+ @SqlQuery
+ Integer getEventCategoryId(@Bind("eventCategory") final String eventCategory);
+
+ @SqlQuery
+ String getEventCategory(@Bind("eventCategoryId") final Integer eventCategoryId);
+
+ @SqlUpdate
+ void addEventCategory(@Bind("eventCategory") final String eventCategory);
+
+ @SqlBatch
+ @BatchChunkSize(1000)
+ void bulkInsertEventCategories(@Bind("eventCategory") Iterator<String> cateogoryNames);
+
+ @SqlQuery
+ Iterable<Integer> getMetricIdsBySourceId(@Bind("sourceId") final Integer sourceId);
+
+ @SqlQuery
+ Iterable<SourceIdAndMetricId> getMetricIdsForAllSources();
+
+ @SqlQuery
+ Integer getMetricId(@Bind("eventCategoryId") final int eventCategoryId, @Bind("metric") final String metric);
+
+ @SqlQuery
+ CategoryIdAndMetric getEventCategoryIdAndMetric(@Bind("metricId") final Integer metricId);
+
+ @SqlUpdate
+ void addMetric(@Bind("eventCategoryId") final int eventCategoryId, @Bind("metric") final String metric);
+
+ @SqlBatch
+ @BatchChunkSize(1000)
+ void bulkInsertMetrics(@CategoryIdAndMetricBinder Iterator<CategoryIdAndMetric> categoriesAndMetrics);
+
+ @SqlQuery
+ @Mapper(DefaultMapper.class)
+ List<Map<String, Object>> getEventCategories();
+
+ @SqlQuery
+ @Mapper(DefaultMapper.class)
+ List<Map<String, Object>> getMetrics();
+
+ @SqlQuery
+ int getLastInsertedId();
+
+ @SqlQuery
+ long getHighestTimelineChunkId();
+
+ @SqlUpdate
+ void insertTimelineChunk(@TimelineChunkBinder final TimelineChunk timelineChunk);
+
+ @SqlBatch
+ @BatchChunkSize(1000)
+ void bulkInsertTimelineChunks(@TimelineChunkBinder Iterator<TimelineChunk> chunkIterator);
+
+ @SqlUpdate
+ Integer insertLastStartTimes(@StartTimesBinder final StartTimes startTimes);
+
+ @SqlQuery
+ StartTimes getLastStartTimes();
+
+ @SqlUpdate
+ void deleteLastStartTimes();
+
+ @SqlUpdate
+ void test();
+}