killbill-memoizeit

Merge branch 'inv-ent-integration' of git@github.com:ning/killbill

2/10/2012 8:27:41 PM

Changes

pom.xml 2(+1 -1)

Details

diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index 1f70152..1e00dba 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -89,18 +89,18 @@ public class DefaultAccountDao implements AccountDao {
     public void create(final Account account) throws AccountApiException {
         final String key = account.getExternalKey();
         try {
+
             accountSqlDao.inTransaction(new Transaction<Void, AccountSqlDao>() {
                 @Override
-                public Void inTransaction(final AccountSqlDao accountSqlDao, final TransactionStatus status) throws AccountApiException, Bus.EventBusException {
-                    Account currentAccount = accountSqlDao.getAccountByKey(key);
+                public Void inTransaction(final AccountSqlDao transactionalDao, final TransactionStatus status) throws AccountApiException, Bus.EventBusException {
+                    Account currentAccount = transactionalDao.getAccountByKey(key);
                     if (currentAccount != null) {
                         throw new AccountApiException(ErrorCode.ACCOUNT_ALREADY_EXISTS, key);
                     }
-                    accountSqlDao.create(account);
-
-                    saveTagsFromWithinTransaction(account, accountSqlDao, true);
-                    saveCustomFieldsFromWithinTransaction(account, accountSqlDao, true);
+                    transactionalDao.create(account);
 
+                    saveTagsFromWithinTransaction(account, transactionalDao, true);
+                    saveCustomFieldsFromWithinTransaction(account, transactionalDao, true);
                     AccountCreationNotification creationEvent = new DefaultAccountCreationEvent(account);
                     eventBus.post(creationEvent);
                     return null;
@@ -210,7 +210,7 @@ public class DefaultAccountDao implements AccountDao {
 
         List<Tag> tagList = account.getTagList();
         if (tagList != null) {
-            tagStoreDao.save(accountId, objectType, tagList);
+            tagStoreDao.batchSaveFromTransaction(accountId, objectType, tagList);
         }
     }
 
@@ -225,7 +225,7 @@ public class DefaultAccountDao implements AccountDao {
 
         List<CustomField> fieldList = account.getFieldList();
         if (fieldList != null) {
-            fieldStoreDao.save(accountId, objectType, fieldList);
+            fieldStoreDao.batchSaveFromTransaction(accountId, objectType, fieldList);
         }
     }
 
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
index 83c699f..1f2adce 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
@@ -163,6 +163,7 @@ public class TestBasic {
     }
 
     private DateTime checkAndGetCTD(UUID subscriptionId) {
+
         SubscriptionData subscription = (SubscriptionData) entitlementUserApi.getSubscriptionFromId(subscriptionId);
         DateTime ctd = subscription.getChargedThroughDate();
         assertNotNull(ctd);
@@ -206,13 +207,13 @@ public class TestBasic {
         SubscriptionData subscription = (SubscriptionData) entitlementUserApi.createSubscription(bundle.getId(),
                 new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null);
         assertNotNull(subscription);
-
         assertTrue(busHandler.isCompleted(DELAY));
         log.info("testSimple passed first busHandler checkpoint.");
 
         //
         // VERIFY CTD HAS BEEN SET
         //
+
         checkAndGetCTD(subscription.getId());
 
         //
@@ -240,6 +241,7 @@ public class TestBasic {
         busHandler.pushExpectedEvent(NextEvent.PHASE);
         busHandler.pushExpectedEvent(NextEvent.INVOICE);
         clock.setDeltaFromReality(AT_LEAST_ONE_MONTH_MS);
+
         assertTrue(busHandler.isCompleted(DELAY));
 
         //
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java
index 54f4b4c..2cfd46d 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBusHandler.java
@@ -55,7 +55,7 @@ public class TestBusHandler {
 
     @Subscribe
     public void handleEntitlementEvents(SubscriptionTransition event) {
-        log.info(String.format("Got subscription event %s", event.toString()));
+        log.info(String.format("TestBusHandler Got subscription event %s", event.toString()));
         switch (event.getTransitionType()) {
         case MIGRATE_ENTITLEMENT:
             assertEqualsNicely(NextEvent.MIGRATE_ENTITLEMENT);
@@ -101,7 +101,7 @@ public class TestBusHandler {
 
     @Subscribe
     public void handleInvoiceEvents(InvoiceCreationNotification event) {
-        log.info(String.format("Got Invoice event %s", event.toString()));
+        log.info(String.format("TestBusHandler Got Invoice event %s", event.toString()));
         assertEqualsNicely(NextEvent.INVOICE);
         notifyIfStackEmpty();
 
@@ -130,13 +130,14 @@ public class TestBusHandler {
             }
         }
         if (!completed) {
-            log.debug("TestBusHandler did not complete in " + timeout + " ms");
+            Joiner joiner = Joiner.on(" ");
+            log.error("TestBusHandler did not complete in " + timeout + " ms, remaining events are " + joiner.join(nextExpectedEvent));
         }
         return completed;
     }
 
     private void notifyIfStackEmpty() {
-        log.debug("notifyIfStackEmpty ENTER");
+        log.debug("TestBusHandler notifyIfStackEmpty ENTER");
         synchronized (this) {
             if (nextExpectedEvent.isEmpty()) {
                 log.debug("notifyIfStackEmpty EMPTY");
@@ -144,7 +145,7 @@ public class TestBusHandler {
                 notify();
             }
         }
-        log.debug("notifyIfStackEmpty EXIT");
+        log.debug("TestBusHandler notifyIfStackEmpty EXIT");
     }
 
     private void assertEqualsNicely(NextEvent received) {
@@ -161,7 +162,7 @@ public class TestBusHandler {
         }
         if (!foundIt) {
             Joiner joiner = Joiner.on(" ");
-            System.err.println("Received event " + received + "; expected " + joiner.join(nextExpectedEvent));
+            log.error("TestBusHandler Received event " + received + "; expected " + joiner.join(nextExpectedEvent));
             // System.exit(1);
         }
     }
diff --git a/beatrix/src/test/resources/log4j.xml b/beatrix/src/test/resources/log4j.xml
index 75abc76..ac530a1 100644
--- a/beatrix/src/test/resources/log4j.xml
+++ b/beatrix/src/test/resources/log4j.xml
@@ -29,6 +29,10 @@
         <level value="info"/>
     </logger>
 
+    <logger name="com.ning.billing.util.notificationq">
+        <level value="info"/>
+    </logger>
+
     <root>
         <priority value="info"/>
         <appender-ref ref="stdout"/>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 19eaad5..de8132e 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -145,7 +145,7 @@ public class DefaultInvoiceDao implements InvoiceDao {
 
                     List<InvoiceItem> invoiceItems = invoice.getInvoiceItems();
                     InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
-                    invoiceItemDao.create(invoiceItems);
+                    invoiceItemDao.batchCreateFromTransaction(invoiceItems);
 
                     notifyOfFutureBillingEvents(invoiceSqlDao, invoiceItems);
                     setChargedThroughDates(invoiceSqlDao, invoiceItems);
@@ -154,7 +154,7 @@ public class DefaultInvoiceDao implements InvoiceDao {
                     // STEPH Why do we need that? Are the payments not always null at this point?
                     List<InvoicePayment> invoicePayments = invoice.getPayments();
                     InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
-                    invoicePaymentSqlDao.create(invoicePayments);
+                    invoicePaymentSqlDao.batchCreateFromTransaction(invoicePayments);
 
                     InvoiceCreationNotification event;
                     event = new DefaultInvoiceCreationNotification(invoice.getId(), invoice.getAccountId(),
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java
index 7b5476e..a76cd06 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceItemSqlDao.java
@@ -65,16 +65,18 @@ public interface InvoiceItemSqlDao extends EntityDao<InvoiceItem> {
     @SqlUpdate
     void update(@InvoiceItemBinder final InvoiceItem invoiceItem);
 
-    @SqlBatch
-    void create(@InvoiceItemBinder final List<InvoiceItem> items);
+    @SqlBatch(transactional=false)
+    void batchCreateFromTransaction(@InvoiceItemBinder final List<InvoiceItem> items);
 
     @BindingAnnotation(InvoiceItemBinder.InvoiceItemBinderFactory.class)
     @Retention(RetentionPolicy.RUNTIME)
     @Target({ElementType.PARAMETER})
     public @interface InvoiceItemBinder {
         public static class InvoiceItemBinderFactory implements BinderFactory {
+            @Override
             public Binder build(Annotation annotation) {
                 return new Binder<InvoiceItemBinder, InvoiceItem>() {
+                    @Override
                     public void bind(SQLStatement q, InvoiceItemBinder bind, InvoiceItem item) {
                         q.bind("id", item.getId().toString());
                         q.bind("invoiceId", item.getInvoiceId().toString());
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
index 7a05d5c..7179ec1 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
@@ -52,8 +52,8 @@ public interface InvoicePaymentSqlDao {
     @SqlUpdate
     public void create(@InvoicePaymentBinder  InvoicePayment invoicePayment);
 
-    @SqlBatch
-    void create(@InvoicePaymentBinder List<InvoicePayment> items);
+    @SqlBatch(transactional=false)
+    void batchCreateFromTransaction(@InvoicePaymentBinder List<InvoicePayment> items);
 
     @SqlUpdate
     public void update(@InvoicePaymentBinder  InvoicePayment invoicePayment);
@@ -85,7 +85,7 @@ public interface InvoicePaymentSqlDao {
             final DateTime updatedDate = getDate(result, "updated_date");
 
             return new InvoicePayment() {
-                private  DateTime now = new DateTime();
+                private final  DateTime now = new DateTime();
 
                 @Override
                 public UUID getPaymentAttemptId() {
@@ -124,8 +124,10 @@ public interface InvoicePaymentSqlDao {
     @Target({ElementType.PARAMETER})
     public @interface InvoicePaymentBinder {
         public static class InvoicePaymentBinderFactory implements BinderFactory {
+            @Override
             public Binder build(Annotation annotation) {
                 return new Binder<InvoicePaymentBinder, InvoicePayment>() {
+                    @Override
                     public void bind(SQLStatement q, InvoicePaymentBinder bind, InvoicePayment payment) {
                         q.bind("invoiceId", payment.getInvoiceId().toString());
                         q.bind("paymentAttemptId", payment.getPaymentAttemptId().toString());
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg
index 5b37548..451ebe7 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceItemSqlDao.sql.stg
@@ -45,6 +45,12 @@ create() ::= <<
          :recurringAmount, :recurringRate, :fixedAmount, :currency);
 >>
 
+batchCreateFromTransaction() ::= <<
+  INSERT INTO invoice_items(<invoiceItemFields()>)
+  VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName, :startDate, :endDate,
+         :recurringAmount, :recurringRate, :fixedAmount, :currency);
+>>
+
 update() ::= <<
   UPDATE invoice_items
   SET invoice_id = :invoiceId, subscription_id = :subscriptionId, plan_name = :planName, phase_name = :phaseName,
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
index a635456..2172573 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
@@ -15,6 +15,12 @@ create() ::= <<
   VALUES(:invoiceId, :paymentAttemptId, :paymentAttemptDate, :amount, :currency, :createdDate, :updatedDate);
 >>
 
+batchCreateFromTransaction() ::= <<
+  INSERT INTO invoice_payments(<invoicePaymentFields()>)
+  VALUES(:invoiceId, :paymentAttemptId, :paymentAttemptDate, :amount, :currency, :createdDate, :updatedDate);
+>>
+
+
 update() ::= <<
   UPDATE invoice_payments
   SET payment_date = :paymentAttemptDate, amount = :amount, currency = :currency, created_date = :createdDate, updated_date = :updatedDate
diff --git a/invoice/src/test/resources/log4j.xml b/invoice/src/test/resources/log4j.xml
new file mode 100644
index 0000000..33b9662
--- /dev/null
+++ b/invoice/src/test/resources/log4j.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2010-2011 Ning, Inc.
+  ~
+  ~ Ning licenses this file to you under the Apache License, version 2.0
+  ~ (the "License"); you may not use this file except in compliance with the
+  ~ License.  You may obtain a copy of the License at:
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+  ~ License for the specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%p	%d{ISO8601}	%X{trace}	%t	%c	%m%n"/>
+        </layout>
+    </appender>
+
+
+    <logger name="com.ning.billing.invoice">
+        <level value="info"/>
+    </logger>
+
+    <root>
+        <priority value="info"/>
+        <appender-ref ref="stdout"/>
+    </root>
+</log4j:configuration>

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index 5ec08b5..d72a41e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -232,7 +232,7 @@
             <dependency>
                 <groupId>org.jdbi</groupId>
                 <artifactId>jdbi</artifactId>
-                <version>2.27</version>
+                <version>2.31.2</version>
             </dependency>
             <dependency>
                 <groupId>org.skife.config</groupId>
diff --git a/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java b/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
index 680a826..14c123a 100644
--- a/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
+++ b/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
@@ -33,6 +33,7 @@ import org.skife.jdbi.v2.sqlobject.BinderFactory;
 import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
 import org.skife.jdbi.v2.sqlobject.SqlBatch;
 import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
@@ -42,10 +43,11 @@ import com.ning.billing.util.entity.EntityCollectionDao;
 
 @ExternalizedSqlViaStringTemplate3
 @RegisterMapper(FieldStoreDao.CustomFieldMapper.class)
-public interface FieldStoreDao extends EntityCollectionDao<CustomField>, Transmogrifier {
+public interface FieldStoreDao extends EntityCollectionDao<CustomField>, Transactional<FieldStoreDao>, Transmogrifier {
+
     @Override
-    @SqlBatch
-    public void save(@Bind("objectId") final String objectId,
+    @SqlBatch(transactional=false)
+    public void batchSaveFromTransaction(@Bind("objectId") final String objectId,
                      @Bind("objectType") final String objectType,
                      @CustomFieldBinder final List<CustomField> entities);
 
@@ -65,8 +67,10 @@ public interface FieldStoreDao extends EntityCollectionDao<CustomField>, Transmo
     @Target({ElementType.PARAMETER})
     public @interface CustomFieldBinder {
         public static class CustomFieldBinderFactory implements BinderFactory {
+            @Override
             public Binder build(Annotation annotation) {
                 return new Binder<CustomFieldBinder, CustomField>() {
+                    @Override
                     public void bind(SQLStatement q, CustomFieldBinder bind, CustomField customField) {
                         q.bind("id", customField.getId().toString());
                         q.bind("fieldName", customField.getName());
diff --git a/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java b/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java
index 8134203..11d149c 100644
--- a/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java
+++ b/util/src/main/java/com/ning/billing/util/entity/EntityCollectionDao.java
@@ -26,8 +26,9 @@ import java.util.List;
  * @param <T>
  */
 public interface EntityCollectionDao<T extends Entity> {
-    @SqlBatch
-    public void save(@Bind("objectId") final String objectId,
+
+    @SqlBatch(transactional=false)
+    public void batchSaveFromTransaction(@Bind("objectId") final String objectId,
                      @Bind("objectType") final String objectType,
                      @BindBean final List<T> entities);
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index a297581..2f511ec 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -52,10 +52,10 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
     public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max, @Bind("queue_name") String queueName);
 
     @SqlUpdate
-    public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
+    public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("id") long id, @Bind("now") Date now);
 
     @SqlUpdate
-    public void clearNotification(@Bind("notification_id") String eventId, @Bind("owner") String owner);
+    public void clearNotification(@Bind("id") long id, @Bind("owner") String owner);
 
     @SqlUpdate
     public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
@@ -71,7 +71,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
 
         @Override
         public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, Notification evt) {
-            stmt.bind("notification_id", evt.getId().toString());
+            stmt.bind("notification_id", evt.getUUID().toString());
             stmt.bind("created_dt", getDate(new DateTime()));
             stmt.bind("notification_key", evt.getNotificationKey());
             stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
@@ -94,7 +94,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
         public Notification map(int index, ResultSet r, StatementContext ctx)
         throws SQLException {
 
-            final UUID id = UUID.fromString(r.getString("notification_id"));
+            final long id = r.getLong("id");
+            final UUID uuid = UUID.fromString(r.getString("notification_id"));
             final String notificationKey = r.getString("notification_key");
             final String queueName = r.getString("queue_name");
             final DateTime effectiveDate = getDate(r, "effective_dt");
@@ -102,7 +103,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final String processingOwner = r.getString("processing_owner");
             final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
 
-            return new DefaultNotification(id, processingOwner, queueName, nextAvailableDate,
+            return new DefaultNotification(id, uuid, processingOwner, queueName, nextAvailableDate,
                     processingState, notificationKey, effectiveDate);
 
         }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 3c4c476..26e6c4e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -22,7 +22,8 @@ import org.joda.time.DateTime;
 
 public class DefaultNotification implements Notification {
 
-    private final UUID id;
+    private final long id;
+    private final UUID uuid;
     private final String owner;
     private final String queueName;
     private final DateTime nextAvailableDate;
@@ -31,11 +32,12 @@ public class DefaultNotification implements Notification {
     private final DateTime effectiveDate;
 
 
-    public DefaultNotification(UUID id, String owner, String queueName, DateTime nextAvailableDate,
+    public DefaultNotification(long id, UUID uuid, String owner, String queueName, DateTime nextAvailableDate,
             NotificationLifecycleState lifecycleState,
             String notificationKey, DateTime effectiveDate) {
         super();
         this.id = id;
+        this.uuid = uuid;
         this.owner = owner;
         this.queueName = queueName;
         this.nextAvailableDate = nextAvailableDate;
@@ -44,13 +46,17 @@ public class DefaultNotification implements Notification {
         this.effectiveDate = effectiveDate;
     }
 
-    public DefaultNotification(String queueName, String notificationKey, DateTime effectiveDate) {
-        this(UUID.randomUUID(), null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+    @Override
+    public long getId() {
+        return id;
     }
 
+    public DefaultNotification(String queueName, String notificationKey, DateTime effectiveDate) {
+        this(-1L, UUID.randomUUID(), null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+    }
     @Override
-    public UUID getId() {
-        return id;
+    public UUID getUUID() {
+        return uuid;
     }
 
     @Override
@@ -101,4 +107,5 @@ public class DefaultNotification implements Notification {
 	public String getQueueName() {
 		return queueName;
 	}
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3780ade..11c640a 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,14 +17,12 @@
 package com.ning.billing.util.notificationq;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@@ -33,20 +31,32 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
 
     protected final NotificationSqlDao dao;
 
-    public DefaultNotificationQueue(final IDBI dbi, final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+    public DefaultNotificationQueue(final IDBI dbi, Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         super(clock, svcName, queueName, handler, config);
         this.dao = dbi.onDemand(NotificationSqlDao.class);
     }
 
     @Override
     protected void doProcessEvents(final int sequenceId) {
+
+        logDebug("ENTER doProcessEvents");
         List<Notification> notifications = getReadyNotifications(sequenceId);
-        for (Notification cur : notifications) {
+        if (notifications.size() == 0) {
+            logDebug("EXIT doProcessEvents");
+            return;
+        }
+
+        logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
+
+        for (final Notification cur : notifications) {
             nbProcessedEvents.incrementAndGet();
+            logDebug("handling notification %s, key = %s for time %s",
+                    cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
             handler.handleReadyNotification(cur.getNotificationKey());
+            clearNotification(cur);
+            logDebug("done handling notification %s, key = %s for time %s",
+                    cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
-        // If anything happens before we get to clear those notifications, somebody else will pick them up
-        clearNotifications(notifications);
     }
 
     @Override
@@ -58,24 +68,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
 
-    private void clearNotifications(final Collection<Notification> cleared) {
-
-        log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
-                getFullQName(),
-                cleared.size()));
-
-        dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
-
-            @Override
-            public Void inTransaction(NotificationSqlDao transactional,
-                    TransactionStatus status) throws Exception {
-                for (Notification cur : cleared) {
-                    transactional.clearNotification(cur.getId().toString(), hostname);
-                    log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
-                }
-                return null;
-            }
-        });
+    private void clearNotification(final Notification cleared) {
+        dao.clearNotification(cleared.getId(), hostname);
     }
 
     private List<Notification> getReadyNotifications(final int seqId) {
@@ -83,28 +77,22 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         final Date now = clock.getUTCNow().toDate();
         final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
 
-        log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow =  %s",  getFullQName(), now));
-
-        List<Notification> input = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
-            @Override
-            public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
-                    TransactionStatus status) throws Exception {
-                return transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents(), getFullQName());
-            }
-        });
+        List<Notification> input = dao.getReadyNotifications(now, config.getDaoMaxReadyEvents(), getFullQName());
 
         List<Notification> claimedNotifications = new ArrayList<Notification>();
         for (Notification cur : input) {
-            final boolean claimed = (dao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+            logDebug("about to claim notification %s,  key = %s for time %s",
+                    cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
+            final boolean claimed = (dao.claimNotification(hostname, nextAvailable, cur.getId(), now) == 1);
+            logDebug("claimed notification %s, key = %s for time %s result = %s",
+                    cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate(), Boolean.valueOf(claimed));
             if (claimed) {
                 claimedNotifications.add(cur);
-                dao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+                dao.insertClaimedHistory(seqId, hostname, now, cur.getUUID().toString());
             }
         }
 
         for (Notification cur : claimedNotifications) {
-            log.debug(String.format("NotificationQueue %s claimed events %s",
-                    getFullQName(), cur.getId()));
             if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
                 log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
                         getFullQName(), cur, cur.getOwner()));
@@ -112,4 +100,11 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         }
         return claimedNotifications;
     }
+
+    private void logDebug(String format, Object...args) {
+        if (log.isDebugEnabled()) {
+            String realDebug = String.format(format, args);
+            log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
+        }
+    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 91e7110..372d1f7 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -18,6 +18,7 @@ package com.ning.billing.util.notificationq;
 
 import org.skife.jdbi.v2.IDBI;
 import com.google.inject.Inject;
+import com.google.inject.name.Named;
 import com.ning.billing.util.clock.Clock;
 
 public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 8749fa0..d59098b 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -23,13 +23,15 @@ import org.joda.time.DateTime;
 
 public interface Notification extends NotificationLifecycle {
 
-    public UUID getId();
+    public long getId();
+
+    public UUID getUUID();
 
     public String getNotificationKey();
 
     public DateTime getEffectiveDate();
-    
+
     public String getQueueName();
-    
+
 
 }
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java
index 33dcdb0..bf4ed62 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/TagStoreSqlDao.java
@@ -16,38 +16,23 @@
 
 package com.ning.billing.util.tag.dao;
 
-import java.lang.annotation.Annotation;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+
 import java.util.List;
-import java.util.UUID;
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.SQLStatement;
-import org.skife.jdbi.v2.StatementContext;
+
 import org.skife.jdbi.v2.sqlobject.Bind;
-import org.skife.jdbi.v2.sqlobject.Binder;
-import org.skife.jdbi.v2.sqlobject.BinderFactory;
-import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
 import org.skife.jdbi.v2.sqlobject.SqlBatch;
 import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
 import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
 import com.ning.billing.util.entity.EntityCollectionDao;
-import com.ning.billing.util.tag.DescriptiveTag;
-import com.ning.billing.util.tag.DefaultTagDefinition;
 import com.ning.billing.util.tag.Tag;
-import com.ning.billing.util.tag.TagDefinition;
 
 @ExternalizedSqlViaStringTemplate3
 @RegisterMapper(TagMapper.class)
-public interface TagStoreSqlDao extends EntityCollectionDao<Tag> {
+public interface TagStoreSqlDao extends EntityCollectionDao<Tag>, Transactional<TagStoreSqlDao> {
     @Override
-    @SqlBatch
-    public void save(@Bind("objectId") final String objectId,
+    @SqlBatch(transactional=false)
+    public void batchSaveFromTransaction(@Bind("objectId") final String objectId,
                      @Bind("objectType") final String objectType,
                      @TagBinder final List<Tag> entities);
 }
\ No newline at end of file
diff --git a/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg b/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
index 883f61b..9d3e96e 100644
--- a/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
@@ -1,6 +1,6 @@
 group FieldStoreDao;
 
-save() ::= <<
+batchSaveFromTransaction() ::= <<
   INSERT INTO custom_fields(id, object_id, object_type, field_name, field_value)
   VALUES (:id, :objectId, :objectType, :fieldName, :fieldValue)
   ON DUPLICATE KEY UPDATE
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index cf2ceb2..a0ef302 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -48,9 +48,8 @@ CREATE TABLE notifications (
     processing_state varchar(14) DEFAULT 'AVAILABLE',
     PRIMARY KEY(id)
 ) ENGINE=innodb;
-CREATE INDEX  `idx_comp_where` ON notifications (`effective_dt`,`processing_state`,`processing_owner`,`processing_available_dt`);
-CREATE INDEX  `idx_update` ON notifications (`notification_id`,`processing_state`,`processing_owner`,`processing_available_dt`);
-CREATE INDEX  `idx_update1` ON notifications (`notification_id`,`processing_owner`);
+CREATE INDEX  `idx_comp_where` ON notifications (`effective_dt`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX  `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_dt`);
 CREATE INDEX  `idx_get_ready` ON notifications (`effective_dt`,`created_dt`,`id`);
 
 DROP TABLE IF EXISTS claimed_notifications;
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index efe7e4f..7a7ecab 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -2,8 +2,9 @@ group NotificationSqlDao;
 
 getReadyNotifications(now, max) ::= <<
     select
-      notification_id
-    , notification_key
+      id
+      ,  notification_id
+      , notification_key
       , created_dt
       , effective_dt
       , queue_name
@@ -19,33 +20,31 @@ getReadyNotifications(now, max) ::= <<
     order by
       effective_dt asc
       , created_dt asc
-      , id asc
+      , id
     limit :max
     ;
 >>
 
 
-claimNotification(owner, next_available, notification_id, now) ::= <<
+claimNotification(owner, next_available, id, now) ::= <<
     update notifications
     set
       processing_owner = :owner
       , processing_available_dt = :next_available
       , processing_state = 'IN_PROCESSING'
     where
-      notification_id = :notification_id
+      id = :id
       and processing_state != 'PROCESSED'
       and (processing_owner IS NULL OR processing_available_dt \<= :now)
     ;
 >>
 
-clearNotification(notification_id, owner) ::= <<
+clearNotification(id, owner) ::= <<
     update notifications
     set
-      processing_owner = NULL
-      , processing_state = 'PROCESSED'
+      processing_state = 'PROCESSED'
     where
-      notification_id = :notification_id
-      and processing_owner = :owner
+      id = :id
     ;
 >>
 
diff --git a/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
index 2d78f48..9d7ce5c 100644
--- a/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
@@ -1,6 +1,6 @@
 group TagStoreDao;
 
-save() ::= <<
+batchSaveFromTransaction() ::= <<
   INSERT INTO tags(id, tag_definition_name, object_id, object_type, added_date, added_by)
   VALUES (:id, :tagDefinitionName, :objectId, :objectType, :addedDate, :addedBy)
   ON DUPLICATE KEY UPDATE
diff --git a/util/src/test/java/com/ning/billing/dbi/DBIProvider.java b/util/src/test/java/com/ning/billing/dbi/DBIProvider.java
index 07ee518..d83c033 100644
--- a/util/src/test/java/com/ning/billing/dbi/DBIProvider.java
+++ b/util/src/test/java/com/ning/billing/dbi/DBIProvider.java
@@ -55,7 +55,7 @@ public class DBIProvider implements Provider<IDBI>
         dbConfig.setMaxConnectionsPerPartition(config.getMaxActive());
         dbConfig.setConnectionTimeout(config.getConnectionTimeout().getPeriod(), config.getConnectionTimeout().getUnit());
         dbConfig.setPartitionCount(1);
-        dbConfig.setDefaultTransactionIsolation("READ_COMMITTED");
+        dbConfig.setDefaultTransactionIsolation("REPEATABLE_READ");
         dbConfig.setDisableJMX(false);
 
         final BoneCPDataSource ds = new BoneCPDataSource(dbConfig);
diff --git a/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java b/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java
index 9619bd8..4512a5d 100644
--- a/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java
+++ b/util/src/test/java/com/ning/billing/util/customfield/TestFieldStore.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.util.UUID;
 import org.apache.commons.io.IOUtils;
 import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -62,32 +64,48 @@ public class TestFieldStore {
 
     @Test
     public void testFieldStore() {
-        UUID id = UUID.randomUUID();
-        String objectType = "Test widget";
+        final UUID id = UUID.randomUUID();
+        final String objectType = "Test widget";
 
-        FieldStore fieldStore = new DefaultFieldStore(id, objectType);
+        final FieldStore fieldStore1 = new DefaultFieldStore(id, objectType);
 
         String fieldName = "TestField1";
         String fieldValue = "Kitty Hawk";
-        fieldStore.setValue(fieldName, fieldValue);
+        fieldStore1.setValue(fieldName, fieldValue);
 
         FieldStoreDao fieldStoreDao = dbi.onDemand(FieldStoreDao.class);
-        fieldStoreDao.save(id.toString(), objectType, fieldStore.getEntityList());
+        fieldStoreDao.inTransaction(new Transaction<Void, FieldStoreDao>() {
+            @Override
+            public Void inTransaction(FieldStoreDao transactional,
+                    TransactionStatus status) throws Exception {
+                transactional.batchSaveFromTransaction(id.toString(), objectType, fieldStore1.getEntityList());
+                return null;
+            }
+        });
 
-        fieldStore = DefaultFieldStore.create(id, objectType);
-        fieldStore.add(fieldStoreDao.load(id.toString(), objectType));
 
-        assertEquals(fieldStore.getValue(fieldName), fieldValue);
+        final FieldStore fieldStore2 = DefaultFieldStore.create(id, objectType);
+        fieldStore2.add(fieldStoreDao.load(id.toString(), objectType));
 
-        fieldValue = "Cape Canaveral";
-        fieldStore.setValue(fieldName, fieldValue);
-        assertEquals(fieldStore.getValue(fieldName), fieldValue);
-        fieldStoreDao.save(id.toString(), objectType, fieldStore.getEntityList());
-
-        fieldStore = DefaultFieldStore.create(id, objectType);
-        assertEquals(fieldStore.getValue(fieldName), null);
-        fieldStore.add(fieldStoreDao.load(id.toString(), objectType));
+        assertEquals(fieldStore2.getValue(fieldName), fieldValue);
 
-        assertEquals(fieldStore.getValue(fieldName), fieldValue);
+        fieldValue = "Cape Canaveral";
+        fieldStore2.setValue(fieldName, fieldValue);
+        assertEquals(fieldStore2.getValue(fieldName), fieldValue);
+        fieldStoreDao.inTransaction(new Transaction<Void, FieldStoreDao>() {
+            @Override
+            public Void inTransaction(FieldStoreDao transactional,
+                    TransactionStatus status) throws Exception {
+                transactional.batchSaveFromTransaction(id.toString(), objectType, fieldStore2.getEntityList());
+                return null;
+            }
+        });
+
+
+        final FieldStore fieldStore3 = DefaultFieldStore.create(id, objectType);
+        assertEquals(fieldStore3.getValue(fieldName), null);
+        fieldStore3.add(fieldStoreDao.load(id.toString(), objectType));
+
+        assertEquals(fieldStore3.getValue(fieldName), fieldValue);
     }
 }
diff --git a/util/src/test/java/com/ning/billing/util/globalLocker/TestMysqlGlobalLocker.java b/util/src/test/java/com/ning/billing/util/globalLocker/TestMysqlGlobalLocker.java
new file mode 100644
index 0000000..674efa3
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/globalLocker/TestMysqlGlobalLocker.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.globalLocker;
+
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
+import org.testng.annotations.Test;
+
+public class TestMysqlGlobalLocker {
+
+    // Used as a manual test to validate the simple DAO by stepping through that locking is done and release correctly
+    @Test(enabled=false)
+    public void testSimpleLocking() {
+        IDBI dbi = new DBI("jdbc:mysql://localhost:3306/killbill?createDatabaseIfNotExist=true", "root", "root");
+        GlobalLocker lock = new  MySqlGlobalLocker(dbi);
+        lock.lockWithNumberOfTries("test-lock", 3);
+        System.out.println("youpihh!");
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index d744caf..cb01e00 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -120,23 +120,23 @@ public class TestNotificationSqlDao {
         assertEquals(notification.getNextAvailableDate(), null);
 
         DateTime nextAvailable = now.plusMinutes(5);
-        int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId().toString(), now.toDate());
+        int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId(), now.toDate());
         assertEquals(res, 1);
-        dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getId().toString());
+        dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getUUID().toString());
 
-        notification = fetchNotification(notification.getId().toString());
+        notification = fetchNotification(notification.getUUID().toString());
         assertEquals(notification.getNotificationKey(), notificationKey);
         validateDate(notification.getEffectiveDate(), effDt);
         assertEquals(notification.getOwner().toString(), ownerId);
         assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
         validateDate(notification.getNextAvailableDate(), nextAvailable);
 
-        dao.clearNotification(notification.getId().toString(), ownerId);
+        dao.clearNotification(notification.getId(), ownerId);
 
-        notification = fetchNotification(notification.getId().toString());
+        notification = fetchNotification(notification.getUUID().toString());
         assertEquals(notification.getNotificationKey(), notificationKey);
         validateDate(notification.getEffectiveDate(), effDt);
-        assertEquals(notification.getOwner(), null);
+        //assertEquals(notification.getOwner(), null);
         assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
         validateDate(notification.getNextAvailableDate(), nextAvailable);
 
@@ -148,7 +148,8 @@ public class TestNotificationSqlDao {
             @Override
             public Notification withHandle(Handle handle) throws Exception {
                 Notification res = handle.createQuery("   select" +
-                		" notification_id" +
+                        " id " +
+                		", notification_id" +
                 		", notification_key" +
                 		", created_dt" +
                 		", effective_dt" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 0fe65ec..0ebf68f 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -75,7 +75,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
             }
             for (Notification cur : readyNotifications) {
                 handler.handleReadyNotification(cur.getNotificationKey());
-                DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+                DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
                 oldNotifications.add(cur);
                 processedNotifications.add(processedNotification);
 
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 6db48d0..eac2d78 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -48,6 +48,8 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.google.inject.name.Names;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
@@ -57,8 +59,8 @@ import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue {
 	Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
-	@Inject
-	private IDBI dbi;
+    @Inject
+    private IDBI dbi;
 
 	@Inject
 	MysqlTestingHelper helper;
@@ -67,7 +69,7 @@ public class TestNotificationQueue {
 	private Clock clock;
 
 	private DummySqlTest dao;
-	
+
 	private int eventsReceived;
 
 	// private NotificationQueue queue;
@@ -107,7 +109,7 @@ public class TestNotificationQueue {
 	/**
 	 * Test that we can post a notification in the future from a transaction and get the notification
 	 * callback with the correct key when the time is ready
-	 * @throws Exception 
+	 * @throws Exception
 	 */
 	@Test(groups={"fast"}, enabled = true)
 	public void testSimpleNotification() throws Exception {
@@ -254,20 +256,20 @@ public class TestNotificationQueue {
 		assertEquals(success, true);
 
 	}
-	
+
 	/**
 	 * Test that we can post a notification in the future from a transaction and get the notification
 	 * callback with the correct key when the time is ready
-	 * @throws Exception 
+	 * @throws Exception
 	 */
 	@Test(groups={"fast"}, enabled = true)
 	public void testMultipleHandlerNotification() throws Exception {
 
 		final Map<String, Boolean> expectedNotificationsFred = new TreeMap<String, Boolean>();
 		final Map<String, Boolean> expectedNotificationsBarney = new TreeMap<String, Boolean>();
-		
-		NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock);
-		
+
+		NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi,  clock);
+
 		NotificationConfig config=new NotificationConfig() {
             @Override
             public boolean isNotificationProcessingOff() {
@@ -286,8 +288,8 @@ public class TestNotificationQueue {
                 return 60000;
             }
 		};
-		
-		
+
+
 		final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
                 @Override
                 public void handleReadyNotification(String notificationKey) {
@@ -297,7 +299,7 @@ public class TestNotificationQueue {
                 }
             },
             config);
-		
+
 		final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
             @Override
             public void handleReadyNotification(String notificationKey) {
@@ -310,8 +312,8 @@ public class TestNotificationQueue {
 
 		queueFred.startQueue();
 //		We don't start Barney so it can never pick up notifications
-		
-		
+
+
 		final UUID key = UUID.randomUUID();
 		final DummyObject obj = new DummyObject("foo", key);
 		final DateTime now = new DateTime();
@@ -323,7 +325,7 @@ public class TestNotificationQueue {
 			}
 		};
 
-		
+
 		final NotificationKey notificationKeyBarney = new NotificationKey() {
 			@Override
 			public String toString() {
@@ -372,7 +374,7 @@ public class TestNotificationQueue {
 
 		Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred.toString()));
 		Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney.toString()));
-		  
+
 	}
 
 	NotificationConfig getNotificationConfig(final boolean off,
@@ -408,6 +410,8 @@ public class TestNotificationQueue {
 			bind(MysqlTestingHelper.class).toInstance(helper);
 			IDBI dbi = helper.getDBI();
 			bind(IDBI.class).toInstance(dbi);
+			IDBI otherDbi = helper.getDBI();
+			bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
 			/*
             bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
             final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
diff --git a/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java b/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
index 633095f..9e21e05 100644
--- a/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
+++ b/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.commons.io.IOUtils;
 import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -52,7 +54,7 @@ public class TestTagStore {
     private TagStoreModuleMock module;
     private TagStoreSqlDao tagStoreSqlDao;
     private TagDefinitionDao tagDefinitionDao;
-    private Logger log = LoggerFactory.getLogger(TestTagStore.class);
+    private final Logger log = LoggerFactory.getLogger(TestTagStore.class);
 
     @BeforeClass(alwaysRun = true)
     protected void setup() throws IOException {
@@ -86,6 +88,17 @@ public class TestTagStore {
         module.stopDb();
     }
 
+    private void saveTags(final TagStoreSqlDao dao, final String objectType, final String accountId, final List<Tag> tagList)  {
+        dao.inTransaction(new Transaction<Void, TagStoreSqlDao>() {
+            @Override
+            public Void inTransaction(TagStoreSqlDao transactional,
+                    TransactionStatus status) throws Exception {
+                dao.batchSaveFromTransaction(accountId.toString(), objectType, tagList);
+                return null;
+            }
+        });
+    }
+
     @Test
     public void testTagCreationAndRetrieval() {
         UUID accountId = UUID.randomUUID();
@@ -95,7 +108,7 @@ public class TestTagStore {
         tagStore.add(tag);
 
         TagStoreSqlDao dao = dbi.onDemand(TagStoreSqlDao.class);
-        dao.save(accountId.toString(), ACCOUNT_TYPE, tagStore.getEntityList());
+        saveTags(dao, ACCOUNT_TYPE, accountId.toString(), tagStore.getEntityList());
 
         List<Tag> savedTags = dao.load(accountId.toString(), ACCOUNT_TYPE);
         assertEquals(savedTags.size(), 1);
@@ -107,6 +120,7 @@ public class TestTagStore {
         assertEquals(savedTag.getId(), tag.getId());
     }
 
+
     @Test
     public void testControlTagCreation() {
         UUID accountId = UUID.randomUUID();
@@ -117,7 +131,7 @@ public class TestTagStore {
         assertEquals(tagStore.generateInvoice(), false);
 
         List<Tag> tagList = tagStore.getEntityList();
-        tagStoreSqlDao.save(accountId.toString(), ACCOUNT_TYPE, tagList);
+        saveTags(tagStoreSqlDao, ACCOUNT_TYPE, accountId.toString(), tagList);
 
         tagStore.clear();
         assertEquals(tagStore.getEntityList().size(), 0);
@@ -147,7 +161,7 @@ public class TestTagStore {
         assertEquals(tagStore.generateInvoice(), true);
 
         List<Tag> tagList = tagStore.getEntityList();
-        tagStoreSqlDao.save(accountId.toString(), ACCOUNT_TYPE, tagList);
+        saveTags(tagStoreSqlDao, ACCOUNT_TYPE, accountId.toString(), tagList);
 
         tagStore.clear();
         assertEquals(tagStore.getEntityList().size(), 0);
@@ -181,7 +195,7 @@ public class TestTagStore {
         assertEquals(tagStore.generateInvoice(), false);
 
         List<Tag> tagList = tagStore.getEntityList();
-        tagStoreSqlDao.save(accountId.toString(), ACCOUNT_TYPE, tagList);
+        saveTags(tagStoreSqlDao, ACCOUNT_TYPE, accountId.toString(), tagList);
 
         tagStore.clear();
         assertEquals(tagStore.getEntityList().size(), 0);
@@ -244,7 +258,8 @@ public class TestTagStore {
         Tag tag = new DescriptiveTag(tagDefinition, "test", clock.getUTCNow());
         tagStore.add(tag);
 
-        tagStoreSqlDao.save(objectId.toString(), objectType, tagStore.getEntityList());
+        saveTags(tagStoreSqlDao, objectType, objectId.toString(), tagStore.getEntityList());
+
         List<Tag> tags = tagStoreSqlDao.load(objectId.toString(), objectType);
         assertEquals(tags.size(), 1);
 
@@ -269,7 +284,8 @@ public class TestTagStore {
         Tag tag = new DescriptiveTag(tagDefinition, "test", clock.getUTCNow());
         tagStore.add(tag);
 
-        tagStoreSqlDao.save(objectId.toString(), objectType, tagStore.getEntityList());
+        saveTags(tagStoreSqlDao, objectType, objectId.toString(), tagStore.getEntityList());
+
         List<Tag> tags = tagStoreSqlDao.load(objectId.toString(), objectType);
         assertEquals(tags.size(), 1);