killbill-memoizeit

Details

diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
index dc4b6ae..a67b49b 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/DefaultTimelineDao.java
@@ -18,6 +18,7 @@ package com.ning.billing.meter.timeline.persistent;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import javax.annotation.Nullable;
 
@@ -26,6 +27,8 @@ import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
 import org.skife.jdbi.v2.sqlobject.stringtemplate.StringTemplate3StatementLocator;
@@ -38,6 +41,7 @@ import com.ning.billing.meter.timeline.chunks.TimelineChunk;
 import com.ning.billing.meter.timeline.chunks.TimelineChunkMapper;
 import com.ning.billing.meter.timeline.consumer.TimelineChunkConsumer;
 import com.ning.billing.meter.timeline.shutdown.StartTimes;
+import com.ning.billing.meter.timeline.sources.SourceRecordIdAndMetricRecordId;
 import com.ning.billing.meter.timeline.util.DateTimeUtils;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -83,13 +87,26 @@ public class DefaultTimelineDao implements TimelineDao {
     }
 
     @Override
-    public synchronized int getOrAddSource(final String source, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.begin();
-        delegate.addSource(source, context);
-        final Integer sourceId = delegate.getSourceRecordId(source, context);
-        delegate.commit();
+    public int getOrAddSource(final String source, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
 
-        return sourceId;
+        final Integer result = delegate.inTransaction(new Transaction<Integer, TimelineSqlDao>() {
+
+            @Override
+            public Integer inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
+                return getOrAddWithRetry(new Callable<Integer>() {
+                    @Override
+                    public Integer call() throws Exception {
+                        Integer sourceId = transactional.getSourceRecordId(source, context);
+                        if (sourceId == null) {
+                            transactional.addSource(source, context);
+                            sourceId = transactional.getSourceRecordId(source, context);
+                        }
+                        return sourceId;
+                    }
+                });
+            }
+        });
+        return result;
     }
 
     @Override
@@ -112,15 +129,29 @@ public class DefaultTimelineDao implements TimelineDao {
     }
 
     @Override
-    public synchronized int getOrAddEventCategory(final String eventCategory, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.begin();
-        delegate.addCategory(eventCategory, context);
-        final Integer eventCategoryId = delegate.getCategoryRecordId(eventCategory, context);
-        delegate.commit();
+    public int getOrAddEventCategory(final String eventCategory, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
+
+        final Integer result = delegate.inTransaction(new Transaction<Integer, TimelineSqlDao>() {
 
-        return eventCategoryId;
+            @Override
+            public Integer inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
+                return getOrAddWithRetry(new Callable<Integer>() {
+                    @Override
+                    public Integer call() throws Exception {
+                        Integer eventCategoryId = transactional.getCategoryRecordId(eventCategory, context);
+                        if (eventCategoryId == null) {
+                            transactional.addCategory(eventCategory, context);
+                            eventCategoryId = transactional.getCategoryRecordId(eventCategory, context);
+                        }
+                        return eventCategoryId;
+                    }
+                });
+            }
+        });
+        return result;
     }
 
+
     @Override
     public Integer getMetricId(final int eventCategoryId, final String metric, final InternalTenantContext context) throws UnableToObtainConnectionException, CallbackFailedException {
         return delegate.getMetricRecordId(eventCategoryId, metric, context);
@@ -143,21 +174,39 @@ public class DefaultTimelineDao implements TimelineDao {
 
     @Override
     public synchronized int getOrAddMetric(final Integer eventCategoryId, final String metric, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.begin();
-        delegate.addMetric(eventCategoryId, metric, context);
-        final Integer metricId = delegate.getMetricRecordId(eventCategoryId, metric, context);
-        delegate.commit();
 
-        return metricId;
+        final Integer result = delegate.inTransaction(new Transaction<Integer, TimelineSqlDao>() {
+
+            @Override
+            public Integer inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
+                return getOrAddWithRetry(new Callable<Integer>() {
+                    @Override
+                    public Integer call() throws Exception {
+                        Integer metricId = transactional.getMetricRecordId(eventCategoryId, metric, context);
+                        if (metricId == null) {
+                            transactional.addMetric(eventCategoryId, metric, context);
+                            metricId = transactional.getMetricRecordId(eventCategoryId, metric, context);
+                        }
+                        return metricId;
+                    }
+                });
+            }
+        });
+        return result;
     }
 
     @Override
     public Long insertTimelineChunk(final TimelineChunk timelineChunk, final InternalCallContext context) throws UnableToObtainConnectionException, CallbackFailedException {
-        delegate.begin();
-        delegate.insertTimelineChunk(timelineChunk, context);
-        final long timelineChunkId = delegate.getLastInsertedRecordId(context);
-        delegate.commit();
-        return timelineChunkId;
+
+        final Long result = delegate.inTransaction(new Transaction<Long, TimelineSqlDao>() {
+            @Override
+            public Long inTransaction(final TimelineSqlDao transactional, final TransactionStatus status) throws Exception {
+                transactional.insertTimelineChunk(timelineChunk, context);
+                final long timelineChunkId = transactional.getLastInsertedRecordId(context);
+                return timelineChunkId;
+            }
+        });
+        return result;
     }
 
     @Override
@@ -234,4 +283,23 @@ public class DefaultTimelineDao implements TimelineDao {
     public void bulkInsertTimelineChunks(final List<TimelineChunk> timelineChunkList, final InternalCallContext context) {
         delegate.bulkInsertTimelineChunks(timelineChunkList.iterator(), context);
     }
+
+    private <T> T getOrAddWithRetry(final Callable<T> task) throws Exception {
+        int retry = 1;
+        Exception lastException = null;
+        do {
+            try {
+                return task.call();
+            } catch (Exception e) {
+                //
+                // If we have two transaction that occurs at the time and try to insert
+                // both the same key, one of the transaction will rollbacl and that code will retry
+                // and (should) succeed because this time key exists and caller will first do a get.
+                //
+                lastException = e;
+            }
+        } while (retry-- > 0);
+        throw lastException;
+    }
+
 }
diff --git a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/StreamyBytesPersistentOutputStream.java b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/StreamyBytesPersistentOutputStream.java
index be15537..2983f7f 100644
--- a/meter/src/main/java/com/ning/billing/meter/timeline/persistent/StreamyBytesPersistentOutputStream.java
+++ b/meter/src/main/java/com/ning/billing/meter/timeline/persistent/StreamyBytesPersistentOutputStream.java
@@ -114,7 +114,7 @@ public class StreamyBytesPersistentOutputStream extends OutputStream {
 
     @VisibleForTesting
     String getFileName() {
-        return basePath + "arecibo." + prefix + "." + System.nanoTime() + ".bin";
+        return basePath + "killbill." + prefix + "." + System.nanoTime() + ".bin";
     }
 
     private void flushToFile(final File out) throws IOException {
diff --git a/meter/src/main/resources/com/ning/billing/meter/ddl.sql b/meter/src/main/resources/com/ning/billing/meter/ddl.sql
index f7080e2..e11feeb 100644
--- a/meter/src/main/resources/com/ning/billing/meter/ddl.sql
+++ b/meter/src/main/resources/com/ning/billing/meter/ddl.sql
@@ -1,6 +1,7 @@
 /*! SET storage_engine=INNODB */;
 
-create table sources (
+DROP TABLE IF EXISTS sources;
+CREATE TABLE sources (
   record_id int(11) unsigned not null auto_increment
 , source char(36) not null
 , created_date datetime default null
@@ -10,38 +11,41 @@ create table sources (
 , account_record_id int(11) unsigned default null
 , tenant_record_id int(11) unsigned default null
 , primary key(record_id)
-, index created_date_record_id_dx (created_date, record_id)
 );
-create index sources_tenant_account_record_id on sources(tenant_record_id, account_record_id);
+CREATE INDEX created_date_record_id_dx on sources(created_date, record_id);
+CREATE INDEX sources_tenant_account_record_id on sources(tenant_record_id, account_record_id);
 
-create table categories (
+DROP TABLE IF EXISTS categories;
+CREATE TABLE categories (
   record_id int(11) unsigned not null auto_increment
-, category varchar(256) not null
+, category varchar(255) not null
 , created_date datetime default null
 , created_by varchar(50) default null
 , updated_date datetime default null
 , updated_by varchar(50) default null
 , tenant_record_id int(11) unsigned default null
 , primary key(record_id)
-, unique index event_category_unq (category)
 );
-create index categories_tenant_record_id on categories(tenant_record_id);
+CREATE UNIQUE INDEX event_category_unq on categories(category);
+CREATE INDEX categories_tenant_record_id on categories(tenant_record_id);
 
-create table metrics (
+DROP TABLE IF EXISTS metrics;
+CREATE TABLE metrics (
   record_id int(11) unsigned not null auto_increment
 , category_record_id integer not null
-, metric varchar(256) not null
+, metric varchar(255) not null
 , created_date datetime default null
 , created_by varchar(50) default null
 , updated_date datetime default null
 , updated_by varchar(50) default null
 , tenant_record_id int(11) unsigned default null
 , primary key(record_id)
-, unique index metric_unq (category_record_id, metric)
 );
-create index metrics_tenant_record_id on metrics(tenant_record_id);
+CREATE UNIQUE INDEX metric_unq on metrics(category_record_id, metric);
+CREATE INDEX metrics_tenant_record_id on metrics(tenant_record_id);
 
-create table timeline_chunks (
+DROP TABLE IF EXISTS timeline_chunks;
+CREATE TABLE timeline_chunks (
   record_id bigint not null auto_increment
 , source_record_id integer not null
 , metric_record_id integer not null
@@ -56,14 +60,15 @@ create table timeline_chunks (
 , account_record_id int(11) unsigned default null
 , tenant_record_id int(11) unsigned default null
 , primary key(record_id)
-, unique index source_record_id_timeline_chunk_metric_record_idx (source_record_id, metric_record_id, start_time, aggregation_level)
-, index valid_agg_host_start_time (not_valid, aggregation_level, source_record_id, metric_record_id, start_time)
 );
+CREATE UNIQUE INDEX source_record_id_timeline_chunk_metric_record_idx on timeline_chunks(source_record_id, metric_record_id, start_time, aggregation_level);
+CREATE INDEX valid_agg_host_start_time on timeline_chunks(not_valid, aggregation_level, source_record_id, metric_record_id, start_time);
 
-create table last_start_times (
+DROP TABLE IF EXISTS last_start_times;
+CREATE TABLE last_start_times (
   time_inserted int not null primary key
 , start_times mediumtext not null
 );
 
-insert into timeline_chunks(record_id, source_record_id, metric_record_id, sample_count, start_time, end_time, in_row_samples, blob_samples)
-values (0, 0, 0, 0, 0, 0, null, null);
+INSERT INTO timeline_chunks(record_id, source_record_id, metric_record_id, sample_count, start_time, end_time, in_row_samples, blob_samples)
+VALUES (0, 0, 0, 0, 0, 0, null, null);