diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
index dc4b6ae..a67b49b 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
@@ -18,6 +18,7 @@ package com.ning.billing.meter.timeline.persistent;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import javax.annotation.Nullable;
@@ -26,6 +27,8 @@ import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.sqlobject.stringtemplate.StringTemplate3StatementLocator;
@@ -38,6 +41,7 @@ import com.ning.billing.meter.timeline.chunks.TimelineChunk;
import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
import com.ning.billing.meter.timeline.shutdown.StartTimes;
+import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
import com.ning.billing.meter.timeline.util.DateTimeUtils;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -83,13 +87,26 @@ public class DefaultTimelineDao implements TimelineDao {
}
@Override
- public synchronized int getOrAddSource(final String source, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- delegate.begin();
- delegate.addSource(source, context);
- final Integer sourceId = delegate.getSourceRecordId(source, context);
- delegate.commit();
- return sourceId;
+ public int getOrAddSource(final String source, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ final Integer result = delegate.inTransaction(new Transaction<Integer, TimelineSqlDao>() {
+
+ @Override
+ public Integer inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
+ return getOrAddWithRetry(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
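+ // Read first; insert and re-read the generated id only if no row exists yet.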
+ Integer sourceId = transactional.getSourceRecordId(source, context);
+ if (sourceId == null) {
+ transactional.addSource(source, context);
+ sourceId = transactional.getSourceRecordId(source, context);
+ }
+ return sourceId;
+ }
+ });
+ }
+ });
+ return result;
}
@Override
@@ -112,15 +129,29 @@ public class DefaultTimelineDao implements TimelineDao {
}
@Override
- public synchronized int getOrAddEventCategory(final String eventCategory, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- delegate.begin();
- delegate.addCategory(eventCategory, context);
- final Integer eventCategoryId = delegate.getCategoryRecordId(eventCategory, context);
- delegate.commit();
- return eventCategoryId;
+ public int getOrAddEventCategory(final String eventCategory, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+
+ final Integer result = delegate.inTransaction(new Transaction<Integer, TimelineSqlDao>() {
+ @Override
+ public Integer inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
+ return getOrAddWithRetry(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ Integer eventCategoryId = transactional.getCategoryRecordId(eventCategory, context);
+ if (eventCategoryId == null) {
+ transactional.addCategory(eventCategory, context);
+ eventCategoryId = transactional.getCategoryRecordId(eventCategory, context);
+ }
+ return eventCategoryId;
+ }
+ });
+ }
+ });
+ return result;
}
+
@Override
public Integer getMetricId(final int eventCategoryId, final String metric, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
return delegate.getMetricRecordId(eventCategoryId, metric, context);
@@ -143,21 +174,39 @@ public class DefaultTimelineDao implements TimelineDao {
@Override
- public synchronized int getOrAddMetric(final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+ public int getOrAddMetric(final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- delegate.begin();
- delegate.addMetric(eventCategoryId, metric, context);
- final Integer metricId = delegate.getMetricRecordId(eventCategoryId, metric, context);
- delegate.commit();
- return metricId;
+ final Integer result = delegate.inTransaction(new Transaction<Integer, TimelineSqlDao>() {
+
+ @Override
+ public Integer inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
+ return getOrAddWithRetry(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ Integer metricId = transactional.getMetricRecordId(eventCategoryId, metric, context);
+ if (metricId == null) {
+ transactional.addMetric(eventCategoryId, metric, context);
+ metricId = transactional.getMetricRecordId(eventCategoryId, metric, context);
+ }
+ return metricId;
+ }
+ });
+ }
+ });
+ return result;
}
@Override
public Long insertTimelineChunk(final TimelineChunk timelineChunk, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
- delegate.begin();
- delegate.insertTimelineChunk(timelineChunk, context);
- final long timelineChunkId = delegate.getLastInsertedRecordId(context);
- delegate.commit();
- return timelineChunkId;
+
+ final Long result = delegate.inTransaction(new Transaction<Long, TimelineSqlDao>() {
+ @Override
+ public Long inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
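+ // The insert and the id lookup run in one transaction on the same handle,
+ // so getLastInsertedRecordId returns the id of this chunk.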
+ transactional.insertTimelineChunk(timelineChunk, context);
+ final long timelineChunkId = transactional.getLastInsertedRecordId(context);
+ return timelineChunkId;
+ }
+ });
+ return result;
}
@Override
@@ -234,4 +283,23 @@ public class DefaultTimelineDao implements TimelineDao {
public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList, final InternalCallContext context) {
delegate.bulkInsertTimelineChunks(timelineChunkList.iterator(), context);
}
+
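+ /**
+ * Runs the given task, retrying once on failure. If two concurrent transactions
+ * race to insert the same key, the loser rolls back; on retry the task finds the
+ * row already present (its first step is a get) and succeeds.
+ */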
+ private <T> T getOrAddWithRetry(final Callable<T> task) throws Exception {
+ int retry = 1;
+ Exception lastException = null;
+ do {
+ try {
+ return task.call();
+ } catch (Exception e) {
+ //
+ // If two transactions run at the same time and both try to insert the same
+ // key, one of them will roll back; this code then retries and (should)
+ // succeed, because the key now exists and the task does a get first.
+ //
+ lastException = e;
+ }
+ } while (retry-- > 0);
+ throw lastException;
+ }
+
}
diff --git a/meter/src/main/resources/com/ning/billing/meter/ddl.sql b/meter/src/main/resources/com/ning/billing/meter/ddl.sql
index f7080e2..e11feeb 100644
--- a/meter/src/main/resources/com/ning/billing/meter/ddl.sql
+++ b/meter/src/main/resources/com/ning/billing/meter/ddl.sql
@@ -1,6 +1,7 @@
/*! SET storage_engine=INNODB */;
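+-- Each table is dropped before being recreated, and indexes are created with
+-- standalone CREATE INDEX statements rather than inline index clauses.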
-create table sources (
+DROP TABLE IF EXISTS sources;
+CREATE TABLE sources (
record_id int(11) unsigned not null auto_increment
, source char(36) not null
, created_date datetime default null
@@ -10,38 +11,41 @@ create table sources (
, account_record_id int(11) unsigned default null
, tenant_record_id int(11) unsigned default null
, primary key(record_id)
-, index created_date_record_id_dx (created_date, record_id)
);
-create index sources_tenant_account_record_id on sources(tenant_record_id, account_record_id);
+CREATE INDEX created_date_record_id_dx on sources(created_date, record_id);
+CREATE INDEX sources_tenant_account_record_id on sources(tenant_record_id, account_record_id);
-create table categories (
+DROP TABLE IF EXISTS categories;
+CREATE TABLE categories (
record_id int(11) unsigned not null auto_increment
-, category varchar(256) not null
+, category varchar(255) not null
, created_date datetime default null
, created_by varchar(50) default null
, updated_date datetime default null
, updated_by varchar(50) default null
, tenant_record_id int(11) unsigned default null
, primary key(record_id)
-, unique index event_category_unq (category)
);
-create index categories_tenant_record_id on categories(tenant_record_id);
+CREATE UNIQUE INDEX event_category_unq on categories(category);
+CREATE INDEX categories_tenant_record_id on categories(tenant_record_id);
-create table metrics (
+DROP TABLE IF EXISTS metrics;
+CREATE TABLE metrics (
record_id int(11) unsigned not null auto_increment
, category_record_id integer not null
-, metric varchar(256) not null
+, metric varchar(255) not null
, created_date datetime default null
, created_by varchar(50) default null
, updated_date datetime default null
, updated_by varchar(50) default null
, tenant_record_id int(11) unsigned default null
, primary key(record_id)
-, unique index metric_unq (category_record_id, metric)
);
-create index metrics_tenant_record_id on metrics(tenant_record_id);
+CREATE UNIQUE INDEX metric_unq on metrics(category_record_id, metric);
+CREATE INDEX metrics_tenant_record_id on metrics(tenant_record_id);
-create table timeline_chunks (
+DROP TABLE IF EXISTS timeline_chunks;
+CREATE TABLE timeline_chunks (
record_id bigint not null auto_increment
, source_record_id integer not null
, metric_record_id integer not null
@@ -56,14 +60,15 @@ create table timeline_chunks (
, account_record_id int(11) unsigned default null
, tenant_record_id int(11) unsigned default null
, primary key(record_id)
-, unique index source_record_id_timeline_chunk_metric_record_idx (source_record_id, metric_record_id, start_time, aggregation_level)
-, index valid_agg_host_start_time (not_valid, aggregation_level, source_record_id, metric_record_id, start_time)
);
+CREATE UNIQUE INDEX source_record_id_timeline_chunk_metric_record_idx on timeline_chunks(source_record_id, metric_record_id, start_time, aggregation_level);
+CREATE INDEX valid_agg_host_start_time on timeline_chunks(not_valid, aggregation_level, source_record_id, metric_record_id, start_time);
-create table last_start_times (
+DROP TABLE IF EXISTS last_start_times;
+CREATE TABLE last_start_times (
time_inserted int not null primary key
, start_times mediumtext not null
);
-insert into timeline_chunks(record_id, source_record_id, metric_record_id, sample_count, start_time, end_time, in_row_samples, blob_samples)
-values (0, 0, 0, 0, 0, 0, null, null);
+INSERT INTO timeline_chunks(record_id, source_record_id, metric_record_id, sample_count, start_time, end_time, in_row_samples, blob_samples)
+VALUES (0, 0, 0, 0, 0, 0, null, null);