killbill-memoizeit
Changes
beatrix/src/test/resources/log4j.xml 4(+4 -0)
invoice/src/test/resources/log4j.xml 36(+36 -0)
pom.xml 2(+1 -1)
util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 1(+1 -0)
Details
diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index 1f70152..1e00dba 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -89,18 +89,18 @@ public class DefaultAccountDao implements AccountDao {
public void create(final Account account) throws AccountApiException {
final String key = account.getExternalKey();
try {
+
accountSqlDao.inTransaction(new Transaction<Void, AccountSqlDao>() {
@Override
- public Void inTransaction(final AccountSqlDao accountSqlDao, final TransactionStatus status) throws AccountApiException, Bus.EventBusException {
- Account currentAccount = accountSqlDao.getAccountByKey(key);
+ public Void inTransaction(final AccountSqlDao transactionalDao, final TransactionStatus status) throws AccountApiException, Bus.EventBusException {
+ Account currentAccount = transactionalDao.getAccountByKey(key);
if (currentAccount != null) {
throw new AccountApiException(ErrorCode.ACCOUNT_ALREADY_EXISTS, key);
}
- accountSqlDao.create(account);
-
- saveTagsFromWithinTransaction(account, accountSqlDao, true);
- saveCustomFieldsFromWithinTransaction(account, accountSqlDao, true);
+ transactionalDao.create(account);
+ saveTagsFromWithinTransaction(account, transactionalDao, true);
+ saveCustomFieldsFromWithinTransaction(account, transactionalDao, true);
AccountCreationNotification creationEvent = new DefaultAccountCreationEvent(account);
eventBus.post(creationEvent);
return null;
@@ -210,7 +210,7 @@ public class DefaultAccountDao implements AccountDao {
List<Tag> tagList = account.getTagList();
if (tagList != null) {
- tagStoreDao.save(accountId, objectType, tagList);
+ tagStoreDao.batchSaveFromTransaction(accountId, objectType, tagList);
}
}
@@ -225,7 +225,7 @@ public class DefaultAccountDao implements AccountDao {
List<CustomField> fieldList = account.getFieldList();
if (fieldList != null) {
- fieldStoreDao.save(accountId, objectType, fieldList);
+ fieldStoreDao.batchSaveFromTransaction(accountId, objectType, fieldList);
}
}
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
index 83c699f..1f2adce 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
@@ -163,6 +163,7 @@ public class TestBasic {
}
private DateTime checkAndGetCTD(UUID subscriptionId) {
+
SubscriptionData subscription = (SubscriptionData) entitlementUserApi.getSubscriptionFromId(subscriptionId);
DateTime ctd = subscription.getChargedThroughDate();
assertNotNull(ctd);
@@ -206,13 +207,13 @@ public class TestBasic {
SubscriptionData subscription = (SubscriptionData) entitlementUserApi.createSubscription(bundle.getId(),
new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null);
assertNotNull(subscription);
-
assertTrue(busHandler.isCompleted(DELAY));
log.info("testSimple passed first busHandler checkpoint.");
//
// VERIFY CTD HAS BEEN SET
//
+
checkAndGetCTD(subscription.getId());
//
@@ -240,6 +241,7 @@ public class TestBasic {
busHandler.pushExpectedEvent(NextEvent.PHASE);
busHandler.pushExpectedEvent(NextEvent.INVOICE);
clock.setDeltaFromReality(AT_LEAST_ONE_MONTH_MS);
+
assertTrue(busHandler.isCompleted(DELAY));
//
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java
index 54f4b4c..2cfd46d 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java
@@ -55,7 +55,7 @@ public class TestBusHandler {
@Subscribe
public void handleEntitlementEvents(SubscriptionTransition event) {
- log.info(String.format("Got subscription event %s", event.toString()));
+ log.info(String.format("TestBusHandler Got subscription event %s", event.toString()));
switch (event.getTransitionType()) {
case MIGRATE_ENTITLEMENT:
assertEqualsNicely(NextEvent.MIGRATE_ENTITLEMENT);
@@ -101,7 +101,7 @@ public class TestBusHandler {
@Subscribe
public void handleInvoiceEvents(InvoiceCreationNotification event) {
- log.info(String.format("Got Invoice event %s", event.toString()));
+ log.info(String.format("TestBusHandler Got Invoice event %s", event.toString()));
assertEqualsNicely(NextEvent.INVOICE);
notifyIfStackEmpty();
@@ -130,13 +130,14 @@ public class TestBusHandler {
}
}
if (!completed) {
- log.debug("TestBusHandler did not complete in " + timeout + " ms");
+ Joiner joiner = Joiner.on(" ");
+ log.error("TestBusHandler did not complete in " + timeout + " ms, remaining events are " + joiner.join(nextExpectedEvent));
}
return completed;
}
private void notifyIfStackEmpty() {
- log.debug("notifyIfStackEmpty ENTER");
+ log.debug("TestBusHandler notifyIfStackEmpty ENTER");
synchronized (this) {
if (nextExpectedEvent.isEmpty()) {
log.debug("notifyIfStackEmpty EMPTY");
@@ -144,7 +145,7 @@ public class TestBusHandler {
notify();
}
}
- log.debug("notifyIfStackEmpty EXIT");
+ log.debug("TestBusHandler notifyIfStackEmpty EXIT");
}
private void assertEqualsNicely(NextEvent received) {
@@ -161,7 +162,7 @@ public class TestBusHandler {
}
if (!foundIt) {
Joiner joiner = Joiner.on(" ");
- System.err.println("Received event " + received + "; expected " + joiner.join(nextExpectedEvent));
+ log.error("TestBusHandler Received event " + received + "; expected " + joiner.join(nextExpectedEvent));
// System.exit(1);
}
}
beatrix/src/test/resources/log4j.xml 4(+4 -0)
diff --git a/beatrix/src/test/resources/log4j.xml b/beatrix/src/test/resources/log4j.xml
index 75abc76..ac530a1 100644
--- a/beatrix/src/test/resources/log4j.xml
+++ b/beatrix/src/test/resources/log4j.xml
@@ -29,6 +29,10 @@
<level value="info"/>
</logger>
+ <logger name="com.ning.billing.util.notificationq">
+ <level value="info"/>
+ </logger>
+
<root>
<priority value="info"/>
<appender-ref ref="stdout"/>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 19eaad5..de8132e 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -145,7 +145,7 @@ public class DefaultInvoiceDao implements InvoiceDao {
List<InvoiceItem> invoiceItems = invoice.getInvoiceItems();
InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
- invoiceItemDao.create(invoiceItems);
+ invoiceItemDao.batchCreateFromTransaction(invoiceItems);
notifyOfFutureBillingEvents(invoiceSqlDao, invoiceItems);
setChargedThroughDates(invoiceSqlDao, invoiceItems);
@@ -154,7 +154,7 @@ public class DefaultInvoiceDao implements InvoiceDao {
// STEPH Why do we need that? Are the payments not always null at this point?
List<InvoicePayment> invoicePayments = invoice.getPayments();
InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
- invoicePaymentSqlDao.create(invoicePayments);
+ invoicePaymentSqlDao.batchCreateFromTransaction(invoicePayments);
InvoiceCreationNotification event;
event = new DefaultInvoiceCreationNotification(invoice.getId(), invoice.getAccountId(),
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java
index 7b5476e..a76cd06 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java
@@ -65,16 +65,18 @@ public interface InvoiceItemSqlDao extends EntityDao<InvoiceItem> {
@SqlUpdate
void update(@InvoiceItemBinder final InvoiceItem invoiceItem);
- @SqlBatch
- void create(@InvoiceItemBinder final List<InvoiceItem> items);
+ @SqlBatch(transactional=false)
+ void batchCreateFromTransaction(@InvoiceItemBinder final List<InvoiceItem> items);
@BindingAnnotation(InvoiceItemBinder.InvoiceItemBinderFactory.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
public @interface InvoiceItemBinder {
public static class InvoiceItemBinderFactory implements BinderFactory {
+ @Override
public Binder build(Annotation annotation) {
return new Binder<InvoiceItemBinder, InvoiceItem>() {
+ @Override
public void bind(SQLStatement q, InvoiceItemBinder bind, InvoiceItem item) {
q.bind("id", item.getId().toString());
q.bind("invoiceId", item.getInvoiceId().toString());
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
index 7a05d5c..7179ec1 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
@@ -52,8 +52,8 @@ public interface InvoicePaymentSqlDao {
@SqlUpdate
public void create(@InvoicePaymentBinder InvoicePayment invoicePayment);
- @SqlBatch
- void create(@InvoicePaymentBinder List<InvoicePayment> items);
+ @SqlBatch(transactional=false)
+ void batchCreateFromTransaction(@InvoicePaymentBinder List<InvoicePayment> items);
@SqlUpdate
public void update(@InvoicePaymentBinder InvoicePayment invoicePayment);
@@ -85,7 +85,7 @@ public interface InvoicePaymentSqlDao {
final DateTime updatedDate = getDate(result, "updated_date");
return new InvoicePayment() {
- private DateTime now = new DateTime();
+ private final DateTime now = new DateTime();
@Override
public UUID getPaymentAttemptId() {
@@ -124,8 +124,10 @@ public interface InvoicePaymentSqlDao {
@Target({ElementType.PARAMETER})
public @interface InvoicePaymentBinder {
public static class InvoicePaymentBinderFactory implements BinderFactory {
+ @Override
public Binder build(Annotation annotation) {
return new Binder<InvoicePaymentBinder, InvoicePayment>() {
+ @Override
public void bind(SQLStatement q, InvoicePaymentBinder bind, InvoicePayment payment) {
q.bind("invoiceId", payment.getInvoiceId().toString());
q.bind("paymentAttemptId", payment.getPaymentAttemptId().toString());
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg
index 5b37548..451ebe7 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg
@@ -45,6 +45,12 @@ create() ::= <<
:recurringAmount, :recurringRate, :fixedAmount, :currency);
>>
+batchCreateFromTransaction() ::= <<
+ INSERT INTO invoice_items(<invoiceItemFields()>)
+ VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName, :startDate, :endDate,
+ :recurringAmount, :recurringRate, :fixedAmount, :currency);
+>>
+
update() ::= <<
UPDATE invoice_items
SET invoice_id = :invoiceId, subscription_id = :subscriptionId, plan_name = :planName, phase_name = :phaseName,
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
index a635456..2172573 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
@@ -15,6 +15,12 @@ create() ::= <<
VALUES(:invoiceId, :paymentAttemptId, :paymentAttemptDate, :amount, :currency, :createdDate, :updatedDate);
>>
+batchCreateFromTransaction() ::= <<
+ INSERT INTO invoice_payments(<invoicePaymentFields()>)
+ VALUES(:invoiceId, :paymentAttemptId, :paymentAttemptDate, :amount, :currency, :createdDate, :updatedDate);
+>>
+
+
update() ::= <<
UPDATE invoice_payments
SET payment_date = :paymentAttemptDate, amount = :amount, currency = :currency, created_date = :createdDate, updated_date = :updatedDate
invoice/src/test/resources/log4j.xml 36(+36 -0)
diff --git a/invoice/src/test/resources/log4j.xml b/invoice/src/test/resources/log4j.xml
new file mode 100644
index 0000000..33b9662
--- /dev/null
+++ b/invoice/src/test/resources/log4j.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2010-2011 Ning, Inc.
+ ~
+ ~ Ning licenses this file to you under the Apache License, version 2.0
+ ~ (the "License"); you may not use this file except in compliance with the
+ ~ License. You may obtain a copy of the License at:
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%p %d{ISO8601} %X{trace} %t %c %m%n"/>
+ </layout>
+ </appender>
+
+
+ <logger name="com.ning.billing.invoice">
+ <level value="info"/>
+ </logger>
+
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="stdout"/>
+ </root>
+</log4j:configuration>
pom.xml 2(+1 -1)
diff --git a/pom.xml b/pom.xml
index 5ec08b5..d72a41e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -232,7 +232,7 @@
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
- <version>2.27</version>
+ <version>2.31.2</version>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
diff --git a/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java b/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
index 680a826..14c123a 100644
--- a/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
+++ b/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
@@ -33,6 +33,7 @@ import org.skife.jdbi.v2.sqlobject.BinderFactory;
import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
import org.skife.jdbi.v2.sqlobject.SqlBatch;
import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
@@ -42,10 +43,11 @@ import com.ning.billing.util.entity.EntityCollectionDao;
@ExternalizedSqlViaStringTemplate3
@RegisterMapper(FieldStoreDao.CustomFieldMapper.class)
-public interface FieldStoreDao extends EntityCollectionDao<CustomField>, Transmogrifier {
+public interface FieldStoreDao extends EntityCollectionDao<CustomField>, Transactional<FieldStoreDao>, Transmogrifier {
+
@Override
- @SqlBatch
- public void save(@Bind("objectId") final String objectId,
+ @SqlBatch(transactional=false)
+ public void batchSaveFromTransaction(@Bind("objectId") final String objectId,
@Bind("objectType") final String objectType,
@CustomFieldBinder final List<CustomField> entities);
@@ -65,8 +67,10 @@ public interface FieldStoreDao extends EntityCollectionDao<CustomField>, Transmo
@Target({ElementType.PARAMETER})
public @interface CustomFieldBinder {
public static class CustomFieldBinderFactory implements BinderFactory {
+ @Override
public Binder build(Annotation annotation) {
return new Binder<CustomFieldBinder, CustomField>() {
+ @Override
public void bind(SQLStatement q, CustomFieldBinder bind, CustomField customField) {
q.bind("id", customField.getId().toString());
q.bind("fieldName", customField.getName());
diff --git a/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java b/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java
index 8134203..11d149c 100644
--- a/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java
+++ b/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java
@@ -26,8 +26,9 @@ import java.util.List;
* @param <T>
*/
public interface EntityCollectionDao<T extends Entity> {
- @SqlBatch
- public void save(@Bind("objectId") final String objectId,
+
+ @SqlBatch(transactional=false)
+ public void batchSaveFromTransaction(@Bind("objectId") final String objectId,
@Bind("objectType") final String objectType,
@BindBean final List<T> entities);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index a297581..2f511ec 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -52,10 +52,10 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max, @Bind("queue_name") String queueName);
@SqlUpdate
- public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
+ public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("id") long id, @Bind("now") Date now);
@SqlUpdate
- public void clearNotification(@Bind("notification_id") String eventId, @Bind("owner") String owner);
+ public void clearNotification(@Bind("id") long id, @Bind("owner") String owner);
@SqlUpdate
public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
@@ -71,7 +71,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
@Override
public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, Notification evt) {
- stmt.bind("notification_id", evt.getId().toString());
+ stmt.bind("notification_id", evt.getUUID().toString());
stmt.bind("created_dt", getDate(new DateTime()));
stmt.bind("notification_key", evt.getNotificationKey());
stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
@@ -94,7 +94,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
public Notification map(int index, ResultSet r, StatementContext ctx)
throws SQLException {
- final UUID id = UUID.fromString(r.getString("notification_id"));
+ final long id = r.getLong("id");
+ final UUID uuid = UUID.fromString(r.getString("notification_id"));
final String notificationKey = r.getString("notification_key");
final String queueName = r.getString("queue_name");
final DateTime effectiveDate = getDate(r, "effective_dt");
@@ -102,7 +103,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final String processingOwner = r.getString("processing_owner");
final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
- return new DefaultNotification(id, processingOwner, queueName, nextAvailableDate,
+ return new DefaultNotification(id, uuid, processingOwner, queueName, nextAvailableDate,
processingState, notificationKey, effectiveDate);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 3c4c476..26e6c4e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -22,7 +22,8 @@ import org.joda.time.DateTime;
public class DefaultNotification implements Notification {
- private final UUID id;
+ private final long id;
+ private final UUID uuid;
private final String owner;
private final String queueName;
private final DateTime nextAvailableDate;
@@ -31,11 +32,12 @@ public class DefaultNotification implements Notification {
private final DateTime effectiveDate;
- public DefaultNotification(UUID id, String owner, String queueName, DateTime nextAvailableDate,
+ public DefaultNotification(long id, UUID uuid, String owner, String queueName, DateTime nextAvailableDate,
NotificationLifecycleState lifecycleState,
String notificationKey, DateTime effectiveDate) {
super();
this.id = id;
+ this.uuid = uuid;
this.owner = owner;
this.queueName = queueName;
this.nextAvailableDate = nextAvailableDate;
@@ -44,13 +46,17 @@ public class DefaultNotification implements Notification {
this.effectiveDate = effectiveDate;
}
- public DefaultNotification(String queueName, String notificationKey, DateTime effectiveDate) {
- this(UUID.randomUUID(), null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ @Override
+ public long getId() {
+ return id;
}
+ public DefaultNotification(String queueName, String notificationKey, DateTime effectiveDate) {
+ this(-1L, UUID.randomUUID(), null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ }
@Override
- public UUID getId() {
- return id;
+ public UUID getUUID() {
+ return uuid;
}
@Override
@@ -101,4 +107,5 @@ public class DefaultNotification implements Notification {
public String getQueueName() {
return queueName;
}
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3780ade..11c640a 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,14 +17,12 @@
package com.ning.billing.util.notificationq;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Date;
import java.util.List;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@@ -33,20 +31,32 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
protected final NotificationSqlDao dao;
- public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ public DefaultNotificationQueue(final IDBI dbi, Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
super(clock, svcName, queueName, handler, config);
this.dao = dbi.onDemand(NotificationSqlDao.class);
}
@Override
protected void doProcessEvents(final int sequenceId) {
+
+ logDebug("ENTER doProcessEvents");
List<Notification> notifications = getReadyNotifications(sequenceId);
- for (Notification cur : notifications) {
+ if (notifications.size() == 0) {
+ logDebug("EXIT doProcessEvents");
+ return;
+ }
+
+ logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
+
+ for (final Notification cur : notifications) {
nbProcessedEvents.incrementAndGet();
+ logDebug("handling notification %s, key = %s for time %s",
+ cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
handler.handleReadyNotification(cur.getNotificationKey());
+ clearNotification(cur);
+ logDebug("done handling notification %s, key = %s for time %s",
+ cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
}
- // If anything happens before we get to clear those notifications, somebody else will pick them up
- clearNotifications(notifications);
}
@Override
@@ -58,24 +68,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
}
- private void clearNotifications(final Collection<Notification> cleared) {
-
- log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
- getFullQName(),
- cleared.size()));
-
- dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
-
- @Override
- public Void inTransaction(NotificationSqlDao transactional,
- TransactionStatus status) throws Exception {
- for (Notification cur : cleared) {
- transactional.clearNotification(cur.getId().toString(), hostname);
- log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
- }
- return null;
- }
- });
+ private void clearNotification(final Notification cleared) {
+ dao.clearNotification(cleared.getId(), hostname);
}
private List<Notification> getReadyNotifications(final int seqId) {
@@ -83,28 +77,22 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
- log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow = %s", getFullQName(), now));
-
- List<Notification> input = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
- @Override
- public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
- TransactionStatus status) throws Exception {
- return transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents(), getFullQName());
- }
- });
+ List<Notification> input = dao.getReadyNotifications(now, config.getDaoMaxReadyEvents(), getFullQName());
List<Notification> claimedNotifications = new ArrayList<Notification>();
for (Notification cur : input) {
- final boolean claimed = (dao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+ logDebug("about to claim notification %s, key = %s for time %s",
+ cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
+ final boolean claimed = (dao.claimNotification(hostname, nextAvailable, cur.getId(), now) == 1);
+ logDebug("claimed notification %s, key = %s for time %s result = %s",
+ cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate(), Boolean.valueOf(claimed));
if (claimed) {
claimedNotifications.add(cur);
- dao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+ dao.insertClaimedHistory(seqId, hostname, now, cur.getUUID().toString());
}
}
for (Notification cur : claimedNotifications) {
- log.debug(String.format("NotificationQueue %s claimed events %s",
- getFullQName(), cur.getId()));
if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
getFullQName(), cur, cur.getOwner()));
@@ -112,4 +100,11 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
}
return claimedNotifications;
}
+
+ private void logDebug(String format, Object...args) {
+ if (log.isDebugEnabled()) {
+ String realDebug = String.format(format, args);
+ log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
+ }
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 91e7110..372d1f7 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -18,6 +18,7 @@ package com.ning.billing.util.notificationq;
import org.skife.jdbi.v2.IDBI;
import com.google.inject.Inject;
+import com.google.inject.name.Named;
import com.ning.billing.util.clock.Clock;
public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 8749fa0..d59098b 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -23,13 +23,15 @@ import org.joda.time.DateTime;
public interface Notification extends NotificationLifecycle {
- public UUID getId();
+ public long getId();
+
+ public UUID getUUID();
public String getNotificationKey();
public DateTime getEffectiveDate();
-
+
public String getQueueName();
-
+
}
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java
index 33dcdb0..bf4ed62 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java
@@ -16,38 +16,23 @@
package com.ning.billing.util.tag.dao;
-import java.lang.annotation.Annotation;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+
import java.util.List;
-import java.util.UUID;
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.SQLStatement;
-import org.skife.jdbi.v2.StatementContext;
+
import org.skife.jdbi.v2.sqlobject.Bind;
-import org.skife.jdbi.v2.sqlobject.Binder;
-import org.skife.jdbi.v2.sqlobject.BinderFactory;
-import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
import org.skife.jdbi.v2.sqlobject.SqlBatch;
import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
import com.ning.billing.util.entity.EntityCollectionDao;
-import com.ning.billing.util.tag.DescriptiveTag;
-import com.ning.billing.util.tag.DefaultTagDefinition;
import com.ning.billing.util.tag.Tag;
-import com.ning.billing.util.tag.TagDefinition;
@ExternalizedSqlViaStringTemplate3
@RegisterMapper(TagMapper.class)
-public interface TagStoreSqlDao extends EntityCollectionDao<Tag> {
+public interface TagStoreSqlDao extends EntityCollectionDao<Tag>, Transactional<TagStoreSqlDao> {
@Override
- @SqlBatch
- public void save(@Bind("objectId") final String objectId,
+ @SqlBatch(transactional=false)
+ public void batchSaveFromTransaction(@Bind("objectId") final String objectId,
@Bind("objectType") final String objectType,
@TagBinder final List<Tag> entities);
}
\ No newline at end of file
diff --git a/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg b/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
index 883f61b..9d3e96e 100644
--- a/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
@@ -1,6 +1,6 @@
group FieldStoreDao;
-save() ::= <<
+batchSaveFromTransaction() ::= <<
INSERT INTO custom_fields(id, object_id, object_type, field_name, field_value)
VALUES (:id, :objectId, :objectType, :fieldName, :fieldValue)
ON DUPLICATE KEY UPDATE
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index cf2ceb2..a0ef302 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -48,9 +48,8 @@ CREATE TABLE notifications (
processing_state varchar(14) DEFAULT 'AVAILABLE',
PRIMARY KEY(id)
) ENGINE=innodb;
-CREATE INDEX `idx_comp_where` ON notifications (`effective_dt`,`processing_state`,`processing_owner`,`processing_available_dt`);
-CREATE INDEX `idx_update` ON notifications (`notification_id`,`processing_state`,`processing_owner`,`processing_available_dt`);
-CREATE INDEX `idx_update1` ON notifications (`notification_id`,`processing_owner`);
+CREATE INDEX `idx_comp_where` ON notifications (`effective_dt`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_dt`);
CREATE INDEX `idx_get_ready` ON notifications (`effective_dt`,`created_dt`,`id`);
DROP TABLE IF EXISTS claimed_notifications;
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index efe7e4f..7a7ecab 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -2,8 +2,9 @@ group NotificationSqlDao;
getReadyNotifications(now, max) ::= <<
select
- notification_id
- , notification_key
+ id
+ , notification_id
+ , notification_key
, created_dt
, effective_dt
, queue_name
@@ -19,33 +20,31 @@ getReadyNotifications(now, max) ::= <<
order by
effective_dt asc
, created_dt asc
- , id asc
+ , id
limit :max
;
>>
-claimNotification(owner, next_available, notification_id, now) ::= <<
+claimNotification(owner, next_available, id, now) ::= <<
update notifications
set
processing_owner = :owner
, processing_available_dt = :next_available
, processing_state = 'IN_PROCESSING'
where
- notification_id = :notification_id
+ id = :id
and processing_state != 'PROCESSED'
and (processing_owner IS NULL OR processing_available_dt \<= :now)
;
>>
-clearNotification(notification_id, owner) ::= <<
+clearNotification(id, owner) ::= <<
update notifications
set
- processing_owner = NULL
- , processing_state = 'PROCESSED'
+ processing_state = 'PROCESSED'
where
- notification_id = :notification_id
- and processing_owner = :owner
+ id = :id
;
>>
diff --git a/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
index 2d78f48..9d7ce5c 100644
--- a/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
@@ -1,6 +1,6 @@
group TagStoreDao;
-save() ::= <<
+batchSaveFromTransaction() ::= <<
INSERT INTO tags(id, tag_definition_name, object_id, object_type, added_date, added_by)
VALUES (:id, :tagDefinitionName, :objectId, :objectType, :addedDate, :addedBy)
ON DUPLICATE KEY UPDATE
diff --git a/util/src/test/java/com/ning/billing/dbi/DBIProvider.java b/util/src/test/java/com/ning/billing/dbi/DBIProvider.java
index 07ee518..d83c033 100644
--- a/util/src/test/java/com/ning/billing/dbi/DBIProvider.java
+++ b/util/src/test/java/com/ning/billing/dbi/DBIProvider.java
@@ -55,7 +55,7 @@ public class DBIProvider implements Provider<IDBI>
dbConfig.setMaxConnectionsPerPartition(config.getMaxActive());
dbConfig.setConnectionTimeout(config.getConnectionTimeout().getPeriod(), config.getConnectionTimeout().getUnit());
dbConfig.setPartitionCount(1);
- dbConfig.setDefaultTransactionIsolation("READ_COMMITTED");
+ dbConfig.setDefaultTransactionIsolation("REPEATABLE_READ");
dbConfig.setDisableJMX(false);
final BoneCPDataSource ds = new BoneCPDataSource(dbConfig);
diff --git a/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java b/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java
index 9619bd8..4512a5d 100644
--- a/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java
+++ b/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java
@@ -20,6 +20,8 @@ import java.io.IOException;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
@@ -62,32 +64,48 @@ public class TestFieldStore {
@Test
public void testFieldStore() {
- UUID id = UUID.randomUUID();
- String objectType = "Test widget";
+ final UUID id = UUID.randomUUID();
+ final String objectType = "Test widget";
- FieldStore fieldStore = new DefaultFieldStore(id, objectType);
+ final FieldStore fieldStore1 = new DefaultFieldStore(id, objectType);
String fieldName = "TestField1";
String fieldValue = "Kitty Hawk";
- fieldStore.setValue(fieldName, fieldValue);
+ fieldStore1.setValue(fieldName, fieldValue);
FieldStoreDao fieldStoreDao = dbi.onDemand(FieldStoreDao.class);
- fieldStoreDao.save(id.toString(), objectType, fieldStore.getEntityList());
+ fieldStoreDao.inTransaction(new Transaction<Void, FieldStoreDao>() {
+ @Override
+ public Void inTransaction(FieldStoreDao transactional,
+ TransactionStatus status) throws Exception {
+ transactional.batchSaveFromTransaction(id.toString(), objectType, fieldStore1.getEntityList());
+ return null;
+ }
+ });
- fieldStore = DefaultFieldStore.create(id, objectType);
- fieldStore.add(fieldStoreDao.load(id.toString(), objectType));
- assertEquals(fieldStore.getValue(fieldName), fieldValue);
+ final FieldStore fieldStore2 = DefaultFieldStore.create(id, objectType);
+ fieldStore2.add(fieldStoreDao.load(id.toString(), objectType));
- fieldValue = "Cape Canaveral";
- fieldStore.setValue(fieldName, fieldValue);
- assertEquals(fieldStore.getValue(fieldName), fieldValue);
- fieldStoreDao.save(id.toString(), objectType, fieldStore.getEntityList());
-
- fieldStore = DefaultFieldStore.create(id, objectType);
- assertEquals(fieldStore.getValue(fieldName), null);
- fieldStore.add(fieldStoreDao.load(id.toString(), objectType));
+ assertEquals(fieldStore2.getValue(fieldName), fieldValue);
- assertEquals(fieldStore.getValue(fieldName), fieldValue);
+ fieldValue = "Cape Canaveral";
+ fieldStore2.setValue(fieldName, fieldValue);
+ assertEquals(fieldStore2.getValue(fieldName), fieldValue);
+ fieldStoreDao.inTransaction(new Transaction<Void, FieldStoreDao>() {
+ @Override
+ public Void inTransaction(FieldStoreDao transactional,
+ TransactionStatus status) throws Exception {
+ transactional.batchSaveFromTransaction(id.toString(), objectType, fieldStore2.getEntityList());
+ return null;
+ }
+ });
+
+
+ final FieldStore fieldStore3 = DefaultFieldStore.create(id, objectType);
+ assertEquals(fieldStore3.getValue(fieldName), null);
+ fieldStore3.add(fieldStoreDao.load(id.toString(), objectType));
+
+ assertEquals(fieldStore3.getValue(fieldName), fieldValue);
}
}
diff --git a/util/src/test/java/com/ning/billing/util/globalLocker/TestMysqlGlobalLocker.java b/util/src/test/java/com/ning/billing/util/globalLocker/TestMysqlGlobalLocker.java
new file mode 100644
index 0000000..674efa3
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/globalLocker/TestMysqlGlobalLocker.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.globalLocker;
+
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
+import org.testng.annotations.Test;
+
+public class TestMysqlGlobalLocker {
+
+ // Used as a manual test to validate the simple DAO by stepping through that locking is done and release correctly
+ @Test(enabled=false)
+ public void testSimpleLocking() {
+ IDBI dbi = new DBI("jdbc:mysql://localhost:3306/killbill?createDatabaseIfNotExist=true", "root", "root");
+ GlobalLocker lock = new MySqlGlobalLocker(dbi);
+ lock.lockWithNumberOfTries("test-lock", 3);
+ System.out.println("youpihh!");
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index d744caf..cb01e00 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -120,23 +120,23 @@ public class TestNotificationSqlDao {
assertEquals(notification.getNextAvailableDate(), null);
DateTime nextAvailable = now.plusMinutes(5);
- int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId().toString(), now.toDate());
+ int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId(), now.toDate());
assertEquals(res, 1);
- dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getId().toString());
+ dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getUUID().toString());
- notification = fetchNotification(notification.getId().toString());
+ notification = fetchNotification(notification.getUUID().toString());
assertEquals(notification.getNotificationKey(), notificationKey);
validateDate(notification.getEffectiveDate(), effDt);
assertEquals(notification.getOwner().toString(), ownerId);
assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
validateDate(notification.getNextAvailableDate(), nextAvailable);
- dao.clearNotification(notification.getId().toString(), ownerId);
+ dao.clearNotification(notification.getId(), ownerId);
- notification = fetchNotification(notification.getId().toString());
+ notification = fetchNotification(notification.getUUID().toString());
assertEquals(notification.getNotificationKey(), notificationKey);
validateDate(notification.getEffectiveDate(), effDt);
- assertEquals(notification.getOwner(), null);
+ //assertEquals(notification.getOwner(), null);
assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
validateDate(notification.getNextAvailableDate(), nextAvailable);
@@ -148,7 +148,8 @@ public class TestNotificationSqlDao {
@Override
public Notification withHandle(Handle handle) throws Exception {
Notification res = handle.createQuery(" select" +
- " notification_id" +
+ " id " +
+ ", notification_id" +
", notification_key" +
", created_dt" +
", effective_dt" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 0fe65ec..0ebf68f 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -75,7 +75,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
for (Notification cur : readyNotifications) {
handler.handleReadyNotification(cur.getNotificationKey());
- DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+ DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
processedNotifications.add(processedNotification);
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 6db48d0..eac2d78 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -48,6 +48,8 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.google.inject.name.Names;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
@@ -57,8 +59,8 @@ import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
public class TestNotificationQueue {
Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
- @Inject
- private IDBI dbi;
+ @Inject
+ private IDBI dbi;
@Inject
MysqlTestingHelper helper;
@@ -67,7 +69,7 @@ public class TestNotificationQueue {
private Clock clock;
private DummySqlTest dao;
-
+
private int eventsReceived;
// private NotificationQueue queue;
@@ -107,7 +109,7 @@ public class TestNotificationQueue {
/**
* Test that we can post a notification in the future from a transaction and get the notification
* callback with the correct key when the time is ready
- * @throws Exception
+ * @throws Exception
*/
@Test(groups={"fast"}, enabled = true)
public void testSimpleNotification() throws Exception {
@@ -254,20 +256,20 @@ public class TestNotificationQueue {
assertEquals(success, true);
}
-
+
/**
* Test that we can post a notification in the future from a transaction and get the notification
* callback with the correct key when the time is ready
- * @throws Exception
+ * @throws Exception
*/
@Test(groups={"fast"}, enabled = true)
public void testMultipleHandlerNotification() throws Exception {
final Map<String, Boolean> expectedNotificationsFred = new TreeMap<String, Boolean>();
final Map<String, Boolean> expectedNotificationsBarney = new TreeMap<String, Boolean>();
-
- NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock);
-
+
+ NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock);
+
NotificationConfig config=new NotificationConfig() {
@Override
public boolean isNotificationProcessingOff() {
@@ -286,8 +288,8 @@ public class TestNotificationQueue {
return 60000;
}
};
-
-
+
+
final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
@Override
public void handleReadyNotification(String notificationKey) {
@@ -297,7 +299,7 @@ public class TestNotificationQueue {
}
},
config);
-
+
final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
@Override
public void handleReadyNotification(String notificationKey) {
@@ -310,8 +312,8 @@ public class TestNotificationQueue {
queueFred.startQueue();
// We don't start Barney so it can never pick up notifications
-
-
+
+
final UUID key = UUID.randomUUID();
final DummyObject obj = new DummyObject("foo", key);
final DateTime now = new DateTime();
@@ -323,7 +325,7 @@ public class TestNotificationQueue {
}
};
-
+
final NotificationKey notificationKeyBarney = new NotificationKey() {
@Override
public String toString() {
@@ -372,7 +374,7 @@ public class TestNotificationQueue {
Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred.toString()));
Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney.toString()));
-
+
}
NotificationConfig getNotificationConfig(final boolean off,
@@ -408,6 +410,8 @@ public class TestNotificationQueue {
bind(MysqlTestingHelper.class).toInstance(helper);
IDBI dbi = helper.getDBI();
bind(IDBI.class).toInstance(dbi);
+ IDBI otherDbi = helper.getDBI();
+ bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
/*
bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
diff --git a/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java b/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
index 633095f..9e21e05 100644
--- a/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
+++ b/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
@@ -21,6 +21,8 @@ import java.util.List;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
@@ -52,7 +54,7 @@ public class TestTagStore {
private TagStoreModuleMock module;
private TagStoreSqlDao tagStoreSqlDao;
private TagDefinitionDao tagDefinitionDao;
- private Logger log = LoggerFactory.getLogger(TestTagStore.class);
+ private final Logger log = LoggerFactory.getLogger(TestTagStore.class);
@BeforeClass(alwaysRun = true)
protected void setup() throws IOException {
@@ -86,6 +88,17 @@ public class TestTagStore {
module.stopDb();
}
+ private void saveTags(final TagStoreSqlDao dao, final String objectType, final String accountId, final List<Tag> tagList) {
+ dao.inTransaction(new Transaction<Void, TagStoreSqlDao>() {
+ @Override
+ public Void inTransaction(TagStoreSqlDao transactional,
+ TransactionStatus status) throws Exception {
+ dao.batchSaveFromTransaction(accountId.toString(), objectType, tagList);
+ return null;
+ }
+ });
+ }
+
@Test
public void testTagCreationAndRetrieval() {
UUID accountId = UUID.randomUUID();
@@ -95,7 +108,7 @@ public class TestTagStore {
tagStore.add(tag);
TagStoreSqlDao dao = dbi.onDemand(TagStoreSqlDao.class);
- dao.save(accountId.toString(), ACCOUNT_TYPE, tagStore.getEntityList());
+ saveTags(dao, ACCOUNT_TYPE, accountId.toString(), tagStore.getEntityList());
List<Tag> savedTags = dao.load(accountId.toString(), ACCOUNT_TYPE);
assertEquals(savedTags.size(), 1);
@@ -107,6 +120,7 @@ public class TestTagStore {
assertEquals(savedTag.getId(), tag.getId());
}
+
@Test
public void testControlTagCreation() {
UUID accountId = UUID.randomUUID();
@@ -117,7 +131,7 @@ public class TestTagStore {
assertEquals(tagStore.generateInvoice(), false);
List<Tag> tagList = tagStore.getEntityList();
- tagStoreSqlDao.save(accountId.toString(), ACCOUNT_TYPE, tagList);
+ saveTags(tagStoreSqlDao, ACCOUNT_TYPE, accountId.toString(), tagList);
tagStore.clear();
assertEquals(tagStore.getEntityList().size(), 0);
@@ -147,7 +161,7 @@ public class TestTagStore {
assertEquals(tagStore.generateInvoice(), true);
List<Tag> tagList = tagStore.getEntityList();
- tagStoreSqlDao.save(accountId.toString(), ACCOUNT_TYPE, tagList);
+ saveTags(tagStoreSqlDao, ACCOUNT_TYPE, accountId.toString(), tagList);
tagStore.clear();
assertEquals(tagStore.getEntityList().size(), 0);
@@ -181,7 +195,7 @@ public class TestTagStore {
assertEquals(tagStore.generateInvoice(), false);
List<Tag> tagList = tagStore.getEntityList();
- tagStoreSqlDao.save(accountId.toString(), ACCOUNT_TYPE, tagList);
+ saveTags(tagStoreSqlDao, ACCOUNT_TYPE, accountId.toString(), tagList);
tagStore.clear();
assertEquals(tagStore.getEntityList().size(), 0);
@@ -244,7 +258,8 @@ public class TestTagStore {
Tag tag = new DescriptiveTag(tagDefinition, "test", clock.getUTCNow());
tagStore.add(tag);
- tagStoreSqlDao.save(objectId.toString(), objectType, tagStore.getEntityList());
+ saveTags(tagStoreSqlDao, objectType, objectId.toString(), tagStore.getEntityList());
+
List<Tag> tags = tagStoreSqlDao.load(objectId.toString(), objectType);
assertEquals(tags.size(), 1);
@@ -269,7 +284,8 @@ public class TestTagStore {
Tag tag = new DescriptiveTag(tagDefinition, "test", clock.getUTCNow());
tagStore.add(tag);
- tagStoreSqlDao.save(objectId.toString(), objectType, tagStore.getEntityList());
+ saveTags(tagStoreSqlDao, objectType, objectId.toString(), tagStore.getEntityList());
+
List<Tag> tags = tagStoreSqlDao.load(objectId.toString(), objectType);
assertEquals(tags.size(), 1);